当前位置:Gxlcms > 数据库问题 > 线程池02-LinkedBlockingQueue 阻塞队列

线程池02-LinkedBlockingQueue 阻塞队列

时间:2021-07-01 10:21:17 帮助过:19人阅读

首先,我们先了解一下什么是阻塞队列:

  • 当队列满了时,队列会阻塞插入元素的线程,直到队列不满;

  • 当队列为空时,获取元素的线程会等待队列变成非空。

常用到的方法

技术图片

上面是对阻塞队列的简单了解,下面重点分析一下LinkedBlockingQueue。

源码分析

Node节点

  • 可以看出是单向的链表结构
static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

构造方法和参数

  • 如果未设置初始容量,则默认是Integer.MAX_VALUE;
   /** 队列容量 */
   private final int capacity;

    /** 目前元素数量 */
    private final AtomicInteger count = new AtomicInteger();

    /** 头节点 */
    transient Node<E> head;

    /** 末尾节点 */
    private transient Node<E> last;

    /** take, poll 方法的锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 等待获取 条件队列*/
    private final Condition notEmpty = takeLock.newCondition();

    /** put, offer 方法的锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 等待存入的 条件队列 */
    private final Condition notFull = putLock.newCondition();

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

 public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);//初始化的时候设置头节点和尾节点为两个空节点
    }

插入

put 方法

  • 如果队列已经满了,则放入到条件队列中。
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);//创建新节点
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();//获取put锁
        try {
            //判断存入的元素个数和配置的数量是否相等,如果相等。那么将当前线程放入到条件队列中
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);//将节点插入到末尾
            c = count.getAndIncrement();//元素数量+1
            if (c + 1 < capacity)//当前元素数量小于容量的时候,唤醒“存入条件队列”的头节点到同步队列
                notFull.signal();
        } finally {
            putLock.unlock();释放put锁
        }
        // 唤醒获取条件队列的头节点
        if (c == 0)
            signalNotEmpty();
    }

//将节点设置为尾节点
private void enqueue(Node<E> node) {
    last = last.next = node;
}
// 唤醒“获取条件队列”中的首节点
private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//t获取ake锁
        try {
            notEmpty.signal();//唤醒“获取条件队列”中的首节点
        } finally {
            takeLock.unlock();
        }
    }

offer 方法

  • 如果超过容量就无法插入
  public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)//如果超过容量直接返回false,表示不能再插入数据
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);//新建节点
        final ReentrantLock putLock = this.putLock;
        putLock.lock();//获取put锁
        try {
            if (count.get() < capacity) {//小于容量时增加节点,并唤醒“存入条件队列”头节点到同步队列
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)//当前元素数量小于容量的时候,唤醒“存入条件队列”的头节点到同步队列
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        // 唤醒获取条件队列的头节点
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

take 方法

  • 获取链表中的头节点。如果不存在元素,则将当前线程放入到条件队列中。
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//获取锁
        try {
            while (count.get() == 0) {//如果为空则放入“获取条件队列”
                notEmpty.await();
            }
            x = dequeue();//将节点加入到链表最后
            c = count.getAndDecrement();//数量减1
            if (c > 1)//如果元素书大于1,则调用“获取条件队列”中的元素放入同步队列
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // 唤醒存入条件队列的头节点
        if (c == capacity)
            signalNotFull();
        return x;//返回头节点
    }

// 获取头节点元素
private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }


// 唤醒“存入条件队列”的头节点到同步队列
 private void signalNotFull() {
      final ReentrantLock putLock = this.putLock;
      putLock.lock();
      try {
          notFull.signal();
      } finally {
          putLock.unlock();
      }
  }

poll方法

  • 如果没有数据立即返回null
   public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;//获取锁
        takeLock.lock();
        try {
            if (count.get() > 0) {//当前容器数量大于0时
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // 唤醒存入条件队列的头节点
        if (c == capacity)
            signalNotFull();
        return x;
    }

总结

1.如何保证当队列没有消息或者消息满了的时候,进行监听?

上面看代码的时候,两段代码刚开始是有点懵的。

1.存入的方法
  // 唤醒获取条件队列的头节点
  if (c == 0) signalNotEmpty();

2.获取的方法
  // 唤醒存入条件队列的头节点
  if (c == capacity) signalNotFull();

其实这就监听的重要环节。
逻辑是这样的。以存入为例:
1.如果当前节点为0,说明队列中没有任务;
2.唤醒“获取条件队列”的头节点,去尝试获取元素。如果获取到则执行,如果没有,则依然放入到“获取条件队列”的末尾;
3.这样就可以保证在存入数据的时候,实时监听获取节点元素了。

线程池02-LinkedBlockingQueue 阻塞队列

标签:src   mamicode   integer   个数   stat   await   不能   not   row   

人气教程排行