Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
407 views
in Technique[技术] by (71.8m points)

spring - How to avoid repeating maping operations for similar subscribers in Sping Reactor?

I have one publisher that emits strings and many subscribers that may use the same mapping function for creating models with different filters.

Publisher:

val publisher: Flux<String> = ...

Subscriber#1

val sub1 = publisher.map{veryExpensiveConverter.convert(it)}
                    .filter(it.metric<10)

Subscriber#2

val sub2 = publisher.map{veryExpensiveConverter.convert(it)}
                    .filter(it.metric>5)

Subscriber#3

val sub3 = sub2.map{cheapConverter.convert(it)}
                    .filter(it.metric>8)

Subscriber#4

val sub4 = sub3.map{yetAnotherConverter.convert(it)}
                    .filter(it.metric>80)

In the end I subscribe on all fluxes

Flux.merge(sub1, sub2, sub3, ..., subn)
     .map{//some logic for following data of subscribers}
     .subscribe()

The problem: veryExpensiveConverter is executed several time for the same published record for each subscriber. The execution flow looks

Input1 -> veryExpensiveConverter -> filter1 -> output1
       -> veryExpensiveConverter -> filter2 -> output2
       -> veryExpensiveConverter -> cheapConverter -> filter3 -> output3

I would like too have

Input1 -> veryExpensiveConverter -> filter1 -> output1  
                                 -> filter2 -> output2
                                 -> cheapConverter -> filter3 -> output3

What's a pattern is the most suitable for avoiding execution of the same mapping for each subscriber?

question from:https://stackoverflow.com/questions/65877264/how-to-avoid-repeating-maping-operations-for-similar-subscribers-in-sping-reacto

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You can .share() at some level to ensure that every subscription to that shared part only trigger a single subscription above it.

You can also look into .publish().xxx() methods for more advanced auto-triggers (.share() will start its source as soon as the first subscription comes in).

Something like this:

val expensiveDoneOnce = publisher
    .map{veryExpensiveConverter.convert(it)}
    .publish()
    .refCount(2)
val sub1 = expensiveDoneOnce.filter(it.metric < 10)
val sub2 = expensiveDoneOnce.filter(it.metric > 5)

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...