补充部分---ScheduledThreadPoolExecutor类分析 线程池底层原理详解与源码分析( 四 )

1.DelayedWorkQueue类#add方法
public boolean add(Runnable e) {return offer(e);}public boolean offer(Runnable x) {//空值校验if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;final ReentrantLock lock = this.lock;//加锁lock.lock();try {int i = size;// 超过容量,扩容if (i >= queue.length)grow();size = i + 1; //更新当前节点数if (i == 0) {//插入的是第一个节点(阻塞队列原本为空)queue[0] = e;setIndex(e, 0); //setIndex(e, 0)用于修改ScheduledFutureTask的heapIndex属性,表示该对象在队列里的下标} else {//阻塞队列非空siftUp(i, e); //在插入新节点后对堆进行调整,进行节点上移,保持其特性(节点的值小于子节点的值)不变}/*** 这里最好结合take方法理解一下* 队列头等于当前任务,说明了当前任务的等待时间是最小的 。此时为什么要去清空leader?* leader代表的是某一个正在等待获取元素的线程句柄,* 在take的时候因为之前的头结点时间未到,不能拿,被休眠了一定时间(而这个时间就是距离之前那个队列头结点的可以出队列的时间差) 。* 此时头结点换了,理应清空句柄,唤醒它,让它再次尝试去获取最新的头结点(就算是再次休眠,时间也会比之前的少) 。*/if (queue[0] == e) {leader = null;available.signal();}} finally {lock.unlock(); //解锁}return true;}2.DelayedWorkQueue类#siftUp方法
//其实把这个队列看作树结构会更容易理解(要理解数组与完全二叉树的关联)private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {int parent = (k - 1) >>> 1; //父节点坐标RunnableScheduledFuture<?> e = queue[parent]; //获取父节点的值// 如果 节点>= 父节点,确定最终位置if (key.compareTo(e) >= 0)break;// 节点<父节点,将节点向上移动(就是将父节点放在k处)queue[k] = e;setIndex(e, k);k = parent;}//确定key的最后落脚处queue[k] = key;setIndex(key, k);}3.ScheduledFutureTask类#compareTo方法
/** * compareTo 作用是加入元素到延迟队列后,内部建立或者调整堆时候会使用该元素的 compareTo 方法与队列里面其他元素进行比较,* 让最快要过期的元素放到队首 。所以无论什么时候向队列里面添加元素,队首的的元素都是最即将过期的元素 。* 如果时间相同,序列号小的排前面 。*/public int compareTo(Delayed other) {if (other == this) // 如果2个指向的同一个对象,则返回0return 0;// other必须是ScheduledFutureTask类型的if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time; //两者之间的时间差if (diff < 0)return -1; //返回当前对象时间比目标对象小的标记【这个标记仅仅是标记,具体还要在上层方法逻辑中决定】else if (diff > 0)return 1;//返回当前对象时间比目标对象大的标记// 时间相同,比较序列号else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}// 到这里,说明other不是ScheduledFutureTask类型的long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}4.DelayedWorkQueue类#take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly(); //加锁,响应中断try {// 死循环自旋for (;;) {RunnableScheduledFuture<?> first = queue[0]; //头节点// 队列为null,则等待在条件上if (first == null)available.await();//队列非空else {//判断延时时间是否满足条件long delay = first.getDelay(NANOSECONDS);if (delay <= 0L)return finishPoll(first);// 头节点时间没到,还不能取出头节点first = null; // 等待的时候,不要持有头节点if (leader != null)//已经存在leader线程,当前线程await阻塞available.await();else {//如果不存在leader线程,当前线程作为leader线程,并制定头结点的延迟时间作为阻塞时间Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {//leader线程阻塞结束if (leader == thisThread)leader = null;}}}}} finally {//leader线程没有阻塞,可以找到头结点,唤醒阻塞线程if (leader == null && queue[0] != null)available.signal();lock.unlock();}}

经验总结扩展阅读