上節, 我們提到, 在非同步任務程式中, 一種常見的場景是, 主執行緒提交多個非同步任務, 然後希望有任務完成就處理結果, 並且按任務完成順序逐個處理, 對於這種場景, Java並發包提供了一個方便的方法, 使用CompletionService, 這是一個介面, 它的實現類是ExecutorCompletionService, 本節我們就來探討它們。
基本用法
介面和類定義
與77節介紹的ExecutorService一樣, CompletionService也可以提交非同步任務, 它的不同是, 它可以按任務完成順序獲取結果, 其具體定義為:
public interface CompletionService { Future submit(Callable task); Future submit(Runnable task, V result); Future take throws InterruptedException; Future poll; Future poll(long timeout, TimeUnit unit) throws InterruptedException; }其submit方法與ExecutorService是一樣的, 多了take和poll方法, 它們都是獲取下一個完成任務的結果, take會阻塞等待, poll會立即返回, 如果沒有已完成的任務,
CompletionService的主要實現類是ExecutorCompletionService, 它依賴於一個Executor完成實際的任務提交, 而自己主要負責結果的排隊和處理, 它的構造方法有兩個:
public ExecutorCompletionService(Executor executor) public ExecutorCompletionService(Executor executor, BlockingQueue至少需要一個Executor參數, 可以提供一個BlockingQueue參數, 用作完成任務的佇列, 沒有提供的話, ExecutorCompletionService內部會創建一個LinkedBlockingQueue。
基本示例
public class CompletionServiceDemo { static class UrlTitleParser implements Callable { private String url; public UrlTitleParser(String url) { this.url = url; } @Override public String call throws Exception { Document doc = Jsoup.connect(url).get; Elements elements = doc.select("head title"); if (elements.size > 0) { return url + ": " + elements.get(0).text; } return null; } } public static void parse(List urls) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(10); try { CompletionService completionService = new ExecutorCompletionService<>( executor); for (String url : urls) { completionService.submit(new UrlTitleParser(url)); } for (int i = 0; i < urls.size;="" i++)="" {="" future="" result="completionService.take;" try="" {="" system.out.println(result.get);="" }="" catch="" (executionexception="" e)="" {="" e.printstacktrace;="" }="" }="" }="" finally="" {="" executor.shutdown;="" }="" }="" public="" static="" void="" main(string[]="" args)="" throws="" interruptedexception="" {="" list="" urls="Arrays.asList(new" string[]="" {="" "http://www.cnblogs.com/swiftma/p/5396551.html",="" "http://www.cnblogs.com/swiftma/p/5399315.html",="" "http://www.cnblogs.com/swiftma/p/5405417.html",="" "http://www.cnblogs.com/swiftma/p/5409424.html"="" });="" parse(urls);="" }="">在parse方法中, 首先創建了一個ExecutorService, 然後才是CompletionService, 通過後者提交任務、按完成順序逐個處理結果, 這樣, 是不是很方便?
基本原理
ExecutorCompletionService是怎麼讓結果有序處理的呢?其實, 也很簡單, 如前所述, 它有一個額外的佇列, 每個任務完成之後, 都會將代表結果的Future入隊。
那問題是, 任務完成後, 怎麼知道入隊呢?我們具體來看下。
在77節我們介紹過FutureTask, 任務完成後, 不管是正常完成、異常結束、還是被取消,
它的實現為空, 但它是一個protected方法, 子類可以重寫該方法。
在ExecutorCompletionService中, 提交的任務類型不是一般的FutureTask, 而是一個子類QueueingFuture, 如下所示:
public Future submit(Callable task) { if (task == null) throw new NullPointerException; RunnableFuture f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }該子類重寫了done方法, 在任務完成時將結果加入到完成佇列中, 其代碼為:
private class QueueingFuture extends FutureTask { QueueingFuture(RunnableFuture task) { super(task, null); this.task = task; } protected void done { completionQueue.add(task); } private final Future task; }ExecutorCompletionService的take/poll方法就是從該佇列獲取結果, 如下所示:
public Future take throws InterruptedException { return completionQueue.take; }實現invokeAny
我們在77節提到, AbstractExecutorService的invokeAny的實現, 就利用了ExecutorCompletionService, 它的基本思路是, 提交任務後, 通過take方法獲取結果, 獲取到第一個有效結果後, 取消所有其他任務, 不過, 它的具體實現有一些優化, 比較複雜。 我們看一個模擬的示例, 從多個搜尋引擎查詢一個關鍵字, 但只要任意一個的結果就可以, 模擬代碼如下:
public class InvokeAnyDemo { static class SearchTask implements Callable { private String engine; private String keyword; public SearchTask(String engine, String keyword) { this.engine = engine; this.keyword = keyword; } @Override public String call throws Exception { // 類比從給定引擎搜索結果 Thread.sleep(engine.hashCode % 1000); return " " + keyword; } } public static String search(List engines, String keyword) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(10); CompletionService cs = new ExecutorCompletionService<>(executor); ListSearchTask模擬從指定搜尋引擎查詢結果, search利用CompletionService/ExecutorService執行併發查詢, 在得到第一個有效結果後, 取消其他任務。
小結
本節比較簡單, 主要就是介紹了CompletionService的用法和原理, 它通過一個額外的結果佇列, 方便了對於多個非同步任務結果的處理。
下一節, 我們來探討一種常見的需求 - 定時任務。
(與其他章節一樣, 本節所有代碼位於 https://github.com/swiftma/program-logic)
----------------