Mr Dk.'s BlogMr Dk.'s Blog
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • ☕ Java Development Kit 8
    • java.io

      • Abstract Class - java.io.InputStream
      • Abstract Class - java.io.OutputStream
      • Abstract Class - java.io.Reader
      • Class - java.io.BufferedInputStream
      • Class - java.io.BufferedOutputStream
      • Class - java.io.BufferedReader
      • Class - java.io.ByteArrayInputStream
      • Class - java.io.ByteArrayOutputStream
      • Class - java.io.DataInputStream
      • Class - java.io.DataOutputStream
      • Class - java.io.FileInputStream
      • Class - java.io.FileOutputStream
      • Class - java.io.FileReader
      • Class - java.io.FilterInputStream
      • Class - java.io.FilterOutputStream
      • Class - java.io.InputStreamReader
      • Class - java.io.PipedInputStream
      • Class - java.io.PipedOutputStream
      • Class - java.io.PushbackInputStream
      • Class - java.io.SequenceInputStream
      • Interface - java.io.Closeable
    • java.lang

      • Abstract Class - java.lang.AbstractStringBuilder
      • Class - java.lang.Integer
      • Class - java.lang.String
      • Class - java.lang.ThreadLocal
    • java.nio

      • Abstract Class - java.nio.Buffer
    • java.util

      • Abstract Class - java.util.AbstractCollection
      • Abstract Class - java.util.AbstractList
      • Abstract Class - java.util.AbstractMap
      • Abstract Class - java.util.AbstractQueue
      • Abstract Class - java.util.AbstractSet
      • Class - java.util.ArrayList
      • Class - java.util.HashMap
      • Class - java.util.HashSet
      • Class - java.util.IdentityHashMap
      • Class - java.util.LinkedHashMap
      • Class - java.util.LinkedHashSet
      • Class - java.util.LinkedList
      • Class - java.util.PriorityQueue
      • Class - java.util.TreeMap
      • Class - java.util.TreeSet
      • Interface - java.util.Collection
      • Interface - java.util.Deque
      • Interface - java.util.Iterator
      • Interface - java.util.Iterator
      • Interface - java.util.Map
      • Interface - java.util.NavigableMap
      • Interface - java.util.NavigableSet
      • Interface - java.util.Queue
      • Interface - java.util.Set
      • Interface - java.util.SortedMap
      • Interface - java.util.SortedSet
    • java.util.concurrent

      • Abstract Class - java.util.concurrent.atomic.AtomicIntegerFieldUpdater
      • Abstract Class - java.util.concurrent.locks.AbstractExecutorService
      • Abstract Class - java.util.concurrent.locks.AbstractOwnableSynchronizer
      • Abstract Class - java.util.concurrent.locks.AbstractQueuedSynchronizer
      • Class - java.util.concurrent.ArrayBlockingQueue
      • Class - java.util.concurrent.ConcurrentHashMap
      • Class - java.util.concurrent.ConcurrentLinkedQueue
      • Class - java.util.concurrent.DelayQueue
      • Class - java.util.concurrent.ExecutorCompletionService
      • Class - java.util.concurrent.FutureTask
      • Class - java.util.concurrent.LinkedBlockingQueue
      • Class - java.util.concurrent.LinkedTransferQueue
      • Class - java.util.concurrent.SynchronousQueue
      • Class - java.util.concurrent.ThreadPoolExecutor
      • Class - java.util.concurrent.atomic.AtomicInteger
      • Class - java.util.concurrent.atomic.AtomicIntegerArray
      • Class - java.util.concurrent.atomic.AtomicReference
      • Class - java.util.concurrent.atomic.AtomicStampedReference
      • Class - java.util.concurrent.locks.ReentrantLock
      • Class - java.util.concurrent.locks.ReentrantReadWriteLock
      • Interface - java.util.concurrent.BlockingQueue
      • Interface - java.util.concurrent.CompletionService
      • Interface - java.util.concurrent.Executor
      • Interface - java.util.concurrent.ExecutorService
      • Interface - java.util.concurrent.Future
      • Interface - java.util.concurrent.ScheduledExecutorService
      • Interface - java.util.concurrent.TransferQueue
      • Interface - java.util.concurrent.locks.Lock
      • Interface - java.util.concurrent.locks.ReadWriteLock

Class - java.util.concurrent.ArrayBlockingQueue

Created by : Mr Dk.

2020 / 10 / 21 15:21

Nanjing, Jiangsu, China


Definition

底层基于数组实现的 BlockQueue (与链表实现的阻塞队列形成对照)。FIFO 的逻辑类似,但数组是有边界的,所以基于数组实现的阻塞队列是一个经典的 有容量 队列。当队列容量已满时,插入线程将会被阻塞;当队列容量为空时,读取线程也会被阻塞。另外,这个类还支持可选的 公平性 参数,以牺牲吞吐量为代价,保证加入队列的 FIFO 顺序一定被满足。

/**
 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
 * array.  This queue orders elements FIFO (first-in-first-out).  The
 * <em>head</em> of the queue is that element that has been on the
 * queue the longest time.  The <em>tail</em> of the queue is that
 * element that has been on the queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 *
 * <p>This is a classic &quot;bounded buffer&quot;, in which a
 * fixed-sized array holds elements inserted by producers and
 * extracted by consumers.  Once created, the capacity cannot be
 * changed.  Attempts to {@code put} an element into a full queue
 * will result in the operation blocking; attempts to {@code take} an
 * element from an empty queue will similarly block.
 *
 * <p>This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed. However, a queue constructed with fairness set
 * to {@code true} grants threads access in FIFO order. Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;
}

Fields

该类显然会使用一个对象数组作为队列内存。数组被修饰为 final,表示一经初始化后 容量不可改变。为了避免元素的复制,基于数组的队列实现一般都会循环利用数组空间,因此队列的头尾直接体现为两个数组下标。count 用于维护队列中的元素个数。

在并发控制方面,这个类使用了一把可重入锁来锁定所有的存取操作。

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
    * Concurrency control uses the classic two-condition algorithm
    * found in any textbook.
    */

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

Constructor

对于构造函数来说,必备的参数是队列容量,用于初始化队列内部的数组。可选的参数是 是否启用公平竞争机制,默认是非公平锁,以及在初始化数组后预先放入一个集合内的所有元素:

  • 根据 capacity 参数初始化内部数组
  • 初始化可重入锁 (公平 / 非公平),以及非空 / 非满的阻塞条件
  • 如果需要加入初始元素
    • 首先对队列加锁
    • 依次放入集合元素
    • 初始元素溢出将会产生异常
    • 妥善设置队尾位置
    • 解锁
/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and default access policy.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and the specified access policy.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity, the specified access policy and initially containing the
 * elements of the given collection,
 * added in traversal order of the collection's iterator.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                            Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

Input Operations

Offer

非阻塞方法,向队尾插入元素。如果插入成功则返回 true,如果插入失败则返回 false。在操作前先要获取锁。

/**
 * Inserts the specified element at the tail of this queue if it is
 * possible to do so immediately without exceeding the queue's capacity,
 * returning {@code true} upon success and {@code false} if this queue
 * is full.  This method is generally preferable to method {@link #add},
 * which can fail to insert an element only by throwing an exception.
 *
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

该函数还有定时阻塞版本:当队列已满时,等待指定长时间后如果队列还没有空,则返回 false:

/**
 * Inserts the specified element at the tail of this queue, waiting
 * up to the specified wait time for space to become available if
 * the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

以下是具体的入队函数。在队尾放入元素后,将队尾指针自增 - 如果队尾已经到达数组尾部,则回到数组开头。增加计数器。释放非空的信号给可能存在的 take 线程。

/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

Add

默认使用了父类的 add()。在父类中可以看到,实际上使用 offer() 作为实现,进行非阻塞插入操作:

  • 如果插入成功,则返回 true
  • 如果因队列已满而插入失败,则抛出异常
/**
 * Inserts the specified element at the tail of this queue if it is
 * possible to do so immediately without exceeding the queue's capacity,
 * returning {@code true} upon success and throwing an
 * {@code IllegalStateException} if this queue is full.
 *
 * @param e the element to add
 * @return {@code true} (as specified by {@link Collection#add})
 * @throws IllegalStateException if this queue is full
 * @throws NullPointerException if the specified element is null
 */
public boolean add(E e) {
    return super.add(e);
}

父类中的实现:

/**
 * Inserts the specified element into this queue if it is possible to do so
 * immediately without violating capacity restrictions, returning
 * <tt>true</tt> upon success and throwing an <tt>IllegalStateException</tt>
 * if no space is currently available.
 *
 * <p>This implementation returns <tt>true</tt> if <tt>offer</tt> succeeds,
 * else throws an <tt>IllegalStateException</tt>.
 *
 * @param e the element to add
 * @return <tt>true</tt> (as specified by {@link Collection#add})
 * @throws IllegalStateException if the element cannot be added at this
 *         time due to capacity restrictions
 * @throws ClassCastException if the class of the specified element
 *         prevents it from being added to this queue
 * @throws NullPointerException if the specified element is null and
 *         this queue does not permit null elements
 * @throws IllegalArgumentException if some property of this element
 *         prevents it from being added to this queue
 */
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

Put

向队列中插入元素,如果队列已满,则阻塞等待直到队列中有空位。

/**
 * Inserts the specified element at the tail of this queue, waiting
 * for space to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

Output Operations

Poll

非阻塞函数,从队列中取出元素。如果队列为空,则立刻返回 null:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

定时阻塞函数,从队列中取出元素。在等待指定长度的时间后,如果队列依旧为空,则返回 null:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

其中用到的出队函数:

  • 从 takeIndex (队头) 处取出元素,将队头对应的引用设置为 null
  • 队头向后移;如果队头超过数组边界,则绕回数组的最开头
  • count 计数器自减
  • 向可能等待的 put 线程释放信号
/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

Take

阻塞函数,从队列中取出元素。如果队列为空,则一直阻塞直至能够从队列中取出元素为止。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

Peek

瞥一眼当前队头元素 (不进行出队操作):

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

Size

返回当前的队列长度。在对队列进行上锁后,直接返回 count 计数器的值:

// this doc comment is overridden to remove the reference to collections
// greater in size than Integer.MAX_VALUE
/**
 * Returns the number of elements in this queue.
 *
 * @return the number of elements in this queue
 */
public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return count;
    } finally {
        lock.unlock();
    }
}

返回数组中剩余空间的长度。在上锁后,由数组容量减去当前队列长度就是剩余空间:

// this doc comment is a modified copy of the inherited doc comment,
// without the reference to unlimited queues.
/**
 * Returns the number of additional elements that this queue can ideally
 * (in the absence of memory or resource constraints) accept without
 * blocking. This is always equal to the initial capacity of this queue
 * less the current {@code size} of this queue.
 *
 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
 * an element will succeed by inspecting {@code remainingCapacity}
 * because it may be the case that another thread is about to
 * insert or remove an element.
 */
public int remainingCapacity() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return items.length - count;
    } finally {
        lock.unlock();
    }
}

Iteration

Remove

从队头开始遍历队列,删除 equals() 返回 true 的一个元素,并返回 true;如果没有对队列进行修改,则返回 false:

/**
 * Removes a single instance of the specified element from this queue,
 * if it is present.  More formally, removes an element {@code e} such
 * that {@code o.equals(e)}, if this queue contains one or more such
 * elements.
 * Returns {@code true} if this queue contained the specified element
 * (or equivalently, if this queue changed as a result of the call).
 *
 * <p>Removal of interior elements in circular array based queues
 * is an intrinsically slow and disruptive operation, so should
 * be undertaken only in exceptional circumstances, ideally
 * only when the queue is known not to be accessible by other
 * threads.
 *
 * @param o element to be removed from this queue, if present
 * @return {@code true} if this queue changed as a result of the call
 */
public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

其中删除元素的具体逻辑:

  • 如果被删除的元素恰好在队头,则直接出队
  • 如果被删除的元素不在队头,需要把从该元素开始至队尾的每个元素向前移动
  • 向可能存在的 put 线程释放信号
/**
 * Deletes item at array index removeIndex.
 * Utility for remove(Object) and iterator.remove.
 * Call only when holding lock.
 */
void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove

        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}

Contains

上锁并遍历数组,判断是否包含 equals() 返回 true 的元素。

/**
 * Returns {@code true} if this queue contains the specified element.
 * More formally, returns {@code true} if and only if this queue contains
 * at least one element {@code e} such that {@code o.equals(e)}.
 *
 * @param o object to be checked for containment in this queue
 * @return {@code true} if this queue contains the specified element
 */
public boolean contains(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                if (o.equals(items[i]))
                    return true;
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

Iterator

以后再做补充,这里的迭代器实现还挺奇怪的。


Edit this page on GitHub
Prev
Abstract Class - java.util.concurrent.locks.AbstractQueuedSynchronizer
Next
Class - java.util.concurrent.ConcurrentHashMap