通过Thread Pool Executor类解析线程池执行任务的核心流程( 二 )


//再次获取线程池的状态和线程池中线程的数量,用于二次检查int recheck = ctl.get();//如果线程池没有未处于RUNNING状态,从队列中删除任务if (! isRunning(recheck) && remove(command))//执行拒绝策略reject(command);//如果线程池为空,则向线程池中添加一个线程else if (workerCountOf(recheck) == 0)addWorker(null, false);(4)如果在步骤(3)中向任务队列中添加任务失败,则尝试开启新的线程执行任务 。此时,如果线程池中的线程数量已经大于线程池中的最大线程数maximumPoolSize,则不能再启动新线程 。此时,表示线程池中的任务队列已满,并且线程池中的线程已满,需要执行拒绝策略,代码如下所示 。
//任务队列已满,则新增worker线程,如果新增线程失败,则执行拒绝策略else if (!addWorker(command, false))reject(command);这里,我们将execute(Runnable)方法拆解,结合流程图来理解线程池中任务的执行流程就比较简单了 。可以这么说,execute(Runnable)方法的逻辑基本上就是一般线程池的执行逻辑,理解了execute(Runnable)方法,就基本理解了线程池的执行逻辑 。
注意:有关ScheduledThreadPoolExecutor类和ForkJoinPool类执行线程池的逻辑,在【高并发专题】系列文章中的后文中会详细说明,理解了这些类的执行逻辑,就基本全面掌握了线程池的执行流程 。
在分析execute(Runnable)方法的源码时,我们发现execute(Runnable)方法中多处调用了addWorker(Runnable, boolean)方法,接下来,我们就一起分析下addWorker(Runnable, boolean)方法的逻辑 。
addWorker(Runnable, boolean)方法总体上,addWorker(Runnable, boolean)方法可以分为三部分,第一部分是使用CAS安全的向线程池中添加工作线程;第二部分是创建新的工作线程;第三部分则是将任务通过安全的并发方式添加到workers中,并启动工作线程执行任务 。
接下来,我们看下addWorker(Runnable, boolean)方法的源码,如下所示 。
private boolean addWorker(Runnable firstTask, boolean core) {//标记重试的标识retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查队列是否在某些特定的条件下为空if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;//下面循环的主要作用为通过CAS方式增加线程的个数for (;;) {//获取线程池中的线程数量int wc = workerCountOf(c);//如果线程池中的线程数量超出限制,直接返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//通过CAS方式向线程池新增线程数量if (compareAndIncrementWorkerCount(c))//通过CAS方式保证只有一个线程执行成功,跳出最外层循环break retry;//重新获取ctl的值c = ctl.get();//如果CAS操作失败了,则需要在内循环中重新尝试通过CAS新增线程数量if (runStateOf(c) != rs)continue retry;}}//跳出最外层for循环,说明通过CAS新增线程数量成功//此时创建新的工作线程boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//将执行的任务封装成workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//独占锁,保证操作workers时的同步final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//此处需要重新检查线程池状态//原因是在获得锁之前可能其他的线程改变了线程池的状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive())throw new IllegalThreadStateException();//向worker中添加新任务workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;//将是否添加了新任务的标识设置为trueworkerAdded = true;}} finally {//释放独占锁mainLock.unlock();}//添加新任成功,则启动线程执行任务if (workerAdded) {t.start();//将任务是否已经启动的标识设置为trueworkerStarted = true;}}} finally {//如果任务未启动或启动失败,则调用addWorkerFailed(Worker)方法if (! workerStarted)addWorkerFailed(w);}//返回是否启动任务的标识return workerStarted;}

经验总结扩展阅读