CountDownLatch功能很简单,它主要有两个方法,await()
与countDown()
,初始创建的时候需要给它提供一个数值,调用countDown()
方法会将数值减1,调用await()
方法的时候会判断值是不是等于0,如果等于0就继续执行,否则就阻塞等待
1 2 3 4 5 6
| CountDownLatch countDownLatch = new CountDownLatch(1);
countDownLatch.await();
countDownLatch.countDown();
|
利用这两个方法,我们能用来做什么呢?
让一组线程等待同时开始
这种用法,我们只需要将CountDownLatch
的初始值设为1,在需要线程阻塞的地方调用countDownLatch.await()
方法,当满足开始条件后,调用countDownLatch.countDown()
方法将类中的值减为0,让其他所有阻塞的线程可以几乎同时得到执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| ExecutorService executorService = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(1); for (int i = 0; i < 5; i++) { executorService.execute(() -> { System.out.println(Thread.currentThread().getName() + " waiting..."); try { countDownLatch.await(); System.out.println(Thread.currentThread().getName() + " working..."); } catch (Exception e) { e.printStackTrace(); } }); }
TimeUnit.SECONDS.sleep(4); System.out.println("\nstart!"); countDownLatch.countDown(); executorService.shutdown();
|
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12
| pool-1-thread-1 waiting... pool-1-thread-3 waiting... pool-1-thread-2 waiting... pool-1-thread-4 waiting... pool-1-thread-5 waiting...
start! pool-1-thread-2 working... pool-1-thread-3 working... pool-1-thread-5 working... pool-1-thread-1 working... pool-1-thread-4 working...
|
等待一组线程执行完毕
这时候,我们需要将CountDownLatch
的初始值设置为线程的数量,每次执行完毕后调用countDownLatch.countDown()
方法对值进行减1,等待线程可以调用countDownLatch.await()
阻塞等待,待所有线程执行完毕后,类中的值减为0,这时阻塞线程就可以继续向下执行了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| ExecutorService executorService = Executors.newCachedThreadPool(); int num = 5; CountDownLatch countDownLatch = new CountDownLatch(num); System.out.println("start"); for (int i = 0; i < num; i++) { executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " working..."); countDownLatch.countDown(); }); }
countDownLatch.await(); System.out.println("task end!"); executorService.shutdown();
|
执行结果:
1 2 3 4 5 6 7
| start pool-1-thread-4 working... pool-1-thread-3 working... pool-1-thread-5 working... pool-1-thread-2 working... pool-1-thread-1 working... task end!
|
CountDownLatch内部实现
CountDownLath内部使用AbstractQueuedSynchronizer来实现
将AQS中的staus用作CountDownLatch的初始值
在获取资源时会判断status是否等于0,等于0则通过,否则加入AQS中的CLH队列阻塞等待
在释放资源时会对status进行减一操作,如果结果等于0则进行真正的释放操作,将等待队列中的任务唤醒执行
CountDownLatch核心源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
public void countDown() { sync.releaseShared(1); }
private static final class Sync extends AbstractQueuedSynchronizer { protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } } }
|
关于AbstractQueuedSynchronizer有不明白的,可以参考之前的AbstractQueuedSynchronizer简述