ReactiveX

用于具有可观察流的异步编程的API。(An API for asynchronous programming with observable streams.)

Github星跟蹤圖

ReactiveX是通过使用可观察序列来组合异步和基于事件的程序的库。

它扩展了观察者模式以支持数据和/或事件的序列,并添加了允许您以声明方式组合序列的操作符,同时提取对低级线程,同步,线程安全,并发数据结构和非同步数据结构等问题的关注,阻塞 I/O。

观察者通过成为访问多个项目的异步序列的理想方式来填补空白

single items multiple items
synchronous T getData() Iterable<T> getData()
asynchronous Future<T> getData() Observable<T> getData()

它有时被称为 “functional reactive programming”(“函数响应式编程”),但这是一个不正确的词。ReactiveX 可以是函数式的,并且它可以是响应式的,但是“函数响应式编程”是不同的物种。一个主要的不同之处在于,函数响应式编程随时间不断变化的值进行操作,而ReactiveX对随时间发射的离散值进行操作。 (有关函数响应式编程的更精确信息,请参阅Conal Elliott的工作。)

ReactiveX是一个多语言的实现 -- ReactiveX目前以各种语言实现,以尊重这些语言习惯的方式,越来越多的语言正在被快速地添加。

如:RxJava、RxJS、RxKotlin、Rx.NET、RxNetty、RxPY、RxScala、RxSwing、Android等等。

  • RxJava - JVM的活动扩展 - 用于使用Java VM的可观察序列编写异步和基于事件的程序的库;
  • RxJS -用于JavaScript的反应式编程库http://reactivex.io/rxjs;
  • RxAndroid - Reactive Extensions for Android;
  • ...

主要指標

概覽
名稱與所有者ReactiveX/RxJava
主編程語言Java
編程語言Shell (語言數: 3)
平台
許可證Apache License 2.0
所有者活动
創建於2013-01-08 20:11:48
推送於2025-10-13 07:24:54
最后一次提交
發布數283
最新版本名稱v3.1.12 (發布於 )
第一版名稱0.1.2 (發布於 2013-02-04 23:55:21)
用户参与
星數48.4k
關注者數2.2k
派生數7.6k
提交數6.3k
已啟用問題?
問題數3174
打開的問題數13
拉請求數3346
打開的拉請求數1
關閉的拉請求數708
项目设置
已啟用Wiki?
已存檔?
是復刻?
已鎖定?
是鏡像?
是私有?

RxJava: Reactive Extensions for the JVM


codecov.io
Maven Central

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

Version 2.x (Javadoc)

  • single dependency: Reactive-Streams
  • continued support for Java 6+ & Android 2.3+
  • performance gains through design changes learned through the 1.x cycle and through Reactive-Streams-Commons research project.
  • Java 8 lambda-friendly API
  • non-opinionated about source of concurrency (threads, pools, event loops, fibers, actors, etc)
  • async or synchronous execution
  • virtual time and schedulers for parameterized concurrency

Version 2.x and 1.x will live side-by-side for several years. They will have different group ids (io.reactivex.rxjava2 vs io.reactivex) and namespaces (io.reactivex vs rx).

See the differences between version 1.x and 2.x in the wiki article What's different in 2.0. Learn more about RxJava in general on the Wiki Home.

Version 1.x

The 1.x version is end-of-life as of March 31, 2018. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 1.3.8, will remain accessible.

Getting started

Setting up the dependency

The first step is to include RxJava 2 into your project, for example, as a Gradle compile dependency:

implementation "io.reactivex.rxjava2:rxjava:2.x.y"

(Please replace x and y with the latest version numbers: Maven Central
)

Hello World

The second is to write the Hello World program:

package rxjava.examples;

import io.reactivex.*;

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}

If your platform doesn't support Java 8 lambdas (yet), you have to create an inner class of Consumer manually:

import io.reactivex.functions.Consumer;

Flowable.just("Hello world")
  .subscribe(new Consumer<String>() {
      @Override public void accept(String s) {
          System.out.println(s);
      }
  });

Base classes

RxJava 2 features several base classes you can discover operators on:

Some terminology

Upstream, downstream

The dataflows in RxJava consist of a source, zero or more intermediate steps followed by a data consumer or combinator step (where the step is responsible to consume the dataflow by some means):

source.operator1().operator2().operator3().subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

Here, if we imagine ourselves on operator2, looking to the left towards the source, is called the upstream. Looking to the right towards the subscriber/consumer, is called the downstream. This is often more apparent when each element is written on a separate line:

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

Objects in motion

In RxJava's documentation, emission, emits, item, event, signal, data and message are considered synonyms and represent the object traveling along the dataflow.

Backpressure

When the dataflow runs through asynchronous steps, each step may perform different things with different speed. To avoid overwhelming such steps, which usually would manifest itself as increased memory usage due to temporary buffering or the need for skipping/dropping data, a so-called backpressure is applied, which is a form of flow control where the steps can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.

In RxJava, the dedicated Flowable class is designated to support backpressure and Observable is dedicated for the non-backpressured operations (short sequences, GUI interactions, etc.). The other types, Single, Maybe and Completable don't support backpressure nor should they; there is always room to store one item temporarily.

Assembly time

The preparation of dataflows by applying various intermediate operators happens in the so-called assembly time:

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;

At this point, the data is not flowing yet and no side-effects are happening.

Subscription time

This is a temporary state when subscribe() is called on a flow that establishes the chain of processing steps internally:

flow.subscribe(System.out::println)

This is when the subscription side-effects are triggered (see doOnSubscribe). Some sources block or start emitting items right away in this state.

Runtime

This is the state when the flows are actively emitting items, errors or completion signals:


Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

Practically, this is when the body of the given example above executes.

Simple background computation

One of the common use cases for RxJava is to run some computation, network request on a background thread and show the results (or error) on the UI thread:

import io.reactivex.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

This style of chaining methods is called a fluent API which resembles the builder pattern. However, RxJava's reactive types are immutable; each of the method calls returns a new Flowable with added behavior. To illustrate, the example can be rewritten as follows:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

Typically, you can move computations or blocking IO to some other thread via subscribeOn. Once the data is ready, you can make sure they get processed on the foreground or GUI thread via observeOn.

Schedulers

RxJava operators don't work with Threads or ExecutorServices directly but with so called Schedulers that abstract away sources of concurrency behind a uniform API. RxJava 2 features several standard schedulers accessible via Schedulers utility class.

  • Schedulers.computation(): Run computation intensive work on a fixed number of dedicated threads in the background. Most asynchronous operator use this as their default Scheduler.
  • Schedulers.io(): Run I/O-like or blocking operations on a dynamically changing set of threads.
  • Schedulers.single(): Run work on a single thread in a sequential and FIFO manner.
  • Schedulers.trampoline(): Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.

These are available on all JVM platforms but some specific platforms, such as Android, have their own typical Schedulers defined: AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui().

In addition, there is option to wrap an existing Executor (and its subtypes such as ExecutorService) into a Scheduler via Schedulers.from(Executor). This can be used, for example, to have a larger but still fixed pool of threads (unlike computation() and io() respectively).

The Thread.sleep(2000); at the end is no accident. In RxJava the default Schedulers run on daemon threads, which means once the Java main thread exits, they all get stopped and background computations may never happen. Sleeping for some time in this example situations lets you see the output of the flow on the console with time to spare.

Concurrency within a flow

Flows in RxJava are sequential in nature split into processing stages that may run concurrently with each other:

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

This example flow squares the numbers from 1 to 10 on the computation Scheduler and consumes the results on the "main" thread (more precisely, the caller thread of blockingSubscribe). However, the lambda v -> v * v doesn't run in parallel for this flow; it receives the values 1 to 10 on the same computation thread one after the other.

Parallel processing

Processing the numbers 1 to 10 in parallel is a bit more involved:

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

Practically, parallelism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this by first mapping each number from 1 to 10 into its own individual Flowable, runs them and merges the computed squares.

Note, however, that flatMap doesn't guarantee any order and the end result from the inner flows may end up interleaved. There are alternative operators:

  • concatMap that maps and runs one inner flow at a time and
  • concatMapEager which runs all inner flows "at once" but the output flow will be in the order those inner flows were created.

Alternatively, the Flowable.parallel() operator and the ParallelFlowable type help achieve the same parallel processing pattern:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

Dependent sub-flows

flatMap is a powerful operator and helps in a lot of situations. For example, given a service that returns a Flowable, we'd like to call another service with values emitted by the first service:

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource.flatMap(inventoryItem ->
    erp.getDemandAsync(inventoryItem.getId())
    .map(demand 
        -> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));
  )
  .subscribe();

Continuations

Sometimes, when an item has become available, one would like to perform some dependent computations on it. This is sometimes called continuations and, depending on what should happen and what types are involved, may involve various operators to accomplish.

Dependent

The most typical scenario is to given a value, invoke another service, await and continue with its result:

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

It is often the case also that later sequences would require values from earlier mappings. This can be achieved by moving the outer flatMap into the inner parts of the previous flatMap for example:

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

Here, the original value will be available inside the inner flatMap, courtesy of lambda variable capture.

Non-dependent

In other scenarios, the result(s) of the first source/dataflow is irrelevant and one would like to continue with a quasi independent another source. Here, flatMap works as well:

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

however, the continuation in this case stays Observable instead of the likely more appropriate Single. (This is understandable because
from the perspective of flatMapSingle, sourceObservable is a multi-valued source and thus the mapping may result in multiple values as well).

Often though there is a way that is somewhat more expressive (and also lower overhead) by using Completable as the mediator and its operator andThen to resume with something else:

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

The only dependency between the sourceObservable and the someSingleSource is that the former should complete normally in order for the latter to be consumed.

Deferred-dependent

Sometimes, there is an implicit data dependency between the previous sequence and the new sequence that, for some reason, was not flowing through the "regular channels". One would be inclined to write such continuations as follows:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

Unfortunately, this prints 0 because Single.just(count.get()) is evaluated at assembly time when the dataflow hasn't even run yet. We need something that defers the evaluation of this Single source until runtime when the main source completes:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

or

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

Type conversions

Sometimes, a source or service returns a different type than the flow that is supposed to work with it. For example, in the inventory example above, getDemandAsync could return a Single<DemandRecord>. If the code example is left unchanged, this will result in a compile time error (however, often with misleading error message about lack of overload).

In such situations, there are usually two options to fix the transformation: 1) convert to the desired type or 2) find and use an overload of the specific operator supporting the different type.

Converting to the desired type

Each reactive base class features operators that can perform such conversions, including the protocol conversions, to match some other type. The following matrix shows the available conversion options: