Project Reactor源码阅读-concat
本篇文章将会分析concat操作符源码。
功能介绍
public static <T> Flux<T> concat(Publisher<? extends T>... sources)
将多个Publisher
按顺序拼接起来,并向下游转发这些Publisher
发射出来的元素,通过依次订阅实现。
源码分析
首先看一下concat()
操作符在装配阶段做了什么,直接查看Flux#concat()
源码。
Flux#concat()
主要是创建
FluxConcatArray
对象,它继承了Flux
,在订阅阶段会一定会调用到FluxConcatArray#subscribe()
方法。
FluxConcatArray#subscribe()
这里最重要是两件事:
- 创建
ConcatArraySubscriber
对象。 - 调用其
ConcatArraySubscriber#onComplete()
方法。
ConcatArraySubscriber#onComplete()
重点逻辑只有2个:
p.subscribe(this)
:调用sources
中每一个Publisher
的subscribe()
方法执行订阅,触发对应Publisher
数据流动。actual.onComplete()
:当sources
中所有Publisher
都订阅完成之后,调用下游Subscriber
的onComplete()
方法,告知下游数据发射完毕。
这里的sources
就是concat()
传递进来的。
关键问题是,如何保证p.subscribe(this)
是依次调用执行的?
在调用subscribe()
方法的时候,把ConcatArraySubscriber
传递进去了。当任意一个Publisher
完成时,都会调用ConcatArraySubscriber#onComplete()
方法,从而触发下一个Publisher
订阅。
- 如果
p.subscribe(this)
是同步调用,那么在执行该方法的过程中再进入onComplete()
方法,使WIP
增加,相当于在do...while
循环中依次订阅。 - 如果
p.subscribe(this)
是异步调用,当前onComplete()
方法可能会直接退出。等Publisher
完成时,onComplete()
会再次调用,继续执行下一个p.subscribe(this)
。
转载自:https://juejin.cn/post/7151743000672993287