likes
comments
collection
share

rxjava3.0学习笔记-基础知识

作者站长头像
站长
· 阅读数 2

rxjava3.0学习笔记-基础知识

官方github地址:github.com/ReactiveX/R…

RxJava是一个响应式扩展的Java虚拟机实现:一个通过使用可观察序列来组合异步和基于事件的程序的库。

它扩展了观察者模式,以支持数据/事件序列,并添加了操作符,允许您以声明的方式组合序列,同时抽象出对低级线程、同步、线程安全和并发数据结构的关注。

1.设置依赖项

implementation "io.reactivex.rxjava3:rxjava:3.1.1"

2.基本类

io.reactivex.rxjava3.core.Flowable

0...N流,支持响应式流和背压

io.reactivex.rxjava3.core.Observable

0...N流,不支持背压

io.reactivex.rxjava3.core.Single

只有一个item数据或只有一个error的流

io.reactivex.rxjava3.core.Completable

没有items的流,只有完成或者错误的信号的流

io.reactivex.rxjava3.core.Maybe

没有items的流,只有一个item或者只有一个error的流。

3.一些专业术语

3.1 Upstream, downstream

​ RxJava中的数据流包括一个源、零个或多个中间步骤,后面跟着一个数据消费者或组合步骤(该步骤以某种方式负责消费数据流):

source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());

​ 这里,假设我们现在在operator2上,向左看源的方向被称为Upstream。向右看向订阅者/消费者称为downstream。当每个元素都写在单独的一行时,这通常更明显:

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

3.2 Objects in motion

​ 在RxJava的文档中,emissionemitsitemeventsignaldatamessage 都被认为是同义词,表示沿着数据流移动的对象。

3.3 背压 Backpressure

​ 当数据流通过异步步骤运行时,每个步骤可能以不同的速度执行不同的操作。为了避免过度的这些步骤,这通常会表现为由于临时缓冲或跳过/丢弃数据的需要而增加内存使用,所谓的backpressure ,这是一种流控制形式,步骤可以表示它们准备处理多少项。这允许在通常没有办法让步骤知道上游将向它发送多少项的情况下约束数据流的内存使用。

​ 在RxJava中,专用的Flowable类被指定为支持backpressure ,而Observable则专门用于非backpressure 操作(短序列、GUI交互等)。其他类型,Single, Maybe和complete不支持反压,也不应该支持;

3.4 组装时间 Assembly time

通过应用各种中间操作符来准备数据流被称为 组装时间 Assembly time。

在这一时间点上,数据还没有流动,没有副作用发生。

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;

3.5 订阅时间 Subscription time

当在内部建立处理步骤链的流上调用subscribe()时,这是一个临时状态:

flow.subscribe(System.out::println)

这是当订阅副作用被触发时(参见doOnSubscribe)。在这种状态下,一些源会立即阻塞或开始发射项目。

3.6 运行时间 Runtime

​ 这是流主动发出items、errors 或completion信号时的状态:

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

实际上,这是上面给定示例的主体执行的时间。

4.后台线程 异步执行 Simple background computation

​ RxJava的一个常见用例是在后台线程上运行一些计算,网络请求,并在UI线程上显示结果(或错误):

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

​ 这种类型的链接方法被称为fluent API,它类似于构建器模式。然而,RxJava的响应式类型是不可变的,每个方法调用都返回一个带有添加行为的新Flowable。为了说明问题,可以将示例重写如下:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);

​ 通常,您可以通过subscribeOn将计算或阻塞IO移动到其他线程。一旦数据准备好了,您就可以通过observeOn确保它们在前台或GUI线程上得到处理。

5.调度器 Schedulers

​ RxJava操作符不能直接与线程或ExecutorServices一起工作,而是使用的调度器,在统一的API后面抽象出并发源。RxJava 3提供了几个可以通过schedulers实用程序类访问的标准调度器。

Schedulers.computation():在后台固定数量的专用线程上运行计算密集型的工作。大多数异步操作符将其作为默认的调度程序。

**Schedulers.io():**在一组动态变化的线程上运行类似I/ o或阻塞的操作。好像没有用到线程池,需要小心线程太多。

Schedulers.single():以顺序的FIFO方式在单个线程上运行工作。

**Schedulers.trampoline():**在一个参与线程中以顺序的FIFO方式运行工作,通常用于测试目的。

​ 除了某些特定的平台外,这些功能在所有JVM平台上都可用,例如Android,它们定义了自己的典型调度程序

AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui().

此外,还可以通过Schedulers.from(Executor)将现有的Executor(及其子类型,如ExecutorService)包装到Scheduler中。例如,这可以用于拥有一个更大但仍然固定的线程池(分别与**computation()io()**不同)。

thread . sleep (2000);最后并非偶然。在RxJava中,默认的调度程序运行在守护线程上,这意味着一旦Java主线程退出,它们都被停止了,后台计算可能永远不会发生。在这个示例的情况下,休眠一段时间可以让您看到控制台上流的输出,并且有多余的时间。

下面示例说明每一种Schedulers,在一个循环里,有可能生成多少个线程。

public class _04Schedulers {

    private static final List<Long> threadIds = Collections.synchronizedList(new ArrayList<>());
    private static int maxThreadCount = 0;

    public static void main(String[] args) throws Throwable {
        System.out.println("start");
        threadIds.clear();
        maxThreadCount = 0;

        //线程池 最大线程数是10
//        Scheduler scheduler =  Schedulers.from(Executors.newFixedThreadPool(10));

        //computation最大线程数8
//        Scheduler scheduler =  Schedulers.computation();

        //io 最大线程数100
//        Scheduler scheduler =  Schedulers.io();

        //single 最大线程数1
        Scheduler scheduler =  Schedulers.single();

        for (int i = 0; i < 100; i++) {
            testSchedulers(scheduler);
        }
        Thread.sleep(5000);
        System.out.println("最大线程数 "+maxThreadCount);
        System.out.println("end");
    }

    private static void testSchedulers(Scheduler scheduler) {

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                Thread.sleep(100);

                Long threadId = Thread.currentThread().getId();
                if (!threadIds.contains(threadId)) {
                    threadIds.add(threadId);
                    if (threadIds.size() != maxThreadCount) {
                        maxThreadCount = threadIds.size();
                        emitter.onNext(maxThreadCount);
                    }
                }
                emitter.onComplete();
            }
        }).subscribeOn(scheduler)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Throwable {
                        System.out.println(integer);
                    }
                });

    }
}

6.流中的并发性 Concurrency within a flow

RxJava中的流在本质上是顺序的,它们被划分为可以彼此并发运行的处理阶段:

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

这个示例流将计算Scheduler上从1到10的数字平方,并在“主线程”上消费结果(更准确地说,是blockingSubscribe的调用线程),然而,v -> v * v对于这个流并不并行运行;它在同一个计算线程上一个接一个地接收值1到10。

上面Schedulers.computation()只会创建一个线程

7.并行处理 Parallel processing

并行处理数字1到10有点复杂:

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

实际上,RxJava中的并行性意味着运行独立的流,并将它们的结果合并到单个流中。运算符flatMap首先将从1到10的每个数字映射到它自己的Flowable中,运行它们并合并计算出的平方。但是,请注意,flatMap不保证任何顺序,来自内部流的项可能会交错结束。

有一些可供选择的操作符:

concatMap 每次映射并运行一个内部流

concatMapEager 它“一次”运行所有内部流,但输出流将按照这些内部流创建的顺序运行。

或者,Flowable.parallel()操作符和ParallelFlowable类型帮助实现相同的并行处理模式:

既保证了10个并发处理,也保证了输出是顺序的。

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

8.Dependent sub-flows

flatMap是一个强大的操作符,在很多情况下都有帮助。例如,给定一个返回Flowable的服务,我们想用第一个服务发出的值来调用另一个服务。

后面的流需要前一个流的数据,可以用flatMap。

public static void main(String[] args) throws InterruptedException {
    //例如第一个网络请求,
    Flowable<Long> inventorySource = Flowable.just(1L);
    //
    inventorySource.flatMap(new Function<Long, Publisher<String>>() {
        @Override
        public Publisher<String> apply(Long aLong) throws Throwable {
            System.out.println("ThreadId:"+Thread.currentThread().getId());
            //依赖第一个请求的第二个请求。
            return Flowable.just(aLong.toString()+"_flatMap");
        }
    }).map(new Function<String, String>() {
        @Override
        public String apply(String string) throws Throwable {
            System.out.println("ThreadId:"+Thread.currentThread().getId());
            return string+"_map";
        }
    }).subscribeOn(Schedulers.computation())
            .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Throwable {
            System.out.println("ThreadId:"+Thread.currentThread().getId());
            System.out.println(s);
        }
    });

    Thread.sleep(1000);
}

输出。

ThreadId:12 ThreadId:12 ThreadId:12 1_flatMap_mapn().

8.1依赖 Dependent

最典型的场景是给定一个值,调用另一个服务,等待并继续执行它的结果:

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

通常情况下,后面的序列也需要来自前面映射的值,这可以通过将外层的flatMap移动到之前的flatMap的内部部分来实现。

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

在这里,原始值将在内部flatMap中可用,由lambda变量捕获提供。

8.2 Non-dependent

在其他场景中,第一个源/数据流的结果是不相关的,人们希望继续使用准独立的另一个源。在这里,flatMap也可以工作。

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

然而,在这种情况下,继续保持Observable而不是更合适的Single。这是可以理解的,因为从flatMapSingle的角度来看,sourceObservable是一个多值源,因此映射也可能产生多个值。

但是,通常有一种更有表现力(也更低开销)的方法,即使用Completable作为中介及其操作符,然后继续使用其他东西。

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

sourceObservable和someSingleSource之间唯一的依赖是前者必须正常完成,后者才能被消费。

8.3 延迟依赖 Deferred-dependent

有时,在前一个序列和新序列之间存在隐式的数据依赖关系,由于某种原因,没有通过“常规渠道”。人们会倾向于这样写

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

不幸的是,这将输出0,因为Single.just(count.get())是在装配时间时计算的,此时数据流还没有运行。我们需要一些东西来推迟这个Single source的计算,直到运行时,当 main source完成:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

or

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

8.4类型转换 Type conversions

有时,源或服务返回的类型与应该使用它的流的类型不同。例如,在上面的库存示例中,getDemandAsync可以返回一个Single。如果代码示例保持不变,这将导致编译时错误(然而,通常带有关于缺乏重载的误导性错误消息)。

在这种情况下,通常有两个选项来修复转换:1)转换为所需的类型,或2)查找并使用支持不同类型的特定操作符的重载。

每个响应式基类都有一些操作符,它们可以执行这样的转换,包括协议转换,以匹配其他类型。下列矩阵显示了可用的转换选项:

转换为所需的类型

FlowableObservableSingleMaybeCompletable
FlowabletoObservablefirst, firstOrError, single, singleOrError, last, lastOrError1firstElement, singleElement, lastElementignoreElements
ObservabletoFlowable2first, firstOrError, single, singleOrError, last, lastOrError1firstElement, singleElement, lastElementignoreElements
SingletoFlowable3toObservabletoMaybeignoreElement
MaybetoFlowable3toObservabletoSingleignoreElement
CompletabletoFlowabletoObservabletoSingletoMaybe

1.当将一个多值源转换为一个单值源时,应该决定将多个源值中的哪个值作为结果。

2.将Observable转变为Flowable需要一个额外的决定:如何处理源可观察对象潜在的不受约束的流?通过BackpressureStrategy参数或标准的Flowable操作符(如onBackpressureBuffer),有几种可用的策略(如缓冲、下降、保持最新),onBackpressureDrop, onBackpressureLatest,这也允许进一步自定义反压行为。

3.当只有(最多)一个源时,没有反压的问题,因为它可以一直储存到下游准备消耗。

使用所需类型的重载

许多常用的操作符都有可以处理其他类型的重载。它们通常以目标类型的后缀命名:

OperatorOverloads
flatMapflatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable
concatMapconcatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable
switchMapswitchMapSingle, switchMapMaybe, switchMapCompletable

这些操作符使用后缀而不是使用具有不同签名的相同名称的原因是类型擦除。Java并不认为诸如operator(Function<T, Single>)和operator(Function<T, Maybe>)这样的签名不同(不像c#),而且由于擦除,这两个操作符最终会成为具有相同签名的重复方法。

Operator naming conventions

在编程中,命名是最困难的事情之一,因为名称应该不是很长、很有表现力、很容易记住。不幸的是,目标语言(和预先存在的约定)可能不会在这方面提供太多帮助(不可用的关键字、类型擦除、类型歧义等)。

不能使用关键字 Unusable keywords

在原始RX.NET中,发出单个项并完成该项的操作符称为Return(T),因为Java的约定是方法名以小写字母开头,所以应该是return(T),这在Java中是一个关键字,因此不可用.因此,RxJava将这个操作符命名为just(T)。操作符Switch也存在同样的限制,必须将其命名为switchOnNext。另一个例子是Catch,它被命名为onErrorResumeNext。

类型擦除 Type erasure

许多期望用户提供返回响应式类型的函数的操作符不能重载,因为function <T, X>周围的类型擦除将这些方法签名变成重复的。RxJava选择通过添加类型作为后缀来命名这些操作符:

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

Type ambiguities

即使某些操作符在类型擦除方面没有问题,但它们的签名可能会出现二义性,特别是在使用Java 8和lambda时。例如,有几个concatWith的重载将各种其他响应性基类型作为参数(为了在底层实现中提供方便和性能优势)

Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

Publisher和SingleSource都以功能接口(带有一个抽象方法的类型)的形式出现,并可能鼓励用户尝试提供一个lambda表达式:

someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

不幸的是,这种方法不起作用,本例根本没有打印出2,事实上,从2.1.10版本开始,它甚至不能编译,因为至少存在4个concatWith重载,并且编译器发现上面的代码有歧义。

在这种情况下,用户可能想推迟一些计算,直到someesource完成,因此正确的明确的操作符应该被推迟:

someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

有时,添加后缀是为了避免可能编译但在流中产生错误类型的逻辑歧义:

Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

当函数接口类型作为类型参数T参与进来时,这也会产生歧义。

Error handling

数据流可能会失败,此时错误会被发送给使用者。但有时,多个源可能会失败,这时我们可以选择是否等待所有源完成或失败。为了表示这种机会,许多操作符名称都带有DelayError后缀(而其他操作符的重载中带有DelayError或delayErrors boolean标志):

Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

当然,各种后缀可以一起出现:

Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);

Base class vs base type

由于基类上有大量的静态方法和实例方法,因此可以认为基类很繁重。RxJava 3的设计很大程度上受到了响应流规范的影响,因此,库为每个响应类型提供了一个类和一个接口:

TypeClassInterfaceConsumer
0..N backpressuredFlowablePublisher1Subscriber
0..N unboundedObservableObservableSource2Observer
1 element or errorSingleSingleSourceSingleObserver
0..1 element or errorMaybeMaybeSourceMaybeObserver
0 element or errorCompletableCompletableSourceCompletableObserver

reactive.streams. publisher是外部反应流库的一部分,它是通过由活性流规范控制的标准化机制与其他活性库交互的主要类型。

接口的命名约定是将Source附加到半传统类名,没有FlowableSource,因为Publisher是由响应流库提供的(并且子类型对互操作也没有帮助)。然而,这些接口在响应流规范的意义上并不是标准的,目前仅是RxJava特定的。