并发是服务器端必不可少的技术,当谈到并发时往往联系到锁、同步、竞争资源等名词。下面我们从Java语言来简单了解下并发编程的基础,并发框架如下图所示:
![]()
上图从下到上描述了Java并发的整个体系,首先是JVM的violate可见性读写以及操作系统CAS原子操作的支持,在这一基础上出现了基础并发框架AQS以及一些简单的原子变量类,最后对AQS进行扩展完善,出现了大量的并发容器、同步器、锁结构。
AQS
AQS提供了许多公用的方法来管理同步,并且也提供了一些抽象方法,子类通过继承AQS来实现它的抽象方法来管理同步状态,AQS提供的主要方法有:
|
|
同步队列CLH
此外AQS内部维护着一个CLH同步队列,该队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。CLH结构如下图所示:

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:
|
|
同步状态的获取与释放
独占式
acquire方法为AQS提供的模板方法,该方法为独占式获取同步状态,该方法对中断不敏感,即由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。代码如下:
|
|
tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法由子类去实现
addWaiter:如果tryAcquire返回false(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部
acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;并且返回当前线程在等待过程中有没有中断过,其代码如下:
|
|
可以看出只有其前驱节点为头结点才能够尝试获取同步状态,原因在于(1)FIFO队列;(2)头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点,acquire流程如下图所示:

AQS提供了acquire独占式获取同步状态,但该方法对中断不响应,对线程进行中断操作后,该线程会依然位于CLH同步队列中等待着获取同步状态。为了响应中断,AQS提供了acquireInterruptibly方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException:
|
|
AQS也提供了超时处理,tryAcquireNanos,该方法为acquireInterruptibly方法的进一步增强,它除了响应中断外,还有超时控制,即如果当前线程没有在指定时间内获取同步状态,则会返回false,否则返回true。
|
|
tryAcquireNanos的流程如下图所示:

独占式同步状态释放
|
|
共享式同步状态获取与释放
AQS提供acquireShared方法共享式获取同步状态:
|
|
阻塞与唤醒
在线程获取同步状态时如果获取失败,则加入CLH同步队列,通过通过自旋的方式不断获取同步状态,但是在自旋的过程中则需要判断当前线程是否需要阻塞,其主要方法在acquireQueued
|
|
在获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire,该算法的步骤如下:
(1)如果当前线程的前驱节点状态为SINNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞
(2)如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false
(3)如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false
shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程:
|
|
当线程释放同步状态后需要唤醒其后继节点:
|
|
可能会存在当前线程的后继节点为null,超时、被中断的情况,如果遇到这种情况了,则需要跳过该节点,从tail尾节点开始,原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程
ReentrantLock
ReentrantLock,可重入锁。它可以等同于synchronized的使用,但是ReentrantLock提供了比synchronized更强大、灵活的锁机制,可以减少死锁发生的概率.
ReentrantLock实现了Lock接口,依赖内部类Sync类来管理同步状态,Sync继承了AQS,同时Sync有2个子类,分别支持公平锁和非公平锁NonfairSync与FairSync。下面我们看看Lock的常用方法:
|
|
ReentrantLock里面大部分的功能都是委托给Sync来实现的,同时Sync内部定义了lock()抽象方法由其子类去实现,默认实现了nonfairTryAcquire(int acquires)方法,可以看出它是非公平锁的默认实现方式。下面我们看非公平锁的lock()方法:
|
|
释放锁:
|
|
公平锁的tryAcquire方法:其实只是多了一个hasQueuedPredecessors判断,即节点是否为头节点
|
|
下面我们继续分析一下Lock的阻塞和唤醒:Condition
实现等待/通知模式二种方式:(1)Synchronized来控制同步,配合Object的wait()、notify();(2)Lock提供了条件Condition,其线程的等待、唤醒操作更加详细和灵活。下图是Condition与Object的监视器方法的对比

Condition提供了一系列的方法来对阻塞和唤醒线程:
await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout – 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
signal()All:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
Condition接口有一个实现类ConditionObject,其位于AQS内部类:
每个Condition对象都包含着一个FIFO队列,该队列是Condition对象通知/等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该Condition对象上等待的线程,其中Node和CLH同步队列的Node相同
|
|
调用Condition的await()方法会使当前线程进入等待状态,同时会加入到Condition等待队列同时释放锁。当从await()方法返回时,当前线程一定是获取了Condition相关连的锁,await流程:首先将当前线程新建一个节点同时加入到条件队列中,然后释放当前线程持有的同步状态。然后则是不断检测该节点代表的线程释放出现在CLH同步队列中(收到signal信号之后就会在AQS队列中检测到),如果不存在则一直挂起,否则参与竞争同步状态。
|
|
fullyRelease(Node node),负责释放该线程持有的锁:
|
|
isOnSyncQueue(Node node):如果一个节点同步队列上获取锁则返回true
|
|
通知唤醒:
调用Condition的signal()方法,将会唤醒在等待队列里的头节点,在唤醒节点前,会将节点移到CLH同步队列中
|
|
Condition总结:一个线程获取锁后,通过调用Condition的await()方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过isOnSyncQueue(Node node)方法不断自检看节点是否已经在CLH同步队列了,如果是则尝试获取锁,否则一直挂起。当线程调用signal()方法后,程序首先检查当前线程是否获取了锁,然后通过doSignal(Node first)方法唤醒条件队列的首节点。被唤醒的线程,将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。
CountDownLatch
CountDownLatch功能:在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
CountDownlatch与CyclicBarrier区别:
(1)CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待
(2)CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier
CountDownLatch在实现上同样依赖内部类Sync,Sync继承AQS,CountDownLatch仅有一个int类型的构造函数
|
|
通过这个内部类Sync我们可以清楚地看到CountDownLatch是采用共享锁来实现的,下面我们看看它的核心方法await方法:
|
|
countdown方法:
|
|
Semaphore
Semaphore的实现也是依赖Sync,包含公平锁(FairSync)和非公平锁(NonfairSync),继承内部类Sync,其中Sync继承AQS
Semaphore提供了两个构造函数:
|
|
通过acquire来获取信号量:
|
|
信号量释放:
|
|
CyclicBarrier
CyclicBarrier:允许一组线程互相等待,直到到达某个公共屏障点。在一组固定大小的线程的程序中,这些线程必须不时地互相等待,barrier 在释放等待线程后可以重用。
构造函数:
|
|
最重要的wait方法:
|
|
await处理流程:
如果该线程不是到达的最后一个线程,则他会一直处于等待状态,除非发生以下情况:
(1)最后一个线程到达,即index == 0
(2)超出了指定时间(超时等待)
(3)其他的某个线程中断当前线程
(4)其他的某个线程中断另一个等待的线程
(5)其他的某个线程在等待barrier超时
(6)其他的某个线程在此barrier调用reset()方法。reset()方法用于将屏障重置为初始状态
此外,Generation描述着CyclicBarrier的更显换代。在CyclicBarrier中,同一批线程属于同一代。当有parties个线程到达barrier,generation就会被更新换代。其中broken标识该当前CyclicBarrier是否已经处于损坏状态。
|
|
默认barrier是没有损坏的。当barrier损坏了或者有一个线程中断了,则通过breakBarrier()来终止所有的线程,在breakBarrier()中除了将broken设置为true,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。
|
|
当所有线程都已经到达barrier处(index == 0),则会通过nextGeneration()进行更新换地操作,在这个步骤中,做了三件事:唤醒所有线程,重置count,generation
|
|
总结
AQS作为Java并发基础的框架提供获取状态,acquire,release等公共方法,子类通过实现具体的逻辑来达到不同的同步目的。此外Lock,CyclicBarrier,Semaphore,CountDownLatch作为最基础的同步类为其他同步类提供了基础。