Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

project reactor - Trying to manage multiple Flux/Mono, starting a few of them before others, and combining some of them, and getting a bit lost

I have a module that accepts entity IDs and a "resolution type" as parameters, then gathers data through several operations that return Fluxes. Each operation collects a different type of data that contributes to the resolution. Most of the work is asynchronous, but some resolution types require one or more preliminary operations that must run synchronously, because their results feed the remaining asynchronous Flux operations.

While that synchronous work is running, at least part of the asynchronous resolution could already begin. I would like to start those Flux operations during the synchronous step and then, once the synchronous data is available, start the Flux for each remaining operation. Some resolution types will have every Flux operation returning data; others gather less information, so some of the Flux operations will stay empty.

The resolution operations are expensive in time, and starting some Flux operations early would shorten the overall run, which matters a great deal for my use case. Eager subscription is therefore ideal, as long as I can guarantee that I will not miss any emitted item.

With that in mind, how can I:

  1. Create a "holder" or "container" for each of the Flux operations needed to resolve everything, and initialize each one as empty (like Flux.empty()).
  2. Add items to whatever I create in item 1. It starts out empty, but I may want it to carry the data from one or more finite, asynchronous Flux operations. I do not need to keep those separate; they can appear as a single stream, since I will call collectList() on it to produce a Mono.
  3. When some of these Flux operations should start before others, how can I start them and ensure that I do not miss any data? And if I start a name-resolution Flux, for example, can I add to it as in item 2? Say I start retrieving some data, then perform a synchronous operation, and then create another name-resolution Flux from its result: can I append this new Flux to the original one, since it returns the same data type? I am aware of Flux.merge(), but it would be convenient to work with a single Flux reference that I can keep adding to, if possible.

Will I need a collection object, like a list, and then use a merge operation? Initially, I thought about using a ConnectableFlux, until I realized that it is for connecting multiple subscribers, rather than for connecting multiple publishers. Connecting multiple publishers is what I think would be a good answer for my need, unless this is a common pattern that can be handled in a better way.

I have only been doing reactive programming for a short time, so please be patient with the way I am trying to describe what I want to do. If I can better clarify my intentions, please let me know where I have been unclear, and I will gladly attempt to clear it up. Thanks in advance for your time and help!

EDIT: Here is the final Kotlin version, nice and concise:

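// Imports assumed for this snippet; mu.KotlinLogging is the package used by older kotlin-logging releases.
import mu.KotlinLogging
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import kotlin.random.Random

private val log = KotlinLogging.logger {}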

class ReactiveDataService {
    private val createMono: () -> Mono<List<Int>> = {
        Flux.just(9, 8, 7)
            .flatMap {
                Flux.fromIterable(List(it) { Random.nextInt(0, 100) })
                    .parallel()
                    .runOn(Schedulers.boundedElastic())
            }
            .collectList()
            .cache()
    }

    private val processResults: (List<String>, List<String>) -> String =
        { d1, d2 -> "\ndownstream 1: $d1\ndownstream 2: $d2" }

    private val convert: (List<Int>, Int) -> Flux<String> =
        { data, multiplier -> Flux.fromIterable(data.map { String.format("%3d", it * multiplier) }) }

    fun doQuery(): String? {
        val mono = createMono()
        val downstream1 = mono.flatMapMany { convert(it, 1) }.collectList()
        val downstream2 = mono.flatMapMany { convert(it, 2) }.collectList()
        return Mono.zip(downstream1, downstream2, processResults).block()
    }
}

fun main() {
    val service = ReactiveDataService()
    val start = System.currentTimeMillis()
    val result = service.doQuery()
    log.info("{}\nTotal time: {}ms", result, System.currentTimeMillis() - start)
}

And the output:

downstream 1: [ 66,  39,  40,  88,  97,  35,  70,  91,  27,  12,  84,  37,  35,  15,  45,  27,  85,  22,  55,  89,  81,  21,  43,  62]
downstream 2: [132,  78,  80, 176, 194,  70, 140, 182,  54,  24, 168,  74,  70,  30,  90,  54, 170,  44, 110, 178, 162,  42,  86, 124]
Total time: 209ms


1 Reply

It sounds like an ideal job for Reactor. The synchronous calls can be wrapped so that they return Fluxes (or Monos) and run on an elastic scheduler, which lets them execute in parallel with the rest of the work. Then, using the various operators, you can compose them all into a single Flux that represents the result. Subscribe to that Flux and the whole machine will kick off.
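
As a rough sketch of that wrapping idea (blockingLookup and asyncNames are made-up stand-ins, not anything from the question), a blocking call can be deferred with Mono.fromCallable, moved to Schedulers.boundedElastic(), and merged with a Flux that is already asynchronous:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class WrapBlockingSketch {

  // Hypothetical stand-in for a blocking, synchronous lookup.
  static List<String> blockingLookup(String entityId) {
    try {
      Thread.sleep(200);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return List.of(entityId + "-a", entityId + "-b");
  }

  // Hypothetical stand-in for an operation that is already asynchronous.
  static Flux<String> asyncNames(String entityId) {
    return Flux.just(entityId + "-x", entityId + "-y")
        .delayElements(Duration.ofMillis(50));
  }

  static Flux<String> resolve(String entityId) {
    // fromCallable defers the blocking call until subscription;
    // subscribeOn runs it on a thread that is allowed to block.
    Flux<String> fromBlocking =
        Mono.fromCallable(() -> blockingLookup(entityId))
            .subscribeOn(Schedulers.boundedElastic())
            .flatMapMany(Flux::fromIterable);
    // merge subscribes to both sources right away and interleaves their items.
    return Flux.merge(asyncNames(entityId), fromBlocking);
  }

  public static void main(String[] args) {
    List<String> result = resolve("e1").collectList().block();
    System.out.println(result);
  }
}
```

Nothing runs until the merged Flux is subscribed (here by block()); at that point both sources start together, so the blocking lookup overlaps with the asynchronous one.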

I think you need to use Mono.flatMapMany instead of Flux.usingWhen.
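
In isolation, and assuming the preliminary step produces a list of IDs (the names loadIds and namesFor are hypothetical), flatMapMany turns the single result of a Mono into a Flux of follow-up items:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapManySketch {

  // Hypothetical preliminary step whose result the later operations need.
  static List<Integer> loadIds() {
    return List.of(1, 2, 3);
  }

  // Hypothetical follow-up operation that depends on the preliminary result.
  static Flux<String> namesFor(List<Integer> ids) {
    return Flux.fromIterable(ids).map(id -> "name-" + id);
  }

  static Flux<String> resolve() {
    Mono<List<Integer>> ids = Mono.fromCallable(FlatMapManySketch::loadIds);
    // flatMapMany waits for the Mono's single value, then switches to the
    // Flux built from it.
    return ids.flatMapMany(FlatMapManySketch::namesFor);
  }

  public static void main(String[] args) {
    System.out.println(resolve().collectList().block()); // [name-1, name-2, name-3]
  }
}
```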

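import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReactiveDataService {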
  public static void main(final String[] args) {
    ReactiveDataService service = new ReactiveDataService();
    service.doQuery();
  }

  private Flux<Integer> process1(final List<Integer> data) {
    return Flux.fromIterable(data);
  }

  private Flux<Integer> process2(final List<Integer> data) {
    return Flux.fromIterable(data).map(i -> i * 2);
  }

  private String process3(List<Integer> downstream1, List<Integer> downstream2) {
    System.out.println("downstream 1: " + downstream1);
    System.out.println("downstream 2: " + downstream2);
    return "Done";
  }

  private void doQuery() {
    final Mono<List<Integer>> mono =
        Flux.just(9, 8, 7)
            .flatMap(
                limit ->
                    Flux.fromStream(
                            Stream.generate(() -> new Random().nextInt(100))
                                .peek(
                                    i -> {
                                      try {
                                        Thread.sleep(500);
                                      } catch (InterruptedException e) {
                                        // Restore the interrupt flag instead of swallowing it.
                                        Thread.currentThread().interrupt();
                                      }
                                    })
                                .limit(limit))
                        .parallel()
                        .runOn(Schedulers.boundedElastic()))
            .collectList()
            .cache();
    final Mono<List<Integer>> downstream1 = mono.flatMapMany(this::process1).collectList();
    final Mono<List<Integer>> downstream2 = mono.flatMapMany(this::process2).collectList();
    Mono.zip(downstream1, downstream2, this::process3).block();
  }
}
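
For the eager-start part of the question, one possible approach (a sketch only; slowNames and synchronousStep are invented for illustration) is to call cache() on the early Flux and subscribe to it immediately. cache() replays everything it has seen to later subscribers, so nothing emitted during the synchronous step is lost. The "single reference to keep adding to" can then be an ordinary list of Fluxes handed to Flux.merge at the end:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class EagerStartSketch {

  // Hypothetical slow asynchronous lookup.
  static Flux<String> slowNames(String prefix) {
    return Flux.just(prefix + "1", prefix + "2")
        .delayElements(Duration.ofMillis(100));
  }

  // Hypothetical synchronous preliminary step.
  static String synchronousStep() {
    try {
      Thread.sleep(150);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "late-";
  }

  static List<String> resolve() {
    List<Flux<String>> parts = new ArrayList<>();

    // Start the first lookup now; cache() retains its items for replay.
    Flux<String> early = slowNames("early-").cache();
    early.subscribe();
    parts.add(early);

    // The early lookup keeps running while this blocks.
    String key = synchronousStep();

    // Add a second Flux of the same type, built from the synchronous result.
    parts.add(slowNames(key));

    // merge subscribes to every part; the cached one replays what it already has.
    return Flux.merge(parts).collectList().block();
  }

  public static void main(String[] args) {
    System.out.println(resolve());
  }
}
```

Note that cache() with no arguments keeps every item in memory, which is fine for finite sources like the ones described but would need a bound for long-running streams.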

