likes
comments
collection
share

Project Reactor - map的工作原理

作者站长头像
站长
· 阅读数 39

Project Reactor是基于Reactive Streams设计的,要了解map的工作原理,我们需要先了解Reactive Streams。

Reactive Streams

组成部分

Project Reactor - map的工作原理

Reactive Streams是一个规范, 它包含了四个部分

  1. Publisher

    发布者,在收到订阅者的请求时才发布数据,且总数据大小可能是未知的。

  2. Subscriber

    订阅者,可以请求多次数据,请求时可以根据自身处理能力决定数据的大小。

  3. Subscription

    订阅,代表着发布者和订阅者之间订阅关系。发布者与订阅者的交互都是通过订阅来实现。

  4. Processor

    数据处理器,既是发布者也是订阅者,可以用来对发布的数据进行转换或对错误进行处理。

Project Reactor - map的工作原理

工作原理

Publisher, SubscriberSubscription之间的交互流程如下

Project Reactor - map的工作原理

  1. 主程序创建Publiser实例。
  2. 主程序创建Subscriber实例。
  3. 主程序调用Publisher.subscribe(Subscriber)订阅Publisher
  4. Publisher创建Subscription
  5. Publisher调用Subscriber.onSubscribe(Subscription)来通知Subscriber订阅成功。
  6. Subscriber调用Subscription.request(n)来请求固定大小的数据,该调用可发生多次。
  7. Subscription调用Subscriber.onNext(value)来向Subscriber发布数据,该调用可发生多次。
  8. 在不需要订阅时,Subscriber需要调用Subscription.cancel()来取消订阅。
  9. 在数据发布结束时,Subscription需要调用Subscriber.onComplete()来通知Subscriber数据发布结束。
  10. 在数据发布出错时,Subscription需要调用Subscriber.onError()来通知Subscriber数据发布错误。

map

使用方法

这里我们以Flux.map为例(Mono.map的工作原理类似), 它的使用方法如下

Flux.just(1).map(x -> x + 1).blockFirst(); // 2

map可以对流中的每个元素进行计算并用计算结果替换原来的元素。

这行代码可以被分为三个部分

Project Reactor - map的工作原理

  1. 创建一个初始Publisher,这里是FluxJust
  2. 创建一个新的Publisher作为FluxJust的装饰类并保存map的转换函数(x -> x+1),这里是FluxMap,它使用了装饰器模式。
  3. 创建一个Subscriber来订阅FluxMap,这里是BlockingFirstSubscriber。在FluxMap被订阅后,它会在FluxJustBlockingFirstSubscriber之间加入一个MapSubscriberMapSubscriber会作为Processor执行map的转换函数。

工作原理

Project Reactor - map的工作原理

注意:为了突出重点,这里进行了简化,忽略了一些对理解map工作原理不重要的类,例如InnerOperator,InternalFluxOperator。

可以看到MapSubscriber并没有严格的实现Processor,它直接继承了Subscription,而不是PublisherFluxMap.subscribe()在每次被调用时都会生成一个新的MapSubscriber实例,因此MapSubscriber永远都只会有一个Subscriber,那么直接继承Subscription就可以简化生成Subscription的步骤。

FluxMap的作用是增加易用性。如果我们只有MapSubscriber,伪代码就会变成下面这样

Subscriber subscriber = new BlockingFirstSubscriber();
MapSubscriber mapSubscriber = new MapSubscriber(subscriber, x -> x + 1);
Flux<Int> flux = Flux.just(1);
flux.subscribe(mapSubscriber);

如果我们要进行两次map,伪代码会变成

Subscriber subscriber = new BlockingFirstSubscriber();
MapSubscriber mapSubscriber1 = new MapSubscriber(subscriber, x -> x + 1);
MapSubscriber mapSubscriber2 = new MapSubscriber(mapSubscriber1, x -> x + 2);
Flux<Int> flux = Flux.just(1);
flux.subscribe(mapSubscriber2);

我们既要嵌套更多的MapSubscriber,又要修改flux.subscribe()的入参,无法解耦PublisherSubscriber。引入FluxMap后,我们只需要增加更多的FluxMap,不需要修改其他地方。

以上面的代码为例,map的工作流程如下

Project Reactor - map的工作原理

  1. 主程序创建FluxJust实例,它是一个CorePublisher
  2. 主程序调用FluxJust.map(f)对数据进行转换。
  3. FluxJust创建FluxMap实例,它包含了FluxJustf,并且也是一个CorePublisher
  4. 主程序创建BlockingFirstSubscriber实例。
  5. 主程序调用FluxMap.subscribe(BlockingFirstSubscriber)订阅FluxMap
  6. FluxMap创建MapSubscriber实例,它包含了BlockingFirstSubscriberf
  7. FluxMap调用FluxJust.subscribe(MapSubscriber)订阅FluxJust
  8. FluxJust创建Subscription实例。
  9. FluxJust调用MapSubscriber.onSubscribe(Subscription)来通知MapSubscriber订阅成功。
  10. MapSubscriber调用BlockingFirstSubscriber.onSubscribe(MapSubscriber)来通知BlockingFirstSubscriber订阅成功,这里MapSubscriber既是一个Subscriber,也是一个Subscription
  11. BlockingFirstSubscriber调用MapSubscriber.request(n)来请求固定大小的数据,该调用可发生多次。
  12. MapSubscriber调用Subscription.request(n)来请求固定大小的数据,该调用可发生多次。
  13. Subscription调用MapSubscriber.onNext(value)来向MapSubscriber发布数据,该调用可发生多次。
  14. MapSubscriber调用f(value)来对数据进行转换,这是map的核心逻辑。
  15. MapSubscriber调用BlockingFirstSubscriber.onNext(mappedValue)来向BlockingFirstSubscriber发布转换后的数据,该调用可发生多次。
  16. 在数据发布结束时,Subscription需要调用MapSubscriber.onComplete()来通知MapSubscriber数据发布结束。
  17. MapSubscriber调用BlockingFirstSubscriber.onComplete()来通知BlockingFirstSubscriber数据发布结束。

总结

map是Reactor中最基本的流操作方法,掌握它的工作原理可以帮助我们更好的理解其他操作,下一篇我们会在其基础上讲解filter的工作原理。

转载自:https://juejin.cn/post/7189995153267327033
评论
请登录