Project Reactor源码阅读-flatMap
本篇文章会从源码角度分析flatMap
实现原理,并且详细介绍其队列融合工作流程。
相关示例源码:github.com/chentianmin…
功能介绍
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
将外部Publisher
发射出来的元素转换成内部Publisher
,然后将这些内部的Publisher
合并到一个Flux
中,允许元素交错。
mapper
:转换函数,将外部Publisher
发射出来的元素转换成一个新的Publisher
。concurrency
:针对外部Publisher
的最大请求数量,同时也是scalarQueue队列大小。可通过reactor.bufferSize.small
属性配置,默认256
。prefetch
:针对内部Publisher
的最大请求数量,同时也是innerQueue队列大小。可通过reactor.bufferSize.x
属性配置,默认32
。
代码示例
public Flux<Integer> flat(int delayMillis, int i) {
return delayPublishFlux(delayMillis, i * 10, i * 10 + 5);
}
@Test
public void test() {
delayPublishFlux(100, 1, 6)
.doOnRequest(r -> logLong(r, "main-request"))
.flatMap((i) -> flat(1000, i).doOnRequest(r -> logLong(r, "inner-request"))
.subscribeOn(Schedulers.newElastic("inner")), 3, 2)
.subscribe(i -> logInt(i, "消费"));
sleep(10000);
}
可以看到mian-request
最大是3,inner-request
最大是2。证明了concurrency
和prefetch
的作用。
源码分析
首先看一下flatMap()
操作符在装配阶段做了什么。
Flux#flatMap()
创建FluxFlatMap
对象,它既是Subscriber
,又是Subscription
。
另外还创建了2个队列Supplier
,第一个是以concurrency
为大小创建的mainQueueSupplier
(创建scalarQueue
),另一个是以prefetch
为大小的创建的innerQueueSupplier
。接下来查看订阅阶段发生了什么。
Flux#subscribe()
因为FluxFlatMap
实现了OptimizableOperator
接口,实际的Subscriber
是通过调用subscribeOrReturn()
返回的。
FlatMapMain#onSubscribe()
这里最关键的是调用request()
向上游请求数据,请求数量是maxConcurrency
。这正是flatMap()
方法传入的concurrency
。当数据下发时,肯定会调用onNext()
。
FlatMapMain#onNext()
- 将当前元素转换成一个
Publisher
。 - 如果转换后的
Publisher
是Callable
,则直接获取元素调用tryEmitScalar()
下发。 - 否则创建
FlatMapInner
对象,用它来订阅Publisher
。
在前面的方法定义中提到过:flatMap()
支持并发处理,允许元素交错。
看到这里,我们能得出一个推论:flatMap()
是否真的会并发处理,取决转换后的Publisher
是否支持异步订阅,即p.subscribe(innner)
是否异步执行。
代码验证
首先回顾一下前面讲过的publishOn()
和subScribeOn()
工作机制。
publishOn()
:在onNext()
、onComplete()
、onError()
方法进行线程切换,publishOn()
使得它下游的消费阶段异步执行。subScribeOn()
:在subscribe
的时候进行线程切换,subscribeOn()
使得它上游的订阅阶段以及整个消费阶段异步执行。
同步执行(不会发生交错)
@Test
public void testSync() {
delayPublishFlux(100, 1, 6)
.flatMap((i) -> flat(1000, i), 3, 2)
.subscribe(i -> logInt(i, "消费"));
sleep(10000);
}
重点是去掉了flat()
的subscribeOn()
调用。
同步执行,元素没有发生交错。
subscribeOn()异步执行
@Test
public void testSubscribeOn() {
delayPublishFlux(100, 1, 6)
.flatMap((i) -> flat(1000, i).subscribeOn(Schedulers.newElastic("inner")), 3, 2)
.subscribe(i -> logInt(i, "消费"));
sleep(10000);
}
异步执行,并且此时最大并发是3。
publishOn()异步执行
@Test
public void testPublishOn() {
delayPublishFlux(100, 1, 6)
.flatMap((i) -> flat(10, i)
.publishOn(Schedulers.newElastic("inner"))
// 故意让下游执行慢一点
.doOnNext(x -> sleep(1000)), 3, 2)
.subscribe(i -> logInt(i, "消费"));
sleep(10000);
}
异步执行,并且此时最大并发是3。
通过以上代码可以得知,如果内部Publisher
生产数据慢,推荐使用subscribeOn()
。如果只是内部Publisher
消费速度慢,推荐使用publishOn()
。如果生产都消费都慢的话,两个操作符一起使用。
在调用内部Publisher的subscribe()
方法之后,后续肯定会执行FlatMapInner#onSubscribe()
。
FlatMapInner#onSubscribe()
与publishOn
类似,FlatMapInner
也支持同步队列融合、异步队列融合以及非融合三种处理方式。
如果上游的
Subscription
是QueueSubscription
类型,则会进行队列融合。具体采用同步还是异步,取决于该QueueSubscription#requestFusion()
实现。
- 同步队列融合:复用当前队列,然后直接调用
FlatMapMain#drain()
排空队列。 - 异步队列融合:复用当前队列,然后调用上游
s.request()
请求数据,请求数量是prefetch
。 - 非融合:直接调用上游
s.request()
请求数据,请求数量也是prefetch
。
非融合
以下代码会非融合方式执行。(和SubscribeOn()
异步执行逻辑是一样的)
@Test
public void testNoFused() {
delayPublishFlux(100, 1, 6)
.flatMap((i) -> flat(100, i)
.subscribeOn(Schedulers.newElastic("inner")), 3, 2)
.subscribe(i -> {
// 消费慢一点,innerQueue更容易有数据积压
sleep(1000);
logInt(i, "消费");
});
sleep(10000);
}
FlatMapInner#onNext()
此时onNext()
方法入参就是内部Publisher
实际下发的元素,继续调用FlatMapMain#tryEmit()
下发。
FlatMapMain#tryEmit()
上面代码逻辑其实主要由两种情况组成:(后面两大块逻辑是差不多的)
- 可能直接调用下游
Subscriber#onNext()
继续下发元素。 - 创建队列,然后将元素加入队列,视情况调用
drainLoop()
排空队列。
具体取决于数据生产以及消费的速度:
1、如果消费速度大于生产速度,没有数据积压,则直接调用下游Subscriber#onNext()
进行下发。
2、如果消费速度跟不上生产速度,元素会直接先保存到innerQueue
中,然后在wip==0
时调用drainLoop()
排空队列。
FlatMapMain#tryEmitScalar()
前面提到过,如果转换后的Publisher是Callable
,会执行tryEmitScalar()
方法。该方法做的事情跟tryEmit()
处理逻辑基本一致,主要差别就是处理的元素和使用的队列不同。
tryEmitScalar()
处理的元素是内部Publisher
直接调用call()
获取的,而tryEmit()
是内部Publisher
向下游发送的。tryEmitScalar()
使用scalarQueue缓存元素,而tryEmit()
使用innerQueue。
FlatMapMain#drainLoop()
drainLoop()
逻辑非常多,截取部分关键代码:
- 排空innerQueue中的元素,下发给下游。
- 请求内部
Publisher
下发元素,请求数量就是本次innerQueue排出数量。 - 请求外部
Publisher
下发元素,请求数量是可补充数量,不会超过concurrency
。
另外还有一些完成和取消控制。
同步队列融合
以下代码会以同步队列融合方式执行。
@Test
public void testSyncFused() {
delayPublishFlux(100, 1, 6)
.flatMap((i) -> flatRange(i), 3, 2)
.subscribe(i -> logInt(i, "消费"));
sleep(10000);
}
复用当前队列,然后直接调用FlatMapMain.drain()
排空队列。
FlatMapMain#drain()
依然是调用drainLoop()
排空队列中的元素。注意,同步队列融合没有request()
过程,直接在onSubscribe()
阶段进行元素下发。
异步队列融合
以下代码会以异步队列融合方式执行。
@Test
public void testAsyncFused() {
delayPublishFlux(100, 1, 6)
.flatMap((i) -> flatRange(i).publishOn(Schedulers.newElastic("inner")), 3, 2)
.subscribe(i -> logInt(i, "消费"));
sleep(10000);
}
复用当前队列,然后调用上游s.request()
请求数据,请求数量是prefetch
。后续肯定会调用FlatMapInner#onNext()
。
FlatMapInner#onNext()
此时入参为null
,然后调用了FlatMapMain.drain()
,排空队列元素。异步队列融合会复用队列,上游实际发送是null
,可以将其理解成一个信号,告知下游排空队列中的元素。
转载自:https://juejin.cn/post/7154665098458431524