Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


C# - How do I share an observable with Publish and Connect?

I have an observable data stream that I apply operations to, split into two separate streams, apply further (distinct) operations to each of the two streams, and then merge together again. I am trying to share the observable between two subscribers using Publish and Connect, but each subscriber seems to be using a separate stream. In the example below, "Doing an expensive operation" is printed once per item for each of the two subscribers, that is, twice per item. (Imagine the expensive operation as something that should happen only once across all subscribers, which is why I am trying to reuse the stream.) I have used Publish and Connect to try to share the merged observable with both subscribers, but it does not have the intended effect.

Example with the issue:

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var connectable = Observable.Merge(a, b).Publish();
connectable.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
connectable.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();

I see the following output:

Doing an expensive operation
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }

(Output continues, truncated for brevity.)

How can I share the observable with both subscribers?


1 Reply


You have published the wrong observable.

Your current code merges first and then publishes: Observable.Merge(a, b).Publish(). Because a and b are both defined against expensive, the single subscription to the merge still creates two subscriptions to expensive, one through a and one through b.
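The effect of where Publish sits can be checked without waiting on a timer. The following is a minimal sketch, not the asker's exact program: it assumes the System.Reactive NuGet package and C# top-level statements, replaces the timer with a hypothetical Subject<long> named ticks so the items can be pushed by hand, and uses a hypothetical helper CountExpensiveCalls that counts how often the selector runs. The second variant moves Publish up to expensive, which is the fix described further down.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Returns how many times the "expensive" selector ran for four source items.
// (Hypothetical helper, written only for this illustration.)
int CountExpensiveCalls(bool publishAtSource)
{
    var calls = 0;
    var ticks = new Subject<long>();          // hand-driven stand-in for the timer
    var expensive = ticks.Select(i =>
    {
        calls++;                              // stands in for the expensive work
        return string.Format("#{0}", i);
    });

    if (publishAtSource)
    {
        // Publish the expensive stream itself: one upstream subscription.
        var shared = expensive.Publish();
        var a = shared.Where(s => int.Parse(s.Substring(1)) % 2 == 0);
        var b = shared.Where(s => int.Parse(s.Substring(1)) % 2 != 0);
        var merged = Observable.Merge(a, b);
        merged.Subscribe(_ => { });
        merged.Subscribe(_ => { });
        shared.Connect();
    }
    else
    {
        // Publish the merge: a and b each subscribe to expensive on their own.
        var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0);
        var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0);
        var merged = Observable.Merge(a, b).Publish();
        merged.Subscribe(_ => { });
        merged.Subscribe(_ => { });
        merged.Connect();
    }

    // Push four items through whichever pipeline was built.
    for (long i = 0; i < 4; i++)
    {
        ticks.OnNext(i);
    }
    return calls;
}

Console.WriteLine(CountExpensiveCalls(publishAtSource: false)); // prints 8
Console.WriteLine(CountExpensiveCalls(publishAtSource: true));  // prints 4
```

Four items produce eight selector calls when the merge is published and four when expensive is published, which matches the doubled "Doing an expensive operation" lines in the question.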

The subscriptions create these pipelines:

[Image: Original]

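You can see this if you remove .Publish() from your code. Each of the two subscribers then subscribes to the merge separately, and each of those subscriptions subscribes to expensive twice, giving four subscriptions in total. The output becomes: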

Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }

This creates these pipelines:

[Image: No Publish]

So, by moving the .Publish() up to expensive you eliminate the problem. That is where it was needed all along, because that is the expensive operation.

This is the code you needed:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

// Publish the expensive stream itself, so a and b share one subscription to it
var connectable = expensive.Publish();

var a = connectable.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = connectable.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var merged = Observable.Merge(a, b);

merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));

// Connect last, once every subscriber is attached
connectable.Connect();

That nicely produces the following:

Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
Doing an expensive operation
Subscriber A got: { Source = A, Value = #2 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #3 }

And this gives you these pipelines:

[Image: Expensive Publish]

You can see from this image that there is still duplication. That's fine because these parts aren't expensive.

The duplication is actually important. Shared parts of the pipelines make their endpoints vulnerable to errors and thus to early termination. The less sharing the better for the robustness of the code. It's only when you have an expensive operation that you should worry about publishing. Otherwise you should just let the pipelines be themselves.

Here's an example to show it. Without a published source, each pipeline has its own subscription to the source, so an error in one of them does not bring down the others.

[Image: Separate]
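Since the image is not reproduced here, the separate case can be sketched roughly as follows. This is a hypothetical illustration, not the code from the original image: it assumes the System.Reactive NuGet package and C# top-level statements, and the source named flaky is invented so that only its first subscription fails.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var log = new List<string>();
var subscriptions = 0;

// Hypothetical cold source: only its first subscription ends in an error.
var flaky = Observable.Create<int>(observer =>
{
    var id = ++subscriptions;
    observer.OnNext(1);
    if (id == 1)
    {
        observer.OnError(new InvalidOperationException("boom"));
    }
    else
    {
        observer.OnNext(2);
        observer.OnCompleted();
    }
    return Disposable.Empty;
});

// No Publish: each Subscribe call runs the source again, independently.
flaky.Subscribe(
    v => log.Add($"A: {v}"),
    ex => log.Add($"A failed: {ex.Message}"),
    () => log.Add("A completed"));
flaky.Subscribe(
    v => log.Add($"B: {v}"),
    ex => log.Add($"B failed: {ex.Message}"),
    () => log.Add("B completed"));

Console.WriteLine(string.Join(", ", log));
// prints: A: 1, A failed: boom, B: 1, B: 2, B completed
```

Subscriber A's pipeline ends with the error, while subscriber B gets its own run of the source and completes normally.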

But once you introduce a shared observable then a single error will bring down all of the pipelines.

[Image: Shared]
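A rough sketch of the shared case, again hypothetical rather than the code from the original image. It assumes the System.Reactive NuGet package and C# top-level statements, and uses an invented source named flaky whose first subscription fails; because Publish creates only one subscription, that failure reaches every subscriber.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var log = new List<string>();
var subscriptions = 0;

// Hypothetical cold source: only its first subscription ends in an error.
var flaky = Observable.Create<int>(observer =>
{
    var id = ++subscriptions;
    observer.OnNext(1);
    if (id == 1)
    {
        observer.OnError(new InvalidOperationException("boom"));
    }
    else
    {
        observer.OnNext(2);
        observer.OnCompleted();
    }
    return Disposable.Empty;
});

// Publish: both subscribers hang off a single shared subscription to the source.
var shared = flaky.Publish();
shared.Subscribe(
    v => log.Add($"A: {v}"),
    ex => log.Add($"A failed: {ex.Message}"));
shared.Subscribe(
    v => log.Add($"B: {v}"),
    ex => log.Add($"B failed: {ex.Message}"));
shared.Connect();

Console.WriteLine(string.Join(", ", log));
// prints: A: 1, B: 1, A failed: boom, B failed: boom
```

Here the source is subscribed to only once, so the one error terminates both A and B, even though B would have completed had it subscribed on its own.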

