查看原文
其他

Java 8 并行流介绍

ImportNew ImportNew 2021-12-02

(给ImportNew加星标,提高Java技能)

编译:ImportNew/唐尤华

medium.com/javarevisited/java-8-parallel-stream-java2blog-e1254e593763


在这篇文章中,我们将介绍 Java 并行流(Parallel Stream)。


[Java 8][1] 引入了"并行流"概念实现并行处理。随着硬件成本降低,现在的 CPU 大都拥有多个核心,因此可以使用并行处理加快操作执行。


[1]:https://java2blog.com/java-8-tutorial/


让我们通过一个简单的例子来帮助理解:


```java
package org.arpit.java2blog.java8;
import java.util.Arrays;
import java.util.stream.IntStream;
public class Java8ParallelStreamMain {
public static void main(String[] args) {
System.out.println("=================================");
System.out.println("Using Sequential Stream");
System.out.println("=================================");
int[] array = {1,2,3,4,5,6,7,8,9,10};
IntStream intArrStream = Arrays.stream(array);
intArrStream.forEach(s->
{
System.out.println(s+" "+Thread.currentThread().getName());
});
System.out.println("=================================");
System.out.println("Using Parallel Stream");
System.out.println("=================================");
IntStream intParallelStream=Arrays.stream(array).parallel();
intParallelStream.forEach(s->
{
System.out.println(s+" "+Thread.currentThread().getName());
});
}
}
```


运行上面的程序,输出结果如下:


```shell
=================================
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 ForkJoinPool.commonPool-worker-3
3 ForkJoinPool.commonPool-worker-1
9 ForkJoinPool.commonPool-worker-2
2 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-2
1 ForkJoinPool.commonPool-worker-3
8 ForkJoinPool.commonPool-worker-2
4 ForkJoinPool.commonPool-worker-1
```


仔细观察输出结果可以看到,在顺序流情况下,主线程将完成所有工作。主线程会等待当前迭代完成,然后接着进行下一次迭代。


在[并行流][2]的情况下,会同时产生4个线程,在内部使用 `ForkJoinPool` 创建和管理线程。并行流通过 `ForkJoinPool.commonPool()` 静态方法创建 `ForkJoinPool` 实例。


[2]:http://www.java67.com/2018/10/java-8-stream-and-functional-programming-interview-questions-answers.html


并行流会利用所有可用 CPU 核心并行处理任务。如果任务数超过了 CPU 核心数量,那么其余任务将等待当前正在运行的任务完成。


并行流很酷,可以一直用吗?


"当然不是!"


虽然只加上 `.parallel` 就可以把[一个流][3]转为并行流,但这并不表示所有情况都适用。


[3]:https://java2blog.com/java-8-stream-filter-examples/


在使用并行流时,需要考虑许多因素,否则将会遭遇并行流带来的负面影响。


并行流的开销远高于顺序流,而且协调线程需要花费大量时间。


[4]:http://www.java67.com/2014/04/java-8-stream-examples-and-tutorial.html


当且仅当以出现下面的情况时,需要考虑使用并行流:


  •  需要处理大量数据;

  • 正如你所知道的那样,Java 使用了 [ForkJoinPool][5] 实现并行。`ForkJoinPool` 会把输入流拆分后提交执行,因此需要确保输入流是可拆分的。


[5]:http://javarevisited.blogspot.sg/2016/12/difference-between-executor-framework-and-ForkJoinPool-in-Java.html


例如:


[ArrayList][6] 非常易于拆分,因为可以通过索引找到一个中间元素进行拆分。但 `LinkedList` 很难拆分,而且大多数情况下运行效果并不好。


  • 实际使用中会遇到麻烦的性能问题;

  • 需要确保线程之间的所有共享资源都能正确同步,否则可能会产生意外的结果。


[6]:https://javarevisited.blogspot.com/2011/05/example-of-arraylist-in-java-tutorial.html


衡量并行性最简单的公式是 Brian Goetz 在他的演讲中提到的 “NQ” 模型。


"NQ 模型:"


```
N x Q >10000
```


这里,


  • N = 数据元素数量

  • Q = 每个元素执行的工作量


这意味着,如果数据元素数量很大且每个元素执行的工作量较小(如加法操作),并行性可能会让程序更快地运行,反之亦然。如果数据元素数量较少且每个元素执行的工作量较大(比如做一些复杂的计算工作),并行处理也可能会更快地得到结果。


让我们看看另一个例子。


下面的示例将展示在执行长时间计算时,采用并行流或顺序流 CPU 的不同表现。我们将做一些耗时的计算让 CPU 忙碌起来。


```java
package org.arpit.java2blog.java8;
import java.util.ArrayList;
import java.util.List;
public class PerformanceComparisonMain {
public static void main(String[] args) {

long currentTime=System.currentTimeMillis();
List<Integer> data=new ArrayList<Integer>();
for (int i = 0; i < 100000; i++) {
data.add(i);
}

long sum=data.stream()
.map(i ->(int)Math.sqrt(i))
.map(number->performComputation(number))
.reduce(0,Integer::sum);

System.out.println(sum);
long endTime=System.currentTimeMillis();
System.out.println("Time taken to complete:"+(endTime-currentTime)/(1000*60)+" minutes");

}

public static int performComputation(int number)
{
int sum=0;
for (int i = 1; i < 1000000; i++) {
int div=(number/i);
sum+=div;

}
return sum;
}
}
```


运行上面的程序结果如下:


>>>
117612733
Time taken to complete:6 minutes
>>>


然而这里不关心输出,而是关心在执行操作时 CPU 的表现。



如图所示,在顺序流情况下,CPU 没有得到充分利用。


让我们修改第16行使并行流,再次运行程序。


```java
long sum=data.stream()
.parallel()
.map(i ->(int)Math.sqrt(i))
.map(number->performComputation(number))
.reduce(0,Integer::sum);
```


使用并行流执行,输出的结果中耗时明显减少。


>>>
117612733
Time taken to complete:3 minutes
>>>


查看使用并行流运行程序时 CPU 的历史记录。



如图所示,并行流使用了所有4个 CPU 核心进行计算。


以上是有关 Java 并行流的所有介绍。


推荐阅读

(点击标题可跳转阅读)

投行的 15 个多线程和并发面试题

重构大型业务型写接口 :并行处理注意点

并行化资源池队列 2 — 无锁化的无界队列


看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

好文章,我在看❤️

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存