查看原文
其他

【098期】面试官:精通多线程?你手写过堵塞队列吗?我懵了。。

Java精选 2022-08-09

点击上方“Java精选”,选择“设为星标”

别问别人为什么,多问自己凭什么!

下方留言必回,有问必答!

每天 08:08 更新文章,每天进步一点点...

面试官:你好,你先做个自我介绍吧!

某人:面试官你好,我叫开局一张嘴面试全靠吹,某某年毕业,毕业自家里蹲大学,做过某某项目。。。。。。

面试官微微一笑,捋了捋稀疏的头发:看你简历,你精通多线程?那你手写过堵塞队列吗?

某人心里出现一万个问号,堵塞队列是啥玩意?平时基本都是crud,顶多用多线程跑数据。

某人:没有手写过。

面试官:哦,那你说下堵塞队列吧!

某人支支吾吾:这个有点忘了!

面试官:没事,那我们下一个。

此处省略一万字。

面试官扭了扭严重负荷的颈椎:先到这里吧,你先回去等通知。

某人:好的。

不出意外,某人等了一个月,等的望眼欲穿,也没等到那个期待的电话。

1.什么是队列

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

队列其实就是跟平时排队一样,按照顺序来,先排队的先买到东西,后排队的后买到东西,排队的第一个叫队头,最后一个叫队尾,这就是队列的先进先出,这是和栈最大的区别。

2.什么是堵塞队列?

当队列为空时,消费者挂起,队列已满时,生产者挂起,这就是生产-消费者模型,堵塞其实就是将线程挂起。因为生产者的生产速度和消费者的消费速度之间的不匹配,就可以通过堵塞队列让速度快的暂时堵塞, 如生产者每秒生产两个数据,而消费者每秒消费一个数据,当队列已满时,生产者就会堵塞(挂起),等待消费者消费后,再进行唤醒。

堵塞队列会通过挂起的方式来实现生产者和消费者之间的平衡,这是和普通队列最大的区别。

3.如何实现堵塞队列?

jdk其实已经帮我们提供了实现方案,java5增加了concurrent包,concurrent包中的BlockingQueue就是堵塞队列,我们不需要关心BlockingQueue如何实现堵塞,一切都帮我们封装好了,只需要做一个没有感情的API调用者就行。

4.BlockingQueue如何使用?

BlockingQueue本身只是一个接口,规定了堵塞队列的方法,主要依靠几个实现类实现。

4.1 BlockingQueue主要方法

1)插入数据 (1)offer(E e):如果队列没满,返回true,如果队列已满,返回false(不堵塞) (2)offer(E e, long timeout, TimeUnit unit):可以设置等待时间,如果队列已满,则进行等待。超过等待时间,则返回false (3)put(E e):无返回值,一直等待,直至队列空出位置。

2)获取数据 (1)poll():如果有数据,出队,如果没有数据,返回null (2)poll(long timeout, TimeUnit unit):可以设置等待时间,如果没有数据,则等待,超过等待时间,则返回null (3)take():如果有数据,出队。如果没有数据,一直等待(堵塞)。

4.2 BlockingQueue主要实现类

1)ArrayBlockingQueue:ArrayBlockingQueue是基于数组实现的,通过初始化时设置数组长度,是一个有界队列,而且ArrayBlockingQueue和LinkedBlockingQueue不同的是,ArrayBlockingQueue只有一个锁对象,而LinkedBlockingQueue是两个锁对象,一个锁对象会造成要么是生产者获得锁,要么是消费者获得锁,两者竞争锁,无法并行。

2)LinkedBlockingQueue:LinkedBlockingQueue是基于链表实现的,和ArrayBlockingQueue不同的是,大小可以初始化设置,如果不设置,默认设置大小为Integer.MAX_VALUE,LinkedBlockingQueue有两个锁对象,可以并行处理。

3)DelayQueue:DelayQueue是基于优先级的一个无界队列,队列元素必须实现Delayed接口,支持延迟获取,元素按照时间排序,只有元素到期后,消费者才能从队列中取出。

4)PriorityBlockingQueue:PriorityBlockingQueue是基于优先级的一个无界队列,底层是基于数组存储元素的,元素按照优选级顺序存储,优先级是通过Comparable的compareTo方法来实现的(自然排序),和其他堵塞队列不同的是,其只会堵塞消费者,不会堵塞生产者,数组会不断扩容,这就是一个彩蛋,使用时要谨慎。

5)SynchronousQueue:SynchronousQueue是一个特殊的队列,其内部是没有容器的,所以生产者生产一个数据,就堵塞了,必须等消费者消费后,生产者才能再次生产,称其为队列有点不合适,现实生活中,多个人才能称为队,一个人称为队有些说不过去。

5.手写堵塞队列

我是参照了ArrayBlockingQueue的源码写的,欢迎大家斧正。另外推荐公众号Java精选,回复Java面试,获取最新版在线面试题资料。

/**
 * @author yz
 * @version 1.0
 * @date 2020/10/31 11:24
 */

    public class YzBlockingQuery {

    private Object[] tab; //队列容器

    private int takeIndex; //出队下标

    private int putIndex; //入队下标

    private int size;//元素数量

    private ReentrantLock reentrantLock = new ReentrantLock();

    private Condition notEmpty;//读条件

    private Condition notFull;//写条件

    public YzBlockingQuery(int tabCount) {
        if (tabCount <= 0) {
            new NullPointerException();
        }

        tab = new Object[tabCount];
        notEmpty = reentrantLock.newCondition();
        notFull = reentrantLock.newCondition();
    }

    public boolean offer(Object obj) {
        if (obj == null) { throw new NullPointerException(); }
        try {
            //获取锁
            reentrantLock.lock();
            //队列已满
            while (size==tab.length){
                System.out.println("队列已满");
                //堵塞
                notFull.await();
            }
            tab[putIndex]=obj;
            if(++putIndex==tab.length){
                putIndex=0;
            }
            size++;
            //唤醒读线程
            notEmpty.signal();
            return true;
        } catch (Exception e) {
            //唤醒读线程
            notEmpty.signal();
        } finally {
            reentrantLock.unlock();
        }
        return false;
    }


    public Object take(){
        try {
            reentrantLock.lock();
            while (size==0){
                System.out.println("队列空了");
                //堵塞
                notEmpty.await();
            }
            Object obj= tab[takeIndex];
            //如果到了最后一个,则从头开始,公众号Java精选,回复Java面试
            if(++takeIndex==tab.length){
                takeIndex=0;
            }
            size--;
            //唤醒写线程
            notFull.signal();
            return obj;
        }catch (Exception e){
            //唤醒写线程
            notFull.signal();
        }finally {
            reentrantLock.unlock();
        }
        return null;
    }


    public static void main(String[] args) {
        Random random = new Random(100);
        YzBlockingQuery yzBlockingQuery=new YzBlockingQuery(5);
        Thread thread1 = new Thread(() -> {
            for (int i=0;i<100;i++) {
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                yzBlockingQuery.offer(i);
                System.out.println("生产者生产了:"+i);
            }
        });
    
        Thread thread2 = new Thread(() -> {
            for (int i=0;i<100;i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Object take = yzBlockingQuery.take();
                System.out.println("消费者消费了:"+take);
            }
        });
    
        thread1.start();
        thread2.start();
    }
}

作者:工地杨大锤

blog.csdn.net/qq_38306425/article/details/109332045

期往精选  点击标题可跳转

【090期】批处理框架 Spring Batch,数据迁移量过大如何保证内存?

【091期】为什么要弃坑阿里开源的 FastJson?三种利用链漏洞分析

【092期】面试官问:JDK1.8 线程池中多余的线程是如何回收的?

【093期】面试官:多线程环境下 HashMap为什么会出现死循环?

【094期】面试官问:如果要存 IP 地址,用什么数据类型比较好?

【095期】网易二面:Kafka 为什么吞吐量大、速度快?

【096期】面试官:Spring 注解 @bean 和 @component 有什么区别?

【097期】面试必问系列:50 道经典 Spring 面试题!

 - 小程序,3000+ 道面试题在线刷,最新、最全 Java 面试题!

文章有帮助的话,在看,转发吧!

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存