Mr Dk.'s BlogMr Dk.'s Blog
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • 🦆 About Me
  • ⛏️ Technology Stack
  • 🔗 Links
  • 🗒️ About Blog
  • Algorithm
  • C++
  • Compiler
  • Cryptography
  • DevOps
  • Docker
  • Git
  • Java
  • Linux
  • MS Office
  • MySQL
  • Network
  • Operating System
  • Performance
  • PostgreSQL
  • Productivity
  • Solidity
  • Vue.js
  • Web
  • Wireless
  • 🐧 How Linux Works (notes)
  • 🐧 Linux Kernel Comments (notes)
  • 🐧 Linux Kernel Development (notes)
  • 🐤 μc/OS-II Source Code (notes)
  • ☕ Understanding the JVM (notes)
  • ⛸️ Redis Implementation (notes)
  • 🗜️ Understanding Nginx (notes)
  • ⚙️ Netty in Action (notes)
  • ☁️ Spring Microservices (notes)
  • ⚒️ The Annotated STL Sources (notes)
  • ☕ Java Development Kit 8
GitHub
  • 📝 Notes
    • Algorithm
      • Algorithm - Bloom Filter
      • Algorithm - Disjoint Set
      • Algorithm - Fast Power
      • Algorithm - KMP
      • Algorithm - Monotonic Stack
      • Algorithm - RB-Tree
      • Algorithm - Regular Expression
      • Algorithm - Sliding Window
      • Online Judge - I/O
    • C++
      • C++ - Const
      • C++ File I/O
      • C++ - Object Layout
      • C++ - Operator Overload
      • C++ - Polymorphism
      • C++ STL algorithm
      • C++ STL map
      • C++ STL multimap
      • C++ STL priority_queue
      • C++ STL set
      • C++ STL string
      • C++ STL unordered_map
      • C++ STL vector
      • C++ - Smart Pointer
      • C++ - Template & Genericity
    • Compiler
      • ANTLR - Basic
      • Compiler - LLVM Architecture
      • Compiler - Multi-version GCC
    • Cryptography
      • Cryptography - Certbot
      • Cryptography - Digital Signature & PKCS #7
      • Cryptography - GPG
      • Cryptography - JWT
      • Cryptography - Keystore & Certificates
      • Cryptography - OAuth 2.0
      • Cryptography - Java 实现对称与非对称加密算法
      • Cryptography - TLS
    • DevOps
      • DevOps - Travis CI
    • Docker
      • Docker - Image & Storage Management
      • Docker - Image
      • Docker - Libcontainer
      • Docker - Multi-Arch Image
      • Docker - Multi-Stage Build
      • Docker - Network
      • Docker - Orchestration & Deployment
      • Docker - Overview
      • Docker - Service Building
      • Docker - Volume & Network Usage
      • Docker - Volume
      • Linux - Control Group
      • Linux - Namespace
    • Git
      • Git - Branch & Merge
      • Git - Cached
      • Git - Cherry Pick
      • Git - Commit
      • Git - Patch
      • Git - Proxy
      • Git - Rebase
      • Git - Reset
      • Git - Stash
      • Git - Theme for Git-Bash
    • Java
      • JVM - Synchronized
      • JVM - Volatile
      • Java - Annotation 注解
      • Java - BIO & NIO
      • Java - Class Path
      • Java - Condition and LockSupport
      • Java - Current Timestamp
      • Java - Deep Copy
      • Java - 运行环境配置
      • Java - Equals
      • Java - Exporting JAR
      • Java - Javadoc
      • Java - Lock
      • Java - Maven 项目构建工具
      • Java - References
      • Java - Reflection Mechanism
      • Java - String Split
      • Java - Thread Pool
      • Java - Thread
      • Tomcat - Class Loader
      • Tomcat - Container
    • Linux
      • addr2line
      • cut
      • df
      • du
      • fallocate
      • find
      • fio
      • grep
      • groupadd
      • gzip
      • head / tail
      • hexdump
      • iostat
      • iotop
      • kill
      • ldd
      • lsof
      • ltrace / strace
      • mpstat
      • netstat
      • nm
      • pidstat
      • pmap
      • readlink
      • readlink
      • rpm2cpio / rpm2archive
      • sort
      • tee
      • uniq
      • useradd
      • usermod
      • watch
      • wc
      • which
      • xargs
    • MS Office
      • MS Office - Add-in Dev
      • MS Office - Application
    • MySQL
      • InnoDB - Architecture
      • InnoDB - Backup
      • InnoDB - Checkpoint
      • InnoDB - Critical Features
      • InnoDB - Files
      • InnoDB - Index
      • InnoDB - Insert Buffer
      • InnoDB - Lock
      • InnoDB - Partition Table
      • InnoDB - Table Storage
      • MySQL - Server Configuration
      • MySQL - Storage Engine
    • Network
      • Network - ARP
      • Network - FTP
      • Network - GitHub Accelerating
      • HTTP - Message Format
      • HTTP - POST 提交表单的两种方式
      • Network - Proxy Server
      • Network - SCP
      • Network - SSH
      • Network - TCP Congestion Control
      • Network - TCP Connection Management
      • Network - TCP Flow Control
      • Network - TCP Retransmission
      • Network - Traceroute
      • Network - V2Ray
      • Network - WebSocket
      • Network - Windows 10 Mail APP
      • Network - frp
    • Operating System
      • Linux - Kernel Compilation
      • Linux - Multi-OS
      • Linux - Mutex & Condition
      • Linux - Operations
      • Linux: Package Manager
      • Linux - Process Manipulation
      • Linux - User ID
      • Linux - Execve
      • OS - Compile and Link
      • OS - Dynamic Linking
      • OS - ELF
      • Linux - Image
      • OS - Loading
      • OS - Shared Library Organization
      • OS - Static Linking
      • Syzkaller - Architecture
      • Syzkaller - Description Syntax
      • Syzkaller - Usage
      • Ubuntu - Desktop Recover (Python)
      • WSL: CentOS 8
    • Performance
      • Linux Performance - Perf Event
      • Linux Performance - Perf Record
      • Linux Performance - Perf Report
      • Linux Performance - Flame Graphs
      • Linux Performance - Off CPU Analyze
    • PostgreSQL
      • PostgreSQL - ANALYZE
      • PostgreSQL - Atomics
      • PostgreSQL - CREATE INDEX CONCURRENTLY
      • PostgreSQL - COPY FROM
      • PostgreSQL - COPY TO
      • PostgreSQL - Executor: Append
      • PostgreSQL - Executor: Group
      • PostgreSQL - Executor: Limit
      • PostgreSQL - Executor: Material
      • PostgreSQL - Executor: Nest Loop Join
      • PostgreSQL - Executor: Result
      • PostgreSQL - Executor: Sequential Scan
      • PostgreSQL - Executor: Sort
      • PostgreSQL - Executor: Unique
      • PostgreSQL - FDW Asynchronous Execution
      • PostgreSQL - GUC
      • PostgreSQL - Locking
      • PostgreSQL - LWLock
      • PostgreSQL - Multi Insert
      • PostgreSQL - Plan Hint GUC
      • PostgreSQL - Process Activity
      • PostgreSQL - Query Execution
      • PostgreSQL - Spinlock
      • PostgreSQL - Storage Management
      • PostgreSQL - VFD
      • PostgreSQL - WAL Insert
      • PostgreSQL - WAL Prefetch
    • Productivity
      • LaTeX
      • Venn Diagram
      • VuePress
    • Solidity
      • Solidity - ABI Specification
      • Solidity - Contracts
      • Solidity - Expressions and Control Structures
      • Solidity - Layout and Structure
      • Solidity - Remix IDE
      • Solidity - Slither
      • Solidity - Types
      • Solidity - Units and Globally Available Variables
    • Vue.js
      • Vue.js - Environment Variable
    • Web
      • Web - CORS
      • Web - OpenAPI Specification
    • Wireless
      • Wireless - WEP Cracking by Aircrack-ng
      • Wireless - WPS Cracking by Reaver
      • Wireless - wifiphisher

Java - Condition and LockSupport

Created by : Mr Dk.

2020 / 10 / 31 13:43

Nanjing, Jiangsu, China


读 JDK 源代码时遇到了 java.util.concurrent.locks.Condition 类,不得其解。

Monitor Object

每一个 Java 对象都有一组监视器函数,用于与对象锁和 synchronized 关键字配合,实现等待 / 通知模式。使用等待 / 通知模式时,前提是需要通过 synchronized 关键字获得对象的锁,没有获得到对象锁的线程将阻塞在 同步队列 上。调用 wait() 函数时,线程将进入等待状态,加入到一个 等待队列 中,并释放锁:

synchronized (obj) {
    while (!condition) {
        obj.wait();
    }
    // ...
}

由于锁被释放,同步队列上的其它线程得以通过竞争获得对象锁,并进入临界区。存在一个线程调用 notify() 系列函数:

synchronized (obj) {
    // ...
    obj.notify();
}

调用 notify() 系列函数后,会将等待队列上 (一个或多个) 等待时间最久的线程移入同步队列上阻塞,重新参与对象锁的竞争。在线程争取到对象锁后,从 wait() 函数处返回。

Comparison

与 synchronized + wait() / notify() 机制类似,对于实现 Lock 接口的 JDK 锁对象,也提供了相应的 Condition 对象实现等待 / 通知模式。具体的使用方式类似,重点在于 Condition 对象是由 Lock 对象得到的:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public void conditionWait() throws InterruptedException {
    lock.lock(); // 占有锁
    try {
        condition.await(); // 等待
    } finally {
        lock.unlock(); // 确保锁被释放
    }
}

public void conditionSignal() throws InterruptedException {
    lock.lock();
    try {
        condition.signal();
    } finally {
        lock.unlock();
    }
}

在对象占有锁并调用 condition.await() 后,将会释放锁并进入等待队列;当另一个对象占有锁并调用 condition.signal() 后,正在等待中的线程将 重新进入锁竞争状态,当再次占有锁后,将从 condition.await() 返回。

两种等待 / 通知的实现方式有什么区别呢?

ItemsObject MonitorCondition
前置条件占据对象锁调用 Lock.newCondition() 获取 Condition 对象;调用 Lock.lock() 获取锁
调用方式obj.wait()condition.await()
等待队列个数一个多个 (一个 Condition 对象一个队列,一个 Lock 可以产生多个 Condition 对象)
线程释放锁并等待支持支持
线程释放锁并等待,等待时不响应中断不支持支持
线程释放锁并超时等待支持支持
线程释放锁并等待到将来某个时间不支持支持
唤醒等待队列中的一个线程支持支持
唤醒等待队列中的全部线程支持支持

Implementation

每个 Condition 对象内都包含一个 等待队列。等待队列是一个 FIFO 队列,每个结点保存了线程引用,同时也维护着头结点和尾结点。当线程调用 Condition.await() 函数时,发生以下几件事:

  1. 释放锁
  2. 将当前线程构造为结点并加入等待队列
  3. 线程进入等待状态

其中,await() 函数中移入队列的操作并不需要 CAS 操作,因为调用该函数的线程必定已经获取了锁,所以由锁来保证线程安全。

Object 监视器模型中,一个对象包含一个同步队列和 一个等待队列;而 JUC 中的 Lock 拥有一个同步队列和 多个等待队列。

从队列的角度来看,当调用 Condition.await() 函数时,相当于同步队列的头结点移入等待队列中:

/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

调用 Condition.signal() 时,等待队列中等待时间最久的结点将会被移入同步队列中并唤醒。同时,函数还会检查当前线程是否获得锁的前提条件:

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

被唤醒后的线程 (已经在同步队列中) 将会加入到获取同步状态的竞争中。Condition.signal() 与 Condition.signalAll() 的区别在于,signalAll() 相当于对等待队列中的每一个结点都调用一次 signal()。带来的效果是等待队列中的所有结点被移动到同步队列中,并唤醒原来在等待队列中的所有线程。

Lock Support

java.util.concurrent.locks.LockSupport 中定义了一组公共静态函数,提供了基本的线程阻塞 / 唤醒功能。根据源码,基本是通过 Unsafe 类调用了 native JVM 函数实现的。

以下函数使当前线程休眠 (不被线程调度器调度),函数将不会返回,直到以下条件发生:

  1. 另一个线程对当前线程调用了 unpark()
  2. 另一个线程对当前线程发出了中断信号
  3. 调用没理由地返回了 😓

可以看到最终通过 UNSAFE 类将阻塞目标设置到了 HotSpot 中当前线程的 parkBlocker 域上。

private static void setBlocker(Thread t, Object arg) {
    // Even though volatile, hotspot doesn't need a write barrier here.
    UNSAFE.putObject(t, parkBlockerOffset, arg);
}

/**
 * Disables the current thread for thread scheduling purposes unless the
 * permit is available.
 *
 * <p>If the permit is available then it is consumed and the call returns
 * immediately; otherwise
 * the current thread becomes disabled for thread scheduling
 * purposes and lies dormant until one of three things happens:
 *
 * <ul>
 * <li>Some other thread invokes {@link #unpark unpark} with the
 * current thread as the target; or
 *
 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 * the current thread; or
 *
 * <li>The call spuriously (that is, for no reason) returns.
 * </ul>
 *
 * <p>This method does <em>not</em> report which of these caused the
 * method to return. Callers should re-check the conditions which caused
 * the thread to park in the first place. Callers may also determine,
 * for example, the interrupt status of the thread upon return.
 *
 * @param blocker the synchronization object responsible for this
 *        thread parking
 * @since 1.6
 */
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

另外还有超时版本和 DDL 版本:

/**
 * Disables the current thread for thread scheduling purposes, for up to
 * the specified waiting time, unless the permit is available.
 *
 * <p>If the permit is available then it is consumed and the call
 * returns immediately; otherwise the current thread becomes disabled
 * for thread scheduling purposes and lies dormant until one of four
 * things happens:
 *
 * <ul>
 * <li>Some other thread invokes {@link #unpark unpark} with the
 * current thread as the target; or
 *
 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 * the current thread; or
 *
 * <li>The specified waiting time elapses; or
 *
 * <li>The call spuriously (that is, for no reason) returns.
 * </ul>
 *
 * <p>This method does <em>not</em> report which of these caused the
 * method to return. Callers should re-check the conditions which caused
 * the thread to park in the first place. Callers may also determine,
 * for example, the interrupt status of the thread, or the elapsed time
 * upon return.
 *
 * @param blocker the synchronization object responsible for this
 *        thread parking
 * @param nanos the maximum number of nanoseconds to wait
 * @since 1.6
 */
public static void parkNanos(Object blocker, long nanos) {
    if (nanos > 0) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, nanos);
        setBlocker(t, null);
    }
}

/**
 * Disables the current thread for thread scheduling purposes, until
 * the specified deadline, unless the permit is available.
 *
 * <p>If the permit is available then it is consumed and the call
 * returns immediately; otherwise the current thread becomes disabled
 * for thread scheduling purposes and lies dormant until one of four
 * things happens:
 *
 * <ul>
 * <li>Some other thread invokes {@link #unpark unpark} with the
 * current thread as the target; or
 *
 * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
 * current thread; or
 *
 * <li>The specified deadline passes; or
 *
 * <li>The call spuriously (that is, for no reason) returns.
 * </ul>
 *
 * <p>This method does <em>not</em> report which of these caused the
 * method to return. Callers should re-check the conditions which caused
 * the thread to park in the first place. Callers may also determine,
 * for example, the interrupt status of the thread, or the current time
 * upon return.
 *
 * @param blocker the synchronization object responsible for this
 *        thread parking
 * @param deadline the absolute time, in milliseconds from the Epoch,
 *        to wait until
 * @since 1.6
 */
public static void parkUntil(Object blocker, long deadline) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(true, deadline);
    setBlocker(t, null);
}

当另一个线程对一个已经阻塞的线程调用了 unpark() 后,阻塞的线程将恢复被调度。

/**
 * Makes available the permit for the given thread, if it
 * was not already available.  If the thread was blocked on
 * {@code park} then it will unblock.  Otherwise, its next call
 * to {@code park} is guaranteed not to block. This operation
 * is not guaranteed to have any effect at all if the given
 * thread has not been started.
 *
 * @param thread the thread to unpark, or {@code null}, in which case
 *        this operation has no effect
 */
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

Edit this page on GitHub
Prev
Java - Class Path
Next
Java - Current Timestamp