LinkedBlockingQueue 详解

LinkedBlockingQueue 详解

LinkedBlockingQueue 是 Java 并发包 java.util.concurrent 提供的一个集合类。

LinkedBlockingQueue 提供了 add()、put()、offer() 方法来添加元素,提供了 peek()、take()、poll() 方法来获取队首元素。

往队列中添加元素

offer 方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果已达容量上限,则返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//获取互斥锁
putLock.lock();
try {
if (count.get() < capacity) {
//将元素加入队尾
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
//如果元素数量未达上限,唤醒调用 notFull.await() 方法的线程
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}

add 方法源码如下:

1
2
3
4
5
6
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

add 方法简单地调用了 offer 方法,区别在于当添加元素失败的时候会抛出异常。

接着看一下 put 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 当元素数量已满,暂停当前线程、释放 putLock 锁
while (count.get() == capacity) {
notFull.await();
}
//元素入队
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

add()、put()、offer() 方法都可以添加元素,区别在于对于队列数量已满情况下的处理,当队列元素已满时,offer 方法会返回 false、ad d 方法会抛出异常、而 put 方法会暂停当前线程直至有空闲空间。

从队列中取元素

下面看一下 peek 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
//当队首为空的时候返回 null
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}

由源码可知,peek 方法只是返回队首元素的值,队首元素并未出队。

poll 方法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
//当集合元素个数大于 0 的时候,队首元素出队列
x = dequeue();
c = count.getAndDecrement();
//唤醒调用 notEmpty.await 的线程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

最后看一下 take 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 当集合为空的时候,暂停当前线程使当前线程处于等待状态
while (count.get() == 0) {
notEmpty.await();
}
//出队列
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

peek()、take()、poll() 方法都可以获取队首元素,区别在于当队列元素数量为空的时候,peek() 和 poll() 方法会返回 null, 而 take() 方法会暂停当前线程直至队列中存在新的元素。还有一点,peek 方法只是返回队首元素的值,队首元素并不会出队。