likes
comments
collection
share

RxJava原理保姆级教学-炒冷饭

作者站长头像
站长
· 阅读数 15

看完这篇文章你会了解到什么

  1. Observable、Single、Flowable、Completable、Maybe的区别
  2. Rxjava 常规的流程
  3. subscribeOn多次调用是否生效的原因
  4. observeOn多次调用是否生效的原因

Observable、Flowable、Single、Completable、Maybe的区别

区别
Observable可多次发送事件,直到 onComplete 或 onError 被调用结束订阅可以发送多次
Flowable和 Observable 一样。并且可以处理背压(下游处理速度比上游发送速度慢)可以发送多次且处理流速问题
Single单次发送事件(onSuccess、onError),发完即结束订阅。只能发送单次一般用于网络请求
Completable单次发送事件(onError、onComplete),发完即结束订阅只能发送单次没有失败只有完成
Maybe单次发送事件(onSuccess、onError、onComplete),发完即结束订阅。只能发送单次相当于 Completable 和Single 结合

小结

ObservableFlowable可以发送多次。其他的只能发送单次,且发送完即结束。根据自身业务选择更合适的api使用。例如应用内部传递数据,可以使用Completable不关心结果。网络请求成功失败使用Single。

Rxjava 常规的流程

以下面代码为例

Single.just(1)
        .subscribe(object : SingleObserver<Int> {
            override fun onSuccess(t: Int?) {
            }

            override fun onSubscribe(d: Disposable?) {
            }

            override fun onError(e: Throwable?) {
            }

        })

Singlejust

Single.just()点击源码你会发现下面的实现

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <@NonNull T> Single<T> just(T item) {
    //1、判空操作
    Objects.requireNonNull(item, "item is null");
    //2、RxJavaPlugins.onAssembly hook用的,最后返回类型还是Single<T>
    //3、创建了一个SingleJust的子类,这里的item也就是我们示例代码中的source 1
    return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}

而SingleJust的结构是这样的

public final class SingleJust<T> extends Single<T> {

    final T value;
    //1.示例中我们的source 1
    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    //2 记住这个方法subscribeActual 待会分析,后续我们会多次提到他。先称之为 老熟人
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposable.disposed());
        observer.onSuccess(value);
    }

}

那么我的示例代码如果去除掉判空和hook操作,可以这样写。

SingleJust(1).subscribe(object:SingleObserver<Int>{
    override fun onSuccess(t: Int?) {

    }

    override fun onSubscribe(d: Disposable?) {

    }

    override fun onError(e: Throwable?) {

    }

在SingleJust的源码中,并没有subscribe方法,那么去看一下他的父类Single

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
    //判空
    Objects.requireNonNull(observer, "observer is null");
    //hook
    observer = RxJavaPlugins.onSubscribe(this, observer);

    Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

    try {
        //老熟人
        subscribeActual(observer);
    } catch (NullPointerException ex) {
        throw ex;
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        NullPointerException npe = new NullPointerException("subscribeActual failed");
        npe.initCause(ex);
        throw npe;
    }
}

还记得子类SingleJust中实现的subscribeActual方法吗。那如果不出意外的话父类中的subscribeActual应该是抽象方法。继续在父类Single中寻找果然没有意外。

protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);

PS:味道有点类似于我们写UI,BaseActivity 中的getLayoutId操作

abstract class BaseActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(getLayoutId())
    }

    protected abstract fun getLayoutId(): Int
}


class MainActivity : BaseActivity() {
    override fun getLayoutId(): Int {
        return R.layout.activity_main
    }
}

我们继续回到RxJava来 父类中调用的subscribe去除些判空操作和hook,其实就是调用的SingleJust中实现的subscribeActual并且把我们创建的observer作为参数传递了进去。那么现在回头来看我们的SingleJust无比的清晰。

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    //通过onSubscribe 给 observer观察者回调一个dispose
    observer.onSubscribe(Disposable.disposed());
    //通过onSuccess 给观察者回调我们的source 也就是我们示例中的1
    observer.onSuccess(value);
}

SingleMap

那么给我们的示例再增加一点代码

Single.just(1)
        .map(object : Function<Int, String> {
            override fun apply(t: Int): String {
                return t.toString()
            }

        })
        .subscribe(object : SingleObserver<String> {
            override fun onSuccess(t: String?) {

            }

            override fun onSubscribe(d: Disposable?) {

            }

            override fun onError(e: Throwable?) {

            }

        })

前面还是一样的逻辑。 我们最初的Single 被包装成了SingleJust,map跟进源码会发现我们再次被包装成了SingleMap,熟悉的感觉,判空,hook,创建对象。区别的地方在于,SingleJust传递的参数是数据源,而map传递的参数是this(this 是啥?在此示例中是我们的SingleJust对象),和mapper(mapper又是啥?是我们在外面创建的 object : Function<Int, String>{ 巴拉巴拉 }这一堆具体的int 转 String 方法!)

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}

此刻不妨大胆的猜一猜,其他转换类型的操作符,比如filter,zipWith,flatMap等等,也会把this当做参数传递进去。没错我们在不知不觉中开启了无限套娃模式。当然在设计模式中专业的名字叫做装饰者模式。好奇的同学可以打开浏览器搜索,在本文我们不做深入探讨。

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> filter(@NonNull Predicate<? super T> predicate) {
    Objects.requireNonNull(predicate, "predicate is null");
    return RxJavaPlugins.onAssembly(new MaybeFilterSingle<>(this, predicate));
}

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <@NonNull U, @NonNull R> Maybe<R> zipWith(@NonNull MaybeSource<? extends U> other, @NonNull BiFunction<? super T, ? super U, ? extends R> zipper) {
    Objects.requireNonNull(other, "other is null");
    return zip(this, other, zipper);
}

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <@NonNull R> Single<R> flatMap(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleFlatMap<>(this, mapper));
}

好的回到我们的示例中。Single.just(1).map(巴拉巴拉).subscrible(巴拉巴拉) 我们从Single 被变成SingleJust,又被包装成SingleMap。那么这个subscrible方法其实是调用的SingleMap的subscrible方法。我们跟进SingleMap的源码

//为了阅读方便我们在看一下map的api
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    //this 就是我们的SingleJust(1) , mapper 我们自己为了转换,创建的对象
    return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}


public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        //1、数据源,示例中的SingleJust(1)
        this.source = source;
        //2、类型转换的实现对象
        this.mapper = mapper;
    }

    //3、老熟人,还记得这块逻辑吗。 复习一下SingleMap没有subscribe方法所以去父类Single中找,
    // 父类中subscribe中调用抽象方法subscribeActual。也就是子类SingleMap实现的subscribeActual方法
    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        //4、source 也就是 SingleJust(1),我们称之为上游。
        //5、MapSingleObserver创建了一个观察者去 订阅上游。
        //6、t 最外面我们自己创建的观者
        //7、**重点分析** source.subscribe 这里SingleMap 再去触发自己上游的 subscribe 方法。
        //也就是我们示例中的SingleJust(1)的 subscribeActual 方法,其中只有回调onSubscribe 和回调
//onSuccess,都回调到我们的MapSingleObserver 
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;

        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            //8、外部的观察者
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            //9、由上面7触发的回调。在此回调到外面的观察者(也就是我们写代码自己实现的那个观察者)
            t.onSubscribe(d);
        }

        //10、由上面7触发的回调
        @Override
        public void onSuccess(T value) {
            R v;
            try {
            //11、类型转换,我们外面使用map(巴拉巴拉)自己是实现的方法
                v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }
            //12、最终结果回调给外部
            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}

整理一下流程 RxJava原理保姆级教学-炒冷饭

SingleCreate

修改一下我们的示例代码

Single.create(object : SingleOnSubscribe<Int> {
    //记住这里,又一个订阅方法?
    override fun subscribe(emitter: SingleEmitter<Int>) {
        //A1、**一会分析如何触发记住这里SingleOnSubscribe 对象,以及参数 emitter**
        emitter.onSuccess(1)
        Log.d(TAG, "subscribe is run on main Thread ? " +
                "${Thread.currentThread().name == mainLooper.thread.name}")
    }
}).subscribe(object : SingleObserver<Int> {
            override fun onSuccess(t: Int) {
                Log.d(TAG, "onSuccess is run on main Thread ? " +
                        "${Thread.currentThread().name == mainLooper.thread.name}")
            }

            override fun onSubscribe(d: Disposable?) {
                // A2、一会分析
            }

            override fun onError(e: Throwable?) {

            }
        })

SingleCreate 和 SingleJust 有点小差别,跟进一下源码

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <@NonNull T> Single<T> create(@NonNull SingleOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");
    //1、创建一个新的SingleCreat对象 参数传入source
    //   source也就是我们在外部自己实现的object : SingleOnSubscribe<Int>巴拉巴拉那一大堆上游数据源
    return RxJavaPlugins.onAssembly(new SingleCreate<>(source));
}


public final class SingleCreate<T> extends Single<T> {

    final SingleOnSubscribe<T> source;

    public SingleCreate(SingleOnSubscribe<T> source) {
        //2、外部自己实现的object : SingleOnSubscribe<Int>巴拉巴拉那一大堆上游数据源
        this.source = source;
    }

    //3、老熟人 subscribe会走到这里
    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        Emitter<T> parent = new Emitter<>(observer);
        //4、创建了一个parent,通过onSubscribe回调给下游的观察者
        //(parent,实现了Disposable)也就是示例中的A2
        observer.onSubscribe(parent);

        try {
        //5、parent订阅上游。触发上游的subscribe方法。
        //也就是触发外部自己实现的object : SingleOnSubscribe<Int>巴拉巴拉那一大堆的subscribe方法。
        //还记得是啥吗,不记得往上翻一下,SingleCreate示例中的A1,我们在subscribe方法中调用了emitter.onSuccess(1)
        //emitter 又是啥?是subscribe方法的参数,没错就是parent!,emitter.onSuccess(1),也就是parent.onSuccess(1)
        //**这块如果不李姐,建议从SingleCreate部分开始重新再看一下。
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class Emitter<T>
    extends AtomicReference<Disposable>
    implements SingleEmitter<T>, Disposable {

        private static final long serialVersionUID = -2467358622224974244L;

        final SingleObserver<? super T> downstream;

        Emitter(SingleObserver<? super T> downstream) {
            this.downstream = downstream;
        }

        //6、由上面5触发
        @Override
        public void onSuccess(T value) {
            if (get() != DisposableHelper.DISPOSED) {
            //以原子方式设置为给定值,并返回以前的值。
                Disposable d = getAndSet(DisposableHelper.DISPOSED);
                if (d != DisposableHelper.DISPOSED) {
                    try {
                        if (value == null) {
                            downstream.onError(ExceptionHelper.createNullPointerException("onSuccess called with a null value."));
                        } else {
                        //7、给下游回调成功结果 
                            downstream.onSuccess(value);
                        }
                    } finally {
                        if (d != null) {
                            d.dispose();
                        }
                    }
                }
            }
        }
        。。。省略无关代码
}

我们示例约等于

SingleCreate<Int>({emitter ->emitter.onSuccess(1)})
        .subscribe(巴拉巴拉)

再给他增加一个subscribeOn(Schedulers.io())

SingleCreate<Int>({emitter ->emitter.onSuccess(1)})
        .subscribeOn(Schedulers.io())
        .subscribe(巴拉巴拉)

SingleSubscribeOn

subscribeOn跟进一下源码

public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    //this 是上游也就是示例中的SingleCreate<Int>({emitter ->emitter.onSuccess(1)})
    return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
}


public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;

    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    //老熟人subscribe 会调到这里
    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        //创建一个parent对象,且这个parent对象实现了runnable接口(当然他也实现了SingleObserver和Disposable接口)!
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
        //parent 作为disposeable 回调给观察者
        observer.onSubscribe(parent);
        //**重点**A1、这里scheduler 是我们传入的scheduler.io 待会分析
        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }

    //** 这个类实现了runnable 接口!
    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {

        private static final long serialVersionUID = 7000911171163930287L;

        final SingleObserver<? super T> downstream;

        final SequentialDisposable task;

        final SingleSource<? extends T> source;

        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.downstream = actual;
            this.source = source;
            this.task = new SequentialDisposable();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }

        @Override
        public void onSuccess(T value) {
            //无线程切换,直接回调
            downstream.onSuccess(value);
        }

        @Override
        public void onError(Throwable e) {
            downstream.onError(e);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
            task.dispose();
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        //记住这里 B1 **重点** (提前剧透一下这个是套娃最内部的run)
        @Override
        public void run() {
            //因为自己实现了SingleObserver,所以自己也是个观察者,自己订阅上游,触发上游的subscribe
            source.subscribe(this);
        }
    }

}

通过A1我们会跟进到Scheduler的源码

//这里的run 就是在SingleSubscribeOn SingleSubscribeOn方法中
//创建的parent = new SubscribeOnObserver<>(observer, source);
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

我们继续跟进scheduleDirect

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    //盲猜线程切换应该跟这个worker和下面的runnable 有关。
    final Worker w = createWorker();
    //包装了一下传进来的run
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //**重点A2 DisposeTask 先看明白上面的w是啥然后就在继续跟进
    DisposeTask task = new DisposeTask(decoratedRun, w);
    //**重点A3 
    w.schedule(task, delay, unit);

    return task;
}

//先看一下createWorker,又是抽象,那么看看有哪些子类实现了她
@NonNull
public abstract Worker createWorker();

刚好Scheduler 的子类里面。有一个叫IoScheduler 感觉有点像Schedulers.io() RxJava原理保姆级教学-炒冷饭 再从subscribeOn(Schedulers.io()) 继续跟进源码Scheduler类中

@NonNull
public static Scheduler io() {
    //看这个IO是啥
    return RxJavaPlugins.onIoScheduler(IO);
}
static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    //继续IOTask
    IO = RxJavaPlugins.initIoScheduler(new IOTask());

    TRAMPOLINE = TrampolineScheduler.instance();

    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
static final class IOTask implements Supplier<Scheduler> {
    @Override
    public Scheduler get() {
        //继续找DEFAULT是啥
        return IoHolder.DEFAULT;
    }
}

static final class IoHolder {
    //缘来就是你呀兄die
    static final Scheduler DEFAULT = new IoScheduler();
}

那么回到上面的A2继续看吧,我直接贴代码下来了

//还记得这个run吗 就是在SingleSubscribeOn 的subscribeActual方法中创建的parent = new SubscribeOnObserver<>(observer, source);
public Disposable scheduleDirect(Runnable run, long delay, @NonNull TimeUnit unit) {
    //这里w 就是IoScheduler 
    final Worker w = createWorker();
    //包装了一下传进来的run
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //**重点** 看一下DisposeTask 记住这个参数 decoratedRun她包装了run
    DisposeTask task = new DisposeTask(decoratedRun, w);
    //那我们继续跟进 看看 w.schedule干了啥
    w.schedule(task, delay, unit);

    return task;
}

//先看一下DisposeTask是啥,实现了runnable
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {

    @NonNull
    final Runnable decoratedRun;

    @NonNull
    final Worker w;

    @Nullable
    Thread runner;

    DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
        //赋值
        this.decoratedRun = decoratedRun;
        this.w = w;
    }

    @Override
    public void run() {
        runner = Thread.currentThread();
        try {
            try {
                //在自己的run方法里调用 decoratedRun的run方法,也就是SingleSubscribeOn 
                //subscribeActual方法中创建的parent = new SubscribeOnObserver<>(observer, source);也就是上面的B1,套娃run方法,source.subscribe(this);订阅上游
                //还记得我们示例中上游是什么吗,没错是SingleCreate
                //而SingleCreate的source.subscribe(parent);是我们自己实现的方法,会触发数据的产生。
                decoratedRun.run();
            } catch (Throwable ex) {
                // Exceptions.throwIfFatal(e); nowhere to go
                RxJavaPlugins.onError(ex);
                throw ex;
            }
        } finally {
            dispose();
            runner = null;
        }
    }

    @Override
    public void dispose() {
        if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
            ((NewThreadWorker)w).shutdown();
        } else {
            w.dispose();
        }
    }

    @Override
    public boolean isDisposed() {
        return w.isDisposed();
    }

    @Override
    public Runnable getWrappedRunnable() {
        return this.decoratedRun;
    }
}

回到IoScheduler,看完DisposeTask,接着看w.schedule(task, delay, unit);

//为了方便我直接copy过来了
public Disposable scheduleDirect(Runnable run, long delay, @NonNull TimeUnit unit) {
    //这里w 就是IoScheduler 
    final Worker w = createWorker();
    //包装了一下传进来的run
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //task 里面的run 会调用decoratedRun 的run,而decoratedRun是包装了传进来的run
    //实际上就是调用传进来的run对象的run方法。
    //如果忘了这个run对象是谁,那么回到上面 SingleSubscribeOn 的分析看一下。
    //其实就是(SingleSubscribeOn类中老熟人房里创建的 final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);)
    //这个parent 即实现了观察者接口,也实现了runnable接口。而这个parent的run 调用source.subscribe(this);订阅上游,触发上游的老熟人方法。
    DisposeTask task = new DisposeTask(decoratedRun, w);
    //如果上面的理解了,那我们继续跟进 看看 w.schedule干了啥
    w.schedule(task, delay, unit);

    return task;
}

//跟进w.schedule源码
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    // 先看看threadWorker是个啥,然后再看scheduleActual
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

//OK继续看父类NewThreadWorker 是个啥
static final class ThreadWorker extends NewThreadWorker {

    long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}


public class NewThreadWorker extends Scheduler.Worker {
    //原生executor
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        //继续这里 跟进create方法
        executor = SchedulerPoolFactory.create(threadFactory);
    }
 }
 

public static ScheduledExecutorService create(ThreadFactory factory) {
    //好的原来是你
    final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
    exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
    return exec;
}

值得一提的是IoShceuler除了threadWorker还有CachedWorkerPool。所以应该是在 Schedulers.io() 应该是在 Schedulers.newThread() 的基础上做了缓存的操作。 RxJava原理保姆级教学-炒冷饭 接着跟进NewThreadWorker中scheduleActual方法

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    //包装下run
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //继续包装
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
        //使用线程池 执行被包装后的run,还记得run方法是什么吗,没错就是那个套娃run方法!
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

subscribeOn 调用多次

RxJava原理保姆级教学-炒冷饭

小结

subscribeOn多次调用只有距离数据源最近的一次调用生效(会影响到数据产生和回调)。也就是我们普遍认为的subscribeOn只有第一次调用生效 类似于下面伪代码

ThreadMain {
    Thread {
        Thread {
            //build data 生产数据且回调给下游
            emitter.onSuccess(1)
        }.start()
    }.start()
}.start

SingleObserveOn

好了最后再修改一下我们的示例代码。为她添加一个observeOn操作

Single.create(SingleOnSubscribe<Int> { emitter -> emitter.onSuccess(1) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({

    }, {

    })

我们跟进observeOn源码

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> observeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    //依旧是套娃模式,为我们创建一个新对象SingleObserveOn
    return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
}

public final class SingleObserveOn<T> extends Single<T> {
    //上游(这里是说相当于SingleObserveOn自己的上游,并不是最上游)
    final SingleSource<T> source;
    //线程 
    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    //老熟人为什么会调用到这个方法,你应该知道了吧
    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        //创建一个新的观察者去订阅上游,调用上游的subscribe方法又回触发上游的老熟人方法,依次类推
        source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
    }

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> downstream;

        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            //函数名就很清晰了,下游,在我们的示例代码中这个下游就是最下游。
            this.downstream = actual;
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d) {
            //拿到上游的给的回调,我们回调给下游
            if (DisposableHelper.setOnce(this, d)) {
                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onSuccess(T value) {
            //先赋值
            this.value = value;
            //熟悉的scheduleDirect,线程切换的操作,按照之前的套路最后触发run操,一会跟进scheduler.scheduleDirect(this)
            //记住这里,收到onSuccess回调后才切换线程 A1
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                downstream.onError(ex);
            } else {
                //上面保存下来的值回调给下游 A2
                downstream.onSuccess(value);
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

跟进一下 scheduler.scheduleDirect(this) 源码

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
 
//有了上面的分析subscribeOn经验,再分析scheduleDirect应该很好理解
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    //应该和AndroidSchedulers.mainThread()有关吧?
    final Worker w = createWorker();
    //包装run
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //task 的run会调用到 decoratedRun的run。
    DisposeTask task = new DisposeTask(decoratedRun, w);
    //按照之前的套路他应该是切换线程的地方
    w.schedule(task, delay, unit);

    return task;
}

我们从 AndroidSchedulers.mainThread() 跟进一下

public static Scheduler mainThread() {
    //继续跟进看看MAIN_THREAD 是个啥
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}


private static final Scheduler MAIN_THREAD =
    //继续找这个 MainHolder.DEFAULT
    RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);


private static final class MainHolder {
    //亲切的handler 和MainLooper,以后本人问你RxJava或者AndroidSchedulers.mainThread()
    //怎么换到主线程的,你应该知道怎么说了吧
    static final Scheduler DEFAULT
    //最后在看一下HandlerScheduler 的schedule方法
        = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
}

HandlerScheduler

@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposable.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);
    //1、包装run 
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    //2、Message.obtain
    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    if (async) {
        message.setAsynchronous(true);
    }
    //3、sendMessageDelayed
    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposable.disposed();
    }

    return scheduled;
}

小结

那么通过上面分析的A1A2 ,是否可以推断出,多次调用observeOn的时候每一次都会生效。因为第一个onserveOn会在onSuccess的时候切换线程然后,通过 downstream.onSuccess(value) 通知到下游的observeOn,下游的observeOn又重复了一次这个操作,在onSuccess的时候切换线程,继续通知下游。

在复习一下为什么subscribeOn只有第一次才生效,因为subscribeOn只有第一次调用时才会影响到数据源的产出与回调。

一些情况的demo验证

Single.create(SingleOnSubscribe<Int> { emitter ->
        Log.d(
            TAG, " emitter.onSuccess(1) is run on main Thread ? " +
                    "${Thread.currentThread().name == mainLooper.thread.name}"
        )
        emitter.onSuccess(1)
    })
        .subscribeOn(Schedulers.io())
        .subscribe({
            Log.d(
                TAG, "subscribe is run on main Thread ? " +
                        "${Thread.currentThread().name == mainLooper.thread.name}"
            )
        }, {

        })
ps:只有subscribeOn的情况下,subscribeOn也会影响到subscribe接收结果的线程
D/MainActivity: emitter.onSuccess(1) is run on main Thread ? false
D/MainActivity: subscribe is run on main Thread ? false
Single.create(SingleOnSubscribe<Int> { emitter ->
    Log.d(
        TAG, " emitter.onSuccess(1) is run on main Thread ? " +
                "${Thread.currentThread().name == mainLooper.thread.name}"
    )
    emitter.onSuccess(1)
})
    .subscribeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribeOn(Schedulers.io())
    .subscribe({
        Log.d(
            TAG, "subscribe is run on main Thread ? " +
                    "${Thread.currentThread().name == mainLooper.thread.name}"
        )
    }, {

    })
 ps:subscribeOn 多次只有距离数据源最近的一次才生效
 D/MainActivity: emitter.onSuccess(1) is run on main Thread ? true
 D/MainActivity: subscribe is run on main Thread ? true
Single.create(SingleOnSubscribe<Int> { emitter ->
    Log.d(
        TAG, "emitter.onSuccess(1) is run on main Thread ? " +
                "${Thread.currentThread().name == mainLooper.thread.name}"
    )
    emitter.onSuccess(1)
})
    .observeOn(Schedulers.io())
    .subscribe({
        Log.d(
            TAG, "subscribe is run on main Thread ? " +
                    "${Thread.currentThread().name == mainLooper.thread.name}"
        )
    }, {

    })
    
ps:observeOn 是在onsuccess回调的时候切换线程,所以不会影响到数据的生产线程(上游)
D/MainActivity: emitter.onSuccess(1) is run on main Thread ? true
D/MainActivity: subscribe is run on main Thread ? false

observeOn多次切换多次生效 RxJava原理保姆级教学-炒冷饭 以及observeOn和subscribenOn的调用顺序对结果没有影响

RxJava原理保姆级教学-炒冷饭

RxJava原理保姆级教学-炒冷饭

总结

上面都有小结好像没啥好总结的。 不知道你会不会纠结这个问题。 按照常规的理解,应该是 观察者 -> 订阅 ->被观察者 而我们的代码全部都是 被观察者 -> 订阅 ->观察者

Single.just(1).subscribe({
    println("$it")
},{
    
})

为什么呢?因为无限套娃呀。还记得老熟人吗,老熟人里基本都会有一句source.subscribe(XXX)当前触发自己上游的subscribe,并且自己订阅上游。上游因为被触发了subscribe,继续触发上游的上游,并且上游订阅上游的上游。以此类推。最后在最上游生成数据onSuccess再一次回调下来。最下游再回调给最外部的观察者。