6.任务执行

在生产环境中,为每个任务分配一个线程这种方法存在一些缺陷,尤其是当需要创建大量的线程时:

  • 线程生命周期的开销非常高
  • 资源消耗,活跃的线程会消耗系统资源,尤其是内存。
  • 稳定性,在可创建线程的数量上存在一个限制。这个限制值将随着平台的不同而不同,并且受多个因素制约,包括JVM的启动参数、Thread构造函数中请求的栈大小,以及底层操作系统对线程的限制等。在32位机器上,其中一个主要的限制因素是线程栈的地址空间。每个线程都维护两个执行栈,一个用于Java代码,另一个用于原生代码。通常JVM在默认情况下会生成一个复合的栈,大小约为0.5MB。(可以通过JVM标志-Xss或者通过Thread的构造函数来修改这个值)。如果将2^32除以每个线程的栈大小,那么线程数量将被限制为几千到几万。其他一些因素,例如操作系统的限制等,则可能会施加更加严格的约束。

线程池,从字面含义来看,是指管理一组同构工作线程的资源池。线程池是与工作队列Work Queue密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程Worker Thread的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

可以通过调用Executors中的静态工厂方法之一来创建一个线程池:

  • newFixedThreadPool。将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程。知道达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)
  • newCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
  • newSingleThreadExecutor是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。newSingleThreadExecutor能确保按照任务在队列中的顺序来串行执行(例如FIFO、LIFO、优先级)。
  • newScheduledThreadPool创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。

Executor的实现通常会创建线程来执行任务。但JVM只有在所有非守护线程全部终止后才会退出。因此,如果无法正确地关闭Executor,那么JVM将无法结束。

ExecutorService扩展了Executor接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法)。ExecutorService的生命周期有3种状态:运行、关闭和已终止。ExecutorService在初始创建时处于运行状态。shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

在ExecutorService关闭后提交的任务将由拒绝执行处理器Rejected Execution Handler来处理,他会抛弃任务,或者使得execute方法抛出一个未检查的RejectedExecutionException。等所有任务都完成后,ExecutorService将转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。通常在调用awaitTermination之后会立即调用shutdown,从而产生同步地关闭ExecutorService的效果。

Timer类负责管理延迟任务以及周期任务。然而,Timer存在一些缺陷,因此应该考虑使用ScheduledThreadPoolExecutor来代替它。

Timer在执行所有定时任务时置灰创建一个线程。如果某个线程的执行时间过长,那么将破坏其他TimerTask的定时精确性。Timer的另一个问题是,如果TimerTask抛出了一个未检查的异常,那么Timer将表现出糟糕的行为。Timer线程并不捕获异常。因此当TimerTask抛出未检查的异常时将终止定时线程。这种情况下,Timer也不会恢复线程的执行,而是会错误地认为整个Timer都被取消了。因此,已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不能被调度。Java5.0之后将很少使用Timer,应该使用ScheduledThreadPoolExecutor。如果要构建自己的调度服务,那么可以使用DelayQueue,它实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度功能。DelayQueue管理着一组Delayed对象。每个Delayed对象都有一个相应的延迟时间:在DelayQueue中,只有某个元素逾期后,才能从DelayQueue中执行take操作。从DelayQueue中返回的对象将根据它们的延迟时间进行排序。

Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了异常,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,可以通过getCause来获得被封装的初始异常。

CompletionService将Executor和BlockingQueue的功能融合在一起。可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll方法来获得已完成的结果,而这些结果会在完成时将被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。

ExecutorCompletionService的实现非常简单。在构造函数中创建一个BlockingQueue来保存计算完成的结果。当计算完成时,调用FutureTask的done方法。当提交某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask的一个子类,然后再改写子类的done方法,并将结果放入BlockingQueue中。

ExecutorService的invokeAll方法的参数为一组任务,并返回一组Future。invokeAll按照任务结合中迭代器的顺序将所有的Future添加到返回的集合中,从而使调用者能将各个Future与其表示的Callable关联起来。当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时,invokeAll将返回。但超过指定时限后,任何还未完成的任务都会取消。当invokeAll返回后,每个任务要么正常地完成,要么被取消,而客户端代码可以调用get或isCancelled来判断究竟是何种情况。

results matching ""

    No results matching ""