Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


asynchronous - How to wait for multiple nested async calls using RxJava-Android?

I'm new to RxJava. Here's my case:

  1. Send request A, which returns a List<A>.
  2. For each A, send request AA, which returns an AA, then bind that AA to its A.
  3. B and BB follow the same logic.
  4. Do something only after all requests have completed.

Example:

request(url1, callback(List<A> listA) {
    for (A a : listA) {
        request(url2, callback(AA aa) {
            a.set(aa);
        });
    }
});

A and B are independent of each other.

How should I structure the code? I'm also using Retrofit as the network client.


1 Reply


OK, I think the code below should solve the first part of your problem.

Notice that the second call to flatMap is given two arguments. There is an overload of flatMap that not only produces an Observable for each input item but also takes a second function, which combines each item emitted by the resulting Observable with the input item it came from.

Have a look at the third graphic under this heading to get an intuitive understanding:

https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#flatmap-concatmap-and-flatmapiterable

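// RxJava 1.x imports used by this snippet
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;

Observable<A> observableOfAs = retrofitClient.getListOfAs()
// Unpack the List<A> so that each A is emitted on its own.
.flatMap(new Func1<List<A>, Observable<A>>() {

    @Override
    public Observable<A> call(List<A> listOfAs) {
        return Observable.from(listOfAs);
    }

})
// Request the AA for each A, then bind it to the A it belongs to.
.flatMap(new Func1<A, Observable<AA>>() {

    @Override
    public Observable<AA> call(A someA) {
        return retrofitClient.getTheAaForMyA(someA);
    }

},
new Func2<A, AA, A>() {

    @Override
    public A call(A someA, AA theAaforMyA) {
        // set() is called as a plain statement in the question, so return the A explicitly
        someA.set(theAaforMyA);
        return someA;
    }

})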
...
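
To make the two-argument flatMap more concrete, here is a minimal plain-Java sketch of what that overload computes. It is not RxJava: it works on lists, runs synchronously and always keeps the source order, whereas the real operator is asynchronous and may interleave results. The names FlatMapSketch and flatMapWithCombiner are made up for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical helper, NOT part of RxJava: a synchronous, list-based
// stand-in for flatMap(selector, combiner).
final class FlatMapSketch {

    static <T, U, R> List<R> flatMapWithCombiner(
            List<T> sources,
            Function<T, List<U>> selector,
            BiFunction<T, U, R> combiner) {
        List<R> results = new ArrayList<>();
        for (T source : sources) {
            // First function: produce the inner items for this source item
            // (in the answer: the AA requested for one A).
            for (U inner : selector.apply(source)) {
                // Second function: combine each inner item with the source
                // item it was produced from (in the answer: bind AA to A).
                results.add(combiner.apply(source, inner));
            }
        }
        return results;
    }
}
```

Called with the sources ["a", "b"], a selector that returns the upper-cased string in a one-element list, and the combiner (s, inner) -> s + "=" + inner, this returns ["a=A", "b=B"].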

From here on I am still not sure how you want to continue: are you ready to just subscribe to the resulting Observable of As? That way you could handle each A as it arrives (onNext) or just wait until all of them are done (onCompleted).

ADDENDUM: To collect all items into a single List at the end, that is, to turn your Observable<A> into an Observable<List<A>>, use toList().

https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#tolist

So you have:

Observable<List<A>> observableOfListOfAs = observableOfAs.toList();

If you need more fine-grained control over how the list is built, you can use reduce instead.

https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce

For the Bs, simply duplicate the whole flow you used for the As.

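You can then use zip to wait for both flows to complete. Each of the two Observables emits exactly one list, so zip emits a single pair once both lists are available: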

Observable.zip(
    observableOfListOfAs,
    observableOfListOfBs,
    new Func2<List<A>, List<B>, MyPairOfLists>() {

        @Override
        public MyPairOfLists call(List<A> as, List<B> bs) {
            return new MyPairOfLists(as, bs);
        }
    }
)
.subscribe(new Subscriber<MyPairOfLists>() {

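    @Override
    public void onCompleted() {
        // nothing more to do: the single pair was already delivered to onNext()
    }

    @Override
    public void onError(Throwable e) {
        // a failure in any request of either flow ends up here
    }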

    @Override
    public void onNext(MyPairOfLists pair) {
        // now both the as and the bs are ready to use:

        List<A> as = pair.getAs();
        List<B> bs = pair.getBs();

        // do something here!
    }
});

I suppose you can guess the definition of MyPairOfLists.
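
In case it helps, one possible definition is sketched below, inferred only from how MyPairOfLists is used in the zip example above (a two-argument constructor plus getAs() and getBs()). The empty A and B classes are placeholders that keep the sketch self-contained; replace them with your real model classes.

```java
import java.util.List;

// Placeholders for the question's own model types (assumption: any classes will do).
class A {}
class B {}

// Simple immutable holder for the two result lists produced by zip.
final class MyPairOfLists {

    private final List<A> as;
    private final List<B> bs;

    MyPairOfLists(List<A> as, List<B> bs) {
        this.as = as;
        this.bs = bs;
    }

    List<A> getAs() {
        return as;
    }

    List<B> getBs() {
        return bs;
    }
}
```

A generic pair class would work just as well; a dedicated class simply keeps the Func2 signature in the zip call readable.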



...