RxJava原理保姆级教学-炒冷饭
看完这篇文章你会了解到什么
- Observable、Single、Flowable、Completable、Maybe的区别
- Rxjava 常规的流程
- subscribeOn多次调用是否生效的原因
- observeOn多次调用是否生效的原因
Observable、Flowable、Single、Completable、Maybe的区别
区别 | ||
---|---|---|
Observable | 可多次发送事件,直到 onComplete 或 onError 被调用结束订阅 | 可以发送多次 |
Flowable | 和 Observable 一样。并且可以处理背压(下游处理速度比上游发送速度慢) | 可以发送多次且处理流速问题 |
Single | 单次发送事件(onSuccess、onError),发完即结束订阅。 | 只能发送单次一般用于网络请求 |
Completable | 单次发送事件(onError、onComplete),发完即结束订阅 | 只能发送单次没有失败只有完成 |
Maybe | 单次发送事件(onSuccess、onError、onComplete),发完即结束订阅。 | 只能发送单次相当于 Completable 和Single 结合 |
小结
Observable 和 Flowable可以发送多次。其他的只能发送单次,且发送完即结束。根据自身业务选择更合适的api使用。例如应用内部传递数据,可以使用Completable不关心结果。网络请求成功失败使用Single。
Rxjava 常规的流程
以下面代码为例
Single.just(1)
.subscribe(object : SingleObserver<Int> {
override fun onSuccess(t: Int?) {
}
override fun onSubscribe(d: Disposable?) {
}
override fun onError(e: Throwable?) {
}
})
Singlejust
Single.just()点击源码你会发现下面的实现
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <@NonNull T> Single<T> just(T item) {
//1、判空操作
Objects.requireNonNull(item, "item is null");
//2、RxJavaPlugins.onAssembly hook用的,最后返回类型还是Single<T>
//3、创建了一个SingleJust的子类,这里的item也就是我们示例代码中的source 1
return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}
而SingleJust的结构是这样的
public final class SingleJust<T> extends Single<T> {
final T value;
//1.示例中我们的source 1
public SingleJust(T value) {
this.value = value;
}
@Override
//2 记住这个方法subscribeActual 待会分析,后续我们会多次提到他。先称之为 老熟人
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
}
那么我的示例代码如果去除掉判空和hook操作,可以这样写。
SingleJust(1).subscribe(object:SingleObserver<Int>{
override fun onSuccess(t: Int?) {
}
override fun onSubscribe(d: Disposable?) {
}
override fun onError(e: Throwable?) {
}
在SingleJust的源码中,并没有subscribe方法,那么去看一下他的父类Single
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
//判空
Objects.requireNonNull(observer, "observer is null");
//hook
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try {
//老熟人
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
还记得子类SingleJust中实现的subscribeActual方法吗。那如果不出意外的话父类中的subscribeActual应该是抽象方法。继续在父类Single中寻找果然没有意外。
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);
PS:味道有点类似于我们写UI,BaseActivity 中的getLayoutId操作
abstract class BaseActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(getLayoutId())
}
protected abstract fun getLayoutId(): Int
}
class MainActivity : BaseActivity() {
override fun getLayoutId(): Int {
return R.layout.activity_main
}
}
我们继续回到RxJava来 父类中调用的subscribe去除些判空操作和hook,其实就是调用的SingleJust中实现的subscribeActual并且把我们创建的observer作为参数传递了进去。那么现在回头来看我们的SingleJust无比的清晰。
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
//通过onSubscribe 给 observer观察者回调一个dispose
observer.onSubscribe(Disposable.disposed());
//通过onSuccess 给观察者回调我们的source 也就是我们示例中的1
observer.onSuccess(value);
}
SingleMap
那么给我们的示例再增加一点代码
Single.just(1)
.map(object : Function<Int, String> {
override fun apply(t: Int): String {
return t.toString()
}
})
.subscribe(object : SingleObserver<String> {
override fun onSuccess(t: String?) {
}
override fun onSubscribe(d: Disposable?) {
}
override fun onError(e: Throwable?) {
}
})
前面还是一样的逻辑。 我们最初的Single 被包装成了SingleJust,map跟进源码会发现我们再次被包装成了SingleMap,熟悉的感觉,判空,hook,创建对象。区别的地方在于,SingleJust传递的参数是数据源,而map传递的参数是this(this 是啥?在此示例中是我们的SingleJust对象),和mapper(mapper又是啥?是我们在外面创建的 object : Function<Int, String>{ 巴拉巴拉 }这一堆具体的int 转 String 方法!)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
此刻不妨大胆的猜一猜,其他转换类型的操作符,比如filter,zipWith,flatMap等等,也会把this当做参数传递进去。没错我们在不知不觉中开启了无限套娃模式。当然在设计模式中专业的名字叫做装饰者模式。好奇的同学可以打开浏览器搜索,在本文我们不做深入探讨。
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> filter(@NonNull Predicate<? super T> predicate) {
Objects.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new MaybeFilterSingle<>(this, predicate));
}
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <@NonNull U, @NonNull R> Maybe<R> zipWith(@NonNull MaybeSource<? extends U> other, @NonNull BiFunction<? super T, ? super U, ? extends R> zipper) {
Objects.requireNonNull(other, "other is null");
return zip(this, other, zipper);
}
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <@NonNull R> Single<R> flatMap(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMap<>(this, mapper));
}
好的回到我们的示例中。Single.just(1).map(巴拉巴拉).subscrible(巴拉巴拉) 我们从Single 被变成SingleJust,又被包装成SingleMap。那么这个subscrible方法其实是调用的SingleMap的subscrible方法。我们跟进SingleMap的源码
//为了阅读方便我们在看一下map的api
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
//this 就是我们的SingleJust(1) , mapper 我们自己为了转换,创建的对象
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
//1、数据源,示例中的SingleJust(1)
this.source = source;
//2、类型转换的实现对象
this.mapper = mapper;
}
//3、老熟人,还记得这块逻辑吗。 复习一下SingleMap没有subscribe方法所以去父类Single中找,
// 父类中subscribe中调用抽象方法subscribeActual。也就是子类SingleMap实现的subscribeActual方法
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
//4、source 也就是 SingleJust(1),我们称之为上游。
//5、MapSingleObserver创建了一个观察者去 订阅上游。
//6、t 最外面我们自己创建的观者
//7、**重点分析** source.subscribe 这里SingleMap 再去触发自己上游的 subscribe 方法。
//也就是我们示例中的SingleJust(1)的 subscribeActual 方法,其中只有回调onSubscribe 和回调
//onSuccess,都回调到我们的MapSingleObserver
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
//8、外部的观察者
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
//9、由上面7触发的回调。在此回调到外面的观察者(也就是我们写代码自己实现的那个观察者)
t.onSubscribe(d);
}
//10、由上面7触发的回调
@Override
public void onSuccess(T value) {
R v;
try {
//11、类型转换,我们外面使用map(巴拉巴拉)自己是实现的方法
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
//12、最终结果回调给外部
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}
整理一下流程
SingleCreate
修改一下我们的示例代码
Single.create(object : SingleOnSubscribe<Int> {
//记住这里,又一个订阅方法?
override fun subscribe(emitter: SingleEmitter<Int>) {
//A1、**一会分析如何触发记住这里SingleOnSubscribe 对象,以及参数 emitter**
emitter.onSuccess(1)
Log.d(TAG, "subscribe is run on main Thread ? " +
"${Thread.currentThread().name == mainLooper.thread.name}")
}
}).subscribe(object : SingleObserver<Int> {
override fun onSuccess(t: Int) {
Log.d(TAG, "onSuccess is run on main Thread ? " +
"${Thread.currentThread().name == mainLooper.thread.name}")
}
override fun onSubscribe(d: Disposable?) {
// A2、一会分析
}
override fun onError(e: Throwable?) {
}
})
SingleCreate 和 SingleJust 有点小差别,跟进一下源码
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <@NonNull T> Single<T> create(@NonNull SingleOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
//1、创建一个新的SingleCreat对象 参数传入source
// source也就是我们在外部自己实现的object : SingleOnSubscribe<Int>巴拉巴拉那一大堆上游数据源
return RxJavaPlugins.onAssembly(new SingleCreate<>(source));
}
public final class SingleCreate<T> extends Single<T> {
final SingleOnSubscribe<T> source;
public SingleCreate(SingleOnSubscribe<T> source) {
//2、外部自己实现的object : SingleOnSubscribe<Int>巴拉巴拉那一大堆上游数据源
this.source = source;
}
//3、老熟人 subscribe会走到这里
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
Emitter<T> parent = new Emitter<>(observer);
//4、创建了一个parent,通过onSubscribe回调给下游的观察者
//(parent,实现了Disposable)也就是示例中的A2
observer.onSubscribe(parent);
try {
//5、parent订阅上游。触发上游的subscribe方法。
//也就是触发外部自己实现的object : SingleOnSubscribe<Int>巴拉巴拉那一大堆的subscribe方法。
//还记得是啥吗,不记得往上翻一下,SingleCreate示例中的A1,我们在subscribe方法中调用了emitter.onSuccess(1)
//emitter 又是啥?是subscribe方法的参数,没错就是parent!,emitter.onSuccess(1),也就是parent.onSuccess(1)
//**这块如果不李姐,建议从SingleCreate部分开始重新再看一下。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class Emitter<T>
extends AtomicReference<Disposable>
implements SingleEmitter<T>, Disposable {
private static final long serialVersionUID = -2467358622224974244L;
final SingleObserver<? super T> downstream;
Emitter(SingleObserver<? super T> downstream) {
this.downstream = downstream;
}
//6、由上面5触发
@Override
public void onSuccess(T value) {
if (get() != DisposableHelper.DISPOSED) {
//以原子方式设置为给定值,并返回以前的值。
Disposable d = getAndSet(DisposableHelper.DISPOSED);
if (d != DisposableHelper.DISPOSED) {
try {
if (value == null) {
downstream.onError(ExceptionHelper.createNullPointerException("onSuccess called with a null value."));
} else {
//7、给下游回调成功结果
downstream.onSuccess(value);
}
} finally {
if (d != null) {
d.dispose();
}
}
}
}
}
。。。省略无关代码
}
我们示例约等于
SingleCreate<Int>({emitter ->emitter.onSuccess(1)})
.subscribe(巴拉巴拉)
再给他增加一个subscribeOn(Schedulers.io())
SingleCreate<Int>({emitter ->emitter.onSuccess(1)})
.subscribeOn(Schedulers.io())
.subscribe(巴拉巴拉)
SingleSubscribeOn
subscribeOn跟进一下源码
public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
//this 是上游也就是示例中的SingleCreate<Int>({emitter ->emitter.onSuccess(1)})
return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
}
public final class SingleSubscribeOn<T> extends Single<T> {
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
//老熟人subscribe 会调到这里
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
//创建一个parent对象,且这个parent对象实现了runnable接口(当然他也实现了SingleObserver和Disposable接口)!
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
//parent 作为disposeable 回调给观察者
observer.onSubscribe(parent);
//**重点**A1、这里scheduler 是我们传入的scheduler.io 待会分析
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
//** 这个类实现了runnable 接口!
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 7000911171163930287L;
final SingleObserver<? super T> downstream;
final SequentialDisposable task;
final SingleSource<? extends T> source;
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
this.downstream = actual;
this.source = source;
this.task = new SequentialDisposable();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}
@Override
public void onSuccess(T value) {
//无线程切换,直接回调
downstream.onSuccess(value);
}
@Override
public void onError(Throwable e) {
downstream.onError(e);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
task.dispose();
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
//记住这里 B1 **重点** (提前剧透一下这个是套娃最内部的run)
@Override
public void run() {
//因为自己实现了SingleObserver,所以自己也是个观察者,自己订阅上游,触发上游的subscribe
source.subscribe(this);
}
}
}
通过A1我们会跟进到Scheduler的源码
//这里的run 就是在SingleSubscribeOn SingleSubscribeOn方法中
//创建的parent = new SubscribeOnObserver<>(observer, source);
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
我们继续跟进scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//盲猜线程切换应该跟这个worker和下面的runnable 有关。
final Worker w = createWorker();
//包装了一下传进来的run
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//**重点A2 DisposeTask 先看明白上面的w是啥然后就在继续跟进
DisposeTask task = new DisposeTask(decoratedRun, w);
//**重点A3
w.schedule(task, delay, unit);
return task;
}
//先看一下createWorker,又是抽象,那么看看有哪些子类实现了她
@NonNull
public abstract Worker createWorker();
刚好Scheduler 的子类里面。有一个叫IoScheduler 感觉有点像Schedulers.io()
再从subscribeOn(Schedulers.io()) 继续跟进源码Scheduler类中
@NonNull
public static Scheduler io() {
//看这个IO是啥
return RxJavaPlugins.onIoScheduler(IO);
}
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
//继续IOTask
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
static final class IOTask implements Supplier<Scheduler> {
@Override
public Scheduler get() {
//继续找DEFAULT是啥
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
//缘来就是你呀兄die
static final Scheduler DEFAULT = new IoScheduler();
}
那么回到上面的A2继续看吧,我直接贴代码下来了
//还记得这个run吗 就是在SingleSubscribeOn 的subscribeActual方法中创建的parent = new SubscribeOnObserver<>(observer, source);
public Disposable scheduleDirect(Runnable run, long delay, @NonNull TimeUnit unit) {
//这里w 就是IoScheduler
final Worker w = createWorker();
//包装了一下传进来的run
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//**重点** 看一下DisposeTask 记住这个参数 decoratedRun她包装了run
DisposeTask task = new DisposeTask(decoratedRun, w);
//那我们继续跟进 看看 w.schedule干了啥
w.schedule(task, delay, unit);
return task;
}
//先看一下DisposeTask是啥,实现了runnable
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final Worker w;
@Nullable
Thread runner;
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
//赋值
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
try {
//在自己的run方法里调用 decoratedRun的run方法,也就是SingleSubscribeOn
//subscribeActual方法中创建的parent = new SubscribeOnObserver<>(observer, source);也就是上面的B1,套娃run方法,source.subscribe(this);订阅上游
//还记得我们示例中上游是什么吗,没错是SingleCreate
//而SingleCreate的source.subscribe(parent);是我们自己实现的方法,会触发数据的产生。
decoratedRun.run();
} catch (Throwable ex) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(ex);
throw ex;
}
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
回到IoScheduler,看完DisposeTask,接着看w.schedule(task, delay, unit);
//为了方便我直接copy过来了
public Disposable scheduleDirect(Runnable run, long delay, @NonNull TimeUnit unit) {
//这里w 就是IoScheduler
final Worker w = createWorker();
//包装了一下传进来的run
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//task 里面的run 会调用decoratedRun 的run,而decoratedRun是包装了传进来的run
//实际上就是调用传进来的run对象的run方法。
//如果忘了这个run对象是谁,那么回到上面 SingleSubscribeOn 的分析看一下。
//其实就是(SingleSubscribeOn类中老熟人房里创建的 final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);)
//这个parent 即实现了观察者接口,也实现了runnable接口。而这个parent的run 调用source.subscribe(this);订阅上游,触发上游的老熟人方法。
DisposeTask task = new DisposeTask(decoratedRun, w);
//如果上面的理解了,那我们继续跟进 看看 w.schedule干了啥
w.schedule(task, delay, unit);
return task;
}
//跟进w.schedule源码
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
// 先看看threadWorker是个啥,然后再看scheduleActual
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
//OK继续看父类NewThreadWorker 是个啥
static final class ThreadWorker extends NewThreadWorker {
long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
public long getExpirationTime() {
return expirationTime;
}
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
public class NewThreadWorker extends Scheduler.Worker {
//原生executor
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
//继续这里 跟进create方法
executor = SchedulerPoolFactory.create(threadFactory);
}
}
public static ScheduledExecutorService create(ThreadFactory factory) {
//好的原来是你
final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
return exec;
}
值得一提的是IoShceuler除了threadWorker还有CachedWorkerPool。所以应该是在
Schedulers.io() 应该是在 Schedulers.newThread() 的基础上做了缓存的操作。
接着跟进NewThreadWorker中scheduleActual方法
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//包装下run
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//继续包装
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
//使用线程池 执行被包装后的run,还记得run方法是什么吗,没错就是那个套娃run方法!
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
subscribeOn 调用多次
小结
subscribeOn多次调用只有距离数据源最近的一次调用生效(会影响到数据产生和回调)。也就是我们普遍认为的subscribeOn只有第一次调用生效 类似于下面伪代码
ThreadMain {
Thread {
Thread {
//build data 生产数据且回调给下游
emitter.onSuccess(1)
}.start()
}.start()
}.start
SingleObserveOn
好了最后再修改一下我们的示例代码。为她添加一个observeOn操作
Single.create(SingleOnSubscribe<Int> { emitter -> emitter.onSuccess(1) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
}, {
})
我们跟进observeOn源码
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> observeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
//依旧是套娃模式,为我们创建一个新对象SingleObserveOn
return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
}
public final class SingleObserveOn<T> extends Single<T> {
//上游(这里是说相当于SingleObserveOn自己的上游,并不是最上游)
final SingleSource<T> source;
//线程
final Scheduler scheduler;
public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
//老熟人为什么会调用到这个方法,你应该知道了吧
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
//创建一个新的观察者去订阅上游,调用上游的subscribe方法又回触发上游的老熟人方法,依次类推
source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
//函数名就很清晰了,下游,在我们的示例代码中这个下游就是最下游。
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
//拿到上游的给的回调,我们回调给下游
if (DisposableHelper.setOnce(this, d)) {
downstream.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
//先赋值
this.value = value;
//熟悉的scheduleDirect,线程切换的操作,按照之前的套路最后触发run操,一会跟进scheduler.scheduleDirect(this)
//记住这里,收到onSuccess回调后才切换线程 A1
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
//上面保存下来的值回调给下游 A2
downstream.onSuccess(value);
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
跟进一下 scheduler.scheduleDirect(this) 源码
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
//有了上面的分析subscribeOn经验,再分析scheduleDirect应该很好理解
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//应该和AndroidSchedulers.mainThread()有关吧?
final Worker w = createWorker();
//包装run
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//task 的run会调用到 decoratedRun的run。
DisposeTask task = new DisposeTask(decoratedRun, w);
//按照之前的套路他应该是切换线程的地方
w.schedule(task, delay, unit);
return task;
}
我们从 AndroidSchedulers.mainThread() 跟进一下
public static Scheduler mainThread() {
//继续跟进看看MAIN_THREAD 是个啥
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD =
//继续找这个 MainHolder.DEFAULT
RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
private static final class MainHolder {
//亲切的handler 和MainLooper,以后本人问你RxJava或者AndroidSchedulers.mainThread()
//怎么换到主线程的,你应该知道怎么说了吧
static final Scheduler DEFAULT
//最后在看一下HandlerScheduler 的schedule方法
= new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
}
HandlerScheduler
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposable.disposed();
}
run = RxJavaPlugins.onSchedule(run);
//1、包装run
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//2、Message.obtain
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
//3、sendMessageDelayed
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposable.disposed();
}
return scheduled;
}
小结
那么通过上面分析的A1 和A2 ,是否可以推断出,多次调用observeOn的时候每一次都会生效。因为第一个onserveOn会在onSuccess的时候切换线程然后,通过 downstream.onSuccess(value) 通知到下游的observeOn,下游的observeOn又重复了一次这个操作,在onSuccess的时候切换线程,继续通知下游。
在复习一下为什么subscribeOn只有第一次才生效,因为subscribeOn只有第一次调用时才会影响到数据源的产出与回调。
一些情况的demo验证
Single.create(SingleOnSubscribe<Int> { emitter ->
Log.d(
TAG, " emitter.onSuccess(1) is run on main Thread ? " +
"${Thread.currentThread().name == mainLooper.thread.name}"
)
emitter.onSuccess(1)
})
.subscribeOn(Schedulers.io())
.subscribe({
Log.d(
TAG, "subscribe is run on main Thread ? " +
"${Thread.currentThread().name == mainLooper.thread.name}"
)
}, {
})
ps:只有subscribeOn的情况下,subscribeOn也会影响到subscribe接收结果的线程
D/MainActivity: emitter.onSuccess(1) is run on main Thread ? false
D/MainActivity: subscribe is run on main Thread ? false
Single.create(SingleOnSubscribe<Int> { emitter ->
Log.d(
TAG, " emitter.onSuccess(1) is run on main Thread ? " +
"${Thread.currentThread().name == mainLooper.thread.name}"
)
emitter.onSuccess(1)
})
.subscribeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.io())
.subscribe({
Log.d(
TAG, "subscribe is run on main Thread ? " +
"${Thread.currentThread().name == mainLooper.thread.name}"
)
}, {
})
ps:subscribeOn 多次只有距离数据源最近的一次才生效
D/MainActivity: emitter.onSuccess(1) is run on main Thread ? true
D/MainActivity: subscribe is run on main Thread ? true
Single.create(SingleOnSubscribe<Int> { emitter ->
Log.d(
TAG, "emitter.onSuccess(1) is run on main Thread ? " +
"${Thread.currentThread().name == mainLooper.thread.name}"
)
emitter.onSuccess(1)
})
.observeOn(Schedulers.io())
.subscribe({
Log.d(
TAG, "subscribe is run on main Thread ? " +
"${Thread.currentThread().name == mainLooper.thread.name}"
)
}, {
})
ps:observeOn 是在onsuccess回调的时候切换线程,所以不会影响到数据的生产线程(上游)
D/MainActivity: emitter.onSuccess(1) is run on main Thread ? true
D/MainActivity: subscribe is run on main Thread ? false
observeOn多次切换多次生效
以及observeOn和subscribenOn的调用顺序对结果没有影响
总结
上面都有小结好像没啥好总结的。 不知道你会不会纠结这个问题。 按照常规的理解,应该是 观察者 -> 订阅 ->被观察者 而我们的代码全部都是 被观察者 -> 订阅 ->观察者
Single.just(1).subscribe({
println("$it")
},{
})
为什么呢?因为无限套娃呀。还记得老熟人吗,老熟人里基本都会有一句source.subscribe(XXX)当前触发自己上游的subscribe,并且自己订阅上游。上游因为被触发了subscribe,继续触发上游的上游,并且上游订阅上游的上游。以此类推。最后在最上游生成数据onSuccess再一次回调下来。最下游再回调给最外部的观察者。
转载自:https://juejin.cn/post/7209501612980928570