清晰的任务边界
在理想情况下,各个任务之间是相互独立的:任务并不依赖于其他任务的状态,结果或边界效应。 独立性有助于实现并发。
大多数服务器应用程序都提供了一种自然的任务边界选择方式,以独立的客户请求为边界。
在应用程序中可以通过多种策略来调度任务,而其中一些策略能够更好地利用潜在并发性。
最简单的策略就是在单个线程中串行地执行各项任务。
SingleThreadWebServer将串行地处理它的任务(通过80端口接收到HTTP请求)。
// 6-1 串行的Web服务器(并不好) public class SingleThreadWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { //处理多个请求 Socket connection = socket.accept();//通过80端口接收到HTTP请求 handleRequest(connection); //处理请求 } } private static void handleRequest(Socket connection) { // 请求处理 } }通过80端口接收到HTTP请求很简单且理论上是正确的,但在实际生产环境中的执行性能很糟糕,因为它每次只能处理一个请求。主线程在接受连接与处理相关请求等操作之间不断交替运行。
在Web请求的处理中包含了一组不同的运算与I/O操作。服务器必须处理套接字I/O以读取请求和写回响应,这些操作通常会由于网络阻塞或连通性问题而被阻塞。
在服务器应用程序中,串行处理机制通常都无法提供高吞吐率或快速响应性。
通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性。
// 6-2 在Web服务器中为每个请求启动一个新的线程(不要这么做) public class ThreadPerTaskWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { //多个请求 final Socket connection = socket.accept(); Runnable task = new Runnable() { //为每个请求启动一个新的线程 public void run() { handleRequest(connection); } }; new Thread(task).start();//线程启动 } } private static void handleRequest(Socket connection) { // request-handling logic here } }ThreadPerTaskWebServer的结构与串行版本类似——主线程仍然不断地交替执行“接受外部请求”与“分发请求”等操作。区别在于,对于每个连接,主循环都创建一个新线程来处理请求,而不是在主循环中处理。
由此得出3个主要结论: ①任务处理过程从主线程中分离出来,使得主循环能够更快地等待下个到来的连接。这使得程序在完成前面的请求之前可以接受新的请求,从而提高响应性。 ②任务可以并行处理 ,从而同时服务多个请求。如果有多个处理器,或者任务由于某个原因被阻塞,例如等待I/O完成,获取锁或者资源可用性等,程序的吞吐率将得到提高。 ③任务处理代码必须是线程安全的,因为当有多个任务时会并发地调用这段代码。
正常情况下,“为每个任务分配一个线程”能提升串行执行的性能。只要请求的到达速率不超出服务器的请求处理能力,那么这种方法可以同时带来更快的响应性和更高的吞吐率。
“为每个任务分配一个线程”存在一些缺陷,特别是当需要创建大量的线程时: ①线程生命周期的开销非常高。
②资源消耗。 活跃的线程会消耗系统资源,尤其是内存。
③稳定性 在可创建线程的数量上存在一个限制。
在一定的范围内,增加线程可以提高系统的吞吐率,但如果超出了这个范围,在创建更多的线程只会降低程序的执行速度,并且如果过多地创建一个线程,那么整个应用程序将崩溃。
要想避免这种危险,就应该对应用程序可以创建的线程数量进行限制,并且全面地测试应用程序,从而确保在线程数量达到限制时,程序也不会耗尽资源。
“为每个任务分配一个线程”问题在于,没有限制可创建线程的数量,只限制了远程用户提交HTTP请求的速率。
任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。
把所有任务放在单个线程中,以及将每个任务放在各自的线程中执行,这两种方式都存在一些严格限制:串行执行的问题在于其糟糕的响应性和吞吐率,而“为每个任务分配一个线程”问题在于资源管理的复杂性。
线程池简化了线程的管理任务,并且java.util.concurrent提供了一种灵活的线程池实现作为Executor框架的一部分。在Java类库中,任务执行的主要抽象不是Thread,而是Executor。
// Excutor接口 public interface Executor{ void execute(Runnable command); }Executor是个简单的接口,为灵活且强大的异步执行任务框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。
Executor基于生产者-消费者模式,提交任务的操作相当于生成者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。
如果要在程序中实现一个生产者-消费者的设计,那么最简答的方式就是使用Executor。(而我们之前的章节中使用的是阻塞队列来实现的,利用其take和put方法)
TaskExecutionWebServer 代替了硬编码的线程创建过程。
使用了标准的Executor实现,即一个固定长度的线程池,可以容纳100个线程
// 6-4 基于线程池的Web服务器 public class TaskExecutionWebServer { private static final int NTHREADS=100; private static final Executor exec= Executors.newFixedThreadPool(NTHREADS); //创建了一个固定长度的线程池,可以容纳100个线程 public static void main(String[] args)throws IOException{ ServerSocket socket=new ServerSocket(80);//创建一个Server Socket绑定到80端口自 while(true){ final Socket connection=socket.accept(); Runnable task=new Runnable(){ public void run(){ handleRequest(connection); } }; exec.execute(task);//将任务提交到工作队列中,在某个时刻被工作线程取出并执行 } } private static void handleRequest(Socket connection) { //处理请求 } }通过使用Executor,将请求处理任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的Executor实现,就可以改变服务器的行为。
通常,Executor的配置是一次性,因此可以在部署阶段可以完成,而提交任务的代码却会不断地扩散到整个程序中,增加了修改的难度。
我们可以将TaskExecutionWebServer修改为类似ThreadPerTaskWebServer的行为,只需使用一个为每个请求都创建新线程的Executor。
// 6-5 为每个请求启动一个新线程的Executor public class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); }; }同样,可以编写一个Executor使TaskExecutionWebServer的行为类似于单线程的行为,即以同步的方法执行每个任务,然后再返回。
// 6-6 在调用线程中以同步方式执行所有任务的Executor public class WithinThreadExecutor implements Executor { public void execute(Runnable r) { r.run(); }; }通过将任务的提交与执行解耦开来,从而无须太大的困难就可以为某种类型的任务指定和修改执行策略。
执行策略包括: ①在什么线程中执行任务 ②任务按照什么顺序执行(FIFO,LIFO,优先级) ③有多少个任务可并发执行 ④在队列中有多少个任务在等待执行 ⑤如果系统由于过程而需要拒绝一个任务,应该选择哪一个任务?另外,如何通知应用程序有任务被拒绝? ⑥在执行一个任务之前或之后,应该进行哪些动作
各种执行策略都是一种资源管理工具,最佳策略取决于可用的计算资源以及对服务质量的需求。
每当看到下面这中形式的代码时: new Thread(runnable).start() 并且你希望获得一种更灵活的执行策略时,请考虑使用Executor来代替Thread
线程池,是指管理一组同构工作线程的资源池。线程池是与工作队列(Work Queue)密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程(Worker Thread)的任务很简单,从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。
“在线程池中执行任务”比“为每个任务分配一个线程”优势更多。 通过重用现有的线程而不是创建新的线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。 另一个额外的好处就是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。 通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。
向线程池提交一个任务后,它的主要处理流程如下图所示: 一个线程从被提交(submit)到执行共经历以下流程: 线程池判断核心线程池里是的线程是否都在执行任务,如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下一个流程 线程池判断工作队列是否已满。如果工作队列没有满,则将新提交的任务储存在这个工作队列里。如果工作队列满了,则进入下一个流程。 线程池判断其内部线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已满了,则交给饱和策略来处理这个任务。
可以通过调用Executor中的静态工厂方法之一来创建一个线程池: ①newFixedThreadPool(Fixed为固定的)。newFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个先后才能由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)
②newCachedThreadPool。newCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
③newScheduledThreadPool(Scheduled,预定的)。newScheduledThreadPool创建一个固定长度的线程,而且以延迟或定时的方式来执行任务,类似Timer。
newFixedThreadPool和newCachedThreadPool这两个工厂方法返回类似通用的ThreadPoolExecutor实例,这些实例可以直接用来构造专门用途的executor。
TaskExecutionWebServer(6.2.1)中的Web服务器使用了一个带有有界线程池的Executor。通过execute方法将任务提交到工作队列中,工作线程反复地到工作队列中取出任务并执行它们。
“为每个任务分配一个线程”策略编程基于线程池的策略,将对应用程序的稳定产生重大的影响: Web服务器不会在高负载下失败(不会因创建过多线程而失败)。
通过Executor,可以实现各种调优,管理,监视,记录日志,错误报告和其他功能,如果不使用任务执行框架,那么要增加这些功能是非常困难的。
Executor的实现通常会创建线程来执行任务。但JVM只有在所有(非守护)线程全部终止后才会退出,因此,如果无法正确地关闭Executor,JVM将无法结束。
由于Executor以异步方式来执行任务,因此在任何时刻,之前提交任务的状态不是立即可见的。有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。
Executor是为应用程序提供服务的,它们是可关闭的,并将在关闭操作中受影响的任务的状态反馈给应用程序。
为了解决执行服务的生命周期问题,ExecutorService接口继承了Executor,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法)。
// 6-7 ExecutorService中的生命周期管理方法 public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); //关闭 boolean isTerminated(); //已终结 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // ... 其他用于任务提交的便利方法 }ExecutorService的生命周期有三种状态:运行,关闭和已终止。ExecutorService在初始创建时处于运行状态。shutdown方法将执行平缓的关闭过程:不在接受新的任务,同时等待已经提交的任务执行完成——包括那么还没有开始执行的方法。 shutdownnow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
在ExecutorService关闭提交的任务将由“拒绝执行处理器(Rejected Execution Handler)”来处理(8.3.3中),它会抛弃任务,或者使得execute方法抛出一个未检查的RejectedExecutionException。 等所有任务都完成后,ExecutorService将转入终止状态。可以调用awaitTermination来等待ExecutorService达到终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。 通常在调用awaitTermiation之后会立即调用shutdown,从而产生同步地关闭ExecutorService的效果。
LifecycleWebServer通过增加生命周期来扩展Web服务器的功能。
// 6-8 支持关闭操作的Web服务器 public class LifecycleWebServer { private final ExecutorService exec = Executors.newCachedThreadPool(); //新建可缓存的线程池 public void start() throws IOException { ServerSocket socket = new ServerSocket(80); while (!exec.isShutdown()) { //只要未被关闭,exec.isShutdown当关闭返回true try { final Socket conn = socket.accept(); exec.execute(new Runnable() { public void run() { handleRequest(conn); } }); } catch (RejectedExecutionException e) { //在关闭后提交的任务,会抛出异常 if (!exec.isShutdown()) //如果还没关闭 log("task submission rejected", e); } } } public void stop() { exec.shutdown(); //不再接受新的任务,同时等待已经提交的任务执行完成——包括还未开始执行的任务 } private void log(String msg, Exception e) { Logger.getAnonymousLogger().log(Level.WARNING, msg, e); //创建一个匿名的记录器 } void handleRequest(Socket connection) { Request req = readRequest(connection); if (isShutdownRequest(req)) stop(); //不再接受新的任务,同时等待已经提交的任务执行完成——包括还未开始执行的任务 else dispatchRequest(req); } interface Request { } private Request readRequest(Socket s) { //读取ExecutorService生命周期的状态 return null; } private void dispatchRequest(Request r) { } private boolean isShutdownRequest(Request r) { //已被关闭的请求 return false; } }可以通过两种方法来关闭Wen服务器,在程序中调用stop,或者以客户端请求形式向Web服务器发送一个特定格式的HTTP请求。
Timer类负责管理延迟任务(在“100ms后执行该任务”)和周期任务(“每10ms执行一次该任务”)。 然而Timer存在一些缺陷(Timer支持基于对绝对事件而不是相对的调度机制,因此任务的执行对系统时钟变化很敏感),因此应该考虑使用ScheduledThreadPoolExecutor(只支持基于相对时间的调度)来替代它。可以通过ScheduledThreadPoolExecutor的构造函数或newScheduledThreadPool工厂方法来创建该类的对象。
Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时准确性。假如某个周期的TimerTask需要每10ms执行一次,另一个TimerTask需要执行40ms,那么这个周期任务在40ms任务执行完成后快速连续地调用4次,或者彻底“丢失”4次调用(取决于它是基于固定速率(fixed rate)来调度还是基于固定延时(fixed delay)来调度)。 线程池能弥补这个缺陷,它可以提供多个线程来执行延时任务和周期任务。
Timer的另一个问题是:如果TimerTask抛出了一个未检查的异常,那么Timer将表现出糟糕的行为。 Timer线程并不捕获异常,因此当TimerTask抛出未检查的异常时将终止定时线程。 这种情况下,Timer也不会恢复线程的执行,而是错误地认为整个Timer都被取消了。 因此,已经被调度但尚未执行的TimeTask将不会再执行,新的任务也不能被调用。(这个问题被称为“线程泄漏(Thread Leakage)”,7.3中)
下面的OutOfTime中给出Timer为什么会出现这个问题,以及如何使得试图提交TimerTask的调用者也出现问题。
// 6-9 错误的Timer行为 public class OutOfTime { public static void main(String[] args) throws Exception { Timer timer = new Timer(); //新建一个Timer timer.schedule(new ThrowTask(), 1); //排定任务在延迟后执行 SECONDS.sleep(1);//SECONDS为秒单位,sleep休眠1秒 timer.schedule(new ThrowTask(), 1); SECONDS.sleep(5); } static class ThrowTask extends TimerTask { public void run() { throw new RuntimeException(); } } }你可能会认为程序会运行6秒后退出,但实际情况是运行1秒就结束了,并抛出了一个异常信息“Timer already cancelled”。
ScheduledThreadPoolExecutor能正确处理这些变现出错误的任务。在Java5以上的JDK中,Timer将很少被使用。
如果要构建自己的调度服务,那么可以使用DelayQueue,它实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度功能。 DelayQueue管理着一组Delayed对象。每个Delayed对象都有一个相应的延迟时间:在DelayQueue中,只有某个元素逾期后,才能从DelayQueue中执行take操作。 从DelayQueue中返回的对象将根据它们的延迟时间进行排序。
Executor框架帮助指定执行策略,要使用Executor,必须将任务表示为一个Runnable。
在多数服务器应用程序中都存在一个明显的任务边界:单个客户请求。但有时候,任务边界并非是显而易见的。 例如在许多桌面应用程序中,即使是服务器应用程序,在单个客户请求中仍可能存在可发掘的并行性,例如数据库服务器。
本节我们尝试开发不同版本的组件,并且每个版本都实现了不同程度的并发性。
6.3.1 串行的页面渲染器(Sequential Page Renderer)
浏览器程序中的页面渲染(Page-Rendering)功能,它的作用是将HTML页面绘制到图像缓存中。为了简单,假设HTML页面只包含标签文本,以及预定大小的图片和URL。
最简单的方法就是对HTML文档进行串行处理。当遇到文本标签时,将其绘制到图像缓存中。当遇到图像引用时,想通过网络获取它,然后再绘制到图像缓存中。
这很容易实现,程序只需将输入中的每个元素处理一次(甚至不需要缓存文档),但这种方法可能会令用户感到烦恼,他们必须等待很长时间,直到显示所有的文本。
另一种方法是串行执行。它先绘制文本元素,同时为图像预留出矩形的占位空间。在处理完第一遍文本后,程序开始下载图像,并将它们绘制到相应的占位空间中。
// 6-10 串行地渲染页面元素(不好) public abstract class SingleThreadRenderer { void renderPage(CharSequence source){//CharSequence是char类型的值的一个可读序列 renderText(source); //渲染文本 List<ImageData> imageData=new ArrayList<ImageData>(); for(ImageInfo imageInfo:scanForImageInfo(source)) imageData.add(imageInfo.downloadImage()); //下载图片 for(ImageData data:imageData) renderImage(data); //渲染图片 } interface ImageData { } interface ImageInfo { ImageData downloadImage(); } abstract void renderText(CharSequence s); abstract List<ImageInfo> scanForImageInfo(CharSequence s); abstract void renderImage(ImageData i); }图像下载过程的大部分时间都是在等待I/O操作执行完成,在这期间CPU几乎不做任何工作,因此这种串性方法没有充分利用CPU,使得用户在看到最终页面之前要等待过长的时间。
通过将问题分解为多个独立的任务并发执行,能够获得更高的CPU利用率和响应灵敏度。
Executor框架能使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构中,但它不能返回一个值或抛出一个受检查的异常。
许多任务实际上都是存在延迟的计算——执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。 对于这些任务,Callable是一种更好的抽象:它预料主入口点(即call)将返回一个值,并可以抛出一个异常。(如果要使用Callable来表示无返回值的任务,可使用Callable).
Runnable和Callable描述的都是抽象的计算任务。这些任务通常是有范围的,即都有一个明确的起始点,并且最终会结束。
Executor执行的任务有4个生命周期阶段:创建,提交,开始和完成。 由于有些任务可能要执行很长时间,因此通常希望能够取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。
Callable 和Runnable和Future都是接口。
// 6-11 Callable接口 public interface Callable<V> { V call() throws Exception; }Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。
// 6-11 Future接口 public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException, CancellationException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException; }Future可以拿到异步执行任务的返回值
Future规范中包含的隐含意义是,任务的生命周期只能向前,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在“完成”状态上。
get方法的行为取决与任务的状态(尚未开始,正在运行,已完成)。 如果任务已完成,那么get会立即返回或抛出一个Exception。 如果任务没有完成,那么get将阻塞并知道任务完成。 如果任务抛出了异常,那么get将异常封装为ExecutionException并重新抛出。 如果任务被取消,那么get将抛出CancellationException。 如果get抛出了ExecutionExecutor,那么可以通过getCause来获得被封装的初始异常。
可以通过多种方法来创建一个future来描述任务。ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用来获得任务的执行结果或取消任务。 还可以显示地为某个指定的Runnable或Callable实例化一个FutureTask(由于FutureTask实现了Runnable,因此可以将它提交给Executor来执行,或者直接调用它的run方法)
ExecutorService实现可以改写AbstractExecutorService中的newTaskFor方法,从而根据已提交的Runnable或Callable来控制Future的实例化过程。
在默认实现中仅创建了一个新的FutureTask
// 6-12 newTask的默认实现 protected <T> RunnableFuture<T> newTaskFor(Callable<T> task) { return new FutureTask<T>(task); }在将Runnable或Callable提交到Executor的过程中,包含了一个安全发布过程(详见3.5),即将Runnable或Callable从提交线程发布到最终执行任务的线程。 类似地,在设置Future结果的过程中也包含了一个安全发布,即将这个结果从计算它的线程发布到任何通过get获得它的线程。
为了使页面渲染器实现更高的并发性,首先将渲染过程分解为两个任务,一个是渲染所有的文本,一个是下载所有的图像(因为其中一个任务是CPU密集型,而另一个任务是I/O密集型,因此这种方法即使在单CPU系统上也能提升性能)。
Callable和Future有组与表示这些协同任务之间的交互。
在FutureRenderer中创建一个Callable来下载所有的图像,并将其提交到一个ExecutorService。这将返回一个描述任务执行情况的Future。
当主任务需要图像时,它会等到Future.get调用结果,如果幸运的话,当开始请求时所有图像就已经下载完成,即使没有,至少图像的下载任务也已经提前开始了。
// 6-13 使用Future等待图片下载 public abstract class FutureRenderer { private final ExecutorService executor=Executors.newCachedThreadPool();//新建线程池 void renderPage(CharSequence source){ final List<ImageInfo> imageInfos=scanForImageInfo(source);//页面信息 // 新建了一个Callable来下载所有的图像。 Callable<List<ImageData>> task= new Callable<List<ImageData>>(){ public List<ImageData> call(){ //图片 List<ImageData> result= new ArrayList<ImageData>(); for(ImageInfo imageInfo :imageInfos) result.add(imageInfo.downloadImage());//下载图片 return result; //Callable会返回结果 } }; //提交一个带返回值的任务,并且返回一个代表即将得到任务的结果的Future Future<List<ImageData>> future=executor.submit(task); renderText(source); //渲染文本 try{ //当主任务需要图像时,它会等到Future.get调用结果,如果幸运的话,当开始请求时所有图像就已经下载完成,即使没有,至少图像的下载任务也已经提前开始了。 List<ImageData> imageData=future.get(); //得到图片 for(ImageData data:imageData) renderImage(data); //渲染图片 }catch (InterruptedException e) { //重新设置线程的中断状态 Thread.currentThread().interrupt(); //由于不需要结果,因此取消任务。 future.cancel(true); }catch (ExecutionException e) { throw LaunderThrowable.launderThrowable(e.getCause()); } } interface ImageData { } interface ImageInfo { ImageData downloadImage(); } abstract void renderText(CharSequence s); abstract List<ImageInfo> scanForImageInfo(CharSequence s); abstract void renderImage(ImageData i); }get方法拥有“状态依赖”的内在特性,因而调用者不需要知道任务的状态,此外在任务提交和获得结果中包含的发布属性也确保了这个方法是线程安全的。
Future.get的异常处理代码将处理两个可能的问题:任务遇到一个Exception,或者调用get的线程在获得结果之前被中断(详见5.5.2节和5.4)
FutureRenderer使得渲染文本任务与下载图片数据的任务并发地执行。当所有图像下载完成后,会显示到页面上,这将提升用户体验,不仅使用户更快地看到结果,还有效利用了并行性。
但我们还可以做得更好,用户不用等待所有的图像都下载完成,而希望看到每当下载完一幅图像就立刻显示出来。
在FutureRenderer中,我们尝试并行地执行两个不同类型的任务-下载图像与渲染页面。然而,通过对异构任务进行并行化来获得重大的性能提升是困难的。
两个人可以很好地分担洗碗的工作:其中一个人负责清洗,另一个负责烘干。然而,要将不同类型的任务平均分给每个工人并不容易。当人数增加时,如果确保他们能帮忙而不是妨碍其他人工作,或者在重新分配工作时,并不是容易的事。如果没有在相似的任务之间找出细粒度的并行性,那么这种方法的好处将减少。
当在多个工人之间分配异构的任务时,还有一个问题就是各个任务的大小可能完全不同。 当在多个工人之间分解任务时,还需要一定的任务协调开销:为了使任务分解能提高性能,这种开销不能高于并行性实现的提升。
FutureRenderer使用了两个任务,其中一个复杂渲染文本,另一个负责下载图像。如果渲染文本的速度远远高于下载图像速度(可能性很大),那么程序的最终性能与串行执行相比性能差别不大,而代码却复杂了。
虽然做了很多工作来并发执行异构任务以提高并发度,但从中得到的并发性却是十分有限的。
只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。
6.3.5CompletionService: Executor 与 BlockingQueue 如果想Executor提交了一组计算任务,并且希望在计算完成后得到结果,那么可以保留与每个任务关联的Future,然后反复使用get,同时将参数timeout指定为0,从而通过轮询来判断任务是否可行。
这种方法虽然可行,但很繁琐,还有一种更好的方法:完成服务(CompletionService) CompletionService将Executor与BlockingQueue的功能融合在一起。你可以将Callable任务提交给它执行,然后使用类似与队列操作的take和poll等方法来获得已完成的结果。而这些结果会在完成时被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
ExecutorCompletionService的实现很简单。在构造函数中创建一个BlockingQueue来保存计算完成的结果。当计算完成时,调用Future-Task中的done方法。当提交给某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask的一个子类,然后再改写子类的done方法,并将结果放入BlockingQueue中。
// 6-14 由ExecutorCompletionService使用的QueueingFuture类 private class QueueingFuture<V> extends FutureTask<V> { QueueingFuture(Callable<V> c) { super(c); } QueueingFuture(Runnable t, V r) { super(t, r); } protected void done() { completionQueue.add(this); } }可以通过CompletionServ从两个方法来提高页面渲染器的性能:缩短总时间以及提高响应性。
为每一幅图像的下载都创建一个独立任务,并从线程池中执行它们,从而将串行的下载过程转化为并行的过程:浙江减少下载的总时间。
此外,通过CompletionService中获取结果以及使每张图片在下载完成后立刻显示出来,使用户获得一个更加动态和更高响应性的用户界面。
// 6-15 使用CompletionService,使页面在下载完成后立即显示出来 public abstract class Renderer { //CompletionService将Executor与BlockingQueue的功能融合在一起 private final ExecutorService executor; Renderer(ExecutorService executor){ this.executor=executor; } void renderPage(CharSequence source){ List<ImageInfo> info=scanForImageInfo(source); //ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。 CompletionService<ImageData> completionService= new ExecutorCompletionService<ImageData>(executor); for(final ImageInfo imageInfo:info) //提交一个带返回值的任务,并且返回一个代表即将得到任务的结果的Future completionService.submit(new Callable<ImageData>(){ public ImageData call(){ return imageInfo.downloadImage(); //CompletionService中有类似队列的操作 } }); renderText(source); try{ //通过CompletionService中获取结果以及使每张图片在下载完成后立刻显示出来 for(int t=0,n=info.size();t<n;t++){ //取得并移除已完成任务,如果没有则等待 Future<ImageData> f=completionService.take(); ImageData imageData=f.get(); renderImage(imageData); } }catch (InterruptedException e) { Thread.currentThread().interrupt(); }catch (ExecutionException e) { throw LaunderThrowable.launderThrowable(e.getCause()); } } interface ImageData { } interface ImageInfo { ImageData downloadImage(); } abstract void renderText(CharSequence s); abstract List<ImageInfo> scanForImageInfo(CharSequence s); abstract void renderImage(ImageData i); }有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。 例如,某个Web应用程序从外部的广告服务器上获取广告信息,但如果该应用程序在两秒内得不到响应,那么将显示一个默认的广告,这样即使不能获得广告信息,也不会降低站点的响应性能。
在支持时间限制的Future.get中支持这种需求:当结果可用时,它立即返回,如果在指定时限内没有计算出结果,那么将抛出TimeoutException。
在使用限时任务时应注意,当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。要实现这个功能,可以由人任务本身来管理它的限定事件,并且在超时后中职执行或取消任务。此时可再使用Future,如果一个限时的get方法跑出了TimeoutException,那么可以通过Future取消任务。future.cancel(true)。
RenderWithTimeBudget 给出了限时Future.get的一种典型应用。在它生成的页面中包括响应用户请求的内容以及从广告服务器上获得广告。它将获取广告的任务提交给一个Executor,然后计算剩余的广告文本内容,最后等待广告信息,知道超出限定的时间(传递给get的timeout参数的计算方法是,将指定时限减除当前时间,这可能会得到负数,但在这里与时限有关的负数都视为0)。如果get超时,那么将取消广告获取任务,并转而使用默认的广告信息。
// 6-16 在指定时间内获取广告信息 public class RenderWithTimeBudget { private static final Ad DEFAULT_AD = new Ad(); private static final long TIME_BUDGET = 1000; private static final ExecutorService exec = Executors.newCachedThreadPool(); Page renderPageWithAd() throws InterruptedException{ long endNanos=System.nanoTime()+TIME_BUDGET; //返回纳秒级的时间,再加上时限 Future<Ad> f=exec.submit(new FetchAdTask()); //在等待广告的同时显示页面 Page page=renderPageBody(); Ad ad; try{ //只等待指定的时间长度 long timeLeft=endNanos-System.nanoTime(); ad = f.get(timeLeft, NANOSECONDS);//在指定时限内获取,NANOSECONDS为时间单位 }catch (ExecutionException e) { ad=DEFAULT_AD; }catch (TimeoutException e) { //如果超时了,广告转为默认广告,并取消获取任务 ad=DEFAULT_AD; f.cancel(true); } page.setAd(ad); //为页面设置广告 return page; } Page renderPageBody() { return new Page(); } static class Ad { } static class Page { public void setAd(Ad ad) { } } static class FetchAdTask implements Callable<Ad> { public Ad call() { return new Ad(); } } }考虑这样一个旅行预定门户网站:用户输入旅行的日期和其他要求,门户网站获取并显示来自多条航线,旅店或汽车租赁公司的报价。可能会调用Web服务,访问数据库,执行一个EDI事物或其他机制。
在这种情况下,不宜让页面的响应时间受限于最慢的响应时间,而应该只显示在指定时间内收到的信息。对于没有及时响应的服务提供者,页面可以忽略或者显示提示信息。
从一个公司获取报价的过程与其他公司获得报价的过程无关,因此可以将获取报价的过程当成一个任务,从而使获得报价的过程能并发执行。创建n个任务,将其提交到线程池,保留n个Futrue,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但我们还可以使用一个更简单的方法——invokeAll(invoke 援引)
下面的代码使用了支持显示的invokeAll,将多个任务提交到一个ExecutorService并获得结果。
InvokeAll方法的参数为一组任务,并返回一组Future。这两个集合有着相同的结构。 InvokeAll按照任务集合中迭代器额顺序肩所有的Future添加到返回的集合中,从而使调用者能将各个Future与其表示Callable关联起来。当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时,invokeAll将返回。将超过指定时限后,任何还未完成的任务都会被取消。当invokeAll返回后,每个任务要么正常地完成,要么被取消,没有正在执行的任务,而客户端可以调用get或isCancelled来判断究竟是何种情况。
// 6-17 在预定时间内请求旅游报价 // 请求旅游报价的方法 private class QuoteTask implements Callable<TravelQuote>{ private final TravelCompany company; private final TravelInfo travelInfo; public TravelQuote call()throw Exception{ return company.solicitQuote(travelInfo); } } public List<TravelQuote> getRankedTravelQuotes( TravelInfo travelInfo, Set<TravelCompany> companies, Comparator<TravelQuote> ranking(long time, TimeUnit unit) throws InterruptedException { List<QuoteTask> tasks = new ArrayList<QuoteTask>(); //为每家公司添加报价任务 for (TravelCompany company : companies) tasks.add(new QuoteTask(company, travelInfo)); //InvokeAll方法的参数为一组任务,并返回一组Future ,用时限来限制时间 List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit); List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size()); Iterator<QuoteTask> taskIter = tasks.iterator(); for (Future<TravelQuote> f : futures) { QuoteTask task = taskIter.next(); try { //invokeAll按照任务集合中迭代器额顺序肩所有的Future添加到返回的集合中 quotes.add(f.get()); } catch (ExecutionException e) { quotes.add(task.getFailureQuote(e.getCause())); } catch (CancellationException e) { quotes.add(task.getTimeoutQuote(e)); } } Collections.sort(quotes, ranking); return quotes; }