Project Reactor源码阅读-concat
本篇文章将会分析concat操作符源码。
功能介绍
public static <T> Flux<T> concat(Publisher<? extends T>... sources)

将多个Publisher按顺序拼接起来,并向下游转发这些Publisher发射出来的元素,通过依次订阅实现。
源码分析
首先看一下concat()操作符在装配阶段做了什么,直接查看Flux#concat()源码。
Flux#concat()
主要是创建FluxConcatArray对象,它继承了Flux,在订阅阶段会一定会调用到FluxConcatArray#subscribe()方法。
FluxConcatArray#subscribe()

这里最重要是两件事:
- 创建
ConcatArraySubscriber对象。 - 调用其
ConcatArraySubscriber#onComplete()方法。
ConcatArraySubscriber#onComplete()

重点逻辑只有2个:
p.subscribe(this):调用sources中每一个Publisher的subscribe()方法执行订阅,触发对应Publisher数据流动。actual.onComplete():当sources中所有Publisher都订阅完成之后,调用下游Subscriber的onComplete()方法,告知下游数据发射完毕。
这里的sources就是concat()传递进来的。
关键问题是,如何保证p.subscribe(this)是依次调用执行的?
在调用subscribe()方法的时候,把ConcatArraySubscriber传递进去了。当任意一个Publisher完成时,都会调用ConcatArraySubscriber#onComplete()方法,从而触发下一个Publisher订阅。
- 如果
p.subscribe(this)是同步调用,那么在执行该方法的过程中再进入onComplete()方法,使WIP增加,相当于在do...while循环中依次订阅。 - 如果
p.subscribe(this)是异步调用,当前onComplete()方法可能会直接退出。等Publisher完成时,onComplete()会再次调用,继续执行下一个p.subscribe(this)。
转载自:https://juejin.cn/post/7151743000672993287