开源软件名称:ReactiveX/RxJava开源软件地址:https://github.com/ReactiveX/RxJava开源编程语言:Java 99.9%开源软件介绍:RxJava: Reactive Extensions for the JVMRxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures. Javadoc)Version 3.x (
Learn more about RxJava in general on the Wiki Home. Version 2.xThe 2.x version is end-of-life as of February 28, 2021. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 2.2.21, will remain accessible. Version 1.xThe 1.x version is end-of-life as of March 31, 2018. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 1.3.8, will remain accessible. Getting startedSetting up the dependencyThe first step is to include RxJava 3 into your project, for example, as a Gradle compile dependency: implementation "io.reactivex.rxjava3:rxjava:3.x.y" (Please replace Hello WorldThe second is to write the Hello World program: package rxjava.examples;
import io.reactivex.rxjava3.core.*;
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
} Note that RxJava 3 components now live under Base classesRxJava 3 features several base classes you can discover operators on:
Some terminologyUpstream, downstreamThe dataflows in RxJava consist of a source, zero or more intermediate steps followed by a data consumer or combinator step (where the step is responsible to consume the dataflow by some means): source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3()); Here, if we imagine ourselves on source
.operator1()
.operator2()
.operator3()
.subscribe(consumer) Objects in motionIn RxJava's documentation, emission, emits, item, event, signal, data and message are considered synonyms and represent the object traveling along the dataflow. BackpressureWhen the dataflow runs through asynchronous steps, each step may perform different things with different speed. To avoid overwhelming such steps, which usually would manifest itself as increased memory usage due to temporary buffering or the need for skipping/dropping data, so-called backpressure is applied, which is a form of flow control where the steps can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it. In RxJava, the dedicated Assembly timeThe preparation of dataflows by applying various intermediate operators happens in the so-called assembly time: Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
; At this point, the data is not flowing yet and no side-effects are happening. Subscription timeThis is a temporary state when flow.subscribe(System.out::println) This is when the subscription side-effects are triggered (see RuntimeThis is the state when the flows are actively emitting items, errors or completion signals: Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace); Practically, this is when the body of the given example above executes. Simple background computationOne of the common use cases for RxJava is to run some computation, network request on a background thread and show the results (or error) on the UI thread: import io.reactivex.rxjava3.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish This style of chaining methods is called a fluent API which resembles the builder pattern. However, RxJava's reactive types are immutable; each of the method calls returns a new Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); Typically, you can move computations or blocking IO to some other thread via SchedulersRxJava operators don't work with
These are available on all JVM platforms but some specific platforms, such as Android, have their own typical In addition, there is an option to wrap an existing The Concurrency within a flowFlows in RxJava are sequential in nature split into processing stages that may run concurrently with each other: Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println); This example flow squares the numbers from 1 to 10 on the computation Parallel processingProcessing the numbers 1 to 10 in parallel is a bit more involved: Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println); Practically, parallelism in RxJava means running independent flows and merging their results back into a single flow. The operator Note, however, that
Alternatively, the Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println); Dependent sub-flows
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource
.flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
.map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
.subscribe(System.out::println); ContinuationsSometimes, when an item has become available, one would like to perform some dependent computations on it. This is sometimes called continuations and, depending on what should happen and what types are involved, may involve various operators to accomplish. DependentThe most typical scenario is to given a value, invoke another service, await and continue with its result: service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next)) It is often the case also that later sequences would require values from earlier mappings. This can be achieved by moving the outer service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
) Here, the original Non-dependentIn other scenarios, the result(s) of the first source/dataflow is irrelevant and one would like to continue with a quasi independent another source. Here, Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace); however, the continuation in this case stays Often though there is a way that is somewhat more expressive (and also lower overhead) by using sourceObservable
.ignoreElements() // returns Completable
.andThen(someSingleSource)
.map(v -> v.toString()) The only dependency between the Deferred-dependentSometimes, there is an implicit data dependency between the previous sequence and the new sequence that, for some reason, was not flowing through the "regular channels". One would be inclined to write such continuations as follows: AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println); Unfortunately, this prints AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println); or AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println); Type conversionsSometimes, a source or service returns a different type than the flow that is supposed to work with it. For example, in the inventory example above, In such situations, there are usually two options to fix the transformation: 1) convert to the desired type or 2) find and use an overload of the specific operator supporting the different type. Converting to the desired typeEach reactive base class features operators that can perform such conversions, including the protocol conversions, to match some other type. The following matrix shows the available conversion options:
1: When turning a multi-valued source into a single-valued source, one should decide which of the many source values should be considered as the result. 2: Turning an 3: When there is only (at most) one source item, there is no problem with backpressure as it can be always stored until the downstream is ready to consume. Using an overload with the desired typeMany frequently used operator has overloads that can deal with the other types. These are usually named with the suffix of the target type:
The reason these operators have a suffix instead of simply having the same name with different signature is type erasure. Java doesn't consider signatures such as Operator naming conventionsNaming in programming is one of the hardest things as names are expected to be not long, expressive, capturing and easily memorable. Unfortunately, the target language (and pre-existing conventions) may not give too much help in this regard (unusable keywords, type erasure, type ambiguities, etc.). Unusable keywordsIn the original Rx.NET, the operator that emits a single item and then completes is called Type erasureMany operators that expect the user to provide some function returning a reactive type can't be overloaded because the type erasure around a Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper) Type ambiguitiesEven though certain operators have no problems from type erasure, their signature may turn up being ambiguous, especially if one uses Java 8 and lambdas. For example, there are several overloads of Flowable<T> concatWith(Publisher<? extends T> other);
Flowable<T> concatWith(SingleSource<? extends T> other); Both someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace); Unfortunately, this approach doesn't work and the example does not print The user in such situations probably wanted to defer some computation until the someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace); Sometimes, a suffix is added to avoid logical ambiguities that may compile but produce the wrong type in a flow: Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> mergeArray(Publisher<? extends T>... sources); This can get also ambiguous when functional interface types get involved as the type argument |