App下載

解析 Java 是如何通過AQS實(shí)現(xiàn)數(shù)據(jù)組織?

猿友 2021-07-20 11:39:06 瀏覽數(shù) (1912)
反饋

并發(fā)是Java語言中的一個(gè)很重要的概念,而說起并發(fā)就繞不過AQS。AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實(shí)現(xiàn)都依賴于它。接下來將和大家簡單地介紹一下 AQS。

引言

從本篇文章開始,我們將介紹 Java AQS 的實(shí)現(xiàn)方式,本文先介紹 AQS 的內(nèi)部數(shù)據(jù)是如何組織的,后面的文章中再分別介紹 AQS 的各個(gè)部門實(shí)現(xiàn)。

AQS

通過前面的介紹,大家一定看出來了,上述的各種類型的鎖和一些線程控制接口(CountDownLatch 等),最終都是通過 AQS 來實(shí)現(xiàn)的,不同之處只在于 tryAcquire 等抽象函數(shù)如何實(shí)現(xiàn)。從這個(gè)角度來看,AQS(AbstractQueuedSynchronizer) 這個(gè)基類設(shè)計(jì)的真的很不錯(cuò),能夠包容各種同步控制方案,并提供了必須的下層依賴:比如阻塞,隊(duì)列等。接下來我們就來揭開它神秘的面紗。

內(nèi)部數(shù)據(jù)

AQS 顧名思義,就是通過隊(duì)列來組織修改互斥資源的請(qǐng)求。當(dāng)這個(gè)資源空閑時(shí)間,那么修改請(qǐng)求可以直接進(jìn)行,而當(dāng)這個(gè)資源處于鎖定狀態(tài)時(shí),就需要等待,AQS 會(huì)將所有等待的請(qǐng)求維護(hù)在一個(gè)類似于 CLH 的隊(duì)列中。CLH:Craig、Landin and Hagersten隊(duì)列,是單向鏈表,AQS中的隊(duì)列是CLH變體的虛擬雙向隊(duì)列(FIFO),AQS是通過將每條請(qǐng)求共享資源的線程封裝成一個(gè)節(jié)點(diǎn)來實(shí)現(xiàn)鎖的分配。主要原理圖如下:

圖中的 state 是一個(gè)用 volatile 修飾的 int 變量,它的使用都是通過 CAS 來進(jìn)行的,而 FIFO 隊(duì)列完成請(qǐng)求排隊(duì)的工作,隊(duì)列的操作也是通過 CAS 來進(jìn)行的,正因如此該隊(duì)列的操作才能達(dá)到理想的性能要求。

通過 CAS 修改 state 比較容易,大家應(yīng)該都能理解,但是如果要通過 CAS 維護(hù)一個(gè)雙向隊(duì)列要怎么做呢?這里我們看一下 AQS 中 CLH 隊(duì)列的實(shí)現(xiàn)。在 AQS 中有兩個(gè)指針一個(gè)指針指向了隊(duì)列頭,一個(gè)指向了隊(duì)列尾。它們都是懶初始化的,也就是說最初都為null。

/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

隊(duì)列中的每個(gè)節(jié)點(diǎn),都是一個(gè) Node 實(shí)例,該實(shí)例的第一個(gè)關(guān)鍵字段是 waitState,它表述了當(dāng)前節(jié)點(diǎn)所處的狀態(tài),通過 CAS 進(jìn)行修改:

  • SIGNAL:表示當(dāng)前節(jié)點(diǎn)承擔(dān)喚醒后繼節(jié)點(diǎn)的責(zé)任
  • CANCELLED:表示當(dāng)前節(jié)點(diǎn)已經(jīng)超時(shí)或者被打斷
  • CONDITION:表示當(dāng)前節(jié)點(diǎn)正在 Condition 上等待(通過鎖可以創(chuàng)建 Condition 對(duì)象)
  • PROPAGATE:只會(huì)設(shè)置在 head 節(jié)點(diǎn)上,用于表明釋放共享鎖時(shí),需要將這個(gè)行為傳播到其他節(jié)點(diǎn)上,這個(gè)我們稍后詳細(xì)介紹。
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes.  It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;

    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.  Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    volatile Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued.  The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check.  The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    //...
}

因?yàn)槭请p向隊(duì)列,所以 Node 實(shí)例中勢必有 prev 和 next 指針,此外 Node 中還會(huì)保存與其對(duì)應(yīng)的線程。最后是 nextWaiter,當(dāng)一個(gè)節(jié)點(diǎn)對(duì)應(yīng)了共享請(qǐng)求時(shí),nextWaiter 指向了 Node. SHARED 而當(dāng)一個(gè)節(jié)點(diǎn)是排他請(qǐng)求時(shí),nextWaiter 默認(rèn)指向了 Node. EXCLUSIVE 也就是 null。我們知道 AQS 也提供了 Condition 功能,該功能就是通過 nextWaiter 來維護(hù)在 Condition 上等待的線程。也就是說這里的 nextWaiter 在鎖的實(shí)現(xiàn)部分中,扮演者共享鎖和排它鎖的標(biāo)志位,而在條件等待隊(duì)列中,充當(dāng)鏈表的 next 指針。

同步隊(duì)列

接下來,我們由最常見的入隊(duì)操作出發(fā),介紹 AQS 框架的實(shí)現(xiàn)與使用。從下面的代碼中可以看到入隊(duì)操作支持兩種模式,一種是排他模式,一種是共享模式,分別對(duì)應(yīng)了排它鎖場景和共享鎖場景。

  1. 當(dāng)任意一種請(qǐng)求,要入隊(duì)時(shí),先會(huì)構(gòu)建一個(gè) Node 實(shí)例,然后獲取當(dāng)前 AQS 隊(duì)列的尾結(jié)點(diǎn),如果尾結(jié)點(diǎn)為空,就是說隊(duì)列還沒初始化,初始化過程在后面 enq 函數(shù)中實(shí)現(xiàn)
  2. 這里我們先看初始化之后的情況,即 tail != null,先將當(dāng)前 Node 的前向指針 prev 更新,然后通過 CAS 將尾結(jié)點(diǎn)修改為當(dāng)前 Node,修改成功時(shí),再更新前一個(gè)節(jié)點(diǎn)的后向指針 next,因?yàn)橹挥行薷奈仓羔樳^程是原子的,所以這里會(huì)出現(xiàn)新插入一個(gè)節(jié)點(diǎn)時(shí),之前的尾節(jié)點(diǎn) previousTail 的 next 指針為null的情況,也就是說會(huì)存在短暫的正向指針和反向指針不同步的情況,不過在后面的介紹中,你會(huì)發(fā)現(xiàn) AQS 很完備地避開了這種不同步帶來的風(fēng)險(xiǎn)(通過從后往前遍歷)
  3. 如果上述操作成功,則當(dāng)前線程已經(jīng)進(jìn)入同步隊(duì)列,否則,可能存在多個(gè)線程的競爭,其他線程設(shè)置尾結(jié)點(diǎn)成功了,而當(dāng)前線程失敗了,這時(shí)候會(huì)和尾結(jié)點(diǎn)未初始化一樣進(jìn)入 enq 函數(shù)中。
/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        // 已經(jīng)進(jìn)行了初始化
        node.prev = pred;
        // CAS 修改尾節(jié)點(diǎn)
        if (compareAndSetTail(pred, node)) {
            // 成功之后再修改后向指針
            pred.next = node;
            return node;
        }
    }
    // 循環(huán) CAS 過程和初始化過程
    enq(node);
    return node;
}

正常通過 CAS 修改數(shù)據(jù)都會(huì)在一個(gè)循環(huán)中進(jìn)行,而這里的 addWaiter 只是在一個(gè) if 中進(jìn)行,這是為什么呢?實(shí)際上,大家看到的 addWaiter 的這部分 CAS 過程是一個(gè)快速執(zhí)行線,在沒有競爭時(shí),這種方式能省略不少判斷過程。當(dāng)發(fā)生競爭時(shí),會(huì)進(jìn)入 enq 函數(shù)中,那里才是循環(huán) CAS 的地方。

  1. 整個(gè) enq 的工作在一個(gè)循環(huán)中進(jìn)行
  2. 先會(huì)檢查是否未進(jìn)行初始化,是的話,就設(shè)置一個(gè)虛擬節(jié)點(diǎn) Node 作為 head 和 tail,也就是說同步隊(duì)列的第一個(gè)節(jié)點(diǎn)并不保存實(shí)際數(shù)據(jù),只是一個(gè)保存指針的地方
  3. 初始化完成后,通過 CAS 修改尾節(jié)點(diǎn),直到修改成功為止,最后修復(fù)后向指針
/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {// 在一個(gè)循環(huán)中進(jìn)行 CAS 操作
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // CAS 修改尾節(jié)點(diǎn)
            if (compareAndSetTail(t, node)) {
                // 成功之后再修改后向指針
                t.next = node;
                return t;
            }
        }

以上就是通過AQS實(shí)現(xiàn)數(shù)據(jù)組織的詳細(xì)內(nèi)容,更多關(guān)于AQS數(shù)據(jù)組織的資料請(qǐng)關(guān)注W3Cschool其它相關(guān)文章!


0 人點(diǎn)贊