查看原文
其他

超简单,实现Java线程池

学研妹 Java学研大本营 2024-01-02

本文介绍了如何使用线程池解决高并发进程函数调用的问题,包含线程池的概念、实现方法和使用场景。

长按关注《Java学研大本营》

1 问题描述

假设我们正在编写一个简单的应用程序,该应用程序从客户端接收一些输入,对其进行一些CPU密集型处理,然后记录输出。我们编写的代码看起来像以下内容:

class ProcessingLibrary {
 public void process (Input userInput) {
 // 一些CPU密集型逻辑,用于处理用户输入
 userInput.process();

 // 记录结果
 Logger.log(userInput.getResults());
 }
}

看起来很简单,一个普通的函数,它接受用户输入,对其进行一些处理,然后返回输出,我们可以将这个库提供给客户端。但是,你的客户如果正在高并发地调用该进程函数,很快他们可能会抱怨他们的请求处理输入的时间太长。原因非常简单,当外部客户端调用你的函数时,调用线程被阻塞,因为实际处理是由调用线程进行的。

2 初步解决方案

添加另一个要求:

  • 调用线程不应该被阻塞。

为了解决这个问题,我们可以考虑利用客户端的CPU核心并将多线程纳入我们的代码。我们进行一些修改,具体如下:

class ProcessingLibrary {
 public void process (Input userInput) {
 Runnable runnable = () -> {
 userInput.process();
 Logger.log(userInput.getResults());
 };

 Thread thread = new Thread(runnable);
 thread.start();
 }
}

对于每个请求,最好不要阻塞调用线程,而是生成一个新线程来进行重度处理。调用线程可以从我们的处理方法中提前返回。确实,这比我们早期的版本有很大的改进。我们的客户开始使用新版本,他们比以前更满意了。但现在他们开始抱怨CPU使用率变得过高,并且在他们那边发生了崩溃。发生了什么?如果我们仔细检查代码,我们会发现我们没有限制正在生成的线程数!如果客户以非常高的速率调用进程函数,可能会生成数百甚至数千个线程,这对CPU来说是很大的开销。我们还必须限制生成的线程数。

3 线程池

注意!我们这里又添加了一个要求:

  • 调用线程不应该被阻塞。

  • 我们的逻辑不应生成无限多数量的线程。

我们想要的是,调用线程不应该被阻塞,我们应该生成一定数量的线程,由客户根据他们的CPU资源和他们想要实现的并行度决定。实际数量取决于请求到达的速率和平均请求时间。线程池是解决这个问题的理想解决方案。线程池是一个简单的概念,可以并行执行应用程序代码并利用CPU核心。线程池包含一组固定数量的可重复使用的工作线程,它们执行分配给它们的任务,而不会阻塞调用线程。我们下面看看如何实现一个简单的线程池。

我们可以得出几个简单的观察:

  • 我们的线程应该是可重复使用的,并且应该在请求到达时惰性创建。线程创建是一个昂贵的过程(至少在Java中是这样)。

  • 如果请求到达的速率远高于线程池中的线程数,我们可以在其他请求执行完毕时将请求输入保持在等待状态,然后当一个线程完成处理一个请求时,它可以从请求行中获取另一个请求并开始处理它。通过这种方式,我们仍然可以实现相当高的并行性并获得更多的请求吞吐量。

  • 队列可以成为存储我们传入请求的良好数据结构,而我们的线程可以在完成先前的项目后不断地从队列中获取项目。

考虑以上要求,我们为线程池编写一个简单的类。

class ThreadPool {
 private BlockingQueue<Runnable> taskQueue;
 private Integer poolSize;
 private AtomicInteger currentPoolSize;

 public ThreadPool(int poolSize) {
 this.poolSize = poolSize;
 this.taskQueue = new LinkedBlockingQueue<>();
 this.currentPoolSize = new AtomicInteger(0);
 }
 
 public void submitTask(Runnable runnable) {
 this.taskQueue.add(runnable);
 
 if(this.currentPoolSize.get() < this.poolSize) {
 // 如果有更多的池大小可用,创建一个新线程
 // 这个线程也应该被重新用于未来任务
 // 因此,它应该继续从队列中寻找更多的任务
 
 this.currentPoolSize.incrementAndGet();
 this.createSingleThreadForPool();
 }
 }

 private void createSingleThreadForPool() {
 Runnable poolRunner = () -> {
 while(true) {
 if(this.taskQueue.size() > 0) {
 Runnable taskFromQueue = this.taskQueue.poll();
 taskFromQueue.run();
 }
 }
 };

 new Thread(poolRunner).start();
 }
}

从以上实现中可以得到以下几点:

  • BlockingQueue是一个线程安全的队列实现。我们需要确保线程安全,因为多个线程正在访问共享状态。AtomicInteger也是如此,用于线程安全更新我们当前的池大小。

  • 池运行者中的while循环是为了确保该线程保持活动状态,以便我们在想要接收更多任务时可以继续运行。

我们可以更改我们对ProcessingLibrary的实现,如下:

class ProcessingLibrary {
 private ThreadPool threadPool;
 
 public ProcessingLibrary(int poolSize) {
 this.threadPool = new ThreadPool(poolSize);
 }

 public void process (Input userInput) {
 Runnable runnable = () -> {
 userInput.process();
 Logger.log(userInput.getResults());
 };

 this.threadPool.submitTask(runnable);
 }
}

现在,我们已经满足了对这个问题的两个要求 :)

在Java中,concurrent库提供了与此类似的内容,称为ExecutorService。虽然我们讨论的实现有一些注意事项,例如我们生成的线程一直在等待,但这是一个理解线程池内部工作原理的良好起点。

推荐书单

《Java编程讲义》

《Java编程讲义》根据目前Java开发领域的实际需求,从初学者角度出发,详细讲解了Java技术的基础知识。全书共15章,包括Java开发入门,Java语言基础,Java控制结构,数组,面向对象编程,继承和多态,抽象类、接口和内部类,异常处理,Java常用类库,集合与泛型,Lambda表达式,输入-输出流,多线程,JDBC数据库技术,网络编程等内容。内容全面覆盖.1ava开发必备的基础知识点,结合生活化案例展开讲解,程序代码给出了详细的注释,能够使初学者轻松领会Java技术精髓,快速掌握Java开发技能。   《Java编程讲义》适合作为高等院校相关专业的教材及教学参考书,也适合作为Java开发入门者的自学用书,还可供开发人员查阅、参考。

购买链接:https://item.jd.com/13495830.html

精彩回顾

超简单,精通Java异常处理

详解FPGA —— 下一代AI算力芯片(下)

详解FPGA —— 下一代AI算力芯片(中)

详解FPGA —— 下一代AI算力芯片(上)

9个步骤,手把手教你在Windows上安装Hadoop

长按关注《Java学研大本营》
长按访问【IT今日热榜】,发现每日技术热点
继续滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存