前言
我在 2. SOFAJRaft源碼分析—JRaft的定時任務調度器是怎么做的? 這篇文章里已經講解過時間輪算法在JRaft中是怎么應用的,但是我感覺我并沒有講解清楚這個東西,導致看了這篇文章依然和沒看是一樣的,所以我打算重新說透時間輪算法。
時間輪的應用并非 JRaft 獨有,其應用場景還有很多,在 Netty、Akka、Quartz、ZooKeeper 、Kafka等組件中都存在時間輪的蹤影。
我們下面講解的時間輪的實現以JRaft中的為例子進行講解,因為JRaft這部分的代碼是參考Netty的,所以大家也可以去Netty中去尋找源碼實現。
時間輪用來解決什么問題?
如果一個系統中存在著大量的調度任務,而大量的調度任務如果每一個都使用自己的調度器來管理任務的生命周期的話,浪費cpu的資源并且很低效。
時間輪是一種高效來利用線程資源來進行批量化調度的一種調度模型。把大批量的調度任務全部都綁定到同一個的調度器上面,使用這一個調度器來進行所有任務的管理(manager),觸發(trigger)以及運行(runnable)。能夠高效的管理各種延時任務,周期任務,通知任務等等。
不過,時間輪調度器的時間精度可能不是很高,對于精度要求特別高的調度任務可能不太適合。因為時間輪算法的精度取決于,時間段“指針”單元的最小粒度大小,比如時間輪的格子是一秒跳一次,那么調度精度小于一秒的任務就無法被時間輪所調度。
時間輪結構
如圖,JRaft中時間輪(HashedWheelTimer)是一個存儲定時任務的環形隊列,底層采用數組實現,數組中的每個元素可以存放一個定時任務列表(HashedWheelBucket),HashedWheelBucket是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項(HashedWheelTimeout),其中封裝了真正的定時任務(TimerTask)。
時間輪由多個時間格組成,每個時間格代表當前時間輪的基本時間跨度(tickDuration)。時間輪的時間格個數是固定的,可用 wheel.length 來表示。
時間輪還有一個表盤指針(tick),用來表示時間輪當前指針跳動的次數,可以用tickDuration * (tick + 1)來表示下一次到期的任務,需要處理此時間格所對應的 HashedWheelBucket 中的所有任務。
時間輪運行邏輯
時間輪在啟動的時候會記錄一下當前啟動的時間賦值給startTime。時間輪在添加任務的時候首先會計算延遲時間(deadline),比如一個任務的延遲時間為24ms,那么會將當前的時間(currentTime)+24ms-時間輪啟動時的時間(startTime)。然后將任務封裝成HashedWheelTimeout加入到timeouts隊列中,作為緩存。
時間輪在運行的時候會將timeouts中緩存的HashedWheelTimeout任務取10萬個出來進行遍歷。
然后需要計算出幾個參數值:
- HashedWheelTimeout的總共延遲的次數:將每個任務的延遲時間(deadline)/tickDuration 計算出tick需要總共跳動的次數;
- 計算時間輪round次數:根據計算的需要走的(總次數- 當前tick數量)/ 時間格個數(wheel.length)。比如tickDuration為1ms,時間格個數為20個,那么時間輪走一圈需要20ms,那么添加進一個延時為24ms的數據,如果當前的tick為0,那么計算出的輪數為1,指針沒運行一圈就會將round取出來減一,所以需要轉動到第二輪之后才可以將輪數round減為0之后才會運行
- 計算出該任務需要放置到時間輪(wheel)的槽位,然后加入到槽位鏈表最后
將timeouts中的數據放置到時間輪wheel中之后,計算出當前時針走到的槽位的位置,并取出槽位中的鏈表數據,將deadline和當前的時間做對比,運行過期的數據。
源碼分析
構造器
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, long maxPendingTimeouts) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } //unit = MILLISECONDS if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } // Normalize ticksPerWheel to power of two and initialize the wheel. // 創建一個HashedWheelBucket數組 // 創建時間輪基本的數據結構,一個數組。長度為不小于ticksPerWheel的最小2的n次方 wheel = createWheel(ticksPerWheel); // 這是一個標示符,用來快速計算任務應該呆的格子。 // 我們知道,給定一個deadline的定時任務,其應該呆的格子=deadline%wheel.length.但是%操作是個相對耗時的操作,所以使用一種變通的位運算代替: // 因為一圈的長度為2的n次方,mask = 2^n-1后低位將全部是1,然后deadline&mast == deadline%wheel.length // JAVA中的HashMap在進行hash之后,進行index的hash尋址尋址的算法也是和這個一樣的 mask = wheel.length - 1; // Convert tickDuration to nanos. //tickDuration傳入是1的話,這里會轉換成1000000 this.tickDuration = unit.toNanos(tickDuration); // Prevent overflow. // 校驗是否存在溢出。即指針轉動的時間間隔不能太長而導致tickDuration*wheel.length>Long.MAX_VALUE if (this.tickDuration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } //將worker包裝成thread workerThread = threadFactory.newThread(worker); //maxPendingTimeouts = -1 this.maxPendingTimeouts = maxPendingTimeouts; //如果HashedWheelTimer實例太多,那么就會打印一個error日志 if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT && warnedTooManyInstances.compareAndSet(false, true)) { reportTooManyInstances(); }}
在這個構造器中有幾個細節需要注意:
- 調用createWheel方法創建的wheel數組一定是2次方數,比如傳入的ticksPerWheel是6,那么初始化的wheel長度一定是8。這樣做是為了讓mask & tick 來計算出槽位
- tickDuration用的是納秒
- 在構造里面并不會里面啟動時間輪,而是要等到有第一個任務加入到時間輪的時候才啟動。在構造器里面會將工作線程worker封裝成workerThread
放入任務到時間輪中
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } // 如果時間輪沒有啟動,則啟動 start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. //在delay為正數的情況下,deadline是不可能為負數 //如果為負數,那么說明超過了long的最大值 if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } // 這里定時任務不是直接加到對應的格子中,而是先加入到一個隊列里,然后等到下一個tick的時候, // 會從隊列里取出最多100000個任務加入到指定的格子中 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); //Worker會去處理timeouts隊列里面的數據 timeouts.add(timeout); return timeout;}
- 如果時間輪沒有啟動,那么就調用start方法啟動時間輪,啟動時間輪之后會為startTime設置為當前時間
- 計算延遲時間deadline
- 將task任務封裝到HashedWheelTimeout中,然后添加到timeouts隊列中進行緩存
start
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);public void start() { //workerState一開始的時候是0(WORKER_STATE_INIT),然后才會設置為1(WORKER_STATE_STARTED) switch (workerStateUpdater.get(this)) { case WORKER_STATE_INIT: //使用cas來獲取啟動調度的權力,只有競爭到的線程允許來進行實例啟動 if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { //如果成功設置了workerState,那么就調用workerThread線程 workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // 等待worker線程初始化時間輪的啟動時間 // Wait until the startTime is initialized by the worker. while (startTime == 0) { try { //這里使用countDownLauch來確保調度的線程已經被啟動 startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } }}
start方法會根據當前的workerState狀態來啟動時間輪。并且用了startTimeInitialized來控制線程的運行,如果workerThread沒有啟動起來,那么newTimeout方法會一直阻塞在運行start方法中。如果不阻塞,newTimeout方法會獲取不到startTime。
啟動時間輪
時間輪的啟動在HashedWheelTimer的內部類Worker中。調用workerThread#start方法會調用Worker的run方法啟動時間輪。
下面我們看時間輪啟動做了什么,下面的分析不考慮任務被取消的情況。
Worker#run
public void run() { // Initialize the startTime. startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } //HashedWheelTimer的start方法會繼續往下運行 // Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown(); do { //返回的是當前的nanoTime- startTime //也就是返回的是 每 tick 一次的時間間隔 final long deadline = waitForNextTick(); if (deadline > 0) { //算出時間輪的槽位 int idx = (int) (tick & mask); //移除cancelledTimeouts中的bucket // 從bucket中移除timeout processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; // 將newTimeout()方法中加入到待處理定時任務隊列中的任務加入到指定的格子中 transferTimeoutsToBuckets(); bucket.expireTimeouts(deadline); tick++; } // 校驗如果workerState是started狀態,那么就一直循環 } while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket : wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } //如果有沒有被處理的timeout,那么加入到unprocessedTimeouts對列中 if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } //處理被取消的任務 processCancelledTasks();}
- 時間輪運行的時候首先會記錄一下啟動時間(startTime),然后調用startTimeInitialized釋放外層的等待線程;
- 進入dowhile循環,調用waitForNextTick睡眠等待到下一次的tick指針的跳動,并返回當前時間減去startTime作為deadline
- 由于mask= wheel.length -1 ,wheel是2的次方數,所以可以直接用tick & mask 計算出此次在wheel中的槽位
- 調用processCancelledTasks將cancelledTimeouts隊列中的任務取出來,并將當前的任務從時間輪中移除
- 調用transferTimeoutsToBuckets方法將timeouts隊列中緩存的數據取出加入到時間輪中
- 運行目前指針指向的槽位中的bucket鏈表數據
時間輪指針跳動
waitForNextTick
//sleep, 直到下次tick到來, 然后返回該次tick和啟動時間之間的時長private long waitForNextTick() { //tickDuration這里是100000 //tick表示總tick數 long deadline = tickDuration * (tick + 1); for (;;) { final long currentTime = System.nanoTime() - startTime; // 計算需要sleep的時間, 之所以加999999后再除10000000,前面是1所以這里需要減去1, // 才能計算準確,還有通過這里可以看到 其實線程是以睡眠一定的時候再來執行下一個ticket的任務的, //這樣如果ticket的間隔設置的太小的話,系統會頻繁的睡眠然后啟動, //其實感覺影響部分的性能,所以為了更好的利用系統資源步長可以稍微設置大點 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; //sleepTimeMs小于零表示走到了下一個時間輪位置 if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } // Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 if (Platform.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; } try { Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } }}
可以想象一下在時鐘的秒鐘上面秒與秒之間的時間是需要等待的,那么waitForNextTick這個方法就是根據當前的時間計算出跳動到下個時間的間隔時間,并進行sleep操作,然后返回當前時間距離時間輪啟動時間的時間段。
轉移任務到時間輪中
在調用時間輪的方法加入任務的時候并沒有直接加入到時間輪中,而是緩存到了timeouts隊列中,所以在運行的時候需要將timeouts隊列中的任務轉移到時間輪數據的鏈表中
transferTimeoutsToBuckets
private void transferTimeoutsToBuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. // 每次tick只處理10w個任務,以免阻塞worker線程 for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } //已經被取消了; if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; } //calculated = tick 次數 long calculated = timeout.deadline / tickDuration; // 計算剩余的輪數, 只有 timer 走夠輪數, 并且到達了 task 所在的 slot, task 才會過期 timeout.remainingRounds = (calculated - tick) / wheel.length; //如果任務在timeouts隊列里面放久了, 以至于已經過了執行時間, 這個時候就使用當前tick, 也就是放到當前bucket, 此方法調用完后就會被執行 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. //// 算出任務應該插入的 wheel 的 slot, slotIndex = tick 次數 & mask, mask = wheel.length - 1 int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; //將timeout加入到bucket鏈表中 bucket.addTimeout(timeout); }}
在這個轉移方法中,寫死了一個循環,每次都只轉移10萬個任務。
然后根據HashedWheelTimeout的deadline延遲時間計算出時間輪需要運行多少次才能運行當前的任務,如果當前的任務延遲時間大于時間輪跑一圈所需要的時間,那么就計算需要跑幾圈才能到這個任務運行。
最后計算出該任務在時間輪中的槽位,添加到時間輪的鏈表中。
運行時間輪中的任務
當指針跳到時間輪的槽位的時間,會將槽位的HashedWheelBucket取出來,然后遍歷鏈表,運行其中到期的任務。
expireTimeouts
// 過期并執行格子中的到期任務,tick到該格子的時候,worker線程會調用這個方法//根據deadline和remainingRounds判斷任務是否過期public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; // process all timeouts //遍歷格子中的所有定時任務 while (timeout != null) { // 先保存next,因為移除后next將被設置為null HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { //從bucket鏈表中移除當前timeout,并返回鏈表中下一個timeout next = remove(timeout); //如果timeout的時間小于當前的時間,那么就調用expire執行task if (timeout.deadline <= deadline) { timeout.expire(); } else { //不可能發生的情況,就是說round已經為0了,deadline卻>當前槽的deadline // The timeout was placed into a wrong slot. This should never hAppen. throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { //因為當前的槽位已經過了,說明已經走了一圈了,把輪數減一 timeout.remainingRounds--; } //把指針放置到下一個timeout timeout = next; }}
HashedWheelBucket是一個鏈表,所以我們需要從head節點往下進行遍歷。如果鏈表沒有遍歷到鏈表尾部那么就繼續往下遍歷。
獲取的timeout節點節點,如果剩余輪數remainingRounds大于0,那么就說明要到下一圈才能運行,所以將剩余輪數減一;
如果當前剩余輪數小于等于零了,那么就將當前節點從bucket鏈表中移除,并判斷一下當前的時間是否大于timeout的延遲時間,如果是則調用timeout的expire執行任務。






