Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


c# - A way to push buffered events in even intervals

What I'm trying to achieve is to buffer incoming events from some IObservable (they come in bursts) and release them one by one, at even intervals. Like this:

-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
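
The timing rule in the diagram can be written down without any Rx at all. The block below is only a sketch of that arithmetic, reading each character of the diagram as one time unit; `EvenPacing` and `ReleaseTimes` are hypothetical names made up for illustration. Each item is released at its arrival time, or one interval after the previous release, whichever is later.

```csharp
using System;
using System.Collections.Generic;

public static class EvenPacing
{
    // Hypothetical helper, not part of Rx: maps sorted arrival times
    // to the times at which the items would be released.
    public static List<int> ReleaseTimes(IReadOnlyList<int> arrivals, int interval)
    {
        var releases = new List<int>(arrivals.Count);
        foreach (int arrival in arrivals)
        {
            // The first item goes out as soon as it arrives; every later item
            // waits until at least one interval after the previous release.
            int release = releases.Count == 0
                ? arrival
                : Math.Max(arrival, releases[releases.Count - 1] + interval);
            releases.Add(release);
        }
        return releases;
    }
}
```

With an interval of 3, the first burst in the diagram (arrivals at 1, 2, 4, 5, 6, 8, 9) comes out at 1, 4, 7, 10, 13, 16, 19. An item that arrives after a long quiet period is released immediately, which is why the second burst in the diagram starts without delay.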

Since I'm quite new to Rx, I'm not sure whether there is already a Subject or an operator that does just this. Maybe it can be done by composition?

update:

Thanks to Richard Szalay for pointing out the Drain operator, I found another example by James Miles of Drain operator usage. Here's how I managed to get it to work in a WPF app:

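    .Drain(x => {
        // Process updates the UI state directly.
        Process(x);
        // One-element inner sequence delayed by a second; Drain waits for it
        // to complete before calling this selector with the next item.
        return Observable.Return(new Unit())
            .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher);
    }).Subscribe();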

I had some fun, because omitting the scheduler parameter causes the app to crash in debug mode without any exception showing up (I need to learn how to deal with exceptions in Rx). The Process method modifies the UI state directly, but I guess it's quite simple to make an IObservable out of it (using an ISubject?).

update:

In the meantime I've been experimenting with ISubject. The class below does what I wanted, releasing buffered Ts one per interval:

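public class StepSubject<T> : ISubject<T>
{
    IObserver<T> subscriber;
    readonly Queue<T> queue = new Queue<T>();
    readonly MutableDisposable cancel = new MutableDisposable();
    readonly TimeSpan interval;
    readonly IScheduler scheduler;
    bool idle = true;   // only read or written inside lock (queue)

    public StepSubject(TimeSpan interval, IScheduler scheduler)
    {
        this.interval = interval;
        this.scheduler = scheduler;
    }

    // Emits one queued item, then reschedules itself one interval later.
    void Step()
    {
        T next = default(T);   // initialised so the compiler accepts the use below
        bool hasNext;
        lock (queue)
        {
            hasNext = queue.Count > 0;
            idle = !hasNext;
            if (hasNext)
                next = queue.Dequeue();
        }

        if (hasNext)
        {
            cancel.Disposable = scheduler.Schedule(Step, interval);
            subscriber.OnNext(next);
        }
    }

    public void OnNext(T value)
    {
        bool start;
        lock (queue)
        {
            queue.Enqueue(value);
            // Claim the idle state under the lock so that two quick OnNext
            // calls cannot both schedule an immediate Step.
            start = idle;
            idle = false;
        }

        if (start)
            cancel.Disposable = scheduler.Schedule(Step);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        subscriber = observer;
        return cancel;
    }
}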

This naive implementation omits OnCompleted and OnError for clarity, and it allows only a single subscription.


1 Reply


It's actually trickier than it sounds.

Using Delay doesn't work because the values still arrive in bursts, only shifted later in time.

Using Interval with either CombineLatest or Zip doesn't work either. CombineLatest causes source values to be skipped, and Zip buffers the interval ticks while the source is quiet, so the next burst is paired with the stored ticks and comes out all at once.

I think the new Drain operator (added in 1.0.2787.0), combined with Delay, should do the trick:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));

The Drain operator works like SelectMany, but waits until the previous output completes before calling the selector with the next value. In the usage above, StartWith emits each value immediately and the delayed empty sequence holds back the next one for a second, so the first value in a block is not delayed and the output matches your marble diagram.

Edit: Apparently the Drain in the framework doesn't work like SelectMany. I'll ask for some advice in the official forums. In the meantime, here's an implementation of Drain that does what you're after:

Edit 09/11: Fixed errors in implementation and updated usage to match your requested marble diagram.

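public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
    {
        // Defer gives every subscriber its own gate subject.
        return Observable.Defer(() =>
        {
            // Acts as a gate: it starts with one token, so the first source
            // value passes straight through.
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                // Each source value needs a token from the gate to proceed.
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    // When the inner sequence completes, release the next token.
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}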
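
For readers on a current System.Reactive release, where Drain may not be available, the same "wait for the previous inner sequence to complete" behaviour can probably be had from Concat. This is only a sketch under that assumption: `Pace` and `PacingExtensions` are hypothetical names, and the pattern relies on Delay also postponing the completion of the empty sequence.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class PacingExtensions
{
    // Hypothetical helper, not part of Rx itself.
    public static IObservable<T> Pace<T>(
        this IObservable<T> source, TimeSpan interval, IScheduler scheduler)
    {
        return source
            // Each value becomes a small sequence: the value right away,
            // then completion one interval later.
            .Select(x => Observable.Empty<T>()
                .Delay(interval, scheduler)
                .StartWith(x))
            // Concat subscribes to the inner sequences strictly one after
            // another, so the next value waits for the previous interval.
            .Concat();
    }
}
```

Usage would look like `source.Pace(TimeSpan.FromSeconds(1), Scheduler.Default).Subscribe(Process)`, with a dispatcher-bound scheduler substituted when Process touches the UI.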
