likes
comments
collection
share

向 JDK 学设计 | Stream 核心原理探析

作者站长头像
站长
· 阅读数 39

[向 JDK 学设计] 系列第 2 篇,本系列会涉及一小部分源码,简单看看就行,重要的是学习源码背后的设计思想,然后运用到实际开发中

大家好,我是尘光,持续学习,持续创作中

前言

Java 开发离不开 JDK,JDK 本身经过了精心的设计,具有出色的性能,并且支持向下兼容。可以说 JDK 是 Java 软件工程师的第一手学习资料,其中的设计思想值得学习和借鉴

Java8 是 Java 的一个里程碑版本,引入了大量新特性,如 Lambda 表达式、默认方法、方法引用等。当然也有本文的主角:Stream API

Stream API 介绍

Stream API 以声明式的方式处理数据集合,与传统集合框架相比,链式操作让代码更简洁,可读性更强

Stream API 支持并行化处理,适当的使用场景下,可以充分利用多核 CPU 的优势

与传统的外部迭代不同,Stream API 使用内部迭代,Stream 框架自动处理迭代,底层也可以做更多的优化

Stream API 很多操作是惰性的,不会立即执行。相反,它们会构建一个操作管道,并在需要结果时才执行这些操作

示例

以一个场景为例,有一个乱序 List 集合,包含 1~10,现在要遍历集合,找出其中的奇数,并求平方,然后再倒序打印出来

在 Java8 以前,没有 Stream API,需要外部迭代来处理,并且步骤较为繁琐

public static void main(String[] args){
    List<Integer> numbers = Arrays.asList(2, 1, 4, 3, 5, 10, 9, 8, 7, 6);

    // 1 外部迭代过滤
    List<Integer> expected = new ArrayList<>();
    for (Integer num : numbers) {
      if (num % 2 == 1) {
        expected.add(num * num);
      }
    }

    // 2 排序,也会遍历过滤后的 List 的所有元素
    Collections.sort(expected, new Comparator<Integer>() {
      @Override
      public int compare(Integer o1, Integer o2) {
        return o2 - o1;
      }
    });

    // 3 外部迭代输出
    for (Integer num : expected) {
      System.out.println(num);
    }
}

如果使用 Stream API,在不考虑代码格式化的情况下,只需要一行代码,简洁很多

public static void main(String[] args){
    List<Integer> numbers = Arrays.asList(2, 1, 4, 3, 5, 10, 9, 8, 7, 6);
    numbers.stream().filter(item -> item % 2 == 1).map(item -> item * item).sorted(Comparator.reverseOrder()).forEach(System.out::println);
  }

实际开发中,一般会对代码进行格式化,看起来也很简洁

public static void main(String[] args){
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    numbers.stream()
            .filter(item -> item % 2 == 1)
            .map(item -> item * item)
            .sorted(Comparator.reverseOrder())
            .forEach(System.out::println);
}

操作分类

不考虑将原始数据转换为 Stream 的操作,Stream 链式声明中的操作分为如下两类

  • 中间操作(Intermediate Operation)
  • 终止操作(Terminal Operation)

中间操作不会立即进行计算,终止操作会触发计算。一个完整的 Stream 声明包含 0~n 个中间操作和 1 个终止操作。没有终止操作的 Stream 不会触发计算逻辑,没有实际意义

上述示例中的 Stream 声明包含 3 个中间操作和 1 个 终止操作,如下图所示 向 JDK 学设计 | Stream 核心原理探析

Stream 框架实现原理

几个重要的类或接口

Spliterator

Spliterator(可分割迭代器)用于将原始数据分割成多个部分,以便在不同的线程上并行处理,即支持并行流(Parallel Streams)操作

Spliterator 可以被分割(split)成多个子级 Spliterator。每个子级 Spliterator 都包含原始 Spliterator 元素的一个子集

Spliterator 接口有 2 个核心方法

  • trySplit() 尝试将当前 Spliterator 分割为两个
    • 如果成功,返回一个包含原始 Spliterator 部分元素的新 Spliterator,原始 Spliterator 则只包含剩余的元素
    • 如果当前 Spliterator 不能或不应该被分割(例如,已经很小,或出于某种原因不适合分割),则返回 null
  • tryAdvance(Consumer<? super T> action) 尝试处理一个元素
    • 如果当前 Spliterator 还有元素可以处理,并且处理成功,返回 true
    • 其他情况返回 false

trySplit()tryAdvance() 方法一起工作,以支持 Stream API 的并行处理功能。 trySplit() 用于分割原始数据以支持并行处理,而 tryAdvance() 则用于在无法或不需要分割时顺序处理元素

TerminalOp

终止操作顶层接口,用于标识 Stream Pipeline 中的终止操作

TerminalOp 的实现类需要实现 evaluateSequential()evaluateParallel() 方法,也就是要支持串行和并行的处理能力

TerminalOp 的实现类如下

«interface»
TerminalOp
FindOp
MatchOp
ForEachOp
«abstract»
ReduceOp
OfRef
OfInt
OfLong
OfDouble

Sink

Sink 是一个关键的内部接口,本质上是一个 Consumer,它定义了 Stream 之间的操作行为,包括 begin()end()cancellationRequested() accept() 等方法。这些方法在 Stream 流水线上定义了数据如何被消费和处理

Sink 接口的定义如下

interface Sink<T> extends Consumer<T> {
    default void begin(long size) {}
    
    default void end() {}
    
    default void accept(int value) {
        throw new IllegalStateException("called wrong accept method");
    }

    default void accept(long value) {
        throw new IllegalStateException("called wrong accept method");
    }

    default void accept(double value) {
        throw new IllegalStateException("called wrong accept method");
    }
    
    // ...
}

当 Stream 流水线上有多个中间操作时,这些操作的核心代码通常都放在 Sink 的实现类中。Sink 在流水线中扮演着最终接收和处理数据的角色

在 Stream 流水线上,从最后一个 Stage(阶段)开始,通过不断地调用上一个 Stage 的 opWrapSink() 方法,可以得到一个代表流水线上所有操作的 Sink。这个 Sink 封装了所有的中间操作和终止操作,定义了数据在流水线上的完整处理流程

PipelineHelper

抽象类,Stream Pipeline 执行的工具类

PipelineHelper 描述了 Stream 流水线的初始状态,包括原始数据、中间操作和并行信息,以及终止操作信息

AbstractPipeline

抽象类,表示流水线的一个阶段,可以是原始数据,也可以是中间操作。 AbstractPipeline 包含大部分的流水线计算逻辑,此外还包含不同操作的通用实现,非常重要

«abstract»
PipelineHelper<P_OUT>
«abstract»
AbstractPipeline<E_IN, E_OUT, S extends BaseStream>
-AbstractPipeline sourceStage
-AbstractPipeline previousStage
-AbstractPipeline nextStage
-int depth
«abstract»
IntPipeline
«abstract»
LongPipeline
«abstract»
DoublePipeline
«abstract»
ReferencePipeline

IntPipeline, LongPipeline, DoublePipelineReferencePipeline 有 3 个重要的内部类,以 ReferencePipeline 为例

«abstract»
ReferencePipeline<P_IN, P_OUT>
Head
«abstract»
StatelessOp
boolean opIsStateful()
«abstract»
StatefulOp
boolean opIsStateful()

Head 表示流水线的初始阶段,一般是原始数据。StatelessOp 用于标识无状态中间操作,StatefulOp 用于标识有状态中间操作

StreamSupport

工具类,用于创建和管理 Stream 对象,支持创建通用 StreamIntStreamLongStream, DoubleStream 对象

执行流程

1. 基于原始数据创建 Stream

原始数据可以是如下类型

  • 数组
  • 集合
  • 生成器函数
  • I/O 通道

如果原始数据为数组,可以通过 Arrays.stream() 的不同重载方法创建 Stream

如果原始数据为集合,可以直接调用集合对象的 stream() 或者 parallelStream() 方法转换为 Stream。因为这两个方法都是 Collection 接口的默认方法

可以直接通过 Stream 接口的 of(T), ofNullable(T), of(T...) 方法创建 Stream

还可以通过 Stream 接口的重载方法 iterate()generate() 方法创建 Stream

Stream 框架在创建 Stream 实例时,底层使用 StreamSupport。以集合方式创建 Stream 为例,具体流程如下

Stream FrameApplicationCollectionStreamSupportReferencePipeline.Headstream()1stream()2构造方法3Stream 实例4Stream 实例5Stream 实例6ApplicationCollectionStreamSupportReferencePipeline.Head

从以上时序图可知,一般情况下,创建的 StreamReferencePipeline.Head 实例,表示一个初始化阶段

示例

List<Integer> numbers = Arrays.asList(2, 1, 4, 3, 5, 10, 9, 8, 7, 6);
Stream<Integer> streamInstance = numbers.stream();
System.out.println(streamInstance.getClass());

调试结果如下 向 JDK 学设计 | Stream 核心原理探析 streamInstance 对应的 previousStagenull, depth0

2. 追加中间操作

前面已经分析过,AbstractPipeline 既可以表示原始数据,又可以表示中间操作,其中的 previousStage 表示流水线中的前一个阶段

每追加一个中间操作,新的中间操作的 previousStage 就保存上一个阶段。如果是追加第一个中间操作,previousStage 就是原始数据,如果不是追加第一个中间操作,previousStage 就是上一个中间操作,如下图所示 向 JDK 学设计 | Stream 核心原理探析

无状态中间操作

对第一步创建的 Stream 实例追加无状态中间操作,以 filter() 为例,具体流程如下

ReferencePipeline 实例ReferencePipelineReferencePipeline.StatelessOpfilter()1构造方法2StatelessOp 实例3StatelessOp 实例4ReferencePipeline 实例ReferencePipelineReferencePipeline.StatelessOp

过滤数据不会改变元素,因此这一步返回 ReferencePipeline.StatelessOp 实例。此外,StatelessOp 为抽象类,不能实例化,这里返回的是对应的匿名类实例,并且重写了一个重要方法 opWrapSink()

示例

List<Integer> numbers = Arrays.asList(2, 1, 4, 3, 5, 10, 9, 8, 7, 6);
Stream<Integer> streamInstance = numbers.stream().filter(item -> item % 2 == 1);
System.out.println(streamInstance.getClass());

调试结果如下 向 JDK 学设计 | Stream 核心原理探析 streamInstance 对应的 previousStageReferencePipeline.Head 实例, sourceStagepreviousStage 对应的 sourceStage,因此一直指向 Stream 流水线的初始阶段,depth1

有状态中间操作

对第一步创建的 Stream 实例追加有状态中间操作,以 distinct() 为例,具体流程如下

ReferencePipeline 实例ReferencePipelineDistinctOpsReferencePipeline.StatefulOpdistinct()1makeRef()2构造方法3StatefulOp 实例4StatefulOp 实例5StatefulOp 实例6ReferencePipeline 实例ReferencePipelineDistinctOpsReferencePipeline.StatefulOp

更一般地,通用的有状态中间操作流程如下,变化的仅仅是 XxxOps

ReferencePipeline 实例ReferencePipelineXxxOpsReferencePipeline.StatefulOp有状态中间操作1makeRef()2构造方法3StatefulOp 实例4StatefulOp 实例5StatefulOp 实例6ReferencePipeline 实例ReferencePipelineXxxOpsReferencePipeline.StatefulOp

比如,limit()skip() 对应 SliceOps, sorted() 对应 SortedOps

3. 触发终止操作

对前面创建的 Stream 实例触发终止操作,以 forEach() 为例

实例化终止操作

在 Stream 流水线上声明终止操作,程序开始执行后,触发 Stream 计算逻辑之前,会先实例化终止操作

仍然以 forEach() 为例,会实例化为 ForEachOps.OfRef

ReferencePipeline 实例ReferencePipelineForEachOpsForEachOps.OfRefforEach()1实例化终止操作 makeRef()2构造方法3TerminalOp 实现类实例这里是 ForEachOps.OfRef 实例4TerminalOp 实现类实例这里是 ForEachOps.OfRef 实例5ReferencePipeline 实例ReferencePipelineForEachOpsForEachOps.OfRef

终止操作实例化完成后,结构如下 向 JDK 学设计 | Stream 核心原理探析

触发计算逻辑

实例化终止操作后,接下来在 ReferencePipeline 中会调用 evaluate() 自动触发计算逻辑,如下图所示

ReferencePipeline 实例ReferencePipelineAbstractPipelineForEachOp1. 实例化终止操作,详情见上一步2. 触发计算逻辑alt[isParallel()]forEach()1evaluate()2terminalOp.evaluateParallel()3terminalOp.evaluateSequential()4ReferencePipeline 实例ReferencePipelineAbstractPipelineForEachOp

AbstractPipeline 中会根据 sourceStage.parallel 属性判断走并行处理逻辑还是串行处理逻辑,由于并行处理逻辑比较复杂,因此在以后的文章中详细介绍,这里先按串行处理逻辑继续分析 terminalOp.evaluateSequential()

先来看一下 evaluateSequential() 的核心流程

ForEachOpAbstractPipelineReferencePipelineSink.ChainedReferenceSpliterators.ArraySpliteratorwrapSink() 方法从终止操作往前依次调用 opSrapSink()wrappedSinkloop[p.depth > 0]par[wrapSink() 方法]copyInto() 方法p = p.previousStageloop[p.depth > 0]此时 p 为 sourceStagepar[copyIntoWithCancel()]alt[不满足短路条件]wrapAndCopyInto()1opWrapSink()2opWrapSink()3Sink 实例4sink 实例5wrappedSink.begin()6spliterator.forEachRemaining()7wrappedSink.end()8wrappedSink.begin()9p.forEachWithCancel()10wrappedSink.end()11ForEachOpAbstractPipelineReferencePipelineSink.ChainedReferenceSpliterators.ArraySpliterator
wrapSink

在追加无状态中间操作时,是以匿名类的方式实例化 StatelessOp 的,在 ReferencePipline 的实现中,一般会重写 opWrapSink() 方法,该方法返回 Sink.ChainedReference 类的实例,传入的 sink 会作为 Sink.ChainedReference 实例的 downstream,形成一条链

相关源码如下

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        
        // 重写方法 ======================================
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
    protected final Sink<? super E_OUT> downstream;

    public ChainedReference(Sink<? super E_OUT> downstream) {
        // 下游 Sink ========================================
        this.downstream = Objects.requireNonNull(downstream);
    }

    // ...
}

wrapSink() 执行结束后的结构如下,由于 Source Pipeline 对应的 depth 为 0,不包含 向 JDK 学设计 | Stream 核心原理探析 使用 IDEA 的调试结果如下,符合上述结构 向 JDK 学设计 | Stream 核心原理探析

copyInto

AbstractPipelinecopyInto() 方法里就是真正的执行逻辑了,在本文的示例中,spliteratorSpliterators.ArraySpliterator 实例

调试结果如下,最终会执行 forEachRemaining() 方法 向 JDK 学设计 | Stream 核心原理探析

特性分析

内部迭代只遍历一次

不使用 Stream API 的示例中,我们遍历了 3 次集合,而在 Stream 框架中使用了内部迭代,只需要遍历一次

通过前面的分析,在 Stream 流水线上追加中间操作时,在重写的 opWrapSink() 方法中,通常有如下代码

// ...

@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
    return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
        // ...

        @Override
        public void accept(P_OUT u) {
            if (predicate.test(u))
                // 核心逻辑
                downstream.accept(u);
        }
    };
}

// ...

也就是说,一个中间操作会通过 downstream 进行连接。比如,声明了 2 个 filter,filterAfilterB

List<Integer> numbers = Arrays.asList(2, 1, 4, 3, 5, 10, 9, 8, 7, 6);
Predicate<Integer> filterA = item -> item % 2 == 1;
Predicate<Integer> filterB = item -> item >= 5;
Stream<Integer> streamInstance = numbers.stream().filter(filterA).filter(filterB);
streamInstance.forEach(System.out::println);

filterAdownstreamfilterB,因此 filterA 执行 accept() 方法时,downstream.accept(u) 会继续调用 filterBaccept() 方法,即在一次迭代中处理了多个计算逻辑

Stream 框架中的设计思想

高度抽象

Stream 框架将传统集合框架中进行数据处理的常用步骤抽象为中间操作和终止操作,简化了冗长的代码,增强了代码可读性

此外,抽象出通用接口 Stream,还抽象出了实际开发中常用的 IntStream, LongStream, DoubleStream。这几个接口都继承了 BaseStream 接口,其关系如下

«interface»
BaseStream<T, S>
«interface»
Stream<T>
«interface»
IntStream
«interface»
LongStream
«interface»
DoubleStream

其中,IntStream, LongStream, DoubleStreamStream 接口是通过 BaseStream 产生联系的

IntStream, LongStreamDoubleStream 中有一些特殊的方法,比如聚合方法 sum(), max(), min(), average, count()IntStreamasLongStream()asDoubleStream() 方法。LongStreamasDoubleStream() 方法

声明式编程

声明式编程(Declarative Programming)的设计思想主要侧重于描述程序的目标或期望结果,而不是明确指定达到这些目标的具体步骤或算法

这种风格使得代码更加简洁和易于理解,因为我们只需要关注业务逻辑,而不需要关注底层实现细节,因为 Stream API 底层已经帮我们做好了后勤工作

惰性计算

在 Stream 框架中,只有终止操作会触发计算,这使用了惰性计算思想

惰性计算的核心思想是仅在真正需要执行某个表达式或函数时才进行计算,而不是在定义时或调用时立即进行计算

这种机制有很多优点,如避免不必要的计算、提升性能、节省空间等

原始数据只有在需要计算时才被计算;一个元素的不同计算逻辑不必完全计算,只有需要的部分才被计算

内部迭代

Stream 框架采用内部迭代的方式处理数据,即数据的迭代和处理由 Stream API 负责,而不需要显式地编写循环操作

并行处理

Stream 框架支持并行处理数据,通过将流转换为并行流(调用 parallel() 方法),可以在多个线程上同时处理数据。这种设计使得 Stream API 能够充分利用多核 CPU 的性能,提高程序的执行效率

Stream 框架中的设计模式

流式接口模式

构建 Stream 时支持链式调用,这是 Stream API 的一大特性,使用了流式接口模式(Fluent Interface Pattern)

流式接口允许方法调用以串行链的方式组合在一起,使得代码更加易读和简洁

流式接口的方法返回值为抽象接口本身,在 Stream 框架中,filter()map()sorted() 等中间操作方法的返回值都是 Stream 接口,因此可以连续添加多个中间操作,最后一个中间操作仍然返回 Stream 接口,因此可以继续添加一个终止操作

public interface Stream<T> extends BaseStream<T, Stream<T>> {
    Stream<T> filter(Predicate<? super T> predicate);

    <R> Stream<R> map(Function<? super T, ? extends R> mapper);
    
    Stream<T> sorted();

    Stream<T> sorted(Comparator<? super T> comparator);
    
    void forEach(Consumer<? super T> action);
    
    // ...
}

模板方法模式

由于 Stream 框架使用内部迭代处理数据,因此框架内部大量使用了模板方法模式,在模板方法中定义通用处理逻辑,模板内部调用用户传入的参数

比如 filter() 中间操作

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    // 钩子方法,具体实现逻辑由应用程序传入
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

在抽象类 ReferencePipeline 中定义了 filter() 操作的处理流程。在创建时,检查 predicate 不可为空;在触发计算时,通过 predicate 的测试结果判断是否要包含该元素

结语

以上是对 Stream API 核心执行原理的一些探析,由于篇幅原因,其中关于并行处理逻辑的部分没有展开,将在后续的文章中详细介绍

参考文档

Processing Data with Java SE 8 Streams, Part 1

Processing Data with Java SE 8 Streams, Part 2

流式接口模式

转载自:https://juejin.cn/post/7371633297154687030
评论
请登录