深度解析Java8 – ScheduledThreadPoolExecutor源码解析

1. 背景:

提到worker,读者的第一反应都是最常用的quartz框架,它提供了一套简单的定义周期性任务的方式并与Spring做了融合,可以让一个worker的配置简化到只需定义一个任务和执行周期后即完成了一个work的配置。比如:

<task:scheduled-tasks>
    <task:scheduled ref="testJob" method="work" cron="0 */1 * * * ?" />
</task:scheduled-tasks>

 <bean id="testJob" class="com.TestJob"/>

要清楚quartz是怎么实现任务的周期性执行的我们要先一起来看下定义task标签的xsd文件:其中,task标签声明了一组通过ref属性关联周期执行的任务,通过method属性指定了方法,cron属性指定了周期性执行任务的时间表达式。值得注意的是,或许是为了方便quartz将该表达式的格式和类UNIX系统上的定期指定任务的cron表达式保持了一致,但实际上quartz的时间表达式和类UNIX系统上cron的时间表达式完全没有关系,既不是通过JDK的封装去调用cron,也不是将该表达式传递给操作系统执行,而是仅仅表达式格式上保持了一致,具体功能的实现是交给了JDK的另外一个工具类,也就是本文将要说明的JDK的工具类去完成的。

1
可以看到上述的配置信息会映射到类型为TaskScheduler的scheduler属性上,它有下面这么几个子类
2
除了TimerManagerTaskScheduler 是为 CommonJ提供的封装外,其余两个子类都持有一个ScheduledExecutorService类型的变量,该变量会完成周期性执行任务的功能,而quartz默认会使用ConcurrentTaskScheduler这个实现类,当然,也可以在配置scheduled-tasks的时候通过ref属性重新指定。

既然quartz默认使用了ConcurrentTaskScheduler,那我们先从ConcurrentTaskScheduler开始,看下quartz是怎么实现定时任务的功能的。
3

通过ConcurrentTaskScheduler类的简介,可以知道他继承了ConcurrentTaskExecutor用于持有任务的配置信息,并且自己持有了一个ScheduledExecutorService的引用用于调度定时任务,通过查看他的schedule方法可以知道对于定时任务的调用ConcurrentTaskScheduler最终都是委派给scheduledExecutor属性的。4

事实上ConcurrentTaskScheduler本身更像是对时间配置,任务配置,融合Spring方面做了些封装,最终执行任务的,还是JDK的这个接口ScheduledExecutorService的实现类ScheduledThreadPoolExecutor。
可以说ScheduledThreadPoolExecutor是quartz实现定时任务的关键。

2. 解读ScheduledThreadPoolExecutor

下面我们一起来看看ScheduledThreadPoolExecutor到底做了些什么事情,先看下它的结构(为了保持一致,这次对ScheduledThreadPoolExecutor的讲解同样基于jdk1.8.0_20,和JDK1.6相比有不少改动):

5
继承自ThreadPoolExecutor并且实现了ScheduledExecutorService接口。
6
内部有很多方法和两个内部类:DelayedWorkQueue和ScheduledFutureTask,这两个类具体做了什么事情,后面再详述。

为了说明ScheduledThreadPoolExecutor是怎么实现的定时,延迟,重复执行任务的,我们从他最常用的API,schedule方法开始说起:
7
接受 一个Runnable接口的子类,推迟启动的时间单位和数量。
并且将他们包装到ScheduledFutureTask中,也是上面提到的ScheduledThreadPoolExecutor的内部类之一。

8
ScheduledFutureTask 有这么几个私有变量,时间(纳秒为单位,在上面构造时,通过triggerTime方法计算出来的),重复周期period,和一个序列号sequenceNumber,sequencer由ScheduledThreadPoolExecutor持有,
new好ScheduledFutureTask后用decorateTask方法包装一下,其实,什么也没做,直接将task返回了,这里应该是兼容老代码的原因,又偷懒了一下,没有直接改掉这个方法。

9
包装后调用delayedExecute方法,提交task。

10
delayedExecute方法中,如果当前线程池已经关闭,调用拒绝策略执行command,之所以说执行不是拒绝,是因为JDK的拒绝策略中并非都是直接将任务拒绝:
11
看注释可以知道,JDK拒绝策略中,有直接将任务丢弃的,有抛出异常的,还有在当前线程下直接调用run()方法执行的。

之所以说这个,是因为很多使用者在new线程池的时候没有注意到构造方法里面是需要这个参数的,并且默认的这个参数是:
12
就是抛出异常的拒绝策略,这回导致在线程池添加满任务后,直接抛出RejectExecutionException,而不是用当前线程执行任务。
因此通常我们更常用的应该是:CallerRunsPolicy。

回到原题,看下delayedExecute方法:
13
如果线程池没有关闭,检查当前线程池已启动的线程数,是否达到corePoolSize,没有的话,新建一个线程并启动它。注意,这个时候新建的线程是没有持有任何Runnable对象的,它是在启动后到queue(工作队列)中去取出任务执行。

创建完线程后向工作队列中添加commond任务。

到此为止,一个用ScheduledFutureTask包装的任务提交到了线程池的工作队列中,等待线程取出后执行。
任务被取出后,会先调用Runnable接口的run方法,那我们来看下run方法做了些什么

public void run() {
       boolean periodic = isPeriodic();
       if (!canRunInCurrentRunState(periodic))// 检查当前线程池状态是否需要取消
            cancel(false);
       else if (!periodic)// 如果不是周期性任务,直接调用父类FutureTask的run方法执行任务。
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {//否则,调用父类run方法执行任务,但是不设置结果,以便计算出下次执行的时间后,重复执行。
            setNextRunTime();
            reExecutePeriodic(outerTask);
       }
 }

1. ScheduledFutureTask是FutureTask的子类,它使用了FutureTask执行但是不设置结果的API:runAndReset()。这里有几点需要注意:

其他功能,有兴趣的可以看我原来的博客:futuretask-源码解析
2. 所谓的延迟启动,周期性重复执行的功能在这里还没有体现出来,到底怎么做的,我们继续看。

我们知道,每次调用schedule方法, 都会向线程池的工作队列中添加一个任务,而ScheduledThreadPoolExecutor自己实现了一个工作队列DelayedWorkQueue,也就是ScheduledThreadPoolExecutor中的另外一个内部类,会不会和这个有关系呢?

3. 解读DelayedWorkQueue

看看DelayedWorkQueue的add方法:
14

public boolean offer(Runnable x) {
   if (x == null)
           throw new NullPointerException();
      RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; //将x转成RunnableScheduledFuture 实时上ScheduledFutureTask也是RunnableScheduledFuture 的子类
      final ReentrantLock lock = this.lock;
      lock.lock();//加锁添加元素
      try {
           int i = size;
           if (i >= queue.length)
               grow(); //队列满,扩容
           size = i + 1;
           if (i == 0) {
               queue[0] = e;
               setIndex(e, 0);
           } else {
               siftUp(i, e);//注意:当队列中元素不为空时,调用了siftUp方法
           }
           if (queue[0] == e) {
               leader = null;
               available.signal();//唤醒取元素的线程,告诉它工作队列中有元素可以取了。
           }
       } finally {
            lock.unlock();
       }
      return true;
}

想知道 condition怎么实现的,看我原来的一篇博客:怎么理解Condition
整个方法没有什么特殊的地方,不外乎向队列中添加了一个元素,唯独有个siftup方法,这是理解整个ScheduledThreadPoolExecutor的重点。

在说siftup方法前,我们先看下DelayedWorkQueue的数据结构,其实ScheduledThreadPoolExecutor之所以自己实现工作队列,原因就是这个工作队列有非常特殊的结构。
我们知道如果有一个二叉树的数组,抽象起来用树表示是这个样子:
15
放在数组中后,是这个样子:
16
并且二叉树还有一个特性是:
任意结点的子节点的索引位置是其本身索引位置乘2后+1,比如,元素2的子节点是2*2+1=5
任意结点的父节点的索引位置是该结点的索引位置-1后除2并向下取整,比如6的子节点是(6-1)/2 = 2

这个特性对于查找父子结点来说非常的方便。

这个结构被ScheduledThreadPoolExecutor的作者Doug Lea用到了DelayedWorkQueue中,并且将DelayedWorkQueue变成了一个按超时时间升序排序的队列,遵循”左结点比右节点小(下次执行的时间更短)的原则“。
而按超时时间升序排序的队列的原因是为了将即将要执行的任务放在尽量靠前的位置,用二叉树的原因是因为二叉树的查找,加入都相对较快。

明白以上结构后,再看siftup方法
17
举个例子:
18
现在要插入一个新的元素,这个时候结合上面的代码可以知道,变量k=10,key为图中new结点,
1. 经过计算,parent = 4 e=parent[4]=3 (注意是索引为4的结点,它的值是3号元素)
2. key=new,e=parent[4] 两者比较下次执行的时间(超时时间)
比较方法如下:
19
3. 比较后,若key>=e 说明key元素下次执行的时间(超时时间)更长,应该靠后,直接添加在队列尾部。
4. 若key

同理,我们再来看看remove方法:

public boolean remove(Object x) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
             int i = indexOf(x);
             if (i < 0)
                return false;

             setIndex(queue[i], -1);
             int s = --size;
             RunnableScheduledFuture<?> replacement = queue[s]; //把最后一个元素取出来,用于缩短队列,并且,将取出的元素尝试放在被移除元素的位置上,至于是否要真的放在该位置上要看最后这个元素的下次一次的执行时间(超时时间)决定,因为,DelayedWorkQueue是个有序队列
             queue[s] = null;
             if (s != i) {
                siftDown(i, replacement);
                if (queue[i] == replacement)
                    siftUp(i, replacement);
            }
            return true;
       } finally {
              lock.unlock();
       }
}

当移除的元素不是最后一个元素时,意味着,最后一个元素要开始寻找合适的位置,这个时候就会调用siftDown方法, 该方法的目的是为replacement找到合适的位置,可能是被移除元素的位置,也可能不是。

先附上siftDown代码,对比下面例子看:
20
我们举例说明:
21
这里的被移除元素分为两类,一类是被移除元素没有子节点的,一类是被移除元素有子节点的。
从表中,根据二叉树的性质可以知道,位置索引大于等于队列长度一半的,没有子节点,小于队列长度一半的有子节点。
因此,分两类来看:

第一种情况:
被移除元素没有子节点的:
22
这个时候,可知,siftDown方法中,ke=6,key=[9]
1. 计算后half=5,k>half,说明被移除元素没有子节点。直接将[9]也就是原有队列的最后一个元素 9号元素放入索引位置为6的位置后,结束。
23
第二种情况:
被移除元素有子节点的。
24
这个时候,可知,k=3,key=[9]
1. 计算后可知,k<half,child=7,c=[7],right=8.
2. 进入第一个
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
这句话可以理解为选出[7],[8]两个位置中下次执行时间(超时时间)较小的那个,赋值给c.
3. if (key.compareTo(c) <= 0)
break;
将c和key,比较,如果key比较小,直接将key放到[3]中,变成[7][8]的父节点
25
如果c比较小

queue[k] = c;
setIndex(c, k);
k = child;

siftUp,siftDown方法总结:将c放到位置3,然后在位置7,8中为key找合适的位置。

两个方法分别针对元素的存和取时,调整元素的位置,并保证队列的有序性,其中用到了二叉树的各种性质,比如,查找父子结点。

4. 总结

知道了DelayedWorkQueue的内部结构,ScheduledThreadPoolExecutor的两个内部类我们都非常清楚了,因此我们可以这么总结ScheduledThreadPoolExecutor:

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以本质上说ScheduledThreadPoolExecutor还是一个线程池,也有coorPoolSize和workQueue,接受Runnable的子类作为任务被多线程调,特殊的地方在于,自己实现了工作队列,
目的是能够按一定的顺序对工作队列中的元素进行排序,比如,按照距离下次执行时间的长短的升序方式排列工作任务,让需要尽快执行的任务排在队首,“不那么着急”的任务排在队列后方。从而方便线程获取执行任务。
并配合FutureTask执行但是不设置结果的API:runAndReset() 来实现任务的周期执行(每次执行完毕后不设置执行结果,而是计算出下次执行的时间,重新放到工作队列中,等待下次调用)。
至于quartz就是借用了ScheduledThreadPoolExecutor来实现定时任务的执行与调度,只不过提供了一种更友好的方式去表达定时任务的配置方式,为ScheduledThreadPoolExecutor需要的数据做了封装。真正的功能还是围绕在ScheduledThreadPoolExecutor上。
总的来的,ScheduledThreadPoolExecutor的实现并不复杂,主要是理解有序队列的操作,以及对FutureTask的灵活运用,明白这些后,再看ScheduledThreadPoolExecutor就不是难事了。



相关文章

发表评论

Comment form

(*) 表示必填项

还没有评论。

跳到底部
返回顶部