盒子
盒子
文章目录
  1. 说明
  2. AbstractQueuedSynchronizer
    1. 干嘛的?
    2. 同步队列
      1. 同步队列节点的数据结构
  3. ReentrantLock
    1. NonfairSync(非公平锁)
    2. FairSync(公平锁)
    3. 公平锁/非公平锁
  4. ReentrantReadWriteLock
    1. ReadWriteLock
    2. 写锁
    3. 读锁
  5. 为什么默认非公平锁?

AQS 与 java 锁的具体实现

说明

内容基于 jdk1.8 与 [Java 并发编程的艺术]

AbstractQueuedSynchronizer

https://juejin.im/post/5a70297df265da3e5859a38f

同步器,也称 AQS.

从概念角度来讲,AQS 是用来构建锁或者其他同步组件的基础框架.

从 Java 代码角度来看,AbstractQueuedSynchronizer 是一个模板类,它实现了许多和锁相关的功能,并提供了钩子方法供用户实现,比如 tryAcquire, tryRelease 等.

干嘛的?

上面提到,从概念角度来讲,AQS 是用来构建锁或者其他同步组件的基础框架.

简单讲,AQS 可以拿来实现锁.

如果自己实现锁,应当怎么做.

锁的作用是锁住资源,只让获取到锁的线程使用资源.

在获取锁的时候,如何判断资源是闲置状态还是已经被占用呢?

可以用一个 标志 来标识资源是否被占用.

在 AQS 中使用了一个 int 成员变量表示同步状态,通过内置的 FIFO 队列来完成线程获取资源的排队工作.

redis 实现分布式锁 使用 SETNX 命令

在锁的实现中,聚合 AQS,利用 AQS 实现锁的语义.

大概来说,锁是面向使用者的,定义了使用者与锁交互的接口.而同步器面向锁的实现者,简化了锁的实现方式,屏蔽了同步状态管理,线程的排队,等待与唤醒等底层操作.

比如,同步器提供了这些方法 :

  • protected boolean tryAcquire(int arg)

    独占式获取同步状态(独占的意思是同一时刻只有一个线程获取到锁)

  • protected boolean tryRelease(int arg)

    独占式释放状态

同步队列

上面提到,AQS 通过内置的 FIFO 队列来完成线程获取资源的排队工作.

线程排队获取资源,其实也是资源的同步状态变更的过程,因此这也称 同步状态的管理.

当线程获取同步状态失败,同步器将当前线程以及等待状态等信息构造成为一个节点并将其加入同步队列,同时阻塞当前线程.当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态.

同步队列节点的数据结构

ReentrantLock

ReentrantLock 无参构造函数,默认使用非公平锁.

1
2
3
4
5
6
7
8
// ReentrantLock.java
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

一步步看,调用 ReentrantLock 的 lock(),内部实际调用的是 sync.lock(),若使用无参构造函数,那么调用的就是非公平锁的 lock().

1
2
3
4
// ReentrantLock.java
public void lock() {
sync.lock();
}

再回头看含参构造函数

1
2
3
4
5
6
7
8
9
10
// ReentrantLock.java
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock 默认有 非公平锁公平锁 这两种实现.

啥是公平锁,非公平锁?

那先看看 NonfairSync 吧.

NonfairSync(非公平锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// ReentrantLock.NonfairSync
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

NofairSync 是 ReetrantLock 的一个内部类.

看看 lock() 方法的实现.

lock() 的实现实际上使用了 CAS 机制.

因为 NonfairSync extends Sync ,而再看 Sync 的代码.

1
2
3
4
// ReentrantLock.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}

Sync extends AbstractQueuedSynchronizer

上文调用的 compareAndSetState(0, 1) ,此方法便是 AbstractQueuedSynchronizer 提供的.看方法名就能知道,这是通过 CAS 设置当前状态,CAS 还用于哪?用于 atomic 包下的原子类.这里的功能类似,是为了保证同步状态设置的原子性.

[先暂且到这一步吧]

再看看 tryAcquire() 里的 nonfairTryAcquire(acquires)(来自父类 Sync)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// ReentrantLock.Sync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {// 初次获取锁
if (compareAndSetState(0, acquires)) {// 设置状态
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {// 是否重入?
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

这里非公平锁的获取,nonfairTryAcquire() 方法其实为了实现 锁的重入 .

锁的重入性,与 synchronized 对比

怎么说?

这里的 if-else 判断,就是判断线程是否获取锁,如果尚未获取锁 c == 0,就设置状态,返回 true .

如果不是第一次获取锁了(即 重入),则首先判断当前线程是否是获取锁的线程.若非,当然返回 false,若是,则重入,将同步值增加并返回 true .

相应的,若 重入,tryRelease() 也得执行多次,否则会返回 false.

1
2
3
4
5
6
7
8
9
10
11
12
13
// ReentrantLock.Sync
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

注意 : 对于 非公平锁,只要 CAS 设置同步状态成功,则表示当前线程获取了锁.

对于公平锁,则还有一些判断,,,我们看看代码.

FairSync(公平锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// ReentrantLock.FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

能看到,在通过 CAS 设置状态时,还有一个 !hasQueuedPredecessors() 的判断.

这个方法用于判断同步队列中当前节点是否有前驱节点.若此方法返回 true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁后才能继续获取锁.

现在我们回头对比一下公平锁与非公平锁的概念.

公平锁/非公平锁

公平锁即尽量以请求锁的顺序来获取锁.

比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该锁,这种就是公平锁.

非公平锁即无法保证锁的获取是按照请求锁的顺序进行的.这样就可能导致某个或者一些线程永远获取不到锁.


由此可以了解为什么公平锁用到了队列…

ReentrantReadWriteLock

上面 ReentrantLock 里有公平锁和非公平锁的区分,而不管 ReentrantLock 里的公平锁还是非公平锁,都属于排他锁.排他锁在同一时刻只允许一个线程访问.与之相对的,则有这里的读写锁.读写锁在同一时刻允许多个读线程访问,但在写线程访问时,所有的读线程和其他写线程都被阻塞.

好处是啥?

ReadWriteLock 就是 读写锁这个概念的 Java 实现.

先看看 ReadWriteLock 接口定义.

ReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ReadWriteLock.java
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}

只定义了获取读锁和写锁的两个方法.

回到具体实现类 ReentrantReadWriteLock.

1
2
3
4
5
6
// ReentrantReadWriteLock.java
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

还是有公平锁,非公平锁区分.

写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// ReentrantReadWriteLock.Sync
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

源码 1,2,3 挺清晰啊…

注意 : exclusiveCount() 这个方法用于获取读状态.

这个方法是怎么获取的呢?

其实这里的状态变量 c = getState() 通过 按位分割 的方法,用高 16 位存储读状态,用低 16 位存写状态.

exclusiveCount() 方法内部做了某种位运算,获取到了读状态.

比起前文中 ReetrantLock 获取锁,除了重入锁的判断,这里还增加对这个 读状态 的判断.如果存在读锁就不能获取写锁.

为啥?

毕竟如果在其他读锁存在的情况下,这里又开始写数据,那么其他读锁获取不到这里最新的写入数据,肯定会出问题.

而一旦获取了写锁,读锁和其他写锁都将被阻塞.(读写锁嘛)

写锁的释放则与 ReetrantLock 的释放基本类似.

读锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// ReentrantReadWriteLock.Sync
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

当前线程通过 CAS 来增加读状态,jdk1.8,这个步骤在 fullTryAcquireShared(current) 方法中.

HoldCounter 的作用是存储读锁获取的次数.

我们已经知道,读锁可以被多个线程获取.HoldCounter 这里存储的便是所有线程获取的读锁次数.

而 firstReaderHoldCount 存储当前线程获取读锁的次数.(重入..)

// … 真麻烦啊

为什么默认非公平锁?

因为一般情况下非公平锁效率会比公平锁高点..

为什么?

想象一下,一个新线程现在来了,在公平锁里,它可能会先挂在线程队列里,然后把线程队列头部的一个挂起的线程激活.

而在非公平锁,它可能就直接执行这个新线程.

比起非公平锁,公平锁就多了一个挂起唤醒的操作,这就需要切成切换的开销.