查看原文
其他

【并发技术13】条件阻塞Condition的应用

倪升武 武哥聊编程 2022-08-24


阅读本文大概需要6分钟


今天周六,该休息休息,该浪浪,武哥还是来聊聊技术吧,如题。

Condition 将 Object 监听器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用, Condition 替代了 Object 监视器方法的使用。

1. Condition的基本使用

由于 Condition 可以用来替代 wait、notify 等方法,所以可以对比着之前写过的传统线程同步通信技术的代码来看,先来回顾一下原来的问题:

有两个线程,子线程先执行10次,然后主线程执行5次,然后再切换到子线程执行10,再主线程执行5次……如此往返执行50次。

之前用 wait 和notify 来实现的,现在用 Condition 来改写一下,代码如下:

  1. public class ConditionCommunication {

  2.    public static void main(String[] args) {

  3.        Business bussiness = new Business();

  4.        new Thread(new Runnable() {// 开启一个子线程

  5.                    @Override

  6.                    public void run() {

  7.                        for (int i = 1; i <= 50; i++) {

  8.                            bussiness.sub(i);

  9.                        }

  10.                    }

  11.                }).start();

  12.        // main方法主线程

  13.        for (int i = 1; i <= 50; i++) {

  14.            bussiness.main(i);

  15.        }

  16.    }  

  17. }

  18. class Business {

  19.    Lock lock = new ReentrantLock();

  20.    Condition condition = lock.newCondition(); //Condition是在具体的lock之上的

  21.    private boolean bShouldSub = true;

  22.    public void sub(int i) {

  23.        lock.lock();

  24.        try {

  25.            while (!bShouldSub) {

  26.                try {

  27.                    condition.await(); //用condition来调用await方法

  28.                } catch (Exception e) {

  29.                    // TODO Auto-generated catch block

  30.                    e.printStackTrace();

  31.                }

  32.            }

  33.            for (int j = 1; j <= 10; j++) {

  34.                System.out.println("sub thread sequence of " + j

  35.                        + ", loop of " + i);

  36.            }

  37.            bShouldSub = false;

  38.            condition.signal(); //用condition来发出唤醒信号,唤醒某一个

  39.        } finally {

  40.            lock.unlock();

  41.        }

  42.    }

  43.    public void main(int i) {

  44.        lock.lock();

  45.        try {

  46.            while (bShouldSub) {

  47.                try {

  48.                    condition.await(); //用condition来调用await方法

  49.                } catch (Exception e) {

  50.                    // TODO Auto-generated catch block

  51.                    e.printStackTrace();

  52.                }

  53.            }

  54.            for (int j = 1; j <= 10; j++) {

  55.                System.out.println("main thread sequence of " + j

  56.                        + ", loop of " + i);

  57.            }

  58.            bShouldSub = true;

  59.            condition.signal(); //用condition来发出唤醒信号么,唤醒某一个

  60.        } finally {

  61.            lock.unlock();

  62.        }

  63.    }

  64. }


从代码来看,Condition 的使用是和 Lock 一起的,没有 Lock 就没法使用 Condition,因为 Condition 是通过 Lock 来 new 出来的,这种用法很简单,只要掌握了 synchronized 和 wait、notify 的使用,完全可以掌握 Lock 和 Condition 的使用。

2. Condition的拔高

2.1 缓冲区的阻塞队列

上面使用 Lock 和 Condition 来代替 synchronized 和 Object 监视器方法实现了两个线程之间的通信,现在再来写个稍微高级点的应用:模拟缓冲区的阻塞队列。

什么叫缓冲区呢?举个例子,现在有很多人要发消息,我是中转站,我要帮别人把消息发出去,那么现在我就需要做两件事,一件事是接受用户发过来的消息,并按照顺序放到缓冲区,另一件事是从缓冲区按顺序取出用户发过来的消息,并发送出去。

现在把这个实际的问题抽象一下:缓冲区即一个数组,我们可以向数组种写入数据,也可以从数组种把数据取走,我要做的两件事就是开启两个线程,一个存数据,一个取数据。但是问题来了,如果缓冲区满了,说明接收的消息太多了,即发送过来的消息太快了,我另一个线程还来不及发完,导致现在的缓冲区没地方放了,那么此时就得阻塞存数据这个线程,让其等待;相反,如果我转发的太快,现在缓冲区所有内容都被我发完了,还没有用户发新的消息来,那么此时就得阻塞取数据这个线程。

好了,分析完了这个缓冲区的阻塞队列,下面就用 Condition 技术来实现一下。

  1. class Buffer {

  2.    final Lock lock = new ReentrantLock(); //定义一个锁

  3.    final Condition notFull = lock.newCondition(); //定义阻塞队列满了的Condition

  4.    final Condition notEmpty = lock.newCondition();//定义阻塞队列空了的Condition

  5.    final Object[] items = new Object[10]; //为了下面模拟,设置阻塞队列的大小为10,不要设太大

  6.    int putptr, takeptr, count; //数组下标,用来标定位置的

  7.    //往队列中存数据

  8.    public void put(Object x) throws InterruptedException {

  9.        lock.lock(); //上锁

  10.        try {

  11.            while (count == items.length) {

  12.                System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法存数据!");

  13.                notFull.await();    //如果队列满了,那么阻塞存数据这个线程,等待被唤醒

  14.            }

  15.            //如果没满,按顺序往数组中存

  16.            items[putptr] = x;

  17.            if (++putptr == items.length) //这是到达数组末端的判断,如果到了,再回到始端

  18.                putptr = 0;

  19.            ++count;    //消息数量

  20.            System.out.println(Thread.currentThread().getName() + " 存好了值: " + x);

  21.            notEmpty.signal(); //好了,现在队列中有数据了,唤醒队列空的那个线程,可以取数据啦

  22.        } finally {

  23.            lock.unlock(); //放锁

  24.        }

  25.    }

  26.    //从队列中取数据

  27.    public Object take() throws InterruptedException {

  28.        lock.lock(); //上锁

  29.        try {

  30.            while (count == 0) {

  31.                System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法取数据!");

  32.                notEmpty.await();  //如果队列是空,那么阻塞取数据这个线程,等待被唤醒

  33.            }

  34.            //如果没空,按顺序从数组中取

  35.            Object x = items[takeptr];

  36.            if (++takeptr == items.length) //判断是否到达末端,如果到了,再回到始端

  37.                takeptr = 0;

  38.            --count; //消息数量

  39.            System.out.println(Thread.currentThread().getName() + " 取出了值: " + x);

  40.            notFull.signal(); //好了,现在队列中有位置了,唤醒队列满的那个线程,可以存数据啦

  41.            return x;

  42.        } finally {

  43.            lock.unlock(); //放锁

  44.        }

  45.    }

  46. }


这个程序很经典,我是从官方 JDK 文档中拿出来的,然后加了注释。程序中定义了来两个 Condition,分别针对两个线程,等待和唤醒分别用不同的 Condition 来执行,思路很清晰,程序也很健壮。

可以考虑一个问题,为啥要用两个 Condition 呢?之所以这么设计肯定是有原因的:如果用一个 Condition,现在假设队列满了,但是有 2 个线程 A 和 B 同时存数据,那么都进入了睡眠,好,现在另一个线程取走一个了,然后唤醒了其中一个线程 A,那么 A 可以存了,存完后, A 又唤醒一个线程,如果 B 被唤醒了,那就出问题了,因为此时队列是满的,B 不能存的,B存的话就会覆盖原来还没被取走的只,这就是一个 Condition 带来的问题。

来测试一下上面的阻塞队列的效果。

  1. public class BoundedBuffer {

  2.    public static void main(String[] args) {        

  3.        Buffer buffer = new Buffer();      

  4.        for(int i = 0; i < 5; i ++) { //开启5个线程往缓冲区存数据

  5.            new Thread(new Runnable() {            

  6.                @Override

  7.                public void run() {

  8.                    try {

  9.                        buffer.put(new Random().nextInt(1000)); //随机存数据

  10.                    } catch (InterruptedException e) {

  11.                        e.printStackTrace();

  12.                    }

  13.                }

  14.            }).start();

  15.        }

  16.        for(int i = 0; i < 10; i ++) { //开启10个线程从缓冲区中取数据

  17.            new Thread(new Runnable() {            

  18.                @Override

  19.                public void run() {

  20.                    try {

  21.                        buffer.take(); //从缓冲区取数据

  22.                    } catch (InterruptedException e) {

  23.                        e.printStackTrace();

  24.                    }

  25.                }

  26.            }).start();

  27.        }

  28.    }

  29. }


我故意只开启 5 个线程存数据,10 个线程取数据,就是想让它出现取数据被阻塞的情况发生,看运行的结果。

Thread-5 被阻塞了,暂时无法取数据!
Thread-10 被阻塞了,暂时无法取数据!
Thread-1 存好了值: 755
Thread-0 存好了值: 206
Thread-2 存好了值: 741
Thread-3 存好了值: 381
Thread-14 取出了值: 755
Thread-4 存好了值: 783
Thread-6 取出了值: 206
Thread-7 取出了值: 741
Thread-8 取出了值: 381
Thread-9 取出了值: 783
Thread-5 被阻塞了,暂时无法取数据!
Thread-11 被阻塞了,暂时无法取数据!
Thread-12 被阻塞了,暂时无法取数据!
Thread-10 被阻塞了,暂时无法取数据!
Thread-13 被阻塞了,暂时无法取数据!

从结果中可以看出,线程 5 和10 抢先执行,发现队列中没有,于是就被阻塞了,睡在那了,知道队列中有新的值存入才可以取,但是他们两运气不好,存的数据又被其他线程前线取走了……可以多运行几次。如果想要看到存数据被阻塞,可以将取数据的线程设置少一点,这里我就不设了。

2.2 两个以上线程之间的唤醒

还是原来那个题目,现在让三个线程来执行,看一下题目:

有三个线程,子线程1先执行10次,然后子线程2执行10次,然后主线程执行5次,然后再切换到子线程1执行10次,子线程2执行10次,主线程执行5次……如此往返执行50次。

如果不用 Condition,还真不好弄,但是用 Condition 来做的话,就非常方便了,原理很简单,定义三个 Condition,子线程 1 执行完唤醒子线程 2,子线程 2 执行完唤醒主线程,主线程执行完唤醒子线程1。唤醒机制和上面那个缓冲区道理差不多,下面附上代码。

  1. public class ThreeConditionCommunication {

  2.    public static void main(String[] args) {

  3.        Business bussiness = new Business();

  4.        new Thread(new Runnable() {// 开启一个子线程

  5.                    @Override

  6.                    public void run() {

  7.                        for (int i = 1; i <= 50; i++) {

  8.                            bussiness.sub1(i);

  9.                        }

  10.                    }

  11.                }).start();

  12.        new Thread(new Runnable() {// 开启另一个子线程

  13.            @Override

  14.            public void run() {

  15.                for (int i = 1; i <= 50; i++) {

  16.                    bussiness.sub2(i);

  17.                }

  18.            }

  19.        }).start();

  20.        // main方法主线程

  21.        for (int i = 1; i <= 50; i++) {

  22.            bussiness.main(i);

  23.        }

  24.    }

  25.    static class Business {

  26.        Lock lock = new ReentrantLock();

  27.        Condition condition1 = lock.newCondition(); //Condition是在具体的lock之上的

  28.        Condition condition2 = lock.newCondition();

  29.        Condition conditionMain = lock.newCondition();

  30.        private int bShouldSub = 0;

  31.        public void sub1(int i) {

  32.            lock.lock();

  33.            try {

  34.                while (bShouldSub != 0) {

  35.                    try {

  36.                        condition1.await(); //用condition来调用await方法

  37.                    } catch (Exception e) {

  38.                        // TODO Auto-generated catch block

  39.                        e.printStackTrace();

  40.                    }

  41.                }

  42.                for (int j = 1; j <= 10; j++) {

  43.                    System.out.println("sub1 thread sequence of " + j

  44.                            + ", loop of " + i);

  45.                }

  46.                bShouldSub = 1;

  47.                condition2.signal(); //让线程2执行

  48.            } finally {

  49.                lock.unlock();

  50.            }

  51.        }

  52.        public void sub2(int i) {

  53.            lock.lock();

  54.            try {

  55.                while (bShouldSub != 1) {

  56.                    try {

  57.                        condition2.await(); //用condition来调用await方法

  58.                    } catch (Exception e) {

  59.                        // TODO Auto-generated catch block

  60.                        e.printStackTrace();

  61.                    }

  62.                }

  63.                for (int j = 1; j <= 10; j++) {

  64.                    System.out.println("sub2 thread sequence of " + j

  65.                            + ", loop of " + i);

  66.                }

  67.                bShouldSub = 2;

  68.                conditionMain.signal(); //让主线程执行

  69.            } finally {

  70.                lock.unlock();

  71.            }

  72.        }

  73.        public void main(int i) {

  74.            lock.lock();

  75.            try {

  76.                while (bShouldSub != 2) {

  77.                    try {

  78.                        conditionMain.await(); //用condition来调用await方法

  79.                    } catch (Exception e) {

  80.                        // TODO Auto-generated catch block

  81.                        e.printStackTrace();

  82.                    }

  83.                }

  84.                for (int j = 1; j <= 5; j++) {

  85.                    System.out.println("main thread sequence of " + j

  86.                            + ", loop of " + i);

  87.                }

  88.                bShouldSub = 0;

  89.                condition1.signal(); //让线程1执行

  90.            } finally {

  91.                lock.unlock();

  92.            }

  93.        }

  94.    }

  95. }


关于线程中 Condition 技术就分享这么多吧。

这里有技术、有段子、有生活、有感悟、有资源,要不然怎么叫『程序员私房菜』呢?什么?你不是程序员?这有关系吗?来吧,还等什么~

更多推荐阅读:

微服务架构盛行的时代,你需要了解点 Spring Boot

读一篇故事,交一个朋友~

【并发技术10】线程并发库的使用

【并发技术11】Callable与Future的应用

【并发技术12】线程锁技术的使用

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存