14.构建自定义的同步工具

正如每个Java对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且Object中的wait、notify和notifyAll方法就构成了内部条件队列的API。对象的内置锁与其内部条件队列是相互关联的,要调用对象X中条件队列的任何一个方法,必须持有对象X上的锁。这是因为“等待由状态构成的条件”与“维护状态一致性”这两种机制必须被紧密地绑定在一起:只有能对状态进行检查时,才能在某个条件上等待,并且只有能修改状态时,才能从条件等待中释放另一个线程。

Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并修改对象的状态。当被挂起的线程醒来时,它将在返回之前重新获取锁。

要想正确地使用条件队列,关键是找出对象在哪个条件谓词上等待。条件谓词是使某个操作称为状态依赖操作的前提条件。在有界缓存中,只有当缓存不为空时,take方法才能执行,否则必须等待。对take方法来说,它的条件谓词就是“缓存不为空”,take方法在执行之前必须首先测试该条件谓词。同样,put方法的条件谓词是缓存不满。条件谓词是由类中各个状态变量构成的表达式。BaseBoundedBuffer在测试“缓存不为空”时将把count与0进行比较,在测试“缓存不满”时将把count与缓存的大小进行比较。

在条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。在条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象。

void stateDependentMethod() throws InterruptedException {
    // 必须通过一个锁来保护条件谓词
    synchronized(lock) {
        while( !conditionPredicate() ) {
            lock.wait();
        }
        // 现在对象处于合适的状态
    }
}

当使用条件等待时(例如Object.wait或Condition.await):

  • 通常都有一个条件谓词——包括一些对象状态的测试,线程在执行前必须首先通过这些测试
  • 在调用wait之前测试条件谓词,并且从wait中返回时再次测试
  • 在一个循环中调用wait
  • 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量
  • 当调用wait、notify或notifyAll等方法时,一定要持有与条件队列相关的锁
  • 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。

每当在等待一个条件时,一定要确保在条件谓词变为真时通过某种方式发出通知。

在条件队列API中有两个发出通知的方法,即notify和notifyAll。无论调用哪一个,都必须持有与条件队列对象相关联的锁。在调用notify时,JVM会从这个条件队列上等待的多个线程中选择一个来唤醒,而调用notifyAll则会唤醒所有在这个条件队列上等待的线程。由于在调用notify或notifyAll时必须持有条件队列对象的锁,而如果这些等待中线程此时不能重新获得锁,那么无法从wait返回,因此发出通知的线程应该尽快地释放锁,从而确保正在等待的线程尽可能快地解除阻塞。

只有同时满足以下两个条件时,才能用单一的notify而不是notifyAll:

  • 所有等待线程的类型都相同。只有一个条件谓词与条件队列相关,并且每个线程在从wait返回后将执行相同的操作。
  • 单进单出。在条件变量上的每次通知,最多只能唤醒一个线程来执行。
public class ThreadGate {
    private boolean isOpen;
    private int generation;

    public synchronized void close() {
        isOpen = false;
    }
    public synchronized void open() {
        ++generation;
        isOpen = true;
        notifyAll();
    }
    public synchronized void await() throws InterruptedException {
        int arrivalGeneration = generation;
        while (!isOpen && arrivalGeneration == generation) {
            wait();
        }
    }
}

如果阀门打开时有N个线程正在等待它,那么这些线程都应该被允许执行。然而,如果阀门在打开后又非常快速地关闭了,并且await方法只检查isOpen,那么所有线程都可能无法释放:当所有编程收到通知时,将重新请求锁并退出wait,而此时的阀门可能已经再次关闭了。因此,在ThreadGate中使用了一个更复杂的条件谓词:每次阀门开启时,递增一个generation计数器,如果阀门现在是打开的,或者阀门自从该线程到达后就一直是打开的,那么线程就可以通过await。(有点费解)

内置条件队列存在一些缺陷。每个内置锁都只能有一个相关联的条件队列,因而在像BoundedBuffer这种类中,多个线程可能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下公开条件队列对象。这些因素都使得无法满足在使用notifyAll时所有等待线程为同一类型的需求。如果想编写一个带有多个条件谓词的并发对象,或者想获得除了条件队列可见性之外的更多控制权,就可以使用显式的Lock和Condition而不是内置锁和条件队列,这是一种更灵活的选择。一个Condition和一个Lock关联在一起,就像一个条件队列和一个内置锁相关联一样。要创建一个Condition,可以在相关联的Lock上调用Lock.newCondition方法。正如Lock比内置锁提供了更为丰富的功能,Condition同样比内置条件队列提供了更丰富的功能:在每个锁上可存在多个等待、条件等待可以是可中断的或不可中断的、基于时限的等待,以及公平的或非公平的条件队列操作。与内置条件队列不同的是,对于每个Lock,可以由任意数量的Condition对象。Condition对象继承了相关的Lock对象的公平性,对于公平的锁,线程会按照FIFO顺序从Condition.await中释放。

public class ConditionBoundedBuffer<T> {
    protected final Lock lock = new ReentrantLock();
    // 条件谓词:notFull (count < items.length)
    private final Condition notFull = lock.newCondition();
    // 条件谓词:notEmpty (count>0)
    private final Condition notEmpty = lock.newCondition();
    private final T[] items;
    private int head,tail,count;

    public ConditionBoundedBuffer(int limit) {
        items = (T[]) new Object[limit];
    }

    // 阻塞并直到: notFull
    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[tail] = x;
            if (++tail == items.length) {
                tail = 0;
            }
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
    // 阻塞并直到: notEmpty
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            T x = items[head];
            items[head] = null;
            if (++head == items.length) {
                head = 0;
            }
            count--;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

通过将两个条件谓词分开并放到两个等待线程集中,Condition使其更容易满足单次通知的需求。signal比signalAll更高效,它能极大地减少在每次缓存操作中发生的上下文切换与锁请求的次数。

与内置锁和条件队列一样,当使用显式的Lock和Condition时,也必须满足锁、条件谓词和条件变量之间的三元关系。在条件谓词中包含的变量必须由Lock来保护,并且在检查条件谓词以及调用await和signal时,必须持有Lock对象。

在ReentrantLock和Semaphore这两个接口之间存在许多共同点。这两个类都可以用作一个“阀门”,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lock或acquire时返回成功),也可以等待(在调用lock或acquire时阻塞),还可以取消(在调用tryLock或tryAcquire时返回“假”,表示在指定时间内锁是不可用的或者无法获得许可)。而且,这两个接口都支持可中断的、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。

public class SemaphoreOnLock {
    private final Lock lock = new ReentrantLock();
    // 条件谓词: permitsAvailable (permits > 0)
    private final Condition permitsAvailable = lock.newCondition();
    private int permits;

    public SemaphoreOnLock(int initialPermits) {
        lock.lock();
        try {
            permits = initialPermits;
        } finally {
            lock.unlock();
        }
    }

    public void acquire() throws InterruptedException {
        lock.lock();
        try {
            while (permits <= 0) {
                permitsAvailable.await();
            }
            --permits;
        } finally {
            lock.unlock();
        }
    }

    public void release() throws InterruptedException {
        lock.lock();
        try {
            ++permits;
            permitsAvailable.signal();
        } finally {
            lock.unlock();
        }
    }
}

在基于AQS构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作。获取操作时一种依赖状态的操作,并且通常会阻塞。当使用锁或信号量时,“获取”操作的含义就很直观,即获取的事锁或者许可,并且调用者可能会一直等待知道同步器类处于可被获取的状态。在使用CountDownLatch时,“获取”操作意味着“等待并直到闭锁到达结束状态”,而在使用FutureTask时,则意味着“等待并直到任务已经完成”。“释放”并不是一个可阻塞的操作,当执行“释放”操作时,所有在请求时被阻塞的线程都会开始执行。

AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getStatesetState以及compareAndSetState等protected类型方法来进行操作。这个整数可以用来表示任意状态。例如,ReentrantLock用它来表示所有者线程已经重复获取该锁的次数,Semaphore用它来表示剩余的许可数量,FutureTask用它来表示任务的状态(尚未开始、正在运行、已完成以及已取消)。在同步器类中还可以自行管理一些额外的状态变量,例如ReentrantLock保存了锁的当前所有者的信息,这样就能区分某个获取操作时重入的还是竞争的。

根据同步器的不同,获取操作可以是一种独占操作(例如ReentrantLock),也可以是一个非独占操作(例如Semaphore和CountDownLatch)。一个获取操作包括两部分,首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将阻塞或失败。其次,就是更新同步器的状态,获取同步器的某个线程可能会对其他线程能否也获取该同步器造成影响。

如果某个同步器支持独占的获取操作,那么需要实现一些保护方法,包括tryAcquire、tryRelease和isHeldExclusively等,而对于支持共享获取的同步器,则应该实现tryAcquireShared和tryReleaseShared等方法。AQS中的acquire、acquireShared、release和releaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能执行。在同步器的子类中,可以根据其获取操作和释放操作的语义,使用getState、setState以及compareAndSetState来检查和更新状态,并通过返回的状态值来告诉基类“获取”或“释放”同步器的操作是否成功。例如,如果tryAcquireShared返回一个负值,那么表示获取操作失败,返回零值表示同步器通过独占方式被获取,返回正值则表示同步器通过非独占方式被获取。对于tryRelease和tryReleaseShared方法来说,如果释放操作使得所有在获取同步器时被阻塞的线程恢复执行,那么这两个方法应该返回true。

    boolean acquire() throws InterruptedException {
        while (当前状态不允许获取操作) {
            if (需要阻塞获取请求) {
                如果当前线程不在等待队列中,则将其插入等待队列
                阻塞当前线程        
            } else {
                返回失败
            }
        }
        可能更新同步器的状态
        如果线程位于等待队列中,则将其移除队列
        返回成功
    }

    void release() {
        更新同步器的状态
        if(新的状态允许某个被阻塞的线程获取成功) {
            解除等待队列中一个或多个线程的阻塞状态
        }
    }
public class OneShotLatch {
    private class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int arg) {
            return (getState() == 1) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    public void signal() {
        sync.releaseShared(0);
    }
}

ReentrantLock只支持独占方式的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively。ReentrantLock将同步状态用于保存锁获取操作的次数,并且维护了一个owner变量来保存当前所有者线程的标识符,只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量。在tryRelease中检查owner域,从而确保当前线程在执行unlock操作之前已经获取了锁,在tryAcquire中将使用这个域来区分获取操作是重入的还是竞争的。

    protected boolean tryAcquire(int ignored) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, 1)) {
                owner = thread;
                return true;
            }
        } else if (current == owner) {
            setState(c+1);
            return true;
        }
        return false;
    }

Semaphore将AQS的同步状态用于保存当前可用许可的数量。tryAcquireShared首先计算剩余许可的数量,如果没有足够的许可,那么将返回一个值表示获取操作失败。如果还有剩余的许可,那么tryAcquireShared会通过compareAndSetState以原子方式来降低许可的计数。如果这个操作成功(这意味着许可的计数自从上一次读取后就没有被修改过),那么将返回一个值表示获取操作成功。在返回值中还包含了表示其他共享获取操作能否成功的信息,如果成功,那么其他等待的线程同样会解除阻塞。当没有足够的许可,或者当tryAcquireShared可以通过原子方式来更新许可的计数以响应获取操作时,while循环将终止。虽然compareAndSetState的调用可能由于与另一个线程发生竞争而失败,并使其重新尝试,但在经过了一定次数的重试操作以后,在这两个结束条件中有一个会变为真。同样tryReleaseShared将增加许可计数,这可能会解除等待中线程的阻塞状态,并且不断地重试直到更新操作成功。tryReleaseShared的返回值表示在这次释放操作中解除了其他线程的阻塞。

CountDownLatch使用AQS的方式与Semaphore很相似:在同步状态中保存的是当前的计数值。countDown方法调用release,从而导致计数值递减,并且当计数值为零时,解除所有等待线程的阻塞。await调用acquire,当计数值为零时,acquire将立即返回,否则将阻塞。

    protected int tryAcquireShared(int acquires) {
        while (true) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }

    protected boolean tryReleaseShared(int releases) {
        while (true) {
            int p = getState();
            if (compareAndSetState(p, p+releases)) {
                return true;
            }
        }
    }

ReadWriteLock接口表示存在两个锁:一个读取锁和一个写入锁,但在基于AQS实现的ReentrantReadWriteLock中,单个AQS子类将同时管理读取加锁和写入加锁。ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数。在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法。

AQS在内部维护一个等待线程队列,其中记录了某个线程请求的事独占访问还是共享访问。在ReentrantReadWriteLock中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于队列头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。

results matching ""

    No results matching ""