Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

rx java - Rx: a zip-like operator that continues after one of the streams ended?

I'm looking to combine streams (observables) that start and end asynchronously:

-1----1----1----1---|->
     -2----2--|->
[ optional_zip(sum) ]
-1----3----3----1---|->

What I need it for: Adding audio streams together. They're streams of audio "chunks", but I'm going to represent them with integers here. So there's the first clip playing:

-1----1----1----1---|->

and then a second one starts, a bit later:

     -2----2--|->

The result of combining them by sum should be:

-1----3----3----1---|->

But the standard zip completes as soon as any of the zipped streams ends. I want this optional_zip to keep going even after one of the streams has ended. Is there any way of doing this in Rx, or do I have to implement it myself by modifying the existing Zip?
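
To pin down the expected output, here is a minimal plain-JavaScript model of the diagrams above. It is only a sketch and does not use Rx: the name optionalZipSum, the time-slot arrays, and the use of null for "no chunk in this slot" are assumptions made for illustration, and aligning chunks by slot is a simplification of real asynchronous timing.

```javascript
// Plain-JavaScript model of the marble diagram (no Rx involved).
// Each stream is an array of time slots; null means "no chunk in this slot",
// either because the stream has not started yet or has already ended.
function optionalZipSum(streams) {
  const length = Math.max(0, ...streams.map(s => s.length));
  const result = [];
  for (let t = 0; t < length; t++) {
    // Collect the chunks that actually exist in this slot.
    const present = streams
      .map(s => (t < s.length ? s[t] : null))
      .filter(v => v !== null);
    // A slot in which no stream emits stays empty (null).
    result.push(present.length === 0 ? null : present.reduce((a, c) => a + c, 0));
  }
  return result;
}

const first = [1, 1, 1, 1];
const second = [null, 2, 2]; // starts one slot later and ends earlier
const mixed = optionalZipSum([first, second]);
console.log(JSON.stringify(mixed)); // [1,3,3,1]
```

The second stream contributes only while it is active; before it starts and after it ends, the first stream's chunks pass through unchanged.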

Note: I'm using RxPy, but the community here seems small and Rx operators seem to be pretty universal across languages, so I tagged it as rx-java and rx-js too.

1 Reply

I'd tackle this problem by breaking it into two parts. First, I'd want something that takes an Observable<Observable<T>> and produces an Observable<Observable<T>[]> where the array contains only the "active" (i.e. non-complete) observables. Any time a new element is added to the outer observable, and any time one of the inner observables completes, a new array would be emitted containing the appropriate observables. This is essentially a "scan" reduction of the primary stream.

Once you've got something that can do that, you can use flatMapLatest and zip to get what you want.

My basic attempt at the first part is as follows:

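// Emits an array of the currently active (not yet completed) inner
// observables each time one joins or leaves the set.
function active(ss$) {
    const activeStreams = new Rx.Subject();
    const elements = [];
    const subscriptions = [];

    ss$.subscribe(s => {
        // Stays true unless s completes synchronously while subscribing.
        let include = true;
        // s is subscribed here only to observe completion; zip subscribes to
        // it again later, so this assumes hot (shared) observables.
        const subscription = s.subscribe(x => {}, x => {}, x => {
            include = false;
            const i = elements.indexOf(s);
            if (i > -1) {
                // s has completed: drop it and emit the new active set.
                elements.splice(i, 1);
                activeStreams.onNext(elements.slice());
            }
        });

        if (include) {
            elements.push(s);
            subscriptions.push(subscription);
            activeStreams.onNext(elements.slice());
        }
    });

    // Dispose the inner subscriptions when the consumer unsubscribes.
    return Rx.Observable.using(
        () => new Rx.Disposable(() => subscriptions.forEach(x => x.dispose())),
        () => activeStreams
    );
}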
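
The bookkeeping inside active can be viewed as a reduction over start and completion events. The sketch below models that in plain JavaScript without Rx; the event shape ({ type, id }) and the name activeSnapshots are hypothetical, chosen only for illustration.

```javascript
// Plain-JavaScript model of the "active set" bookkeeping (no Rx involved).
// The event shape ({ type, id }) is hypothetical and exists only for this sketch.
function activeSnapshots(events) {
  const snapshots = [];
  let active = [];
  for (const event of events) {
    if (event.type === "start") {
      active = [...active, event.id];
    } else if (event.type === "complete" && active.includes(event.id)) {
      active = active.filter(id => id !== event.id);
    } else {
      continue; // unknown event or unknown stream: membership is unchanged
    }
    // Emit a copy so later changes do not mutate earlier snapshots.
    snapshots.push(active.slice());
  }
  return snapshots;
}

const snapshots = activeSnapshots([
  { type: "start", id: "a" },
  { type: "start", id: "b" },
  { type: "complete", id: "b" },
  { type: "complete", id: "a" },
]);
console.log(JSON.stringify(snapshots)); // [["a"],["a","b"],["a"],[]]
```

Each snapshot corresponds to one array emitted by activeStreams in the Rx version.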

From there, you'd just zip it and flatten it out like so:

const zipped = active(c$).flatMapLatest(x =>
    x.length === 0 ? Rx.Observable.never()
  : x.length === 1 ? x[0]
  : Rx.Observable.zip(x, (...args) => args.reduce((a, c) => a + c))
);

I've made the assumptions that zero active streams should yield nothing, one active stream should yield its own elements, and two or more streams should all zip together (all of which is reflected in the map application).

My (admittedly fairly limited) testing has this combination yielding the results you were after.

Great question, by the way. I've not seen anything that solves the first part of the problem (though I'm by no means an Rx expert; if someone knows of something that already does this, please post details).

