likes
comments
collection
share

Spring WebFlux之Flux

作者站长头像
站长
· 阅读数 50

Spring WebFluxSpring 5推出响应式/反应式Web框架,支持异步非阻塞的请求。本文主要介绍一下Spring WebFlux中的FluxFlux (发射器) 是一个以数据流的形式发出 0 到 N 个元素的发布者(Publisher)Flux发布的数据数据流可以被一个错误(error)完成(completion)信号终止,也可以被取消信号取消。

FluxPulisher的子类,具有Pulisher发出SubscribecancelErrorComplete信号。

Spring WebFlux之Flux

创建

创建Flux的方法比较多,可以分为根据单个数据项创建、根据Publisher/Iterable/Array/Stream创建、创建同步/异步Flux、创建延迟创建的Flux

根据单个数据项

根据单个数据项创建使用just方法,just方法直接在参数中传入数据即可创建FLux 定义

@SafeVarargs
public static <T> Flux<T> just(T... data);

public static <T> Flux<T> just(T data);

效果图 Spring WebFlux之Flux 示例 Spring WebFlux之Flux

根据Publisher/Iterable/Array/Stream创建

根据集合/Stream/Publisher创建Flux的方法有fromfromArrayfromIterablefromStream 定义

public static <T> Flux<T> from(Publisher<? extends T> source);

public static <T> Flux<T> fromArray(T[] array);

public static <T> Flux<T> fromIterable(Iterable<? extends T> it);

public static <T> Flux<T> fromStream(Stream<? extends T> s);

public static <T> Flux<T> fromStream(Supplier<Stream<? extends T>> streamSupplier);

效果图 Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

示例 Spring WebFlux之Flux

创建同步/异步Flux

create方法即可以创建同步的Flux又可以创建异步的Fluxgenerate方法可以创建以 同步的, 逐个地产生值的Flux 定义

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter);

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator);

create方法的参数是一个消费类型的函数式接口,这个函数式接口的参数是一个FluxSink(发布者)。即FluxSink发布的数据被订阅用来创建Flux

generate方法的参数也是一个消费类型的函数式接口,但是这个函数式接口的参数是一个SynchronousSink(同步的发布者)

效果图 Spring WebFlux之Flux

Spring WebFlux之Flux

示例 Spring WebFlux之Flux

创建延迟创建的Flux

defer函数可以创建一个延迟创建的Flux ,即只有在这个Flux被订阅的时候才回创建。 定义

public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier);

效果图 Spring WebFlux之Flux

示例 Spring WebFlux之Flux

创建其他类型的Flux

range方法可以创建发出一系列递增的整数的Flux,empty方法可以创建一个空的Fluxerror方法创建一个在订阅后立即以错误终止的Flux

定义

public static Flux<Integer> range(int start, int count);

public static <T> Flux<T> empty();

public static <T> Flux<T> error(Throwable error);
public static <T> Flux<T> error(Supplier<? extends Throwable> errorSupplier);
public static <O> Flux<O> error(Throwable throwable, boolean whenRequested);

效果图 Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

示例 Spring WebFlux之Flux

订阅

Flux被订阅后会将数据逐个发给订阅者,订阅者通过subscribe方法订阅消费Flux发来的数据

定义

// 订阅但不处理数据
public final Disposable subscribe();

// 订阅并处理接收数据
public final Disposable subscribe(Consumer<? super T> consumer);

// 订阅并处理接收数据,第二个参数用来处理接收到的错误信号
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);

// 订阅并处理接收数据,第二个参数用来处理接收到的错误信号, 第三个参数用来处理接收到的完成信号
public final Disposable subscribe(
      @Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer);

效果图

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

示例: Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

合并操作

Flux提供了concatmergezip方法用于合并Flux 定义

// 按照首位相连的方式合并两个`Flux`,并返回一个新的`Flux`
public static <T> Flux<T> concat(Publisher<? extends T>... sources);

// 按照`Flux`中数据出现的时刻合并两个`Flux`,并返回一个新的`FLux`
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source);

// 按照所有源中元素出现的顺序,将所有源的元素一比一组合成元组                                   
public static <T1, T2, O> Flux<O> zip(  Publisher<? extends T1> source1,
                                        Publisher<? extends T2> source2,
                                        final BiFunction<? super T1, ? super T2, ? extends O> combinator)

效果图

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

示例

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

转换

Flux的转换操作的方法有mapflatMaphandle,其中mapflatMap类似于Stream中的mapflatMaphandle


public final <V> Flux<V> map(Function<? super T, ? extends V> mapper);

public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency);

public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency, int prefetch);

public final <R> Flux<R> flatMap(
      @Nullable Function<? super T, ? extends Publisher<? extends R>> mapperOnNext,
      @Nullable Function<? super Throwable, ? extends Publisher<? extends R>> mapperOnError,
      @Nullable Supplier<? extends Publisher<? extends R>> mapperOnComplete);
      
public final <R> Flux<R> handle(BiConsumer<? super T, SynchronousSink<R>> handler);

handle方法接收一个BiConsumer函数式接口,BiConsumer的第一个参数表示源数据,第二个是SynchronousSink用于将转换后的数据发向下一个节点

效果图

Spring WebFlux之Flux

Spring WebFlux之Flux

示例

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

转成Iterable/Stream

Flux实例可以调用toIterabletoStream方法转成Iterable/Stream 效果图

Spring WebFlux之Flux

Spring WebFlux之Flux

示例

Spring WebFlux之Flux

过滤

Flux用于过滤操作的方法有filterfilterWhenofTypedistincttakeskip等等

定义

// 类似于Stream中的filter,根据断言结果过滤数据, 保留断言为true的数据
public final Flux<T> filter(Predicate<? super T> p);

// 接收一个函数式接口类型的参数,函数式接口的第一个参数是原数据,第二个参数是个发布者,这个发布者发布的布尔值被用作过滤数据
public final Flux<T> filterWhen(Function<? super T, ? extends Publisher<Boolean>> asyncPredicate);

// 根据元素的类型过滤
public final <U> Flux<U> ofType(final Class<U> clazz);

// 取出重复元素
public final Flux<T> distinct();

// 根据keySelector的结果去重
public final <V> Flux<T> distinct(Function<? super T, ? extends V> keySelector);

// 保留前n个数据
public final Flux<T> take(long n);

//limitRequest: true将上游请求数量限制为n 。 false从上游请求无限数量的元素
public final Flux<T> take(long n, boolean limitRequest);

// 根据时间间隔保留元素
public final Flux<T> take(Duration timespan);

// 跳过前n个元素
public final Flux<T> skip(long skipped);

// 根据时间间隔跳过元素
public final Flux<T> skip(Duration timespan);

效果图

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

示例

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

分组

Flux发出的数据进行分组可以使用bufferwindowgroupBy方法

定义

// 收集数据放入List中发出
public final Flux<List<T>> buffer();

// 收集指定个数的数据放入List中发出
public final Flux<List<T>> buffer(int maxSize);

// 收集指定个数的数据放入指定容器中发出
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier);

// 跳过指定数量的元素后, 收集指定个数的数据放入List中发出
public final Flux<List<T>> buffer(int maxSize, int skip);

// 收集指定个数的数据放入Flux中发出
public final Flux<Flux<T>> window(int maxSize);

// 跳过指定数量的元素后, 收集指定个数的数据放入Flux中发出
public final Flux<Flux<T>> window(int maxSize, int skip);

// 将Flux中的元素按自定义的规则进行分组放入一个`GroupedFlux`中
public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K> keyMapper);

// prefetch: 从源中预取数据的数量
public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K> keyMapper, int prefetch);

// 将Flux中的元素按自定义的规则进行分组放入一个`GroupedFlux`中, 第二个参数用于处理value
public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper)


public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper, int prefetch);

效果图

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

Spring WebFlux之Flux

示例

Spring WebFlux之Flux

Spring WebFlux之Flux

分支处理

Flux提供了用于分组处理的方法switchIfEmpty, 如果原Flux中没有数据,则使用switchIfEmpty方法中参数发出的数据 定义

// 如果原Flux是空的,则使用参数alternate发出的数据
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate);

效果图

Spring WebFlux之Flux

示例

Spring WebFlux之Flux

本文仅介绍了Flux的部分方法,Flux中还有很多有趣的方法。

转载自:https://juejin.cn/post/6995109051730231332
评论
请登录