您的位置:首頁>正文

電腦程式的思維邏輯 (79)-方便的CompletionService

上節, 我們提到, 在非同步任務程式中, 一種常見的場景是, 主執行緒提交多個非同步任務, 然後希望有任務完成就處理結果, 並且按任務完成順序逐個處理, 對於這種場景, Java並發包提供了一個方便的方法, 使用CompletionService, 這是一個介面, 它的實現類是ExecutorCompletionService, 本節我們就來探討它們。

基本用法

介面和類定義

與77節介紹的ExecutorService一樣, CompletionService也可以提交非同步任務, 它的不同是, 它可以按任務完成順序獲取結果, 其具體定義為:

public interface CompletionService { Future submit(Callable task); Future submit(Runnable task, V result); Future take throws InterruptedException; Future poll; Future poll(long timeout, TimeUnit unit) throws InterruptedException; }

其submit方法與ExecutorService是一樣的, 多了take和poll方法, 它們都是獲取下一個完成任務的結果, take會阻塞等待, poll會立即返回, 如果沒有已完成的任務,

返回null, 帶時間參數的poll方法會最多等待限定的時間。

CompletionService的主要實現類是ExecutorCompletionService, 它依賴於一個Executor完成實際的任務提交, 而自己主要負責結果的排隊和處理, 它的構造方法有兩個:

public ExecutorCompletionService(Executor executor) public ExecutorCompletionService(Executor executor, BlockingQueue completionQueue)

至少需要一個Executor參數, 可以提供一個BlockingQueue參數, 用作完成任務的佇列, 沒有提供的話, ExecutorCompletionService內部會創建一個LinkedBlockingQueue。

基本示例

public class CompletionServiceDemo { static class UrlTitleParser implements Callable { private String url; public UrlTitleParser(String url) { this.url = url; } @Override public String call throws Exception { Document doc = Jsoup.connect(url).get; Elements elements = doc.select("head title"); if (elements.size > 0) { return url + ": " + elements.get(0).text; } return null; } } public static void parse(List urls) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(10); try { CompletionService completionService = new ExecutorCompletionService<>( executor); for (String url : urls) { completionService.submit(new UrlTitleParser(url)); } for (int i = 0; i < urls.size;="" i++)="" {="" future="" result="completionService.take;" try="" {="" system.out.println(result.get);="" }="" catch="" (executionexception="" e)="" {="" e.printstacktrace;="" }="" }="" }="" finally="" {="" executor.shutdown;="" }="" }="" public="" static="" void="" main(string[]="" args)="" throws="" interruptedexception="" {="" list="" urls="Arrays.asList(new" string[]="" {="" "http://www.cnblogs.com/swiftma/p/5396551.html",="" "http://www.cnblogs.com/swiftma/p/5399315.html",="" "http://www.cnblogs.com/swiftma/p/5405417.html",="" "http://www.cnblogs.com/swiftma/p/5409424.html"="" });="" parse(urls);="" }="">

在parse方法中, 首先創建了一個ExecutorService, 然後才是CompletionService, 通過後者提交任務、按完成順序逐個處理結果, 這樣, 是不是很方便?

基本原理

ExecutorCompletionService是怎麼讓結果有序處理的呢?其實, 也很簡單, 如前所述, 它有一個額外的佇列, 每個任務完成之後, 都會將代表結果的Future入隊。

那問題是, 任務完成後, 怎麼知道入隊呢?我們具體來看下。

在77節我們介紹過FutureTask, 任務完成後, 不管是正常完成、異常結束、還是被取消,

都會調用finishCompletion方法, 而該方法會調用一個done方法, 該方法代碼為:

protected void done { }

它的實現為空, 但它是一個protected方法, 子類可以重寫該方法。

在ExecutorCompletionService中, 提交的任務類型不是一般的FutureTask, 而是一個子類QueueingFuture, 如下所示:

public Future submit(Callable task) { if (task == null) throw new NullPointerException; RunnableFuture f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }

該子類重寫了done方法, 在任務完成時將結果加入到完成佇列中, 其代碼為:

private class QueueingFuture extends FutureTask { QueueingFuture(RunnableFuture task) { super(task, null); this.task = task; } protected void done { completionQueue.add(task); } private final Future task; }

ExecutorCompletionService的take/poll方法就是從該佇列獲取結果, 如下所示:

public Future take throws InterruptedException { return completionQueue.take; }

實現invokeAny

我們在77節提到, AbstractExecutorService的invokeAny的實現, 就利用了ExecutorCompletionService, 它的基本思路是, 提交任務後, 通過take方法獲取結果, 獲取到第一個有效結果後, 取消所有其他任務, 不過, 它的具體實現有一些優化, 比較複雜。 我們看一個模擬的示例, 從多個搜尋引擎查詢一個關鍵字, 但只要任意一個的結果就可以, 模擬代碼如下:

public class InvokeAnyDemo { static class SearchTask implements Callable { private String engine; private String keyword; public SearchTask(String engine, String keyword) { this.engine = engine; this.keyword = keyword; } @Override public String call throws Exception { // 類比從給定引擎搜索結果 Thread.sleep(engine.hashCode % 1000); return " " + keyword; } } public static String search(List engines, String keyword) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(10); CompletionService cs = new ExecutorCompletionService<>(executor); List futures = new ArrayList( engines.size); String result = null; try { for (String engine : engines) { futures.add(cs.submit(new SearchTask(engine, keyword))); } for (int i = 0; i < engines.size;="" i++)="" {="" try="" {="" result="cs.take.get;" if="" (result="" !="null)" {="" break;="" }="" }="" catch="" (executionexception="" ignore)="" {="" 出現異常,
結果無效, 繼續="" }="" }="" }="" finally="" {="" 取消所有任務, 對於已完成的任務, 取消沒有什麼效果="" for="" (future="" f="" :="" futures)="" f.cancel(true);="" executor.shutdown;="" }="" return="" result;="" }="" public="" static="" void="" main(string[]="" args)="" throws="" interruptedexception="" {="" list="" engines="Arrays.asList(new" string[]="" {="" "www.baidu.com",="" "www.sogou.com",="" "www.so.com",="" "www.google.com"="" });="" system.out.println(search(engines,="" "老馬說程式設計"));="" }="">

SearchTask模擬從指定搜尋引擎查詢結果, search利用CompletionService/ExecutorService執行併發查詢, 在得到第一個有效結果後, 取消其他任務。

小結

本節比較簡單, 主要就是介紹了CompletionService的用法和原理, 它通過一個額外的結果佇列, 方便了對於多個非同步任務結果的處理。

下一節, 我們來探討一種常見的需求 - 定時任務。

(與其他章節一樣, 本節所有代碼位於 https://github.com/swiftma/program-logic)

----------------

同類文章
Next Article
喜欢就按个赞吧!!!
点击关闭提示