likes
comments
collection
share

RxJava3源码解析

作者站长头像
站长
· 阅读数 15

RxJava的本质上可以理解为一个异步操作库,能用非常简单的逻辑去处理繁琐的异步操作库。所以,它能在一定程度上替代项目中的handler、AsyncTask等等。有的时候,开源框架光是会用是不行的,想做进一步扩展就还得了解其原理。本文旨在梳理RxJava3源码流程,如有不当之处,请尽情指出。如需了解使用方式等,点此直达

观察者模式概念

观察者设计模式是RxJava中的一个核心设计模式。举个容易理解的例子,前段时间最火的世界杯,决赛阿根廷VS法兰西。

RxJava3源码解析

无数球迷熬夜看球。这时你可以把这场决赛当作被观察者,而我们这些熬夜看球的球迷呢就是观察者,每当梅西或者姆巴佩进球的时候,球迷就会欢呼,这就类似观察者模式中数据的变化。当然,梅西球迷看到姆巴佩进球不一定会有反应,就像姆巴佩球迷看到梅西进球一样,反应不会很大(观察数据不同)。

在APP开发过程中也会发生类似情况,如果你需要关心一个对象的数据,同时页面上的UI状态也会跟这个对象所绑定,则我们在这个对象发生变化的时候,我们就需要通知所有页面去做相应改变,这就是观察者模式。

再说个不是很恰当的例子,我们对于微信公众号,当微信公众号发生消息推送的时候,我们会去关注去看他,如果对其内容感兴趣,我们会去点开它、阅读它。这也是一种观察者模式,所以可以说:如果A对象对B对象的数据十分敏感,当B对象产生变化的一瞬间,A对象要做出反应,这时候A对象就是观察者,B对象就是被观察者。同时,可以多个观察者对应一个被观察者,就像一个公众号可以有很多人关注一样。

观察者模式就是我们众多的观察者对我们的被观察者的数据高度敏感变化之后的反应,这是一种多对一的关系,多个观察者对应一个被观察者。

下面是观察者模式的类图:

RxJava3源码解析 可以说,RxJava不属于传统意义上的观察者模式,实际上是属于一种扩展的观察者模式。(本文中的RxJava使用版本是3.0)

RxJava中的观察者模式

RxJava中的观察者模式少不了以下内容:1、被观察者;2、观察者;3、订阅。

被观察者

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
        emitter.onNext("something");
    }
});

上述代码是一个最简单的被观察者,可见这一切的起始都是create(),我们先看该方法的实现:

RxJava3源码解析 其中这一行代码:

Objects.requireNonNull(source, "source is null");

作用仅是非空判断,如果是空则抛出异常,

RxJava3源码解析 关键代码还是在return那里:

return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));

在这里RxJavaPlugins.onAssembly()是全局 hook 方法,为什么这样说呢,查看其他被观察者对象声明的方法时,不难发现最后还是调用的这个方法,如:

RxJava3源码解析 RxJava3源码解析 RxJava3源码解析 所以,在这里我们重点关注此方法内部实现即可: RxJava3源码解析 这里onObservableAssembly变量默认为空,所以这里作用就是直接把source参数返回出去。(如果我没记错的话,原RxJava1.0源码此方法内直接返回传入参数,2.X修改成如此)

所以需要关注的创建过程在传入RxJavaPlugins.onAssembly()的参数中,通过ObservableCreate对象生成的自定义source:

RxJava3源码解析 RxJava3源码解析

观察者

public interface Observer<@NonNull T> {

    /**
     * Provides the {@link Observer} with the means of cancelling (disposing) the
     * connection (channel) with the {@link Observable} in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the {@link Disposable} instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the {@link Observer} with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the {@link Observer} that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the {@link Observer} that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@code Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}

源码中Observer是个接口。创建观察者就是创建这个接口的实例,而接口中的方法也就是直接暴露给用户。

三、订阅

这里要重点关注Observable与Observer订阅的过程,都在subscribe()里。调用Observable的subscribe(),其方法源码如下:

RxJava3源码解析 前面两个Objects.requireNonNull()都是非空判断,这里无需多言,可见重点是在subscribeActual()中。但这里,此方法是个抽象方法:

RxJava3源码解析 毫无疑问,实现方法肯定在ObservableCreate中(因为subscribe()前的对象是个Observable,上面非观察者的创建流程分析后得出最终调用的是new ObservableCreate)。

RxJava3源码解析 在此方法中,

CreateEmitter<T> parent = new CreateEmitter<>(observer);

生成一个发射器,传入参数是我们自定义的观察者。

RxJava3源码解析 观察其源码会发现,CreateEmitter继承了ObservableEmitter接口,是ObservableEmitter的实现,其onNext()、onError()、onComplete()方法被调用时会调用观察者observer的同名方法。

而在创建新发射器之后、在source.subscribe(...)之前执行了observer.onSubscribe(parent);

RxJava3源码解析 此方法就是订阅的观察者接口中的onSubscribe()回调:

RxJava3源码解析 这也就是为什么在观察者的接口的几个方法中onSubscribe()会先执行的原因。随后的source.subscribe(parent); 即自定义的source订阅了CreateEmitter,而这里的自定义source即为ObservableOnSubscribe的实例,parent为CreateEmitter的实例。则我们的代码中的被观察者的ObservableOnSubscribe的subscribe()会被回调:

RxJava3源码解析

两种观察者的区别

两点不同:

在标准的观察者设计模式中,是一个“被观察者”,多个“观察者”,并且需要“被观察者”发出改变通知后,所有的“观察者”才能观察到;

在RxJava观察者设计模式中,是多个“被观察者”,一个“观察者”,并且需要 起点 和 终点 在“订阅”一次后,才发出改变通知,终点(观察者)才能观察到。

RxJava3源码解析

map 操作符源码流程

map操作符能直接对发射出来的事件进行处理并且产生新的事件,然后再次发射,下面是一个很简单的代码例子:

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {

    }
}).map(new Function<String, Bitmap>() {
    @Override
    public Bitmap apply(String s) throws Throwable {
        return null;
    }
}).subscribe(new Observer<Bitmap>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {

    }

    @Override
    public void onNext(@NonNull Bitmap bitmap) {

    }

    @Override
    public void onError(@NonNull Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

还是熟悉的链式调用,熟悉的配方。还是三个切入点:被观察者、观察者、订阅。其中被观察者构建流程已在上面讲过,这里不赘述。最终还是来到了map():

RxJava3源码解析 这里调用的全局RxJavaPlugins.onAssembly(),可见重点在方法内的参数new ObservableMap<>(this, mapper) RxJava3源码解析 这里你会发现与我们刚才说的Observable有些相似,但也有不同,一些相同的推断这里就不说了。这里.subscribe()最终还是调用ObservableMap里面的subscribeActual()。而在subscribeActual()中又是对其Observer进行了一层包装: RxJava3源码解析 而在我们的代码中(注意,这里不是源码),.map(...)之前的Observable.create(...): RxJava3源码解析 会先经过ObseravbleCreate.subscribe()再调用subscribeActual(): RxJava3源码解析 在这里最终传入的observer对象是在ObservableMap中经过subscribeActual()包装后的对象。其onSubscribe()逻辑与Observable调用时序差不多。回调回代码中经过Function接口的apply()转换对象: RxJava3源码解析 再接着发起Observer.onNext(),需要关注的点是onNext()流程。最终调用CreateEmitter发射器中的onNext()时,即observer.onNext(t),代码又会调入ObservableMap中的onNext(): RxJava3源码解析 在此输出对应泛型并回调回去。

RxJava的操作符流程知晓Observable和map流程即可,其余操作符流程都可从中举一反三。

线程切换

除了观察者模式,RxJava中另一个重要的核心点就是线程切换了。在RxJava中完成线程切换的主要是subscribeOn()和observerOn()。

subscribeOn 源码解析

作用一句话概括,给subscribeOn()上面的代码分配线程。例如下述示例:

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
        emitter.onNext("test");
    }
})
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {

    }

    @Override
    public void onNext(@NonNull String s) {

    }

    @Override
    public void onError(@NonNull Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

代码很简单,这里将发射器发射“test”事件丢到了io线程处理。这回我们先看下Schedulers.io()中的源码。

RxJava3源码解析 又是熟悉的字眼,RxJavaPlugins。不难猜出这又是个全局hook方法,我们观察下其实现方法:

RxJava3源码解析 这里onIoHandler默认为null,实际上还是返回经过apply()类型转换接口转换的defaultScheduler对象。此方法差不多到底了,这时我们返回上一级:

RxJava3源码解析 这里的IO是个静态常量:

RxJava3源码解析 (这里看出不仅仅是IO,Schedulers.线程常量都用static final修饰了,在这个类被调用的时候就会初始化)

RxJava3源码解析 而这里的方法RxJavaPlugins.initIoScheduler()也是个hook方法。

RxJava3源码解析 除去第一行的判断非空代码,和默认为null的onInitIoHandler。可推断出最后返回的对象还是defaultScheduler,即IOTask对象。

RxJava3源码解析 IOTask是个静态内部类,实现了Supplier接口且有返回值:

RxJava3源码解析 还是一个静态内部类,最后返回的是IoScheduler()。同样的,如果不是IO策略,则对应其他的Schedulers.线程常量,最后返回的是各自的XXScheduler。

RxJava3源码解析 继续跟踪IoScheduler()。

RxJava3源码解析 顺藤摸瓜,继续跟踪this():

RxJava3源码解析 继续跟踪至start()里:

RxJava3源码解析 可看到很多线程池策略设置等,继续跟踪shutdown():

RxJava3源码解析

RxJava3源码解析 到这里你会发现,无论是用的哪一种Schedulers策略,无论最后调用到哪一种XXScheduler(),最终都会调用到线程池。到这里就可以了,如果要讲Java线程池的话,本文篇幅就不太够了。就此,Schedulers.io()解析完毕。

继续观察原代码的subscribeOn()

RxJava3源码解析

RxJava3源码解析 可以看到除了非空判断和hook方法,我们要重点关注ObservableSubscribeOn()。

RxJava3源码解析 根据之前我们对Observable(被观察者)和Observer(观察者)的源码流程的解析可知,ObservableCreate.subscribe()的实现方法是ObservableSubscribeOn()里的subscribeActual()。而subscribeActual()内的逻辑则是对传进来的观察者进行SubscribeOnObserver包装,然后将包装好后的对象传给SubscribeTask()。

RxJava3源码解析 此Task实现Runnable(),不难猜出这个对象是给线程池管理。继续看scheduler.scheduleDirect()。这里的scheduler是传进来的Scheduler.IO()策略。这时我们观察scheduleDirect():

RxJava3源码解析 此方法在Scheduler类中,IoScheduler是其子类。继续跟踪scheduleDirect():

RxJava3源码解析 createWorker()在这是个抽象方法,分布到具体子类中实现,例如IoScheduler中:

RxJava3源码解析 继续回到Scheduler的scheduleDirect()中,在createWorker()后,又是个hook方法:

RxJava3源码解析 在这里相当于是把传入进来的Runnable包装了一层,然后交给PeriodicDirectTask():

RxJava3源码解析 而PeriodicDirectTask实现了Runnablehe、Disposable等接口,不难看出其中有中断操作。

而最终这个Task交给w.schedule()处理。而w,即worker对象,在策略是Scherdulers.IO()时,则是调用IoScheduler中的worker对象的schedule():

RxJava3源码解析

RxJava3源码解析

RxJava3源码解析 顺藤摸瓜,可看到最终调用的还是JDK中的ScheduledExecutorService:

RxJava3源码解析 最后可以说,这几个方法的传递就是一个SubscribeTask任务不停的包装然后传递给线程池控制。现在回到subscribeActual()中来:

RxJava3源码解析 经过上述的推断,可理解scheduler.scheduleDirect()作用是把new SubscribeTask(parent))这个包装好的任务放入特定的线程池管理中。

RxJava3源码解析 最终,订阅。

observeOn源码解析

作用一句话概括,observeOn的作用是给observeOn()下面的代码分配线程。在源代码中我们加入observeOn(AndroidSchedulers.mainThread())的设置:

RxJava3源码解析 观察AndroidSchedulers.mainThread()的内部实现:

RxJava3源码解析 继续观察onMainThreadScheduler():

RxJava3源码解析 可以看到又是熟悉的感觉,非空判断和默认为null的onMainThreadHandler,又是一个以传入值为准的hook方法。返回上一层,我们重点观察常量MAIN_THREAD,可以看到其定义为:

RxJava3源码解析 返回的MainHolder.DEFAULT定义为:

RxJava3源码解析 可见,这里通过new Handler(Looper.getMainLooper())传入HandlerScheduler()中的handler绝对处于主线程状态(new Handler(...)这种做法不能避免用户在子线程进行new Handler的操作...)。

RxJava3源码解析 HandlerScheduler继承自Scheduler,那么其中肯定也有createWorker():

RxJava3源码解析 而这里创建的HandlerWorker中的handler是传入HandlerScheduler()中的绝对主线程状态的handler。

而这个主线程handler又是在哪里起作用的呢?这里要看HandlerWorker调用的HandlerScheduler()中的schedule()(具体怎样调用到此处,后面说observeOn的流程会说到):

RxJava3源码解析 同时在此方法内,主线程handler延时发送message。而在send Mesange之前,ScheduledRunnable(handler, run)也是将run丢到主线程执行。可以说AndroidSchedulers.mainThread()的作用就是拿到主线程的handler。

RxJava3源码解析 原代码中,我们给observeOn()传入的参数也就是主线程的handler,接下来看observeOn的方法:

RxJava3源码解析

RxJava3源码解析 除去之前经常见面的非空判断和hook方法,我们观察ObservableObserveOn:

RxJava3源码解析 在其具体实现subscribeActual()中有一个scheduler是否属于TrampolineScheduler的判断,作用跟Schedulers.trampoline()策略一样,就是判断当前是否改变了Scheduler策略。如果属于TrampolineScheduler就是没有改变Scheduler策略,则照常订阅流程,执行source.subscribe(observer)。如果产生了变化,则进行createWorker()等操作。我们继续观察ObserveOnObserver,可看见onNext()等方法声明:

RxJava3源码解析 其中都有一个schedule(),我们继续观察schedule():

RxJava3源码解析 此shedule调用的是HandlerScheduler中的schedule(),至此完成闭环。

RxJava3源码解析 注意这里的一个this,说明此类实现了Runnable():

RxJava3源码解析 那么此类肯定覆写了run(): RxJava3源码解析 而最终HandlerScheduler的schedule()通过主线程的handler执行了此类中的run(),outputFused默认false,代码执行到drainNormal():

void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                ... //省略
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

可看到最终代码通过a.onNext(v)将结果传递出去,而HandlerScheduler.schedule()则将结果事件切换到主线程,最终的结果在主线程中传递。

总结

1、Worker就是RxJava实现线程切换的关键,以ObserveOn为例,在执行subscribe时,会创建Woker并传入ObserveOnObserver,在最后执行onNext等回调时,会调用Worker的schedule方法来切换线程。

  2、我们知道subscribeOn只有第一次调用起效果,而ObserveOn每一次调用都起效果。那是因为每一次调用subscribeOn其实就是在包装一次observer与observable,无论包装多少次,都会以最里面一层也就是第一次调用subscribeOn那一层为主,所以只有第一次起效果。而ObserveOn是在subscribe后包装了observer,在observer的onXXX()的schedule()中进行的线程转换,所以每一次调用都有作用。