likes
comments
collection
share

响应式编程框架 WebFlux在实际开发中的一些痛点起初这个问题是为了回到知乎上的一个提问:为什么大多数程序员认为响应式

作者站长头像
站长
· 阅读数 54

文章的最新更新发布在:响应式编程框架 WebFlux在实际开发中的一些痛点

前言

起初这个问题是为了回到知乎上的一个提问:为什么大多数程序员认为响应式编程不易理解,甚至反人类?

但是写着写着发现内容还挺多的,于是整理成一篇博客吧。

以下是在实际开发中碰到的一些问题。

几乎所有的方法返回套上了Mono Flux

从一个请求的入口

@GetMapping("check")
public Mono<ApiResponse<String>> hlCheck(){
    return Mono.just(ApiResponse.success("I am fine. version is: " + version));
}

到从数据库的查询结果(使用的r2dbc)

@Override
public Flux<AntiquarianBook> findByBookName(String bookName){
    return antiquarianBookRepository.findAll();
}

甚至一些IO操作的工具类,为了异步非阻塞的方式来处理,几乎”污染“了所有方法。

public static Flux<String> fromPath(Path path) {
    return Flux.using(() -> Files.lines(path),
          Flux::fromStream,
          BaseStream::close
    );
}

而这也直接导致,没法方便快捷的做缓存!因为你拿到的方法返回是Mono,Flux,而不是完成的数据。

所以也就引发了下面这个问题

缓存框架的支持少之又少

具体可以看之前的这篇博客:在WebFlux中如何使用缓存,缓存Mono和Flux

太长不看的看结论, 现有缓存框架对响应式编程的支持情况:

框架名支持情况相关链接
ehcache不支持Possibility to provide asynchronous or reactive cache in future versions
jetcache不支持jetcache 支持 spring webflux 吗
reactor-extra最新版本已经停止更新reactor-addons
caffeine支持Reactive types support for @Cacheable methods 但是要求是spring 6.1M4版本之后

所以当你想要一个缓存注解就有本地缓存和远程缓存?自己写一个吧

debug的困难度上升

随便写几个初学者碰到一脸懵逼的场景:

public static void main(String[] args) throws InterruptedException {
        Flux.range(1, 10)
                .publishOn(Schedulers.newParallel("publishOn-T", 2))
                .flatMap(it -> Mono.just(executeRequest())
                                .subscribeOn(Schedulers.newParallel("subscribeOn-T", 2))
                ,2)
                .subscribe(it -> {
                    System.out.println(Instant.now() + " " +it);
                });
    }

    private static String executeRequest(){
        RestTemplate restTemplate = new RestTemplate();
        return restTemplate.exchange("http://ip:port", HttpMethod.GET, null, String.class).getBody();
    }

请问:上述代码的执行对于flatMap来说,每次会同时执行几次外部请求executeRequest()?

答案 答案是一次,而不是两次。因为`Mono.just(executeRequest())`是Hot sequence, 在初始化时则即时计算出来的。

Hot sequence与Cold sequence

举个很简单的例子:

        Flux.just(new Date(), new Date(), new Date())
                .delayElements(Duration.ofSeconds(1))
                .doOnNext(it -> System.out.println(it.getTime()))
                .blockLast();

上面代码输出的都是同一个时间点,因为在Flux初始化的时候就开始计算了。project reactor文档对此的描述是:

It directly captures the value at assembly time and replays it to anybody subscribing to it later. 它可在组装时直接捕获值,并在以后向任何订阅者重播。

如果你这样:

        System.out.println(new Date().getTime());
        Flux<Date> dateFlux = Flux.just(new Date(), new Date(), new Date())
                .delayElements(Duration.ofSeconds(1))
                .doOnNext(it -> System.out.println(it.getTime()));
        Thread.sleep(3000);
        dateFlux.subscribe();

等待三秒后订阅,你也会发现输出的时间是3秒前的。

而下面这个使用的Flux.defer,它则会推迟到实际订阅时才会计算对应的时间,则是响应式编程中说的Cold sequence

         Flux.defer(() -> {
                            return Mono.just(new Date());
                        })
                .repeat(2)
                .delayElements(Duration.ofSeconds(1))
                .doOnNext(it -> System.out.println(it.getTime()))
                .blockLast();

这个例子如果你就已经看的云里雾里,那么实际开发中则会有更多的坑,当你期望异步执行的时候实际同步执行的,而你却没察觉到。

IDE的支持很关键,但是经常掉链子

比如这个场景:我期望在debug的时候,拿到deadTipsId的值,所以你肯定会使用IDEA的Evaluate 功能,看看这个值是啥内容

响应式编程框架 WebFlux在实际开发中的一些痛点起初这个问题是为了回到知乎上的一个提问:为什么大多数程序员认为响应式

好嘛,然后你就发现IDEA卡在这不动了。

响应式编程框架 WebFlux在实际开发中的一些痛点起初这个问题是为了回到知乎上的一个提问:为什么大多数程序员认为响应式

你以为是暂时的,但是当你上个厕所接杯水回来发现还是卡在这,但是数据库其实就3条数据!!

这在紧急排查一个任务的时候真的是非常折磨的。还不如在下面写个xxx.subscribe()打印一下。

但是更奇怪的是,时不时这个功能又是正常的,不理解IDEA抽风是什么原因导致的。

filterWhen的迷惑性

看这段代码, 我期望的是Flux.just(1,2,3,4,5,6) 根据Flux.just(1, 2, 3) 过滤,当存在相等元素的时候进行输出。

val cacheFlux = Flux.just(1, 2, 3).cache()

Flux.just(1,2,3,4,5,6)
    .filterWhen {mainELe ->
        cacheFlux.any {
            mainELe == it
        }
    }
    .doOnNext {
        println(it)
    }
    .subscribe()

它的输出也确实符合预期

1
2
3

开始上强度了,如果现在我想过滤出和Flux.just(1, 2, 3)不相等的元素,你的下意识是不是把mainELe.equals(it) 改为!mainELe.equals(it) ?

那可就太错了,你会发现输出的是

1
2
3
4
5
6

为什么?仔细分析你就会发现

  • 前者cacheFlux.any { mainELe == it }说的是任意元素存在相等时则通过
  • 后者cacheFlux.any { mainELe != it }说的是任意元素不相等则通过

所以当你期望“过滤掉和cacheFlux相等的数据”时,应该是对这个结果取反, 代码变成了这样:

    val cacheFlux = Flux.just(1, 2, 3).cache()

    Flux.just(1,2,3,4,5,6)
        .filterWhen {mainELe ->
            cacheFlux.any {
                mainELe == it
            }
            .map {
                it.not()
            }
        }
        .doOnNext {
            println(it)
        }
        .subscribe()

但我使用的时候发现这真的挺反直觉的!因为any的操作符会让人少思考一层。

那聪明的同学就要问了,那我直接使用.map岂不是更好?类似这样,在map中直接比较,看起来没有那么多弯弯绕绕。

val cacheFlux = Flux.just(1, 2, 3).cache()

Flux.just(1,2,3,4,5,6)
    .filterWhen {mainELe ->
        cacheFlux.map {
                mainELe == it
            }
    }
    .doOnNext {
        println(it)
    }
    .subscribe()

当你运行一下就会发现只输出了1, 这是因为cacheFlux.map { mainELe == it } 实际是一个flux, 而filterWhen只拿了flux中的第一个元素。就相当于Flux.just(1,2,3,4,5,6)Flux.just(1)比较。

失去了异常栈的打印,让排查猜谜

比如某个场景下,查询了3个表的数据合并,聚合后处理一个属性。

当某个查询异常时,你无法一眼看出是哪个方法引发的查询错误,因为调用栈压根没打印你的代码调用位置。

log
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.r2dbc.BadSqlGrammarException: executeMany; bad SQL grammar [xxxxx]
Caused by: org.springframework.r2dbc.BadSqlGrammarException: executeMany; bad SQL grammar [xxxxxx]
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:253)
at org.springframework.r2dbc.core.DefaultDatabaseClient.lambda$inConnectionMany$8(DefaultDatabaseClient.java:156)
at reactor.core.publisher.Flux.lambda$onErrorMap$29(Flux.java:7310)
at reactor.core.publisher.Flux.lambda$onErrorResume$30(Flux.java:7363)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:403)
at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:480)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
at reactor.pool.SimpleDequePool.maybeRecycleAndDrain(SimpleDequePool.java:540)
at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:781)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:893)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:239)
at reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:148)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:260)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:164)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157)
at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onError(FluxFilter.java:291)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:265)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:368)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:612)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:592)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:867)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:994)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:213)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:256)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:201)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:91)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:684)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:936)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:810)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:716)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:425)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlBadGrammarException: relation "table_name" does not exist
at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:96)
at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:65)
at io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:132)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:179)
... 45 common frames omitted

unit test覆盖率难度上升

这里不讨论unit test是否是鸡肋这个话题。当你想使用unit test覆盖你的代码时,你得这样写:

@Test
public void testAppendBoomError() {
  Flux<String> source = Flux.just("thing1", "thing2");

  StepVerifier.create(
    appendBoomError(source))
    .expectNext("thing1")
    .expectNext("thing2")
    .expectErrorMessage("boom")
    .verify();
}

或者这样:

StepVerifier.create(Mono.just(1).map(i -> i + 10),
             StepVerifierOptions.create().withInitialContext(Context.of("thing1", "thing2")))
                   .expectAccessibleContext()
                   .contains("thing1", "thing2")
                   .then()
                   .expectNext(11)
                   .verifyComplete();

太棒啦!比以前的unit test写法看起来一点都不麻烦呢。。。

mono or flux操作符超过200个,要熟练使用不是件容易事

比如现在对你随堂测试:

  • Flux.using的使用场景?
  • 什么时候该使用onErrorContinue()
  • 当你想使用Backpressure时,可以使用哪些操作符来完成?
  • 你能使用哪些操作符完成斐波那契数列的计算?

这些都是需要开发者不停看文档才能逐渐累积起来对应的知识。 所以也就导致即便你的java 函数式编程用的炉火纯青,到响应式编程看到这么多个操作符还是得重走一遍西游路。

既要R2DBC,又要分库分表?

期望使用r2dbc的同时实现分库分表?可能你只有akka这个选择。

搜了一堆关于:“sharding jdbc 支持r2dbc吗?” , “r2dbc怎么实现分库分表”

得到的结果是:

在反应式编程 API 下 ShardingSphere JDBC 无法处理 R2DBC DataSource,仅可处理 JDBC DataSource。 在使用 WebFlux 组件的 Spring Boot 微服务中应避免创建 ShardingSphere JDBC DataSource。

具体可以看这个文档上说的:

Can I use ShardingSphere fully reactive with R2DBC without proxy and sidecar? #10837

shardingsphere support r2dbc?

最后找到的结果是可以通过akka来实现数据库分片:Database sharding

上下文问题

在传统的 Servlet 环境中的使用有ThreadLocal,而因为WebFlux 是非阻塞的、异步的,使用事件驱动模型,因此请求处理不再绑定到特定的线程。

这意味着不能再依赖 ThreadLocal 传递上下文信息,必须使用 Reactor 提供的 Context 进行显式的上下文传递。

所以当你想调试上下文丢失的问题时绝对是够你吃一壶的。

  • 你可能会遇到上下文值突然“消失”或意外覆盖的情况,却难以追踪其源头。不知道为啥被覆盖,被谁覆盖的。

  • 某些第三方库或现有代码可能仍依赖于 ThreadLocal 来存储用户上下文(例如日志的 MDC),但在 WebFlux 中,由于异步非阻塞的线程模型,ThreadLocal 的数据不会在反应式链中自动传递。 如果上下文依赖于 ThreadLocal,可能会遇到数据不一致或丢失的问题。

  • 使用阻塞操作(如 block()blockOptional()toFuture().get() 等)会导致上下文的丢失,因为阻塞操作会中断反应式链条,线程模型会发生改变,导致上下文无法被正确传递

写在最后

就目前而言,我还没有把WebFlux中的坑都踩一遍,所以即便是有以上种种不妥之处,但是在写响应式编程还是有种上瘾的感觉,毕竟也不会同时踩中上面说的所有坑。

你也可以说我是因为手里有把奇怪的锤子,所以看啥都像钉子想敲敲。

在一些场景下,会让我觉得省略了很多的代码,比如:

容错与重试机制

对请求的重试只需要.retry即可,比如下面这个:重试3次,每次重试间隔2秒。

WebClient webClient = WebClient.create("https://example.com");

webClient.get()
        .uri("/resource")
        .retrieve()
        .bodyToMono(String.class)
        // retry 3 times with a 2 seconds delay between retries
        .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)))
        .onErrorResume(e -> {
            // Handle the error case after retries fail
            System.out.println("Request failed after retries: " + e.getMessage());
            return Mono.empty();
        })
        .subscribe(response -> System.out.println("Response: " + response));

还能使用指数退避:Retry.backoff(3, Duration.ofSeconds(1)),每次重试的间隔时间呈指数级增长。

背压支持

比如:

  • 生产者每隔 100 毫秒生成一个数据项。
  • 消费者处理数据需要 500 毫秒。

通过背压机制,消费者可以根据其处理能力控制数据的获取速度,避免过载。

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class BackpressureExample {

    public static void main(String[] args) throws InterruptedException {
        // 创建一个生产者,每 100 毫秒生成一个数字
        Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(100))
                .log()  // 用于输出流的每个步骤
                .onBackpressureBuffer(100,   // 缓冲区大小为 10
                        value -> System.out.println("Dropping value: " + value)
                ); // 当缓冲区满时丢弃数据

        // 模拟一个处理较慢的消费者,每 500 毫秒处理一个数据
        fastProducer
                .publishOn(Schedulers.boundedElastic()) // 在一个弹性线程池上处理
                .subscribe(value -> {
                    try {
                        // 模拟慢处理
                        Thread.sleep(500);
                        System.out.println("Processed: " + value);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });

        // 保持主线程活跃一段时间以查看输出
        Thread.sleep(500000);
    }
}
转载自:https://juejin.cn/post/7419209528263393332
评论
请登录