5.基础构建模块
同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。容器上常见的复合操作包括:迭代(反复访问元素,直到遍历完容器中所有元素)、跳转(根据指定顺序找到当前元素的下一个元素)以及条件运算,例如“若没有则添加”(检查在Map中是否存在键值K,如果没有,就加入二元组(K,V))。
无论是直接迭代还是在Java5.0引入的for-each循环语法中,对容器类进行迭代的标准方式都是使用Iterator。然而,如果有其他线程并发地修改容器,那么即使是使用迭代器也无法避免在迭代期间对容器加锁。
设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为是及时失败fail-fast的。这意味着,当它们发现容器在迭代过程中被修改时,就会抛出一个ConcurrentModificationException异常。
如果不希望在迭代期间对容器加锁,那么一种替代方法就是克隆容器,并在副本上进行迭代。
容器的hashCode
和equals
等方法也会间接地执行迭代操作,当容器作为另一个容器的元素或键值时,就会出现这种情况。同样,containsAll
,removeAll
、retainAll
等方法,以及把容器作为参数的构造函数,都会对容器进行迭代。多有这些间接的迭代操作都可能抛出ConcurrentModificationException
。
通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。
在Java5.0增加了ConcurrentHashMap,用来替代同步且基于散列的Map,以及CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的List。在新的ConcurrentMap接口中增加了对一些常见复合操作的支持,例如“若没有则添加”、替换以及有条件删除等。
Java5.0增加了两种新的容器类型:Queue和BlockingQueue。Queue用来临时保存一组等待处理的元素。它提供了几种实现,包括:ConcurrentLinkedQueue,这是一个传统的先进先出队列,以及PriorityQueue,这是一个非并发的优先队列。Queue上的操作不会阻塞,如果队列为空,那么获取元素的操作将返回空值。虽然可以用List来模拟Queue的行为——事实上,正是通过LinkedList来实现Queue的,但还需要一个Queue的类,因为它能去掉List的随机访问需求,从而实现更高效的并发。
BlockingQueue扩展了Queue,增加了可阻塞的插入和获取等操作。如果队列为空,那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素。如果队列已满(对于有界队列来说),那么插入元素的操作将一直阻塞。直到队列中出现可用的空间。在生产者——消费者这种设计模式中,阻塞队列是非常有用的。
正如ConcurrentHashMap用于代替基于散列的同步Map,Java6也引入了ConcurrentSkipListMap和ConcurrentSkipListSet,分别作为同步的SortedMap和SortedSet的并发替代品(例如用synchronizedMap包装的TreeMap或TreeSet)。
与HashMap一样,ConcurrentHashMap也是一个基于散列的Map,但它使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁(Lock Striping)。在这种机制中,任意数量的读取线程可以并发地访问Map,执行读取操作的线程和执行写入操作的线程可以并发地访问Map,并且一定数量的写入线程可以并发地修改Map。ConcurrentHashMap带来的结果是,在并发访问环境下将实现更高的吞吐量,而在单线程环境中只损失非常小的性能。
ConcurrentHashMap与其他并发容器一起增强了同步容器类:它们提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。ConcurrentHashMap返回的迭代器具有弱一致性(Weakly Consistent),而并非“及时失败”。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。
尽管有这些改进,但仍然有一些需要权衡的因素。对于一些需要在整个Map上进行计算的方法,例如size和isEmpty,这些方法的语义被略微减弱了以反映容器的并发特性。由于size返回的结果在计算时可能已经过期了,它实际上只是一个估计值,因此允许size返回一个近似值而不是一个精确值。
/**
* A {@link java.util.Map} providing thread safety and atomicity
* guarantees.
*
* <p>Memory consistency effects: As with other concurrent
* collections, actions in a thread prior to placing an object into a
* {@code ConcurrentMap} as a key or value
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions subsequent to the access or removal of that object from
* the {@code ConcurrentMap} in another thread.
*
* <p>This interface is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
* @since 1.5
* @author Doug Lea
* @param <K> the type of keys maintained by this map
* @param <V> the type of mapped values
*/
public interface ConcurrentMap<K, V> extends Map<K, V> {
/**
* {@inheritDoc}
*
* @implNote This implementation assumes that the ConcurrentMap cannot
* contain null values and {@code get()} returning null unambiguously means
* the key is absent. Implementations which support null values
* <strong>must</strong> override this default implementation.
*
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @since 1.8
*/
@Override
default V getOrDefault(Object key, V defaultValue) {
V v;
return ((v = get(key)) != null) ? v : defaultValue;
}
/**
* {@inheritDoc}
*
* @implSpec The default implementation is equivalent to, for this
* {@code map}:
* <pre> {@code
* for ((Map.Entry<K, V> entry : map.entrySet())
* action.accept(entry.getKey(), entry.getValue());
* }</pre>
*
* @implNote The default implementation assumes that
* {@code IllegalStateException} thrown by {@code getKey()} or
* {@code getValue()} indicates that the entry has been removed and cannot
* be processed. Operation continues for subsequent entries.
*
* @throws NullPointerException {@inheritDoc}
* @since 1.8
*/
@Override
default void forEach(BiConsumer<? super K, ? super V> action) {
Objects.requireNonNull(action);
for (Map.Entry<K, V> entry : entrySet()) {
K k;
V v;
try {
k = entry.getKey();
v = entry.getValue();
} catch(IllegalStateException ise) {
// this usually means the entry is no longer in the map.
continue;
}
action.accept(k, v);
}
}
/**
* If the specified key is not already associated
* with a value, associate it with the given value.
* This is equivalent to
* <pre> {@code
* if (!map.containsKey(key))
* return map.put(key, value);
* else
* return map.get(key);
* }</pre>
*
* except that the action is performed atomically.
*
* @implNote This implementation intentionally re-abstracts the
* inappropriate default provided in {@code Map}.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with the specified key, or
* {@code null} if there was no mapping for the key.
* (A {@code null} return can also indicate that the map
* previously associated {@code null} with the key,
* if the implementation supports null values.)
* @throws UnsupportedOperationException if the {@code put} operation
* is not supported by this map
* @throws ClassCastException if the class of the specified key or value
* prevents it from being stored in this map
* @throws NullPointerException if the specified key or value is null,
* and this map does not permit null keys or values
* @throws IllegalArgumentException if some property of the specified key
* or value prevents it from being stored in this map
*/
V putIfAbsent(K key, V value);
/**
* Removes the entry for a key only if currently mapped to a given value.
* This is equivalent to
* <pre> {@code
* if (map.containsKey(key) && Objects.equals(map.get(key), value)) {
* map.remove(key);
* return true;
* } else
* return false;
* }</pre>
*
* except that the action is performed atomically.
*
* @implNote This implementation intentionally re-abstracts the
* inappropriate default provided in {@code Map}.
*
* @param key key with which the specified value is associated
* @param value value expected to be associated with the specified key
* @return {@code true} if the value was removed
* @throws UnsupportedOperationException if the {@code remove} operation
* is not supported by this map
* @throws ClassCastException if the key or value is of an inappropriate
* type for this map
* (<a href="../Collection.html#optional-restrictions">optional</a>)
* @throws NullPointerException if the specified key or value is null,
* and this map does not permit null keys or values
* (<a href="../Collection.html#optional-restrictions">optional</a>)
*/
boolean remove(Object key, Object value);
/**
* Replaces the entry for a key only if currently mapped to a given value.
* This is equivalent to
* <pre> {@code
* if (map.containsKey(key) && Objects.equals(map.get(key), oldValue)) {
* map.put(key, newValue);
* return true;
* } else
* return false;
* }</pre>
*
* except that the action is performed atomically.
*
* @implNote This implementation intentionally re-abstracts the
* inappropriate default provided in {@code Map}.
*
* @param key key with which the specified value is associated
* @param oldValue value expected to be associated with the specified key
* @param newValue value to be associated with the specified key
* @return {@code true} if the value was replaced
* @throws UnsupportedOperationException if the {@code put} operation
* is not supported by this map
* @throws ClassCastException if the class of a specified key or value
* prevents it from being stored in this map
* @throws NullPointerException if a specified key or value is null,
* and this map does not permit null keys or values
* @throws IllegalArgumentException if some property of a specified key
* or value prevents it from being stored in this map
*/
boolean replace(K key, V oldValue, V newValue);
/**
* Replaces the entry for a key only if currently mapped to some value.
* This is equivalent to
* <pre> {@code
* if (map.containsKey(key)) {
* return map.put(key, value);
* } else
* return null;
* }</pre>
*
* except that the action is performed atomically.
*
* @implNote This implementation intentionally re-abstracts the
* inappropriate default provided in {@code Map}.
*
* @param key key with which the specified value is associated
* @param value value to be associated with the specified key
* @return the previous value associated with the specified key, or
* {@code null} if there was no mapping for the key.
* (A {@code null} return can also indicate that the map
* previously associated {@code null} with the key,
* if the implementation supports null values.)
* @throws UnsupportedOperationException if the {@code put} operation
* is not supported by this map
* @throws ClassCastException if the class of the specified key or value
* prevents it from being stored in this map
* @throws NullPointerException if the specified key or value is null,
* and this map does not permit null keys or values
* @throws IllegalArgumentException if some property of the specified key
* or value prevents it from being stored in this map
*/
V replace(K key, V value);
/**
* {@inheritDoc}
*
* @implSpec
* <p>The default implementation is equivalent to, for this {@code map}:
* <pre> {@code
* for ((Map.Entry<K, V> entry : map.entrySet())
* do {
* K k = entry.getKey();
* V v = entry.getValue();
* } while(!replace(k, v, function.apply(k, v)));
* }</pre>
*
* The default implementation may retry these steps when multiple
* threads attempt updates including potentially calling the function
* repeatedly for a given key.
*
* <p>This implementation assumes that the ConcurrentMap cannot contain null
* values and {@code get()} returning null unambiguously means the key is
* absent. Implementations which support null values <strong>must</strong>
* override this default implementation.
*
* @throws UnsupportedOperationException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
* @since 1.8
*/
@Override
default void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
Objects.requireNonNull(function);
forEach((k,v) -> {
while(!replace(k, v, function.apply(k, v))) {
// v changed or k is gone
if ( (v = get(k)) == null) {
// k is no longer in the map.
break;
}
}
});
}
/**
* {@inheritDoc}
*
* @implSpec
* The default implementation is equivalent to the following steps for this
* {@code map}, then returning the current value or {@code null} if now
* absent:
*
* <pre> {@code
* if (map.get(key) == null) {
* V newValue = mappingFunction.apply(key);
* if (newValue != null)
* return map.putIfAbsent(key, newValue);
* }
* }</pre>
*
* The default implementation may retry these steps when multiple
* threads attempt updates including potentially calling the mapping
* function multiple times.
*
* <p>This implementation assumes that the ConcurrentMap cannot contain null
* values and {@code get()} returning null unambiguously means the key is
* absent. Implementations which support null values <strong>must</strong>
* override this default implementation.
*
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @since 1.8
*/
@Override
default V computeIfAbsent(K key,
Function<? super K, ? extends V> mappingFunction) {
Objects.requireNonNull(mappingFunction);
V v, newValue;
return ((v = get(key)) == null &&
(newValue = mappingFunction.apply(key)) != null &&
(v = putIfAbsent(key, newValue)) == null) ? newValue : v;
}
/**
* {@inheritDoc}
*
* @implSpec
* The default implementation is equivalent to performing the following
* steps for this {@code map}, then returning the current value or
* {@code null} if now absent. :
*
* <pre> {@code
* if (map.get(key) != null) {
* V oldValue = map.get(key);
* V newValue = remappingFunction.apply(key, oldValue);
* if (newValue != null)
* map.replace(key, oldValue, newValue);
* else
* map.remove(key, oldValue);
* }
* }</pre>
*
* The default implementation may retry these steps when multiple threads
* attempt updates including potentially calling the remapping function
* multiple times.
*
* <p>This implementation assumes that the ConcurrentMap cannot contain null
* values and {@code get()} returning null unambiguously means the key is
* absent. Implementations which support null values <strong>must</strong>
* override this default implementation.
*
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @since 1.8
*/
@Override
default V computeIfPresent(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
Objects.requireNonNull(remappingFunction);
V oldValue;
while((oldValue = get(key)) != null) {
V newValue = remappingFunction.apply(key, oldValue);
if (newValue != null) {
if (replace(key, oldValue, newValue))
return newValue;
} else if (remove(key, oldValue))
return null;
}
return oldValue;
}
/**
* {@inheritDoc}
*
* @implSpec
* The default implementation is equivalent to performing the following
* steps for this {@code map}, then returning the current value or
* {@code null} if absent:
*
* <pre> {@code
* V oldValue = map.get(key);
* V newValue = remappingFunction.apply(key, oldValue);
* if (oldValue != null ) {
* if (newValue != null)
* map.replace(key, oldValue, newValue);
* else
* map.remove(key, oldValue);
* } else {
* if (newValue != null)
* map.putIfAbsent(key, newValue);
* else
* return null;
* }
* }</pre>
*
* The default implementation may retry these steps when multiple
* threads attempt updates including potentially calling the remapping
* function multiple times.
*
* <p>This implementation assumes that the ConcurrentMap cannot contain null
* values and {@code get()} returning null unambiguously means the key is
* absent. Implementations which support null values <strong>must</strong>
* override this default implementation.
*
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @since 1.8
*/
@Override
default V compute(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
Objects.requireNonNull(remappingFunction);
V oldValue = get(key);
for(;;) {
V newValue = remappingFunction.apply(key, oldValue);
if (newValue == null) {
// delete mapping
if (oldValue != null || containsKey(key)) {
// something to remove
if (remove(key, oldValue)) {
// removed the old value as expected
return null;
}
// some other value replaced old value. try again.
oldValue = get(key);
} else {
// nothing to do. Leave things as they were.
return null;
}
} else {
// add or replace old mapping
if (oldValue != null) {
// replace
if (replace(key, oldValue, newValue)) {
// replaced as expected.
return newValue;
}
// some other value replaced old value. try again.
oldValue = get(key);
} else {
// add (replace if oldValue was null)
if ((oldValue = putIfAbsent(key, newValue)) == null) {
// replaced
return newValue;
}
// some other value replaced old value. try again.
}
}
}
}
/**
* {@inheritDoc}
*
* @implSpec
* The default implementation is equivalent to performing the following
* steps for this {@code map}, then returning the current value or
* {@code null} if absent:
*
* <pre> {@code
* V oldValue = map.get(key);
* V newValue = (oldValue == null) ? value :
* remappingFunction.apply(oldValue, value);
* if (newValue == null)
* map.remove(key);
* else
* map.put(key, newValue);
* }</pre>
*
* <p>The default implementation may retry these steps when multiple
* threads attempt updates including potentially calling the remapping
* function multiple times.
*
* <p>This implementation assumes that the ConcurrentMap cannot contain null
* values and {@code get()} returning null unambiguously means the key is
* absent. Implementations which support null values <strong>must</strong>
* override this default implementation.
*
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @since 1.8
*/
@Override
default V merge(K key, V value,
BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
Objects.requireNonNull(remappingFunction);
Objects.requireNonNull(value);
V oldValue = get(key);
for (;;) {
if (oldValue != null) {
V newValue = remappingFunction.apply(oldValue, value);
if (newValue != null) {
if (replace(key, oldValue, newValue))
return newValue;
} else if (remove(key, oldValue)) {
return null;
}
oldValue = get(key);
} else {
if ((oldValue = putIfAbsent(key, value)) == null) {
return value;
}
}
}
}
}
CopyOnWriteArrayList用于替代同步List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。类似地,CopyOnWriteArraySet的作用是替代同步Set。写入时复制Copy-On-Write容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。
显然,每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时。仅当迭代操作远远多于修改操作时,才应该使用写入时复制容器。
/**
* A collection designed for holding elements prior to processing.
* Besides basic {@link java.util.Collection Collection} operations,
* queues provide additional insertion, extraction, and inspection
* operations. Each of these methods exists in two forms: one throws
* an exception if the operation fails, the other returns a special
* value (either {@code null} or {@code false}, depending on the
* operation). The latter form of the insert operation is designed
* specifically for use with capacity-restricted {@code Queue}
* implementations; in most implementations, insert operations cannot
* fail.
*
* <table BORDER CELLPADDING=3 CELLSPACING=1>
* <caption>Summary of Queue methods</caption>
* <tr>
* <td></td>
* <td ALIGN=CENTER><em>Throws exception</em></td>
* <td ALIGN=CENTER><em>Returns special value</em></td>
* </tr>
* <tr>
* <td><b>Insert</b></td>
* <td>{@link Queue#add add(e)}</td>
* <td>{@link Queue#offer offer(e)}</td>
* </tr>
* <tr>
* <td><b>Remove</b></td>
* <td>{@link Queue#remove remove()}</td>
* <td>{@link Queue#poll poll()}</td>
* </tr>
* <tr>
* <td><b>Examine</b></td>
* <td>{@link Queue#element element()}</td>
* <td>{@link Queue#peek peek()}</td>
* </tr>
* </table>
*
* <p>Queues typically, but do not necessarily, order elements in a
* FIFO (first-in-first-out) manner. Among the exceptions are
* priority queues, which order elements according to a supplied
* comparator, or the elements' natural ordering, and LIFO queues (or
* stacks) which order the elements LIFO (last-in-first-out).
* Whatever the ordering used, the <em>head</em> of the queue is that
* element which would be removed by a call to {@link #remove() } or
* {@link #poll()}. In a FIFO queue, all new elements are inserted at
* the <em>tail</em> of the queue. Other kinds of queues may use
* different placement rules. Every {@code Queue} implementation
* must specify its ordering properties.
*
* <p>The {@link #offer offer} method inserts an element if possible,
* otherwise returning {@code false}. This differs from the {@link
* java.util.Collection#add Collection.add} method, which can fail to
* add an element only by throwing an unchecked exception. The
* {@code offer} method is designed for use when failure is a normal,
* rather than exceptional occurrence, for example, in fixed-capacity
* (or "bounded") queues.
*
* <p>The {@link #remove()} and {@link #poll()} methods remove and
* return the head of the queue.
* Exactly which element is removed from the queue is a
* function of the queue's ordering policy, which differs from
* implementation to implementation. The {@code remove()} and
* {@code poll()} methods differ only in their behavior when the
* queue is empty: the {@code remove()} method throws an exception,
* while the {@code poll()} method returns {@code null}.
*
* <p>The {@link #element()} and {@link #peek()} methods return, but do
* not remove, the head of the queue.
*
* <p>The {@code Queue} interface does not define the <i>blocking queue
* methods</i>, which are common in concurrent programming. These methods,
* which wait for elements to appear or for space to become available, are
* defined in the {@link java.util.concurrent.BlockingQueue} interface, which
* extends this interface.
*
* <p>{@code Queue} implementations generally do not allow insertion
* of {@code null} elements, although some implementations, such as
* {@link LinkedList}, do not prohibit insertion of {@code null}.
* Even in the implementations that permit it, {@code null} should
* not be inserted into a {@code Queue}, as {@code null} is also
* used as a special return value by the {@code poll} method to
* indicate that the queue contains no elements.
*
* <p>{@code Queue} implementations generally do not define
* element-based versions of methods {@code equals} and
* {@code hashCode} but instead inherit the identity based versions
* from class {@code Object}, because element-based equality is not
* always well-defined for queues with the same elements but different
* ordering properties.
*
*
* <p>This interface is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
* @see java.util.Collection
* @see LinkedList
* @see PriorityQueue
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.BlockingQueue
* @see java.util.concurrent.ArrayBlockingQueue
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.PriorityBlockingQueue
* @since 1.5
* @author Doug Lea
* @param <E> the type of elements held in this collection
*/
public interface Queue<E> extends Collection<E> {
/**
* Inserts the specified element into this queue if it is possible to do so
* immediately without violating capacity restrictions, returning
* {@code true} upon success and throwing an {@code IllegalStateException}
* if no space is currently available.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null and
* this queue does not permit null elements
* @throws IllegalArgumentException if some property of this element
* prevents it from being added to this queue
*/
boolean add(E e);
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions.
* When using a capacity-restricted queue, this method is generally
* preferable to {@link #add}, which can fail to insert an element only
* by throwing an exception.
*
* @param e the element to add
* @return {@code true} if the element was added to this queue, else
* {@code false}
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null and
* this queue does not permit null elements
* @throws IllegalArgumentException if some property of this element
* prevents it from being added to this queue
*/
boolean offer(E e);
/**
* Retrieves and removes the head of this queue. This method differs
* from {@link #poll poll} only in that it throws an exception if this
* queue is empty.
*
* @return the head of this queue
* @throws NoSuchElementException if this queue is empty
*/
E remove();
/**
* Retrieves and removes the head of this queue,
* or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
E poll();
/**
* Retrieves, but does not remove, the head of this queue. This method
* differs from {@link #peek peek} only in that it throws an exception
* if this queue is empty.
*
* @return the head of this queue
* @throws NoSuchElementException if this queue is empty
*/
E element();
/**
* Retrieves, but does not remove, the head of this queue,
* or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
E peek();
}
/**
* A {@link java.util.Queue} that additionally supports operations
* that wait for the queue to become non-empty when retrieving an
* element, and wait for space to become available in the queue when
* storing an element.
*
* <p>{@code BlockingQueue} methods come in four forms, with different ways
* of handling operations that cannot be satisfied immediately, but may be
* satisfied at some point in the future:
* one throws an exception, the second returns a special value (either
* {@code null} or {@code false}, depending on the operation), the third
* blocks the current thread indefinitely until the operation can succeed,
* and the fourth blocks for only a given maximum time limit before giving
* up. These methods are summarized in the following table:
*
* <table BORDER CELLPADDING=3 CELLSPACING=1>
* <caption>Summary of BlockingQueue methods</caption>
* <tr>
* <td></td>
* <td ALIGN=CENTER><em>Throws exception</em></td>
* <td ALIGN=CENTER><em>Special value</em></td>
* <td ALIGN=CENTER><em>Blocks</em></td>
* <td ALIGN=CENTER><em>Times out</em></td>
* </tr>
* <tr>
* <td><b>Insert</b></td>
* <td>{@link #add add(e)}</td>
* <td>{@link #offer offer(e)}</td>
* <td>{@link #put put(e)}</td>
* <td>{@link #offer(Object, long, TimeUnit) offer(e, time, unit)}</td>
* </tr>
* <tr>
* <td><b>Remove</b></td>
* <td>{@link #remove remove()}</td>
* <td>{@link #poll poll()}</td>
* <td>{@link #take take()}</td>
* <td>{@link #poll(long, TimeUnit) poll(time, unit)}</td>
* </tr>
* <tr>
* <td><b>Examine</b></td>
* <td>{@link #element element()}</td>
* <td>{@link #peek peek()}</td>
* <td><em>not applicable</em></td>
* <td><em>not applicable</em></td>
* </tr>
* </table>
*
* <p>A {@code BlockingQueue} does not accept {@code null} elements.
* Implementations throw {@code NullPointerException} on attempts
* to {@code add}, {@code put} or {@code offer} a {@code null}. A
* {@code null} is used as a sentinel value to indicate failure of
* {@code poll} operations.
*
* <p>A {@code BlockingQueue} may be capacity bounded. At any given
* time it may have a {@code remainingCapacity} beyond which no
* additional elements can be {@code put} without blocking.
* A {@code BlockingQueue} without any intrinsic capacity constraints always
* reports a remaining capacity of {@code Integer.MAX_VALUE}.
*
* <p>{@code BlockingQueue} implementations are designed to be used
* primarily for producer-consumer queues, but additionally support
* the {@link java.util.Collection} interface. So, for example, it is
* possible to remove an arbitrary element from a queue using
* {@code remove(x)}. However, such operations are in general
* <em>not</em> performed very efficiently, and are intended for only
* occasional use, such as when a queued message is cancelled.
*
* <p>{@code BlockingQueue} implementations are thread-safe. All
* queuing methods achieve their effects atomically using internal
* locks or other forms of concurrency control. However, the
* <em>bulk</em> Collection operations {@code addAll},
* {@code containsAll}, {@code retainAll} and {@code removeAll} are
* <em>not</em> necessarily performed atomically unless specified
* otherwise in an implementation. So it is possible, for example, for
* {@code addAll(c)} to fail (throwing an exception) after adding
* only some of the elements in {@code c}.
*
* <p>A {@code BlockingQueue} does <em>not</em> intrinsically support
* any kind of "close" or "shutdown" operation to
* indicate that no more items will be added. The needs and usage of
* such features tend to be implementation-dependent. For example, a
* common tactic is for producers to insert special
* <em>end-of-stream</em> or <em>poison</em> objects, that are
* interpreted accordingly when taken by consumers.
*
* <p>
* Usage example, based on a typical producer-consumer scenario.
* Note that a {@code BlockingQueue} can safely be used with multiple
* producers and multiple consumers.
* <pre> {@code
* class Producer implements Runnable {
* private final BlockingQueue queue;
* Producer(BlockingQueue q) { queue = q; }
* public void run() {
* try {
* while (true) { queue.put(produce()); }
* } catch (InterruptedException ex) { ... handle ...}
* }
* Object produce() { ... }
* }
*
* class Consumer implements Runnable {
* private final BlockingQueue queue;
* Consumer(BlockingQueue q) { queue = q; }
* public void run() {
* try {
* while (true) { consume(queue.take()); }
* } catch (InterruptedException ex) { ... handle ...}
* }
* void consume(Object x) { ... }
* }
*
* class Setup {
* void main() {
* BlockingQueue q = new SomeQueueImplementation();
* Producer p = new Producer(q);
* Consumer c1 = new Consumer(q);
* Consumer c2 = new Consumer(q);
* new Thread(p).start();
* new Thread(c1).start();
* new Thread(c2).start();
* }
* }}</pre>
*
* <p>Memory consistency effects: As with other concurrent
* collections, actions in a thread prior to placing an object into a
* {@code BlockingQueue}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions subsequent to the access or removal of that element from
* the {@code BlockingQueue} in another thread.
*
* <p>This interface is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
* @since 1.5
* @author Doug Lea
* @param <E> the type of elements held in this collection
*/
public interface BlockingQueue<E> extends Queue<E> {
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
* {@code true} upon success and throwing an
* {@code IllegalStateException} if no space is currently available.
* When using a capacity-restricted queue, it is generally preferable to
* use {@link #offer(Object) offer}.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean add(E e);
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
* {@code true} upon success and {@code false} if no space is currently
* available. When using a capacity-restricted queue, this method is
* generally preferable to {@link #add}, which can fail to insert an
* element only by throwing an exception.
*
* @param e the element to add
* @return {@code true} if the element was added to this queue, else
* {@code false}
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean offer(E e);
/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void put(E e) throws InterruptedException;
/**
* Inserts the specified element into this queue, waiting up to the
* specified wait time if necessary for space to become available.
*
* @param e the element to add
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
E take() throws InterruptedException;
/**
* Retrieves and removes the head of this queue, waiting up to the
* specified wait time if necessary for an element to become available.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
* limit.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
* an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*
* @return the remaining capacity
*/
int remainingCapacity();
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
* @throws ClassCastException if the class of the specified element
* is incompatible with this queue
* (<a href="../Collection.html#optional-restrictions">optional</a>)
* @throws NullPointerException if the specified element is null
* (<a href="../Collection.html#optional-restrictions">optional</a>)
*/
boolean remove(Object o);
/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
* @throws ClassCastException if the class of the specified element
* is incompatible with this queue
* (<a href="../Collection.html#optional-restrictions">optional</a>)
* @throws NullPointerException if the specified element is null
* (<a href="../Collection.html#optional-restrictions">optional</a>)
*/
public boolean contains(Object o);
/**
* Removes all available elements from this queue and adds them
* to the given collection. This operation may be more
* efficient than repeatedly polling this queue. A failure
* encountered while attempting to add elements to
* collection {@code c} may result in elements being in neither,
* either or both collections when the associated exception is
* thrown. Attempts to drain a queue to itself result in
* {@code IllegalArgumentException}. Further, the behavior of
* this operation is undefined if the specified collection is
* modified while the operation is in progress.
*
* @param c the collection to transfer elements into
* @return the number of elements transferred
* @throws UnsupportedOperationException if addition of elements
* is not supported by the specified collection
* @throws ClassCastException if the class of an element of this queue
* prevents it from being added to the specified collection
* @throws NullPointerException if the specified collection is null
* @throws IllegalArgumentException if the specified collection is this
* queue, or some property of an element of this queue prevents
* it from being added to the specified collection
*/
int drainTo(Collection<? super E> c);
/**
* Removes at most the given number of available elements from
* this queue and adds them to the given collection. A failure
* encountered while attempting to add elements to
* collection {@code c} may result in elements being in neither,
* either or both collections when the associated exception is
* thrown. Attempts to drain a queue to itself result in
* {@code IllegalArgumentException}. Further, the behavior of
* this operation is undefined if the specified collection is
* modified while the operation is in progress.
*
* @param c the collection to transfer elements into
* @param maxElements the maximum number of elements to transfer
* @return the number of elements transferred
* @throws UnsupportedOperationException if addition of elements
* is not supported by the specified collection
* @throws ClassCastException if the class of an element of this queue
* prevents it from being added to the specified collection
* @throws NullPointerException if the specified collection is null
* @throws IllegalArgumentException if the specified collection is this
* queue, or some property of an element of this queue prevents
* it from being added to the specified collection
*/
int drainTo(Collection<? super E> c, int maxElements);
}
阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。队列可以是有界的也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法也永远不会阻塞。
阻塞队列同样提供了一个offer方法,如果数据项不能被添加到队列中,那么将返回一个失败状态。这样你就能够创建更多灵活的策略来处理负荷过载的情况,例如减轻负载,将多余的工作项序列化并写入磁盘,减少生产者线程的数量,或者通过某种方式来抑制生产者线程。
LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。PriorityBlockingQueue是一个按照优先级排序的队列,当你希望按照某种顺序而不是FIFO来处理元素时,这个队列将非常有用。PriorityBlockingQueue既可以根据元素的自然顺序来比较元素(如果它们实现了Comparable方法),也可以使用Comparator来比较。
最后一个BlockingQueue实现是SynchronousQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。
Java6增加了两种容器类型,Deque和BlockingDeque,它们分别对Queue和BlockingQueue进行了扩展。Deque是一个双端队列,实现了在队列头和队列尾的高效插入和移除。具体实现包括ArrayDeque和LinkedBlockingDeque。
正如阻塞队列适用于生产者——消费者模式,双端队列同样适用于另一种相关模式,即工作密取(Work Stealing)。在生产者——消费者设计中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。密取工作模式比传统的生产者——消费者模式具有更高的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。在大多数时候,它们都只是访问自己的双端队列,从而极大地减少了竞争。当工作者线程需要访问另一个队列时,它会从队列的尾部而不是从头部获取工作,因此进一步降低了队列上的竞争程度。
线程可能会阻塞或暂停执行,原因有多种:等待I/O操作结束,等待获得一个锁,等待从Thread.sleep方法中醒来,或是等待另一个线程的计算结果。当线程阻塞时,它通常被挂起,并处于某种阻塞状态(BLOCKED、WAITING或TIMED_WAITING)。
BlockingQueue的put和take等方法会抛出受检查异常(Checked Exception)InterruptedException,这与类库中其他一些方法的做法相同,例如Thread.sleep。
Thread提供了interrupt方法,用于中断线程或者查询线程是否已经被中断。每个线程都有一个布尔类型的属性,表示线程的中断状态,当中断线程时将设置这个状态。中断是一种协作机制。一个线程不能强制其他线程停止正在执行的操作而去执行其他的操作。当线程A中断B时,A仅仅是要求B在执行到某个可以暂停的地方停止正在执行的操作——前提是如果线程B愿意停下来。
当在代码中调用了一个将抛出InterruptedException异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的响应。对于库代码来说,有两种基本选择:
- 传递InterruptedException。避开这个异常通常是最明智的策略——只需把InterruptedException传递给方法的调用者。传递InterruptedException的方法包括,根本不捕获该异常,或者捕获该异常,然后在执行某种简单的清理工作后再次抛出这个异常。
- 恢复中断。有时候不能抛出InterruptedException。例如当代码是Runnable的一部分时,在这些情况下,必须捕获InterruptedException,并通过调用当前线程上的interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断。
闭锁是一种同步工具类,可以延迟线程的进度知道其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动指导其他活动都完成后才继续执行。CountDownLatch是一种灵活的闭锁实现,可以使一个或多个线程等待一组事件发生。
FutureTask也可以用作闭锁。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(waiting to run),正在运行(Running)和运行完成(Completed)。执行完成表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态后,它会永远停止在这个状态上。Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态。然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。
Callable表示的任务可以抛出受检查的或未受检查的异常,并且任何代码都可能抛出一个Error。无论任务代码抛出什么异常,都会被封装到一个ExecutionException中,并在Future.get中被重新抛出。
计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore中管理着一组虚拟的许可permit,许可的初始数量可通过构造函数来指定,在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二值信号量可以用作互斥体mutex,并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。
public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<>());
this.sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded){
sem.release();
}
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) {
sem.release();
}
return wasRemoved;
}
}
闭锁是一次性对象,一旦进入终止状态,就不能被重置。栅栏Barrier类似于闭锁,它能阻塞一组线程知道某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来选举产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会在一个子任务线程中执行它,但在阻塞线程被释放之前是不能执行的。
另一种形式的栅栏时Exchanger,它是一种两方栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操作时,Exchanger会非常有用,例如当一个线程想缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用Exchanger来汇合,并将满的缓冲区与空的缓冲区交换。当两个线程通过Exchanger交换对象时,这种交换就把这两个对象安全地发布给另一方。
public class Memoizer<A, V> implements Computable<A, V> {
private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<>(eval);
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
// throws exception
}
}
}
}
- 可变状态是至关重要的,所有的并发问题都可以归纳为如何协调对并发状态的访问。可变状态越少,就越容易确保线程安全性。
- 尽量将域声明为final类型,除非需要它们是可变的。
- 不可变对象一定是线程安全的。不可变对象能极大地降低并发编程的复杂性。它们更为简单而且安全,可以任意共享而无需使用加锁或保护性复制等机制。
- 封装有助于管理复杂性。在编写线程安全的程序时,虽然可以将所有数据都保存在全局变量中,但为什么要这么做?将数据封装在对象中,更易于维护不变性条件:将同步机制封装在对象中,更易于遵循同步策略。
- 用锁来保护每个可变变量。
- 当保护同一个不变性条件中的所有变量时,要使用同一个锁。
- 在执行复合操作期间,要持有锁。
- 如果从多个线程中访问同一个可变变量时没有同步机制,那么程序会出现问题。
- 不要故作聪明地推断出不需要使用同步。
- 在设计过程中考虑线程安全,或者在文档中明确地指出它不是线程安全的。
- 将同步策略文档化。