BlockingQueue 原理与使用

BlockingQueue 是 Java 并发包(java.util.concurrent)中提供的一个接口,它代表了一个支持阻塞操作的队列。它是一个特殊的队列,当队列为空时,获取元素的操作会被阻塞,直到队列中有可用元素;当队列已满时,插入元素的操作会被阻塞,直到队列有空闲位置

前置知识

Queue

队列(Queue)是一种常用的数据结构,它遵循先进先出(First-In-First-Out,FIFO)的原则。队列可以理解为一个有序的线性表,其中元素的插入(入队)只能在队尾进行,元素的删除(出队)只能在队头进行。队列常用于在多线程和并发编程中实现任务调度、消息传递等场景

队列的主要操作包括:

  1. 入队(enqueue):将元素添加到队列的末尾。
  2. 出队(dequeue):从队列的头部删除并返回元素。
  3. 获取队头元素(peek/front):查看队头元素,但不进行删除操作。

队列的常见用途包括:

  • 实现线程池任务调度:线程池通过队列来保存待执行的任务,每个线程从队列中获取任务进行执行。
  • 消息传递:在多线程或分布式系统中,可以使用队列来实现消息的传递和通信。
  • 广度优先搜索:在图遍历算法中,广度优先搜索(BFS)可以使用队列来实现节点的遍历顺序。

Java 中的 Queue

在Java中,Queue 是一个接口,它是Java集合框架中的一部分。Queue 接口继承自 Collection 接口

Queue接口定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Queue<E> extends Collection<E> {

boolean add(E e);

boolean offer(E e);

E remove();

E poll();

E element();

E peek();
}

在Queue接口中,定义基本的元素插入和删除的方法,主要方法及其含义分别如下:

方法 说明
boolean add(E e) 向队列中添加一个元素;如果有空间则添加成功返回true,否则则抛出IllegalStateException异常
boolean offer(E e) 向队列中添加一个元素;如果有空间则添加成功返回true,否则返回false
E remove() 从队列中删除一个元素;如果元素存在则返回队首元素,否则抛出NoSuchElementException异常
E poll(); 从队列中删除一个元素;如果元素存在则返回队首元素,否则返回null
E element() 从队列获取一个元素,但是不删除;如果元素存在则返回队首元素,否则抛出NoSuchElementException异常
E peek() 从队列获取一个元素,但是不删除;如果元素存在则返回队首元素,否则返回null

Deque

**Deque*Double-Ended Queue*)是Java集合框架中的一种双端队列,它继承自Queue接口。与普通的队列(Queue)不同,Deque允许在队列的两端进行元素的插入和删除操作,因此可以在队头和队尾都进行入队和出队操作。Deque可以作为栈(先进后出,LIFO)或队列(先进先出,FIFO)来使用,因此它是一种非常灵活的数据结构

Deque 接口定义了一系列双端队列的操作方法,包括以下主要方法:

方法 描述
void addFirst(E e) 在队头插入元素
boolean offerFirst(E e) 在队头插入元素
void addLast(E e) 在队尾插入元素
boolean offerLast(E e) 在队尾插入元素
E removeFirst() 移除并返回队头元素
E pollFirst() 移除并返回队头元素
E removeLast() 移除并返回队尾元素
E pollLast() 移除并返回队尾元素
E getFirst() 返回队头元素但不移除
boolean removeFirstOccurrence(Object o) 如果元素o存在,则从队列中删除第一次出现的该元素
boolean removeLastOccurrence(Object o) 如果元素o存在,则从队列中删除最后一次出现的该元素

除了上述方法,Deque 还继承了 Queue 接口的方法,因此它可以作为普通队列使用。此外,Deque 还提供了栈相关的方法,如 push(E e)pop(),用于在栈的头部进行入栈和出栈操作

Queue 的实现

  • LinkedList: 可以实现Queue接口,即可作为队列使用
  • ArrayDeque: 双端队列的实现,也可以作为队列使用
  • PriorityQueue: 优先级队列的实现,不是严格的FIFO队列,而是根据元素的优先级来决定出队顺序

在Java并发包(java.util.concurrent)中,还有BlockingQueue接口的实现,它提供了阻塞式的队列操作,常用于多线程环境下的任务调度和消息传递。常见的BlockingQueue实现类有:

  • ArrayBlockingQueue: 基于数组的有界阻塞队列
  • LinkedBlockingQueue: 基于链表的可选有界阻塞队列
  • SynchronousQueue: 容量为1的阻塞队列,用于直接传递元素

BlockingQueue

BlockingQueue 其实就是阻塞队列,是基于阻塞机制实现的线程安全的队列。而阻塞机制的实现是通过在入队和出队时加锁的方式避免并发操作

BlockingQueue 不同于普通的 Queue 的区别主要是:

  1. 通过在入队和出队时进行加锁,保证了队列线程安全
  2. 支持阻塞的入队和出队方法:当队列满时,会阻塞入队的线程,直到队列不满;当队列为空时,会阻塞出队的线程,直到队列中有元素

BlockingQueue 常用于生产者-消费者模型中,往队列里添加元素的是生产者,从队列中获取元素的是消费者;通常情况下生产者和消费者都是由多个线程组成;下图所示则为一个最常见的生产者-消费者模型,生产者和消费者之间通过队列平衡两者的的处理能力、进行解耦等

image

BlockingQueue 接口定义

BlockingQueue 接口定义了以下几个主要的方法:

  1. put(E element): 将指定的元素插入队列,如果队列已满,则阻塞直到队列有空闲位置
  2. take(): 获取并移除队列的头部元素,如果队列为空,则阻塞直到队列有可用元素
  3. offer(E element): 将指定的元素插入队列,如果队列已满,则立即返回false,不会阻塞
  4. poll(): 获取并移除队列的头部元素,如果队列为空,则立即返回null,不会阻塞
  5. offer(E element, long timeout, TimeUnit unit): 将指定的元素插入队列,如果队列已满,则等待指定的时间,超时后返回false
  6. poll(long timeout, TimeUnit unit): 获取并移除队列的头部元素,如果队列为空,则等待指定的时间,超时后返回null

BlockingQueue 主要提供了四类方法,如下表所示:

方法 抛出异常 返回特定值 阻塞 阻塞特定时间
入队 add(e) offer(e) put(e) offer(e, time, unit)
出队 remove() poll() take() poll(time, unit)
获取队首元素 element() peek() 不支持 不支持

BlockingQueue实现类及原理

常见的实现BlockingQueue的类有:

  • ArrayBlockingQueue: 基于数组的有界阻塞队列,需要指定队列的容量
  • LinkedBlockingQueue: 基于链表的可选有界阻塞队列,默认大小为Integer.MAX_VALUE
  • PriorityBlockingQueue: 支持优先级排序的无界阻塞队列
  • DelayQueue: 延迟队列,其中的元素只有在其指定的延迟时间过后才能从队列中获取
  • SynchronousQueue: 容量为1的阻塞队列,用于直接传递元素,通常用于线程池等场景

使用BlockingQueue可以有效地进行线程间通信和协作,让多线程编程更加简单和安全。它是在并发编程中常用的工具之一,能够很好地处理生产者-消费者问题以及其他队列相关的任务

其中在日常开发中用的比较多的是 ArrayBlockingQueueLinkedBlockingQueue,本文也将主要介绍这两个实现类的原理

ArrayBlockingQueue的用法和原理

ArrayBlockingQueue的类定义

image-20230726190657928

实现了 BlockingQueue 接口,并继承了抽象队列类 AbstractQueue(封装了部分通用方法)

1
2
3
4
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

}
ArrayBlockingQueue类属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 用来存放数据的数组
final Object[] items;

// 下次取数据的数组下标位置
int takeIndex;

// 下次放数据的数组下标位置
int putIndex;

// 当前已有元素的个数
int count;

// 独占锁,用来保证存取数据安全
final ReentrantLock lock;

// 取数据的条件
private final Condition notEmpty;

// 放数据的条件
private final Condition notFull;

在 ArrayBlockingQueue 中,还定义了队列元素存储以及入队、出队操作的属性。

  • final Object[] items:由于ArrayBlockingQueue是基于数组实现的阻塞队列,所以使用items数组,存储队列中的元素
  • int takeIndexint putIndex:两个items数组的索引值,分别指向出队元素的索引值以及将要入队元素的索引值;通过这两个索引,可以控制元素从items数组中如何进行出队和入队
  • int count:当前队列中的元素数量,通过该值实现了队列有界性

除了上述几个属性,还需要部分属性进行并发控制,在BlockingQueue中使用了双Condition算法进行并发控制,主要通过如下几个变量实现:

  • ReentrantLock lock:这里使用了ReetrantLock作为独占锁,进行并发控制
  • Condition notEmptyCondition notFull:定义了两个阻塞唤醒条件,分别表示等待出队的条件等待入队的条件
构造方法

在 ArrayBlockingQueue 构造方法中,主要功能时初始化元素数组以及锁和 condition 条件;可以通过capacity 变量指定有界队列的元素数量,以及通过 fair 指定是否使用公平锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** 指定队列元素数量capacity,并使用非公平锁进行并发控制 */
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

/** 指定队列元素数量capacity,并通过fair变量指定使用公平锁/非公平锁进行并发控制*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity]; // 初始化元素数组
lock = new ReentrantLock(fair); // 初始化锁
notEmpty = lock.newCondition(); // 初始化出队条件
notFull = lock.newCondition(); // 初始化入队条件
}

ArrayBlockingQueue 中4组存取数据的方法实现也是大同小异,本次以put和take方法进行解析

入队逻辑

put

image

无论是放数据还是取数据都是从队头开始,逐渐往队尾移动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 放数据,如果队列已满,就一直阻塞,直到有其他线程从队列中取走数据
public void put(E e) throws InterruptedException {
// 校验元素不能为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁,加可中断的锁
lock.lockInterruptibly();
try {
// 如果队列已满,就一直阻塞,直到被唤醒
while (count == items.length)
notFull.await();
// 如果队列未满,就往队列添加元素
enqueue(e);
} finally {
// 结束后,别忘了释放锁
lock.unlock();
}
}

// 实际往队列添加数据的方法
private void enqueue(E x) {
// 获取数组
final Object[] items = this.items;
// putIndex 表示本次插入的位置
items[putIndex] = x;
// ++putIndex 计算下次插入的位置
// 如果本次插入的位置,正好是队尾,下次插入就从 0 开始
if (++putIndex == items.length)
putIndex = 0;
// 元素数量加一
count++;
// 唤醒因为队列空等待的线程
notEmpty.signal();
}

源码中有个有意思的设计,添加元素的时候如果已经到了队尾,下次就从队头开始添加,相当于做成了一个循环队列。像下面这样:

image

出队逻辑

take

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 取数据,如果队列为空,就一直阻塞,直到有其他线程往队列中放数据
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁,加可中断的锁
lock.lockInterruptibly();
try {
// 如果队列为空,就一直阻塞,直到被唤醒
while (count == 0)
notEmpty.await();
// 如果队列不为空,就从队列取数据
return dequeue();
} finally {
// 结束后,别忘了释放锁
lock.unlock();
}
}

// 实际从队列取数据的方法
private E dequeue() {
// 获取数组
final Object[] items = this.items;
// takeIndex 表示本次取数据的位置,是上一次取数据时计算好的
E x = (E) items[takeIndex];
// 取完之后,就把队列该位置的元素删除
items[takeIndex] = null;
// ++takeIndex 计算下次取数据的位置
// 如果本次取数据的位置,正好是队尾,下次就从 0 开始取数据
if (++takeIndex == items.length)
takeIndex = 0;
// 元素数量减一
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒被队列满所阻塞的线程
notFull.signal();
return x;
}
阻塞实现

通过上面的描述,我们了解了基于数组的阻塞队列的入队和出队实现逻辑,但是我们还剩下最后一个疑问,当入队和出队时,如果无法直接进行入队和出队操作,需要进行阻塞等待,那么阻塞是如何实现的呢?在 ArrayBlockingQueue 中主要是使用独占锁 ReentrantLock 以及两个条件队列 notFullnotEmpty 实现的。

我们首先看一下阻塞入队的方法 put(E e),下面是其代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
while (count == items.length) {
// 如果队列已满,线程阻塞,并添加到notFull条件队列中等待唤醒
notFull.await();
}
// 如果队列未满,则调用enqueue方法进行入队操作
enqueue(e);
} finally {
// 释放锁
lock.unlock();
}
}

调用 put 方法进行阻塞式入队的基本流程为:

  • 首先,在进行入队操作前,使用 ReentrantLock 进行加锁操作,保证只有一个线程执行入队或出队操作;如果锁被其他线程占用,则等待;
  • 如果加锁成功,则首先判断队列是否满,也就是 while(count == items.length);如果队列已满,则调用 notFull.await(),将当前线程阻塞,并添加到 notFull条件队列 中等待唤醒;如果队列不满,则直接调用 enqueue 方法,进行元素插入;
  • 当前线程添加到 notFull 条件队列中后,只有当其他线程有出队操作时,会调用 notFull.signal() 方法唤醒等待的线程;当前线程被唤醒后,还需要再次进行一次队列是否满的判断,如果此时队列不满才可以进行 enqueue 操作,否则仍然需要再次阻塞等待,这也就是为什么在判断队列是否满时使用 while 的原因,即避免当前线程被意外唤醒,或者唤醒后被其他线程抢先完成入队操作。
  • 最后,当完成入队操作后,在finally代码块中进行锁释放 lock.unlock,完成 put入队操作

下面我们再来看下阻塞出队方法 take(),代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
while (count == 0)
// 判断队列是否为空,如果为空则线程阻塞,添加到notEmpty条件队列等待
notEmpty.await();
// 队列不为空,进行出队操作
return dequeue();
} finally {
// 释放锁
lock.unlock();
}
}

其实take方法与put方法类似,主要流程也是先加锁,然后循环判断队列是否为空,如果为空则添加到notEmpty条件队列等待,如果不为空则进行出队操作;最后进行锁释放。

指定等待时间的阻塞实现

OK,到这里我们了解了如何进行阻塞的入队和出队操作,在 ArrayBlockingQueue 中还支持指定等待时间的阻塞式入队和出队操作,分别是 offer(e, time, unit)poll(time, unit)方法。这里我们就只要看下 offer(e, time, unit)的实现逻辑,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
// 获取剩余等待时间
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
// 判断队列是否满
while (count == items.length) {
if (nanos <= 0)
// 入队队列满,等待时间为0,则入队失败,返回false
return false;
// 如果队列满,等待时间大于0,且未到等待时间,则继续等待nanos
nanos = notFull.awaitNanos(nanos);
}
// 队列不满,进行入队操作
enqueue(e);
return true;
} finally {
// 释放锁
lock.unlock();
}
}

在上面代码中,我们重点看下 while 循环中判断队列是否满的条件:

  • 当队列满时,则首先判断剩余等待时间是否为0,如果为0表示已经到了等待时间,此时入队失败,直接返回 false
  • 当剩余等待时间大于0时,则需要继续等待,即调用 nanos = notFull.awaitNanos(nanos),当该线程被唤醒时,awaitNanos 会返回剩余的等待时间 nanos,根据 nanos 则可以判断是否已经到等待时间

在出队方法 poll(time, unit) 方法中,实现逻辑类似,这里不再赘述,有兴趣的小伙伴可以自行查看源码研究哦。

ArrayBlockingQueue 原理总结
  • ArrayBlockingQueue 是一个有界阻塞队列,初始化时需要指定容量大小
  • 在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用 ArrayBlockingQueue 是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞
  • 使用独占锁 ReentrantLock 实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈

LinkedBlockingQueue的用法和原理

LinkedBlockingQueue类定义

image-20230726190624089

LinkedBlockingQueue实现了BlockingQueue接口,并继承了AbstractQueue类,其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

/** 队列的容量,如果不传则默认Integer.MAX_VALUE */
private final int capacity;

/** 当前队列中元素数量 */
private final AtomicInteger count = new AtomicInteger();

/**
* 链表的头指针,head.item = null
*/
transient Node<E> head;
/**
* 链表的尾指针 last.next = null
*/
private transient Node<E> last;

/** 出队操作锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 出队条件:非空队列 */
private final Condition notEmpty = takeLock.newCondition();

/** 入队锁 */
private final ReentrantLock putLock = new ReentrantLock();

/** 入队条件:非满队列 */
private final Condition notFull = putLock.newCondition();

}

在LinkedBlockingQueue中定义的变量及其含义如下:

  • capacity:该变量表示队列的容量,设置该值则变为一个有界队列;如果不设置的话默认取值为Integer.MAX_VALUE,也可以认为是无界队列
  • count:当前队列中元素的数量
  • headlast:分别表示链表的头尾节点,其中头结点head不存储元素,head.item = null
  • takeLocknotEmpty:出队的锁以及出队条件
  • putLocknotFull:入队的锁以及入队条件

可以看出与ArrayBlockingQueue不同的是,在LinkedBlockingQueue中,入队和出队分别使用两个锁,两个锁可以分别认为是毒写锁和读锁,这里的具体原因在后面会进行详细描述

链表节点定义

LinkedBlockingQueue是基于链表实现的,所以链表的节点定义如下,在Node<E>节点中分别定义了元素item以及后继节点next

1
2
3
4
5
6
7
8
static class Node<E> {
E item;

// 后继节点,
Node<E> next;

Node(E x) { item = x; }
}
构造方法定义

然后我们再来看一下构造方法定义,在LinkedBlockingQueue中提供了三个构造方法,分别是默认构造方法、指定队列容量的构造方法、基于集合的构造方法;

在构造方法中,需要设置队列的容量,并初始化链表的头尾节点;基于集合的构造方法,会根据输入的集合,构建一个非空的队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 默认构造方法,队列容量为Integer.MAX_VALUE
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

/**
* 指定队列容量的构造方法
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化链表的头尾节点
last = head = new Node<E>(null);
}

/**
* 基于集合构建队列,默认容量为Integer.MAX_VALUE
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
入队逻辑

put 方法定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
//获得添加锁,
final ReentrantLock putLock = this.putLock;
//获得当前队列中的元素数量
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//如果当前队列中元素的数量等于队列的容量,则阻塞当前线程,
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
//当前线程中元素数量增1,返回操作前的数量
c = count.getAndIncrement();
//c+1其实是当前队列中元素的数量,如果比容量小,则唤醒notFull的操作,即可以进行继续添加,执行put等添加操作。
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//说明在执行enqueue前的数量为0,执行完enqueue后数量为1,则需要唤醒取进程。
if (c == 0)
signalNotEmpty();
}
  • put(e) 方法的执行步骤大体如下,

    • 判断要put的元素e是否为null,如果为null直接抛出空指针异常;
    • e不为null,则使用e创建一个Node节点,获得put锁;
    • 判断当前队列中的元素数量和队列的容量,如果相等,则阻塞当前线程;
    • 如果不相等,把生成的node节点插入队列,enqueue方法定义如下,
    1
    2
    3
    4
    5
    6
    7
    private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
    }

    // 在enqueue(node)操作中,就是将插入节点设置为尾结点的next节点,也就是last.next = node,然后再修改尾结点为新插入的节点,即:last = last.next,完成了入队节点的插入操作
    • 使用原子操作类把当前队列中的元素数量增1;如果添加后的队列中的元素数量比容量小,则表示可以继续执行put类的操作,唤醒notFull.singal();
    • 如果c=0,即在enqueue前为空,数量为0(此时会阻塞take进程),enqueue后为1,则需要唤醒take进程,如下
    1
    2
    3
    4
    5
    6
    7
    8
    9
    private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
    notEmpty.signal();
    } finally {
    takeLock.unlock();
    }
    }

offer 方法

offer方法的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}

此方法的执行步骤大体如下,

  • 判断当前队列中的元素数量和队列容量,如果相等,直接返回false;
  • 如果当前队列中元素数量小于队列容量,执行入队操作;
  • 入队操作之后,判断队列中元素数量如果仍小于队列容量,唤醒其他的阻塞线程;
  • 如果c==0(即入队成功,队列中元素的数量为1),则需要唤醒阻塞在put锁的线程;

add

LinkedBlockingQueue 中,由于继承了 AbstractQueue 类,所以add方法也是使用的AbstractQueue中的定义,代码如下;add方法直接调用了offer(E e)方法,并判断是否入队成功,如果入队失败则抛出 IllegalStateException 异常。

1
2
3
4
5
6
7
8
9
public boolean add(E e) {
// 直接调用offer(e)方法进行入队
if (offer(e))
// 入队成功:返回true
return true;
else
// 入队失败:抛出异常
throw new IllegalStateException("Queue full");
}
出队逻辑

take方法和put刚好相反,其定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E take() throws InterruptedException {
E x;
int c = -1;
//获得当前队列的元素数量
final AtomicInteger count = this.count;
//获得take锁
final ReentrantLock takeLock = this.takeLock;
//执行take操作
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();//阻塞当前线程
}
x = dequeue();
//当前队列的数量减1,返回操作前的数量
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//当前队列中元素数量为capacity-1,即未满,可以调用put方法,需要唤醒阻塞在put锁上的线程
if (c == capacity)
signalNotFull();
return x;
}

此方法的执行步骤大体如下,

  • 获得take锁,表示执行take操作;
  • 获得当前队列的元素数量,如果数量为0,则阻塞当前线程,直到被中断或者被唤醒;
  • 如果当前队列的元素数量不额外i0,则执行出队操作;
1
2
3
4
5
6
7
8
9
10
11
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;//head赋值给h
Node<E> first = h.next;//相当于第二个节点赋值给first
h.next = h; // help GC
head = first;//头节点指向第二个节点
E x = first.item;
first.item = null;
return x;
}

从上面的代码可以看出把头节点进行出队,即head指向下一个节点

  • 当前队列的元素数量减一,并返回操作前的数量;
  • 如果之前大于1(c最小为2),指向dequeue后数量最小为1,证明队列中仍有元素,需要唤醒获得take锁的其他阻塞线程,take.singal();
  • 如果c等于当前队列的容量(执行完dequeue后,当前队列中元素的数量等于capacity-1,则未满),则需要唤醒获得put锁的其他put线程;
1
2
3
4
5
6
7
8
9
10
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//唤醒阻塞在put锁的其他线程
notFull.signal();
} finally {
putLock.unlock();
}
}

poll

poll方法定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

此方法的执行步骤大体如下,

  • 如果当前队列元素数量为0,直接返回null
  • 如果当前队列元素数量大于0,执行出队操作
  • 如果c>1,即c最小为2,则出队成功后,仍有1个元素,可以唤醒阻塞在take锁的线程
  • 如果c=capacity,则出队成功后,队列中的元素为capacity-1,这时队列为满,可以唤醒阻塞在put锁上的其他线程,即可以添加元素

remove

LinkedBlockingQueue中,remove 方法也是直接使用的父类 AbstractQueue 中的 remove 方法,代码如下;remove 方法直接调用了 poll() 方法,如果出队成功则返回出队元素,出队失败则抛出NoSuchElementException 异常。

1
2
3
4
5
6
7
8
9
10
public E remove() {
// 调用poll()方法进行出队
E x = poll();
if (x != null)
// 出队成功:返回出队元素
return x;
else
// 出队失败:抛出异常
throw new NoSuchElementException();
}

LinkedBlockingQueue 对比 ArrayBlockingQueue

ArrayBlockingQueue中,使用了一个 ReentrantLock lock 作为入队和出队的锁,并使用两个条件 notEmptynotFull 来进行线程间通信。而在本文介绍的 LinkedBlockingQueue中,使用了两个锁 putLocktakeLock 分别作为入队和出队的锁,同样使用了两个锁的两个条件 notFullnotEmpty 进行线程间通信

由于在 ArrayBlockingQueue 中,入队和出队操作共用了同一个锁,所以两个操作之间会有相互影响;而在 LinkedBlockingQueue 中,入队和出队操作分别使用不同的锁,则入队和出队互不影响,可以提供队列的操作性能


BlockingQueue 原理与使用
https://sugayoiya.github.io/posts/29247.html
作者
Sugayoiya
发布于
2021年9月15日
许可协议