LinkedBlockingQueue介绍【1】LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误 。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小 。
【2】LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素 。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行 。
LinkedBlockingQueue使用//指定队列的大小创建有界队列BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);//无界队列BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();LinkedBlockingQueue的源码分析【1】属性值
// 容量,指定容量就是有界队列private final int capacity;// 元素数量,用原子操作类的原因在于有两个线程都会操作需要保证可见性private final AtomicInteger count = new AtomicInteger();// 链表头本身是不存储任何元素的,初始化时item指向nulltransient Node<E> head;// 链表尾private transient Node<E> last;// take锁锁分离,提高效率private final ReentrantLock takeLock = new ReentrantLock();// notEmpty条件// 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒private final Condition notEmpty = takeLock.newCondition();// put锁private final ReentrantLock putLock = new ReentrantLock();// notFull条件// 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒private final Condition notFull = putLock.newCondition();//典型的单链表结构static class Node<E> {E item;//存储元素Node<E> next;//后继节点单链表结构Node(E x) { item = x; }}【2】构造函数
public LinkedBlockingQueue() {// 如果没传容量,就使用最大int值初始化其容量this(Integer.MAX_VALUE);}public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;// 初始化head和last指针为空值节点last = head = new Node<E>(null);}public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock(); // 为保证可见性而加的锁try {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();}}【3】核心方法分析
1)入队put方法
public void put(E e) throws InterruptedException {// 不允许null元素if (e == null) throw new NullPointerException();int c = -1;// 新建一个节点Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 使用put锁加锁putLock.lockInterruptibly();try {// 如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程)while (count.get() == capacity) {notFull.await();}// 队列不满,就入队enqueue(node);c = count.getAndIncrement();// 队列长度加1,返回原值// 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条件上的线程(可以继续入队)// 这里为啥要唤醒一下呢?// 因为可能有很多线程阻塞在notFull这个条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock(); // 真正唤醒生产者线程}// 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程if (c == 0)signalNotEmpty();}private void enqueue(Node<E> node) {// 直接加到last后面,last指向入队元素last = last.next = node;}private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();// 加take锁try {notEmpty.signal();// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程} finally {takeLock.unlock();// 真正唤醒消费者线程}}
经验总结扩展阅读
- 详解ROMA Connect API 流控实现技术
- Go_Goroutine详解
- Go_Channel详解
- Docker | 容器数据卷详解
- MyBatis之ResultMap的association和collection标签详解
- Future详解
- Go的网络编程详解
- gorm中的关联操作详解
- Go中的闭包、递归
- liunx之expect操作详解