likes
comments
collection
share

RxJava的线程切换

作者站长头像
站长
· 阅读数 11

RxJava 线程切换

前言

在上篇文章对RxJava 的工作流程进行的简单的分析,今天来分享一下线程切换的流程。如果觉得源码枯燥可以直接移至文末看图理解。

实例代码

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Object> emitter) {
                emitter.onNext("123");
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Object>() {
        .........
        }

我们都subscribeOn 是切换上游线程,observeOn是切换下游环境,接下来我们就看下它是怎么切换的。

我们先看下 Schedulers.io() 是什么

@NonNull
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}
->
Scheduler IO = RxJavaPlugins.initIoScheduler(new IOTask());  
->
static final class IOTask implements Supplier<Scheduler> {
        @Override
        public Scheduler get() {
            return IoHolder.DEFAULT;
        }
    }  
->
static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
 } 

我们通过层层分解,层层递进,了解到 Schedulers.io() 最终返回的是一个 IoScheduler()

可以暂时将它理解为一个任务调度器,用来执行我们的任务。

接下来我们在看下 AndroidSchedulers.mainThread() 。

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
->
 private static final Scheduler MAIN_THREAD =
        RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
->
 Scheduler DEFAULT  = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);

可以看到这里返回的是一个 HandlerScheduler ,这里是对Handler进行了一个封装,所以归根结底,向主线程切换任务还是通过handler 来完成的,接下来我们就看看其中的细枝末节。

两个参数分析完了,然后来看下两个操作符,看看它俩做了些什么事情。

首先我们来看下subscribeOn

public final Observable<TsubscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}

这里是对 IoScheduler() 和上游封装的包裹进行的二次封装

上游的包裹:这里指的通过create创建的 ObservableCreate

observeO操作符:

public final Observable<TobserveOn(@NonNull Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}
->
 public final Observable<TobserveOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
    }

在这里是对 HandlerScheduler 和 上游封装的包裹进行的二次封装

上游的包裹:这里指的是通过subscribeOn 操作符 创建的 ObservableSubscribeOn

然后我们开始看订阅这里了,我们的流程从 subscribe 这里才刚刚开始。

经过上一篇文章的分析,我们可以知道调用的 subscribeActual 方法都是在上游操作符创建的封装对象里,所以我们直接看 ObservableObserveOn 的 subscribeActual 方法。

如果感觉这段讲解有些跳跃,可以先看一下上篇文章《浅析RxJava》

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
         .........
         Scheduler.Worker w = scheduler.createWorker();
         source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
    }

在这里是将我们自定义的 Observer 封装成 ObserveOnObserver ,这里的source 是我们上游的封装的包裹,这里指的就是通过subscribeOn 操作符创建的 ObservableSubscribeOn。最终会调用到 ObservableSubscribeOn 的 subscribeActual

方法。

@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
    observer.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

这里是将下游传上来的 ObserveOnObserver 再次进行封装,封装成 SubscribeOnObserver ,然后再将 SubscribeOnObserver 封装成 SubscribeTask,其实就是一个Runnable。

流程我们稍后再进行分析,我们先来看看任务是异步任务是怎么切换的。

通过上文分析得知,此处的 scheduler 为 subscribeOn 操作符传入的参数,也就是 IoScheduler() 。

接下来我们再看 scheduler的 scheduleDirect 方法。

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

在这里是通过 createWorker() 创建了一个 Worker ,由这个 Worker 去执行 具体的任务。

createWork()是抽象方法,我们需要看IoScheduler 的具体实现。

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
->
 public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
 ....
  return threadWorker.scheduleActual(action, delayTime, unit, tasks);
 }
->
  NewThreadWorker
  public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
           Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

           ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
           .........
           if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        return sr;
    }  

经过层层挖掘,我们看到任务最后是通过 executor 来执行的,executor 就是内部维护的线程池

private final ScheduledExecutorService executor;

至此,整个工作流 就切换为了子线程来工作。

接下来我们继续分析封装的SubscribeTask

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;
    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }
    @Override
    public void run() {
        source.subscribe(parent);
    }
}

在任务执行的时候,会通过source 继续将 SubscribeOnObserver向上游传送。这里的source 指的是create 创建的ObservableCreate ,source.subscribe 就会直接调用到 ObservableCreate 的 subscribeActual

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

一直到这里,跟我们在上篇文章分析的流程就一样了。

将下游传过来的SubscribeOnObserver 再次封装成 CreateEmitter 发射器,然后通过source 继续向上传递,这里的souce 就是指的是我们在create 中传递进去的ObservableOnSubscribe。

然后在ObservableOnSubscribe 的 subscribe 中,通过 emitter.onNext 将我们的数据开始进行下发。

ObservableEmitter
@Override
public void onNext(T t) {
    ........
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

这里的observer 是SubscribeOnObserver

SubscribeOnObserver
@Override
public void onNext(T t) {
    downstream.onNext(t);
}

这里的downstream 是指 ObservableObserveOn

ObserveOnObserver
@Override
public void onNext(T t) {
   ...... 
   if (sourceMode != QueueDisposable.ASYNC) {
     queue.offer(t);
   }  
    schedule();
}

这里的 sourceMode 未被赋值,会调用 queue.offer(t) ,将数据放入到队列中。

接下来再看 schedule() 做了些什么 ?

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

通过上文的分析,我们可以知道 observeOn 操作符创建的 Scheduler 为 HandlerScheduler ,所以这里的 worker.schedule(this) 方法调用的是 HandlerScheduler 的内部静态子类 HandlerWorker 的 schedule 方法。

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    ...........
    if (disposed) {
        return Disposable.disposed();
    }
    run = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    Message message = Message.obtain(handler, scheduled);
    message.obj = this;
    if (async) {
        message.setAsynchronous(true);
    }
    handler.sendMessageDelayed(message, unit.toMillis(delay));
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposable.disposed();
    }
    return scheduled;
}

最终是在这里通过Handler将任务切换到了主线程执行。

ObserveOnObserver 类实现了Runnable 接口, worker.schedule(this) 是将自身交给Handler 去执行。所以最终的结果还会由 ObserveOnObserver 的run方法来执行。

public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

这里我们是典型的使用方式,我们直接来看下 drainNormal();

void drainNormal() {
    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;
    for (;;) {
        ............
        for (;;) {
            T v = q.poll();
            a.onNext(v);
        }
        ..............
    }
}

在这里 将数据从队列中取出,然后调用下游的 onNext ,这里的 downstream 也就是我们最后自定义的观察者 Observer 了。

整个过程也好比是一个封包裹和拆包裹的过程。用洋葱模型表示一下会更加的形象。

RxJava的线程切换

最后上图!

RxJava的线程切换

可能文字的叙述还是太抽象, 用这样一张图来表示整个流程可能相对好理解一些。

写在最后

纸上得来终觉浅,绝知此事要躬行。如果有时间还是建议自己跟一遍源码流程,这样才能真正理解。

今天就水到这里,希望对大家有所帮助。

转载自:https://juejin.cn/post/7110449275166785550
评论
请登录