查看原文
其他

死磕 java线程——自己动手写一个线程池

丹卿 漫话编程 2020-10-29

(友情提示:手机横屏看源码更方便)


问题

(1)自己动手写一个线程池需要考虑哪些因素?

(2)自己动手写的线程池如何测试?

简介

线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文彤哥将手把手带你写一个可用的线程池。

属性分析

线程池,顾名思义它首先是一个“池”,这个池里面放的是线程,线程是用来执行任务的。

首先,线程池中的线程应该是有类别的,有的是核心线程,有的是非核心线程,所以我们需要两个变量标识核心线程数量coreSize和最大线程数量maxSize。

为什么要区分是否为核心线程呢?这是为了控制系统中线程的数量。

当线程池中线程数未达到核心线程数coreSize时,来一个任务加一个线程是可以的,也可以提高任务执行的效率。

当线程池中线程数达到核心线程数后,得控制一下线程的数量,来任务了先进队列,如果任务执行足够快,这些核心线程很快就能把队列中的任务执行完毕,完全没有新增线程的必要。

当队列中任务也满了,这时候光靠核心线程就无法及时处理任务了,所以这时候就需要增加新的线程了,但是线程也不能无限制地增加,所以需要控制其最大线程数量maxSize。

其次,我们需要一个任务队列来存放任务,这个队列必须是线程安全的,我们一般使用BlockingQueue阻塞队列来充当,当然使用ConcurrentLinkedQueue也是可以的(注意ConcurrentLinkedQueue不是阻塞队列,不能运用在jdk的线程池中)。

最后,当任务越来越多而线程处理却不及时,迟早会达到一种状态,队列满了,线程数也达到最大线程数了,这时候怎么办呢?这时候就需要走拒绝策略了,也就是这些无法及时处理的任务怎么办的一种策略,常用的策略有丢弃当前任务、丢弃最老的任务、调用者自己处理、抛出异常等。

根据上面的描述,我们定义一个线程池一共需要这么四个变量:核心线程数coreSize、最大线程数maxSize、阻塞队列BlockingQueue、拒绝策略RejectPolicy。

另外,为了便于给线程池一个名称,我们再加一个变量:线程池的名称name。

所以我们得出了线程池的属性及构造方法大概如下:

  1. public class MyThreadPoolExecutor implements Executor {

  2. /**

  3. * 线程池的名称

  4. */

  5. private String name;

  6. /**

  7. * 核心线程数

  8. */

  9. private int coreSize;

  10. /**

  11. * 最大线程数

  12. */

  13. private int maxSize;

  14. /**

  15. * 任务队列

  16. */

  17. private BlockingQueue<Runnable> taskQueue;

  18. /**

  19. * 拒绝策略

  20. */

  21. private RejectPolicy rejectPolicy;


  22. public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {

  23. this.name = name;

  24. this.coreSize = coreSize;

  25. this.maxSize = maxSize;

  26. this.taskQueue = taskQueue;

  27. this.rejectPolicy = rejectPolicy;

  28. }

  29. }

任务流向分析

根据上面的属性分析,基本上我们已经得到了任务流向的完整逻辑:

首先,如果运行的线程数小于核心线程数,直接创建一个新的核心线程来运行新的任务。

其次,如果运行的线程数达到了核心线程数,则把新任务入队列。

然后,如果队列也满了,则创建新的非核心线程来运行新的任务。

最后,如果非核心线程数也达到最大了,那就执行拒绝策略。

代码逻辑大致如下:

  1. @Override

  2. public void execute(Runnable task) {

  3. // 正在运行的线程数

  4. int count = runningCount.get();

  5. // 如果正在运行的线程数小于核心线程数,直接加一个线程

  6. if (count < coreSize) {

  7. // 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小

  8. if (addWorker(task, true)) {

  9. return;

  10. }

  11. // 如果添加核心线程失败,进入下面的逻辑

  12. }


  13. // 如果达到了核心线程数,先尝试让任务入队

  14. // 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false

  15. if (taskQueue.offer(task)) {

  16. // do nothing,为了逻辑清晰这里留个空if

  17. } else {

  18. // 如果入队失败,说明队列满了,那就添加一个非核心线程

  19. if (!addWorker(task, false)) {

  20. // 如果添加非核心线程失败了,那就执行拒绝策略

  21. rejectPolicy.reject(task, this);

  22. }

  23. }

  24. }

创建线程逻辑分析

首先,创建线程的依据是正在运行的线程数量有没有达到核心线程数或者最大线程数,所以我们还需要一个变量runningCount用来记录正在运行的线程数。

其次,这个变量runningCount需要在并发环境下加加减减,所以这里需要使用到Unsafe的CAS指令来控制其值的修改,用了CAS就要给这个变量加上volatile修饰,为了方便我们这里直接使用AtomicInteger来作为这个变量的类型。

然后,因为是并发环境中,所以需要判断runningCount < coreSize(或maxSize)(条件一)的同时修改runningCount CAS加一(条件二)成功了才表示可以增加一个线程,如果条件一失败则表示不能再增加线程了直接返回false,如果条件二失败则表示其它线程先修改了runningCount的值,则重试。

最后,创建一个线程并运行新任务,且不断从队列中拿任务来运行。

代码逻辑如下:

  1. private boolean addWorker(Runnable newTask, boolean core) {

  2. // 自旋判断是不是真的可以创建一个线程

  3. for (; ; ) {

  4. // 正在运行的线程数

  5. int count = runningCount.get();

  6. // 核心线程还是非核心线程

  7. int max = core ? coreSize : maxSize;

  8. // 不满足创建线程的条件,直接返回false

  9. if (count >= max) {

  10. return false;

  11. }

  12. // 修改runningCount成功,可以创建线程

  13. if (runningCount.compareAndSet(count, count + 1)) {

  14. // 线程的名字

  15. String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();

  16. // 创建线程并启动

  17. new Thread(() -> {

  18. System.out.println("thread name: " + Thread.currentThread().getName());

  19. // 运行的任务

  20. Runnable task = newTask;

  21. // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了

  22. while (task != null || (task = getTask()) != null) {

  23. try {

  24. // 执行任务

  25. task.run();

  26. } finally {

  27. // 任务执行完成,置为空

  28. task = null;

  29. }

  30. }

  31. }, threadName).start();


  32. break;

  33. }

  34. }


  35. return true;

  36. }

取任务逻辑分析

从队列中取任务应该使用take()方法,这个方法会一直阻塞直至取到任务或者中断,如果中断了就返回null,这样当前线程也就可以安静地结束了,另外还要注意中断了记得把runningCount减一。

  1. private Runnable getTask() {

  2. try {

  3. // take()方法会一直阻塞直到取到任务为止

  4. return taskQueue.take();

  5. } catch (InterruptedException e) {

  6. // 线程中断了,返回null可以结束当前线程

  7. // 当前线程都要结束了,理应要把runningCount的数量减一

  8. runningCount.decrementAndGet();

  9. return null;

  10. }

  11. }

好了,到这里我们自己的线程池就写完了,下面我们一起来想想怎么测试呢?

测试逻辑分析

我们再来回顾下自己的写的线程池的构造方法:

  1. public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {

  2. this.name = name;

  3. this.coreSize = coreSize;

  4. this.maxSize = maxSize;

  5. this.taskQueue = taskQueue;

  6. this.rejectPolicy = rejectPolicy;

  7. }

name,这个随便传;

coreSize,我们假设为5;

maxSize,我们假设为10;

taskQueue,任务队列,既然我们设置的是有边界的,我们就用最简单的ArrayBlockingQueue好吧,容量设置为15,这样里面最多可以存储15条任务;

rejectPolicy,拒绝策略,我们假设使用丢弃当前任务的策略,OK,我们来实现一个。

  1. /**

  2. * 丢弃当前任务

  3. */

  4. public class DiscardRejectPolicy implements RejectPolicy {

  5. @Override

  6. public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {

  7. // do nothing

  8. System.out.println("discard one task");

  9. }

  10. }

OK,这样一个线程池就创建完成了,下面就是执行任务了,我们假设通过for循环连续不断地添加100个任务好不好。

  1. public class MyThreadPoolExecutorTest {

  2. public static void main(String[] args) {

  3. Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());

  4. AtomicInteger num = new AtomicInteger(0);


  5. for (int i = 0; i < 100; i++) {

  6. threadPool.execute(()->{

  7. try {

  8. Thread.sleep(1000);

  9. System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());

  10. } catch (InterruptedException e) {

  11. e.printStackTrace();

  12. }

  13. });

  14. }

  15. }

  16. }

我们分析下这段程序:

(1)先连续创建了5个核心线程,并执行了新任务;

(2)后面的15个任务进了队列;

(3)队列满了,又连续创建了5个线程,并执行了新任务;

(4)后面的任务就没得执行了,全部走了丢弃策略;

(5)所以真正执行成功的任务应该是 5 + 15 + 5 = 25 条任务;

运行之:

  1. thread name: core_test2

  2. thread name: core_test5

  3. thread name: core_test3

  4. thread name: core_test4

  5. thread name: core_test1

  6. thread name: test6

  7. thread name: test7

  8. thread name: test8

  9. thread name: test9

  10. discard one task

  11. thread name: test10

  12. discard one task

  13. ...省略被拒绝的任务


  14. discard one task

  15. running: 1570546871851: 2

  16. running: 1570546871851: 8

  17. running: 1570546871851: 7

  18. running: 1570546871851: 6

  19. running: 1570546871851: 5

  20. running: 1570546871851: 3

  21. running: 1570546871851: 4

  22. running: 1570546871851: 1

  23. running: 1570546871851: 10

  24. running: 1570546871851: 9

  25. running: 1570546872852: 14

  26. running: 1570546872852: 20

  27. running: 1570546872852: 19

  28. running: 1570546872852: 17

  29. running: 1570546872852: 18

  30. running: 1570546872852: 16

  31. running: 1570546872852: 15

  32. running: 1570546872852: 12

  33. running: 1570546872852: 13

  34. running: 1570546872852: 11

  35. running: 1570546873852: 21

  36. running: 1570546873852: 24

  37. running: 1570546873852: 23

  38. running: 1570546873852: 25

  39. running: 1570546873852: 22

可以看到,创建了5个核心线程、5个非核心线程,成功执行了25条任务,完成没问题,完美^^。

总结

(1)自己动手写一个线程池需要考虑的因素主要有:核心线程数、最大线程数、任务队列、拒绝策略。

(2)创建线程的时候要时刻警惕并发的陷阱;

彩蛋

我们知道,jdk自带的线程池还有两个参数:keepAliveTime、unit,它们是干什么的呢?

答:它们是用来控制何时销毁非核心线程的,当然也可以销毁核心线程,具体的分析请期待下一章吧。

完整源码

Executor接口

  1. public interface Executor {

  2. void execute(Runnable command);

  3. }

MyThreadPoolExecutor线程池实现类

  1. /**

  2. * 自动动手写一个线程池

  3. */

  4. public class MyThreadPoolExecutor implements Executor {


  5. /**

  6. * 线程池的名称

  7. */

  8. private String name;

  9. /**

  10. * 线程序列号

  11. */

  12. private AtomicInteger sequence = new AtomicInteger(0);

  13. /**

  14. * 核心线程数

  15. */

  16. private int coreSize;

  17. /**

  18. * 最大线程数

  19. */

  20. private int maxSize;

  21. /**

  22. * 任务队列

  23. */

  24. private BlockingQueue<Runnable> taskQueue;

  25. /**

  26. * 拒绝策略

  27. */

  28. private RejectPolicy rejectPolicy;

  29. /**

  30. * 当前正在运行的线程数

  31. * 需要修改时线程间立即感知,所以使用AtomicInteger

  32. * 或者也可以使用volatile并结合Unsafe做CAS操作(参考Unsafe篇章讲解)

  33. */

  34. private AtomicInteger runningCount = new AtomicInteger(0);


  35. public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {

  36. this.name = name;

  37. this.coreSize = coreSize;

  38. this.maxSize = maxSize;

  39. this.taskQueue = taskQueue;

  40. this.rejectPolicy = rejectPolicy;

  41. }


  42. @Override

  43. public void execute(Runnable task) {

  44. // 正在运行的线程数

  45. int count = runningCount.get();

  46. // 如果正在运行的线程数小于核心线程数,直接加一个线程

  47. if (count < coreSize) {

  48. // 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小

  49. if (addWorker(task, true)) {

  50. return;

  51. }

  52. // 如果添加核心线程失败,进入下面的逻辑

  53. }


  54. // 如果达到了核心线程数,先尝试让任务入队

  55. // 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false

  56. if (taskQueue.offer(task)) {

  57. // do nothing,为了逻辑清晰这里留个空if

  58. } else {

  59. // 如果入队失败,说明队列满了,那就添加一个非核心线程

  60. if (!addWorker(task, false)) {

  61. // 如果添加非核心线程失败了,那就执行拒绝策略

  62. rejectPolicy.reject(task, this);

  63. }

  64. }

  65. }


  66. private boolean addWorker(Runnable newTask, boolean core) {

  67. // 自旋判断是不是真的可以创建一个线程

  68. for (; ; ) {

  69. // 正在运行的线程数

  70. int count = runningCount.get();

  71. // 核心线程还是非核心线程

  72. int max = core ? coreSize : maxSize;

  73. // 不满足创建线程的条件,直接返回false

  74. if (count >= max) {

  75. return false;

  76. }

  77. // 修改runningCount成功,可以创建线程

  78. if (runningCount.compareAndSet(count, count + 1)) {

  79. // 线程的名字

  80. String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();

  81. // 创建线程并启动

  82. new Thread(() -> {

  83. System.out.println("thread name: " + Thread.currentThread().getName());

  84. // 运行的任务

  85. Runnable task = newTask;

  86. // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了

  87. while (task != null || (task = getTask()) != null) {

  88. try {

  89. // 执行任务

  90. task.run();

  91. } finally {

  92. // 任务执行完成,置为空

  93. task = null;

  94. }

  95. }

  96. }, threadName).start();


  97. break;

  98. }

  99. }


  100. return true;

  101. }


  102. private Runnable getTask() {

  103. try {

  104. // take()方法会一直阻塞直到取到任务为止

  105. return taskQueue.take();

  106. } catch (InterruptedException e) {

  107. // 线程中断了,返回null可以结束当前线程

  108. // 当前线程都要结束了,理应要把runningCount的数量减一

  109. runningCount.decrementAndGet();

  110. return null;

  111. }

  112. }


  113. }

RejectPolicy拒绝策略接口

  1. public interface RejectPolicy {

  2. void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor);

  3. }

DiscardRejectPolicy丢弃策略实现类

  1. /**

  2. * 丢弃当前任务

  3. */

  4. public class DiscardRejectPolicy implements RejectPolicy {

  5. @Override

  6. public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {

  7. // do nothing

  8. System.out.println("discard one task");

  9. }

  10. }

测试类

  1. public class MyThreadPoolExecutorTest {

  2. public static void main(String[] args) {

  3. Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());

  4. AtomicInteger num = new AtomicInteger(0);


  5. for (int i = 0; i < 100; i++) {

  6. threadPool.execute(()->{

  7. try {

  8. Thread.sleep(1000);

  9. System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());

  10. } catch (InterruptedException e) {

  11. e.printStackTrace();

  12. }

  13. });

  14. }


  15. }

  16. }



推荐阅读:


喜欢我可以给我设为星标哦

好文章,我 在看 

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存