工作流程
AbstractQueuedSynchronizer 是一个用于在竞争资源(如多线程)时使用的同步器,它内部使用了一个int
类型的字段status
表示需要同步的资源状态, 并基于一个先进先出(FIFO)的等待队列,队列中的每个节点表示要获取资源的线程
同步器主要是用于控制资源的获取以及释放,它可以用于独占模式和共享模式,这里我们以独占模式为例
在获取和释放资源时,我们需要实现自己的尝试获取和尝试释放的方法,利用status
字段来控制成功与否
获取资源(独占模式)
1 2 3 4 5 6 7 8 9 10 11
| public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected boolean tryAcquire(int arg) { }
|
获取资源的流程如下(独占模式)
- 首先尝试获取资源,如果成功直接返回,进行后续流程
- 失败则创建一个新节点,将其添加到链表尾部(如果链表为空,则先创建一个空的表头再添加)
- 之后判断当前节点前一个节点是不是头结点(头节点HEAD无实际作用,可以不把它当作等待队列的一部分),如果是则再次尝试获取资源,成功则将当前节点置为头节点,进行获取资源后的流程
- 如果当前节点前一个节点不是头结点,那么将当前节点中的线程挂起->
LockSupport.park(this)
,然后等待被唤醒(挂起前务必将其前一节点状态改为SIGNAL, SIGNAL表示当前node的后继节点对应的线程需要被唤醒)
- 被唤醒后则跳转到第3步继续执行
释放资源(独占模式)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected boolean tryRelease(int arg) { }
|
释放资源的流程如下(独占模式)
1.首先尝试释放资源
2.成功后判断,如果头结点不为null,同时其状态不是初始0值(需要有后继节点更改其状态),那么将当前节点状态置为0,同时唤醒下一节点中线程
共享模式
共享模式与独占模式基本相同
区别主要在于此方法,当线程被唤醒后获取资源,如果成功且返回值>0,则会继续唤醒后续线程
返回负数:失败
返回0:成功,但是其他线程无法再获取资源
返回正数:成功,其他线程可能继续获取资源(需要尝试后知道)
1 2 3
| protected int tryAcquireShared(int arg) { }
|
下面举一个《Java并发编程实战》中的二元闭锁例子来说明AQS的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
|
public class OneShotLatch {
private final Sync sync = new Sync();
public void signal() { sync.releaseShared(0); }
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(0); }
private class Sync extends AbstractQueuedSynchronizer {
@Override protected int tryAcquireShared(int ignored) { return (getState() == 1) ? 1: -1; }
@Override protected boolean tryReleaseShared(int ignored) { setState(1); return true; } } public static void main(String[] args) throws Exception { OneShotLatch oneShotLatch = new OneShotLatch(); for (int i = 0; i < 3; i++) { new Thread(() -> { try { oneShotLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("completed"); }).start(); }
TimeUnit.SECONDS.sleep(2); oneShotLatch.signal(); TimeUnit.SECONDS.sleep(5); System.out.println("end"); } }
|
小结
使用AQS后,我们要做的就是通过status来控制请求和释放资源操作及是否成功,而AQS会负责在获取失败后将其放入队列,等待有释放资源操作后被唤醒,进而再次请求资源等操作