APP下载

多执行绪不能被遗忘的同步器

消息来源:baojiabao.com 作者: 发布时间:2024-11-01

报价宝综合消息多执行绪不能被遗忘的同步器

Java提供了synchronized关键字对临界区进行执行绪同步访问,我们也知道通过synchronized很难写出正确的同步程式码,于是并发工具类提供了更高阶的同步器。 倒计时门闩count down latch,同步屏障cyclic barrier,交换器exchanger,讯号量semaphore 和 phaser同步器

6.1 倒计时门闩CountDownLatch

倒计时门闩会导致一条或多条执行绪在"门口"一直等待,直到一条执行绪开启这扇门,执行绪得以继续执行。 由一个计数变数和两个操作组成。 这些应用程序比对应的单执行绪程式提供了更好的效能和响应能力。但依然存在问题

void await() 除非执行绪被中断,否则强制呼叫执行绪一直等到计数器递减至0.void await(long timeout, TimeUnit unit) 除非执行绪被中断,否则强制呼叫执行绪一直等到计数器递减至0,或以unit为单位的timeout超时void countDown() 递减计数,当降至0时,释放所有等待执行绪。long getCount() 返回当前计数public class CountDownLatchThread {

private static final int THREAD_SIZE = 3;

public static void main(String[] args) {

CountDownLatch doneSignal = new CountDownLatch(THREAD_SIZE);

Runnable runnable = () -> {

try {

Thread.sleep(new Random().nextInt(1000));

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(Thread.currentThread().getName() + " starting, count " + doneSignal.getCount());

doneSignal.countDown();

System.out.println(Thread.currentThread().getName() + " done, count " + doneSignal.getCount());

};

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_SIZE);

int count = 0;

while(3 > count){

executorService.execute(runnable);

count++;

}

try {

doneSignal.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

executorService.shutdown();

System.out.println("main thread done.");

}

}

6.2 同步屏障 CyclicBarrier

同步屏障允许一组执行绪彼此相互等待,直到抵达某个公共的屏障点。 因为该屏障在等待执行绪被释放之前可以重用,所以称它为可循环使用的屏障。 该同步器对于数量固定并且相互之间必须不时等待彼此的多执行绪应用。

打个比方:有一个古墓,古墓的入口已经被封闭千年,需要集齐东南西北四大法器,同时拼在开门机关上,古墓才能被开启,同步屏障就是那道门。

CyclicBarrier建构函式

CyclicBarrier(int parties) 拥有共同执行目标的执行绪数目CyclicBarrier(int parties, Runnable barrierAction) 在parties条执行绪执行之前,执行BarrierAction中的run(),多用于更新共享变数。CyclicBarrier提供的方法

int await() 强制呼叫执行绪一直等待直到所有的parties都已经在同步屏障上呼叫了await()方法。 当呼叫执行绪自己或其他等待执行绪被中断、有执行绪在等待中超时或者有执行绪在同步屏障之上呼叫reset()方法,该呼叫执行绪就会停止等待。int await(long timeout, TimeUnit unit) 除了让你指定呼叫执行绪愿意等待的时长之外,该方法等同于上面的方法。int getNumberWaiting() 返回在当前同步屏障上等待的执行绪数目。int getParties() 返回需要跨越同步屏障的执行绪数目。boolean isBroken() 当一条或多条执行绪由于在同步屏障建立或在上次重置之后,中断或超时从而被打破同步屏障,或者因为一个异常导致barrier action失败时,返回true,否则返回false。void reset() 把同步屏障重置到其原始状态 /**

* 同步屏障

*/

public class CyclicBarrierThread {

public static void main(String[] args) {

// 1

CyclicBarrier barrier = new CyclicBarrier(2);

Runnable r = () -> {

try {

System.out.println("waiting " + barrier.getNumberWaiting() + " / " + barrier.getParties());

barrier.await();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (BrokenBarrierException e) {

e.printStackTrace();

}

};

new Thread(r).start();

new Thread(r).start();

// 输出

// waiting 0 / 2

// waiting 1 / 2

// 2

CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Tomb());

Runnable runnable = () -> {

try {

System.out.println("waiting " + cyclicBarrier.getNumberWaiting() + " / " + cyclicBarrier.getParties());

cyclicBarrier.await();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (BrokenBarrierException e) {

e.printStackTrace();

}

};

new Thread(runnable).start();

new Thread(runnable).start();

// 输出

// waiting 0 / 2

// waiting 1 / 2

// open the door.

}

}

class Tomb implements Runnable {

@Override

public void run() {

System.out.println("open the door.");

}

}

CyclicBarrier与CountDownLatch区别是,CyclicBarrier可以重复使用;

new Thread(runnable).start();

new Thread(runnable).start();

new Thread(runnable).start();

new Thread(runnable).start();

new Thread(runnable).start();

new Thread(runnable).start();

// 重复使用 输出

// Thread-4 waiting 0 / 2

// Thread-5 waiting 1 / 2

// Thread-5 open the door.

// Thread-6 waiting 0 / 2

// Thread-7 waiting 1 / 2

// Thread-7 open the door.

// Thread-8 waiting 0 / 2

// Thread-9 waiting 1 / 2

// Thread-9 open the door.

6.3 交换器 Exchanger

交换器提供了一个执行绪彼此之间能够交换物件的同步点。

* V exchange(V x) 在这个互动点上,等待其他执行绪的到达,之后将所给物件传入其中,接收其他执行绪的物件作为返回。

* V exchange(V x, long timeout, TimeUnit unit) 除指定执行绪愿意等待的时长之外,功能同上。

/**

* 交换器

*/

public class ExchangerThread {

public static void main(String[] args) {

final Exchanger exchanger = new Exchanger();

final List shared = new ArrayList();

Runnable r = () -> {

try {

while (true){

String name = Thread.currentThread().getName();

// String exchangeData = exchanger.exchange(name);

String exchangeData = exchanger.exchange(name, 1, TimeUnit.SECONDS);

System.out.println(name + " " + exchangeData);

}

} catch (InterruptedException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}

};

new Thread(r).start();

new Thread(r).start();

new Thread(r).start();

// 输出

// Thread-1 Thread-0

// Thread-1 Thread-2

// Thread-0 Thread-2

// Thread-0 Thread-1

// Thread-2 Thread-1

// Thread-2 Thread-0

// Thread-1 Thread-0

// Thread-1 Thread-2

}

}

6.4 讯号量

讯号量维护了一组许可证(permit),以约束访问被限制资源的执行绪数。 当没有可用的许可证时,执行绪的获取尝试会一直阻塞,知道其他执行绪释放一个许可证。

计数讯号量 当前的值可以被递增 1二进位制或互斥讯号量 当前的值只能是 0 和 1建构函式

Semaphore(int permits) 指定许可证数量,预设设定成不公平策略Semaphore(int permits, boolean fair) 指定许可证数量和公平策略6.5 讯号量和公平策略

当公平策略设定成false,讯号量不会保证执行绪获取讯号量的顺序(抢占式的)。

即便执行绪已经在等待,呼叫了acquire()方法的新执行绪还是能先于这条执行绪被分配许可证。

逻辑上,新执行绪把自己放到了等待执行绪伫列的队首了。当公平策略设定为true,

讯号量就能保证呼叫acquire()方法的任意执行绪能按照方法被呼叫处理的顺序获取许可证(先进先出,FIFO)。

不限时tryAcquire()方法不会遵循公平策略的设定。

一般来讲,讯号量通常用来控制资源访问,它应当初始化成公平的,从而保证不会有任何执行绪在访问资源时饿死。

* void acquire() 从讯号量中获取一个许可证,否则阻塞,直到有一个许可证可用或者呼叫执行绪被中断。

* void acquire(int permits) 从讯号量中获取permits个许可证,否则阻塞,直到有一个许可证可用或者呼叫执行绪被中断。

* void acquireUninterruptibly() 从讯号量中获取一个许可证,否则阻塞,直到有一个许可证可用。

* void acquireUninterruptibly(int permits) 从讯号量中获取permits个许可证,否则阻塞,直到有一个许可证可用。

* int availablePermits() 返回当前可用许可证数量

* int drainPermits() 获取并返回立即可用许可证数量

* int getQueueLength() 返回等待获取许可证的大致执行绪数

* boolean hasQueueThreads() 查询是否存在等待获取许可证的执行绪

* boolean isFair() 返回公平性设定

* void release() 释放一个许可证

* void release(int permits) 释放permits个许可证

* boolean tryAcquire() 仅当呼叫时有一个许可证可用的情况,才能从这个讯号量中获取这个讯号

* boolean tryAcquire(int permits) 仅当呼叫时有permits个许可证可用的情况,才能从这个讯号量中获取这些个讯号

* boolean tryAcquire(int permits, long timeout, TimeUnit unit) 仅增加超时,其他同上

* boolean tryAcquire(long timeout, TimeUnit unit) 呼叫执行绪会一直等待直到有一个许可证可用。

/**

* 讯号量

*/

public class SemaphoreThread {

public static void main(String[] args) {

final Semaphore semaphore = new Semaphore(10);

Runnable r = () -> {

try {

semaphore.acquire();

Thread.sleep(new Random().nextInt(1000));

System.out.println(Thread.currentThread().getName() + " handle " + semaphore.getQueueLength());

} catch (InterruptedException e) {

e.printStackTrace();

}finally {

semaphore.release();

}

};

for (int i=0; i new Thread(r).start();

}

// 输出

// Thread-1 handle

// Thread-2 handle

// Thread-9 handle

// Thread-11 handle

// Thread-8 handle

// Thread-7 handle

// Thread-5 handle

// Thread-12 handle

// ......

}

}

6.6 Phaser

Phaser 是一个更加弹性的同步屏障。

一个 Phaser 是的一组执行绪在屏障上等待,在最后一条执行绪到达之后,这些执行绪得以继续执行。

Phaser也提供Barrier Action的等价操作。一个Phaser可以协调不定数目的执行绪。这些执行绪可以在任何时候注册。

parties 参与者

phase 阶段

arrive 抵达

advance 进阶

* int register() 往这个Phaser中新增一条尚未抵达的执行绪,同时返回phase值作抵达分类用,这个值称为抵达phase值。

* int arriveAndAwaitAdvance() 记录到达并等待Phaser前进,返回抵达phase值。

* int arriveAndDeregister() 抵达此Phaser,同时从中登出而不会等待其他执行绪到达,由此减少未来phase上需要前进的执行绪数量。

/**

* Phaser

*/

public class PhaserThread {

public static void main(String[] args) {

final Phaser phaser = new Phaser();

Runnable r = () -> {

try {

System.out.println(Thread.currentThread().getName() + " at " + System.currentTimeMillis() + " phase: " + phaser.arriveAndAwaitAdvance());

Thread.sleep(new Random().nextInt(1000));

} catch (InterruptedException e) {

e.printStackTrace();

}

};

for (int i=0; i System.out.println(phaser.register());

}

for (int i=0; i new Thread(r).start();

System.out.println("parties: " + phaser.getRegisteredParties() + ", phase: " + phaser.getPhase());

}

// 等待所有参与者都执行完,登出该phaser

phaser.arriveAndDeregister();

// 输出

// 0

// 0

// 0

// 0

// 0

// 0

// 0

// 0

// 0

// 0

// parties: 10, phase: 0

// parties: 10, phase: 0

// parties: 10, phase: 0

// parties: 10, phase: 0

// parties: 10, phase: 0

// parties: 10, phase: 0

// parties: 10, phase: 0

// parties: 10, phase: 0

// parties: 10, phase: 0

// parties: 10, phase: 0

// Thread-7 at 1561593950111 phase: 1

// Thread-2 at 1561593950110 phase: 1

// Thread-3 at 1561593950110 phase: 1

// Thread-4 at 1561593950110 phase: 1

// Thread-6 at 1561593950111 phase: 1

// Thread-8 at 1561593950111 phase: 1

// Thread-5 at 1561593950111 phase: 1

// Thread-1 at 1561593950108 phase: 1

// Thread-0 at 1561593950106 phase: 1

}

}

2019-09-13 16:50:00

相关文章