查看原文
其他

Java阻塞队列的简单实现

biezhi SpringForAll社区 2021-05-26
点击上方☝SpringForAll社区 轻松关注!
及时获取有趣有料的技术文章

来源:

https://blog.biezhi.me/2019/01/simple-blocking-queue.html

Java 并发常用的组件中有一种队列叫阻塞队列(BlockingQueue),当队列为空时,获取元素的线程会阻塞等待直到队列有数据;当队列满时,想要存储元素的线程会阻塞等待直到队列有空间。我们经常会用这种数据结构可以实现生产者、消费者模型。

本文会通过两种方式来实现简单的有界阻塞队列,在最后分别测试不同实现的性能差异。

Monitor 和 Condition

看过 Java 并发相关书籍的同学应该都见过 Monitor 这个词,有人称为监视器也有人叫它管程,不过都是一个意思:一个同步工具,相当于操作系统中的互斥量(mutex),即值为 1 的信号量。

synchronized 关键词的背后就是靠 monitor 实现的,monitor 的重要特点是,同一个时刻,只有一个线程能进入 monitor 中定义的临界区,这使得 monitor 能够达到互斥的效果。但仅仅有互斥的作用是不够的,无法进入 monitor 临界区的线程应该被阻塞,在必要的时候可以被唤醒,所以 Java 提供了 wait 和 notifynotifyAll 的 API 给我们使用。

  • wait(): 让当前线程进入等待队列,同时会释放锁,直到被唤醒。

  • notify(): 从条件队列中随机唤醒一个线程,让它去参与锁竞争

  • notifyAll(): 唤醒条件队列中的所有线程,让它们去参与锁竞争


实现同步的一种方式是使用 synchronized 关键字,还可以使用 Lock 接口下的实现来完成,比如 ReentrantLock,它是一把重入锁(synchronized 也是),基于 AQS 并发框架实现。我们可以使用它来进行加锁和释放锁,如果遇到有条件需要阻塞可以使用 Condition API。

  • Lock#newCondition(): 创建一个新的条件

  • Condition#await(): 让当前线程等待

  • Condition#signal(): 唤醒一个等待线程

条件和锁总是息息相关,在没有 Lock 接口的时候你会发现 monitor 机制有一个严重的问题:一把锁只能对应一个条件(也就是只可以做一次 wait),那么在唤醒的时候就可能出现唤醒丢失。举个例子,在两个方法上有不同的条件会导致阻塞,它们持有一把锁,唤醒时候如果用 notify 只会从条件队列选择一个,使用 notifyAll 会带来大量的 CPU 上下文切换和锁竞争,伪代码如下:

1synchronized void foo() {
2    while(CONDITION1){
3        wait();
4    }
5    notifyAll();
6}
7synchronized void bar() {
8    while(CONDITION2){
9        wait();
10    }
11    notifyAll();
12}

具体实现

实现思路

我们通过定义一个 Queue 接口来实现两种队列,该队列是有界队列,使用数组的方式实现,如果你有兴趣也可以使用链表或栈来实现这个队列。提供 put 方法添加元素(满了则阻塞),take 方法弹出元素(没有元素则阻塞)。

定义接口

1public interface Queue<E> {
2
3    // 添加新元素,当队列满则阻塞
4    void put(E e) throws InterruptedException;
5
6    // 弹出队头元素,当队列空则阻塞
7    E take() throws InterruptedException;
8
9    // 队列元素个数
10    int size();
11
12    // 队列是否为空
13    boolean isEmpty();
14
15}

基于 synchronized 的实现

核心思路:

  • 添加元素时队列满则阻塞

  • 弹出元素时队列空则阻塞

  • 添加元素后唤醒消费者

  • 弹出元素后唤醒生产者

1public class BlockingQueueWithSync<E> implements Queue<E> {
2
3    private E[] array;
4    private int head;  // 队头指针
5    private int tail;  // 队尾指针
6
7    private volatile int size; // 队列元素个数
8
9    public BlockingQueueWithSync(int capacity) {
10        array = (E[]) new Object[capacity];
11    }
12
13    @Override
14    public synchronized void put(E e) throws InterruptedException {
15        // 当队列满的时候阻塞
16        while (size == array.length) {
17            this.wait();
18        }
19
20        array[tail] = e;
21        // 队列装满后索引归零
22        if (++tail == array.length) {
23            tail = 0;
24        }
25        ++size;
26        // 通知其他消费端有数据了
27        this.notifyAll();
28    }
29
30    @Override
31    public synchronized E take() throws InterruptedException {
32        // 当队列空的时候阻塞
33        while (isEmpty()) {
34            this.wait();
35        }
36
37        E element = array[head];
38        // 消费完后从0开始
39        if (++head == array.length) {
40            head = 0;
41        }
42        --size;
43        // 通知其他生产者可以生产了
44        this.notifyAll();
45        return element;
46    }
47
48    @Override
49    public synchronized boolean isEmpty() {
50        return size == 0;
51    }
52
53    @Override
54    public synchronized int size() {
55        return size;
56    }
57
58}

基于 ReentrantLock 的实现

1public class BlockingQueueWithLock<E> implements Queue<E> {
2
3    private E[] array;
4    private int head;
5    private int tail;
6
7    private volatile int size;
8
9    private Lock      lock     = new ReentrantLock();
10    private Condition notFull  = lock.newCondition();
11    private Condition notEmpty = lock.newCondition();
12
13    public BlockingQueueWithLock(int capacity) {
14        array = (E[]) new Object[capacity];
15    }
16
17    @Override
18    public void put(E e) throws InterruptedException {
19        lock.lockInterruptibly();
20        try {
21            // 队列满,阻塞
22            while (size == array.length) {
23                notFull.await();
24            }
25            array[tail] = e;
26            if (++tail == array.length) {
27                tail = 0;
28            }
29            ++size;
30            notEmpty.signal();
31        } finally {
32            lock.unlock();
33        }
34    }
35
36    @Override
37    public E take() throws InterruptedException {
38        lock.lockInterruptibly();
39        try {
40            // 队列空,阻塞
41            while (isEmpty()) {
42                notEmpty.await();
43            }
44            E element = array[head];
45            if (++head == array.length) {
46                head = 0;
47            }
48            --size;
49            // 通知isFull条件队列有元素出去
50            notFull.signal();
51            return element;
52        } finally {
53            lock.unlock();
54        }
55    }
56
57    @Override
58    public boolean isEmpty() {
59        lock.lock();
60        try {
61            return size == 0;
62        } finally {
63            lock.unlock();
64        }
65    }
66
67    @Override
68    public int size() {
69        lock.lock();
70        try {
71            return size;
72        } finally {
73            lock.unlock();
74        }
75    }
76
77}

对比性能

1public class Benchmark {
2
3    @Test
4    public void testWithMonitor() 
{
5        Queue<Integer> queue = new BlockingQueueWithSync<>(5);
6        execute(queue);
7    }
8
9    @Test
10    public void testWithCondition() 
{
11        Queue<Integer> queue = new BlockingQueueWithLock<>(5);
12        execute(queue);
13    }
14
15    private void execute(Queue<Integer> queue) {
16        ExecutorService executorService = Executors.newCachedThreadPool();
17        for (int i = 1; i <= 1000; i++) {
18            final int finalNum = i;
19            executorService.execute(() -> {
20                try {
21                    queue.put(finalNum);
22                    Integer take = queue.take();
23                    System.out.println("item: " + take);
24                } catch (InterruptedException e) {
25                    e.printStackTrace();
26                }
27            });
28        }
29        executorService.shutdown();
30    }
31
32}

这个测试程序让 2 个队列的可存储的元素数都为 5,开启 1000 个线程进行 put 和 take 操作,运行后查看总耗时。

可以看出,使用 synchronized 的方式性能较差。




● java匠人手法-优雅的处理空值

● REST API 的安全基础

● 深入浅出 CAS

● Spring Boot Devtools热部署

● Spring Boot AOP记录用户操作日志

● Spring Boot整合Mongo DB

● 【图文讲解】你一定能看懂的HTTPS原理剖析!

●  基础面试,为什么面试官总喜欢问String?

●  Spring Boot Admin 2.2.0发布,支持最新Spring Boot/Cloud之外,新增中文展示!

●  你应该知道的 @ConfigurationProperties 注解的使用姿势,这一篇就够了







如有收获,请帮忙转发,您的鼓励是作者最大的动力,谢谢!

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存