BlockingQueue 原理与使用
BlockingQueue
是 Java 并发包(java.util.concurrent)中提供的一个接口,它代表了一个支持阻塞操作的队列。它是一个特殊的队列,当队列为空时,获取元素的操作会被阻塞,直到队列中有可用元素;当队列已满时,插入元素的操作会被阻塞,直到队列有空闲位置
前置知识
Queue
队列(Queue)是一种常用的数据结构,它遵循先进先出(First-In-First-Out,FIFO)的原则。队列可以理解为一个有序的线性表,其中元素的插入(入队)只能在队尾进行,元素的删除(出队)只能在队头进行。队列常用于在多线程和并发编程中实现任务调度、消息传递等场景
队列的主要操作包括:
- 入队(enqueue):将元素添加到队列的末尾。
- 出队(dequeue):从队列的头部删除并返回元素。
- 获取队头元素(peek/front):查看队头元素,但不进行删除操作。
队列的常见用途包括:
- 实现线程池任务调度:线程池通过队列来保存待执行的任务,每个线程从队列中获取任务进行执行。
- 消息传递:在多线程或分布式系统中,可以使用队列来实现消息的传递和通信。
- 广度优先搜索:在图遍历算法中,广度优先搜索(BFS)可以使用队列来实现节点的遍历顺序。
Java 中的 Queue
在Java中,Queue
是一个接口,它是Java集合框架中的一部分。Queue
接口继承自 Collection
接口
Queue接口定义如下:
1 |
|
在Queue接口中,定义基本的元素插入和删除的方法,主要方法及其含义分别如下:
方法 | 说明 |
---|---|
boolean add(E e) |
向队列中添加一个元素;如果有空间则添加成功返回true,否则则抛出IllegalStateException 异常 |
boolean offer(E e) |
向队列中添加一个元素;如果有空间则添加成功返回true,否则返回false |
E remove() |
从队列中删除一个元素;如果元素存在则返回队首元素,否则抛出NoSuchElementException 异常 |
E poll(); |
从队列中删除一个元素;如果元素存在则返回队首元素,否则返回null |
E element() |
从队列获取一个元素,但是不删除;如果元素存在则返回队首元素,否则抛出NoSuchElementException 异常 |
E peek() |
从队列获取一个元素,但是不删除;如果元素存在则返回队首元素,否则返回null |
Deque
**Deque
*(Double-Ended Queue*)是Java集合框架中的一种双端队列,它继承自Queue
接口。与普通的队列(Queue)不同,Deque
允许在队列的两端进行元素的插入和删除操作,因此可以在队头和队尾都进行入队和出队操作。Deque
可以作为栈(先进后出,LIFO)或队列(先进先出,FIFO)来使用,因此它是一种非常灵活的数据结构
Deque
接口定义了一系列双端队列的操作方法,包括以下主要方法:
方法 | 描述 |
---|---|
void addFirst(E e) |
在队头插入元素 |
boolean offerFirst(E e) |
在队头插入元素 |
void addLast(E e) |
在队尾插入元素 |
boolean offerLast(E e) |
在队尾插入元素 |
E removeFirst() |
移除并返回队头元素 |
E pollFirst() |
移除并返回队头元素 |
E removeLast() |
移除并返回队尾元素 |
E pollLast() |
移除并返回队尾元素 |
E getFirst() |
返回队头元素但不移除 |
boolean removeFirstOccurrence(Object o) |
如果元素o存在,则从队列中删除第一次出现的该元素 |
boolean removeLastOccurrence(Object o) |
如果元素o存在,则从队列中删除最后一次出现的该元素 |
除了上述方法,Deque
还继承了 Queue
接口的方法,因此它可以作为普通队列使用。此外,Deque
还提供了栈相关的方法,如 push(E e)
和 pop()
,用于在栈的头部进行入栈和出栈操作
Queue 的实现
LinkedList
: 可以实现Queue
接口,即可作为队列使用ArrayDeque
: 双端队列的实现,也可以作为队列使用PriorityQueue
: 优先级队列的实现,不是严格的FIFO队列,而是根据元素的优先级来决定出队顺序
在Java并发包(java.util.concurrent)中,还有BlockingQueue
接口的实现,它提供了阻塞式的队列操作,常用于多线程环境下的任务调度和消息传递。常见的BlockingQueue
实现类有:
ArrayBlockingQueue
: 基于数组的有界阻塞队列LinkedBlockingQueue
: 基于链表的可选有界阻塞队列SynchronousQueue
: 容量为1的阻塞队列,用于直接传递元素
BlockingQueue
BlockingQueue
其实就是阻塞队列,是基于阻塞机制实现的线程安全的队列。而阻塞机制的实现是通过在入队和出队时加锁的方式避免并发操作
BlockingQueue
不同于普通的 Queue
的区别主要是:
- 通过在入队和出队时进行加锁,保证了队列线程安全
- 支持阻塞的入队和出队方法:当队列满时,会阻塞入队的线程,直到队列不满;当队列为空时,会阻塞出队的线程,直到队列中有元素
BlockingQueue
常用于生产者-消费者模型中,往队列里添加元素的是生产者,从队列中获取元素的是消费者;通常情况下生产者和消费者都是由多个线程组成;下图所示则为一个最常见的生产者-消费者模型,生产者和消费者之间通过队列平衡两者的的处理能力、进行解耦等
BlockingQueue 接口定义
BlockingQueue
接口定义了以下几个主要的方法:
put(E element)
: 将指定的元素插入队列,如果队列已满,则阻塞直到队列有空闲位置take()
: 获取并移除队列的头部元素,如果队列为空,则阻塞直到队列有可用元素offer(E element)
: 将指定的元素插入队列,如果队列已满,则立即返回false,不会阻塞poll()
: 获取并移除队列的头部元素,如果队列为空,则立即返回null,不会阻塞offer(E element, long timeout, TimeUnit unit)
: 将指定的元素插入队列,如果队列已满,则等待指定的时间,超时后返回falsepoll(long timeout, TimeUnit unit)
: 获取并移除队列的头部元素,如果队列为空,则等待指定的时间,超时后返回null
BlockingQueue
主要提供了四类方法,如下表所示:
方法 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞特定时间 |
---|---|---|---|---|
入队 | add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
出队 | remove() |
poll() |
take() |
poll(time, unit) |
获取队首元素 | element() |
peek() |
不支持 | 不支持 |
BlockingQueue实现类及原理
常见的实现BlockingQueue
的类有:
ArrayBlockingQueue
: 基于数组的有界阻塞队列,需要指定队列的容量LinkedBlockingQueue
: 基于链表的可选有界阻塞队列,默认大小为Integer.MAX_VALUE
PriorityBlockingQueue
: 支持优先级排序的无界阻塞队列DelayQueue
: 延迟队列,其中的元素只有在其指定的延迟时间过后才能从队列中获取SynchronousQueue
: 容量为1的阻塞队列,用于直接传递元素,通常用于线程池等场景
使用BlockingQueue
可以有效地进行线程间通信和协作,让多线程编程更加简单和安全。它是在并发编程中常用的工具之一,能够很好地处理生产者-消费者问题以及其他队列相关的任务
其中在日常开发中用的比较多的是 ArrayBlockingQueue
和 LinkedBlockingQueue
,本文也将主要介绍这两个实现类的原理
ArrayBlockingQueue的用法和原理
ArrayBlockingQueue的类定义
实现了 BlockingQueue
接口,并继承了抽象队列类 AbstractQueue
(封装了部分通用方法)
1 |
|
ArrayBlockingQueue类属性
1 |
|
在 ArrayBlockingQueue 中,还定义了队列元素存储以及入队、出队操作的属性。
final Object[] items
:由于ArrayBlockingQueue是基于数组实现的阻塞队列,所以使用items
数组,存储队列中的元素int takeIndex
和int putIndex
:两个items数组的索引值,分别指向出队元素的索引值以及将要入队元素的索引值;通过这两个索引,可以控制元素从items
数组中如何进行出队和入队int count
:当前队列中的元素数量,通过该值实现了队列有界性
除了上述几个属性,还需要部分属性进行并发控制,在BlockingQueue中使用了双Condition算法
进行并发控制,主要通过如下几个变量实现:
ReentrantLock lock
:这里使用了ReetrantLock作为独占锁,进行并发控制Condition notEmpty
和Condition notFull
:定义了两个阻塞唤醒条件,分别表示等待出队的条件
和等待入队的条件
构造方法
在 ArrayBlockingQueue 构造方法中,主要功能时初始化元素数组以及锁和 condition 条件;可以通过capacity
变量指定有界队列的元素数量,以及通过 fair
指定是否使用公平锁。
1 |
|
ArrayBlockingQueue 中4组存取数据的方法实现也是大同小异,本次以put和take方法进行解析
入队逻辑
put
无论是放数据还是取数据都是从队头开始,逐渐往队尾移动。
1 |
|
源码中有个有意思的设计,添加元素的时候如果已经到了队尾,下次就从队头开始添加,相当于做成了一个循环队列。像下面这样:
出队逻辑
take
1 |
|
阻塞实现
通过上面的描述,我们了解了基于数组的阻塞队列的入队和出队实现逻辑,但是我们还剩下最后一个疑问,当入队和出队时,如果无法直接进行入队和出队操作,需要进行阻塞等待,那么阻塞是如何实现的呢?在 ArrayBlockingQueue
中主要是使用独占锁 ReentrantLock
以及两个条件队列 notFull
和 notEmpty
实现的。
我们首先看一下阻塞入队的方法 put(E e)
,下面是其代码:
1 |
|
调用 put
方法进行阻塞式入队的基本流程为:
- 首先,在进行入队操作前,使用
ReentrantLock
进行加锁操作,保证只有一个线程执行入队或出队操作;如果锁被其他线程占用,则等待; - 如果加锁成功,则首先判断队列是否满,也就是
while(count == items.length)
;如果队列已满,则调用notFull.await()
,将当前线程阻塞,并添加到notFull条件队列
中等待唤醒;如果队列不满,则直接调用enqueue
方法,进行元素插入; - 当前线程添加到
notFull
条件队列中后,只有当其他线程有出队操作时,会调用notFull.signal()
方法唤醒等待的线程;当前线程被唤醒后,还需要再次进行一次队列是否满的判断,如果此时队列不满才可以进行enqueue
操作,否则仍然需要再次阻塞等待,这也就是为什么在判断队列是否满时使用while
的原因,即避免当前线程被意外唤醒,或者唤醒后被其他线程抢先完成入队操作。 - 最后,当完成入队操作后,在finally代码块中进行锁释放
lock.unlock
,完成put
入队操作
下面我们再来看下阻塞出队方法 take()
,代码如下:
1 |
|
其实take
方法与put
方法类似,主要流程也是先加锁,然后循环判断队列是否为空,如果为空则添加到notEmpty条件队列等待,如果不为空则进行出队操作;最后进行锁释放。
指定等待时间的阻塞实现
OK,到这里我们了解了如何进行阻塞的入队和出队操作,在 ArrayBlockingQueue
中还支持指定等待时间的阻塞式入队和出队操作,分别是 offer(e, time, unit)
和 poll(time, unit)
方法。这里我们就只要看下 offer(e, time, unit)
的实现逻辑,代码如下:
1 |
|
在上面代码中,我们重点看下 while
循环中判断队列是否满的条件:
- 当队列满时,则首先判断剩余等待时间是否为0,如果为0表示已经到了等待时间,此时入队失败,直接返回
false
- 当剩余等待时间大于0时,则需要继续等待,即调用
nanos = notFull.awaitNanos(nanos)
,当该线程被唤醒时,awaitNanos
会返回剩余的等待时间 nanos,根据 nanos 则可以判断是否已经到等待时间
在出队方法 poll(time, unit)
方法中,实现逻辑类似,这里不再赘述,有兴趣的小伙伴可以自行查看源码研究哦。
ArrayBlockingQueue 原理总结
- ArrayBlockingQueue 是一个有界阻塞队列,初始化时需要指定容量大小
- 在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用 ArrayBlockingQueue 是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞
- 使用独占锁 ReentrantLock 实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈
LinkedBlockingQueue的用法和原理
LinkedBlockingQueue类定义
LinkedBlockingQueue实现了BlockingQueue接口,并继承了AbstractQueue类,其代码如下:
1 |
|
在LinkedBlockingQueue中定义的变量及其含义如下:
capacity
:该变量表示队列的容量,设置该值则变为一个有界队列;如果不设置的话默认取值为Integer.MAX_VALUE,也可以认为是无界队列count
:当前队列中元素的数量head
和last
:分别表示链表的头尾节点,其中头结点head
不存储元素,head.item = null
takeLock
和notEmpty
:出队的锁以及出队条件putLock
和notFull
:入队的锁以及入队条件
可以看出与ArrayBlockingQueue
不同的是,在LinkedBlockingQueue
中,入队和出队分别使用两个锁,两个锁可以分别认为是毒写锁和读锁,这里的具体原因在后面会进行详细描述
链表节点定义
LinkedBlockingQueue
是基于链表实现的,所以链表的节点定义如下,在Node<E>
节点中分别定义了元素item
以及后继节点next
。
1 |
|
构造方法定义
然后我们再来看一下构造方法定义,在LinkedBlockingQueue
中提供了三个构造方法,分别是默认构造方法、指定队列容量的构造方法、基于集合的构造方法;
在构造方法中,需要设置队列的容量,并初始化链表的头尾节点;基于集合的构造方法,会根据输入的集合,构建一个非空的队列。
1 |
|
入队逻辑
put 方法定义如下
1 |
|
put(e)
方法的执行步骤大体如下,- 判断要put的元素e是否为null,如果为null直接抛出空指针异常;
- e不为null,则使用e创建一个Node节点,获得put锁;
- 判断当前队列中的元素数量和队列的容量,如果相等,则阻塞当前线程;
- 如果不相等,把生成的node节点插入队列,enqueue方法定义如下,
1
2
3
4
5
6
7private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
// 在enqueue(node)操作中,就是将插入节点设置为尾结点的next节点,也就是last.next = node,然后再修改尾结点为新插入的节点,即:last = last.next,完成了入队节点的插入操作- 使用原子操作类把当前队列中的元素数量增1;如果添加后的队列中的元素数量比容量小,则表示可以继续执行put类的操作,唤醒notFull.singal();
- 如果c=0,即在enqueue前为空,数量为0(此时会阻塞take进程),enqueue后为1,则需要唤醒take进程,如下
1
2
3
4
5
6
7
8
9private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
offer 方法
offer方法的定义如下:
1 |
|
此方法的执行步骤大体如下,
- 判断当前队列中的元素数量和队列容量,如果相等,直接返回false;
- 如果当前队列中元素数量小于队列容量,执行入队操作;
- 入队操作之后,判断队列中元素数量如果仍小于队列容量,唤醒其他的阻塞线程;
- 如果c==0(即入队成功,队列中元素的数量为1),则需要唤醒阻塞在put锁的线程;
add
在 LinkedBlockingQueue
中,由于继承了 AbstractQueue
类,所以add
方法也是使用的AbstractQueue
中的定义,代码如下;add
方法直接调用了offer(E e)
方法,并判断是否入队成功,如果入队失败则抛出 IllegalStateException
异常。
1 |
|
出队逻辑
take方法和put刚好相反,其定义如下
1 |
|
此方法的执行步骤大体如下,
- 获得take锁,表示执行take操作;
- 获得当前队列的元素数量,如果数量为0,则阻塞当前线程,直到被中断或者被唤醒;
- 如果当前队列的元素数量不额外i0,则执行出队操作;
1 |
|
从上面的代码可以看出把头节点进行出队,即head指向下一个节点
- 当前队列的元素数量减一,并返回操作前的数量;
- 如果之前大于1(c最小为2),指向dequeue后数量最小为1,证明队列中仍有元素,需要唤醒获得take锁的其他阻塞线程,take.singal();
- 如果c等于当前队列的容量(执行完dequeue后,当前队列中元素的数量等于capacity-1,则未满),则需要唤醒获得put锁的其他put线程;
1 |
|
poll
poll方法定义如下:
1 |
|
此方法的执行步骤大体如下,
- 如果当前队列元素数量为0,直接返回null
- 如果当前队列元素数量大于0,执行出队操作
- 如果c>1,即c最小为2,则出队成功后,仍有1个元素,可以唤醒阻塞在take锁的线程
- 如果c=capacity,则出队成功后,队列中的元素为capacity-1,这时队列为满,可以唤醒阻塞在put锁上的其他线程,即可以添加元素
remove
在 LinkedBlockingQueue
中,remove
方法也是直接使用的父类 AbstractQueue
中的 remove
方法,代码如下;remove
方法直接调用了 poll()
方法,如果出队成功则返回出队元素,出队失败则抛出NoSuchElementException
异常。
1 |
|
LinkedBlockingQueue 对比 ArrayBlockingQueue
ArrayBlockingQueue
中,使用了一个 ReentrantLock lock
作为入队和出队的锁,并使用两个条件 notEmpty
和 notFull
来进行线程间通信。而在本文介绍的 LinkedBlockingQueue
中,使用了两个锁 putLock
和 takeLock
分别作为入队和出队的锁,同样使用了两个锁的两个条件 notFull
和 notEmpty
进行线程间通信
由于在 ArrayBlockingQueue
中,入队和出队操作共用了同一个锁,所以两个操作之间会有相互影响;而在 LinkedBlockingQueue
中,入队和出队操作分别使用不同的锁,则入队和出队互不影响,可以提供队列的操作性能