//共享锁状态统计,也就是共享锁线程获取数,如果当前状态为196608,二进制表示为110000000000000000, //通过位运算向右移动16位,得到二进制11,表示有3个线程获取了读锁 staticintsharedCount(int c) { return c >>> SHARED_SHIFT; } //排他锁统计状态,通过与1111111111111111做与运算,得出当前线程是否有被写线程获取 staticintexclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
publicfinalvoidacquireShared(int arg) { //尝试获取共享锁 if (tryAcquireShared(arg) < 0) //获取共享锁 doAcquireShared(arg); } //tryAcquireShared实际调用的是子类Sync.tryAcquireShared方法 protectedfinalinttryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ //获取当前线程 Threadcurrent= Thread.currentThread(); //获取当前锁状态 intc= getState(); //判断排它锁数量,如果有排他锁,但并不是当前线程获取锁,返回-1(失败) if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //获取共享锁获数量,实际提供 状态c>>>16获取 intr= sharedCount(c); //判断读是否需要阻塞,通过子类公平锁还是非公平锁处理 if (!readerShouldBlock() && //判断读锁最大获取线程是否超过最大值 r < MAX_COUNT && //尝试修改当前状态获取锁 compareAndSetState(c, c + SHARED_UNIT)) { //如果当前线程锁获取成功 //如果当前没有线程获取读锁 if (r == 0) { //设置第一个读取线程为当前线程 firstReader = current; //设置第一个读线程获取次数为1 firstReaderHoldCount = 1; //如果第一个读线程为当前线程 } elseif (firstReader == current) { //第一个读线程获取次数+1 firstReaderHoldCount++; } else { //用于统计读线程获锁次数,该值存储在线程中ThreadLocal HoldCounterrh= cachedHoldCounter; //如果cachedHoldCounter为空,或者缓存线程id不是当前线程id //也就是1 cachedHoldCounter未初始化,进入 //2 cachedHoldCounter已经初始化,但是当前读锁线程id并不是缓存中的读锁线程id,进入 if (rh == null || rh.tid != getThreadId(current)) //设置缓存统计次数为当前线程中缓存次数,readHolds在Sync构造方法中初始化完成,ThreadLocal子类,用于记录线程读取次数 //重新获取线程中的数据 cachedHoldCounter = rh = readHolds.get(); //如果为当前线程缓存的数据,且统计值为0, //因为ThreadLocalHoldCounter重写了ThreadLocal.initialValue方法,调用get方法时,会初始化HoldCounter elseif (rh.count == 0) readHolds.set(rh); //统计+1 rh.count++; } return1; } //上述条件失败后调用 return fullTryAcquireShared(current); } //尝试获取锁的完整版,用于CAS获取失败,或者tryAcquireShared获取失败是调用 finalintfullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounterrh=null; //cas for (;;) { //获取锁当前状态 intc= getState(); //比较排他锁 if (exclusiveCount(c) != 0) { //如果存在排它锁,且不是当前线程获取,返回-1获取失败 if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. //判断读锁是否需要阻塞,依据是否公平锁处理,如果需要阻塞,执行下面步骤 } elseif (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; //如果第一个读线程,不是当前线程 } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } //判断是否超过最大读线程数量 if (sharedCount(c) == MAX_COUNT) thrownewError("Maximum lock count exceeded"); //尝试修改state获取锁 if (compareAndSetState(c, c + SHARED_UNIT)) { //获取锁成功 //判断之前锁是否未被获取 if (sharedCount(c) == 0) { //未被获取,设置第一个读取线程为当前线程 firstReader = current; //设置第一个读线程获取次数为1 firstReaderHoldCount = 1; } elseif (firstReader == current) { //如果当前线程为第一个获取读锁的线程,获取次数+1 firstReaderHoldCount++; } else { //设置当前线程重入锁次数 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); elseif (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return1; } } }
//在tryAcquireShared中如果获取锁失败,进入doAcquireShared //AbstractQueuedSynchronizer.doAcquireShared privatevoiddoAcquireShared(int arg) { //生产共享锁线程节点 finalNodenode= addWaiter(Node.SHARED); booleanfailed=true; try { booleaninterrupted=false; //CAS for (;;) { //获取当前线程上一个节点 finalNodep= node.predecessor(); //判断上一个节点是否头节点 if (p == head) { //尝试获取共享锁 intr= tryAcquireShared(arg); if (r >= 0) { //获取成功 //设置头节点为当前线程传播 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //同ReentrantLock if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //读线程获取锁后调用 privatevoidsetHeadAndPropagate(Node node, int propagate) { //置换头节点为下一个节点 Nodeh= head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Nodes= node.next; if (s == null || s.isShared()) //下一个节点为共享锁 doReleaseShared(); } } //释放共享锁 privatevoiddoReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Nodeh= head; if (h != null && h != tail) { intws= h.waitStatus; //如果当前状态为SIGNAL状态,目的是为了唤醒当前后续的共享线程节点 if (ws == Node.SIGNAL) { //设置当前节点状态为0,失败重试 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //如果设置节点状态成功,唤醒下一个节点 unparkSuccessor(h); } //如果当前状态为0,设置为PROPAGATE elseif (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //判断头节点是否被修改,如果被修改了,继续唤醒下一个节点 //这里会出现唤醒的线程一直修改了头,就可能出现多个线程进入doReleaseShared方法,尝试修改下一个节点,这样唤醒节点更快吗? if (h == head) // loop if head changed break; } }