Future详解( 四 )


【2】源码分析
1)属性分析
//线程池private final Executor executor;//判断线程池是否继承抽象类private final AbstractExecutorService aes;//阻塞队列private final BlockingQueue<Future<V>> completionQueue;2)构造方法
//对于线程池必须定义,而阻塞队列会有默认的//而默认的LinkedBlockingQueue对于并发编程来说是存在隐患的(依据阿里手册来说,因为队列的无尽性会导致OOM)//所以一般考虑要你自己去定义阻塞队列public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException();this.executor = executor;//如果是继承了抽象类的实现this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;this.completionQueue = new LinkedBlockingQueue<Future<V>>();}public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {if (executor == null || completionQueue == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;this.completionQueue = completionQueue;}3)阻塞队列元素的定义
private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}//FutureTask里面的拓展方法,在run的时候会被调用,所以是做完任务了会自动提交到队列里面protected void done() { completionQueue.add(task); }private final Future<V> task;}4)实现接口的方法
//采用newTaskFor来封装非标准的取消//因为传入的Callable或Runnable,这种不是FutureTask,故需要封装private RunnableFuture<V> newTaskFor(Callable<V> task) {if (aes == null)return new FutureTask<V>(task);elsereturn aes.newTaskFor(task);}private RunnableFuture<V> newTaskFor(Runnable task, V result) {if (aes == null)return new FutureTask<V>(task, result);elsereturn aes.newTaskFor(task, result);}//下面是对接口定义的方法的实现public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);executor.execute(new QueueingFuture(f));return f;}public Future<V> submit(Runnable task, V result) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task, result);executor.execute(new QueueingFuture(f));return f;}public Future<V> take() throws InterruptedException {return completionQueue.take();}public Future<V> poll() {return completionQueue.poll();}public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {return completionQueue.poll(timeout, unit);}【3】汇总说明
1)说白了就是基于FutureTask 是单线程的任务,考虑可以等待获取返回结果,那么应该可以采用线程池的方法形成多任务并发的结果 。
2)故定义了CompletionService接口作为规范,ExecutorCompletionService类作为具体的实现类【作为管理者】,不然每次采用线程池来做的话都要自己定义去管理 。
3)当需要批量提交异步任务的时候建议你使用CompletionService 。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单 。
4)CompletionService能够让异步任务的执行结果有序化 。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求 。
5)线程池隔离 。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险 。
【4】示例展示
1)示例代码
public class CompletionServiceDemo {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建线程池ExecutorService executor = Executors.newFixedThreadPool(10);//创建CompletionServiceCompletionService<Integer> cs = new ExecutorCompletionService<>(executor);//异步向电商S1询价cs.submit(() -> getPriceByS1());//异步向电商S2询价cs.submit(() -> getPriceByS2());//异步向电商S3询价cs.submit(() -> getPriceByS3());//将询价结果异步保存到数据库for (int i = 0; i < 3; i++) {//从阻塞队列获取futureTaskInteger r = cs.take().get();executor.execute(() -> save(r));}executor.shutdown();}private static void save(Integer r) {System.out.println("保存询价结果:{}"+r);}private static Integer getPriceByS1() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(5000);System.out.println("电商S1询价信息1200");return 1200;}private static Integer getPriceByS2() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(8000);System.out.println("电商S2询价信息1000");return 1000;}private static Integer getPriceByS3()throws InterruptedException {TimeUnit.MILLISECONDS.sleep(3000);System.out.println("电商S3询价信息800");return 800;}}

经验总结扩展阅读