RxJava源码解析(二)—线程调度器Scheduler
在RxJava中,有个很重要的概念叫做"线程调度器"—Scheduler。它用一种隐式的方法屏蔽掉了我们之前通过回调方式的线程调用。我们先看个例子:
Observable<String> ob = Observable.just("str1","str2");
ob.map(new Func1<String, String>() {
@Override
public String call(String t) {
System.out.println("function call " + Thread.currentThread());
return "[" + t + "]";
}})
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String t) {
System.out.println("onNext call " + Thread.currentThread());
System.out.println("onNext "+t);
}
});
代码中,我们通过一个字符串生成了一个Observable对象,而这个对象我们又通过一个map映射映射成为一个新的Observable对象(这部分的知识请参照第一章RxJava源码解析(一)从一个例子开始)。在这之后,我们有通过调用observeOn方法设置了一个叫做Schedulers.newThread()的调度器。这个函数的目的是为了告诉你的被观察者,当你的数据返回的时候需要往哪个线程上post你的数据消息,换句话说,也就是你所定义的Subscriber对象的onCompleted/onError/onNext的执行线程。这段代码最后输出:
//output:
function call Thread[main,5,main]//map映射发生在默认线程也就是虚拟机主线程中
function call Thread[main,5,main]//map映射发生在默认线程也就是虚拟机主线程中
onNext call Thread[RxNewThreadScheduler-1,5,main] // 消息回调函数处理在一个新的线程中
onNext [str1]
onNext call Thread[RxNewThreadScheduler-1,5,main] // 消息回调函数处理在一个新的线程中
onNext [str2]
本章,我们将重点关注这个调度器,那么我们首先要思考的问题是,这个调度器将会提供什么功能呢?这就要回头看下我们能用这个调度器干什么了?首先,我们需要调度器去帮助我们生成一个线程其次,当我们以后得到了结果,我们还要需要调度器往调度器线程中发送一个消息,以便可以执行订阅者的回调函数好的,基于我们上面的需求,我们将看下,在RxJava的调度器实现中,是如何实现我们所需要的功能的。我们先来看下Observable对象所提供的observeOn函数,这个函数有多个函数重载,最终都会调用到三个参数的observeOn方法:
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
这里调用到了RxJava中一个很重要的操作符号
lift。lift函数引入了一个叫做Operator的新类型,在上述的例子中这个类型的实现类是一个叫做OperatorObserveOn的策略类型。我们看下这个lift函数定义:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
我们所传入的
Operator对象最终被包装成为一个OnSubscribeLift对象,OnSubscribeLift对象是我们非常熟悉的OnSubscribe类型的子类。第一章我们说到OnSubscribe提供一种处理订阅者注册订阅后的策略。按照我们上面的例子,我们调用过map函数后调用observeOn函数,此时传入的onSubscribe对应的就是map产生的OnSubscribeMap对象。而参数operator对应observeOn函数中的OperatorObserveOn对象。我们先来看下Operator类的定义:
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
Operator也是一种映射关系函数,转换类型是通过Subscriber<T>->Subscriber<R>。也就是说,Operator是一个直接转化新的Subscriber的映射函数。这样就可以在订阅前拦截订阅操作。比如:
Observable<String> ob = Observable.just("str1","str2");
ob.map(new Func1<String, String>() {
@Override
public String call(String t) {
System.out.println("function call " + Thread.currentThread());
return "[" + t + "]";
}})
.lift(new Operator<String,String>(){
@Override
public Subscriber<String> call(Subscriber<? super String> st) {
return new Subscriber<String>() {
@Override
public void onNext(String t) {
long startTime = System.currentTimeMillis();
System.out.println("onNext begin");
st.onNext(t);//用于监控订阅者的执行时间
System.out.println("onNext execute on next time = "+(System.currentTimeMillis() - startTime)+"ms");
}
};
}})
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String t) {
IO.waitTime(5000);
System.out.println("call onNext "+t);
}
});
比如,我们为了监控订阅者订阅的时候有多少的时间消耗,我们通过
lift函数在我们的订阅者外包装了一层Subscriber,这样我们就可以依赖于包装的Subscriber对象进行函数监控:
//output:
function call Thread[main,5,main]
onNext begin//开启监控
call onNext [str1]
onNext execute on next time = 5005ms//监控结束
也就是说,上述的例子中我们的流程图应该是:
lift后流程图
好的,有了上面的概念,我们可以来看下OperatorObserveOn的代码,我们看下它给我们生成了一个什么样的订阅者:
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
....
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
.....
}
lift函数流程图
Lift函数执行完后,会将我们所注册的Subscriber装饰成为一个ObserveOnSubscriber对象。"lift后流程图"的红色框框部分以后注明了这个对象的功能。我们先来看下ObserveOnSubscriber对象的onCompleted/onError三个方法:
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
finished = true;
schedule();
}
由于
onCompleted和onError是互斥的,且只会被调用一次,因此会用一个finished的boolean变量来进行拦截,然后调用schedule()函数来处理剩下逻辑:
final AtomicLong counter = new AtomicLong();
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
由于
counter在调用getAndIncrement()后就大于0,因此recursiveScheduler.schedule(this)只会被调用一次,recursiveScheduler的定义在ObserveOnSubscriber的构造器中:
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
...
}
scheduler就是我们传入的Schedulers.newThread()对象,实际上是一个NewThreadScheduler对象:
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);//recursiveScheduler的类型是一个NewThreadWorker
}
可以看出,
recursiveScheduler最终会被置为NewThreadWorker类型
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
....
executor = exec;
}
NewThreadWorker构造器中,定义了一个核心线程为1的ScheduledThreadPool线程池。(ScheduledThreadPool是一个很特殊的线程池,这个线程池的主要是为了支持延迟任务,或者定时任务。)recursiveScheduler.schedule(this)实际上就是调用NewThreadWorker的schedule(Action0)方法。
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
....
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
...
}
schedule方法最终会调用到scheduleActual方法,action对象会被包装成为一个ScheduledAction的Runable对象提交给线程池executor。而线程池会调用ScheduledAction的run()方法,在run()方法中,又会调用Action0的call()方法:
@Override
public void run() {
try {
.....
action.call();
} catch (OnErrorNotImplementedException e) {
....
}
}
如果刚才的代码已经把你给绕懵了,不要紧,我们再来回顾一下流程:1. 我们通过lift函数注册了一个叫做
OperatorObserveOn的Operator对象2. lift函数会构造一个叫做OnSubscribeLift的对象用于构造一个Observable对象3. 当订阅者Subscriber对象订阅Observable的时候,根据调用链,会优先使用OnSubscribeLift对象作为优先处理对象。4OnSubscribeLift调用call(Subscriber)方法,在该类的call方法中,会通过内部的Operator对象(也就是OperatorObserveOn对象)的Subscriber call(Subscriber)方法,生成一个新的订阅者ObserveOnSubscriber5. 新的订阅者对象ObserveOnSubscriber被OnSubscribeLift对象传递给上层的OnSubscribe对象处理,也就是走如RxJava源码解析(一)从一个例子开始)中的流程,最后会走到OnSubscribeFromArray对象中,然后遍历里面的数组生产者6.OnSubscribeFromArray遍历数组中的成员,然后调用订阅者的onNext和onCompleted。而最终要调用到的订阅者就是ObserveOnSubscriber对象。7.ObserveOnSubscriber对象的onNext()和onCompleted()方法会触发执行schedule()方法,schedule()方法会调用Scheduler.Worker.schedule(Action0)方法,而这个Action0对象就是ObserveOnSubscriber类型8. 当我们选择Schedulers.newThread()调度器的时候,Scheduler.Worker对象实际类型为NewThreadWorker对象,而NewThreadWorker.schedule(Action0)中会将Action0对象包装成为ScheduledAction对象,ScheduledAction本质是一个Runnable类型,因此它可以被提交到线程池中,调用ScheduledAction.run()方法,而ScheduledAction.run()方法中,又会调用Action0.call()9. 步骤8中Action0实现的类型为ObserveOnSubscriber类型,此时调用ObserveOnSubscriber.call()方法会从queue队列中读取onNext参数值并检测是否已经结束,注意,由于当前函数是由我们调度器生成的Worker对象中的线程池调用的,因此当前的全部回调操作都发生在Worker所构建的线程中。#####总结实际上,我们从上面可以看出,我们通过lift函数所构造出来的
ObserveOnSubscriber对象,实际上是生成了一个OnSubscriber的装饰对象。而这个对象的具体操作,都被封装到了call()方法中去,换句话说,我们的调度器实际上就是提供一个容器,给我们的call()方法提供上下文。基于我们上述的结论,我们实际上就可以写出我们自己的调度器:
private static class SchedulerImpl extends Scheduler {
@Override
public Worker createWorker() {
// TODO Auto-generated method stub
return new WorkerImpl();
}
}
private static class WorkerImpl extends Scheduler.Worker {
@Override
public void unsubscribe() {}
@Override
public boolean isUnsubscribed() {
return false;
}
@Override
public Subscription schedule(Action0 action) {
return schedule(action,0,null);
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
Thread thread = new Thread() {
public void run() {
action.call();
};
};
thread.setName("test");
thread.start();
return null;
}
}
这个调度器的写法非常的简单:1.我们先构建一个
Scheduler用于管理我们的Worker2.observeOn会给我们提供一个ObserveOnSubscriber类型的Action0对象,作为参数调用Worker.schedule(Action0 action)方法3.我们生成了一个独立的线程"test",并在线程中调用Action0.call ()方法,这样就可以将事件发送到我们所订阅的真正的Subscriber上了最后输出日志:
output:call onNext [str1]call onNext [str2]call onCompleted Thread[test,5,main]
转载自:https://juejin.cn/post/6844903496257372168