简介
首先各种blocking lock 和 synchronizers (semaphores, events, etc)都是依赖于FIFO队列的。而AbstractQueuedSynchronizer 通过一个整型(int)的数值表示当前队列的状态,子类中可以通过getState/setStatye/compareAndSetState方法来改变队列状态。两种模式:
共享模式 shared mode 线程共享排他模式 exclusive mode 线程独占
数据结构
Queue中的节点 - Node
Node{
int waitState;
Node prev;
Node next;
Thread thread;
Node nextWaiter;
}
Node 中定义了Queue的运行模式shared/exclusive(default)
static final Node SHARED =
new Node();
static final Node EXCLUSIVE =
null;
waitStatus(节点状态):
CANCELLED 1 由于超时、被打断导致cancel。该线程不会再被阻塞。SIGNAL -1 这个记号表示当前node之后的节点已经或即将被阻塞,需要再release或者cancel后唤醒一个或若干个后续节点,在该节点被release/cancel的时候,需要唤醒后面的节点。CONDITION -2表示该节点处于CONDITION队列中。PROPAGATE -3 当前场景下后续的acquireShared能够得以执行。0 表示当前节点在队列中,等待获取锁。
AbstractQueuedSynchronizer 持有Queue
{
Node head;
Node tail;
int state;
}
AbstractQueuedSynchronizer 操作Queue
getState();
setState();
Node enq(
final Node node)
Node addWaiter(Node mode)
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws <
0)
compareAndSetWaitStatus(node, ws,
0);
Node s = node.next;
if (s ==
null || s.waitStatus >
0) {
s =
null;
for (Node t = tail; t !=
null && t != node; t = t.prev)
if (t.waitStatus <=
0)
s = t;
}
if (s !=
null)
LockSupport.unpark(s.thread);
}
同步器实现
获取
public final void acquire(
int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(
final Node node,
int arg) {
boolean failed =
true;
try {
boolean interrupted =
false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next =
null;
failed =
false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted =
true;
}
}
finally {
if (failed)
cancelAcquire(node);
}
}
从上面的代码中可以看出,acquire会忽略Interruption,为此代码提供者还实现了可终止的方法来获取状态-抛异常。
public final void acquireInterruptibly(
int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
release
public final boolean release(
int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h !=
null && h.waitStatus !=
0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws <
0)
compareAndSetWaitStatus(node, ws,
0);
Node s = node.next;
if (s ==
null || s.waitStatus >
0) {
s =
null;
for (Node t = tail; t !=
null && t != node; t = t.prev)
if (t.waitStatus <=
0)
s = t;
}
if (s !=
null)
LockSupport.unpark(s.thread);
}
ReentrantLock#FairSync 中的实现
lock
final void lock() {
acquire(
1);
}
public final void acquire(
int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(
int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c ==
0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(
0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc <
0)
throw new Error(
"Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
release
protected final boolean tryRelease(
int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free =
false;
if (c ==
0) {
free =
true;
setExclusiveOwnerThread(
null);
}
setState(c);
return free;
}
疑问点:
unparkSuccessor 中 查找non-canceled node 为什么需要从tail开始查找?