Spring WebFlux之Flux
Spring WebFlux
是Spring 5
推出响应式/反应式
Web框架,支持异步
、非阻塞
的请求。本文主要介绍一下Spring WebFlux
中的Flux
。
Flux (发射器)
是一个以数据流
的形式发出 0 到 N
个元素的发布者(Publisher)
,Flux
发布的数据数据流
可以被一个错误(error)
或完成(completion)
信号终止,也可以被取消
信号取消。
Flux
是Pulisher
的子类,具有Pulisher
发出Subscribe
、cancel
、Error
、Complete
信号。
创建
创建Flux
的方法比较多,可以分为根据单个数据项
创建、根据Publisher/Iterable/Array/Stream
创建、创建同步/异步
的Flux
、创建延迟创建的Flux
、
根据单个数据项
根据单个数据项
创建使用just
方法,just
方法直接在参数中传入数据
即可创建FLux
定义
@SafeVarargs
public static <T> Flux<T> just(T... data);
public static <T> Flux<T> just(T data);
效果图
示例
根据Publisher/Iterable/Array/Stream
创建
根据集合/Stream/Publisher
创建Flux
的方法有from
、fromArray
、fromIterable
、fromStream
定义
public static <T> Flux<T> from(Publisher<? extends T> source);
public static <T> Flux<T> fromArray(T[] array);
public static <T> Flux<T> fromIterable(Iterable<? extends T> it);
public static <T> Flux<T> fromStream(Stream<? extends T> s);
public static <T> Flux<T> fromStream(Supplier<Stream<? extends T>> streamSupplier);
效果图
示例
创建同步/异步
的Flux
create
方法即可以创建同步的Flux
又可以创建异步的Flux
,generate
方法可以创建以 同步的
, 逐个地
产生值的Flux
定义
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter);
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator);
create
方法的参数是一个消费类型的函数式接口
,这个函数式接口
的参数是一个FluxSink(发布者)
。即FluxSink
发布的数据
被订阅用来创建Flux
generate
方法的参数也是一个消费类型的函数式接口
,但是这个函数式接口
的参数是一个SynchronousSink(同步的发布者)
效果图
示例
创建延迟创建的Flux
defer
函数可以创建一个延迟创建的Flux
,即只有在这个Flux
被订阅的时候才回创建。
定义
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier);
效果图
示例
创建其他类型的Flux
range
方法可以创建发出一系列递增的整数的Flux
,empty
方法可以创建一个空的Flux
,error
方法创建一个在订阅后立即以错误终止的Flux
定义
public static Flux<Integer> range(int start, int count);
public static <T> Flux<T> empty();
public static <T> Flux<T> error(Throwable error);
public static <T> Flux<T> error(Supplier<? extends Throwable> errorSupplier);
public static <O> Flux<O> error(Throwable throwable, boolean whenRequested);
效果图
示例
订阅
Flux
被订阅后会将数据逐个发给订阅者
,订阅者通过subscribe
方法订阅消费Flux
发来的数据
定义
// 订阅但不处理数据
public final Disposable subscribe();
// 订阅并处理接收数据
public final Disposable subscribe(Consumer<? super T> consumer);
// 订阅并处理接收数据,第二个参数用来处理接收到的错误信号
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
// 订阅并处理接收数据,第二个参数用来处理接收到的错误信号, 第三个参数用来处理接收到的完成信号
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer);
效果图
示例:
合并操作
Flux
提供了concat
、merge
、zip
方法用于合并Flux
定义
// 按照首位相连的方式合并两个`Flux`,并返回一个新的`Flux`
public static <T> Flux<T> concat(Publisher<? extends T>... sources);
// 按照`Flux`中数据出现的时刻合并两个`Flux`,并返回一个新的`FLux`
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source);
// 按照所有源中元素出现的顺序,将所有源的元素一比一组合成元组
public static <T1, T2, O> Flux<O> zip( Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
final BiFunction<? super T1, ? super T2, ? extends O> combinator)
效果图
示例
转换
Flux
的转换操作的方法有map
、flatMap
、handle
,其中map
和flatMap
类似于Stream
中的map
和flatMap
,handle
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper);
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency);
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency, int prefetch);
public final <R> Flux<R> flatMap(
@Nullable Function<? super T, ? extends Publisher<? extends R>> mapperOnNext,
@Nullable Function<? super Throwable, ? extends Publisher<? extends R>> mapperOnError,
@Nullable Supplier<? extends Publisher<? extends R>> mapperOnComplete);
public final <R> Flux<R> handle(BiConsumer<? super T, SynchronousSink<R>> handler);
handle
方法接收一个BiConsumer
函数式接口,BiConsumer
的第一个参数表示源数据,第二个是SynchronousSink
用于将转换后的数据发向下一个节点
效果图
示例
转成Iterable/Stream
Flux
实例可以调用toIterable
、toStream
方法转成Iterable/Stream
效果图
示例
过滤
Flux
用于过滤操作的方法有filter
、filterWhen
、ofType
、distinct
、take
、skip
等等
定义
// 类似于Stream中的filter,根据断言结果过滤数据, 保留断言为true的数据
public final Flux<T> filter(Predicate<? super T> p);
// 接收一个函数式接口类型的参数,函数式接口的第一个参数是原数据,第二个参数是个发布者,这个发布者发布的布尔值被用作过滤数据
public final Flux<T> filterWhen(Function<? super T, ? extends Publisher<Boolean>> asyncPredicate);
// 根据元素的类型过滤
public final <U> Flux<U> ofType(final Class<U> clazz);
// 取出重复元素
public final Flux<T> distinct();
// 根据keySelector的结果去重
public final <V> Flux<T> distinct(Function<? super T, ? extends V> keySelector);
// 保留前n个数据
public final Flux<T> take(long n);
//limitRequest: true将上游请求数量限制为n 。 false从上游请求无限数量的元素
public final Flux<T> take(long n, boolean limitRequest);
// 根据时间间隔保留元素
public final Flux<T> take(Duration timespan);
// 跳过前n个元素
public final Flux<T> skip(long skipped);
// 根据时间间隔跳过元素
public final Flux<T> skip(Duration timespan);
效果图
示例
分组
对Flux
发出的数据进行分组可以使用buffer
、window
、groupBy
方法
定义
// 收集数据放入List中发出
public final Flux<List<T>> buffer();
// 收集指定个数的数据放入List中发出
public final Flux<List<T>> buffer(int maxSize);
// 收集指定个数的数据放入指定容器中发出
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier);
// 跳过指定数量的元素后, 收集指定个数的数据放入List中发出
public final Flux<List<T>> buffer(int maxSize, int skip);
// 收集指定个数的数据放入Flux中发出
public final Flux<Flux<T>> window(int maxSize);
// 跳过指定数量的元素后, 收集指定个数的数据放入Flux中发出
public final Flux<Flux<T>> window(int maxSize, int skip);
// 将Flux中的元素按自定义的规则进行分组放入一个`GroupedFlux`中
public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K> keyMapper);
// prefetch: 从源中预取数据的数量
public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K> keyMapper, int prefetch);
// 将Flux中的元素按自定义的规则进行分组放入一个`GroupedFlux`中, 第二个参数用于处理value
public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper)
public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper, int prefetch);
效果图
示例
分支处理
Flux
提供了用于分组处理的方法switchIfEmpty
, 如果原Flux
中没有数据,则使用switchIfEmpty
方法中参数发出的数据
定义
// 如果原Flux是空的,则使用参数alternate发出的数据
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate);
效果图
示例
本文仅介绍了Flux
的部分方法,Flux
中还有很多有趣的方法。
转载自:https://juejin.cn/post/6995109051730231332