菠菜网

allbet官网官方注册:Java 中行列同步器 AQS(AbstractQueuedSynchronizer)实现原理

时间:5个月前   阅读:52

前言

在 Java 中通过锁来控制多个线程对共享资源的接见,使用 Java 编程语言开发的同伙都知道,可以通过 synchronized 关键字来实现锁的功效,它可以隐式的获取锁,也就是说我们使用该关键字并不需要去体贴锁的获取和释放历程,然则在提供方便的同时也意味着其灵活性的下降。例如,有这样的一个场景,先获取锁 A,然后再获取锁 B,当锁 B 获取到之后,释放锁 A 同时获取锁 C,当获取锁 C 后,再释放锁 B 同时获取锁 D,依次类推,像这种对照复杂的场景,使用 synchronized 关键字就对照难实现了。
在 Java SE 5 之后,新增加了 Lock 接口和一系列的实现类来提供和 synchronized 关键字一样的功效,它需要我们显示的举行锁的获取和释放,除此之外还提供了可响应中止的锁获取操作以及超时获取锁等同步特征。JDK 中提供的 Lock 接口实现类大部门都是聚合一个同步器 AQS 的子类来实现多线程的接见控制的,下面我们看看这个构建锁和其它同步组件的基础框架——行列同步器 AQS(AbstractQueuedSynchronizer)。

AQS 基础数据结构

同步行列

行列同步器 AQS(下文简称为同步器)主要是依赖于内部的一个 FIFO(first-in-first-out)双向行列来对同步状态举行治理的,当线程获取同步状态失败时,同步器会将当前线程和当前守候状态等信息封装成一个内部界说的节点 Node,然后将其加入行列,同时壅闭当前线程;当同步状态释放时,会将同步行列中首节点叫醒,让其再次实验去获取同步状态。同步行列的基本结构如下:

行列节点 Node

同步行列使用同步器中的静态内部类 Node 用来保留获取同步状态的线程的引用、线程的守候状态、前驱节点和后继节点。

同步行列中 Node 节点的属性名称和详细寄义如下表所示:

属性类型和名称 形貌
volatile int waitStatus 当前节点在行列中的守候状态
volatile Node prev 前驱节点,当节点加入同步行列时被赋值(使用尾部添加方式)
volatile Node next 后继节点
volatile Thread thread 获取同步状态的线程
Node nextWaiter 守候行列中的后继节点,若是当前节点是共享的,则该字段是一个 SHARED 常量

每个节点线程都有两种锁模式,划分为 SHARED 示意线程以共享的模式守候锁,EXCLUSIVE 示意线程以独占的方式守候锁。同时每个节点的守候状态 waitStatus 只能取以下表中的枚举值:

枚举值 形貌
SIGNAL 值为 -1,示意该节点的线程已经准备完毕,守候资源释放
CANCELLED 值为 1,示意该节点线程获取锁的请求已经作废了
CONDITION 值为 -2,示意该节点线程守候在 Condition 上,守候被其它线程叫醒
PROPAGATE 值为 -3,示意下一次共享同步状态获取会无限举行下去,只在 SHARED 情形下使用
0 值为 0,初始状态,初始化的默认值
同步状态 state

同步器内部使用了一个名为 state 的 int 类型的变量示意同步状态,同步器的主要使用方式是通过继续,子类通过继续并实现它的抽象方式来治理同步状态,同步器给我们提供了如下三个方式来对同步状态举行更改。

方式署名 形貌
protected final int getState() 获取当前同步状态
protected final void setState(int newState) 设置当前同步状态
protected final boolean compareAndSetState(int expect, int update) 使用 CAS 设置当前状态,该方式能够保证状态设置的原子性

在独享锁中同步状态 state 这个值通常是 0 或者 1(若是是重入锁的话 state 值就是重入的次数),在共享锁中 state 就是持有锁的数目。

独占式同步状态获取与释放

同步器中提供了 acquire(int arg) 方式来举行独占式同步状态的获取,获取到了同步状态也就是获取到了锁,该方式源码如下所示:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

方式首先会挪用 tryAcquire 方式实验去获取锁,查看方式的源码可以发现,同步器并未对该方式举行实现(只是抛出一个不支持操作异常 UnsupportedOperationException),这个方式是需要后续同步组件的开发人员自己去实现的,若是方式返回 true 则示意当前线程乐成获取到锁,挪用 selfInterrupt() 中止当前线程(PS:这里留给人人一个问题:为什么获取了锁以后还要中止线程呢?),方式竣事返回,若是方式返回 false 则示意当前线程获取锁失败,也就是说有其它线程先前已经获取到了锁,此时就需要把当前线程以及守候状态等信息添加到同步行列中,下面来看看同步器在线程未获取到锁时详细是若何实现。
通过源码发现,当获取锁失败时,会执行判断条件与操作的后半部门 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),首先指定锁模式为 Node.EXCLUSIVE 挪用 addWaiter 方式,该方式源码如下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

通过方式参数指定的锁模式(共享锁 or 独占锁)和当前线程构造出一个 Node 节点,若是同步行列已经初始化,那么首先会举行一次从尾部加入行列的实验,使用 compareAndSetTail 方式保证原子性,进入该方式源码可以发现是基于 sun.misc 包下提供的 Unsafe 类来实现的。若是首次实验加入同步行列失败,会再次挪用 enq 方式举行入队操作,继续跟进 enq 方式源码如下:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

通过其源码可以发现和第一次实验加入行列的代码类似,只是该方式内里加了同步行列初始化判断,使用 compareAndSetHead 方式保证设置头节点的原子性,同样它底层也是基于 Unsafe 类,然后外层套了一个 for (; 死循环,循环唯一的退出条件是从队尾入队乐成,也就是说若是从该方式乐成返回了就示意已经入队乐成了,至此,addWaiter 执行完毕返回当前 Node 节点。然后以该节点作为 acquireQueued 方式的入参继续举行其它步骤,该方式如下所示:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可以看到,该方式本质上也是通过一个死循环(自旋)去获取锁而且支持中止,在循环体外面界说两个符号变量,failed 符号是否乐成获取到锁,interrupted 符号在守候的历程中是否被中止过。方式首先通过 predecessor 获取当前节点的前驱节点,当当前节点的前驱节点是 head 头节点时就挪用 tryAcquire 实验获取锁,也就是第二个节点则实验获取锁,这里为什么要从第二个节点才实验获取锁呢?是由于同步行列本质上是一个双向链表,在双向链表中,第一个节点并不存储任何数据是虚节点,只是起到一个占位的作用,真正存储数据的节点是从第二个节点最先的。若是乐成获取锁,也就是 tryAcquire 方式返回 true 后,将 head 指向当前节点并把之前找到的头节点 p 从行列中移除,修改是否乐成获取到锁符号,竣事方式返回中止符号。
若是当前节点的前驱节点 p 不是头节点或者前驱节点 p 是头节点然则获取锁操作失败,那么会挪用 shouldParkAfterFailedAcquire 方式判断当前 node 节点是否需要被壅闭,这里的壅闭判断主要是为了防止长时间自旋给 CPU 带来异常大的执行开销,浪费资源。该方式源码如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
          * This node has already set status asking a release
          * to signal it, so it can safely park.
          */
        return true;
    if (ws > 0) {
        /*
          * Predecessor was cancelled. Skip over predecessors and
          * indicate retry.
          */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
          * waitStatus must be 0 or PROPAGATE.  Indicate that we
          * need a signal, but don't park yet.  Caller will need to
          * retry to make sure it cannot acquire before parking.
          */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

方式参数为当前节点的前驱节点以及当前节点,主要是靠前驱节点来判断是否需要举行壅闭,首先获取到前驱节点的守候状态 ws,若是节点状态 ws 为 SIGNAL,示意前驱节点的线程已经准备完毕,守候资源释放,方式返回 true 示意可以壅闭,若是 ws > 0,通过上文可以知道节点只有一个状态 CANCELLED(值为 1) 知足该条件,示意该节点线程获取锁的请求已经作废了,会通过一个 do-while 循环向前查找 CANCELLED 状态的节点并将其从同步行列中移除,否则进入 else 分支,使用 compareAndSetWaitStatus 原子操作将前驱节点的守候状态修改为 SIGNAL,以上这两种情形都不需要举行壅闭方式返回 false。
当经由判断后需要壅闭的话,也就是 compareAndSetWaitStatus 方式返回 true 时,会通过 parkAndCheckInterrupt 方式壅闭挂起当前线程,并返回当前线程的中止标识。方式如下:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

线程壅闭是通过 LockSupport 这个工具类实现的,深入其源码可以发现它底层也是基于 Unsafe 类实现的。若是以上两个方式都返回 true 的话就更新中止符号。这里另有一个问题就是什么时刻会将一个节点的守候状态 waitStatus 修改为 CANCELLED 节点线程获取锁的请求作废状态呢?仔细的同伙可能已经发现了,在上文贴出的 acquireQueued 方式源码中的 finally 块中会凭据 failed 符号来决议是否挪用 cancelAcquire 方式,这个方式就是用来将节点状态修改为 CANCELLED 的,方式的详细实现留给人人去探索。至此 AQS 独占式同步状态获取锁的流程就完成了,下面通过一个流程图来看看整体流程:

下面再看看独占式锁释放的历程,同步器使用 release 方式来让我们举行独占式锁的释放,其方式源码如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

首先挪用 tryRelease 方式实验举行锁释放操作,继续跟进该方式发现同步器只是抛出了一个不支持操作异常 UnsupportedOperationException,这里和上文独占锁获取中 tryAcquire 方式是一样的套路,需要开发者自己界说锁释放操作。

通过其 JavaDoc 可以得知,若是返回 false,则示意释放锁失败,方式竣事。该方式若是返回 true,则示意当前线程释放锁乐成,需要通知行列中守候获取锁的线程举行锁获取操作。首先获取头节点 head,若是当前头节点不为 null,而且其守候状态不是初始状态(0),则排除线程壅闭挂起状态,通过 unparkSuccessor 方式实现,该方式源码如下:

private void unparkSuccessor(Node node) {
    /*
      * If status is negative (i.e., possibly needing signal) try
      * to clear in anticipation of signalling.  It is OK if this
      * fails or if status is changed by waiting thread.
      */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
      * Thread to unpark is held in successor, which is normally
      * just the next node.  But if cancelled or apparently null,
      * traverse backwards from tail to find the actual
      * non-cancelled successor.
      */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

首先获取头节点的守候状态 ws,若是状态值为负数(Node.SIGNAL or Node.PROPAGATE),则通过 CAS 操作将其改为初始状态(0),然后获取头节点的后继节点,若是后继节点为 null 或者后继节点状态为 CANCELLED(获取锁请求已作废),就从行列尾部最先寻找第一个状态为非 CANCELLED 的节点,若是该节点不为空则使用 LockSupport 的 unpark 方式将其叫醒,该方式底层是通过 Unsafe 类的 unpark 实现的。这里需要从队尾查找非 CANCELLED 状态的节点的原因是,在之前的获取独占锁失败时的入队 addWaiter 方式实现中,该方式如下:

假设一个线程执行到了上图中的 ① 处,② 处还没有执行,此时另一个线程正好执行了 unparkSuccessor 方式,那么就无法通过早年向后查找了,由于节点的后继指针 next 还没赋值呢,以是需要从后往前举行查找。至此,独占式锁释放操作就竣事了,同样的,最后我们也通过一个流程图来看看整个锁释放的历程:

独占式可中止同步状态获取

同步器提供了 acquireInterruptibly 方式来举行可响应中止的获取锁操作,方式实现源码如下:

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

方式首先检查当前线程的中止状态,若是已中止,则直接抛出中止异常 InterruptedException 即响应中止,否则挪用 tryAcquire 方式实验获取锁,若是获取乐成则方式竣事返回,获取失败挪用 doAcquireInterruptibly 方式,跟进该方式如下:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

仔细观察可以发现该方式实现源码和上文中 acquireQueued 方式的实现基本上类似,只是这里把入队操作 addWaiter 放到了方式内里了,另有一个区别就是当在循环体内判断需要举行中止时会直接抛出异常来响应中止,两个方式的对好比下:

其它步骤和独占式锁获取一致,流程图大体上和不响应中止的锁获取差不多,只是在最最先多了一步线程中止状态检查和循环是会抛出中止异常而已。

独占式超时获取同步状态

同步器提供了 tryAcquireNanos 方式可以超时获取同步状态(也就是锁),该方式提供了之前 synchronized 关键字不支持的超时获取的特征,通过该方式我们可以在指定时间段 nanosTimeout 内获取锁,若是获取到锁则返回 true,否则,返回 false。方式源码如下:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

首先会挪用 tryAcquire 方式实验获取一次锁,若是获取锁乐成则马上返回,否则挪用 doAcquireNanos 方式进入超时获取锁流程。通过上文可以得知,同步器的 acquireInterruptibly 方式在守候获取同步状态时,若是当前线程被中止了,会抛出中止异常 InterruptedException 并马上返回。超时获取锁的流程实在是在响应中止的基础上增加了超时获取的特征,doAcquireNanos 方式的源码如下:

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

由以上方式实现源码可以看出,针对超时获取这里主要实现思绪是:先使用当前时间加上参数传入的超时时间距离 deadline 盘算出超时的时间点,然后每次举行循环的时刻使用超时时间点 deadline 减去当前时间获得剩余的时间 nanosTimeout,若是剩余时间小于 0 则证实当前获取锁操作已经超时,方式竣事返回 false,反若是剩余时间大于 0。
可以看到在内里执行自旋的时刻和上面独占式同步获取锁状态 acquireQueued 方式那里是一样的套路,即当当前节点的前驱节点为头节点时挪用 tryAcquire 实验获取锁,若是获取乐成则返回。

除了超时时间盘算那里差别外,另有个差别的地方就是在超时获取锁失败之后的操作,若是当前线程获取锁失败,则判断剩余超时时间 nanosTimeout 是否小于 0,若是小于 0 则示意已经超时方式马上返回,反之则会判断是否需要举行壅闭挂起当前线程,若是通过 shouldParkAfterFailedAcquire 方式判断需要挂起壅闭当前线程,还要进一步对照超时剩余时间 nanosTimeout 和 spinForTimeoutThreshold 的巨细,若是小于即是 spinForTimeoutThreshold 值(1000 纳秒)的话,将不会使当前线程举行超时守候,而是再次举行自旋历程。
加后面这个判断的主要原因在于,在异常短(小于 1000 纳秒)的时间内的守候无法做到十分正确,若是这时还举行超时守候的话,反而会让我们指定 nanosTimeout 的超时从整体上给人感受反而不太正确,因此,在剩余超时时间异常短的情形下,同步器会再次自旋举行超时获取锁的历程,独占式超时获取锁整个历程如下所示:

共享式同步状态获取与释放

共享锁顾名思义就是可以多个线程共用一个锁,在同步器中使用 acquireShared 来获取共享锁(同步状态),方式源码如下:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

首先通过 tryAcquireShared 实验获取共享锁,该方式是一个模板方式在同步器中只是抛出一个不支持操作异常,需要开发人员自己去实现,同时方式的返回值有三种差别的类型划分代表三种差别的状态,其寄义如下:

  1. 小于 0 示意当前线程获取锁失败
  2. 即是 0 示意当前线程获取锁乐成,然则之后的线程在没有锁释放的情形下获取锁将失败,也就是说这个锁是共享模式下的最后一把锁了
  3. 大于 0 示意当前线程获取锁乐成,而且另有剩余的锁可以获取

当方式 tryAcquireShared 返回值小于 0 时,也就是获取锁失败,将会执行方式 doAcquireShared,继续跟进该方式:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

方式首先挪用 addWaiter 方式封装当前线程和守候状态为共享模块的节点并将其添加到守候同步行列中,可以发现在共享模式下节点的 nextWaiter 属性是固定值 Node.SHARED。然后循环获取当前节点的前驱节点,若是前驱节点是头节点的话就实验获取共享锁,若是返回值大于即是 0 示意获取共享锁乐成,则挪用 setHeadAndPropagate 方式,更新头节点同时若是有可用资源,则向后流传,叫醒后继节点,接下来会检查一下中止标识,若是已经中止则中止当前线程,方式竣事返回。若是返回值小于 0,则示意获取锁失败,需要挂起壅闭当前线程或者继续自旋获取共享锁。下面看看 setHeadAndPropagate 方式的详细实现:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
        * Try to signal next queued node if:
        *   Propagation was indicated by caller,
        *     or was recorded (as h.waitStatus either before
        *     or after setHead) by a previous operation
        *     (note: this uses sign-check of waitStatus because
        *      PROPAGATE status may transition to SIGNAL.)
        * and
        *   The next node is waiting in shared mode,
        *     or we don't know, because it appears null
        *
        * The conservatism in both of these checks may cause
        * unnecessary wake-ups, but only when there are multiple
        * racing acquires/releases, so most need signals now or soon
        * anyway.
        */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

首先将当前获取到锁的节点设置为头节点,然后方式参数 propagate > 0 时示意之前 tryAcquireShared 方式的返回值大于 0,也就是说当前另有剩余的共享锁可以获取,则获取当前节点的后继节点而且后继节点是共享节点时叫醒节点去实验获取锁,doReleaseShared 方式是同步器共享锁释放的主要逻辑。

同步器提供了 releaseShared 方式来举行共享锁的释放,方式源码如下所示:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

首先挪用 tryReleaseShared 方式实验释放共享锁,方式返回 false 代表锁释放失败,方式竣事返回 false,否则就示意乐成释放锁,然后执行 doReleaseShared 方式,举行叫醒后继节点并检查它是否可以向后流传等操作。继续跟进该方式如下:

private void doReleaseShared() {
        /*
        * Ensure that a release propagates, even if there are other
        * in-progress acquires/releases.  This proceeds in the usual
        * way of trying to unparkSuccessor of head if it needs
        * signal. But if it does not, status is set to PROPAGATE to
        * ensure that upon release, propagation continues.
        * Additionally, we must loop in case a new node is added
        * while we are doing this. Also, unlike other uses of
        * unparkSuccessor, we need to know if CAS to reset status
        * fails, if so rechecking.
        */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

可以看到和独占式锁释放差别的是,在共享模式下,状态同步和释放可以同时执行,其原子性由 CAS 来保证,若是头节点改变了也会继续循环。每次共享节点在共享模式下叫醒时,头节点都市指向它,这样就可以保证可以获取到共享锁的所有后续节点都可以叫醒了。

若何自界说同步组件

在 JDK 中基于同步器实现的一些类绝大部门都是聚合了一个或多个继续了同步器的类,使用同步器提供的模板方式自界说内部同步状态的治理,然后通过这个内部类去实现同步状态治理的功效,实在这从某种程度上来说使用了 模板模式。好比 JDK 中可重入锁 ReentrantLock、读写锁 ReentrantReadWriteLock、信号量 Semaphore 以及同步工具类 CountDownLatch 等,其源码部门截图如下:

通过上文可以知道,我们基于同步器可以划分自界说独占锁同步组件和共享锁同步组件,下面以实现一个在统一个时刻最多只允许 3 个线程接见,其它线程的接见将被壅闭的同步工具 TripletsLock 为例,很显然这个工具是共享锁模式,主要思绪就是去实现一个 JDk 中的 Lock 接口来提供面向使用者的方式,好比,挪用 lock 方式获取锁,使用 unlock 来对锁举行释放等,在 TripletsLock 类内部有一个自界说同步器 Sync 继续自同步器 AQS,用来对线程的接见和同步状态举行控制,当线程挪用 lock 方式获取锁时,自界说同步器 Sync 先盘算出获取到锁后的同步状态,然后使用 Unsafe 类操作来保证同步状态更新的原子性,由于统一时刻只能 3 个线程接见,这里我们可以将同步状态 state 的初始值设置为 3,示意当前可用的同步资源数目,当有线程乐成获取到锁时将同步状态 state 减 1,有线程乐成释放锁时将同步状态加 1,同步状态的取值局限为 0、1、2、3,同步状态为 0 时示意没有可用同步资源,这个时刻若是有线程接见将被壅闭。下面来看看这个自界说同步组件的实现代码:

/**
 * @author mghio
 * @date: 2020-06-13
 * @version: 1.0
 * @description:
 * @since JDK 1.8
 */
public class TripletsLock implements Lock {

  private final Sync sync = new Sync(3);

  private static final class Sync extends AbstractQueuedSynchronizer {
    public Sync(int state) {
      setState(state);
    }

    Condition newCondition() {
      return new ConditionObject();
    }

    @Override
    protected int tryAcquireShared(int reduceCount) {
      for (; ;) {
        int currentState = getState();
        int newState = currentState - reduceCount;
        if (newState < 0 || compareAndSetState(currentState, newState)) {
          return newState;
        }
      }
    }

    @Override
    protected boolean tryReleaseShared(int count) {
      for (; ;) {
        int currentState = getState();
        int newState = currentState + count;
        if (compareAndSetState(currentState, newState)) {
          return true;
        }
      }
    }
  }

  @Override
  public void lock() {
    sync.acquireShared(1);
  }

  @Override
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
  }

  @Override
  public boolean tryLock() {
    return sync.tryAcquireShared(1) > 0;
  }

  @Override
  public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }

  @Override
  public void unlock() {
    sync.releaseShared(1);
  }

  @Override
  public Condition newCondition() {
    return sync.newCondition();
  }
}

下面启动 20 个线程测试看看自界说同步同步工具类 TripletsLock 是否到达我们的预期。测试代码如下:

/**
 * @author mghio
 * @date: 2020-06-13
 * @version: 1.0
 * @description:
 * @since JDK 1.8
 */
public class TripletsLockTest {
  private final Lock lock = new TripletsLock();
  private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

  @Test
  public void testTripletsLock() {
    // 启动 20 个线程
    for (int i = 0; i < 20; i++) {
      Thread worker = new Runner();
      worker.setDaemon(true);
      worker.start();
    }

    for (int i = 0; i < 20; i++) {
      second(2);
      System.out.println();
    }
  }

  private class Runner extends Thread {
    @Override
    public void run() {
      for (; ;) {
        lock.lock();
        try {
          second(1);
          System.out.println(dateFormat.format(new Date()) + " ----> " + Thread.currentThread().getName());
          second(1);
        } finally {
          lock.unlock();
        }
      }
    }
  }

  private static void second(long seconds) {
    try {
      TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

测试效果如下:

从以上测试效果可以发现,统一时刻只有三个线程可以获取到锁,相符预期,这里需要明确的是这个锁获取历程是非公正的。

总结

本文主要是对同步器中的基础数据结构、独占式与共享式同步状态获取与释放历程做了简要剖析,由于水平有限若有错误之处还请留言讨论。行列同步器 AbstractQueuedSynchronizer 是 JDK 中许多的一些多线程并发工具类的实现基础框架,对其深入学习明白有助于我们更好的去使用其特征和相关工具类。

参考文章

Java并发编程的艺术
Java Synchronizer - AQS Learning
从 ReentrantLock 的实现看 AQS 的原理及应用
The java.util.concurrent Synchronizer Framework

,

欧博开户网址

欢迎进入欧博开户网址(Allbet Gaming):www.aLLbetgame.us,欧博网址开放会员注册、代理开户、电脑客户端下载、苹果安卓下载等业务。

上一篇:欧博会员开户:前端面试手写篇

下一篇:潍坊天气:中医小妙招,在家轻松应对暑湿感冒,好用不贵当时见效