亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網(wǎng)為廣大站長提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.430618.com 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

并發(fā)編程之定時任務(wù)&定時線程池原理解析

作者公眾號:一角錢技術(shù)(org_yijiaoqian)

前言

線程池的具體實(shí)現(xiàn)有兩種,分別是ThreadPoolExecutor 默認(rèn)線程池和ScheduledThreadPoolExecutor 定時線程池,上一篇已經(jīng)分析過ThreadPoolExecutor原理與使用了,本篇我們來重點(diǎn)分析下ScheduledThreadPoolExecutor的原理與使用。

  • 《并發(fā)編程之Executor線程池原理與源碼解讀》

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 與 ThreadPoolExecutor 線程池的概念有些區(qū)別,它是一個支持任務(wù)周期性調(diào)度的線程池。

ScheduledThreadPoolExecutor 繼承 ThreadPoolExecutor,同時通過實(shí)現(xiàn) ScheduledExecutorSerivce 來擴(kuò)展基礎(chǔ)線程池的功能,使其擁有了調(diào)度能力。其整個調(diào)度的核心在于內(nèi)部類 DelayedWorkQueue ,一個有序的延時隊列。

定時線程池類的類結(jié)構(gòu)圖如下:

并發(fā)編程之定時任務(wù)&定時線程池原理解析

 

ScheduledThreadPoolExecutor 的出現(xiàn),很好的彌補(bǔ)了傳統(tǒng) Timer 的不足,具體對比看下表:

TimerScheduledThreadPoolExecutor線程單線程多線程多任務(wù)任務(wù)之間相互影響任務(wù)之間不影響調(diào)度時間絕對時間相對時間異常單任務(wù)異常,后續(xù)任務(wù)受影響無影響

工作原理

它用來處理延時任務(wù)或定時任務(wù)

并發(fā)編程之定時任務(wù)&定時線程池原理解析

 

它接收SchduledFutureTask類型的任務(wù),是線程池調(diào)度任務(wù)的最小單位,有三種提交任務(wù)的方式:

  1. schedule,特定時間延時后執(zhí)行一次任務(wù)
  2. scheduledAtFixedRate,固定周期執(zhí)行任務(wù)(與任務(wù)執(zhí)行時間無關(guān),周期是固定的)
  3. scheduledWithFixedDelay,固定延時執(zhí)行任務(wù)(與任務(wù)執(zhí)行時間有關(guān),延時從上一次任務(wù)完成后開始)
并發(fā)編程之定時任務(wù)&定時線程池原理解析

 

它采用 DelayedWorkQueue 存儲等待的任務(wù)

  1. DelayedWorkQueue 內(nèi)部封裝了一個 PriorityQueue ,它會根據(jù) time 的先后時間排序,若 time 相同則根據(jù) sequenceNumber 排序;
  2. DelayedWorkQueue 也是一個無界隊列;

因?yàn)榍懊嬷v阻塞隊列實(shí)現(xiàn)的時候,已經(jīng)對DelayedWorkQueue進(jìn)行了說明,更多內(nèi)容請查看《阻塞隊列 — DelayedWorkQueue源碼分析》

工作線程的執(zhí)行過程:

  • 工作線程會從DelayedWorkerQueue取已經(jīng)到期的任務(wù)去執(zhí)行;
  • 執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時間,再次放回DelayedWorkerQueue。

take方法是什么時候調(diào)用的呢? 在ThreadPoolExecutor中,getTask方法,工作線程會循環(huán)地從workQueue中取任務(wù)。但定時任務(wù)卻不同,因?yàn)槿绻坏ゞetTask方法取出了任務(wù)就開始執(zhí)行了,而這時可能還沒有到執(zhí)行的時間,所以在take方法中,要保證只有在到指定的執(zhí)行時間的時候任務(wù)才可以被取走。

PS:對于以上原理的理解,可以通過下面的源碼分析加深印象。

源碼分析

構(gòu)造方法

ScheduledThreadPoolExecutor有四個構(gòu)造形式:

public ScheduledThreadPoolExecutor(int corePoolSize) {
 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
     new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
     new DelayedWorkQueue(), threadFactory, handler);
}

當(dāng)然我們也可以使用工具類Executors的newScheduledThreadPool的方法,快速創(chuàng)建。注意這里使用的DelayedWorkQueue

ScheduledThreadPoolExecutor沒有提供帶有最大線程數(shù)的構(gòu)造函數(shù)的,默認(rèn)是Integer.MAX_VALUE,說明其可以無限制的開啟任意線程執(zhí)行任務(wù),在大量任務(wù)系統(tǒng),應(yīng)注意這一點(diǎn),避免內(nèi)存溢出。

核心方法

核心方法主要介紹ScheduledThreadPoolExecutor的調(diào)度方法,其他方法與 ThreadPoolExecutor 一致。調(diào)度方法均由 ScheduledExecutorService 接口定義:

public interface ScheduledExecutorService extends ExecutorService {
    // 特定時間延時后執(zhí)行一次Runnable
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    // 特定時間延時后執(zhí)行一次Callable
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
    // 固定周期執(zhí)行任務(wù)(與任務(wù)執(zhí)行時間無關(guān),周期是固定的)
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
     // 固定延時執(zhí)行任務(wù)(與任務(wù)執(zhí)行時間有關(guān),延時從上一次任務(wù)完成后開始)
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

我們再來看一下接口的實(shí)現(xiàn),具體是怎么來實(shí)現(xiàn)線程池任務(wù)的提交。因?yàn)樽罱K都回調(diào)用 delayedExecute 提交任務(wù)。所以,我們這里只分析schedule方法,該方法是指任務(wù)在指定延遲時間到達(dá)后觸發(fā),只會執(zhí)行一次。源代碼如下:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    //參數(shù)校驗(yàn)
    if (command == null || unit == null)
        throw new NullPointerException();
    //這里是一個嵌套結(jié)構(gòu),首先把用戶提交的任務(wù)包裝成ScheduledFutureTask
    //然后在調(diào)用decorateTask進(jìn)行包裝,該方法是留給用戶去擴(kuò)展的,默認(rèn)是個空方法
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
   //包裝好任務(wù)以后,就進(jìn)行提交了
 delayedExecute(t);
    return t;
}

delayedExecute 任務(wù)提交方法

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果線程池已經(jīng)關(guān)閉,則使用拒絕策略把提交任務(wù)拒絕掉
 if (isShutdown())
        reject(task);
    else {
  //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊列
        super.getQueue().add(task);//使用用的DelayedWorkQueue
  //如果當(dāng)前狀態(tài)無法執(zhí)行任務(wù),則取消
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
         //這里是增加一個worker線程,避免提交的任務(wù)沒有worker去執(zhí)行
         //原因就是該類沒有像ThreadPoolExecutor一樣,woker滿了才放入隊列
           ensurePrestart();
    }
}

我們可以看到提交到線程池的任務(wù)都包裝成了 ScheduledFutureTask,繼續(xù)往下我們再來研究下。

ScheduledFutureTask

從ScheduledFutureTask類的定義可以看出,ScheduledFutureTask類是ScheduledThreadPoolExecutor類的私有內(nèi)部類,繼承了FutureTask類,并實(shí)現(xiàn)了RunnableScheduledFuture接口。也就是說,ScheduledFutureTask具有FutureTask類的所有功能,并實(shí)現(xiàn)了RunnableScheduledFuture接口的所有方法。ScheduledFutureTask類的定義如下所示:

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>

ScheduledFutureTask類繼承圖如下:

并發(fā)編程之定時任務(wù)&定時線程池原理解析

 

成員變量

SchduledFutureTask接收的參數(shù)(成員變量):

// 任務(wù)開始的時間
private long time;
// 任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號
private final long sequenceNumber;
// 任務(wù)執(zhí)行的時間間隔
private final long period;
//ScheduledFutureTask對象,實(shí)際指向當(dāng)前對象本身
RunnableScheduledFuture<V> outerTask = this;
//當(dāng)前任務(wù)在延遲隊列中的索引,能夠更加方便的取消當(dāng)前任務(wù)
int heapIndex;

解析

  • sequenceNumber:任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號,可以根據(jù)這個序列號確定唯一的一個任務(wù),如果在定時任務(wù)中,如果一個任務(wù)是周期性執(zhí)行的,但是它們的sequenceNumber的值相同,則被視為是同一個任務(wù)。
  • time:下一次執(zhí)行任務(wù)的時間。
  • period:任務(wù)的執(zhí)行周期。
  • outerTask:ScheduledFutureTask對象,實(shí)際指向當(dāng)前對象本身。此對象的引用會被傳入到周期性執(zhí)行任務(wù)的ScheduledThreadPoolExecutor類的reExecutePeriodic方法中。
  • heapIndex:當(dāng)前任務(wù)在延遲隊列中的索引,這個索引能夠更加方便的取消當(dāng)前任務(wù)。

構(gòu)造方法

ScheduledFutureTask類繼承了FutureTask類,并實(shí)現(xiàn)了RunnableScheduledFuture接口。在ScheduledFutureTask類中提供了如下構(gòu)造方法。

ScheduledFutureTask(Runnable r, V result, long ns) {
 super(r, result);
 this.time = ns;
 this.period = 0;
 this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
 super(r, result);
 this.time = ns;
 this.period = period;
 this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Callable<V> callable, long ns) {
 super(callable);
 this.time = ns;
 this.period = 0;
 this.sequenceNumber = sequencer.getAndIncrement();
}

FutureTask的構(gòu)造方法如下:

public FutureTask(Runnable runnable, V result) {
 this.callable = Executors.callable(runnable, result);
 this.state = NEW;       // ensure visibility of callable
}

通過源碼可以看到,在ScheduledFutureTask類的構(gòu)造方法中,首先會調(diào)用FutureTask類的構(gòu)造方法為FutureTask類的callable和state成員變量賦值,接下來為ScheduledFutureTask類的time、period和sequenceNumber成員變量賦值。理解起來比較簡單。

getDelay方法

我們先來看getDelay方法的源碼,如下所示:

//獲取下次執(zhí)行任務(wù)的時間距離當(dāng)前時間的納秒數(shù)
public long getDelay(TimeUnit unit) {
 return unit.convert(time - now(), NANOSECONDS);
}

getDelay方法比較簡單,主要用來獲取下次執(zhí)行任務(wù)的時間距離當(dāng)前系統(tǒng)時間的納秒數(shù)。

compareTo方法

ScheduledFutureTask類在類的結(jié)構(gòu)上實(shí)現(xiàn)了Comparable接口,compareTo方法主要是對Comparable接口定義的compareTo方法的實(shí)現(xiàn)。源碼如下所示:

public int compareTo(Delayed other) {
 if (other == this) 
  return 0;
 if (other instanceof ScheduledFutureTask) {
  ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
  long diff = time - x.time;
  if (diff < 0)
   return -1;
  else if (diff > 0)
   return 1;
  else if (sequenceNumber < x.sequenceNumber)
   return -1;
  else
   return 1;
 }
 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

這段代碼看上去好像是對各種數(shù)值類型數(shù)據(jù)的比較,本質(zhì)上是對延遲隊列中的任務(wù)進(jìn)行排序。排序規(guī)則為:

  • 首先比較延遲隊列中每個任務(wù)下次執(zhí)行的時間,下次執(zhí)行時間距離當(dāng)前時間短的任務(wù)會排在前面;
  • 如果下次執(zhí)行任務(wù)的時間相同,則會比較任務(wù)的sequenceNumber值,sequenceNumber值小的任務(wù)會排在前面。

isPeriodic方法

isPeriodic方法的源代碼如下所示:

//判斷是否是周期性任務(wù)
public boolean isPeriodic() {
 return period != 0;
}

這個方法主要是用來判斷當(dāng)前任務(wù)是否是周期性任務(wù)。這里只要判斷運(yùn)行任務(wù)的執(zhí)行周期不等于0就能確定為周期性任務(wù)了。因?yàn)闊o論period的值是大于0還是小于0,當(dāng)前任務(wù)都是周期性任務(wù)。

setNextRunTime方法

setNextRunTime方法的作用主要是設(shè)置當(dāng)前任務(wù)下次執(zhí)行的時間,源碼如下所示:

private void setNextRunTime() {
 long p = period;
 //固定頻率,上次執(zhí)行任務(wù)的時間加上任務(wù)的執(zhí)行周期
 if (p > 0)
  time += p;
 //相對固定的延遲執(zhí)行,當(dāng)前系統(tǒng)時間加上任務(wù)的執(zhí)行周期
 else
  time = triggerTime(-p);
}

這里再一次證明了使用isPeriodic方法判斷當(dāng)前任務(wù)是否為周期性任務(wù)時,只要判斷period的值是否不等于0就可以了。

  • 因?yàn)槿绻?dāng)前任務(wù)時固定頻率執(zhí)行的周期性任務(wù),會將周期period當(dāng)作正數(shù)來處理;
  • 如果是相對固定的延遲執(zhí)行當(dāng)前任務(wù),則會將周期period當(dāng)作負(fù)數(shù)來處理。

這里我們看到在setNextRunTime方法中,調(diào)用了ScheduledThreadPoolExecutor類的triggerTime方法。接下來我們看下triggerTime方法的源碼。

ScheduledThreadPoolExecutor類的triggerTime方法

triggerTime方法用于獲取延遲隊列中的任務(wù)下一次執(zhí)行的具體時間。源碼如下所示。

private long triggerTime(long delay, TimeUnit unit) {
 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

long triggerTime(long delay) {
 return now() +
  ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

這兩個triggerTime方法的代碼比較簡單,就是獲取下一次執(zhí)行任務(wù)的具體時間。有一點(diǎn)需要注意的是:delay < (Long.MAX_VALUE >> 1判斷delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,則直接返回delay,否則需要處理溢出的情況。

我們看到在triggerTime方法中處理防止溢出的邏輯使用了ScheduledThreadPoolExecutor類的overflowFree方法,接下來,我們就看看ScheduledThreadPoolExecutor類的overflowFree方法的實(shí)現(xiàn)。

ScheduledThreadPoolExecutor類的overflowFree方法

overflowFree方法的源代碼如下所示:

private long overflowFree(long delay) {
 //獲取隊列中的節(jié)點(diǎn)
 Delayed head = (Delayed) super.getQueue().peek();
 //獲取的節(jié)點(diǎn)不為空,則進(jìn)行后續(xù)處理
 if (head != null) {
  //從隊列節(jié)點(diǎn)中獲取延遲時間
  long headDelay = head.getDelay(NANOSECONDS);
  //如果從隊列中獲取的延遲時間小于0,并且傳遞的delay
  //值減去從隊列節(jié)點(diǎn)中獲取延遲時間小于0
  if (headDelay < 0 && (delay - headDelay < 0))
   //將delay的值設(shè)置為Long.MAX_VALUE + headDelay
   delay = Long.MAX_VALUE + headDelay;
 }
 //返回延遲時間
 return delay;
}

通過對overflowFree方法的源碼分析,可以看出overflowFree方法本質(zhì)上就是為了限制隊列中的所有節(jié)點(diǎn)的延遲時間在Long.MAX_VALUE值之內(nèi),防止在compareTo方法中溢出。

cancel方法

cancel方法的作用主要是取消當(dāng)前任務(wù)的執(zhí)行,源碼如下所示:

public boolean cancel(boolean mayInterruptIfRunning) {
 //取消任務(wù),返回任務(wù)是否取消的標(biāo)識
 boolean cancelled = super.cancel(mayInterruptIfRunning);
 //如果任務(wù)已經(jīng)取消
 //并且需要將任務(wù)從延遲隊列中刪除
 //并且任務(wù)在延遲隊列中的索引大于或者等于0
 if (cancelled && removeOnCancel && heapIndex >= 0)
  //將當(dāng)前任務(wù)從延遲隊列中刪除
  remove(this);
 //返回是否成功取消任務(wù)的標(biāo)識
 return cancelled;
}

這段代碼理解起來相對比較簡單,首先調(diào)用取消任務(wù)的方法,并返回任務(wù)是否已經(jīng)取消的標(biāo)識。如果任務(wù)已經(jīng)取消,并且需要移除任務(wù),同時,任務(wù)在延遲隊列中的索引大于或者等于0,則將當(dāng)前任務(wù)從延遲隊列中移除。最后返回任務(wù)是否成功取消的標(biāo)識。

run方法

run方法可以說是ScheduledFutureTask類的核心方法,是對Runnable接口的實(shí)現(xiàn),源碼如下所示:

public void run() {
 //當(dāng)前任務(wù)是否是周期性任務(wù)
 boolean periodic = isPeriodic();
 //線程池當(dāng)前運(yùn)行狀態(tài)下不能執(zhí)行周期性任務(wù)
 if (!canRunInCurrentRunState(periodic))
  //取消任務(wù)的執(zhí)行
  cancel(false);
 //如果不是周期性任務(wù)
 else if (!periodic)
  //則直接調(diào)用FutureTask類的run方法執(zhí)行任務(wù)
  ScheduledFutureTask.super.run();
 //如果是周期性任務(wù),則調(diào)用FutureTask類的runAndReset方法執(zhí)行任務(wù)
 //如果任務(wù)執(zhí)行成功
 else if (ScheduledFutureTask.super.runAndReset()) {
  //設(shè)置下次執(zhí)行任務(wù)的時間
  setNextRunTime();
  //重復(fù)執(zhí)行任務(wù)
  reExecutePeriodic(outerTask);
 }
}

整理一下方法的邏輯:

  1. 首先判斷當(dāng)前任務(wù)是否是周期性任務(wù)。如果線程池當(dāng)前運(yùn)行狀態(tài)下不能執(zhí)行周期性任務(wù),則取消任務(wù)的執(zhí)行,否則執(zhí)行步驟2;
  2. 如果當(dāng)前任務(wù)不是周期性任務(wù),則直接調(diào)用FutureTask類的run方法執(zhí)行任務(wù),會設(shè)置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟3;
  3. 如果當(dāng)前任務(wù)是周期性任務(wù),則調(diào)用FutureTask類的runAndReset方法執(zhí)行任務(wù),不會設(shè)置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟4;
  4. 如果任務(wù)執(zhí)行成功,則設(shè)置下次執(zhí)行任務(wù)的時間,同時,將任務(wù)設(shè)置為重復(fù)執(zhí)行。

這里,調(diào)用了FutureTask類的run方法和runAndReset方法,并且調(diào)用了ScheduledThreadPoolExecutor類的reExecutePeriodic方法。接下來,我們分別看下這些方法的實(shí)現(xiàn)。

FutureTask類的run方法

FutureTask類的run方法源碼如下所示:

public void run() {
    //狀態(tài)如果不是NEW,說明任務(wù)或者已經(jīng)執(zhí)行過,或者已經(jīng)被取消,直接返回
    //狀態(tài)如果是NEW,則嘗試把當(dāng)前執(zhí)行線程保存在runner字段中
    //如果賦值失敗則直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //執(zhí)行任務(wù)
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //任務(wù)異常
                setException(ex);
            }
            if (ran)
                //任務(wù)正常執(zhí)行完畢
                set(result);
        }
    } finally {

        runner = null;
        int s = state;
        //如果任務(wù)被中斷,執(zhí)行中斷處理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

代碼的整體邏輯為:

  • 判斷當(dāng)前任務(wù)的state是否等于NEW,如果不為NEW則說明任務(wù)或者已經(jīng)執(zhí)行過,或者已經(jīng)被取消,直接返回;
  • 如果狀態(tài)為NEW則接著會通過unsafe類把任務(wù)執(zhí)行線程引用CAS的保存在runner字段中,如果保存失敗,則直接返回;
  • 執(zhí)行任務(wù);如果任務(wù)執(zhí)行發(fā)生異常,則調(diào)用setException()方法保存異常信息。

FutureTask類的runAndReset方法

方法的源碼如下所示:

protected boolean runAndReset() {
 if (state != NEW ||
  !UNSAFE.compareAndSwapObject(this, runnerOffset,
          null, Thread.currentThread()))
  return false;
 boolean ran = false;
 int s = state;
 try {
  Callable<V> c = callable;
  if (c != null && s == NEW) {
   try {
    c.call(); // don't set result
    ran = true;
   } catch (Throwable ex) {
    setException(ex);
   }
  }
 } finally {
  // runner must be non-null until state is settled to
  // prevent concurrent calls to run()
  runner = null;
  // state must be re-read after nulling runner to prevent
  // leaked interrupts
  s = state;
  if (s >= INTERRUPTING)
   handlePossibleCancellationInterrupt(s);
 }
 return ran && s == NEW;
}

FutureTask類的runAndReset方法與run方法的邏輯基本相同,只是runAndReset方法會重置當(dāng)前任務(wù)的執(zhí)行狀態(tài)。

ScheduledThreadPoolExecutor類的reExecutePeriodic方法

reExecutePeriodic重復(fù)執(zhí)行任務(wù)方法,源代碼如下所示:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
 //線程池當(dāng)前狀態(tài)下能夠執(zhí)行任務(wù)
 if (canRunInCurrentRunState(true)) {
  //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊列
        super.getQueue().add(task);//使用用的DelayedWorkQueue
  //線程池當(dāng)前狀態(tài)下不能執(zhí)行任務(wù),并且成功移除任務(wù)
  if (!canRunInCurrentRunState(true) && remove(task))
   //取消任務(wù)
   task.cancel(false);
  else
   //這里是增加一個worker線程,避免提交的任務(wù)沒有worker去執(zhí)行
            //原因就是該類沒有像ThreadPoolExecutor一樣,woker滿了才放入隊列  
   ensurePrestart();
 }
}

總體來說reExecutePeriodic方法的邏輯比較簡單,需要注意的是:調(diào)用reExecutePeriodic方法的時候已經(jīng)執(zhí)行過一次任務(wù),所以,并不會觸發(fā)線程池的拒絕策略;傳入reExecutePeriodic方法的任務(wù)一定是周期性的任務(wù)。

DelayedWorkQueue

ScheduledThreadPoolExecutor之所以要自己實(shí)現(xiàn)阻塞的工作隊列,是因?yàn)?ScheduleThreadPoolExecutor 要求的工作隊列有些特殊。

DelayedWorkQueue是一個基于堆的數(shù)據(jù)結(jié)構(gòu),類似于DelayQueue和PriorityQueue。在執(zhí)行定時任務(wù)的時候,每個任務(wù)的執(zhí)行時間都不同,所以DelayedWorkQueue的工作就是按照執(zhí)行時間的升序來排列,執(zhí)行時間距離當(dāng)前時間越近的任務(wù)在隊列的前面(注意:這里的順序并不是絕對的,堆中的排序只保證了子節(jié)點(diǎn)的下次執(zhí)行時間要比父節(jié)點(diǎn)的下次執(zhí)行時間要大,而葉子節(jié)點(diǎn)之間并不一定是順序的)。

堆結(jié)構(gòu)如下圖:

并發(fā)編程之定時任務(wù)&定時線程池原理解析

 

可見,DelayedWorkQueue是一個基于最小堆結(jié)構(gòu)的隊列。堆結(jié)構(gòu)可以使用數(shù)組表示,可以轉(zhuǎn)換成如下的數(shù)組:

并發(fā)編程之定時任務(wù)&定時線程池原理解析

 

在這種結(jié)構(gòu)中,可以發(fā)現(xiàn)有如下特性: 假設(shè)“第一個元素” 在數(shù)組中的索引為 0 的話,則父結(jié)點(diǎn)和子結(jié)點(diǎn)的位置關(guān)系如下:

  • 索引為 的左孩子的索引是 (2∗i+1);
  • 索引為 的右孩子的索引是 (2∗i+2);
  • 索引為 的父結(jié)點(diǎn)的索引是 floor((i−1)/2);

為什么要使用DelayedWorkQueue呢?

  • 定時任務(wù)執(zhí)行時需要取出最近要執(zhí)行的任務(wù),所以任務(wù)在隊列中每次出隊時一定要是當(dāng)前隊列中執(zhí)行時間最靠前的,所以自然要使用優(yōu)先級隊列。
  • DelayedWorkQueue是一個優(yōu)先級隊列,它可以保證每次出隊的任務(wù)都是當(dāng)前隊列中執(zhí)行時間最靠前的,由于它是基于堆結(jié)構(gòu)的隊列,堆結(jié)構(gòu)在執(zhí)行插入和刪除操作時的最壞時間復(fù)雜度是 O(logN)

因?yàn)榍懊嬷v阻塞隊列實(shí)現(xiàn)的時候,已經(jīng)對DelayedWorkQueue進(jìn)行了說明,更多內(nèi)容請查看《阻塞隊列 — DelayedWorkQueue源碼分析》

總結(jié)

  1. 與Timer執(zhí)行定時任務(wù)比較,相比Timer,ScheduledThreadPoolExecutor有說明優(yōu)點(diǎn)?(文章前面分析過)
  2. ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以它也是一個線程池,也有 coorPoolSize 和 workQueue,但是 ScheduledThreadPoolExecutor特殊的地方在于,自己實(shí)現(xiàn)了優(yōu)先工作隊列 DelayedWorkQueue ;
  3. ScheduledThreadPoolExecutor 實(shí)現(xiàn)了 ScheduledExecutorService,所以就有了任務(wù)調(diào)度的方法,如 schedule 、 scheduleAtFixedRate 、 scheduleWithFixedDelay ,同時注意他們之間的區(qū)別;
  4. 內(nèi)部類 ScheduledFutureTask 繼承者FutureTask,實(shí)現(xiàn)了任務(wù)的異步執(zhí)行并且可以獲取返回結(jié)果。同時實(shí)現(xiàn)了Delayed接口,可以通過getDelay方法獲取將要執(zhí)行的時間間隔;
  5. 周期任務(wù)的執(zhí)行其實(shí)是調(diào)用了FutureTask的 runAndReset 方法,每次執(zhí)行完不設(shè)置結(jié)果和狀態(tài)。
  6. DelayedWorkQueue的數(shù)據(jù)結(jié)構(gòu),它是一個基于最小堆結(jié)構(gòu)的優(yōu)先隊列,并且每次出隊時能夠保證取出的任務(wù)是當(dāng)前隊列中下次執(zhí)行時間最小的任務(wù)。同時注意一下優(yōu)先隊列中堆的順序,堆中的順序并不是絕對的,但要保證子節(jié)點(diǎn)的值要比父節(jié)點(diǎn)的值要大,這樣就不會影響出隊的順序。

總體來說,ScheduedThreadPoolExecutor的重點(diǎn)是要理解下次執(zhí)行時間的計算,以及優(yōu)先隊列的出隊、入隊和刪除的過程,這兩個是理解ScheduedThreadPoolExecutor的關(guān)鍵。

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

文章持續(xù)更新,可以公眾號搜一搜「 一角錢技術(shù) 」第一時間閱讀, 本文 GitHub org_hejianhui/JAVAStudy 已經(jīng)收錄,歡迎 Star。

分享到:
標(biāo)簽:并發(fā) 編程
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運(yùn)動步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定