6.Java并发容器和框架

HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。

Segment的get操作实现非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。get操作的高效之处在于整个get过程不需要加锁,除非读到的值是空才会加锁重读。不需要加锁是因为get方法里将要使用的共享变量都被定义成了volatile类型。

put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。put方法首先定位Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置,然后将其放在HashEntry数组里。

  • 插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阈值,则对数组进行扩容。在扩容时,首先会创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入新的数组里。为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个Segment扩容。
  • size操作。Segment里的全局变量count是一个volatile变量。累加count操作过程中,之前累加过的count发生变化的几率很小,所以ConcurrentHashMap的做法是先尝试2次不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,再采用加锁的方式来统计所有Segment的大小。那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put、remove和clean方法里操作元素前都会将modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

要实现一个线程安全的队列有两种方式,一种是阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两把锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。

阻塞队列BlockingQueue是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  • 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

当阻塞队列不可用时,这两个附加操作提供了4种处理方式。

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove(e) poll() take() poll(time, unit)
检查方法 element() peek() 不可用 不可用
  • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
  • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻塞,而且offer方法永远返回true。

JDK 7提供了7个阻塞队列:

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列
  • SynchronousQueue:一个不存储元素的阻塞队列
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照FIFO的原则对元素进行排序。访问线程的公平性是使用可重入锁实现的。

LinkedBlockingQueue是一个用链表实现的有界阻塞队列,此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。

  • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  • 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

延时阻塞队列的实现很简单,当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。

SyncrhonousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

  • transfer方法:如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立即transfer给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列tail节点,并等到该元素被消费者消费了才返回。
  • tryTransfer方法:tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。对于带时间限制的tryTransfer(E e,long timeout, TimeUnit unit)方法,试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回。如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移除元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast等方法。插入方法add等同于addLast,移除方法remove等同于removeFirst,take方法等同于takeFirst。在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。LinkedBlockingDeque可以运用在“工作窃取”模式中。

阻塞队列实现原理:使用通知模式实现。ArrayBlockingQueue使用了Condition来实现。

results matching ""

    No results matching ""