向 JDK 学设计 | Stream 核心原理探析
[向 JDK 学设计] 系列第 2 篇,本系列会涉及一小部分源码,简单看看就行,重要的是学习源码背后的设计思想,然后运用到实际开发中
大家好,我是尘光,持续学习,持续创作中
前言
Java 开发离不开 JDK,JDK 本身经过了精心的设计,具有出色的性能,并且支持向下兼容。可以说 JDK 是 Java 软件工程师的第一手学习资料,其中的设计思想值得学习和借鉴
Java8 是 Java 的一个里程碑版本,引入了大量新特性,如 Lambda 表达式、默认方法、方法引用等。当然也有本文的主角:Stream API
Stream API 介绍
Stream API
以声明式的方式处理数据集合,与传统集合框架相比,链式操作让代码更简洁,可读性更强
Stream API
支持并行化处理,适当的使用场景下,可以充分利用多核 CPU 的优势
与传统的外部迭代不同,Stream API
使用内部迭代,Stream
框架自动处理迭代,底层也可以做更多的优化
Stream API
很多操作是惰性的,不会立即执行。相反,它们会构建一个操作管道,并在需要结果时才执行这些操作
示例
以一个场景为例,有一个乱序 List
集合,包含 1~10,现在要遍历集合,找出其中的奇数,并求平方,然后再倒序打印出来
在 Java8 以前,没有 Stream API
,需要外部迭代来处理,并且步骤较为繁琐
public static void main(String[] args){
List<Integer> numbers = Arrays.asList(2, 1, 4, 3, 5, 10, 9, 8, 7, 6);
// 1 外部迭代过滤
List<Integer> expected = new ArrayList<>();
for (Integer num : numbers) {
if (num % 2 == 1) {
expected.add(num * num);
}
}
// 2 排序,也会遍历过滤后的 List 的所有元素
Collections.sort(expected, new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o2 - o1;
}
});
// 3 外部迭代输出
for (Integer num : expected) {
System.out.println(num);
}
}
如果使用 Stream API
,在不考虑代码格式化的情况下,只需要一行代码,简洁很多
public static void main(String[] args){
List<Integer> numbers = Arrays.asList(2, 1, 4, 3, 5, 10, 9, 8, 7, 6);
numbers.stream().filter(item -> item % 2 == 1).map(item -> item * item).sorted(Comparator.reverseOrder()).forEach(System.out::println);
}
实际开发中,一般会对代码进行格式化,看起来也很简洁
public static void main(String[] args){
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers.stream()
.filter(item -> item % 2 == 1)
.map(item -> item * item)
.sorted(Comparator.reverseOrder())
.forEach(System.out::println);
}
操作分类
不考虑将原始数据转换为 Stream 的操作,Stream 链式声明中的操作分为如下两类
- 中间操作(Intermediate Operation)
- 终止操作(Terminal Operation)
中间操作不会立即进行计算,终止操作会触发计算。一个完整的 Stream
声明包含 0~n
个中间操作和 1
个终止操作。没有终止操作的 Stream
不会触发计算逻辑,没有实际意义
上述示例中的 Stream 声明包含 3
个中间操作和 1
个 终止操作,如下图所示
Stream 框架实现原理
几个重要的类或接口
Spliterator
Spliterator
(可分割迭代器)用于将原始数据分割成多个部分,以便在不同的线程上并行处理,即支持并行流(Parallel Streams)操作
Spliterator
可以被分割(split)成多个子级 Spliterator
。每个子级 Spliterator
都包含原始 Spliterator
元素的一个子集
Spliterator
接口有 2 个核心方法
trySplit()
尝试将当前Spliterator
分割为两个- 如果成功,返回一个包含原始
Spliterator
部分元素的新Spliterator
,原始Spliterator
则只包含剩余的元素 - 如果当前
Spliterator
不能或不应该被分割(例如,已经很小,或出于某种原因不适合分割),则返回null
- 如果成功,返回一个包含原始
tryAdvance(Consumer<? super T> action)
尝试处理一个元素- 如果当前
Spliterator
还有元素可以处理,并且处理成功,返回true
- 其他情况返回
false
- 如果当前
trySplit()
和 tryAdvance()
方法一起工作,以支持 Stream API
的并行处理功能。 trySplit()
用于分割原始数据以支持并行处理,而 tryAdvance()
则用于在无法或不需要分割时顺序处理元素
TerminalOp
终止操作顶层接口,用于标识 Stream Pipeline 中的终止操作
TerminalOp
的实现类需要实现 evaluateSequential()
和 evaluateParallel()
方法,也就是要支持串行和并行的处理能力
TerminalOp
的实现类如下
Sink
Sink
是一个关键的内部接口,本质上是一个 Consumer
,它定义了 Stream 之间的操作行为,包括 begin()
、end()
、cancellationRequested()
和 accept()
等方法。这些方法在 Stream 流水线上定义了数据如何被消费和处理
Sink
接口的定义如下
interface Sink<T> extends Consumer<T> {
default void begin(long size) {}
default void end() {}
default void accept(int value) {
throw new IllegalStateException("called wrong accept method");
}
default void accept(long value) {
throw new IllegalStateException("called wrong accept method");
}
default void accept(double value) {
throw new IllegalStateException("called wrong accept method");
}
// ...
}
当 Stream 流水线上有多个中间操作时,这些操作的核心代码通常都放在 Sink
的实现类中。Sink
在流水线中扮演着最终接收和处理数据的角色
在 Stream 流水线上,从最后一个 Stage(阶段)开始,通过不断地调用上一个 Stage 的 opWrapSink()
方法,可以得到一个代表流水线上所有操作的 Sink
。这个 Sink
封装了所有的中间操作和终止操作,定义了数据在流水线上的完整处理流程
PipelineHelper
抽象类,Stream Pipeline 执行的工具类
PipelineHelper
描述了 Stream 流水线的初始状态,包括原始数据、中间操作和并行信息,以及终止操作信息
AbstractPipeline
抽象类,表示流水线的一个阶段,可以是原始数据,也可以是中间操作。 AbstractPipeline
包含大部分的流水线计算逻辑,此外还包含不同操作的通用实现,非常重要
IntPipeline
, LongPipeline
, DoublePipeline
和 ReferencePipeline
有 3 个重要的内部类,以 ReferencePipeline
为例
Head
表示流水线的初始阶段,一般是原始数据。StatelessOp
用于标识无状态中间操作,StatefulOp
用于标识有状态中间操作
StreamSupport
工具类,用于创建和管理 Stream 对象,支持创建通用 Stream
、IntStream
、LongStream
, DoubleStream
对象
执行流程
1. 基于原始数据创建 Stream
原始数据可以是如下类型
- 数组
- 集合
- 生成器函数
- I/O 通道
如果原始数据为数组,可以通过 Arrays.stream()
的不同重载方法创建 Stream
如果原始数据为集合,可以直接调用集合对象的 stream()
或者 parallelStream()
方法转换为 Stream。因为这两个方法都是 Collection
接口的默认方法
可以直接通过 Stream
接口的 of(T)
, ofNullable(T)
, of(T...)
方法创建 Stream
还可以通过 Stream
接口的重载方法 iterate()
和 generate()
方法创建 Stream
Stream 框架在创建 Stream
实例时,底层使用 StreamSupport
。以集合方式创建 Stream 为例,具体流程如下
从以上时序图可知,一般情况下,创建的 Stream
是 ReferencePipeline.Head
实例,表示一个初始化阶段
示例
List<Integer> numbers = Arrays.asList(2, 1, 4, 3, 5, 10, 9, 8, 7, 6);
Stream<Integer> streamInstance = numbers.stream();
System.out.println(streamInstance.getClass());
调试结果如下
streamInstance
对应的 previousStage
为 null
, depth
为 0
2. 追加中间操作
前面已经分析过,AbstractPipeline
既可以表示原始数据,又可以表示中间操作,其中的 previousStage
表示流水线中的前一个阶段
每追加一个中间操作,新的中间操作的 previousStage
就保存上一个阶段。如果是追加第一个中间操作,previousStage
就是原始数据,如果不是追加第一个中间操作,previousStage
就是上一个中间操作,如下图所示
无状态中间操作
对第一步创建的 Stream
实例追加无状态中间操作,以 filter()
为例,具体流程如下
过滤数据不会改变元素,因此这一步返回 ReferencePipeline.StatelessOp
实例。此外,StatelessOp
为抽象类,不能实例化,这里返回的是对应的匿名类实例,并且重写了一个重要方法 opWrapSink()
示例
List<Integer> numbers = Arrays.asList(2, 1, 4, 3, 5, 10, 9, 8, 7, 6);
Stream<Integer> streamInstance = numbers.stream().filter(item -> item % 2 == 1);
System.out.println(streamInstance.getClass());
调试结果如下
streamInstance
对应的 previousStage
为 ReferencePipeline.Head
实例, sourceStage
取 previousStage
对应的 sourceStage
,因此一直指向 Stream 流水线的初始阶段,depth
为 1
有状态中间操作
对第一步创建的 Stream
实例追加有状态中间操作,以 distinct()
为例,具体流程如下
更一般地,通用的有状态中间操作流程如下,变化的仅仅是 XxxOps
比如,limit()
和 skip()
对应 SliceOps
, sorted()
对应 SortedOps
3. 触发终止操作
对前面创建的 Stream
实例触发终止操作,以 forEach()
为例
实例化终止操作
在 Stream 流水线上声明终止操作,程序开始执行后,触发 Stream
计算逻辑之前,会先实例化终止操作
仍然以 forEach()
为例,会实例化为 ForEachOps.OfRef
终止操作实例化完成后,结构如下
触发计算逻辑
实例化终止操作后,接下来在 ReferencePipeline
中会调用 evaluate()
自动触发计算逻辑,如下图所示
AbstractPipeline
中会根据 sourceStage.parallel
属性判断走并行处理逻辑还是串行处理逻辑,由于并行处理逻辑比较复杂,因此在以后的文章中详细介绍,这里先按串行处理逻辑继续分析 terminalOp.evaluateSequential()
先来看一下 evaluateSequential()
的核心流程
wrapSink
在追加无状态中间操作时,是以匿名类的方式实例化 StatelessOp
的,在 ReferencePipline
的实现中,一般会重写 opWrapSink()
方法,该方法返回 Sink.ChainedReference
类的实例,传入的 sink
会作为 Sink.ChainedReference
实例的 downstream
,形成一条链
相关源码如下
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
// 重写方法 ======================================
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
protected final Sink<? super E_OUT> downstream;
public ChainedReference(Sink<? super E_OUT> downstream) {
// 下游 Sink ========================================
this.downstream = Objects.requireNonNull(downstream);
}
// ...
}
wrapSink()
执行结束后的结构如下,由于 Source Pipeline 对应的 depth
为 0,不包含
使用 IDEA 的调试结果如下,符合上述结构
copyInto
在 AbstractPipeline
的 copyInto()
方法里就是真正的执行逻辑了,在本文的示例中,spliterator
为 Spliterators.ArraySpliterator
实例
调试结果如下,最终会执行 forEachRemaining()
方法
特性分析
内部迭代只遍历一次
不使用 Stream API
的示例中,我们遍历了 3 次集合,而在 Stream 框架中使用了内部迭代,只需要遍历一次
通过前面的分析,在 Stream 流水线上追加中间操作时,在重写的 opWrapSink()
方法中,通常有如下代码
// ...
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
// ...
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
// 核心逻辑
downstream.accept(u);
}
};
}
// ...
也就是说,一个中间操作会通过 downstream
进行连接。比如,声明了 2 个 filter,filterA
和 filterB
List<Integer> numbers = Arrays.asList(2, 1, 4, 3, 5, 10, 9, 8, 7, 6);
Predicate<Integer> filterA = item -> item % 2 == 1;
Predicate<Integer> filterB = item -> item >= 5;
Stream<Integer> streamInstance = numbers.stream().filter(filterA).filter(filterB);
streamInstance.forEach(System.out::println);
filterA
的 downstream
为 filterB
,因此 filterA
执行 accept()
方法时,downstream.accept(u)
会继续调用 filterB
的 accept()
方法,即在一次迭代中处理了多个计算逻辑
Stream 框架中的设计思想
高度抽象
Stream 框架将传统集合框架中进行数据处理的常用步骤抽象为中间操作和终止操作,简化了冗长的代码,增强了代码可读性
此外,抽象出通用接口 Stream
,还抽象出了实际开发中常用的 IntStream
, LongStream
, DoubleStream
。这几个接口都继承了 BaseStream
接口,其关系如下
其中,IntStream
, LongStream
, DoubleStream
和 Stream
接口是通过 BaseStream
产生联系的
IntStream
, LongStream
和 DoubleStream
中有一些特殊的方法,比如聚合方法 sum()
, max()
, min()
, average
, count()
。IntStream
有 asLongStream()
和 asDoubleStream()
方法。LongStream
有 asDoubleStream()
方法
声明式编程
声明式编程(Declarative Programming)的设计思想主要侧重于描述程序的目标或期望结果,而不是明确指定达到这些目标的具体步骤或算法
这种风格使得代码更加简洁和易于理解,因为我们只需要关注业务逻辑,而不需要关注底层实现细节,因为 Stream API
底层已经帮我们做好了后勤工作
惰性计算
在 Stream 框架中,只有终止操作会触发计算,这使用了惰性计算思想
惰性计算的核心思想是仅在真正需要执行某个表达式或函数时才进行计算,而不是在定义时或调用时立即进行计算
这种机制有很多优点,如避免不必要的计算、提升性能、节省空间等
原始数据只有在需要计算时才被计算;一个元素的不同计算逻辑不必完全计算,只有需要的部分才被计算
内部迭代
Stream 框架采用内部迭代的方式处理数据,即数据的迭代和处理由 Stream API
负责,而不需要显式地编写循环操作
并行处理
Stream 框架支持并行处理数据,通过将流转换为并行流(调用 parallel()
方法),可以在多个线程上同时处理数据。这种设计使得 Stream API
能够充分利用多核 CPU 的性能,提高程序的执行效率
Stream 框架中的设计模式
流式接口模式
构建 Stream 时支持链式调用,这是 Stream API
的一大特性,使用了流式接口模式(Fluent Interface Pattern)
流式接口允许方法调用以串行链的方式组合在一起,使得代码更加易读和简洁
流式接口的方法返回值为抽象接口本身,在 Stream 框架中,filter()
、map()
、sorted()
等中间操作方法的返回值都是 Stream
接口,因此可以连续添加多个中间操作,最后一个中间操作仍然返回 Stream
接口,因此可以继续添加一个终止操作
public interface Stream<T> extends BaseStream<T, Stream<T>> {
Stream<T> filter(Predicate<? super T> predicate);
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
void forEach(Consumer<? super T> action);
// ...
}
模板方法模式
由于 Stream 框架使用内部迭代处理数据,因此框架内部大量使用了模板方法模式,在模板方法中定义通用处理逻辑,模板内部调用用户传入的参数
比如 filter()
中间操作
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
// 钩子方法,具体实现逻辑由应用程序传入
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
在抽象类 ReferencePipeline
中定义了 filter()
操作的处理流程。在创建时,检查 predicate
不可为空;在触发计算时,通过 predicate
的测试结果判断是否要包含该元素
结语
以上是对 Stream API
核心执行原理的一些探析,由于篇幅原因,其中关于并行处理逻辑的部分没有展开,将在后续的文章中详细介绍
参考文档
Processing Data with Java SE 8 Streams, Part 1
转载自:https://juejin.cn/post/7371633297154687030