本文共 21590 字,大约阅读时间需要 71 分钟。
JUC是指java并发包,全称是 java.util.concurrent 包
JUC提供了Lock可以方便的进行锁操作,但是有时候我们也需要对线程进行条件性的阻塞和唤醒,这时我们就需要condition条件变量,它就像是在线程上加了多个开关,可以方便的对持有锁的线程进行阻塞和唤醒。Condition主要是为了在J.U.C框架中提供和Java传统的监视器风格的wait,notify和notifyAll方法类似的功能。condition 是依赖于 ReentrantLock 的,不管是调用 await 进入等待还是 signal 唤醒,都必须获取到锁才能进行操作。
Condition是一个接口,它的接口实现类是AQS(AbstractQueuedSynchronizer )中的内部类ConditionObject。其类原型如下:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; // 不要管这里的关键字 transient,是不参与序列化的意思 private transient Node firstWaiter; // 条件队列的第一个节点 private transient Node lastWaiter; // 条件队列的最后一个节点 ......}
在AQS中有一个内部类Node类,是线程阻塞队列的节点类,代码如下:
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
在AQS中有一个阻塞队列,用于保存等待获取锁的线程的队列。同样,也有一个条件队列,是一个单链表结构,它是Condition主要作用的场所。从上述Node类中可以知道Node类主要有以下几个属性:
volatile int waitStatus; // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3) volatile Node prev; //阻塞队列的前驱结点volatile Node next; //阻塞队列的后继结点volatile Thread thread; //节点类中包装的线程Node nextWaiter; //条件队列中的后继节点
对于任意一个Java对象,都拥有一组监视器方法(定义在Object类中),主要包括wait,notify,notifyAll方法,这些方法与synchornized关键字相配合,可以实现等待/通知模式。
Condition接口也提供了类似的Object的监视器方法,但是它是与Lock配合可以实现等待/通知模式。但是这两者在使用方式以及功能上还是有差别的。两者之间的一个对比:
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象创建出来的,换句话说,Condition是依赖Lock对象的。Condition中方法定义如下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //构造一个新的等待队列Node加入到队尾 int savedState = fullyRelease(node); //释放当前线程的独占锁,不管重入几次,都把state释放为0 int interruptMode = 0; //如果当前节点没有在同步队列上,即还没有被signal,则将当前线程阻塞 while (!isOnSyncQueue(node)) { // 线程挂起 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中断则直 接退出自旋 break; }//退出了上面自旋说明当前节点已经在同步队列上,但是当前节点不一定在同步队列队首。acquireQueued将阻塞直到当前节点成为队首,即当前线程获得了锁。然后await()方法就可以退出了,让 线程继续执行await()后的代码。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
方法过程解析:
在await()方法中主要用到了以下几个方法:
addConditionWaiter()方法
// 将当前线程对应的节点入队,插入队尾 private Node addConditionWaiter() { Node t = lastWaiter; // 如果条件队列的最后一个节点取消了,将其清除出去 if (t != null && t.waitStatus != Node.CONDITION) { // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列 unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); // 如果队列为空 //3、完全释放独占锁 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }//unlinkCancelledWaiters方法原型:遍历整个条件队列,将已取消的所有节点清除出队列// 等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; // 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; }else trail = t; t = next; } }
fullyRelease(node)方法
该方法用于释放当前线程的独占锁,方法原型如下:
// 我们要先观察到返回值 savedState 代表 release 之前的 state 值 // 对于最简单的操作:先 lock.lock(),然后 condition1.await()。 // 那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1 // 相应的,如果 lock 重入了 n 次,savedState == n // 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
方法总体比较简单,首先获取到state值,通过getState()方法得到。然后依赖release()方法进行锁的释放。release()方法如下:
public final boolean release(int arg) { //先将state释放为0 if (tryRelease(arg)) { //取到阻塞队列的头节点 Node h = head; if (h != null && h.waitStatus != 0) //唤醒头节点,则第一个等待的节点会继续获取锁 unparkSuccessor(h); return true; } return false; }private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //从后面开始往前找,找到第一个状态为-1的节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒第一个状态为-1的节点,则该节点会继续获取锁 LockSupport.unpark(s.thread); }
release()方法通过CAS机制来释放当前锁并唤醒第一个状态为-1的节点。
isOnSyncQueue(node)
await()方法中通过while循环和isOnSyncQueue(node)方法等待进入阻塞队列。即:
int interruptMode = 0; while (!isOnSyncQueue(node)) { // 线程挂起 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
isOnSyncQueue(Node node) 用于判断节点是否已经转移到阻塞队列了,然后通过工具类LockSupport对线程进行挂起。isOnSyncQueue(Node node)方法原型如下:
final boolean isOnSyncQueue(Node node) { //如果当前节点状态是CONDITION或node.prev是null,则证明当前节点在等待队列上而不是同步队 列上。之所以可以用node.prev来判断,//是因为一个节点如果要加入同步队列,在加入前就会设置好prev字 段。 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果node.next不为null,则一定在同步队列上,因为node.next是在节点加入同步队列后设置的 if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); //前面的两个判断没有返回的话,就从同步队列队尾遍历一个 一个看是不是当前节点。 }// 从同步队列的队尾往前遍历,如果找到,返回 true private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
到此,就完成了await()方法的线程挂起全流程。
与await()线程挂起方法相对应的就是signal()线程唤醒方法。该方法用于唤醒线程,转移到阻塞队列。
唤醒操作通常由另一个线程来操作,就像生产者-消费者模式中,如果线程因为等待消费而挂起,那么当生产者生产了一个东西后,会调用 signal 唤醒正在等待的线程来消费。signal()方法原型如下:
// 唤醒等待了最久的线程 // 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列 public final void signal() { // 调用 signal 方法的线程必须持有当前的独占锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }// 从条件队列队头往后遍历,找出第一个需要转移的 node // 因为前面我们说过,有些线程会取消排队,但是还在队列中private void doSignal(Node first) { do { // 将 firstWaiter 指向 first 节点后面的第一个 // 如果将队头移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); // 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移, 依此类推 }// 将节点从条件队列转移到阻塞队列 // true 代表成功转移 // false 代表在 signal 之前,节点已经取消了 final boolean transferForSignal(Node node) { // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取 消, // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点 // 否则,将 waitStatus 置为 0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // enq(node): 自旋进入阻塞队列的队尾 // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点 Node p = enq(node); int ws = p.waitStatus; // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒 之后会怎么样,后面再解释 // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,上篇介绍的时候说过,节点入 队后,需要把前驱节点的状态设为 Node.SIGNAL(-1) if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节 LockSupport.unpark(node.thread); return true; }
signal()方法本身很简单。首先获取等待了最久的节点firstWaiter,然后通过doSingal()对其进行唤醒。doSingal()方法首先从条件队列中从头到尾方向找到真正需要唤醒的节点(因为有的节点会取消等待,但是仍然会在队列中占坑)。找到节点之后,通过transferForSignal()方法将节点从条件队列转移到阻塞队列。transferForSignal()方法拿到节点后首先通过CAS机制判断节点是否是真正需要转移的,如果节点已经取消,则不需要再进行转移,否则继续往下,通过自旋将节点插入到阻塞队列的队尾,然后再次通过CAS机制利用工具类LockSupport对节点线程进行唤醒。
从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节
点;如果再CANCELLED则继续迭代。
对每个节点执行唤醒操作时,首先将节点加入同步队列,此时await()操作的步骤3的解锁条件就已经开
启了。然后分两种情况讨论:
wait()
与 notify()
需要搭配 synchronized
关键字使用。例如: synchronized(obj){ obj.wait();//消费方没东西了,等待}synchronize(obj){ obj.notify();//有东西了,唤醒 消费进程}
而await()和signal()方法需要搭配着Lock锁,Condition来控制被阻塞线程使用,Condition 这个接口把 Object 的 wait(), notify(), notifyAll()
分解到了不同的对象中, 搭配上任意一种 Lock 的使用, 使得一个对象可以拥有多个等待集。
例如:
// 消费者lock.lock();condition.await();lock.unlock();//生产者lock.lock(); condition.signal(); lock.unlock();
为什么 wait(), notify() 需要搭配 synchronized 关键字使用 ?
wait(), notify() 操作的目的是基于某种条件, 协调多个线程间的运行状态, 由于涉及到多个线程间基于共享变量的相互通信, 必然需要引入某种同步机制, 以确保wait(), notify() 操作在线程层面的原子性。await(), signal(),signalAll() 的功用和 wait(), notify(), notifyAll() 基本相同, 区别是, 基于 Condition 的 await(), signal(), signalAll() 使得我们可以在同一个锁的代码块内, 优雅地实现基于多个条件的线程间挂起与唤醒操作。
//Thread.class中的变量/* ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. */ ThreadLocal.ThreadLocalMap threadLocals = null;
每个新线程都会实例化为一个ThreadLocalMap并且赋值给成员变量ThreadLocals,使用时若已经存在threadLocals则直接使用已经存在的对象。使用场合:
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }
set方法首先取出了当前线程 t,然后调用getMap(t)方法时传入了当前线程。判定如果这个map不为空,那么设置到Map中的Key就是this,值就是外部传入的参数。这个 this 就是定义的ThreadLocal对象。getMap(Thread)方法如下:
ThreadLocalMap getMap(Thread t) { return t.threadLocals; }
可以看到ThreadLocalMap其实就是线程里面的一个属性,该属性定义如下:
ThreadLocal.ThreadLocalMap threadLocals = null;
void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); }
protected T initialValue() { return null; }
首先看到ThreadLocalMap类中的Entry内部类:
static class Entry extends WeakReference> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal k, Object v) { super(k); value = v; } }
强弱引用的区别:
key 使用强引用 :引用的 ThreadLocal 的对象被回收了,但是 ThreadLocalMap 还持有ThreadLocal 的强引用,如果没有手动删除, ThreadLocal 不会被回收,导致 Entry 内存泄漏。key 使用弱引用 :引用的 ThreadLocal 的对象被回收了,由于 ThreadLocalMap 持有ThreadLocal 的弱引用,即使没有手动删除, ThreadLocal 也会被回收。 value 在下一次ThreadLocalMap 调用 set , get , remove 的时候会被清除。
转载地址:http://ffuqb.baihongyu.com/