Rxjava2调用链线程切换解析一、Rxjava2的用法 用法很简单,但源码有点复杂,这里为了模拟多次subscribe
一、Rxjava2的用法
用法很简单,但源码有点复杂,这里为了模拟多次subscribeOn和observeOn,给Observable这个类新增了两个方法和两个类,便于分析调试。
- Q1: Rxjava的链式调用怎么实现的?
不看源码,还真不知道,以为链式调用肯定通过Builder设计模式实现,其实Rxjava不是,仅仅Observable这个类就有15000+的代码。链式调用采用装饰器设计模式实现,除了最后的订阅,每次链式调用一次,都套娃一次。
我们上面的Observable最后一次链式调用是ObserveOn2,那最终的Observable就是ObservableObserveOn2。将
ObservableOnSubscribe称之为
伪Observable,因其就只有一个subscribe方法,跟其他Observable不一样,ObservableOnSubscribe的subscribe方法可用于发射器发射数据(调用观察者observer的onNext、onComplete、onError)等,同样CreateEmitter可以认为是一个
伪Observer。
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}
//from Observable
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//这里的source就是伪Observable
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
ps: 都知道Observable套娃了,那它对应的observer套娃了吗?也套娃了。
二、用户Observer的OnSubscribe方法调用流程
-
2.1、订阅触发-->ObservableObserveOn2.subscribe(observer)
observable就是ObservableObserveOn2,observeOn2的调用是切换到主线程。 先看其subscribe方法。
public final void subscribe(Observer<? super T> observer) {
.....
subscribeActual(observer);
.....
}
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer); //当前线程模式
} else //我们这里是AndroidSchedulers.mainThread()--->安卓主线程模式
Scheduler.Worker w = scheduler.createWorker()
source.subscribe(new ObserveOnObserver2<T>(observer, w, delayError, bufferSize));
}
}
参数observer就是用户observer,进来之后,被ObserveOnObserver2封装了下,而且把调度器、buffersize(128)也加进去了。这里切换安卓主线程的功能是来自JakeWharton的RxAndroid库(专为Rxjava适配安卓用),先看下HandlerWorker,因为面试会问,所以要讲讲。
-
2.1.1、Rxjava是如何切换到主线程的?
public static Scheduler mainThread() {
return new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
final class HandlerScheduler extends Scheduler {
.....
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
}
private static final class HandlerWorker extends Worker {
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
......
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this;
handler.sendMessageDelayed(message, unit.toMillis(delay));
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
.....
}
兄弟们,这里线程切换就是用的handler。但是Runnable是谁,不用猜,肯定是ObserveOnObserver2,这样就可以在他的onNext、onComplete、onError先经过Handler的切换线程,然后分发给用户observer。
static final class ObserveOnObserver2<T> implements Observer<T>, Runnable {
final Observer<? super T> downstream;
......
ObserveOnObserver2(Observer<? super T> actual, Scheduler.Worker worker, int bufferSize) {
this.downstream = actual;
.....
}
@Override
public void onSubscribe(Disposable d) {
//没有这个调用worker.schedule(this);方法
}
@Override
public void onNext(T t) {
.....
worker.schedule(this);
}
@Override
public void onError(Throwable t) {
.....
worker.schedule(this);
}
@Override
public void onComplete() {
....
worker.schedule(this);
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
......
T v = q.poll();
downstream.onNext(v);
}
}
当onNext被触发的时候,worker.schedule(this),执行线程切换,然后handler处理message时,就走run方法,这里会走drainNormal(测试时发现,连续多个observeOn调用,前面的几个observeOn都是走drainFused,只有最后一个drainNormal),然后下发给下游observer,这里就是用户observer,所以用户observer的onNext、onComplete、onNext方法所在的线程,只与距离它最近的observeOn方法中设置的线程有关。 但是用户observer的onSubscribe方法所在的线程在哪里决定?后面告知。
-
2.1.2、继续ObservableObserveOn2的subscribeActual
-
2.2、ObservableMap.subscribe(observer)
同样也是先 subscribe(observer) --> subscribeActual(observer)
map主要的操作
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
U v = mapper.apply(t)
downstream.onNext(v);
}
.....
}
map的操作就太简单了,将apply方法的返回值作为onNext链中的新值,往下传递。
-
2.3、ObservableObserveOn.subscribe(observer)
ObserveOnObserver构造传入的Worker是IoScheduler的EventLoopWorker。不用太care,io线程池,还能有啥,一般都是核心线程数量和max的线程数量一样,都等于cpu的数量即可。但是Rxjava不是这么处理的,稍复杂,这里不讨论。就只看成线程池提交任务处理即可。ps:线程池处理的是其下游的observer的onNext、onComplete、onError,不是当前ObserveOnObserver的。 与之前说的:用户observer的onNext由距离它最近的observeOn设置的线程决定,同理。
-
2.4、ObservableSubscribeOn2.subscribe(observer)
现在转到subscribeOn了,跟之前observeOn不一样。 先看subscribeActual方法中observer.onSubscribe(parent),这里observer是ObserveOnObserver 这么快这个下游的onSubscribe被执行了,一直以为是从最里层ObservableCreate方法的subscribeActual调用Observer的onSubscribe开始呢,实际上距离ObserveOn最近的一个subscribeOn就开始调用下游的onSubscribe了。
-
2.4.1、ObserveOnObserver.onSubscribe
@Override
public void onSubscribe(Disposable d) {
//d就是SubscribeOnObserver2
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
......
return
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//继续调用下游的onSubscribe
downstream.onSubscribe(this);
}
}
这里的d就是SubscribeOnObserver2,他不是QueueDisposable类型,所以这里会继续调用下游的onSubscribe。ObserveOnObserver的下游是MapObserver。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
// from MapObserver
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
downstream.onSubscribe(this)
}
}
这里MapObserver是个QueueDisposable,也会调用下游的onSusbcribe,MapObserver的下游是ObserveOnObserver2。
// from ObserveOnObserver2
@Override
public void onSubscribe(Disposable d) {
//d就是SubscribeOnObserver2
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
......
return
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//继续调用下游的onSubscribe
downstream.onSubscribe(this);
}
}
这里ObserveOnObserver2就调用用户observer的onSubscribe了。
总结一下用户observer的onSusbcribe的调用过程 那用户observer的onSubscribe回调了,请问它处在哪个线程? 就是Rxjava发起订阅所在的线程。
三、用户Observer的onNext方法调用流程
-
3.1、继续从ObservableSubscribeOn2的subscribeActual开始
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver2<T> parent;
SubscribeTask(SubscribeOnObserver2<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
public Disposable scheduleDirect(@NonNull Runnable run) {
final Worker w = new EventLoopWorker(pool.get())
w.schedule(run, 0, TimeUnit.NANOSECONDS);
return task;
}
又看到了source.subscribe(parent),此时说明subscribeOn2(schedulers.IO)决定了 ObservableSubscribeOn.subscribe(subscribeOnObserver2)方法执行的线程 ,真的是厉害哦。
-
3.2、ObservableSubscribeOn的subscribe(observer)
从subscribe(observer)---->subscribeActual(observer)
observer.onSubscribe(parent),表示subscribeOnObserver2的onSubscribe被调用。 就只是给subscribeOnObserver2的upstream字段设置值为subscribeOnObserver,所以subscribeOnObserver2的上游是subscribeOnObserver,下游是从构造传入的,为observeOnObserver。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver2<T> parent;
SubscribeTask(SubscribeOnObserver2<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
public Disposable scheduleDirect(@NonNull Runnable run) {
final Worker w = new EventLoopWorker(pool.get().getEventLoop());
w.schedule(run, 0, TimeUnit.NANOSECONDS);
return task;
}
这里的Schedulers.computation()就简单看成是computation线程池执行任务source.subscribe(parent) 就行, 表明subscribeOn(Schedulers.computation())决定了 ObservableCreate.subscribe(subscribeOnObserver)方法执行的线程。
-
3.3、ObservableCreate.subscribe(observer)
从subscribe(observer)---->subscribeActual(observer) 使用subscribeOnObserver创建了发射器CreateEmitter。 subscribeOnObserver调用onSubscribe(parent),设置上游observer为createEmitter source.subscribe(parent),这里的source就是那个伪Observable(ObservableOnSubscribe)。 这里终于看到了emitter的onNext调用,前面提到Schedulers.computation()决定了这里的subscribe方法执行的线程。
-
3.4、CreateEmitter.onNext方法
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
observer.onNext直接就是subscribeOnObserver的onNext调用了
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
@Override
public void onNext(T t) {
downstream.onNext(t);
}
.....
}
downstream.onNext直接就是subscribeOnObserver2的onNext调用了
static final class SubscribeOnObserver2<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable
@Override
public void onNext(T t) {
downstream.onNext(t);
}
.....
}
downstream.onNext直接就是observeOnObserver的onNext调用了,onNext此时所在的线程还是computation线程。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
@Override
public void onNext(T t) {
......
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t); //将数据放到队列
}
if (getAndIncrement() == 0)
worker.schedule(this);
}
}
@Override
public void run() {
if (outputFused) {
drainFused()
} else {
drainNormal();
}
}
void drainNormal() {
.....
T v = q.poll();
downstream.onNext(v);
}
}
同样也是将数据往下游分发,observeOnObserver下游的mapObserver, 它的onNext将会在IO线程。
mapObserver的onNext只是将数据做个简单的处理,然后继续下游分发数据。
@Override
public void onNext(T t) {
......
U v = mapper.apply(t)
downstream.onNext(v);
}
mapObserver的下游observeOnObserver2的onNext方法继续分发数据。
static final class ObserveOnObserver2<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
@Override
public void onNext(T t) {
......
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t); //将数据放到队列
}
if (getAndIncrement() == 0)
worker.schedule(this);
}
}
@Override
public void run() {
if (outputFused) {
drainFused()
} else {
drainNormal();
}
}
void drainNormal() {
.....
T v = q.poll();
downstream.onNext(v);
}
}
跟之前的observeOnObserver一样代码,此时observeOnObserver2的onNext跟MapObserver的onNext是在同一个线程,都是在io线程执行,observeOnObserver2的下游就是用户observer了,在这里经过主线程代码切换后,用户observer的onNext的方法就在主线程执行了。
四、分析线程
有人说多次subscribeOn,只有第一次生效,其实每次都生效。 每次subscribeOn设置线程都会对它source Observable的subscribe方法(或者subscribeActual)产生影响。 每次observeOn设置线程都会对它下游的Observer的onNext、onComplete、onError产生影响。
被观察者 | subscribe方法 | 观察者 | onSubscribe方法 | onNext/onComplete/onError方法 |
---|---|---|---|---|
伪Observable | computation线程 | - | - | computation线程 |
ObservableCreate | computation线程 | CreateEmitter | - | computation线程 |
ObservableSubscribeOn | io线程 | SubscribeOnObserver | computation线程 | computation线程 |
ObservableSubscribeOn2 | 用户线程 | SubscribeOnObserver2 | io线程 | computation线程 |
ObservableObserveOn | 用户线程 | ObserveOnObserver | 用户线程 | computation线程 |
ObservableMap | 用户线程 | MapObserver | 用户线程 | io线程 |
ObservableObserveOn2 | 用户线程 | ObserveOnObserver2 | 用户线程 | io线程 |
- | - | 用户observer | 用户线程 | 主线程 |
ps:用户线程就是指observable.subscribe(observer)代码调用的线程。 CreateEmitter不是观者者,没有继承Observer,它只是含有onNext、onComplete、onError方法而已。
转载自:https://juejin.cn/post/7015609207470686221