通常我们用的 Synchronized 和 ReentrantLock 都是独占锁,即在同一时刻只有一个线程获取到锁。然而在有些业务场景中,我们大多在读取数据,很少写入数据。这种情况下,如果仍使用独占锁,效率将及其低下。针对这种情况,Java提供了读写锁——ReentrantReadWriteLock。读写锁允许同一时刻被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞。
接下来从源码的角度分析ReentrantReadWriteLock。
成员变量和构造器:
//内部类提供的读锁 private final ReentrantReadWriteLock.ReadLock readerLock; //=内部类提供的写锁 private final ReentrantReadWriteLock.WriteLock writerLock; //同步器 final Sync sync; /** 构造器 */ //默认构造非公平的读写锁 public ReentrantReadWriteLock() { this(false); } //可选的构造公平或非公平的读写锁 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
读写锁的同步状态维护了写锁和读锁的状态,高16位表示读锁状态,低16位表示写锁状态。假设当前同步状态值为c,写状态等于c&0x0000FFFF(将高16位全部抹去),读状态等于c>>>16(无符号补0右移16位)。当写状态增加1时,等于c+1,当读状态增加1时,等于c+(1<<16),也就是c+0x00010000。
写锁的获取与释放
获取
步骤:
1、获取同步状态c 和写锁状态 w。如果 c==0 ,w肯定为0,表示写锁未被获取,则将当前线程CAS获取写锁;
2、如果 c!=0且w==0 ,则读状态大于0,即读锁被获取,那么当前线程进入等待状态
如果 c!=0且w!=0 ,则写锁被获取;如果当前线程不是写锁的拥有者,则当前线程进入等待状态
如果w!=0且当前线程获得写锁,则可重入获取写锁
该方法除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); //获取写锁的同步状态 int w = exclusiveCount(c); if (c != 0) { // 如果 c!=0且w==0 ,则读状态大于0,即读锁被获取,那么当前线程进入等待状态 // 如果 c!=0且w!=0 ,则写锁被获取;如果当前线程不是写锁的拥有者,则当前线程进入等待状态 if (w == 0 || current != getExclusiveOwnerThread()) return false; // w==0且当前线程持有写锁 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 可重入的获取写锁 setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }释放
1、如果当前线程不是写锁的占有者,抛出异常
2、写状态减1,如果写状态为0,写锁占有线程设为null
protected final boolean tryRelease(int releases) { //当前线程是否是写锁的占有者 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //写状态的增加与减少是直接在同步状态上做加减 //读状态的增加与减少需要对参数进行移位操作 int nextc = getState() - releases; //如果写状态为0,则不存在写锁了 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }读锁的获取与释放
获取
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。
解释相关属性:
firstReader :是第一个获取读锁的线程。
firstReaderHoldCount:是firstThread持有读锁的数目。
cachedHoldCounter :表示上一个成功获取读锁的线程的读锁计数器。
readHolds:ThreadLocal,它存储着每一个线程读锁的持有数。
根据偏向锁我们知道大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得。所以个人觉得设置firstReader和cachedHoldCounter 就是由于这个原因,使得获取锁的效率更高。下面介绍读锁的获取步骤:
1、获取同步状态c,获取写锁状态w、读锁状态r。
2、如果w!=0且写锁被其他线程获取,则进入等待状态。
3.1、如果读锁未被获取(r==0),则设置当前线程为第一个获取读锁的线程,并设置持有读锁数目为1
3.2、如果当前线程是第一个获取读锁的线程,则持有读锁的数目加1
3.3、如果读锁被获取了,且当前线程不是第一个获取读锁的线程。那么:
3.3.1 如果上一个获取读锁的线程不是当前线程,则拿到当前线程的ThreadLocal变量,并赋予cachedHoldCounter,持有读锁数目加1
3.3.2 如果上一个获取读锁的线程是当前线程,直接持有读锁数目加1
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); /** * 写锁状态不为0 并且 当前线程不是写锁的占有者,即写锁由其他 * 线程占有,则获取读锁失败 */ if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //读锁的同步状态 int r = sharedCount(c); // 读状态的增加与减少需要对参数进行移位操作 // compareAndSetState(c, c + SHARED_UNIT) 即为c+ 1<<16 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //如果读锁未被获取 if (r == 0) { //firstThread 是第一个获取读锁的线程 //firstReaderHoldCount 是firstThread持有读锁的数目 firstReader = current; firstReaderHoldCount = 1; //如果当前线程是第一个获取读锁的线程,则计数器++ } else if (firstReader == current) { firstReaderHoldCount++; //如果读锁被获取了,且当前线程不是第一个获取读锁的线程 } else { //HoldCounter是每一个线程读锁持有数目的计数器,它包含两个成员变量:count和线程id //cachedHoldCounter表示上一个成功获取读锁的线程的读锁计数器 HoldCounter rh = cachedHoldCounter; /** * rh==null 表示这是第一个获取读锁的线程 * rh.tid != rh.tid != getThreadId(current)) 表示当前线程不是上一个成功获取读锁的线程 * 其实下面的if和else if都是在更新cachedHoldCounter,读锁持有数目的增加在rh.count++ */ if (rh == null || rh.tid != getThreadId(current)) //获得当前线程的计数器 并将其设为cachedHoldCounter cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }从28行开始,乍看之下不知道在做些什么,其实我们举个获取读锁的例子,然后根据代码一步步操作就会发现很简单。 举个例子:假定此时readHold是的状态如下图,上一个成功获取读锁的线程为1003。
此时线程1004来获取读锁(当前线程是1004)。从上面代码28行开始:我们将1003的计数器赋予了rh,由于rh不为null且当前线程不是1003,所以我们获取当前线程的计数器(实际上就是线程id=1004,count=0),并将该计数器赋予rh和cachedHoldCounter;然后进行rh.count++,即读锁计数加1.
a处黑线代码中没有该操作,我这里是说明1004的计数器由readHolds维护。
同样,如果现在是1002线程来获取读锁,那么获取1002线程的计数器,然后进行相应操作。
释放
如果弄懂了读锁的获取,那么读锁的释放就很简单了。同样是获取到readHolds中维护的线程的计数器,然后对计数器进行减1操作,同时如果计数器为0,则从readHolds中移除。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }