0%

AbstractQueuedSynchronizer简述

工作流程

AbstractQueuedSynchronizer 是一个用于在竞争资源(如多线程)时使用的同步器,它内部使用了一个int类型的字段status表示需要同步的资源状态, 并基于一个先进先出(FIFO)的等待队列,队列中的每个节点表示要获取资源的线程

同步器主要是用于控制资源的获取以及释放,它可以用于独占模式和共享模式,这里我们以独占模式为例

在获取和释放资源时,我们需要实现自己的尝试获取和尝试释放的方法,利用status字段来控制成功与否

获取资源(独占模式)

1
2
3
4
5
6
7
8
9
10
11
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 在独占模式下尝试获取资源(需要自己实现),成功返回true, 失败返回false
protected boolean tryAcquire(int arg) {
// 尝试获取资源与释放资源都是依靠 status 来实现具体的逻辑
// 可以基于 status 与 arg 字段来实现此方法 (我们可以赋予status任何意义来实现逻辑)
}

获取资源的流程如下(独占模式)

  1. 首先尝试获取资源,如果成功直接返回,进行后续流程
  2. 失败则创建一个新节点,将其添加到链表尾部(如果链表为空,则先创建一个空的表头再添加)
  3. 之后判断当前节点前一个节点是不是头结点(头节点HEAD无实际作用,可以不把它当作等待队列的一部分),如果是则再次尝试获取资源,成功则将当前节点置为头节点,进行获取资源后的流程
  4. 如果当前节点前一个节点不是头结点,那么将当前节点中的线程挂起->LockSupport.park(this),然后等待被唤醒(挂起前务必将其前一节点状态改为SIGNAL, SIGNAL表示当前node的后继节点对应的线程需要被唤醒)
  5. 被唤醒后则跳转到第3步继续执行

释放资源(独占模式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// 在独占模式下尝试释放资源(需要自己实现)
protected boolean tryRelease(int arg) {
// 尝试获取资源与释放资源都是依靠 status 来实现具体的逻辑
// 可以基于 status 与 arg 字段来实现此方法 (我们可以赋予status任何意义来实现逻辑)
}

释放资源的流程如下(独占模式)
1.首先尝试释放资源
2.成功后判断,如果头结点不为null,同时其状态不是初始0值(需要有后继节点更改其状态),那么将当前节点状态置为0,同时唤醒下一节点中线程

共享模式

共享模式与独占模式基本相同

区别主要在于此方法,当线程被唤醒后获取资源,如果成功且返回值>0,则会继续唤醒后续线程
返回负数:失败
返回0:成功,但是其他线程无法再获取资源
返回正数:成功,其他线程可能继续获取资源(需要尝试后知道)

1
2
3
protected int tryAcquireShared(int arg) {
// todo
}

下面举一个《Java并发编程实战》中的二元闭锁例子来说明AQS的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* 使用 AQS 实现的二元闭锁(所有线程都会阻塞,直到状态改变被唤醒,此时所有线程都得到执行)
* status表是开关状态,=0时关闭,=1时开启
*/
public class OneShotLatch {

private final Sync sync = new Sync();

public void signal() {
sync.releaseShared(0);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}

private class Sync extends AbstractQueuedSynchronizer {

/**
* 如果闭锁是开的 (state==1),那么这个操作成功,否则失败阻塞
* @param ignored
* @return
*/
@Override
protected int tryAcquireShared(int ignored) {
return (getState() == 1) ? 1: -1;
}

/**
* 打开status状态开关,放开所有线程
* @param ignored
* @return
*/
@Override
protected boolean tryReleaseShared(int ignored) {
setState(1);
return true;
}
}

// 测试方法
public static void main(String[] args) throws Exception {
OneShotLatch oneShotLatch = new OneShotLatch();
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
oneShotLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("completed");
}).start();
}

// 2S后唤醒
TimeUnit.SECONDS.sleep(2);
oneShotLatch.signal();
TimeUnit.SECONDS.sleep(5);
System.out.println("end");
}
}

小结

使用AQS后,我们要做的就是通过status来控制请求和释放资源操作及是否成功,而AQS会负责在获取失败后将其放入队列,等待有释放资源操作后被唤醒,进而再次请求资源等操作