在Java线程基础中使用过synchronized,在JDK5一起,同步都是基于synchronized,在场景非常复杂的地方,使用synchronized不方便,JDK5引入了Lock。在包java.util.concurrent.locks中就有锁相关的类。
AbstractOwnableSynchronizer 抽象独占同步锁。提供设置当前拥有独占访问的线程,获取设置的独占线程。 只能子类构造。
AbstractQueuedSynchronizer 抽象队列同步,继承AbstractOwnableSynchronizer。通过队列先进先出来实现等待队列的阻塞,内部维护一个线程链表Node。在获取锁失败后,会生成Node节点,并放入链表末尾,直到等待超时或者线程中断或被上一个节点唤醒。其节点等待状态waitStatus有:
0:节点刚被初始化状态,或者可能已经完成状态
CANCELLED(1):取消状态,需要取消该节点时设置的状态
SIGNAL(-1):节点等待状态
CONDITION(-2):节点在等待队列中,在调用Condition.signal()之后会变为SIGNAL状态进入等待获取锁队列
PROPAGATE(-3):共享同步状态
AbstractQueuedSynchronizer内部通过Unsafe.compareAndSet(原子操作int)来操作内存,保证线程的同步。其他API可查看相关文档。
AbstractQueuedLongSynchronizer 同AbstractQueuedSynchronizer,只不过AbstractQueuedLongSynchronizer内部通过long字段来实现原子操作。当创建需要 64 位状态的多级别锁和屏障等同步器时使用。
Condition 实际是将Object中监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,通过和Lock.newCondition()结合使用。通过Condition实现生产者消费者模型。例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 public class Main { private ReentrantLock lock = new ReentrantLock (); private Condition emptyCondition = lock.newCondition(); private Condition fullCondition = lock.newCondition(); private int count, putIndex, takeIndex; private String[] items = new String [100 ]; public void addItem (String str) { try { lock.lock(); while (items.length == count) { System.out.println("数据已满" ); fullCondition.await(); } items[putIndex] = str; if (++putIndex == items.length) putIndex = 0 ; count++; emptyCondition.signal(); System.out.println("放入:" + str); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public String takeItem () { String item = null ; try { lock.lock(); while (count == 0 ) { System.out.println("数据为空" ); emptyCondition.await(); } item = items[takeIndex]; if (++takeIndex == item.length())takeIndex = 0 ; --count; fullCondition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } System.out.println("取出:" + item); return item; } public static void main (String[] args) throws Exception { Main main = new Main (); for (int i = 0 ; i < 5 ; i++) { new Thread (() ->{ while (true ){ SimpleDateFormat simpleDateFormat = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss SS" ); String format = simpleDateFormat.format(new Date ()); main.addItem(format); try { TimeUnit.MILLISECONDS.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } for (int i = 0 ; i < 3 ; i++) { new Thread (() -> { while (true ){ main.takeItem(); try { TimeUnit.MILLISECONDS.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } }
Lock Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的 Condition 对象。
LockSupport 用来创建锁和其他同步类的基本线程阻塞原语。一般采用其中park、unparkt,作用和wart、notify类似,但其实现原理不一样,而且不需要依赖监视器,和wart、notify无交集使用更加灵活。 例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class Main { public static void main (String[] args) throws Exception { String blocker = "blocker" ; Thread thread = new Thread (() -> { System.out.println("线程开始" ); LockSupport.park(blocker); System.out.println("线程结束" ); }); thread.start(); TimeUnit.SECONDS.sleep(5 ); System.out.println("重新启动线程" ); LockSupport.unpark(thread); } }
ReentrantLock Lock子类,提供可重入互斥锁。如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Main { ReentrantLock reentrantLock = new ReentrantLock (); public static void main (String[] args) throws Exception { new Main ().lockTest(); } public void lockTest () { reentrantLock.lock(); try { System.out.println("do something" ); } finally { reentrantLock.unlock(); } } }
通过ReentrantLock.lock()获取锁,之后释放锁。释放锁最好放入finally,避免因为锁未释放导致出现问题。 在ReentrantLock内部抽象类Sync继承AbstractQueuedSynchronizer,用于同步控制,同时Sync有两个子类NonfairSync(非公平)、FairSync(公平)。
FairSync 使用公平锁,new ReentrantLock(true),在使用公平锁获取锁时。
调用ReentrantLock.lock获取锁,因为是公平锁,最后调用的是FairSync.lock,FairSync调用父类AbstractQueuedSynchronizer.acquire获取锁,之后调用子类方法FairSync.tryAcquire 试图获取锁获取锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
获取锁成功后直接返回
获取锁失败后,生成Node节点放入等待列表,AbstractQueuedSynchronizer.addWaiter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
设置链表完成后,开始循环处理队列,直到轮到当前节点AbstractQueuedSynchronizer.acquireQueued
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
在获取锁失败后,调用shouldParkAfterFailedAcquire、parkAndCheckInterrupt设置阻塞当前线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
上述描述了线程节点加入链表,之后线程被阻塞,当线程解除阻塞后。重新获取锁,并返回线程中断状态。之后调用线程interrupt。 如果在上述,循环获取锁过程中,出现异常导致退出循环且未获取到锁,将会调用cancelAcquire
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
解锁过程,ReentrantLock.unlock,之后调用父类AbstractQueuedSynchronizer.release
1 2 3 4 5 6 7 8 9 10 11 12 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
以上是公平锁加锁解锁过程,公平锁就是如果多个未获取锁的线程在等待锁,维护一段链表,把当前线程放入链表末尾,之后线程进入阻塞状态,直到线程被上一个节点解除阻塞。
NonfairSync 非公平锁,创建ReentrantLock默认就是非公平锁。 在非公平锁情况下,如果调用lock,最终调用的是NonfairSync.lock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
非公平锁在获取锁失败后,后续操作和公平锁一样,直接生产线程节点,放入队列末尾,阻塞等待解除。
公平锁和非公平锁区别在于: 公平锁:去获取锁时,多判断一次,当前线程节点是否是下一个获取锁的节点(就算锁已经释放,也不获取),按照链表先后顺序执行。 非公平锁:获取锁时,直接尝试获取锁,如果这时候锁已经释放,直接获取锁,如果获取失败,后续按照公平锁方式创建线程节点加入等待链表。
在ReentrantLock.tryLock(),实际上是调用Sync.nonfairTryAcquire,如上,直接尝试获取锁,如果失败,不进入等待队列。 调用ReentrantLock.tryLock(long timeout, TimeUnit unit),获取锁时调用的tryAcquire,和tryLock调用Sync.nonfairTryAcquire直接尝试获取锁不一样,该方法受是否公平模式影响。获取失败后会进入链表,并设置阻塞时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public final boolean tryAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } private boolean doAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L ) return false ; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return true ; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L ) return false ; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if (Thread.interrupted()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }