Mr Dk.'s BlogMr Dk.'s Blog
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • ☕ Java Development Kit 8
    • java.io

      • Abstract Class - java.io.InputStream
      • Abstract Class - java.io.OutputStream
      • Abstract Class - java.io.Reader
      • Class - java.io.BufferedInputStream
      • Class - java.io.BufferedOutputStream
      • Class - java.io.BufferedReader
      • Class - java.io.ByteArrayInputStream
      • Class - java.io.ByteArrayOutputStream
      • Class - java.io.DataInputStream
      • Class - java.io.DataOutputStream
      • Class - java.io.FileInputStream
      • Class - java.io.FileOutputStream
      • Class - java.io.FileReader
      • Class - java.io.FilterInputStream
      • Class - java.io.FilterOutputStream
      • Class - java.io.InputStreamReader
      • Class - java.io.PipedInputStream
      • Class - java.io.PipedOutputStream
      • Class - java.io.PushbackInputStream
      • Class - java.io.SequenceInputStream
      • Interface - java.io.Closeable
    • java.lang

      • Abstract Class - java.lang.AbstractStringBuilder
      • Class - java.lang.Integer
      • Class - java.lang.String
      • Class - java.lang.ThreadLocal
    • java.nio

      • Abstract Class - java.nio.Buffer
    • java.util

      • Abstract Class - java.util.AbstractCollection
      • Abstract Class - java.util.AbstractList
      • Abstract Class - java.util.AbstractMap
      • Abstract Class - java.util.AbstractQueue
      • Abstract Class - java.util.AbstractSet
      • Class - java.util.ArrayList
      • Class - java.util.HashMap
      • Class - java.util.HashSet
      • Class - java.util.IdentityHashMap
      • Class - java.util.LinkedHashMap
      • Class - java.util.LinkedHashSet
      • Class - java.util.LinkedList
      • Class - java.util.PriorityQueue
      • Class - java.util.TreeMap
      • Class - java.util.TreeSet
      • Interface - java.util.Collection
      • Interface - java.util.Deque
      • Interface - java.util.Iterator
      • Interface - java.util.Iterator
      • Interface - java.util.Map
      • Interface - java.util.NavigableMap
      • Interface - java.util.NavigableSet
      • Interface - java.util.Queue
      • Interface - java.util.Set
      • Interface - java.util.SortedMap
      • Interface - java.util.SortedSet
    • java.util.concurrent

      • Abstract Class - java.util.concurrent.atomic.AtomicIntegerFieldUpdater
      • Abstract Class - java.util.concurrent.locks.AbstractExecutorService
      • Abstract Class - java.util.concurrent.locks.AbstractOwnableSynchronizer
      • Abstract Class - java.util.concurrent.locks.AbstractQueuedSynchronizer
      • Class - java.util.concurrent.ArrayBlockingQueue
      • Class - java.util.concurrent.ConcurrentHashMap
      • Class - java.util.concurrent.ConcurrentLinkedQueue
      • Class - java.util.concurrent.DelayQueue
      • Class - java.util.concurrent.ExecutorCompletionService
      • Class - java.util.concurrent.FutureTask
      • Class - java.util.concurrent.LinkedBlockingQueue
      • Class - java.util.concurrent.LinkedTransferQueue
      • Class - java.util.concurrent.SynchronousQueue
      • Class - java.util.concurrent.ThreadPoolExecutor
      • Class - java.util.concurrent.atomic.AtomicInteger
      • Class - java.util.concurrent.atomic.AtomicIntegerArray
      • Class - java.util.concurrent.atomic.AtomicReference
      • Class - java.util.concurrent.atomic.AtomicStampedReference
      • Class - java.util.concurrent.locks.ReentrantLock
      • Class - java.util.concurrent.locks.ReentrantReadWriteLock
      • Interface - java.util.concurrent.BlockingQueue
      • Interface - java.util.concurrent.CompletionService
      • Interface - java.util.concurrent.Executor
      • Interface - java.util.concurrent.ExecutorService
      • Interface - java.util.concurrent.Future
      • Interface - java.util.concurrent.ScheduledExecutorService
      • Interface - java.util.concurrent.TransferQueue
      • Interface - java.util.concurrent.locks.Lock
      • Interface - java.util.concurrent.locks.ReadWriteLock

Class - java.util.concurrent.DelayQueue

Created by : Mr Dk.

2020 / 10 / 25 15:24

Nanjing, Jiangsu, China


Definition

延时阻塞队列的实现。所谓的延时是指,队列中的元素只有在指定的时间过去了以后才能被取出;所谓的阻塞是指,从队列中取出 / 放入元素时,如果队列为空 / 满,那么出队 / 入队操作将一直阻塞直到队列中有元素 / 有空位为止。

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and {@code poll}
 * will return {@code null}. Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements. For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
 * the DelayQueue in any particular order.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

}

Delayed

可以看到放入该队列的元素需要继承 Delayed 接口,而 Delayed 接口又继承自 Comparable 接口。所以能够放入该队列的元素必须要实现以下两个函数:

  • Delayed 接口的 getDelay() 函数 - 负责获取超时的时间
  • Comparable 接口的 compareTo() 函数负责按超时的时间进行优先级排序
/**
 * Compares this object with the specified object for order.  Returns a
 * negative integer, zero, or a positive integer as this object is less
 * than, equal to, or greater than the specified object.
 *
 * <p>The implementor must ensure <tt>sgn(x.compareTo(y)) ==
 * -sgn(y.compareTo(x))</tt> for all <tt>x</tt> and <tt>y</tt>.  (This
 * implies that <tt>x.compareTo(y)</tt> must throw an exception iff
 * <tt>y.compareTo(x)</tt> throws an exception.)
 *
 * <p>The implementor must also ensure that the relation is transitive:
 * <tt>(x.compareTo(y)&gt;0 &amp;&amp; y.compareTo(z)&gt;0)</tt> implies
 * <tt>x.compareTo(z)&gt;0</tt>.
 *
 * <p>Finally, the implementor must ensure that <tt>x.compareTo(y)==0</tt>
 * implies that <tt>sgn(x.compareTo(z)) == sgn(y.compareTo(z))</tt>, for
 * all <tt>z</tt>.
 *
 * <p>It is strongly recommended, but <i>not</i> strictly required that
 * <tt>(x.compareTo(y)==0) == (x.equals(y))</tt>.  Generally speaking, any
 * class that implements the <tt>Comparable</tt> interface and violates
 * this condition should clearly indicate this fact.  The recommended
 * language is "Note: this class has a natural ordering that is
 * inconsistent with equals."
 *
 * <p>In the foregoing description, the notation
 * <tt>sgn(</tt><i>expression</i><tt>)</tt> designates the mathematical
 * <i>signum</i> function, which is defined to return one of <tt>-1</tt>,
 * <tt>0</tt>, or <tt>1</tt> according to whether the value of
 * <i>expression</i> is negative, zero or positive.
 *
 * @param   o the object to be compared.
 * @return  a negative integer, zero, or a positive integer as this object
 *          is less than, equal to, or greater than the specified object.
 *
 * @throws NullPointerException if the specified object is null
 * @throws ClassCastException if the specified object's type prevents it
 *         from being compared to this object.
 */
public int compareTo(T o);
/**
 * Returns the remaining delay associated with this object, in the
 * given time unit.
 *
 * @param unit the time unit
 * @return the remaining delay; zero or negative values indicate
 * that the delay has already elapsed
 */
long getDelay(TimeUnit unit);

Fields

类的内部使用可重入锁来实现并发控制,维护了一个优先队列。优先队列的实现之前已经看过了,基于一个 Object[] 数组实现了二叉堆,并随着元素的增多会扩展容量。所以延时阻塞队列的入队操作实际上永远不会阻塞。

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();

/**
 * Thread designated to wait for the element at the head of
 * the queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with
 * an element with an earlier expiration time, the leader
 * field is invalidated by being reset to null, and some
 * waiting thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
private Thread leader = null;

/**
 * Condition signalled when a newer element becomes available
 * at the head of the queue or a new thread may need to
 * become leader.
 */
private final Condition available = lock.newCondition();

这里的线程指针 leader 遵循一种 Leader-Follower 模式,大致意思是,只有一个 leader 线程会等待下一个出队元素 (距离超时最近的元素),其它线程作为 follower 将进入无限期的等待中。当 leader 线程从 take() 或 poll() 返回后,将从 follower 线程中选择一个线程作为新的 leader。


直接开始由构造函数开始一步一步分析。由于可重入锁和优先队列已经构造完毕,所以空构造函数不做任何事。带参数的构造函数会依次将一个集合中的元素初始化到优先队列中。

/**
 * Creates a new {@code DelayQueue} that is initially empty.
 */
public DelayQueue() {}

/**
 * Creates a {@code DelayQueue} initially containing the elements of the
 * given collection of {@link Delayed} instances.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

那么看看 addAll() 是怎么实现的。通过迭代参数集合,分别对每一个元素调用 add()。如果成功进行了至少一次 add() 操作,就会返回 true (即集合发生了修改)。

/**
 * Adds all of the elements in the specified collection to this
 * queue.  Attempts to addAll of a queue to itself result in
 * <tt>IllegalArgumentException</tt>. Further, the behavior of
 * this operation is undefined if the specified collection is
 * modified while the operation is in progress.
 *
 * <p>This implementation iterates over the specified collection,
 * and adds each element returned by the iterator to this
 * queue, in turn.  A runtime exception encountered while
 * trying to add an element (including, in particular, a
 * <tt>null</tt> element) may result in only some of the elements
 * having been successfully added when the associated exception is
 * thrown.
 *
 * @param c collection containing elements to be added to this queue
 * @return <tt>true</tt> if this queue changed as a result of the call
 * @throws ClassCastException if the class of an element of the specified
 *         collection prevents it from being added to this queue
 * @throws NullPointerException if the specified collection contains a
 *         null element and this queue does not permit null elements,
 *         or if the specified collection is null
 * @throws IllegalArgumentException if some property of an element of the
 *         specified collection prevents it from being added to this
 *         queue, or if the specified collection is this queue
 * @throws IllegalStateException if not all the elements can be added at
 *         this time due to insertion restrictions
 * @see #add(Object)
 */
public boolean addAll(Collection<? extends E> c) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    boolean modified = false;
    for (E e : c)
        if (add(e))
            modified = true;
    return modified;
}

于是接下来跟进 add() 函数,其中又调用了 offer() 函数。这里指的一提的是,其它相关的入队操作最终都调用了 offer(),因为优先队列能够自行扩容,因此没有容量限制,任何入队操作都应当立刻成功返回。

/**
 * Inserts the specified element into this delay queue.
 *
 * @param e the element to add
 * @return {@code true} (as specified by {@link Collection#add})
 * @throws NullPointerException if the specified element is null
 */
public boolean add(E e) {
    return offer(e);
}

接下来跟进 offer()。首先占据类内的可重入锁,然后将元素通过优先队列的 offer() 函数放进去。通过 peek() 查看刚放进去的结点是否是下一次马上就要出队的结点,如果是,那么需要重置 leader,让出队线程争抢 leader 的位置。

/**
 * Inserts the specified element into this delay queue.
 *
 * @param e the element to add
 * @return {@code true}
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

其中,peek() 函数用于获取 (但不移除) 队头元素。如果队列中没有过期元素,那么将会返回下一个即将过期的元素。

/**
 * Retrieves, but does not remove, the head of this queue, or
 * returns {@code null} if this queue is empty.  Unlike
 * {@code poll}, if no expired elements are available in the queue,
 * this method returns the element that will expire next,
 * if one exists.
 *
 * @return the head of this queue, or {@code null} if this
 *         queue is empty
 */
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.peek();
    } finally {
        lock.unlock();
    }
}

接下来重点看一看出队操作,因为这里涉及到了比较重要的 超时 问题。

首先是非阻塞版本的 poll() 函数。该函数试图获取并移除队头元素并返回;如果没有元素到期,那么返回 null。在具体实现上,在占据可重入锁后,首先调用 peek() 获取到队头元素,然后调用 getDeday() 函数 (元素肯定实现了这个函数) 看看它有没有超时。如果已经超时,那么调用优先队列的 poll() 将元素取出;否则直接返回 null。

/**
 * Retrieves and removes the head of this queue, or returns {@code null}
 * if this queue has no elements with an expired delay.
 *
 * @return the head of this queue, or {@code null} if this
 *         queue has no elements with an expired delay
 */
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

阻塞版本的 take(),这里也是 Leader-Follower Pattern 的精髓所在:

  • 只有 leader 线程使用 awaitNanos() 等待确切的超时时间
  • 其它线程使用 await() 进入无限期等待,除非被确切地唤醒

同样,占据锁后,在死循环中不断 peek 队头元素。如果队头元素为空,那么直接进入无限期等待。如果队头不为空,那么获取队头的剩余超时时间:

  • 如果队头已经到期,那么立刻返回 poll()
  • 如果队头还未到期,而已经有 leader 线程在等待了,那么当前线程进入无限期等待
  • 如果队头还未到期,而没有 leader 线程等待,那么就将自身设置为 leader,然后等待 确切的时间 (队头过期)

接下来,只有 leader 线程才可能苏醒。苏醒后,它将自身从 leader 中解除引用,然后返回死循环开始处。而所有 follower 线程将不会被唤醒,以省去不必要的开销 (因为 leader 都还没开张呢,follower 就更别急了)。

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue.
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

最终是定时阻塞版本的 poll() 函数。首先根据参数给定的超时时间计算出要等待的纳秒数。占据锁后,在一个死循环中不断尝试以下几种情况:

  1. 队头为空 (优先队列中没有元素)
    • 如果等待时间已耗尽,直接返回 null
    • 如果等待时间未耗尽,那么阻塞等待至等待时间耗尽时
  2. 队头不为空,那么取得队头元素的剩余等待时间
    • 如果队头已经过期,那么调用 poll() 出队并返回
    • 如果函数等待时间已耗尽,直接返回 null

剩余情况应该是队头不为空,但是队头还未超时,函数等待时间也还没耗尽的情况。如果:

  • 函数的剩余等待时间 < 队头过期剩余时间
  • 已有 leader 等待

两者存在一种,那么使当前线程阻塞等待直至函数等待时间耗尽。这种线程虽然最终肯定会拿到 null,但是还是要让其阻塞完才能返回 (对应于阻塞版本的无限期阻塞)。

如果队头过期剩余时间 < 函数的剩余等待时间,并且 leader 为空,那么将当前线程设置为 leader 后,阻塞直至队头过期。leader 苏醒后,更新函数自身的剩余等待时间。如果线程自身是 leader,则解除自身为 leader,并重新开始新一轮的循环 (在新一轮循环中应该可以取得过期的队头元素)。

当函数最终返回时,如果队头不为空且 leader 空缺,那么释放一个 available 的信号唤醒阻塞中的线程,使其中之一成为新的 leader。

疑惑:外层已经加了锁,那还可能有多个线程进入这一段死循环代码中吗?

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue,
 * or the specified wait time expires.
 *
 * @return the head of this queue, or {@code null} if the
 *         specified waiting time elapses before an element with
 *         an expired delay becomes available
 * @throws InterruptedException {@inheritDoc}
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

下面的 drainTo() 函数负责返回所有 / 最多 maxElements 个 已过期元素。判断元素是否过期使用了 peekExpired() 函数 - 当且仅当队头元素不为空且已经过期时返回该元素,否则返回 null:

/**
 * Returns first element only if it is expired.
 * Used only by drainTo.  Call only when holding lock.
 */
private E peekExpired() {
    // assert lock.isHeldByCurrentThread();
    E first = q.peek();
    return (first == null || first.getDelay(NANOSECONDS) > 0) ?
        null : first;
}

而 drainTo() 函数不断出队元素并放入返回集合中,直到 peekExpired() 返回 null 或返回集合规模达到 maxElements 时停止:

/**
 * @throws UnsupportedOperationException {@inheritDoc}
 * @throws ClassCastException            {@inheritDoc}
 * @throws NullPointerException          {@inheritDoc}
 * @throws IllegalArgumentException      {@inheritDoc}
 */
public int drainTo(Collection<? super E> c) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = 0;
        for (E e; (e = peekExpired()) != null;) {
            c.add(e);       // In this order, in case add() throws.
            q.poll();
            ++n;
        }
        return n;
    } finally {
        lock.unlock();
    }
}

/**
 * @throws UnsupportedOperationException {@inheritDoc}
 * @throws ClassCastException            {@inheritDoc}
 * @throws NullPointerException          {@inheritDoc}
 * @throws IllegalArgumentException      {@inheritDoc}
 */
public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = 0;
        for (E e; n < maxElements && (e = peekExpired()) != null;) {
            c.add(e);       // In this order, in case add() throws.
            q.poll();
            ++n;
        }
        return n;
    } finally {
        lock.unlock();
    }
}

Others

剩余函数的基本思路都是:

  1. 先占据可重入锁
  2. 调用优先队列的同名相应函数

References

Stackoverflow - What exactly is the leader used for in DelayQueue?

知乎专栏 - 死磕 Java 集合之 DelayQueue 源码分析


Edit this page on GitHub
Prev
Class - java.util.concurrent.ConcurrentLinkedQueue
Next
Class - java.util.concurrent.ExecutorCompletionService