ExecutorCompletionService能按任务完成顺序获取结果,它结合线程池与阻塞队列,提交的Callable任务完成后自动将Future放入队列,通过take()或poll()方法可实现谁先完成就先处理谁,适用于搜索任务、响应优先级高及耗时不均场景,配合cancel和超时设置可优化资源使用与响应速度。
在Java中处理并发任务时,ExecutorCompletionService 是一个非常实用的工具,它能有效管理多个异步任务的执行与结果获取。相比直接使用 ExecutorService 提交任务并轮询 Future 对象,ExecutorCompletionService 提供了更高效的任务完成通知机制,尤其适合需要按任务完成顺序处理结果的场景。
ExecutorCompletionService 是 Java 并发包中的一个辅助类,位于 java.util.concurrent 包下。它将一个 Executor(通常是线程池)和一个阻塞队列结合,每当有任务完成,就将该任务的结果(封装为 Future)放入内部的阻塞队列中。
这样,开发者可以通过调用 take() 或 poll() 方法,以“谁先完成就先处理谁”的方式获取结果,而不需要等待所有任务结束或轮询每个 Future 的状态。
BlockingQueue> 存储已完成任务的 Future。take() 获取最先完成的任务结果,支持阻塞等待;poll() 可尝试非阻塞获取。
以下是典型的使用步骤和代码示例:
ThreadPoolExecutor)作为执行器。ExecutorCompletionService 实例,传入线程池。take() 或 poll() 获取已完成任务的结果。示例代码:
ExecutorService executor = Executors.newFixedThreadPool(4); ExecutorCompletionServicecompletionService = new ExecutorCompletionService<>(executor); // 提交5个任务 for (int i = 0; i < 5; i++) { final int taskId = i; completionService.submit(() -> { Thread.sleep((long)(Math.random() * 5000)); // 模拟耗时 return "任务 " + taskId + " 完成"; }); }
// 按完成顺序处理结果 for (int i = 0; i < 5; i++) { try { String result = completionService.take().get(); // take() 阻塞直到有结果 System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
executor.shutdown();
在这个例子中,即使任务执行时间不同,输出也会按照实际完成顺序进行,而不是提交顺序。
ExecutorCompletionService 特别适用于以下几种情况:
使用技巧:
Future.cancel(true) 在获取首个成功结果后取消其余任务,节省资源。poll(long timeout, TimeUnit) 设置超时,防止无限等待。ExecutionException 中,需在 get() 时捕获。基本上就这些。掌握 ExecutorCompletionService 能让你在处理并发任务时更加灵活高效,特别是在关注任务完成顺序而非提交顺序的场景下,它的优势尤为明显。合理使用,可以显著提升程序的响应性和资源利用率。