java5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁的功能,它提供了与synchronized关键字类似的同步功能。既然有了synchronized这种内置的锁功能,为何要新增Lock接口?先来想象一个场景:手把手的进行锁获取和释放,先获得锁A,然后再获取锁B,当获取锁B后释放锁A同时获取锁C,当锁C获取后,再释放锁B同时获取锁D,以此类推,这种场景下,synchronized关键字就不那么容易实现了,而使用Lock却显得容易许多。
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
//默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//fair为false时,采用公平锁策略
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public void unlock() { sync.release(1);}
public Condition newCondition() {
return sync.newCondition();
}
...
}
从源代码可以Doug lea巧妙的采用组合模式把lock和unlock方法委托给同步器完成。
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
while(条件判断表达式) {
condition.wait();
}
// 处理逻辑
} finally {
lock.unlock();
}
需要显示的获取锁,并在finally块中显示的释放锁,目的是保证在获取到锁之后,最终能够被释放。
在非公平锁中,每当线程执行lock方法时,都尝试利用CAS把state从0设置为1。
那么Doug lea是如何实现锁的非公平性呢?
我们假设这样一个场景:
通过上述场景描述,我们可以看书,即使线程B等了很长时间也得和新来的线程G同时竞争锁,如此的不公平。
static final class NonfairSync extends Sync {
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
下面我们用线程A和线程B来描述非公平锁的竞争过程。
线程A和B同时执行CAS指令,假设线程A成功,线程B失败,则表明线程A成功获取锁,并把同步器中的exclusiveOwnerThread设置为线程A。
竞争失败的线程B,在nonfairTryAcquire方法中,会再次尝试获取锁,
Doug lea会在多处尝试重新获取锁,应该是在这段时间如果线程A释放锁,线程B就可以直接获取锁而不用挂起
。完整的执行流程如下:
在公平锁中,每当线程执行lock方法时,如果同步器的队列中有线程在等待,则直接加入到队列中。
场景分析:
所以每个线程获取锁的过程是公平的,等待时间最长的会最先被唤醒获取锁。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
重入锁,即线程可以重复获取已经持有的锁。在非公平和公平锁中,都对重入锁进行了实现。
if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
条件变量很大一个程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
public final void signal() {}
public final void signalAll() {}
public final void awaitUninterruptibly() {}
public final void await() throws InterruptedException {}
}
先看一个condition在生产者消费者的应用场景:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by j_zhan on 2016/7/13.
*/
public class Queue<T> {
private final T[] items;
private final Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
private int head, tail, count;
public Queue(int maxSize) {
items = (T[]) new Object[maxSize];
}
public Queue() {
this(10);
}
public void put(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
//数组满时,线程进入等待队列挂起。线程被唤醒时,从这里返回。
notFull.await();
}
items[tail] = t;
if (++tail == items.length) {
tail = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
T o = items[head];
items[head] = null;//GC
if (++head == items.length) {
head = 0;
}
--count;
notFull.signal();
return o;
} finally {
lock.unlock();
}
}
}
假设线程AB在并发的往items中插入数据,当items中元素存满时。如果线程A获取到锁,继续添加数据,满足count == items.length条件,导致线程A执行await方法。
ReentrantLock是独占锁,同一时刻只有一个线程能获取到锁,所以在lock.lock()和lock.unlock()之间可能有一次释放锁的操作(同样也必然还有一次获取锁的操作)。在Queue类中,不管take还是put,在线程持有锁之后只有await()方法有可能释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁。具体实现如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
await实现逻辑:
假设线程B获取锁之后,执行了take操作和条件变量的signal,signal通过某种实现唤醒了线程A,具体实现如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); //线程A插入到AQS的等待队列中
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal实现逻辑: