likes
comments
collection
share

Project Reactor - flatMapSequential的工作原理

作者站长头像
站长
· 阅读数 82

我们已经介绍过Project Reactor - flatMap的工作原理,里面提到flatMap是无法保证数据按原始流的顺序发布的。

例如我们有一个流[1, 2],我们称之为原始流。我们对原始流做了如下转换

  • 1 -> [a, b]
  • 2 -> [c, d] 按原始流的顺序发布是指,在最终流中,1转换后的数据必定在2转换后的数据之前,即 [a, b]必定在[c, d]之前发布。

flatMap的最终流既可能是[a,d,c,b],也可能是[a,c,d,b],数据的顺序依赖于数据的发布时间,先发布的会排在前面。

如果我们想要让最终流的数据按照[a,b,c,d]的顺序发布,那么我们需要使用flatMapSequential

使用方法

这里我们以Flux.flatMapSequential为例, 它的使用方法如下

Flux.just(1, 2).flatMapSequential(x -> Flux.just(x,x)).blockFirst(); // 1,1,2,2

这行代码可以被分为三个部分

Project Reactor - flatMapSequential的工作原理

  1. 创建一个初始Publisher,这里是FluxArray
  2. 创建一个新的Publisher作为FluxArray的装饰类并保存flatMapSequential的转换函数(x -> FluxJust(x,x)),这里是FluxMergeSequential,它使用了装饰器模式。
  3. 创建一个Subscriber来订阅FluxMergeSequential,这里是BlockingFirstSubscriber。在FluxMergeSequential被订阅后,它会在FluxArrayBlockingFirstSubscriber之间加入一个MergeSequentialMainMergeSequentialMain会作为Processor执行flatMapSequential的转换函数,在接收到一个数据后,它会用转换函数生成一个新的Publisher,并创建一个MergeSequentialInner来订阅该PublisherMergeSequentialInner会将接收到的数据转发给MergeSequentialMain,然后MergeSequentialMain再发送给BlockingFirstSubscriber

可以看到,如果每个元素生成的流都只有一个元素的话,它和flatMap是等价的。

Flux.just(1, 2).flatMapSequential(x -> Flux.just(x+1)).blockFirst();
//等价于Flux.just(1, 2).flatMap(x -> Flux.just(x+1)).blockFirst()

工作原理

Project Reactor - flatMapSequential的工作原理

以上面的代码为例,flatMapSequential的工作流程如下

Project Reactor - flatMapSequential的工作原理

注意:这里我们假设数据是顺序发布的,没有体现innerNext的多线程逻辑,后面会解释多线程下innerNext的工作原理。

  1. 主程序创建FluxArray实例,它是一个CorePublisher
  2. 主程序调用FluxArray.flatMapSequential(f)对数据进行转换。
  3. FluxArray创建FluxMergeSequential实例,它包含了FluxArrayf,并且也是一个CorePublisher
  4. 主程序创建BlockingFirstSubscriber实例。
  5. 主程序调用FluxMergeSequential.subscribe(BlockingFirstSubscriber)订阅FluxMergeSequential
  6. FluxMergeSequential创建MergeSequentialMain实例,它包含了BlockingFirstSubscriberf
  7. FluxMergeSequential调用FluxArray.subscribe(MergeSequentialMain)订阅FluxArray
  8. FluxArray创建Subscription实例。
  9. FluxArray调用MergeSequentialMain.onSubscribe(Subscription)来通知MergeSequentialMain订阅成功。
  10. MergeSequentialMain调用BlockingFirstSubscriber.onSubscribe(MergeSequentialMain)来通知BlockingFirstSubscriber订阅成功,这里MergeSequentialMain既是一个Subscriber,也是一个Subscription
  11. MergeSequentialMain调用Subscription.request(maxConcurrency)来请求数据,其大小为flatMapSequential允许的最大并行数。该调用不需要等待BlockingFirstSubscriber调用MergeSequentialMain.request(n),因为MergeSequentialMain需要先将Subscription发送的数据转换为新的Publisher,然后才能将新的Publisher数据发给BlockingFirstSubscriber,所以转换过程可以提前发生。
  12. BlockingFirstSubscriber调用MergeSequentialMain.request(n)来请求固定大小的数据,该调用可发生多次。
  13. Subscription调用MergeSequentialMain.onNext(value)来向MergeSequentialMain发布数据,该调用可发生多次。
  14. MergeSequentialMain调用f(value)来对数据进行转换,生成新的Publisher,这里我们将其命名为MappedPublisher
  15. MergeSequentialMain生成一个MergeSequentialInner,并将其加入到subscribers数组中。
  16. 在数据发布结束时,Subscription需要调用MergeSequentialMain.onComplete()来通知MergeSequentialMain数据发布结束。该步骤可以发生在第11步之后的任何时间点,和后续步骤是并行关系。
  17. MergeSequentialMain调用MappedPublisher.subscribe(MergeSequentialInner)来订阅MappedPublisher
  18. MappedPublisher创建Subscription实例,这里我们将其命名为MappedSubscription
  19. MappedPublisher调用MergeSequentialInner.onSubscribe(MappedSubscription)来通知MergeSequentialInner订阅成功。
  20. MergeSequentialInner创建一个数据队列,用于存储从MappedSubscription接收到的数据。
  21. MergeSequentialInner调用MappedSubscription.request(prefetch)来请求数据,其大小为自定义的预加载数据大小。
  22. MappedSubscription调用MergeSequentialInner.onNext(value)来向MergeSequentialInner发布数据,该调用可发生多次。
  23. MergeSequentialInner调用MergeSequentialMain.innerNext(value)来向MergeSequentialMain发布数据。
  24. MergeSequentialMainvalue存储到MergeSequentialInner的数据队列中。
  25. 如果MergeSequentialMaincurrent指向的MergeSequentialInnernull,则从subscribers中获取下一个MergeSequentialInner并将其设为current。注意,这里MergeSequentialMain是从subscribers中顺序获取MergeSequentialInner,并且只有在current数据发布结束后才会获取下一个。这是和flatMap最大的不同。
  26. MergeSequentialMain调用BlockingFirstSubscriber.onNext(value)来向BlockingFirstSubscriber发布数据。图中是单线程的情况,如果有多个MergeSequentialInner在同时调用MergeSequentialMain.innerNextMergeSequentialMain会将数据存储在MergeSequentialInner的数据队列中。在该MergeSequentialInner成为current后,MergeSequentialMain会将数据队列中的数据全部发布直到MappedSubscription数据发布结束。
  27. MergeSequentialInner调用MappedSubscription.request(1)来继续请求数据。
  28. 在数据发布结束时,MappedSubscription调用MergeSequentialInner.onComplete()来通知MergeSequentialInner数据发布结束。
  29. 将数据发布结束的MergeSequentialInner标记为结束。
  30. MergeSequentialInner调用MergeSequentialMain.innerComplete(MergeSequentialInner)来通知MergeSequentialMain数据发布结束。
  31. 如果MergeSequentialInner数据队列已经清空且数据发布已结束,current设置为null
  32. 如果无法从subscribers中得到下一个MergeSequentialInnerSubscirption已经发布完成,调用BlockingFirstSubscriber.onComplete()来通知BlockingFirstSubscriber数据发布结束。

下面我们看一下MergeSequentialMain.innerNext在多线程状态下的工作原理。

Project Reactor - flatMapSequential的工作原理

这里我们假设Subscription发布了三个元素,每个元素都生成了一个MappedSubscription且都有一个MergeSequentialInner订阅了它,三个MergeSequentialInner都在调用MergeSequentialMain.innerNext发布数据。

  • 状态1
    • 第一个MergeSequentialInner是当前发布数据的MergeSequentialInnerMergeSequentialMain从其数据队列中获取数据并发布给Subscriber直到MappedSubscription数据发布结束。
    • 第二个MergeSequentialInner不是当前发布数据的MergeSequentialInner,数据全部存储到它的数据队列中。
    • 第三个MergeSequentialInner不是当前发布数据的MergeSequentialInner,数据全部存储到它的数据队列中。
  • 状态2
    • 第一个MappedSubscription数据发布结束后, MergeSequentialMain将第二个MergeSequentialInner设置为当前发布数据的MergeSequentialInner。第一个MergeSequentialInner被删除。
    • 第二个MergeSequentialInner是当前发布数据的MergeSequentialInnerMergeSequentialMain从其数据队列中获取数据并发布给Subscriber直到MappedSubscription数据发布结束。
    • 第三个MergeSequentialInner不是当前发布数据的MergeSequentialInner,数据全部存储到它的数据队列中。
  • 状态3
    • 第一个MergeSequentialInner已经被删除。
    • 第二个MappedSubscription数据发布结束后, MergeSequentialMain将第三个MergeSequentialInner设置为当前发布数据的MergeSequentialInner。第二个MergeSequentialInner被删除。
    • 第三个MergeSequentialInner是当前发布数据的MergeSequentialInnerMergeSequentialMain从其数据队列中获取数据并发布给Subscriber直到MappedSubscription数据发布结束。

拓展

可以看到,flatMapSequential并没有像mapflatMap那样创建一个叫FluxFlatMapSequential的类,而是创建了类FluxMergeSequential。这是因为在Flux中确实有mergeSequential的操作

Flux.mergeSequential(Flux.just(Flux.just(1,1), Flux.just(2,2))).blockFirst(); //1,1,2,2

它是可以用flatMapSequential实现的

Flux.just(Flux.just(1,1), Flux.just(2,2)).flatMapSequential(x -> x).blockFirst(); //1,1,2,2

它们本质上都是要把多个流按顺序合并,flatMapSequential只是多了生成流这一步,这就是flatMapSequential的要用FluxMergeSequential的原因。

总结

大家可能注意到Flux还有Flux.mergeFlux.concat的操作,它们也都是把多个流合并成一个流,下一篇我们就介绍一下它们的工作原理。

转载自:https://juejin.cn/post/7196668306278547513
评论
请登录