APP下载

Object中wait/notify的另一种实现Condition中的await/signal

消息来源:baojiabao.com 作者: 发布时间:2024-05-15

报价宝综合消息Object中wait/notify的另一种实现Condition中的await/signal

前几篇文章对AQS的独占资源模式和共享资源模式的源代码进行了解析,本篇文章来分析AQS最后一个非常重要的知识点:Condition源代码解析。此篇文章内容较长,我是一个字一个字敲出来的,请您也慢慢的品读,自己认识有一定的局限性,欢迎交流更正。

大家还记得Java中怎样实现生产者和消费者模式吗?原理是非常的简单的,为了防止生产者和消费者不均衡的情况发生,在生产者/消费者模式中会提供一个缓冲区,而这个缓冲区的作用就是储存资料的,生产者生产资料后存放在缓冲区中,而消费者从缓冲区中消费这些资料,从而使生产者和消费者进行了解耦,简单概括如下:

1:生产者生产资料,直到缓冲区满后阻塞。

2:消费者消费资料,直到缓冲区为空后阻塞。

3:当缓冲区有资料后,会通知消费者进行消费资料

4:当缓冲区没有资料后,会通知生产者进行生产资料

5:可以同时存在多个生产者和多个消费者

对于上面生产者和消费者模式的实现方式我们首先想到的就是通过呼叫Object中的wait()方法阻塞,呼叫notify()和notifyAll()方法进行通知,但是呼叫上述方法的前提是必须获取锁,所以可以利用synchronized+wait()+notify()/notifyAll()方式来实现生产者和消费者模式。下面举一个简单的例子来说明。

缓冲区的程式码如下:

/**

* 缓冲区

*/

public class SyncCache {

//缓冲区

private String[] data = new String[10];

//缓冲区阵列索引

private int index;

//生产资料

public void product(String productData) {

synchronized (this) {

if (index == data.length) {

try {

System.out.println("缓冲区已满,生产者被阻塞");

this.wait();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

data[index] = productData;

index++;

this.notify();

}

}

//消费资料

public String consume() {

synchronized (this) {

if (index == 0) {

try {

System.out.println("缓冲区已空,消费者被阻塞");

this.wait();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

this.notify();

index--;

String consumeData = data[index];

return consumeData;

}

}

}

生产者程式码如下:

public class Product implements Runnable {

private SyncCache cache;

public Product(SyncCache cache) {

this.cache = cache;

}

@Override

public void run() {

int count = 1;

for(;;){

String data = "a"+count;

cache.product(data);

System.out.println("生产者生产了资料:"+data);

count++;

}

}

}

消费者程式码如下:

public class Consume implements Runnable {

private SyncCache cache;

public Consume(SyncCache cache) {

this.cache = cache;

}

@Override

public void run() {

for(;;){

System.out.println("消费者消费了资料:" + cache.consume());

}

}

}

测试程式码如下:

public class Test {

public static void main(String[] args) {

SyncCache cache = new SyncCache();

Product product = new Product(cache);

Consume consume = new Consume(cache);

new Thread(product).start();

new Thread(consume).start();

}

}

通过上面简单的例子执行结果如下:

上面这个例子非常的简单,但是能够很好的说明生产者和消费者模式的例子,可以看出呼叫Object物件的wait()方法和notify()方法必须在获取锁的前提下,并且这个锁属于独占锁

上面说了这么多,那么和要讲的Condition有什么联络吗?上面通过呼叫wait()/notify()只是实现生产者和消费者模式的一种方法,Condition也能实现生产者和消费者模式。

1:synchronized通过和AQS实现的锁对应,比如:ReentrantLock

2:Condition中的await()方法和Object中的wait()方法对应

3:Condition中的signal()方法和Object的notify()方法对应

4:Condition中的signalAll()方法和Object的notifyAll()方法对应

通过上面的讲述大家明白了Condition的作用了吧,但是Condition只是一个界面,在AQS内部有它的一个实现类ConditionObject,接下来我从源代码角度去分析AQS是怎样实现Condition的。

ConditionObject是AQS中的内部类,它复用了AQS中的Node节点的定义,如图:

在ConditionObject中也有一个以Node为节点的伫列,我们称之为Condition伫列,在AQS中有一个CLH等待伫列,我们称之为Sync伫列,但是两者还是有一定的区别的。

1:sync伫列的头结点为head,而Condition伫列的头结点为firstWaiter

2:sync伫列的尾节点为tail,而Condition伫列的尾节点为lastWaiter

3:sync伫列的head没有和任何执行绪系结,而Condition伫列的firstWaiter绑定了执行绪

一、await在条件变数上的等待

和Object中的wait()方法一样,在呼叫await()方法之前必须获取到锁,并且是独占锁

public final void await() throws InterruptedException {

//判断执行绪是否被中断,如果被中断则丢掷异常

if (Thread.interrupted())

throw new InterruptedException();

//$1:将当前执行绪封装成Node节点

Node node = addConditionWaiter();

//$2:呼叫await()后要释放锁,这一步就是释放获取的资源

int savedState = fullyRelease(node);

int interruptMode = 0;

//$3:通过while循环监测执行绪状态,直到执行绪被signal或者执行绪被中断并且放入了synch伫列中

while (!isOnSyncQueue(node)) {

//阻塞当前执行绪

LockSupport.park(this);

//$4:如果发生中断则退出while循环,判断中断模式,如果中断是丢掷异常,还是稍后在处理中断

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

break;

}

//退出上述的while循环后,通过呼叫acquireQueued()方法获取资源,如果获取资源时发生了中断,如果在呼叫await()方法时被中断,则依然是THROW_IE

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

//如果是因为中断,此时waitStatus=0,但是此时它仍在Condition伫列中,所以需要从Condition伫列中清除

if (node.nextWaiter != null) // clean up if cancelled

unlinkCancelledWaiters();

//$5:如果有中断,则处理不同的中断

if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);

}

通过对源代码的注释,大家对await有一定的了解了。我系统的总结以下:

第一步:首先判断当前执行绪是中断,如果被中断,则丢掷异常,如果没有被中断,则继续下面的流程。

第二步:通过呼叫addConditionWaiter()将当前执行绪封装成Node节点存放到Condition伫列的尾部

第三步:因为当前执行绪已经获取了锁,所以呼叫await需要释放资源,所以通过呼叫fullyRelease()释放资源,也就是释放锁,因为这个锁是独占锁并且可以重入,所以要全部把资源释放,从fully字面上也可以理解。

第四步:通过while循环判断当前执行绪是否在sync等待伫列上,如果没有在sync伫列上,则需要阻塞当前执行绪,然后呼叫checkInterruptWaiting()方法判断是否被中断过,如果被中断过,则跳出while循环。

第五步:通过呼叫acquireQueued()方法获取资源,如果在呼叫这个方法时被中断,则中断型别变成REINTERRUPT(稍后处理中断),我们知道这个方法返回值只是记录是否被中断过,并不会响应中断。

第六步:如果是因为中断,此时waitStatus=0,但是此时它仍在Condition伫列中,所以需要从Condition伫列中清除

第七步:如果被中断,则呼叫reportInterruptAfterWait()方法处理不同的中断型别

从上面可以看出await()会释放当前获取的锁,然后阻塞,直到被signal或者被中断。流程图如下:

$1:addConditionWaiter():将执行绪新增到Condition伫列中

private Node addConditionWaiter() {

//获取condition伫列的尾节点

Node t = lastWaiter;

// 如果尾节点被取消,则从condition中清除

if (t != null && t.waitStatus != Node.CONDITION) {

unlinkCancelledWaiters();

t = lastWaiter;

}

//将当前执行绪封装成Node节点,并且waitStatus状态为CONDITION

Node node = new Node(Thread.currentThread(), Node.CONDITION);

if (t == null)

firstWaiter = node;

else

t.nextWaiter = node;

lastWaiter = node;

return node;

}

上述这个方法就是将当前执行绪封装成Node节点,然后加入到Condition的尾部,在加入之前需要检查以下尾部节点t是否还在等待Condition条件,如果被signal或者被中断,则呼叫清除方法将尾节点从Condition伫列中清除掉。

$2:fullyRelease():释放已经获取的资源

final int fullyRelease(Node node) {

//这个failed标识:判断资源是否释放成功

boolean failed = true;

try {

//获取到当前资源的值

int savedState = getState();

//呼叫释放资源方法

if (release(savedState)) {

failed = false;

return savedState;

} else {

throw new IllegalMonitorStateException();

}

} finally {

//如果释放资源失败,则节点的状态改成CANCELED

if (failed)

node.waitStatus = Node.CANCELLED;

}

}

这个方法非常的简单,就是释放拥有的资源,如果资源释放失败,则将当前执行绪节点的状态变成CANCELLED。

$3:isOnSyncQueue():判断节点是否在sync伫列中

final boolean isOnSyncQueue(Node node) {

//如果状态是CONDITION:说明在condition伫列上,一定不在sync伫列上

//如果当前节点的前驱节点为空:说明一定不在sync伫列上

if (node.waitStatus == Node.CONDITION || node.prev == null)

return false;

//如果当前节点的后继节点不为空,说明一定在sync伫列上

if (node.next != null)

return true;

return findNodeFromTail(node);

}

private boolean findNodeFromTail(Node node) {

Node t = tail;

for (;;) {

if (t == node)

return true;

if (t == null)

return false;

t = t.prev;

}

}

我首先贴一张加入sync伫列的源代码图:

上图中的第1步就是设定node.prev为当前的尾节点,如果node.prev==null,则可以判定当前节点一定不在sync上。在往condition伫列新增节点时,我们并没有设定节点的后继节点next的值,而在sync伫列中我们设定过node.next,所以如果node.next!=null则说明一定在sync节点上。

如果上面条件都不符合,也就是说node.prev不一定为null,node.next一定为null,这个时候可能节点正在加入sync伫列,不过可能加入失败,如上图第2步通过CAS可能会失败,所以在尝试通过呼叫findNodeFromTail()判断以下是否加入sync伫列成功。这个方法从尾部开始遍历sync伫列,如果监测到节点则直接返回true,如果此时还没有成功,则不在等待了,直接返回false

$4:checkInterruptWhileWaiting():监测是否发生中断,如果发生中断,则判断不同的中断处理型别

private static final int REINTERRUPT = 1;

private static final int THROW_IE = -1;

private int checkInterruptWhileWaiting(Node node) {

//如果发生中断,则呼叫transferAfterCancelledWait()方法判断

//如果没有发生中断,则直接返回0

return Thread.interrupted() ?

(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :

0;

}

从await()方法可以看出,执行绪从park中返回主要有两个条件,

第一个条件:被signal/signalAll唤醒的

第二个条件:执行绪被中断

如果是因为第二个条件执行绪被中断返回的,则主要返回3个值:

0:表示没有发生中断

REINTERRUPT(1):表示发生了中断,在返回之前即使发生中断但是被忽略,等待返回后重新处理中断

THROW_IE(-1):表示发生了中断,后续需要丢掷异常处理

final boolean transferAfterCancelledWait(Node node) {

//$1:如果执行绪中断则通过CAS将waitStatus从CONDITION变成0

if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {

//成功后加入到sync伫列中等待获取资源

enq(node);

return true;

}

while (!isOnSyncQueue(node))

Thread.yield();

return false;

}

这个方法在AQS框架中,不在ConditionObject中,此方法虽然简短,但是蕴含着很多的知识点。当是第一个条件被signal/signalAll唤醒返回的话,它首先会将状态从CONDITION变成0,

1:如果$1的CAS执行成功,则说明中断先于signal发生,因为如果signal先发生,它会把状态从CONDITION变成0.

2:如果1执行成功,则说明从中断中返回,呼叫end()将执行绪加入synch伫列,并返回true

3:如果1执行失败,说明signal()已经执行,但是有可能没有执行完成,则判断节点是否在sync伫列中,如果不在sync伫列中则呼叫yield让出CPU,while是一个自旋,直到节点加入到sync伫列才返回。

上面就是await()方法的所有内容,相信大家有一定的理解了,其实用一句话总结:await就是将当前执行绪封装成Node存放在Condition伫列中,然后释放已经获取的资源,直到被signal/signalAll或者执行绪被中断才从Condition伫列移除。

awaitNanos、awaitUntil和await的机制类似,大家可以自行观看程式码。

$5:reportInterruptAfterWait():如果执行绪在阻塞时被中断,则需要对中断处理

private void reportInterruptAfterWait(int interruptMode)

throws InterruptedException {

if (interruptMode == THROW_IE)

throw new InterruptedException();

else if (interruptMode == REINTERRUPT)

selfInterrupt();

}

方法很简单,如果中断的型别是THROW_IE,则直接丢掷异常,如果是REINTERRUPT,则处理中断。

二、signal唤醒在Condition伫列的头结点firstWaiter对应的执行绪

上面分析了已经获取资源的执行绪因为呼叫了await,则需要把执行绪加入到condition伫列,然后释放资源的过程,我也说了,从阻塞中返回要么呼叫signal/signalAll,要么执行绪被中断,接下来继续分析signal

public final void signal() {

//如果不是独占模式,则直接丢掷异常

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

//获取condition伫列中第一个节点

Node first = firstWaiter;

if (first != null)

//唤醒condition伫列的第一个节点

doSignal(first);

}

这个方法首先判断是否是独占模式,如果不是则直接丢掷异常,如果是则获取condition伫列中的第一个节点,然后唤醒第一个节点,而唤醒的工作交给了doSignal(),这里所说的唤醒:将执行绪从condition伫列迁移到sync伫列中,然后通过独占模式原理获取资源

private void doSignal(Node first) {

do {

if ( (firstWaiter = first.nextWaiter) == null)

lastWaiter = null;

first.nextWaiter = null;

} while (!transferForSignal(first) &&

(first = firstWaiter) != null);

}

这个方法首先把第一个节点从condition伫列中移除,然后呼叫transferFOrSignal()去改变节点的状态waitStatus,这个改变状态可能会失败,因为执行绪已经过时了或者被中断了。do{}while()循环直到被唤醒成功为止。

final boolean transferForSignal(Node node) {

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

return false;

//如果上述CAS成功,则将节点从condition伫列转移到sync伫列的尾部

Node p = enq(node);

int ws = p.waitStatus;

if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

LockSupport.unpark(node.thread);

return true;

}

这个方法首先通过CAS将节点的waitStatus从CONDITION变成0,如果失败了,直接返回false,如果成功则直接将节点加入到sync伫列等待获取资源,enq()方法返回的是node的前驱节点,如果前驱节点不是SIGNAL,则需要呼叫unpark()方法唤醒被await阻塞的执行绪,被await阻塞的执行绪将继续执行await()方法中LockSupport.park()后面的程式码。

上面就是Condition的全部内容,大家可以仔细的读一下源代码,会有一个清晰的任务,对Object中wait()和notify()机制熟悉,那么Condition的await()和signal()是它的另一种实现方式,机制理解了,看源代码就非常的简单了。

2019-12-15 03:50:00

相关文章