Java并发之AQS

并发是服务器端必不可少的技术,当谈到并发时往往联系到锁、同步、竞争资源等名词。下面我们从Java语言来简单了解下并发编程的基础,并发框架如下图所示:

img
上图从下到上描述了Java并发的整个体系,首先是JVM的violate可见性读写以及操作系统CAS原子操作的支持,在这一基础上出现了基础并发框架AQS以及一些简单的原子变量类,最后对AQS进行扩展完善,出现了大量的并发容器、同步器、锁结构。

AQS

AQS提供了许多公用的方法来管理同步,并且也提供了一些抽象方法,子类通过继承AQS来实现它的抽象方法来管理同步状态,AQS提供的主要方法有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
getState():返回同步状态的当前值;
setState(int newState):设置当前同步状态;
compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
tryRelease(int arg):独占式释放同步状态;
tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
tryReleaseShared(int arg):共享式释放同步状态;
isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
releaseShared(int arg):共享式释放同步状态;

同步队列CLH

此外AQS内部维护着一个CLH同步队列,该队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。CLH结构如下图所示:
img
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final class Node {
/** 共享 */
static final Node SHARED = new Node();
/** 独占 */
static final Node EXCLUSIVE = null;
// 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
static final int CANCELLED = 1;
//后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
static final int SIGNAL = -1;
//节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
static final int CONDITION = -2;
//表示下一次共享式同步状态获取将会无条件地传播下去
static final int PROPAGATE = -3;
/** 等待状态 */
volatile int waitStatus;
/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 获取同步状态的线程 */
volatile Thread thread;
Node nextWaiter;
}

同步状态的获取与释放

独占式

acquire方法为AQS提供的模板方法,该方法为独占式获取同步状态,该方法对中断不敏感,即由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。代码如下:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法由子类去实现
addWaiter:如果tryAcquire返回false(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部
acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;并且返回当前线程在等待过程中有没有中断过,其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋,直到满足下面条件
for (;;) {
final Node p = node.predecessor();
// 其前驱节点是头节点并且获取同步状态成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取同步失败,等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

可以看出只有其前驱节点为头结点才能够尝试获取同步状态,原因在于(1)FIFO队列;(2)头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点,acquire流程如下图所示:
img
AQS提供了acquire独占式获取同步状态,但该方法对中断不响应,对线程进行中断操作后,该线程会依然位于CLH同步队列中等待着获取同步状态。为了响应中断,AQS提供了acquireInterruptibly方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 中断直接跑出异常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 中断后抛出异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

AQS也提供了超时处理,tryAcquireNanos,该方法为acquireInterruptibly方法的进一步增强,它除了响应中断外,还有超时控制,即如果当前线程没有在指定时间内获取同步状态,则会返回false,否则返回true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//nanosTimeout <= 0
if (nanosTimeout <= 0L)
return false;
//超时时间
final long deadline = System.nanoTime() + nanosTimeout;
//新增Node节点
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
//自旋
for (;;) {
final Node p = node.predecessor();
//获取同步状态成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//重新计算需要休眠的时间
nanosTimeout = deadline - System.nanoTime();
//已经超时,返回false
if (nanosTimeout <= 0L)
return false;
//如果没有超时,则等待nanosTimeout纳秒,该线程会直接从LockSupport.parkNanos中返回
//如果nanosTimeout <= spinForTimeoutThreshold ,就不需要休眠了,直接进入快速自旋的过程。原因在于 spinForTimeoutThreshold
//已经非常小了,非常短的时间等待无法做到十分精确直接自旋
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//线程是否已经中断了,中断直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

tryAcquireNanos的流程如下图所示:
img
独占式同步状态释放

1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
// 尝试释放,子类实现
if (tryRelease(arg)) {
Node h = head;
// 唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

共享式同步状态获取与释放

AQS提供acquireShared方法共享式获取同步状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final void acquireShared(int arg) {
// tryAcquireShared由子类实现
if (tryAcquireShared(arg) < 0)
//获取失败,自旋获取同步状态
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
/共享式节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//尝试获取同步
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

阻塞与唤醒

在线程获取同步状态时如果获取失败,则加入CLH同步队列,通过通过自旋的方式不断获取同步状态,但是在自旋的过程中则需要判断当前线程是否需要阻塞,其主要方法在acquireQueued

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驱节点
int ws = pred.waitStatus;
//状态为signal,表示当前线程处于等待状态,直接放回true
if (ws == Node.SIGNAL)
return true;
//前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
//前驱节点状态为Condition、propagate
else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

在获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire,该算法的步骤如下:
(1)如果当前线程的前驱节点状态为SINNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞
(2)如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false
(3)如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false
shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程:

1
2
3
4
5
private final boolean parkAndCheckInterrupt() {
// LockSupport调用UNSAFE类park方法来挂起当前线程
LockSupport.park(this);
return Thread.interrupted();
}

当线程释放同步状态后需要唤醒其后继节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void unparkSuccessor(Node node) {
//当前节点状态
int ws = node.waitStatus;
//当前状态 < 0 则设置为 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//当前节点的后继节点
Node s = node.next;
//后继节点为null或者其状态 > 0 (超时或者被中断了)
if (s == null || s.waitStatus > 0) {
s = null;
//从tail节点来找可用节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒后继节点
if (s != null)
LockSupport.unpark(s.thread);
}

可能会存在当前线程的后继节点为null,超时、被中断的情况,如果遇到这种情况了,则需要跳过该节点,从tail尾节点开始,原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程

ReentrantLock

ReentrantLock,可重入锁。它可以等同于synchronized的使用,但是ReentrantLock提供了比synchronized更强大、灵活的锁机制,可以减少死锁发生的概率.
ReentrantLock实现了Lock接口,依赖内部类Sync类来管理同步状态,Sync继承了AQS,同时Sync有2个子类,分别支持公平锁和非公平锁NonfairSync与FairSync。下面我们看看Lock的常用方法:

1
2
3
public void lock() {
sync.lock();
}

ReentrantLock里面大部分的功能都是委托给Sync来实现的,同时Sync内部定义了lock()抽象方法由其子类去实现,默认实现了nonfairTryAcquire(int acquires)方法,可以看出它是非公平锁的默认实现方式。下面我们看非公平锁的lock()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
final void lock() {
//尝试获取锁CAS
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//获取失败,调用AQS的acquire(int arg)方法,acquire在AQS中定义,子类实现tryAcquire
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
// 默认非公平锁
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//当前线程
final Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//state == 0,表示没有该锁处于空闲状态
if (c == 0) {
//获取锁成功,设置为当前线程所有
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//线程重入
//判断锁持有的线程是否为当前线程,是则增加次数,可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

释放锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void unlock() {
// release在AQS中定义,包括了tryRelease,unparkSuccessor2个主要操作
sync.release(1);
}
// 子类实现tryRelease
protected final boolean tryRelease(int releases) {
//减掉releases
int c = getState() - releases;
//如果释放的不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//state == 0 表示已经释放完全了,其他线程可以获取同步状态了
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

公平锁的tryAcquire方法:其实只是多了一个hasQueuedPredecessors判断,即节点是否为头节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
Node t = tail; //尾节点
Node h = head; //头节点
Node s;
//头节点 != 尾节点
//同步队列第一个节点不为null
//当前线程是同步队列第一个节点
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

下面我们继续分析一下Lock的阻塞和唤醒:Condition
实现等待/通知模式二种方式:(1)Synchronized来控制同步,配合Object的wait()、notify();(2)Lock提供了条件Condition,其线程的等待、唤醒操作更加详细和灵活。下图是Condition与Object的监视器方法的对比
img
Condition提供了一系列的方法来对阻塞和唤醒线程:
await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout – 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
signal()All:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
Condition接口有一个实现类ConditionObject,其位于AQS内部类:
每个Condition对象都包含着一个FIFO队列,该队列是Condition对象通知/等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该Condition对象上等待的线程,其中Node和CLH同步队列的Node相同

1
2
3
4
5
6
7
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}

调用Condition的await()方法会使当前线程进入等待状态,同时会加入到Condition等待队列同时释放锁。当从await()方法返回时,当前线程一定是获取了Condition相关连的锁,await流程:首先将当前线程新建一个节点同时加入到条件队列中,然后释放当前线程持有的同步状态。然后则是不断检测该节点代表的线程释放出现在CLH同步队列中(收到signal信号之后就会在AQS队列中检测到),如果不存在则一直挂起,否则参与竞争同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void await() throws InterruptedException {
// 当前线程中断
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入等待队列
Node node = addConditionWaiter();
//释放锁
long savedState = fullyRelease(node);
int interruptMode = 0;
/**
* 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
* 直到检测到此节点在同步队列上
*/
while (!isOnSyncQueue(node)) {
//线程挂起
LockSupport.park(this);
//如果已经中断了,则退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理下条件队列中的不是在等待条件的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

fullyRelease(Node node),负责释放该线程持有的锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final long fullyRelease(Node node) {
boolean failed = true;
try {
//节点状态--其实就是持有锁的数量
long savedState = getState();
//调用rlease释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

isOnSyncQueue(Node node):如果一个节点同步队列上获取锁则返回true

1
2
3
4
5
6
7
8
9
10
final boolean isOnSyncQueue(Node node) {
//状态为Condition,获取前驱节点为null,返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//后继节点不为null,肯定在CLH同步队列中,
if (node.next != null)
return true;
return findNodeFromTail(node);
}

通知唤醒:
调用Condition的signal()方法,将会唤醒在等待队列里的头节点,在唤醒节点前,会将节点移到CLH同步队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final void signal() {
//检测当前线程是否为拥有锁的独
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//头节点,唤醒条件队列中的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first); //唤醒
}
//doSignal(Node first)主要是做两件事:1.修改头节点,2.调用transferForSignal(Node first) 方法将节点移动到CLH同步队列中
private void doSignal(Node first) {
do {
//修改头结点,完成旧头结点的移出工作
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//将该节点从状态CONDITION改变为初始状态0,
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将节点加入到CLH队列中去,返回的是CLH队列中node节点前面的一个节点
Node p = enq(node);
int ws = p.waitStatus;
//如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

Condition总结:一个线程获取锁后,通过调用Condition的await()方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过isOnSyncQueue(Node node)方法不断自检看节点是否已经在CLH同步队列了,如果是则尝试获取锁,否则一直挂起。当线程调用signal()方法后,程序首先检查当前线程是否获取了锁,然后通过doSignal(Node first)方法唤醒条件队列的首节点。被唤醒的线程,将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。

CountDownLatch

CountDownLatch功能:在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
CountDownlatch与CyclicBarrier区别:
(1)CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待
(2)CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier
CountDownLatch在实现上同样依赖内部类Sync,Sync继承AQS,CountDownLatch仅有一个int类型的构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
//获取同步状态
int getCount() {
return getState();
}
//获取同步状态
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//释放同步状态
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

通过这个内部类Sync我们可以清楚地看到CountDownLatch是采用共享锁来实现的,下面我们看看它的核心方法await方法:

1
2
3
4
5
6
7
8
9
10
11
12
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//AQS定义acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//tryAcquireShared见上面Sync类内部,若state不为0(countdown未减为0)则调用AQS的doAcquireSharedInterruptibly方法自旋等待同步状态

countdown方法:

1
2
3
4
5
6
7
8
9
10
11
12
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// releaseShared见上面Sync内部,若state为0直接返回fasle,若countdown减1后为0,则调用AQS的doReleaseShared唤醒,await继续执行

Semaphore

Semaphore的实现也是依赖Sync,包含公平锁(FairSync)和非公平锁(NonfairSync),继承内部类Sync,其中Sync继承AQS
Semaphore提供了两个构造函数:

1
2
3
4
5
6
7
8
9
// 创建具有给定的许可数和非公平的公平设置的 Semaphore,默认非公平
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 创建具有给定的许可数和给定的公平设置的 Semaphore
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

通过acquire来获取信号量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 共享的可中断
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS定义的 acquireSharedInterruptibly,依赖sync实现的tryAcquireShared
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 公平的tryAcquireShared
protected int tryAcquireShared(int acquires) {
for (;;) {
//判断该线程是否位于CLH队列的列头
if (hasQueuedPredecessors())
return -1;
//获取当前的信号量许可
int available = getState();
//设置“获得acquires个信号量许可之后,剩余的信号量许可数”
int remaining = available - acquires;
//CAS设置信号量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 非公平的tryAcquireShared方法,少了一个头节点的判断
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

信号量释放:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void release() {
sync.releaseShared(1);
}
// AQS的releaseShared,子类sync实现tryReleaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//设置可获取的信号许可数为next
if (compareAndSetState(current, next))
return true;
}
}

CyclicBarrier

CyclicBarrier:允许一组线程互相等待,直到到达某个公共屏障点。在一组固定大小的线程的程序中,这些线程必须不时地互相等待,barrier 在释放等待线程后可以重用。
构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// barrier数量
this.parties = parties;
this.count = parties;
// 最后到达公共屏障点执行的任务
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
//使用lock和condition来管理同步状态
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();

最重要的wait方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);//不超时等待
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//分代
final Generation g = generation;
//当前generation“已损坏”,抛出BrokenBarrierException异常
//抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrie
if (g.broken)
//当某个线程试图等待处于断开状态的 barrier 时,或者 barrier 进入断开状态而线程处于等待状态时,抛出该异常
throw new BrokenBarrierException();
//如果线程中断,终止CyclicBarrier
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//进来一个线程 count - 1
int index = --count;
//count == 0 表示所有线程均已到位,触发Runnable任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//触发任务
if (command != null)
command.run();
ranAction = true;
//唤醒所有等待线程,并更新generation
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 自旋直到所有线程到达,超时,broken,中断
for (;;) {
try {
//如果不是超时等待,则调用Condition.await()方法等待
if (!timed)
trip.await();
else if (nanos > 0L)
//超时等待,调用Condition.awaitNanos()方法等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
//generation已经更新,返回index
if (g != generation)
return index;
//“超时等待”,并且时间已到,终止CyclicBarrier,并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放锁
lock.unlock();
}
}

await处理流程:
如果该线程不是到达的最后一个线程,则他会一直处于等待状态,除非发生以下情况:
(1)最后一个线程到达,即index == 0
(2)超出了指定时间(超时等待)
(3)其他的某个线程中断当前线程
(4)其他的某个线程中断另一个等待的线程
(5)其他的某个线程在等待barrier超时
(6)其他的某个线程在此barrier调用reset()方法。reset()方法用于将屏障重置为初始状态
此外,Generation描述着CyclicBarrier的更显换代。在CyclicBarrier中,同一批线程属于同一代。当有parties个线程到达barrier,generation就会被更新换代。其中broken标识该当前CyclicBarrier是否已经处于损坏状态。

1
2
3
private static class Generation {
boolean broken = false;
}

默认barrier是没有损坏的。当barrier损坏了或者有一个线程中断了,则通过breakBarrier()来终止所有的线程,在breakBarrier()中除了将broken设置为true,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。

1
2
3
4
5
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

当所有线程都已经到达barrier处(index == 0),则会通过nextGeneration()进行更新换地操作,在这个步骤中,做了三件事:唤醒所有线程,重置count,generation

1
2
3
4
5
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}

总结

AQS作为Java并发基础的框架提供获取状态,acquire,release等公共方法,子类通过实现具体的逻辑来达到不同的同步目的。此外Lock,CyclicBarrier,Semaphore,CountDownLatch作为最基础的同步类为其他同步类提供了基础。

谢谢大佬的打赏!