Project Reactor - map的工作原理
Project Reactor是基于Reactive Streams设计的,要了解map
的工作原理,我们需要先了解Reactive Streams。
Reactive Streams
组成部分
Reactive Streams是一个规范, 它包含了四个部分
-
Publisher
发布者,在收到订阅者的请求时才发布数据,且总数据大小可能是未知的。
-
Subscriber
订阅者,可以请求多次数据,请求时可以根据自身处理能力决定数据的大小。
-
Subscription
订阅,代表着发布者和订阅者之间订阅关系。发布者与订阅者的交互都是通过订阅来实现。
-
Processor
数据处理器,既是发布者也是订阅者,可以用来对发布的数据进行转换或对错误进行处理。
工作原理
Publisher
, Subscriber
和Subscription
之间的交互流程如下
- 主程序创建
Publiser
实例。 - 主程序创建
Subscriber
实例。 - 主程序调用
Publisher.subscribe(Subscriber)
订阅Publisher
。 Publisher
创建Subscription
。Publisher
调用Subscriber.onSubscribe(Subscription)
来通知Subscriber
订阅成功。Subscriber
调用Subscription.request(n)
来请求固定大小的数据,该调用可发生多次。Subscription
调用Subscriber.onNext(value)
来向Subscriber
发布数据,该调用可发生多次。- 在不需要订阅时,
Subscriber
需要调用Subscription.cancel()
来取消订阅。 - 在数据发布结束时,
Subscription
需要调用Subscriber.onComplete()
来通知Subscriber
数据发布结束。 - 在数据发布出错时,
Subscription
需要调用Subscriber.onError()
来通知Subscriber
数据发布错误。
map
使用方法
这里我们以Flux.map
为例(Mono.map
的工作原理类似), 它的使用方法如下
Flux.just(1).map(x -> x + 1).blockFirst(); // 2
map
可以对流中的每个元素进行计算并用计算结果替换原来的元素。
这行代码可以被分为三个部分
- 创建一个初始
Publisher
,这里是FluxJust
。 - 创建一个新的
Publisher
作为FluxJust
的装饰类并保存map
的转换函数(x -> x+1
),这里是FluxMap
,它使用了装饰器模式。 - 创建一个
Subscriber
来订阅FluxMap
,这里是BlockingFirstSubscriber
。在FluxMap
被订阅后,它会在FluxJust
和BlockingFirstSubscriber
之间加入一个MapSubscriber
,MapSubscriber
会作为Processor
执行map
的转换函数。
工作原理
注意:为了突出重点,这里进行了简化,忽略了一些对理解map工作原理不重要的类,例如InnerOperator,InternalFluxOperator。
可以看到MapSubscriber
并没有严格的实现Processor
,它直接继承了Subscription
,而不是Publisher
。FluxMap.subscribe()
在每次被调用时都会生成一个新的MapSubscriber
实例,因此MapSubscriber
永远都只会有一个Subscriber
,那么直接继承Subscription
就可以简化生成Subscription
的步骤。
FluxMap
的作用是增加易用性。如果我们只有MapSubscriber
,伪代码就会变成下面这样
Subscriber subscriber = new BlockingFirstSubscriber();
MapSubscriber mapSubscriber = new MapSubscriber(subscriber, x -> x + 1);
Flux<Int> flux = Flux.just(1);
flux.subscribe(mapSubscriber);
如果我们要进行两次map
,伪代码会变成
Subscriber subscriber = new BlockingFirstSubscriber();
MapSubscriber mapSubscriber1 = new MapSubscriber(subscriber, x -> x + 1);
MapSubscriber mapSubscriber2 = new MapSubscriber(mapSubscriber1, x -> x + 2);
Flux<Int> flux = Flux.just(1);
flux.subscribe(mapSubscriber2);
我们既要嵌套更多的MapSubscriber
,又要修改flux.subscribe()
的入参,无法解耦Publisher
和Subscriber
。引入FluxMap
后,我们只需要增加更多的FluxMap
,不需要修改其他地方。
以上面的代码为例,map
的工作流程如下
- 主程序创建
FluxJust
实例,它是一个CorePublisher
。 - 主程序调用
FluxJust.map(f)
对数据进行转换。 FluxJust
创建FluxMap
实例,它包含了FluxJust
和f
,并且也是一个CorePublisher
。- 主程序创建
BlockingFirstSubscriber
实例。 - 主程序调用
FluxMap.subscribe(BlockingFirstSubscriber)
订阅FluxMap
。 FluxMap
创建MapSubscriber
实例,它包含了BlockingFirstSubscriber
和f
。FluxMap
调用FluxJust.subscribe(MapSubscriber)
订阅FluxJust
。FluxJust
创建Subscription
实例。FluxJust
调用MapSubscriber.onSubscribe(Subscription)
来通知MapSubscriber
订阅成功。MapSubscriber
调用BlockingFirstSubscriber.onSubscribe(MapSubscriber)
来通知BlockingFirstSubscriber
订阅成功,这里MapSubscriber
既是一个Subscriber
,也是一个Subscription
。BlockingFirstSubscriber
调用MapSubscriber.request(n)
来请求固定大小的数据,该调用可发生多次。MapSubscriber
调用Subscription.request(n)
来请求固定大小的数据,该调用可发生多次。Subscription
调用MapSubscriber.onNext(value)
来向MapSubscriber
发布数据,该调用可发生多次。MapSubscriber
调用f(value)
来对数据进行转换,这是map
的核心逻辑。MapSubscriber
调用BlockingFirstSubscriber.onNext(mappedValue)
来向BlockingFirstSubscriber
发布转换后的数据,该调用可发生多次。- 在数据发布结束时,
Subscription
需要调用MapSubscriber.onComplete()
来通知MapSubscriber
数据发布结束。 MapSubscriber
调用BlockingFirstSubscriber.onComplete()
来通知BlockingFirstSubscriber
数据发布结束。
总结
map
是Reactor中最基本的流操作方法,掌握它的工作原理可以帮助我们更好的理解其他操作,下一篇我们会在其基础上讲解filter
的工作原理。
转载自:https://juejin.cn/post/7189995153267327033