时间:2021-07-01 10:21:17 帮助过:7人阅读
与普通队列相比,阻塞队列另外支持两个附加操作,这两个附加的操作支持阻塞的插入和移除方法。
①支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。
②支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变为非空。
一般的阻塞队列相比,只能在“尾部入队、在头部出队”,而双端阻塞队列,在头部和尾部均能入队,在头部和尾部均能出队。
LinkedBlockingDeque继承于抽象类AbstractQueue,并实现了BlockingDeque接口(BlockingDeque扩展了BlockingQueue,它表示一个双向阻塞队列的接口)。它内部主要使用两个”条件“来实现”阻塞插入“、”阻塞移出“,这两个条件分别是”未满“、”非空“。LinkedBlockingDeque是一个有界的双向阻塞队列,如果构造方法没有指定容量,其默认容量是无限大。LinkedBlockingDeque是一个基于双向链表结构的双端队列,内部使用静态内部类Node表示一个链表节点(本文基于JDK1.8)。
BlockingDeque接口方法说明
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> { //BlockingDeque自身定义的方法 ? //在队列头部插入元素,若容量已满抛出异常IllegalStateException void addFirst(E e); //在队列尾部插入元素,若容量已满抛出异常IllegalStateException void addLast(E e); //在队列头部插入元素,若容量已满返回false boolean offerFirst(E e); //在队列尾部插入元素,若容量已满返回false boolean offerLast(E e); //在队列头部插入元素,若容量已满则阻塞等待 void putFirst(E e) throws InterruptedException; //在队列尾部插入元素,若容量已满则阻塞等待 void putLast(E e) throws InterruptedException; //在队列头部插入元素,若容量已满则超时等待 boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException; //在队列尾部插入元素,若容量已满则超时等待 boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException; //在队列头部删除并返回元素,若队列已空则阻塞等待 E takeFirst() throws InterruptedException; //在队列尾部删除并返回元素,若队列已空则阻塞等待 E takeLast() throws InterruptedException; //在队列头部删除并返回元素,若队列已空则超时等待,在超时之前无法出队就返回null E pollFirst(long timeout, TimeUnit unit) throws InterruptedException; //在队列尾部删除并返回元素,若队列已空则超时等待,在超时之前无法出队就返回null E pollLast(long timeout, TimeUnit unit) throws InterruptedException; //从队列头部开始算起,在指定元素第一次出现的位置将之删除 boolean removeFirstOccurrence(Object o); //从队列尾部开始算起,在指定元素第一次出现的位置将之删除 boolean removeLastOccurrence(Object o); ? ? //父接口BlockingQueue的方法 ? boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; E remove(); E poll(); E take() throws InterruptedException; E poll(long timeout, TimeUnit unit) throws InterruptedException; E element(); E peek(); boolean remove(Object o); public boolean contains(Object o); public int size(); Iterator<E> iterator(); ? ? //栈方法 ? //在队列的头部压栈 void push(E e); //在队列的头部出栈 E pop(); }BlockingDeque
Node类
static final class Node<E> { E item; Node<E> prev; Node<E> next; Node(E x) { item = x; } }
一个Node对象代表一个链表节点,其属性item表示当前节点保存的元素(item为null时表示当前节点已经被删除),next属性表示当前节点的后继节点,prev属性表示当前节点的前驱节点。
成员变量
/** * Pointer to first node. * Invariant: (first == null && last == null) || * (first.prev == null && first.item != null) */ transient Node<E> first; /** * Pointer to last node. * Invariant: (first == null && last == null) || * (last.next == null && last.item != null) */ transient Node<E> last; /** Number of items in the deque */ private transient int count; /** Maximum number of items in the deque */ private final int capacity; /** Main lock guarding all access */ final ReentrantLock lock = new ReentrantLock(); /** Condition for waiting takes */ private final Condition notEmpty = lock.newCondition(); /** Condition for waiting puts */ private final Condition notFull = lock.newCondition();
first
: 队列的头节点
last
:队列的尾节点
count
:队列中的元素个数
capacity
:队列的指定容量
lock
:可重入锁,保证所有数据访问的线程安全
notEmpty
: 出队时的“非空”条件
notFull
:入队时的“未满”条件
LinkedBlockingQueue(int)是其主要的构造方法,构造方法主要涉及对队列容量的初始化。在使用无参构造方法时,阻塞队列的容量是Integer.MAX_VALUE,即无限大。
在初始化后,队列中不含任何元素时,头节点 、尾节点均是null。
public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } //指定容量 public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } //将某集合元素放入阻塞队列中 public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node<E>(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } }
put
offer
add
offer
等方法都是在队列的尾部添加元素。它们将核心实现委托给 putLast
、offerLast
实现
public void put(E e) throws InterruptedException { putLast(e); } public boolean offer(E e) { return offerLast(e); } public boolean add(E e) { addLast(e); return true; } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { return offerLast(e, timeout, unit); }
putLast 在队尾添加元素
putLast先获取lock锁(lock.lock()
),然后尝试在队尾添加一个新节点(linkLast(node)
),若链接新节点失败,则(notFull.await()
)休眠等待直到等到”未满“通知。
public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(node)) notFull.await();//休眠等待 } finally { lock.unlock(); } }
linkLast尝试在队列的尾部添加一个新节点。主要逻辑:
①若队列已满,直接返回false(不能链接新节点) ; ②将待入队节点node作为新的尾节点添加在队尾(last = node
)并更新相关链接关系;③元素计数加1(++count
) ;④唤醒一个等待"非空"条件的线程(notEmpty.signal()
),最后返回true.
private boolean linkLast(Node<E> node) { // assert lock.isHeldByCurrentThread(); if (count >= capacity)//队列已满,不能链接新节点 return false; Node<E> l = last; node.prev = l;//设置node的前驱节点,它的前驱为原尾节点 last = node;//新尾节点是刚添加的节点node //更新原尾节点l的后继节点 if (first == null) //队列中没有任何节点,last first均未初始化, //队列中只有一个节点(元素)时,头节点first 和尾节点last指定同一节点node first = node; else l.next = node;//原尾节点l的后继节点是新尾节点(刚添加的节点node) ++count;//元素个数加1 notEmpty.signal();//通知一个等待"非空"条件的线程 return true; }
offerLast方法与putLast类似,offerLast在检测到队列已满时会直接返回false,不会阻塞等待。
public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(node); } finally { lock.unlock(); } }
多参数的offerLast方法,它可以看作putLast的超时版本。
public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (!linkLast(node)) { if (nanos <= 0)//在超时之后没能入队,返回false return false; nanos = notFull.awaitNanos(nanos);//超时等待 } return true; } finally { lock.unlock(); } }
addLast 在队尾添加元素,若队列已满抛出异常。
public void addLast(E e) { if (!offerLast(e)) throw new IllegalStateException("Deque full"); }
push 在 在队列的头部插入元素,若队列已经满抛出IllegalStateException异常
public void push(E e) { addFirst(e); } public void addFirst(E e) { if (!offerFirst(e)) throw new IllegalStateException("Deque full"); }
putLast在队列头部插入元素,若容量已满则阻塞等待
先获取lock锁(lock.lock()
),然后尝试在队首添加一个新节点(linkFirst(node)
),若链接新节点失败,则(notFull.await()
)休眠等待直到等到”未满“通知。
public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(node)) notFull.await(); } finally { lock.unlock(); } }
linkLast尝试在队列的头部链接一个新节点。主要逻辑:
①若队列已满,直接返回false(不能链接新节点) ; ②将待入队节点node作为新的头节点添加在队首(first = node
)并更新相关链接关系;③元素计数加1(++count
) ;④通知一个等待"非空"条件的线程(notEmpty.signal()
),最后返回true.
private boolean linkFirst(Node<E> node) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) //队列已满,不能链接新节点 return false; Node<E> f = first; node.next = f;//设置node的后继节点,它的后继为原头节点 first = node;////新的尾节点是刚添加的节点node //更新原头节点f的前驱节点 if (last == null)//队列中没有任何节点,last first均未初始化 //队列中只有一个节点(元素)时,头节点first 和尾节点last指定同一节点node last = node; else f.prev = node;//原头节点f的前驱节点是新头节点(刚添加的节点node) ++count;//元素个数加1 notEmpty.signal();//通知一个等待"非空"条件的线程 return true; }
offerFirst方法与putFirst类似,但offerFirst在检测到队列已满时会直接返回false,不会阻塞等待。
public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(node); } finally { lock.unlock(); } }
多参数的offerFirst方法,它可以看作putFirst的超时版本.
public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (!linkFirst(node)) { if (nanos <= 0) //在超时后,表明入队失败 return false; nanos = notFull.awaitNanos(nanos);//超时等待 } return true; } finally { lock.unlock(); } }
remove
pop
poll
take
等方法都是在队列的尾部添加元素。它们将核心实现委托给 removeFirst 、pollFirst 、takeFirst等实现
public E take() throws InterruptedException { return takeFirst(); } public E poll() { return pollFirst(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { return pollFirst(timeout, unit); } public E remove() { return removeFirst(); } public E pop() { return removeFirst(); }
takeFirst 先获取lock锁(lock.lock()
),然后尝试在队首取消一个节点的链接(unlinkFirst()
),若取消链接失败(队列已空),则(notEmpty.await()
)休眠等待直到等到”非空“通知。
public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } }
unlinkFirst
尝试取消头节点在链表中的链接关系
①若队列为空,直接返回null(不能取消链接) ; ②将原头节点的后继节点(Node<E> n = f.next
)作为新头节点(first = n
)并更新相关链接关系;③元素计数自减1(--count
) ;④唤醒一个等待"未满"条件的线程(notFull.signal()
),最后返回原头节点中的元素(E item = f.item
).
private E unlinkFirst() { // assert lock.isHeldByCurrentThread(); Node<E> f = first; if (f == null) //链表未初始化,队列中没有任何元素,返回null return null; Node<E> n = f.next; E item = f.item;//保存原头节点的item,最终需要返回头节点的item f.item = null;//然后原头节点的item属性赋空 f.next = f; // help GC first = n;//新头节点是原头节点的后继节点 //设置新头节点的前驱节点 if (n == null) //删除头节点前链表中只有一个节点 //将头节点first、尾节点tail都设为null,链表中没有任何节点了。 last = null; else//删除头节点前链表中至少有两个节点(元素) n.prev = null;//将新头节点的prev设为null(头节点没有前驱节点) --count;//元素个数减1 notFull.signal();//唤醒一个等待”未满“条件的线程 return item; }
pollFirst方法与takeFirst类似,只不过pollFirst在检测到队列为空时会直接返回null,不会阻塞等待。
public E pollFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); } finally { lock.unlock(); } }
pollFirst(long,TimeUnit)可以看作是超时版本的takeFirst .
public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; while ( (x = unlinkFirst()) == null) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return x; } finally { lock.unlock(); } }
removeFirst直接委托pollFirst实现,若队列为空,则抛出异常NoSuchElementException。
public E removeFirst() { E x = pollFirst(); if (x == null) throw new NoSuchElementException(); return x; }
takeLast
先获取lock锁(lock.lock()
),然后尝试在队尾取消一个节点的链接(unlinkLast()
),若取消链接失败(队列已空),则(notEmpty.await()
)休眠等待直到等到”非空“通知。
public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkLast()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } }
unlinkLast
尝试取消尾节点在链表中的链接
①若队列为空,直接返回null(不能取消链接) ; ②将原尾节点的前驱节点(Node<E> p = l.prev
)作为新尾节点(last = p
)并更新相关链接关系;③元素计数自减1(--count
) ;④唤醒一个等待"未满"条件的线程(notFull.signal()
),最后返回原尾节点中的元素(E item = l.item
).
private E unlinkLast() { // assert lock.isHeldByCurrentThread(); Node<E> l = last; if (l == null) //链表未初始化,队列中没有任何元素,返回null return null; Node<E> p = l.prev; E item = l.item;//保存尾节点的item,最终需要返回尾节点的item l.item = null;//然后将原尾节点的item属性清空 //prevn属性自指,在使用迭代器时能标识此节点已被删除 l.prev = l; // help GC last = p;//新尾节点是原尾节点的前驱继节点 //设置新尾节点的后继节点 if (p == null)//删除尾节点前 链表中只有一个节点l //将头节点first、尾节点tail都设为null,链表中没有任何节点了 first = null; else//删除尾节点前链表中至少有两个节点(元素) p.next = null;//将新尾节点的next设为null(尾节点没有后继节点) --count;//元素个数减1 notFull.signal();//唤醒一个等待”未满“条件的线程 return item; }
pollLast方法与takeLast类似,但pollLast在检测到队列为空时会直接返回null,不会阻塞等待。
public E pollLast() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkLast(); } finally { lock.unlock(); } }
pollLast(long,TimeUnit)
可以看作是超时版本的takeLast,在超时之前无法出队就返回null.
public E pollLast(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { E x; while ( (x = unlinkLast()) == null) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return x; } finally { lock.unlock(); } }
removeLast`直接委托pollLast实现,若队列为空,则抛出异常NoSuchElementException。
public E removeLast() { E x = pollLast(); if (x == null) throw new NoSuchElementException(); return x; }
remove、 removeFirstOccurrence方法均是从队列头部开始向后查找,在给定元素第一次出现的位置上将之删除。
removeLastOccurrence是从队列尾部开始向前查找,在给定元素第一次出现的位置上将之删除。
public boolean remove(Object o) { return removeFirstOccurrence(o); } public boolean removeFirstOccurrence(Object o) { if (o == null) return false; final ReentrantLock lock = this.lock; lock.lock(); try { for (Node<E> p = first; p != null; p = p.next) {//从队列头部开始向后查找 if (o.equals(p.item)) { unlink(p);//取消此节点在链表中的链接关系 return true; } } return false; } finally { lock.unlock(); } } ? public boolean removeLastOccurrence(Object o) { if (o == null) return false; final ReentrantLock lock = this.lock; lock.lock(); try { for (Node<E> p = last; p != null; p = p.prev) {//队列尾部开始向前查找 if (o.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } }
unlink
将一个节点从链表中删除
void unlink(Node<E> x) { // assert lock.isHeldByCurrentThread(); Node<E> p = x.prev; Node<E> n = x.next; if (p == null) { //如果待删除节点是头节点, unlinkFirst(); } else if (n == null) { //如果待删除节点是尾节点 unlinkLast(); } else {//待删除节点是非头尾的中间节点 //通过next 、prev属性,将删除节点的前驱节点p和待删除节点的后继节点n直接链接在一起,待删除节点x已被排除在链表外 p.next = n;//待删除节点的前驱节点的next属性设为 待删除节点的后继节点 n.prev = p;//待删除节点的后继节点的prev属性设为 待删除节点的前驱节点 x.item = null;//清空item // Don‘t mess with x‘s links. They may still be in use by // an iterator. --count; notFull.signal(); } }
element
、 getFirst
返回队列的首元素但不删除,若队列为空则抛出异常NoSuchElementException。
peek
、peekFirst
返回队列的首元素但不删除,若队列为空则返回null.
public E element() { return getFirst(); } public E getFirst() { E x = peekFirst(); if (x == null) throw new NoSuchElementException(); return x; } ? public E peek() { return peekFirst(); } public E peekFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return (first == null) ? null : first.item; } finally { lock.unlock(); } } ?
getLast返回队列的尾元素但不删除,若队列为空则抛出异常NoSuchElementException。
peekLast返回队列的尾元素但不删除,若队列为空则返回null
public E getLast() { E x = peekLast(); if (x == null) throw new NoSuchElementException(); return x; } public E peekLast() { final ReentrantLock lock = this.lock; lock.lock(); try { return (last == null) ? null : last.item; } finally { lock.unlock(); } }
contains
方法,从头到尾遍历链表在队列中查找是否存在此元素
public boolean contains(Object o) { if (o == null) return false; final ReentrantLock lock = this.lock; lock.lock(); try { for (Node<E> p = first; p != null; p = p.next) if (o.equals(p.item)) re