likes
comments
collection
share

你可以不用RxJava,但必须得领悟它的思想!

作者站长头像
站长
· 阅读数 21

前言

Rx 是一种响应式编程的思想,如今有很多语言都支持这种思想:RxJavaRxJsRxSwift...。它是基于特殊的观察者模式来实现的,能够轻松的实现异步事件响应流,避免回调地狱的产生。这种思维和我们平时的编程思维不太相同,它以数据流为核心,处理数据的输入,处理以及输出,因此这个框架学习起来是有一定难度的,加之 RxJava 操作符众多,这又给初学者立了个下马威。

本篇博客会 分析其原理与思想 ,从源码层次对其进行深入分析。

✔️ 本文阅读时长约为: 10min

本篇博客适合已经会简单使用 RxJava 框架并想了解其原理与思想的同学~

RxJava的观察者模式

标准的观察者设计模式 中,通常都是一个被观察者,多个观察者。例如,在抖音中ABCD四人都关注了一个博主 。那么当博主 发布新视频时,甲会去通知ABCD四人,告诉他们我已经发布了新的视频,你们现在可以前去观看。

你可以不用RxJava,但必须得领悟它的思想!

RxJava的观察者设计模式 与前者有些许差异,我的理解是在 RxJava 中只有一个被观察者和一个观察者,被观察者就是数据源也就是 起点 ,而观察者就是 终点起点终点 这一段过程做什么事情由我们自己定义,像加卡片一样往里堆,但始终只有一个 起点 和一个 终点 ,这也就是俗称的卡片式编程。

你可以不用RxJava,但必须得领悟它的思想!

举个栗子

我们的起点是发起网络请求得到一张图片,终点是得到一张加了 水印高斯模糊 的图片。那么我们就需要在 起点终点 这一过程中往里面加两张卡片。一张卡片是加水印的功能,另一张卡片是加高斯模糊的功能,最终我们会在 终点 得到我们的想要的效果,这就是 RxJava 的一个过程,大家看是不是比 标准观察者模式 的耦合度更低呢?

现在结合代码来看一下呢:

    // 示例:创建Observable
    // create方法中的参数是我们自定义的source,下文会提到
    Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
    //----------------------------------被观察者--------------------------------------
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
            // 数据源(起点),假设我们这里得到了一张图片,开始向下传递
            e.onNext();
        }
    //-----------------------------------卡片1----------------------------------------
    }).map(new Function<Bitmap, Bitmap>() {
        // 1.加水印的卡片
        @Override
        public Bitmap apply(Bitmap bitmap) throws Exception {
            // TODO 这里我们将图片加上水印
        }
    //-----------------------------------卡片2----------------------------------------
    }).map(new Function<Bitmap, Bitmap>() {
        // 2.加高斯模糊的卡片
        @Override
        public Bitmap apply(Bitmap bitmap) throws Exception {
            // TODO 这里我们将图片加上高斯模糊
        }
    //-----------------------------------观察者---------------------------------------
    // 订阅被观察者
    }).subscribe(new Observer<Bitmap>() {
        @Override
        public void onSubscribe(Disposable d) {}

        @Override
        public void onNext(Bitmap bitmap) {
            // 最终获得加了水印和高斯模糊的图片
            // do what you want...
        }

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onComplete() {}
    });

我们可以看到,RxJava 链式调用后在终点能得到我们想要的结果,我们对 Bitmap 的两个操作以 卡片 形式加到了 起点终点 的过程中,这就是 RxJava 的魅力之一。

了解 RxJava 大体的执行流程后,现在我们就从源码角度来解释整个框架内部逻辑是怎样运转的。

RxJava如何将事件逐步传递

首先我们先来看看这些操作符内部逻辑是什么样子的,这里以 createmap 为例子展示:

1️⃣. create

// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // 常规空检查
    ObjectHelper.requireNonNull(source, "source is null");
    // 这里是个hook,其实直接放回的是 ObservableCreate
    // 返回的其实就是我们传进来的参数
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

2️⃣. map

// Observable.java
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    // 常规空检查
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    // 这里是个hook,其实直接放回的是 ObservableMap
    // 返回的其实就是我们传进来的参数
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

从上面的例子中可以看出,这两个操作符不管是结构还是逻辑几乎是一模一样,只是 onAssembly 方法中传递的参数不同(其实也可以说是相同的,他们都是 Observable 的子类)。实际上,几乎所有的操作符都是这样的结构,会在内部 new ObservableXXX()

为什么我们的操作符都能链式调用呢?这就跟上面说到的特点有关了,由于几乎所有操作符都是上述结构,返回值的都是 ObservableXXX,而他们都是 Observable 的子类,所以我们能一直通过 .XXXX的形式调用 Observable 中的操作符,一直往里面加 卡片 来完成我们的需求。

⚡⚡ 那么,ObservableXXX类有什么用呢?


1.ObervableXXX的整体认识

其实这些 ObservableXXX 就像工厂中的一个个流水线组装货物的机器,货物从 起点终点 传输,当货物传输到不同的机器上时,这台机器会操作货物完成这一层的组装工作,最后会将其传输到下一个机器上去,完成那个机器的组装任务,直至传输到 终点 整个货物组装完毕。

回头看上面的例子,我们加入的两张卡片就像是两台机器一样,map 机器完成水印工作后,将 货物 扔给后一个 map 机器,由它完成高斯模糊工作,最后 货物组装完成,被传输到了终点,在这里我们就能拿到最终期望的结果。

接下来我会举两个例子,带着大家看看它们的内部逻辑。因为几乎所有的 ObservableXXX 的逻辑和功能都相似,因此大家在看完下面几个例子后,可以自行去源码中轻松查看你想看的部分。

2.ObservableMap内部运转逻辑

这样一说相信大家对它有了一个整体的感受,那么我们现在以 ObservableMap 为例,重点讲解一下内部逻辑吧。

⚡ 在讲解源码之前,我们需要先知道一个点:代码当中的 source 指的都是上一个操作符返回的 Observable 对象,function 指的是调用此操作符需要传递的参数,即匿名实现类,另外下文还会出现 自定义source,这个就是我们在使用 create操作符 时传递的参数,也称 顶层source ,它是货物运输的 起点

// ObservableMap.java


// Observable继承自AbstractObservableWithUpstream,AbstractObservableWithUpstream又继承Observable
// Observable实现了ObservableSource接口
// 因此这里会重写 subscribe 方法
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        // source 是上一层操作符返回的Observable对象
        // function 是我们使用此操作符传递的那个匿名实现类,<可以称其为这一个机器需要做的操作>
        // 在本类中其实就是实现了Function接口的匿名类,里边有个方法叫作apply
        // <本层机器处理货物时>,会看到apply方法
        super(source);
        this.function = function;
    }

    // 此方法是 subscribe 的具体实现方法
    // 我们进行订阅时,即调用subscribe时,最终会调用此方法,<最终通知起点开始传输货物>
    @Override
    public void subscribeActual(Observer<? super U> t) {
        // t代表我们调用subscribe时传递的参数,即匿名实现类,<我们可以称其为后一台机器要执行的操作>
        // 将t和function包装一下,再调用 source 的 subscribe,<也就是前面那台机器的subscribe>
        // 这里的subscribe可以理解为通知上层机器开始运输货物
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    // 内部类,<将后一台机器执行的操作和上一个机器绑定起来>
    // 就是将后一个机器的操作做了层封装,包裹了一层
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        // <此方法是货物到达本机器时,本机器的相应操作>
        @Override
        public void onNext(T t) {
            // 这些判空直接不看
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;
            // 这里会执行apply方法,是我们使用map操作符传递的实现了Function接口的匿名内部类中的方法
            // <本层机器开始处理上层机器传递过来的货物>
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            // 本层机器的货物处理完毕,扔给下层机器继续处理
            // <actual代表下层机器>
            actual.onNext(v);
        }
    }
    

总的来说 ObservableMap 做了两件事,第一就是调用 subscribeActual 通知上层机器让其开始传递货物,第二就是调用 onNext 处理传递到本层的货物,最后再扔给下一个机器去处理。

其实 ObservableXXX 结构和逻辑都差不太多,我将最开始给的例子用图表示了出来:

你可以不用RxJava,但必须得领悟它的思想!

相信大家可以从这张图中看出,整个运转逻辑是一个 U型结构,当我们调用 subscribe 时,就会通知 起点 运输货物,其实也就是设置了观察者。然后,货物就会一层一层往下传递,最终流到我们的终点。如果我们中途想继续添加需求,直接往中间那个过程添加机器(或者说是卡片)就行了,非常方便,而且整个链式调用下来,代码会比不用 RxJava 显得更加干净整洁易读。

3.ObservableCreate内部运转逻辑

那么起点是怎么接收下面的 机器 传递上来的运输货物的信号并开始运输货物的呢?

public final class ObservableCreate<T> extends Observable<T> {
    // 我们自己写的source
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        // 这个source是我们自定义的,也就是使用create操作符时传递的那个参数
        // 这就是我们的所说的起点、源头
        this.source = source;
    }

    // 这个就是起点触发传送任务的方法
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 这个observer是从下面机器经过一层层包装后传递过来的observer
        // 实际上调用的是我们使用subscribe传递参数当中的onSubscribe方法,即终点的onSubscribe方法
        observer.onSubscribe(parent);

        // 开始传输任务,将货物往下传递
        try {
            // 这里的source就是顶层的自定义source
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    // 这个就是发射器,用于将货物从起点往下传递的工具
    // 通常情况下,我们在自定义source中都会调用此类的onNext方法开始向下传输货物
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        // 向下传输货物的方法
        @Override
        public void onNext(T t) {
            ...
            if (!isDisposed()) {
                // 将货物扔给下一层机器去处理
                observer.onNext(t);
            }
        }
        ...
    }
    ...
}

至此,两个典型的 ObservableXXX 讲解完毕,可能大家对源码当中的 sourcesubscribeActual 中的参数 observer/t 傻傻分不清楚,不知道这些是指 前一个机器 ,还是 后一个机器 ,那么接下来请看下面这幅图。

你可以不用RxJava,但必须得领悟它的思想!

source 是前一个操作符返回的 Observable 对象,这个相信大家都容易理解。这里着重说一下上述图的下半部分,大家还记得终点是怎么通知起点开始传输 货物 的吗?没错,是通过每一个 Observable.subscribeActual 方法 (机器)中的参数不断向起点传递,每个 subscribeActual 方法中都会调用 source.subscribe(),也就是通知前面机器传输包裹,每向前传递一次就 封一层包裹 ,最后触发 起点 传输货物,之后就开始一层层 拆包裹 ,最终拆到终点(subscribe)中我们自己实现的 ObservableOnSubscribe 中。

小结

为了便于大家理解前面所讲述的知识点,这里我们用文章开头的那个例子继续深入讲解,我们来看上述代码是如何执行的,再贴一遍代码:

Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                e.onNext();
            }
        }).map(new Function<Bitmap, Bitmap>() {
            @Override
            public Object apply(Bitmap bitmap) throws Exception {
                // todo
            }
        }).map(new Function<Bitmap, Bitmap>() {
            @Override
            public ObservableSource<?> apply(Bitmap bitmap) throws Exception {
                // todo
            }
        }).subscribe(new Observer<Bitmap>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(Bitmap bitmap) {}

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {}
    });

1️⃣ 我们通过 .subscribe(),发起了订阅,开始观察被观察者,结合我们之前讲的,我们实际上调用的是map 返回对象中的 subscribe 方法,即 ObservableMap

// ObservableMap.java

//它会继续调用上层机器的subscribe,通知上层机器传输货物
public void subscribeActual(Observer<? super U> t) {
    // 封包裹,传递给上层机器
    source.subscribe(new MapObserver<T, U>(t, function));
}

2️⃣ 这里它会继续调用上层机器的 subscribe,由于上个操作符仍是 map,所以我们直接跳过,看 create 操作符。

// ObservableCreate.java

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 这里是调用下一层机器的onSubscribe方法
    observer.onSubscribe(parent);

    try {
        // 调用我们自定义的soruce,开始传输货物
        // 例子中我们自定义soruce中写的代码是emitter.onNext,开始传输货物
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

3️⃣ 到这里,我们已经通知 起点 发起传送任务,那么接下来就是传输货物的过程,逐步 拆包 的过程。随着货物的运输,代码会不断调用最外层包裹中的 observable 对象的 onNext 方法,看起来就像在拆包裹一样,这其实也是前面说的运输包裹的过程,如此反复,最后到达我们的终点的 onNext 方法,我们拿到最终期望的货物,至此,整个过程基本完毕。

✔️ 现在是不是脑海里面又出现了前面所说的 U型结构 呢?

RxJava是如何完成线程切换的?

RxJava 能完成线程切换是通过 subscribeOnobserveOn。正常情况下我们都是这么用的:

observable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(...)

前面反复强调整个框架的执行过程是一个 U型结构,请大家思考一下,subscribeOnobserveOn 切换线程的核心代码是在 subscribeActual 中还是 onNext 中?

回忆一下,我们通知 起点 开始传输货物时,是通过 subscribeActual 逐层往上传递的,紧接着就开始运输货物处理货物了,就比如 map 机器中的 apply 方法就是处理货物的具体方法。因此,这一段任务都应该在 子线程 中完成,所以 subscribeOn的切换线程的核心代码是在subscribeActual中的。

observeOn 的作用就是让终点的 onNext 方法中的代码在主线程中执行,那么理应只需要在 observeOn操作符中的onNext中切换线程即可。

有了上述猜想,我们现在进入源码验证一下。

1.subscribeOn的线程切换

// 不仅有io线程,还有以下这些线程

SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

IO = RxJavaPlugins.initIoScheduler(new IOTask());

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

我们首先来看看使用该操作符时传递的参数 Schedulers.io() 到底是个什么东西。我们一层层点进去最后会发现它其实就是 new了一个线程池 :

// 一路点进去,会发现其实就是new了一个线程池
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);

其中的 CacheWorkerPool 的源码如下:

// IoSchedule.java,这个类将线程池中的创建、调度、执行等方法做了一下封装

...

// 这个方法就是得到线程池
public Worker createWorker() {
    // EventLoopWorker继承自Worker,是对线程池做了一下封装,包裹了一层
    return new EventLoopWorker(pool.get());
}

// 实际调用线程池的方法
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    // 里面就是调用线程池的submit等方法
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

...

// CacheWorkerPool是IoSchedule中的内部类
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
    this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
    this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
    this.allWorkers = new CompositeDisposable();
    this.threadFactory = threadFactory;

    ScheduledExecutorService evictor = null;
    Future<?> task = null;
    if (unit != null) {
        // 这里的核心线程数是1就足够了,因为我们子线程中的任务都是链式调用的
        evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
        task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, 
            this.keepAliveTime, TimeUnit.NANOSECONDS);
    }
    evictorService = evictor;
    evictorTask = task;
}
...

所以,其实 .io() 就是传了一个核心线程数为1的线程池,当然我们还可以传入 .computation().newThread()等方法,这些其实都是创建了一个特殊的线程池。

我们知道,这些操作符都是在内部 new ObservableXXX类,所以源码我们直接看ObservableSubscirbeOn

// ObservableSubscribeOn.java

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    // 这个是调度器,就是对线程池封装了一下
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    
    // 我们重点看在这个方法中是如何完成线程切换的
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        // onSubscribe的回调
        s.onSubscribe(parent);
        // subscribeTask实际上是一个runnable
        // 调用scheduler.scheduleDirect(runnable),将runnable放入线程池中去执行
        // 这里是重中之重,具体细节我们稍后讲解
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    ...

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // 上面那个runnable其实就是这个
            // 这里是调用上层机器的subscirbe,通知起点开始传输货物
            source.subscribe(parent);
        }
    }
}

从上面我们可以知道,我们将 source.subscribe() 扔进了线程池中,那么就做到了在其之后的所有代码都是在子线程中执行了,直到代码走到 observeOn,在 ObservableObserveOn 中的 onNext() 方法将线程切回主线程。

接下来我们讲讲具体细节,看看 scheduleDirect 里面的逻辑。

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // createWorker()方法眼熟吗?这就是在IoSchedule当中的方法
    // Worker类本质上就是线程池,只是对其做了一层封装
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    // 将传进来的runnable再做一层封装,这里的runnable对应上面的例子就是source.subscribe
    DisposeTask task = new DisposeTask(decoratedRun, w);
    // 这个方法也眼熟吧?
    // 在线程池中执行此任务
    w.schedule(task, delay, unit);

    return task;
}

由上面这一过程我们可以知道,当通知到 本层的机器 传输货物时,这一层机器会将 subscribeActual 方法扔进线程池中去执行,即后续的代码都是运行在子线程中了。

那么它又是如何将线程切回来的呢?

2.observeOn的线程切换

理解了上面线程切换后,我们很容易能理解下面切换主线程的代码,现在直接进入 ObservableObserveOn 源码。

// ObservableObserveOn.java

@Override
protected void subscribeActual(Observer<? super T> observer) {
   // 同样可以理解为对线程的封装,只不过这里内部是handler
   Scheduler.Worker w = scheduler.createWorker();
   // 同样是封包裹,把自己封装一层,扔给前面的机器
   source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

}

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {

   ...

    // 重点关注onNext方法,切回主线程的代码都在这里面
    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        ...
        
        // 切换线程的方法在这里面,内部调用的是HandlerWorker的schedule
        // 里面又调用的是HandlerSchedule的schedule
        schedule();
    }

起点 开始传输货物时,即开始逐层拆包裹,这时候会逐层调用 onNext,最后就会在 ObservableObserveOn 中的 onNext 方法中切换线程。而在 onNext 中又调用的是 HandlerSchedule 中的 schedule 方法进行最终的线程切换。

// HandlerSchedule.java

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    ...
    // 这里的run就是下一层机器的onNext方法,例子中就是终点的onNext
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
    // 调用Handler,实现最终的线程切换
    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    // 如果dispose,就停止RxJava后续流程,整个运输带不会再工作了
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

画个图做个总结,让大家理解更清晰一点:

你可以不用RxJava,但必须得领悟它的思想!

写在最后

篇幅很长,能看到最后很不容易,给自己一个 大大的赞 吧!👏👏

如果觉得写的不错😀😀,就给个赞再走吧~

创作实属不易,你的肯定是我创作的动力,下次见!

如果本篇博客有任何错误的地方,请大家批评指正,不胜感激。