Project Reactor - flatMapSequential的工作原理
我们已经介绍过Project Reactor - flatMap的工作原理,里面提到flatMap
是无法保证数据按原始流的顺序发布的。
例如我们有一个流[1, 2]
,我们称之为原始流。我们对原始流做了如下转换
1 -> [a, b]
2 -> [c, d]
按原始流的顺序发布是指,在最终流中,1
转换后的数据必定在2
转换后的数据之前,即[a, b]
必定在[c, d]
之前发布。
flatMap
的最终流既可能是[a,d,c,b]
,也可能是[a,c,d,b]
,数据的顺序依赖于数据的发布时间,先发布的会排在前面。
如果我们想要让最终流的数据按照[a,b,c,d]
的顺序发布,那么我们需要使用flatMapSequential
。
使用方法
这里我们以Flux.flatMapSequential
为例, 它的使用方法如下
Flux.just(1, 2).flatMapSequential(x -> Flux.just(x,x)).blockFirst(); // 1,1,2,2
这行代码可以被分为三个部分
- 创建一个初始
Publisher
,这里是FluxArray
。 - 创建一个新的
Publisher
作为FluxArray
的装饰类并保存flatMapSequential
的转换函数(x -> FluxJust(x,x)
),这里是FluxMergeSequential
,它使用了装饰器模式。 - 创建一个
Subscriber
来订阅FluxMergeSequential
,这里是BlockingFirstSubscriber
。在FluxMergeSequential
被订阅后,它会在FluxArray
和BlockingFirstSubscriber
之间加入一个MergeSequentialMain
。MergeSequentialMain
会作为Processor
执行flatMapSequential
的转换函数,在接收到一个数据后,它会用转换函数生成一个新的Publisher
,并创建一个MergeSequentialInner
来订阅该Publisher
。MergeSequentialInner
会将接收到的数据转发给MergeSequentialMain
,然后MergeSequentialMain
再发送给BlockingFirstSubscriber
。
可以看到,如果每个元素生成的流都只有一个元素的话,它和flatMap
是等价的。
Flux.just(1, 2).flatMapSequential(x -> Flux.just(x+1)).blockFirst();
//等价于Flux.just(1, 2).flatMap(x -> Flux.just(x+1)).blockFirst()
工作原理
以上面的代码为例,flatMapSequential
的工作流程如下
注意:这里我们假设数据是顺序发布的,没有体现innerNext的多线程逻辑,后面会解释多线程下innerNext的工作原理。
- 主程序创建
FluxArray
实例,它是一个CorePublisher
。 - 主程序调用
FluxArray.flatMapSequential(f)
对数据进行转换。 FluxArray
创建FluxMergeSequential
实例,它包含了FluxArray
和f
,并且也是一个CorePublisher
。- 主程序创建
BlockingFirstSubscriber
实例。 - 主程序调用
FluxMergeSequential.subscribe(BlockingFirstSubscriber)
订阅FluxMergeSequential
。 FluxMergeSequential
创建MergeSequentialMain
实例,它包含了BlockingFirstSubscriber
和f
。FluxMergeSequential
调用FluxArray.subscribe(MergeSequentialMain)
订阅FluxArray
。FluxArray
创建Subscription
实例。FluxArray
调用MergeSequentialMain.onSubscribe(Subscription)
来通知MergeSequentialMain
订阅成功。MergeSequentialMain
调用BlockingFirstSubscriber.onSubscribe(MergeSequentialMain)
来通知BlockingFirstSubscriber
订阅成功,这里MergeSequentialMain
既是一个Subscriber
,也是一个Subscription
。MergeSequentialMain
调用Subscription.request(maxConcurrency)
来请求数据,其大小为flatMapSequential
允许的最大并行数。该调用不需要等待BlockingFirstSubscriber
调用MergeSequentialMain.request(n)
,因为MergeSequentialMain
需要先将Subscription
发送的数据转换为新的Publisher
,然后才能将新的Publisher
数据发给BlockingFirstSubscriber
,所以转换过程可以提前发生。BlockingFirstSubscriber
调用MergeSequentialMain.request(n)
来请求固定大小的数据,该调用可发生多次。Subscription
调用MergeSequentialMain.onNext(value)
来向MergeSequentialMain
发布数据,该调用可发生多次。MergeSequentialMain
调用f(value)
来对数据进行转换,生成新的Publisher
,这里我们将其命名为MappedPublisher
。MergeSequentialMain
生成一个MergeSequentialInner
,并将其加入到subscribers
数组中。- 在数据发布结束时,
Subscription
需要调用MergeSequentialMain.onComplete()
来通知MergeSequentialMain
数据发布结束。该步骤可以发生在第11步之后的任何时间点,和后续步骤是并行关系。 MergeSequentialMain
调用MappedPublisher.subscribe(MergeSequentialInner)
来订阅MappedPublisher
。MappedPublisher
创建Subscription
实例,这里我们将其命名为MappedSubscription
。MappedPublisher
调用MergeSequentialInner.onSubscribe(MappedSubscription)
来通知MergeSequentialInner
订阅成功。MergeSequentialInner
创建一个数据队列,用于存储从MappedSubscription
接收到的数据。MergeSequentialInner
调用MappedSubscription.request(prefetch)
来请求数据,其大小为自定义的预加载数据大小。MappedSubscription
调用MergeSequentialInner.onNext(value)
来向MergeSequentialInner
发布数据,该调用可发生多次。MergeSequentialInner
调用MergeSequentialMain.innerNext(value)
来向MergeSequentialMain
发布数据。MergeSequentialMain
将value
存储到MergeSequentialInner
的数据队列中。- 如果
MergeSequentialMain
中current
指向的MergeSequentialInner
为null
,则从subscribers
中获取下一个MergeSequentialInner
并将其设为current
。注意,这里MergeSequentialMain
是从subscribers
中顺序获取MergeSequentialInner
,并且只有在current
数据发布结束后才会获取下一个。这是和flatMap
最大的不同。 MergeSequentialMain
调用BlockingFirstSubscriber.onNext(value)
来向BlockingFirstSubscriber
发布数据。图中是单线程的情况,如果有多个MergeSequentialInner
在同时调用MergeSequentialMain.innerNext
,MergeSequentialMain
会将数据存储在MergeSequentialInner
的数据队列中。在该MergeSequentialInner
成为current
后,MergeSequentialMain
会将数据队列中的数据全部发布直到MappedSubscription
数据发布结束。MergeSequentialInner
调用MappedSubscription.request(1)
来继续请求数据。- 在数据发布结束时,
MappedSubscription
调用MergeSequentialInner.onComplete()
来通知MergeSequentialInner
数据发布结束。 - 将数据发布结束的
MergeSequentialInner
标记为结束。 MergeSequentialInner
调用MergeSequentialMain.innerComplete(MergeSequentialInner)
来通知MergeSequentialMain
数据发布结束。- 如果
MergeSequentialInner
数据队列已经清空且数据发布已结束,current
设置为null
。 - 如果无法从
subscribers
中得到下一个MergeSequentialInner
且Subscirption
已经发布完成,调用BlockingFirstSubscriber.onComplete()
来通知BlockingFirstSubscriber
数据发布结束。
下面我们看一下MergeSequentialMain.innerNext
在多线程状态下的工作原理。
这里我们假设Subscription
发布了三个元素,每个元素都生成了一个MappedSubscription
且都有一个MergeSequentialInner
订阅了它,三个MergeSequentialInner
都在调用MergeSequentialMain.innerNext
发布数据。
- 状态1
- 第一个
MergeSequentialInner
是当前发布数据的MergeSequentialInner
,MergeSequentialMain
从其数据队列中获取数据并发布给Subscriber
直到MappedSubscription
数据发布结束。 - 第二个
MergeSequentialInner
不是当前发布数据的MergeSequentialInner
,数据全部存储到它的数据队列中。 - 第三个
MergeSequentialInner
不是当前发布数据的MergeSequentialInner
,数据全部存储到它的数据队列中。
- 第一个
- 状态2
- 第一个
MappedSubscription
数据发布结束后,MergeSequentialMain
将第二个MergeSequentialInner
设置为当前发布数据的MergeSequentialInner
。第一个MergeSequentialInner
被删除。 - 第二个
MergeSequentialInner
是当前发布数据的MergeSequentialInner
,MergeSequentialMain
从其数据队列中获取数据并发布给Subscriber
直到MappedSubscription
数据发布结束。 - 第三个
MergeSequentialInner
不是当前发布数据的MergeSequentialInner
,数据全部存储到它的数据队列中。
- 第一个
- 状态3
- 第一个
MergeSequentialInner
已经被删除。 - 第二个
MappedSubscription
数据发布结束后,MergeSequentialMain
将第三个MergeSequentialInner
设置为当前发布数据的MergeSequentialInner
。第二个MergeSequentialInner
被删除。 - 第三个
MergeSequentialInner
是当前发布数据的MergeSequentialInner
,MergeSequentialMain
从其数据队列中获取数据并发布给Subscriber
直到MappedSubscription
数据发布结束。
- 第一个
拓展
可以看到,flatMapSequential
并没有像map
和flatMap
那样创建一个叫FluxFlatMapSequential
的类,而是创建了类FluxMergeSequential
。这是因为在Flux
中确实有mergeSequential
的操作
Flux.mergeSequential(Flux.just(Flux.just(1,1), Flux.just(2,2))).blockFirst(); //1,1,2,2
它是可以用flatMapSequential
实现的
Flux.just(Flux.just(1,1), Flux.just(2,2)).flatMapSequential(x -> x).blockFirst(); //1,1,2,2
它们本质上都是要把多个流按顺序合并,flatMapSequential
只是多了生成流这一步,这就是flatMapSequential
的要用FluxMergeSequential
的原因。
总结
大家可能注意到Flux
还有Flux.merge
和Flux.concat
的操作,它们也都是把多个流合并成一个流,下一篇我们就介绍一下它们的工作原理。
转载自:https://juejin.cn/post/7196668306278547513