概览

1. 基于链表的可选有界阻塞队列。根据FIFO的出入队顺序,从队列头部检索和获取元素,在队列尾部插入新元素。

2. 当作为有界阻塞队列,在队列空间不足时,put方法将会一直阻塞直到有多余空间才会执行插入元素操作,take方法则相反,只到队列内元素不为空时,才将队列元素逐个取出。

3. 队列容量不指定时,默认为Integer.MAX_VALUE,此时可以看作无界队列。

4. 使用非公平锁进行并发控制。所有方法都是线程安全的。

使用方法

下面的文章给出了阻塞队列的四种基本用法:

了解BlockingQueue 大体框架和实现思路

LinkedBlockingQueue实现了BlockingQueue类。

在BlockingQueue中,方法被分为如下四类:

  • Throws exception:操作未实现时(正常流程下的执行)抛出异常
  • Special value:根据操作的实际情况,返回特定值,例如null、false(这些失败可能是线程中断、队列为空引起的)
  • Blocks:阻塞当前线程,直到当前线程可以成功执行
  • Times out:尝试指定时间后,放弃执行
 

Throws exception

Special value

Blocks

Times out

新增

add(E e)

offer(E e)

put(E e)

offer(E e, long timeout, TimeUnit unit)

删除

remove()

poll()

take()

poll(long timeout, TimeUnit unit)

查询

element()

peek()

   

1. add | remove | element

这三个方法在BlockingQueue的定义中,都会在操作未实现时,抛出异常。

  • add(E e)在队尾添加元素e,add内部调用offer方法实现。因此,元素e为空时,抛出NullPointerException异常;插入失败时,抛出IllegalStateException异常。
  • remove删除队首元素,内部调用poll方法。队首无数据时,抛出NoSuchElementException异常。
  • element检索队首元素。队首无数据时,抛出NoSuchElementException异常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
blockingQue.add(1);
blockingQue.remove(1);
blockingQue.remove(); // NoSuchElementException
blockingQue.element(); // NoSuchElementException

2. offer | poll | peek

根据操作的实际情况,返回特定值,例如null、false(这些失败可能是线程中断、队列为空引起的)

  • offer(E e)在队尾添加元素e,元素e为空时,抛出NullPointerException异常;插入失败时返回false。
  • poll删除队首元素。删除失败时返回false。
  • peek检索队首元素。队首无数据时,返回null。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
blockingQue.offer(1);
blockingQue.poll();
Integer peek = blockingQue.peek(); // 返回null

3. put | take

阻塞当前线程,直到当前线程可以成功执行。

  • put(E e)在队尾添加元素e,元素e为空时,抛出NullPointerException异常。当队列满时,阻塞put线程,等待队列被消费后,队列容量不满时,该阻塞线程继续尝试在队尾插入元素。该方法在阻塞时可以被中断,并抛出InterruptedException异常。
  • take删除并获取队首元素。队首元素不为空时返回。队首元素为空,阻塞take线程,等待队列不为空时,再次尝试消费队首元素。该方法在阻塞时可以被中断,并抛出InterruptedException异常。

注意:阻塞时,不会解除锁占用。

LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
try {
    blockingQue.put(1);
    blockingQue.take();
} catch (InterruptedException e) {
    // 线程被中断
    e.printStackTrace();
}

4. offer | poll (timeout)

尝试指定时间后,放弃执行

  • offer(E e, long timeout, TimeUnit unit)在队尾添加元素e,元素e为空时,抛出NullPointerException异常;当队列容量满时,线程休眠一定时间后再次查看队列容量,当该休眠时间大于等于timeout后,此时队列还满则返回false。不满时,尝试入队。需要注意的是,由于伪唤醒机制的存在,线程可能在timeout这个时间段内的任意一点被唤醒,如果队列容易不满,则会直接执行入队操作。阻塞时,当前线程被中断抛出InterruptedException异常。
  • poll(long timeout, TimeUnit unit)删除队首元素。poll与offer对应的,当队列为空的时候,线程休眠一定时间。休眠时,当前线程被中断抛出InterruptedException异常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
try {
    blockingQue.offer(1, 100, TimeUnit.MILLISECONDS);
    blockingQue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

当然除了上述的阻塞队列的基本操作外,LinkedBlockingQueue还具有集合Collection的性质。因此集合中的通用方法也可以使用。

源码解析

说明

本次源码分析主要按照下面几个步骤进行:

1. 保存队列数据的容器以及出入队方法

2. 主要成员变量以及作用

3. 主要方法分析

队列容器

结构图

仅有数据item和后继next的单向节点,结构简单。

static class Node<E> {
    E item;
 
    Node<E> next;
 
    Node(E x) { item = x; }
}

next三种情况

A 普通节点的真实后继

B 真正的队首节点,item=null(队首节点恒为head.next)

C 队尾节点,next=null

  • item: null -> first -> …… -> last
  • next: first -> second -> ……-> null

入队操作

    // 入队
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node; // last.next = node; last = node;
    }

步骤1

 步骤2

出队操作

 
    // 出队
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        // 形成引用链闭环,JVM根据可达性分析时,GC root的引用链与该对象之间不可达,进行GC
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

步骤1

 步骤2

关键成员变量

队列入队和出队锁分离,都使用了非公平锁。

这里的count属性需要注意下,这里使用了原子类保证操作的原子性。后面的入队和出队,将会频繁使用它。

/** 容量, 初始化不设置时默认为Integer.MAX_VALUE*/
private final int capacity;
 
/** 当前队列内的元素数量 */
private final AtomicInteger count = new AtomicInteger();
 
/**
 * 队首
 * 不变量: head.item == null
 */
transient Node<E> head;
 
/**
 * 队尾
 * 不变量: last.next == null
 */
private transient Node<E> last;
 
/** 出队操作公用锁 */
private final ReentrantLock takeLock = new ReentrantLock();
 
/** 用于出队操作的阻塞和唤醒 出队的话,只需要考虑队列是否为空 */
private final Condition notEmpty = takeLock.newCondition(); 
 
/** 入队操作公用锁 */
private final ReentrantLock putLock = new ReentrantLock();
 
/** 用于入队操作的阻塞和唤醒 入队只需要考虑队列空间是否足够*/
private final Condition notFull = putLock.newCondition();

初始化

三个构造函数

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
 
    // 自定义容量
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
 
    // 初始化时,批量添加集合中的元素
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                  n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

put方法

put方法几个关注点

  • 释放锁的时机
  • 执行入队操作
  • 唤醒生产者的时机
  • 唤醒消费者的时机

这几点是整个阻塞操作的核心,可以在下面的分析中仔细观察。

注:由于阻塞队列就是基于生产者-消费者模型的,因此,下文中都把调用put方法的线程称为生产者,调用take方法的线程称为消费者。

总体分析

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1; // -1代表操作异常
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 如果线程没有被标记为中断,则获取锁
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            // 这里是线程在执行put操作时唯一一个执行过程中释放锁的地方
            notFull.await(); // 容量已满,等待被消费后唤醒
        }
        // 添加元素,更新容量
        enqueue(node);
        c = count.getAndIncrement();
        // 队列容量有余时,在这里再次唤醒一个其他的生产者线程(或者说消费者消费速度大于生产)
        if (c   1 < capacity)
            notFull.signal();
    } finally {
        // 释放锁
        putLock.unlock();
    }
    // 唤醒一个消费者
    if (c == 0)
        signalNotEmpty();
}

count属性并发问题

这里需要重点关注count,由于有两把锁,count可以同时被putLock、takeLock操作,那么这里是否会产生并发问题。

分析如下:

A. 只有putLock或takeLock一把锁操作:就是单线程操作,没影响,不产生并发问题。

其他所有put操作都处于await的状态或者竞争锁状态,其他线程也因为获取不到锁而无法执行,只有等该节点添加完成释放锁,其他线程才有机会继续执行。

        while (count.get() == capacity) {
            notFull.await(); // 容量已满,等待被消费后唤醒
        }

B. putLock和takeLock同时操作:我们假设两个线程一个获取到putLock,一个获取到了takeLock(同时最多也只有两个线程操作count)。

// put            
while (count.get() == capacity) {
     notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c   1 < capacity)
notFull.signal();
 
 
 
// take
while (count.get() == 0) {
    notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
    notEmpty.signal();

由于count是原子类那么count的所有读写操作必然是一个串联的操作,而非并行操作,因此也不存在并发问题,如下图(顺序可能不同):

唤醒消费者

代码的最后一段,会有唤醒一个消费者的操作。

// 唤醒一个等待中的消费者
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

刚开始看到的时候很疑惑,为什么是c == 0才唤醒。如果生产者入队成功,那么c应该为如下值:

c = count.getAndIncrement();

后面看了一下count.getAndIncrement()方法定义才发现自己记混了,count.getAndIncrement()是一个原子操作,且返回值的是操作前的值

ok,现在没问题了。

count >= 0,也就是说,只有在生产者入队前队列为空,入队成功之后才会唤醒一个消费者消费。

take方法

take方法与put方法大致相似,只是与put做相反操作。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 队列元素为空,停止消费,让出锁并等待被唤醒
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 移除队首元素,并更新容量
        x = dequeue();
        c = count.getAndDecrement();
        // 生产速度大于消费速度,唤醒一个其他消费者进行消费
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 消费之前队列已满,消费后唤醒一个生产者
    if (c == capacity)
        signalNotFull();
    return x;
}
 
// 唤醒生产者
/**
 * Signals a waiting put. Called only from take/poll.
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

总结

总体上看LinkedBlockingQueue类不难,整个生产-消费的流程实现也比较简单。源码已经把该介绍的东西都讲得很明白了,我这属于依葫芦画瓢顺着源码注释写出来的。这么一写,自己这个类的印象就很深刻了。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持Devmax。

LinkedBlockingQueue链式阻塞队列的使用和原理解析的更多相关文章

  1. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  2. Java多线程案例之阻塞队列详解

    阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则.阻塞队列能是一种线程安全的数据结构。本文将通过一些示例为大家详细讲讲阻塞队列的原理与使用,感兴趣的小伙伴可以学习一下

  3. 实例详解jQuery的链式编程风格

    jQuery中的链式操作,它让代码变得更有层次更简洁,所以这篇文章主要给大家介绍了关于jQuery链式编程风格的相关资料,需要的朋友可以参考下

  4. Node.js实现链式回调

    这篇文章介绍了Node.js实现链式回调的方法,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  5. 微服务Spring Boot 整合Redis 阻塞队列实现异步秒杀下单思路详解

    这篇文章主要介绍了微服务Spring Boot 整合Redis 阻塞队列实现异步秒杀下单,使用阻塞队列实现秒杀的优化,采用异步秒杀完成下单的优化,本文给大家分享详细步骤及实现思路,需要的朋友可以参考下

  6. Java中ArrayBlockingQueue和LinkedBlockingQueue

    这篇文章主要介绍了Java中ArrayBlockingQueue和LinkedBlockingQueue,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的朋友可以参考一下

  7. Springboot详解线程池与多线程及阻塞队列的应用详解

    本例应用线程池、多线程、阻塞队列处理一个流程任务。本例处理一个订单流程,主要包括生成订单、订单处理、订单入库,下面我们一起看看

  8. PHP实现链式操作的原理详解

    下面小编就为大家带来一篇PHP实现链式操作的原理详解。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  9. 详解JavaScript中的链式调用

    这篇文章主要介绍了JavaScript中的链式调用的相关资料,帮助大家更好的理解和学习JavaScript,感兴趣的朋友可以了解下

  10. jQuery链式调用与show知识浅析

    这篇文章主要介绍了jQuery的XX如何实现?——2.show与链式调用 的相关资料,非常具有参考借鉴价值,感兴趣的朋友一起学习吧

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部