likes
comments
collection
share

StreamAPI介绍以及基本使用

作者站长头像
站长
· 阅读数 2

StreamAPI

引入

Stream就是流,流式操作就跟生活中的流水线一样。比如我有一批原材料,经过流水线 挑选--> 清洗 --> 包装。就变成了一个产品了。

Stream流就好比我需要组装一条流水线,当数据就像原料一样源源不断的走上流水线的传送带上,经过各种步骤的处理,知道走完所有的流水线,原料数据就组装完成了,最终变成一个我想要的东西。

响应式编程跟传统的命令行编程的思想有一个重大的差异:

命令行编程:拿到一堆数据之后,需要自己写for循环推进,需要自己控制循环逻辑,比如大于2了怎么办,小于3了又怎么办。 响应式编程:拿到一堆数据之后,他需要经过123456步的流水线,来转化成我需要的数据或者数据集。


StreamAPI介绍

StreamAPI介绍以及基本使用

流是什么?流里面有什么? 我们可以把他比喻为一条生产流水线,流水线的传送带上就会有一些东西,这些东西会从原料一步步的加工成为一个完整的产品。 声明式处理集合数据,包括:筛选、转换、组合等,而这些数据,就算流里面的原料。

Stream流的思想:

  1. 首先得有一个流,流里面可以有一堆数据,或者一个数据,甚至是空流都可以。反正得有一个流给我操作。就相当于,一条流水线,可以没有原材料,但是得有存放原材料的仓库,有仓库,流水线才能有起点,流水线才能进行下去。
  2. 中间操作,就是流水线的每一个步骤。比如我想挑出来原料中的瑕疵品,那么流水线的第一个步骤就是选品。每一个流水线的步骤,就叫做流的中间操作。
  3. 新流,在经过一系列的中间操作之后,最终会得到一个新流。比如流水线对原材料进行了选品,加工,分组,包装,就得到了一些新的产品。
  4. 终止操作,得到了新流之后也不能直接使用,因为流是动态的。比如原材料遍历完流水线的所有操作了,但是还放在传送带上传送着。这时候,我们需要把产品从传送带上拿下来,统计数量或者拿到最大值。这个就叫做终止操作
  5. 结果,终止操作中,我们把新流的元素拿出来了或者统计数量了之类的,得到某个结果了,那么整个流操作就结束了。

总结:一个流要处理结束,有三大关键步骤:

  1. 创建流
  2. 有中间操作
  3. 有终止操作

例子:

List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 我需要找出了list列表里最大的偶数

方法一:用for循环

// 方法1:用for循环 int max = 0; for(Integer integer : list){ if(integer % 2 ==0){ // 是偶数,和max进行比较交换 max = integer >= max ? integer : max; } } System.out.println("最大偶数:" + max);

方法二:用StreamAPI

// 方法2:用Stream流 list.stream() .filter(ele -> { System.out.println("正在filter:" + ele); return ele % 2 == 0; }) .max(Integer::compareTo) .ifPresent(System.out::println);


流管道组成:

解析例子二:

首先来认识几个名词:

Stream Pipeline:流水线、流管道 Intermediate Operations:中间操作 Terminal Operation:终止操作

Stream所有数据和操作被组合成流管道。

  1. 一个数据源(可以是一个数组、集合、生成器函数、I/O管道)
  2. 零个或多个中间操作(将一个流变形成另一个流)
  3. 一个终止操作(产生最终结果)

流是惰性的。只有在启动最终操作时才会对数据源进行计算,而且只有在需要时才会消耗元素。 什么意思呢?让我们来看例子二:

list.stream() // intermediate operation.中间操作 .filter(ele -> { System.out.println("正在filter:" + ele); return ele % 2 == 0; });

如果是像上面一样写的话,没有后面的.max(Integer::compareTo).ifPresent(System.out::println);的话,打印语句是不会执行的。 StreamAPI介绍以及基本使用

因为还没有到终止操作,所以不会对数据源进行计算。比如一条流水线,传送带都修好了,但是最后的收货环节没做好,相当于原料在流水线上加工完成后,没地方可去,那么这条流水线是不会启动的。

在例子二中,

filter()方法在源码的文档中有写是intermediate operation中间操作。 StreamAPI介绍以及基本使用

max()方法则是terminal operation终止操作。 StreamAPI介绍以及基本使用

max()方法后,还可以传一个条件判断ifPresent()StreamAPI介绍以及基本使用

// 方法2:用Stream流 list.stream() // intermediate operation.中间操作 .filter(ele -> { System.out.println("正在filter:" + ele); return ele % 2 == 0; }) //terminal operation.终止操作。过滤出我们想要的值,如果断言返回true就算我们要的值,如果断言返回false,就忽略掉 .max(Integer::compareTo) // 简写:.ifPresent(ele -> System.out.println("最大偶数:" + ele)); .ifPresent(System.out::println); //如果存在这个数,就输出

Stream().filter()是一个中间操作,用于筛选满足特定条件的元素,并生成一个新的流。

它接受一个Predicate函数式接口作为参数,该接口定义了一个用于判断元素是否满足条件的方法。filter()方法会遍历流中的每个元素,对每个元素应用Predicate接口的test()方法,如果返回true,则将该元素包含在新的流中,否则不包含。

相对应的还有collect()方法,如上例子还可以这样写:.filter(n -> n % 2 == 0).collect(Collectors.toList());,使用filter()方法筛选出符合条件(即能被2整除)的元素,最后使用collect()方法将结果收集到一个新的List中。

当加上了.max()方法后,流水线就完整了,再次启动,就能输出所有的filter打印语句 StreamAPI介绍以及基本使用

总结:

  1. 流是惰性的,如,生活中流水线完整了才会启动传送带。
  2. 流管道组成:一个数据源、N个中间操作(N可以为0)、一个终止操作。

StreamAPI的基本使用

一、创建流:

StreamAPI介绍以及基本使用

可以直接创建流:

//1、直接创建流: //创建了一个包含整数1、2和3的流 Stream stream = Stream.of(1, 2, 3); // 使用Stream.concat()方法将两个流连接起来,创建了一个包含整数4、5、6以及之前流中的整数1、2、3的新流 Stream concat = Stream.concat(Stream.of(4, 5, 6), stream); //使用了Stream.builder()创建了一个流构建器,然后添加了两个字符串"11"和"22",最后通过build()方法构建了一个包含这两个字符串的流 Stream build = Stream.builder().add("11").add("22").build();

也可以集合容器中获取这个流

//2、从集合容器中获取这个流,List、Set、Map List list1 = List.of(1, 2); Stream stream1 = list1.stream(); Set integers1 = Set.of(1, 2); integers1.stream(); Map<Object, Object> map = Map.of(); map.keySet().stream(); map.values().stream();

map有Key有Value,可以分别拿这两个的Stream流。


中间操作:

StreamAPI介绍以及基本使用

流量并发还是不并发?和for有什么区别?

要验证这个问题,可以通过代码来测试一下:

System.out.println("主线程: " + Thread.currentThread()); // 流量并发还是不并发?和for有什么区别? long count = Stream.of(1,2,3,4,5) .filter(i -> { System.out.println("filter线程:" + Thread.currentThread()); System.out.println("正在filter:" + i); return i > 2; }) // intermediate operation. 中间操作 .count(); //terminal operation.终止操作

运行结果: StreamAPI介绍以及基本使用

可以看到,主线程和filter线程是同一个,说明流也是用for循环挨个处理的

**但是!**流有一个很帅的效果:可以并发.parallel()方法。 在流的后面调用一下.parallel()方法,就会并发执行

System.out.println("主线程: " + Thread.currentThread()); // 流量并发还是不并发?和for有什么区别? long count = Stream.of(1,2,3,4,5) .parallel() .filter(i -> { System.out.println("filter线程:" + Thread.currentThread()); System.out.println("正在filter:" + i); return i > 2; }) // intermediate operation. 中间操作 .count(); //terminal operation.终止操作 }

运行结果: StreamAPI介绍以及基本使用

这时候可以看到,只有一个线程的名字是跟主线程是一样的。而且也不是按顺序执行的。说明这是个并发的过程。

默认是不并发的,但也可以并发。但只要加上了.parallel()方法,就会变成一个并发流。 并发以后,要自行解决并发安全问题。

什么是并发安全问题呢?看一下下面的例子:

List testList = new ArrayList<>(); // 流量并发还是不并发?和for有什么区别? long count = Stream.of(1,2,3,4,5) .parallel() .filter(i -> { testList.add(i); return i > 2; }) // intermediate operation. 中间操作 .count(); //terminal operation.终止操作

假设线程1执行了之后,testList就会变成了[1], 这个时候,线程2和线程3同时执行,那么testList就会分别变成:线程2:[1,2],线程3:[1,3]。 add了3个数,正确的testList应该是[1,2,3]的,但是因为并发的问题,分别变成了两个不同的列表。 那么接下来线程4执行的时候,是该拿[1,2]还是[1,3]呢?这时候就出问题了。

流的所有操作都是无状态,就是说,流的数据状态仅在本次流有效,不溢出至函数外

我可以在流中处理完数据后把数据交给别人,但是不能在处理的时候让别人帮我存。

在写流的时候,要把写成一个整体外面的容器不能对流里的任何数据有任何的引用、存储、增删改

上面的问题可以用锁来解决:

System.out.println("主线程: " + Thread.currentThread()); List testList = new ArrayList<>(); // 流量并发还是不并发?和for有什么区别? long count = Stream.of(1,2,3,4,5) .parallel() .filter(i -> { synchronized (Object.class){ System.out.println("filter线程:" + Thread.currentThread()); System.out.println("正在filter:" + i); testList.add(i); System.out.println("testList:" + testList); return i > 2; } }) // intermediate operation. 中间操作 .count(); //terminal operation.终止操作

执行结果: StreamAPI介绍以及基本使用

但是用锁之后就串行了,跟之前的不用.parallel()方法是类似的执行过程,所以也相当于没有并发。

synchronized可以控制并发。类似一把琐,所有的线程去抢这把锁,谁先拿到锁谁就先执行。

总结:

如果要用并发,就要明确流里面不会产生问题,是安全的。如果可能会出现问题,就不要用并发。 如果用了并发,且出现问题了,可以用锁解决