0%

CountDownLatch应用及原理

CountDownLatch功能很简单,它主要有两个方法,await()countDown(),初始创建的时候需要给它提供一个数值,调用countDown()方法会将数值减1,调用await()方法的时候会判断值是不是等于0,如果等于0就继续执行,否则就阻塞等待

1
2
3
4
5
6
// 初始化创建,值为1
CountDownLatch countDownLatch = new CountDownLatch(1);
// 如果countDownLatch中的值不是0则阻塞等待
countDownLatch.await();
// 将countDownLatch中的值进行减1
countDownLatch.countDown();

利用这两个方法,我们能用来做什么呢?

让一组线程等待同时开始

这种用法,我们只需要将CountDownLatch的初始值设为1,在需要线程阻塞的地方调用countDownLatch.await()方法,当满足开始条件后,调用countDownLatch.countDown()方法将类中的值减为0,让其他所有阻塞的线程可以几乎同时得到执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ExecutorService executorService = Executors.newCachedThreadPool();
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " waiting...");
try {
// 阻塞等待countdownlatch中的值被减为0
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " working...");
} catch (Exception e) {
e.printStackTrace();
}
});
}

TimeUnit.SECONDS.sleep(4);
System.out.println("\nstart!");
countDownLatch.countDown();

executorService.shutdown();

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
pool-1-thread-1 waiting...
pool-1-thread-3 waiting...
pool-1-thread-2 waiting...
pool-1-thread-4 waiting...
pool-1-thread-5 waiting...

start!
pool-1-thread-2 working...
pool-1-thread-3 working...
pool-1-thread-5 working...
pool-1-thread-1 working...
pool-1-thread-4 working...

等待一组线程执行完毕

这时候,我们需要将CountDownLatch的初始值设置为线程的数量,每次执行完毕后调用countDownLatch.countDown()方法对值进行减1,等待线程可以调用countDownLatch.await()阻塞等待,待所有线程执行完毕后,类中的值减为0,这时阻塞线程就可以继续向下执行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ExecutorService executorService = Executors.newCachedThreadPool();
int num = 5;
CountDownLatch countDownLatch = new CountDownLatch(num);
System.out.println("start");
for (int i = 0; i < num; i++) {
executorService.execute(() -> {
// 加上延时,让效果明显些
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " working...");
countDownLatch.countDown();
});
}
// 阻塞等待线程执行完成
countDownLatch.await();
System.out.println("task end!");
executorService.shutdown();

执行结果:

1
2
3
4
5
6
7
start
pool-1-thread-4 working...
pool-1-thread-3 working...
pool-1-thread-5 working...
pool-1-thread-2 working...
pool-1-thread-1 working...
task end!

CountDownLatch内部实现

CountDownLath内部使用AbstractQueuedSynchronizer来实现

将AQS中的staus用作CountDownLatch的初始值

获取资源时会判断status是否等于0,等于0则通过,否则加入AQS中的CLH队列阻塞等待

释放资源时会对status进行减一操作,如果结果等于0则进行真正的释放操作,将等待队列中的任务唤醒执行

CountDownLatch核心源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// await 和 countDown时,调用对应内部Sync类方法
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
sync.releaseShared(1);
}

// CountDownLatch 内部类,继承AQS,为了方便查看,将AQS中对应代码也粘贴到此处
private static final class Sync extends AbstractQueuedSynchronizer {

// 实现AQS中的获取共享资源操作
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

// 实现AQS中的释放共享资源操作
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}


// ******* AQS中对应部分代码 ************
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 如果释放成功,status == 0, 那面唤醒线程
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}

关于AbstractQueuedSynchronizer有不明白的,可以参考之前的AbstractQueuedSynchronizer简述