本文筆者來為大家介紹?.NETty的核心引擎Reactor的運(yùn)轉(zhuǎn)架構(gòu),希望通過本文的介紹能夠讓大家對(duì)Reactor是如何驅(qū)動(dòng)著整個(gè)Netty框架的運(yùn)轉(zhuǎn)有一個(gè)全面的認(rèn)識(shí)。也為我們后續(xù)進(jìn)一步介紹Netty關(guān)于處理網(wǎng)絡(luò)請(qǐng)求的整個(gè)生命周期的相關(guān)內(nèi)容做一個(gè)前置知識(shí)的鋪墊,方便大家后續(xù)理解。
那么在開始本文正式的內(nèi)容之前,筆者先來帶著大家回顧下前邊文章介紹的關(guān)于Netty整個(gè)框架如何搭建的相關(guān)內(nèi)容,沒有看過筆者前邊幾篇文章的讀者朋友也沒關(guān)系,這些并不會(huì)影響到本文的閱讀,只不過涉及到相關(guān)細(xì)節(jié)的部分,大家可以在回看下。
前文回顧
在《聊聊Netty那些事兒之Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)》一文中,我們介紹了Netty服務(wù)端的核心引擎主從Reactor線程組的創(chuàng)建過程以及相關(guān)核心組件里的重要屬性。在這個(gè)過程中,我們還提到了Netty對(duì)各種細(xì)節(jié)進(jìn)行的優(yōu)化,比如針對(duì)JDK NIO 原生Selector做的一些優(yōu)化,展現(xiàn)了Netty對(duì)性能極致的追求。最終我們創(chuàng)建出了如下結(jié)構(gòu)的Reactor。
在上篇文章《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》中,我們完整地介紹了Netty服務(wù)端啟動(dòng)的整個(gè)流程,并介紹了在啟動(dòng)過程中涉及到的ServerBootstrap相關(guān)的屬性以及配置方式。用于接收連接的服務(wù)端NIOServerSocketChannel的創(chuàng)建和初始化過程以及其類的繼承結(jié)構(gòu)。其中重點(diǎn)介紹了NioServerSocketChannel向Reactor的注冊(cè)過程以及Reactor線程的啟動(dòng)時(shí)機(jī)和pipeline的初始化時(shí)機(jī)。最后介紹了NioServerSocketChannel綁定端口地址的整個(gè)流程。在這個(gè)過程中我們了解了Netty的這些核心組件是如何串聯(lián)起來的。
當(dāng)Netty啟動(dòng)完畢后,我們得到了如下的框架結(jié)構(gòu):
主Reactor線程組中管理的是NioServerSocketChannel用于接收客戶端連接,并在自己的pipeline中的ServerBootstrapAcceptor里初始化接收到的客戶端連接,隨后會(huì)將初始化好的客戶端連接注冊(cè)到從Reactor線程組中。
從Reactor線程組主要負(fù)責(zé)監(jiān)聽處理注冊(cè)其上的所有客戶端連接的IO就緒事件。
其中一個(gè)Channel只能分配給一個(gè)固定的Reactor。一個(gè)Reactor負(fù)責(zé)處理多個(gè)Channel上的IO就緒事件,這樣可以將服務(wù)端承載的全量客戶端連接分?jǐn)偟蕉鄠€(gè)Reactor中處理,同時(shí)也能保證Channel上IO處理的線程安全性。Reactor與Channel之間的對(duì)應(yīng)關(guān)系如下圖所示:
以上內(nèi)容就是對(duì)筆者前邊幾篇文章的相關(guān)內(nèi)容回顧,大家能回憶起來更好,回憶不起來也沒關(guān)系,一點(diǎn)也不影響大家理解本文的內(nèi)容。如果對(duì)相關(guān)細(xì)節(jié)感興趣的同學(xué),可以在閱讀完本文之后,在去回看下。
我們言歸正傳,正式開始本文的內(nèi)容,筆者接下來會(huì)為大家介紹這些核心組件是如何相互配合從而驅(qū)動(dòng)著整個(gè)Netty Reactor框架運(yùn)轉(zhuǎn)的。
當(dāng)Netty Reactor框架啟動(dòng)完畢后,接下來第一件事情也是最重要的事情就是如何來高效的接收客戶端的連接。
那么在探討Netty服務(wù)端如何接收連接之前,我們需要弄清楚Reactor線程的運(yùn)行機(jī)制,它是如何監(jiān)聽并處理Channel上的IO就緒事件的。
本文相當(dāng)于是后續(xù)我們介紹Reactor線程監(jiān)聽處理ACCEPT事件,Read事件,Write事件的前置篇,本文專注于講述Reactor線程的整個(gè)運(yùn)行框架。理解了本文的內(nèi)容,對(duì)理解后面Reactor線程如何處理IO事件會(huì)大有幫助。
我們?cè)贜etty框架的創(chuàng)建階段和啟動(dòng)階段無數(shù)次的提到了Reactor線程,那么在本文要介紹的運(yùn)行階段就該這個(gè)Reactor線程來大顯神威了。
經(jīng)過前邊文章的介紹,我們了解到Netty中的Reactor線程主要干三件事情:
- 輪詢注冊(cè)在Reactor上的所有Channel感興趣的IO就緒事件。
- 處理Channel上的IO就緒事件。
- 執(zhí)行Netty中的異步任務(wù)。
正是這三個(gè)部分組成了Reactor的運(yùn)行框架,那么我們現(xiàn)在來看下這個(gè)運(yùn)行框架具體是怎么運(yùn)轉(zhuǎn)的~~
Reactor線程的整個(gè)運(yùn)行框架
大家還記不記得筆者在《聊聊Netty那些事兒之從內(nèi)核角度看IO模型》一文中提到的,IO模型的演變是圍繞著"如何用盡可能少的線程去管理盡可能多的連接"這一主題進(jìn)行的。
Netty的IO模型是通過JDK NIO Selector實(shí)現(xiàn)的IO多路復(fù)用模型,而Netty的IO線程模型為主從Reactor線程模型。
根據(jù)《聊聊Netty那些事兒之從內(nèi)核角度看IO模型》一文中介紹的IO多路復(fù)用模型我們很容易就能理解到Netty會(huì)使用一個(gè)用戶態(tài)的Reactor線程去不斷的通過Selector在內(nèi)核態(tài)去輪訓(xùn)Channel上的IO就緒事件。
說白了Reactor線程其實(shí)執(zhí)行的就是一個(gè)死循環(huán),在死循環(huán)中不斷的通過Selector去輪訓(xùn)IO就緒事件,如果發(fā)生IO就緒事件則從Selector系統(tǒng)調(diào)用中返回并處理IO就緒事件,如果沒有發(fā)生IO就緒事件則一直阻塞在Selector系統(tǒng)調(diào)用上,直到滿足Selector喚醒條件。
以下三個(gè)條件中只要滿足任意一個(gè)條件,Reactor線程就會(huì)被從Selector上喚醒:
- 當(dāng)Selector輪詢到有IO活躍事件發(fā)生時(shí)。
- 當(dāng)Reactor線程需要執(zhí)行的定時(shí)任務(wù)到達(dá)任務(wù)執(zhí)行時(shí)間deadline時(shí)。
- 當(dāng)有異步任務(wù)提交給Reactor時(shí),Reactor線程需要從Selector上被喚醒,這樣才能及時(shí)的去執(zhí)行異步任務(wù)。
這里可以看出Netty對(duì)Reactor線程的壓榨還是比較狠的,反正現(xiàn)在也沒有IO就緒事件需要去處理,不能讓Reactor線程在這里白白等著,要立即喚醒它,轉(zhuǎn)去處理提交過來的異步任務(wù)以及定時(shí)任務(wù)。Reactor線程堪稱996典范一刻不停歇地運(yùn)作著。
在了解了Reactor線程的大概運(yùn)行框架后,我們接下來就到源碼中去看下它的核心運(yùn)轉(zhuǎn)框架是如何實(shí)現(xiàn)出來的。
由于這塊源碼比較龐大繁雜,所以筆者先把它的運(yùn)行框架提取出來,方便大家整體的理解整個(gè)運(yùn)行過程的全貌。
上圖所展示的就是Reactor整個(gè)工作體系的全貌,主要分為如下幾個(gè)重要的工作模塊:
- Reactor線程在Selector上阻塞獲取IO就緒事件。在這個(gè)模塊中首先會(huì)去檢查當(dāng)前是否有異步任務(wù)需要執(zhí)行,如果有異步需要執(zhí)行,那么不管當(dāng)前有沒有IO就緒事件都不能阻塞在Selector上,隨后會(huì)去非阻塞的輪詢一下Selector上是否有IO就緒事件,如果有,正好可以和異步任務(wù)一起執(zhí)行。優(yōu)先處理IO就緒事件,在執(zhí)行異步任務(wù)。
- 如果當(dāng)前沒有異步任務(wù)需要執(zhí)行,那么Reactor線程會(huì)接著查看是否有定時(shí)任務(wù)需要執(zhí)行,如果有則在Selector上阻塞直到定時(shí)任務(wù)的到期時(shí)間deadline,或者滿足其他喚醒條件被喚醒。如果沒有定時(shí)任務(wù)需要執(zhí)行,Reactor線程則會(huì)在Selector上一直阻塞直到滿足喚醒條件。
- 當(dāng)Reactor線程滿足喚醒條件被喚醒后,首先會(huì)去判斷當(dāng)前是因?yàn)橛蠭O就緒事件被喚醒還是因?yàn)橛挟惒饺蝿?wù)需要執(zhí)行被喚醒或者是兩者都有。隨后Reactor線程就會(huì)去處理IO就緒事件和執(zhí)行異步任務(wù)。
- 最后Reactor線程返回循環(huán)起點(diǎn)不斷的重復(fù)上述三個(gè)步驟。
以上就是Reactor線程運(yùn)行的整個(gè)核心邏輯,下面是筆者根據(jù)上述核心邏輯,將Reactor的整體代碼設(shè)計(jì)框架提取出來,大家可以結(jié)合上邊的Reactor工作流程圖,從總體上先感受下整個(gè)源碼實(shí)現(xiàn)框架,能夠把Reactor的核心處理步驟和代碼中相應(yīng)的處理模塊對(duì)應(yīng)起來即可,這里不需要讀懂每一行代碼,要以邏輯處理模塊為單位理解。后面筆者會(huì)將這些一個(gè)一個(gè)的邏輯處理模塊在單獨(dú)拎出來為大家詳細(xì)介紹。
@Override
protected void run() {
//記錄輪詢次數(shù) 用于解決JDK epoll的空輪訓(xùn)bug
int selectCnt = 0;
for (;;) {
try {
//輪詢結(jié)果
int strategy;
try {
//根據(jù)輪詢策略獲取輪詢結(jié)果 這里的hasTasks()主要檢查的是普通隊(duì)列和尾部隊(duì)列中是否有異步任務(wù)等待執(zhí)行
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// NIO不支持自旋(BUSY_WAIT)
case SelectStrategy.SELECT:
核心邏輯是有任務(wù)需要執(zhí)行,則Reactor線程立馬執(zhí)行異步任務(wù),如果沒有異步任務(wù)執(zhí)行,則進(jìn)行輪詢IO事件
default:
}
} catch (IOException e) {
................省略...............
}
執(zhí)行到這里說明滿足了喚醒條件,Reactor線程從selector上被喚醒開始處理IO就緒事件和執(zhí)行異步任務(wù)
/**
* Reactor線程需要保證及時(shí)的執(zhí)行異步任務(wù),只要有異步任務(wù)提交,就需要退出輪詢。
* 有IO事件就優(yōu)先處理IO事件,然后處理異步任務(wù)
* */
selectCnt++;
//主要用于從IO就緒的SelectedKeys集合中剔除已經(jīng)失效的selectKey
needsToSelectAgain = false;
//調(diào)整Reactor線程執(zhí)行IO事件和執(zhí)行異步任務(wù)的CPU時(shí)間比例 默認(rèn)50,表示執(zhí)行IO事件和異步任務(wù)的時(shí)間比例是一比一
final int ioRatio = this.ioRatio;
這里主要處理IO就緒事件,以及執(zhí)行異步任務(wù)
需要優(yōu)先處理IO就緒事件,然后根據(jù)ioRatio設(shè)置的處理IO事件CPU用時(shí)與異步任務(wù)CPU用時(shí)比例,
來決定執(zhí)行多長時(shí)間的異步任務(wù)
//判斷是否觸發(fā)JDK Epoll BUG 觸發(fā)空輪詢
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
//既沒有IO就緒事件,也沒有異步任務(wù),Reactor線程從Selector上被異常喚醒 觸發(fā)JDK Epoll空輪訓(xùn)BUG
//重新構(gòu)建Selector,selectCnt歸零
selectCnt = 0;
}
} catch (CancelledKeyException e) {
................省略...............
} catch (Error e) {
................省略...............
} catch (Throwable t) {
................省略...............
} finally {
................省略...............
}
}
}
折疊
從上面提取出來的Reactor的源碼實(shí)現(xiàn)框架中,我們可以看出Reactor線程主要做了下面幾個(gè)事情:
- 通過JDK NIO Selector輪詢注冊(cè)在Reactor上的所有Channel感興趣的IO事件。對(duì)于NioServerSocketChannel來說因?yàn)樗饕?fù)責(zé)接收客戶端連接所以監(jiān)聽的是OP_ACCEPT事件,對(duì)于客戶端NioSocketChannel來說因?yàn)樗饕?fù)責(zé)處理連接上的讀寫事件所以監(jiān)聽的是OP_READ和OP_WRITE事件。
這里需要注意的是netty只會(huì)自動(dòng)注冊(cè)O(shè)P_READ事件,而OP_WRITE事件是在當(dāng)Socket寫入緩沖區(qū)以滿無法繼續(xù)寫入發(fā)送數(shù)據(jù)時(shí)由用戶自己注冊(cè)。
- 如果有異步任務(wù)需要執(zhí)行,則立馬停止輪詢操作,轉(zhuǎn)去執(zhí)行異步任務(wù)。這里分為兩種情況:
- 既有IO就緒事件發(fā)生,也有異步任務(wù)需要執(zhí)行。則優(yōu)先處理IO就緒事件,然后根據(jù)ioRatio設(shè)置的執(zhí)行時(shí)間比例決定執(zhí)行多長時(shí)間的異步任務(wù)。這里Reactor線程需要控制異步任務(wù)的執(zhí)行時(shí)間,因?yàn)镽eactor線程的核心是處理IO就緒事件,不能因?yàn)楫惒饺蝿?wù)的執(zhí)行而耽誤了最重要的事情。
- 沒有IO就緒事件發(fā)生,但是有異步任務(wù)或者定時(shí)任務(wù)到期需要執(zhí)行。則只執(zhí)行異步任務(wù),盡可能的去壓榨Reactor線程。沒有IO就緒事件發(fā)生也不能閑著。
- 這里第二種情況下只會(huì)執(zhí)行64個(gè)異步任務(wù),目的是為了防止過度執(zhí)行異步任務(wù),耽誤了最重要的事情輪詢IO事件。
- 在最后Netty會(huì)判斷本次Reactor線程的喚醒是否是由于觸發(fā)了JDK epoll 空輪詢 BUG導(dǎo)致的,如果觸發(fā)了該BUG,則重建Selector。繞過JDK BUG,達(dá)到解決問題的目的。
正常情況下Reactor線程從Selector中被喚醒有兩種情況:
輪詢到有IO就緒事件發(fā)生。有異步任務(wù)或者定時(shí)任務(wù)需要執(zhí)行。
而JDK epoll 空輪詢 BUG會(huì)在上述兩種情況都沒有發(fā)生的時(shí)候,Reactor線程會(huì)意外的從Selector中被喚醒,導(dǎo)致CPU空轉(zhuǎn)。
JDK epoll 空輪詢 BUG:
https://bugs.JAVA.com/bugdatabase/view_bug.do?bug_id=6670302
好了,Reactor線程的總體運(yùn)行結(jié)構(gòu)框架我們現(xiàn)在已經(jīng)了解了,下面我們來深入到這些核心處理模塊中來各個(gè)擊破它們~~
1. Reactor線程輪詢IO就緒事件
在《聊聊Netty那些事兒之Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)》一文中,筆者在講述主從Reactor線程組NioEventLoopGroup的創(chuàng)建過程的時(shí)候,提到一個(gè)構(gòu)造器參數(shù)SelectStrategyFactory 。
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
Reactor線程最重要的一件事情就是輪詢IO就緒事件,SelectStrategyFactory 就是用于指定輪詢策略的,默認(rèn)實(shí)現(xiàn)為
DefaultSelectStrategyFactory.INSTANCE。
而在Reactor線程開啟輪詢的一開始,就是用這個(gè)selectStrategy 去計(jì)算一個(gè)輪詢策略strategy ,后續(xù)會(huì)根據(jù)這個(gè)strategy進(jìn)行不同的邏輯處理。
@Override
protected void run() {
//記錄輪詢次數(shù) 用于解決JDK epoll的空輪訓(xùn)bug
int selectCnt = 0;
for (;;) {
try {
//輪詢結(jié)果
int strategy;
try {
//根據(jù)輪詢策略獲取輪詢結(jié)果 這里的hasTasks()主要檢查的是普通隊(duì)列和尾部隊(duì)列中是否有異步任務(wù)等待執(zhí)行
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// NIO不支持自旋(BUSY_WAIT)
case SelectStrategy.SELECT:
核心邏輯是有任務(wù)需要執(zhí)行,則Reactor線程立馬執(zhí)行異步任務(wù),如果沒有異步任務(wù)執(zhí)行,則進(jìn)行輪詢IO事件
default:
}
} catch (IOException e) {
................省略...............
}
................省略...............
}
下面我們來看這個(gè)輪詢策略strategy 具體的計(jì)算邏輯是什么樣的?
1.1 輪詢策略
public interface SelectStrategy {
/**
* Indicates a blocking select should follow.
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
*/
int CONTINUE = -2;
/**
* Indicates the IO loop to poll for new events without blocking.
*/
int BUSY_WAIT = -3;
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}
我們首先來看下Netty中定義的這三種輪詢策略:
- SelectStrategy.SELECT:此時(shí)沒有任何異步任務(wù)需要執(zhí)行,Reactor線程可以安心的阻塞在Selector上等待IO就緒事件的來臨。
- SelectStrategy.CONTINUE:重新開啟一輪IO輪詢。
- SelectStrategy.BUSY_WAIT: Reactor線程進(jìn)行自旋輪詢,由于NIO 不支持自旋操作,所以這里直接跳到SelectStrategy.SELECT策略。
下面我們來看下輪詢策略的計(jì)算邏輯calculateStrategy :
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
/**
* Reactor線程要保證及時(shí)的執(zhí)行異步任務(wù)
* 1:如果有異步任務(wù)等待執(zhí)行,則馬上執(zhí)行selectNow()非阻塞輪詢一次IO就緒事件
* 2:沒有異步任務(wù),則跳到switch select分支
* */
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
- 在Reactor線程的輪詢工作開始之前,需要首先判斷下當(dāng)前是否有異步任務(wù)需要執(zhí)行。判斷依據(jù)就是查看Reactor中的異步任務(wù)隊(duì)列taskQueue和用于統(tǒng)計(jì)信息任務(wù)用的尾部隊(duì)列tailTask是否有異步任務(wù)。
@Override
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
- 如果Reactor中有異步任務(wù)需要執(zhí)行,那么Reactor線程需要立即執(zhí)行,不能阻塞在Selector上。在返回前需要再順帶調(diào)用selectNow()非阻塞查看一下當(dāng)前是否有IO就緒事件發(fā)生。如果有,那么正好可以和異步任務(wù)一起被處理,如果沒有,則及時(shí)地處理異步任務(wù)。
這里Netty要表達(dá)的語義是:首先Reactor線程需要優(yōu)先保證IO就緒事件的處理,然后在保證異步任務(wù)的及時(shí)執(zhí)行。如果當(dāng)前沒有IO就緒事件但是有異步任務(wù)需要執(zhí)行時(shí),Reactor線程就要去及時(shí)執(zhí)行異步任務(wù)而不是繼續(xù)阻塞在Selector上等待IO就緒事件。
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
int selectNow() throws IOException {
//非阻塞
return selector.selectNow();
}
- 如果當(dāng)前Reactor線程沒有異步任務(wù)需要執(zhí)行,那么calculateStrategy 方法直接返回SelectStrategy.SELECT也就是SelectStrategy接口中定義的常量-1。當(dāng)calculateStrategy 方法通過selectNow()返回非零數(shù)值時(shí),表示此時(shí)有IO就緒的Channel,返回的數(shù)值表示有多少個(gè)IO就緒的Channel。
@Override
protected void run() {
//記錄輪詢次數(shù) 用于解決JDK epoll的空輪訓(xùn)bug
int selectCnt = 0;
for (;;) {
try {
//輪詢結(jié)果
int strategy;
try {
//根據(jù)輪詢策略獲取輪詢結(jié)果 這里的hasTasks()主要檢查的是普通隊(duì)列和尾部隊(duì)列中是否有異步任務(wù)等待執(zhí)行
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// NIO不支持自旋(BUSY_WAIT)
case SelectStrategy.SELECT:
核心邏輯是有任務(wù)需要執(zhí)行,則Reactor線程立馬執(zhí)行異步任務(wù),如果沒有異步任務(wù)執(zhí)行,則進(jìn)行輪詢IO事件
default:
}
} catch (IOException e) {
................省略...............
}
................處理IO就緒事件以及執(zhí)行異步任務(wù)...............
}
從默認(rèn)的輪詢策略我們可以看出
selectStrategy.calculateStrategy只會(huì)返回三種情況:
- 返回 -1: switch邏輯分支進(jìn)入SelectStrategy.SELECT分支,表示此時(shí)Reactor中沒有異步任務(wù)需要執(zhí)行,Reactor線程可以安心的阻塞在Selector上等待IO就緒事件發(fā)生。
- 返回 0: switch邏輯分支進(jìn)入default分支,表示此時(shí)Reactor中沒有IO就緒事件但是有異步任務(wù)需要執(zhí)行,流程通過default分支直接進(jìn)入了處理異步任務(wù)的邏輯部分。
- 返回 > 0:switch邏輯分支進(jìn)入default分支,表示此時(shí)Reactor中既有IO就緒事件發(fā)生也有異步任務(wù)需要執(zhí)行,流程通過default分支直接進(jìn)入了處理IO就緒事件和執(zhí)行異步任務(wù)邏輯部分。
現(xiàn)在Reactor的流程處理邏輯走向我們清楚了,那么接下來我們把重點(diǎn)放在SelectStrategy.SELECT分支中的輪詢邏輯上。這塊是Reactor監(jiān)聽IO就緒事件的核心。
1.2 輪詢邏輯
case SelectStrategy.SELECT:
//當(dāng)前沒有異步任務(wù)執(zhí)行,Reactor線程可以放心的阻塞等待IO就緒事件
//從定時(shí)任務(wù)隊(duì)列中取出即將快要執(zhí)行的定時(shí)任務(wù)deadline
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// -1代表當(dāng)前定時(shí)任務(wù)隊(duì)列中沒有定時(shí)任務(wù)
curDeadlineNanos = NONE; // nothing on the calendar
}
//最早執(zhí)行定時(shí)任務(wù)的deadline作為 select的阻塞時(shí)間,意思是到了定時(shí)任務(wù)的執(zhí)行時(shí)間
//不管有無IO就緒事件,必須喚醒selector,從而使reactor線程執(zhí)行定時(shí)任務(wù)
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
//再次檢查普通任務(wù)隊(duì)列中是否有異步任務(wù)
//沒有的話開始select阻塞輪詢IO就緒事件
strategy = select(curDeadlineNanos);
}
} finally {
// 執(zhí)行到這里說明Reactor已經(jīng)從Selector上被喚醒了
// 設(shè)置Reactor的狀態(tài)為蘇醒狀態(tài)AWAKE
// lazySet優(yōu)化不必要的volatile操作,不使用內(nèi)存屏障,不保證寫操作的可見性(單線程不需要保證)
nextWakeupNanos.lazySet(AWAKE);
}
流程走到這里,說明現(xiàn)在Reactor上沒有任何事情可做,可以安心的阻塞在Selector上等待IO就緒事件到來。
那么Reactor線程到底應(yīng)該在Selector上阻塞多久呢??
在回答這個(gè)問題之前,我們?cè)诨仡櫹隆读牧腘etty那些事兒之Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)》一文中在講述Reactor的創(chuàng)建時(shí)提到,Reactor線程除了要輪詢Channel上的IO就緒事件,以及處理IO就緒事件外,還有一個(gè)任務(wù)就是負(fù)責(zé)執(zhí)行Netty框架中的異步任務(wù)。
而Netty框架中的異步任務(wù)分為三類:
- 存放在普通任務(wù)隊(duì)列taskQueue中的普通異步任務(wù)。
- 存放在尾部隊(duì)列tailTasks 中的用于執(zhí)行統(tǒng)計(jì)任務(wù)等收尾動(dòng)作的尾部任務(wù)。
- 還有一種就是這里即將提到的定時(shí)任務(wù)。存放在Reactor中的定時(shí)任務(wù)隊(duì)列scheduledTaskQueue中。
從ReactorNioEventLoop類中的繼承結(jié)構(gòu)我們也可以看出,Reactor具備執(zhí)行定時(shí)任務(wù)的能力。
既然Reactor需要執(zhí)行定時(shí)任務(wù),那么它就不能一直阻塞在Selector上無限等待IO就緒事件。
那么我們回到本小節(jié)一開始提到的問題上,為了保證Reactor能夠及時(shí)地執(zhí)行定時(shí)任務(wù),Reactor線程需要在即將要執(zhí)行的的第一個(gè)定時(shí)任務(wù)deadline到達(dá)之前被喚醒。
所以在Reactor線程開始輪詢IO就緒事件之前,我們需要首先計(jì)算出來Reactor線程在Selector上的阻塞超時(shí)時(shí)間。
1.2.1 Reactor的輪詢超時(shí)時(shí)間
首先我們需要從Reactor的定時(shí)任務(wù)隊(duì)列scheduledTaskQueue 中取出即將快要執(zhí)行的定時(shí)任務(wù)deadline。將這個(gè)deadline作為Reactor線程在Selector上輪詢的超時(shí)時(shí)間。這樣可以保證在定時(shí)任務(wù)即將要執(zhí)行時(shí),Reactor現(xiàn)在可以及時(shí)的從Selector上被喚醒。
private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;
// nextWakeupNanos is:
// AWAKE when EL is awake
// NONE when EL is waiting with no wakeup scheduled
// other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// -1代表當(dāng)前定時(shí)任務(wù)隊(duì)列中沒有定時(shí)任務(wù)
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
protected final long nextScheduledTaskDeadlineNanos() {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
}
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
}
}
nextScheduledTaskDeadlineNanos 方法會(huì)返回當(dāng)前Reactor定時(shí)任務(wù)隊(duì)列中最近的一個(gè)定時(shí)任務(wù)deadline時(shí)間點(diǎn),如果定時(shí)任務(wù)隊(duì)列中沒有定時(shí)任務(wù),則返回-1。
NioEventLoop中nextWakeupNanos 變量用來存放Reactor從Selector上被喚醒的時(shí)間點(diǎn),設(shè)置為最近需要被執(zhí)行定時(shí)任務(wù)的deadline,如果當(dāng)前并沒有定時(shí)任務(wù)需要執(zhí)行,那么就設(shè)置為Long.MAX_VALUE一直阻塞,直到有IO就緒事件到達(dá)或者有異步任務(wù)需要執(zhí)行。
1.2.2 Reactor開始輪詢IO就緒事件
if (!hasTasks()) {
//再次檢查普通任務(wù)隊(duì)列中是否有異步任務(wù), 沒有的話 開始select阻塞輪詢IO就緒事件
strategy = select(curDeadlineNanos);
}
在Reactor線程開始阻塞輪詢IO就緒事件之前還需要再次檢查一下是否有異步任務(wù)需要執(zhí)行。
如果此時(shí)恰巧有異步任務(wù)提交,就需要停止IO就緒事件的輪詢,轉(zhuǎn)去執(zhí)行異步任務(wù)。如果沒有異步任務(wù),則正式開始輪詢IO就緒事件。
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
//無定時(shí)任務(wù),無普通任務(wù)執(zhí)行時(shí),開始輪詢IO就緒事件,沒有就一直阻塞 直到喚醒條件成立
return selector.select();
}
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
如果deadlineNanos == NONE,經(jīng)過上小節(jié)的介紹,我們知道NONE
表示當(dāng)前Reactor中并沒有定時(shí)任務(wù),所以可以安心的阻塞在Selector上等待IO就緒事件到來。
selector.select()調(diào)用是一個(gè)阻塞調(diào)用,如果沒有IO就緒事件,Reactor線程就會(huì)一直阻塞在這里直到IO就緒事件到來。這里占時(shí)不考慮前邊提到的JDK NIO Epoll的空輪詢BUG.
讀到這里那么問題來了,此時(shí)Reactor線程正阻塞在selector.select()調(diào)用上等待IO就緒事件的到來,如果此時(shí)正好有異步任務(wù)被提交到Reactor中需要執(zhí)行,并且此時(shí)無任何IO就緒事件,而Reactor線程由于沒有IO就緒事件到來,會(huì)繼續(xù)在這里阻塞,那么如何去執(zhí)行異步任務(wù)呢??
解鈴還須系鈴人,既然異步任務(wù)在被提交后希望立馬得到執(zhí)行,那么就在提交異步任務(wù)的時(shí)候去喚醒Reactor線程。
//addTaskWakesUp = true 表示 當(dāng)且僅當(dāng)只有調(diào)用addTask方法時(shí) 才會(huì)喚醒Reactor線程
//addTaskWakesUp = false 表示 并不是只有addTask方法才能喚醒Reactor 還有其他方法可以喚醒Reactor 默認(rèn)設(shè)置false
private final boolean addTaskWakesUp;
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
//如果當(dāng)前線程不是Reactor線程,則啟動(dòng)Reactor線程
//這里可以看出Reactor線程的啟動(dòng)是通過 向NioEventLoop添加異步任務(wù)時(shí)啟動(dòng)的
startThread();
.....................省略...................
}
if (!addTaskWakesUp && immediate) {
//io.netty.channel.nio.NioEventLoop.wakeup
wakeup(inEventLoop);
}
}
對(duì)于execute方法我想大家一定不會(huì)陌生,在上篇文章《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》中我們?cè)诮榻BReactor線程的啟動(dòng)時(shí)介紹過該方法。
在啟動(dòng)過程中涉及到的重要操作Register操作,Bind操作都需要封裝成異步任務(wù)通過該方法提交到Reactor中執(zhí)行。
這里我們將重點(diǎn)放在execute方法后半段wakeup邏輯部分。
我們先介紹下和wakeup邏輯相關(guān)的兩個(gè)參數(shù)boolean immediate和boolean addTaskWakesUp。
- immediate:表示提交的task是否需要被立即執(zhí)行。Netty中只要你提交的任務(wù)類型不是LazyRunnable類型的任務(wù),都是需要立即執(zhí)行的。immediate = true
- addTaskWakesUp : true 表示當(dāng)且僅當(dāng)只有調(diào)用addTask方法時(shí)才會(huì)喚醒Reactor線程。調(diào)用別的方法并不會(huì)喚醒Reactor線程。
在初始化NioEventLoop時(shí)會(huì)設(shè)置為false,表示并不是只有addTask方法才能喚醒Reactor線程 還有其他方法可以喚醒Reactor線程,比如這里的execute方法就會(huì)喚醒Reactor線程。
針對(duì)execute方法中的這個(gè)喚醒條件!addTaskWakesUp && immediate,netty這里要表達(dá)的語義是:當(dāng)immediate參數(shù)為true的時(shí)候表示該異步任務(wù)需要立即執(zhí)行,addTaskWakesUp 默認(rèn)設(shè)置為false 表示不僅只有addTask方法可以喚醒Reactor,還有其他方法比如這里的execute方法也可以喚醒。但是當(dāng)設(shè)置為true時(shí),語義就變?yōu)橹挥衋ddTask才可以喚醒Reactor,即使execute方法里的immediate = true也不能喚醒Reactor,因?yàn)閳?zhí)行的是execute方法而不是addTask方法。
private static final long AWAKE = -1L;
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
//將Reactor線程從Selector上喚醒
selector.wakeup();
}
}
當(dāng)nextWakeupNanos = AWAKE時(shí)表示當(dāng)前Reactor正處于蘇醒狀態(tài),既然是蘇醒狀態(tài)也就沒有必要去執(zhí)行 selector.wakeup()重復(fù)喚醒Reactor了,同時(shí)也能省去這一次的系統(tǒng)調(diào)用開銷。
在《1.2小節(jié) 輪詢邏輯》開始介紹的源碼實(shí)現(xiàn)框架里Reactor被喚醒之后執(zhí)行代碼會(huì)進(jìn)入finally{...}語句塊中,在那里會(huì)將nextWakeupNanos設(shè)置為AWAKE。
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// 執(zhí)行到這里說明Reactor已經(jīng)從Selector上被喚醒了
// 設(shè)置Reactor的狀態(tài)為蘇醒狀態(tài)AWAKE
// lazySet優(yōu)化不必要的volatile操作,不使用內(nèi)存屏障,不保證寫操作的可見性(單線程不需要保證)
nextWakeupNanos.lazySet(AWAKE);
}
這里Netty用了一個(gè)AtomicLong類型的變量nextWakeupNanos,既能表示當(dāng)前Reactor線程的狀態(tài),又能表示Reactor線程的阻塞超時(shí)時(shí)間。我們?cè)谌粘i_發(fā)中也可以學(xué)習(xí)下這種技巧。
我們繼續(xù)回到Reactor線程輪詢IO就緒事件的主線上。
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
//無定時(shí)任務(wù),無普通任務(wù)執(zhí)行時(shí),開始輪詢IO就緒事件,沒有就一直阻塞 直到喚醒條件成立
return selector.select();
}
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
當(dāng)deadlineNanos不為NONE,表示此時(shí)Reactor有定時(shí)任務(wù)需要執(zhí)行,Reactor線程需要阻塞在Selector上等待IO就緒事件直到最近的一個(gè)定時(shí)任務(wù)執(zhí)行時(shí)間點(diǎn)deadline到達(dá)。
這里的deadlineNanos表示的就是Reactor中最近的一個(gè)定時(shí)任務(wù)執(zhí)行時(shí)間點(diǎn)deadline,單位是納秒。指的是一個(gè)絕對(duì)時(shí)間。
而我們需要計(jì)算的是Reactor線程阻塞在Selector的超時(shí)時(shí)間timeoutMillis,單位是毫秒,指的是一個(gè)相對(duì)時(shí)間。
所以在Reactor線程開始阻塞在Selector上之前,我們需要將這個(gè)單位為納秒的絕對(duì)時(shí)間deadlineNanos轉(zhuǎn)化為單位為毫秒的相對(duì)時(shí)間timeoutMillis。
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
//無定時(shí)任務(wù),無普通任務(wù)執(zhí)行時(shí),開始輪詢IO就緒事件,沒有就一直阻塞 直到喚醒條件成立
return selector.select();
}
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
這里大家可能會(huì)好奇,通過deadlineToDelayNanos方法計(jì)算timeoutMillis的時(shí)候,為什么要給deadlineNanos在加上0.995毫秒呢??
大家想象一下這樣的場(chǎng)景,當(dāng)最近的一個(gè)定時(shí)任務(wù)的deadline即將在5微秒內(nèi)到達(dá),那么這時(shí)將納秒轉(zhuǎn)換成毫秒計(jì)算出的timeoutMillis 會(huì)是0。
而在Netty中timeoutMillis = 0 要表達(dá)的語義是:定時(shí)任務(wù)執(zhí)行時(shí)間已經(jīng)到達(dá)deadline時(shí)間點(diǎn),需要被執(zhí)行。
而現(xiàn)實(shí)情況是定時(shí)任務(wù)還有5微秒才能夠到達(dá)deadline,所以對(duì)于這種情況,需要在deadlineNanos在加上0.995毫秒湊成1毫秒不能讓其為0。
所以從這里我們可以看出,Reactor在有定時(shí)任務(wù)的情況下,至少要阻塞1毫秒。
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
protected static long deadlineToDelayNanos(long deadlineNanos) {
return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
}
}
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
static long deadlineToDelayNanos(long deadlineNanos) {
return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());
}
//啟動(dòng)時(shí)間點(diǎn)
private static final long START_TIME = System.nanoTime();
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
static long deadlineNanos(long delay) {
//計(jì)算定時(shí)任務(wù)執(zhí)行deadline 去除啟動(dòng)時(shí)間
long deadlineNanos = nanoTime() + delay;
// Guard against overflow
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}
}
這里需要注意一下,在創(chuàng)建定時(shí)任務(wù)時(shí)會(huì)通過deadlineNanos方法計(jì)算定時(shí)任務(wù)的執(zhí)行deadline,deadline的計(jì)算邏輯是當(dāng)前時(shí)間點(diǎn)+任務(wù)延時(shí)delay-系統(tǒng)啟動(dòng)時(shí)間。這里需要扣除系統(tǒng)啟動(dòng)的時(shí)間。
所以這里在通過deadline計(jì)算延時(shí)delay(也就是timeout)的時(shí)候需要在加上系統(tǒng)啟動(dòng)的時(shí)間 : deadlineNanos - nanoTime()
當(dāng)通過deadlineToDelayNanos 計(jì)算出的timeoutMillis <= 0時(shí),表示Reactor目前有臨近的定時(shí)任務(wù)需要執(zhí)行,這時(shí)候就需要立馬返回,不能阻塞在Selector上影響定時(shí)任務(wù)的執(zhí)行。當(dāng)然在返回執(zhí)行定時(shí)任務(wù)前,需要在順手通過selector.selectNow()非阻塞輪詢一下Channel上是否有IO就緒事件到達(dá),防止耽誤IO事件的處理。真是操碎了心~~
當(dāng)timeoutMillis > 0時(shí),Reactor線程就可以安心的阻塞在Selector上等待IO事件的到來,直到timeoutMillis超時(shí)時(shí)間到達(dá)。
timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis)
當(dāng)注冊(cè)在Reactor上的Channel中有IO事件到來時(shí),Reactor線程就會(huì)從selector.select(timeoutMillis)調(diào)用中喚醒,立即去處理IO就緒事件。
這里假設(shè)一種極端情況,如果最近的一個(gè)定時(shí)任務(wù)的deadline是在未來很遠(yuǎn)的一個(gè)時(shí)間點(diǎn),這樣就會(huì)使timeoutMillis的時(shí)間非常非常久,那么Reactor豈不是會(huì)一直阻塞在Selector上造成 Netty 無法工作?
筆者覺得大家現(xiàn)在心里應(yīng)該已經(jīng)有了答案,我們?cè)凇?.2.2 Reactor開始輪詢IO就緒事件》小節(jié)一開始介紹過,當(dāng)Reactor正在Selector上阻塞時(shí),如果此時(shí)用戶線程向Reactor提交了異步任務(wù),Reactor線程會(huì)通過execute方法被喚醒。
流程到這里,Reactor中最重要也是最核心的邏輯:輪詢Channel上的IO就緒事件的處理流程我們就講解完了。
當(dāng)Reactor輪詢到有IO活躍事件或者有異步任務(wù)需要執(zhí)行時(shí),就會(huì)從Selector上被喚醒,下面就到了該介紹Reactor被喚醒之后是如何處理IO就緒事件以及如何執(zhí)行異步任務(wù)的時(shí)候了。
Netty畢竟是一個(gè)網(wǎng)絡(luò)框架,所以它會(huì)優(yōu)先去處理Channel上的IO事件,基于這個(gè)事實(shí),所以Netty不會(huì)容忍異步任務(wù)被無限制的執(zhí)行從而影響IO吞吐。
Netty通過ioRatio變量來調(diào)配Reactor線程在處理IO事件和執(zhí)行異步任務(wù)之間的CPU時(shí)間分配比例。
下面我們就來看下這個(gè)執(zhí)行時(shí)間比例的分配邏輯是什么樣的~~~
2. Reactor處理IO與處理異步任務(wù)的時(shí)間比例分配
無論什么時(shí)候,當(dāng)有IO就緒事件到來時(shí),Reactor都需要保證IO事件被及時(shí)完整的處理完,而ioRatio主要限制的是執(zhí)行異步任務(wù)所需用時(shí),防止Reactor線程處理異步任務(wù)時(shí)間過長而導(dǎo)致 I/O 事件得不到及時(shí)地處理。
//調(diào)整Reactor線程執(zhí)行IO事件和執(zhí)行異步任務(wù)的CPU時(shí)間比例 默認(rèn)50,表示執(zhí)行IO事件和異步任務(wù)的時(shí)間比例是一比一
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) { //先一股腦執(zhí)行IO事件,在一股腦執(zhí)行異步任務(wù)(無時(shí)間限制)
try {
if (strategy > 0) {
//如果有IO就緒事件 則處理IO就緒事件
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
//處理所有異步任務(wù)
ranTasks = runAllTasks();
}
} else if (strategy > 0) {//先執(zhí)行IO事件 用時(shí)ioTime 執(zhí)行異步任務(wù)只能用時(shí)ioTime * (100 - ioRatio) / ioRatio
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// 限定在超時(shí)時(shí)間內(nèi) 處理有限的異步任務(wù) 防止Reactor線程處理異步任務(wù)時(shí)間過長而導(dǎo)致 I/O 事件阻塞
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else { //沒有IO就緒事件處理,則只執(zhí)行異步任務(wù) 最多執(zhí)行64個(gè) 防止Reactor線程處理異步任務(wù)時(shí)間過長而導(dǎo)致 I/O 事件阻塞
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
- 當(dāng)ioRatio = 100時(shí),表示無需考慮執(zhí)行時(shí)間的限制,當(dāng)有IO就緒事件時(shí)(strategy > 0)Reactor線程需要優(yōu)先處理IO就緒事件,處理完IO事件后,執(zhí)行所有的異步任務(wù)包括:普通任務(wù),尾部任務(wù),定時(shí)任務(wù)。無時(shí)間限制。
strategy的數(shù)值表示IO就緒的Channel個(gè)數(shù)。它是前邊介紹的
io.netty.channel.nio.NioEventLoop#select方法的返回值。
- 當(dāng)ioRatio設(shè)置的值不為100時(shí),默認(rèn)為50。需要先統(tǒng)計(jì)出執(zhí)行IO事件的用時(shí)ioTime ,根據(jù)ioTime * (100 - ioRatio) / ioRatio計(jì)算出,后面執(zhí)行異步任務(wù)的限制時(shí)間。也就是說Reactor線程需要在這個(gè)限定的時(shí)間內(nèi),執(zhí)行有限的異步任務(wù),防止Reactor線程由于處理異步任務(wù)時(shí)間過長而導(dǎo)致I/O 事件得不到及時(shí)地處理。
默認(rèn)情況下,執(zhí)行IO事件用時(shí)和執(zhí)行異步任務(wù)用時(shí)比例設(shè)置的是一比一。
ioRatio設(shè)置的越高,則Reactor線程執(zhí)行異步任務(wù)的時(shí)間占比越小。
要想得到Reactor線程執(zhí)行異步任務(wù)所需的時(shí)間限制,必須知道執(zhí)行IO事件的用時(shí)ioTime然后在根據(jù)ioRatio計(jì)算出執(zhí)行異步任務(wù)的時(shí)間限制。
那如果此時(shí)并沒有IO就緒事件需要Reactor線程處理的話,這種情況下我們無法得到ioTime,那怎么得到執(zhí)行異步任務(wù)的限制時(shí)間呢??
在這種特殊情況下,Netty只允許Reactor線程最多執(zhí)行64個(gè)異步任務(wù),然后就結(jié)束執(zhí)行。轉(zhuǎn)去繼續(xù)輪訓(xùn)IO就緒事件。核心目的還是防止Reactor線程由于處理異步任務(wù)時(shí)間過長而導(dǎo)致I/O 事件得不到及時(shí)地處理。
默認(rèn)情況下,當(dāng)Reactor有異步任務(wù)需要處理但是沒有IO就緒事件時(shí),Netty只會(huì)允許Reactor線程執(zhí)行最多64個(gè)異步任務(wù)。
現(xiàn)在我們對(duì)Reactor處理IO事件和異步任務(wù)的整體框架已經(jīng)了解了,下面我們就來分別介紹下Reactor線程在處理IO事件和異步任務(wù)的具體邏輯是什么樣的?
3. Reactor線程處理IO就緒事件
//該字段為持有selector對(duì)象selectedKeys的引用,當(dāng)IO事件就緒時(shí),直接從這里獲取
private SelectedSelectionKeySet selectedKeys;
private void processSelectedKeys() {
//是否采用netty優(yōu)化后的selectedKey集合類型 是由變量DISABLE_KEY_SET_OPTIMIZATION決定的 默認(rèn)為false
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
看到這段代碼大家眼熟嗎??
不知大家還記不記得我們?cè)凇读牧腘etty那些事兒之Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)》一文中介紹Reactor NioEventLoop類在創(chuàng)建Selector的過程中提到,出于對(duì)JDK NIO Selector中selectedKeys 集合的插入和遍歷操作性能的考慮Netty將自己用數(shù)組實(shí)現(xiàn)的SelectedSelectionKeySet 集合替換掉了JDK NIO Selector中selectedKeys 的HashSet實(shí)現(xiàn)。
public abstract class SelectorImpl extends AbstractSelector {
// The set of keys with data ready for an operation
// //IO就緒的SelectionKey(里面包裹著channel)
protected Set<SelectionKey> selectedKeys;
// The set of keys registered with this Selector
//注冊(cè)在該Selector上的所有SelectionKey(里面包裹著channel)
protected HashSet<SelectionKey> keys;
...............省略...................
}
Netty中通過優(yōu)化開關(guān)
DISABLE_KEY_SET_OPTIMIZATION 控制是否對(duì)JDK NIO Selector進(jìn)行優(yōu)化。默認(rèn)是需要優(yōu)化。
在優(yōu)化開關(guān)開啟的情況下,Netty會(huì)將創(chuàng)建的SelectedSelectionKeySet 集合保存在NioEventLoop的private SelectedSelectionKeySet selectedKeys字段中,方便Reactor線程直接從這里獲取IO就緒的SelectionKey。
在優(yōu)化開關(guān)關(guān)閉的情況下,Netty會(huì)直接采用JDK NIO Selector的默認(rèn)實(shí)現(xiàn)。此時(shí)NioEventLoop的selectedKeys字段就會(huì)為null。
忘記這段的同學(xué)可以在回顧下《聊聊Netty那些事兒之Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)》一文中關(guān)于Reactor的創(chuàng)建過程。
經(jīng)過對(duì)前邊內(nèi)容的回顧,我們看到了在Reactor處理IO就緒事件的邏輯也分為兩個(gè)部分,一個(gè)是經(jīng)過Netty優(yōu)化的,一個(gè)是采用JDK 原生的。
我們先來看采用JDK 原生的Selector的處理方式,理解了這種方式,在看Netty優(yōu)化的方式會(huì)更加容易。
3.1 processSelectedKeysPlain
我們?cè)凇读牧腘etty那些事兒之Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)》一文中介紹JDK NIO Selector的工作過程時(shí)講過,當(dāng)注冊(cè)在Selector上的Channel發(fā)生IO就緒事件時(shí),Selector會(huì)將IO就緒的SelectionKey插入到Set<SelectionKey> selectedKeys集合中。
這時(shí)Reactor線程會(huì)從
java.nio.channels.Selector#select(long)調(diào)用中返回。隨后調(diào)用java.nio.channels.Selector#selectedKeys獲取IO就緒的SelectionKey集合。
所以Reactor線程在調(diào)用processSelectedKeysPlain方法處理IO就緒事件之前需要調(diào)用selector.selectedKeys()去獲取所有IO就緒的SelectionKeys。
processSelectedKeysPlain(selector.selectedKeys())
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
//注意每次迭代末尾的keyIterator.remove()調(diào)用。Selector不會(huì)自己從已選擇鍵集中移除SelectionKey實(shí)例。
//必須在處理完通道時(shí)自己移除。下次該通道變成就緒時(shí),Selector會(huì)再次將其放入已選擇鍵集中。
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
//目的是再次進(jìn)入for循環(huán) 移除失效的selectKey(socketChannel可能從selector上移除)
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
3.1.1 獲取IO就緒的Channel
Set<SelectionKey> selectedKeys集合里面裝的全部是IO就緒的SelectionKey,注意,此時(shí)Set<SelectionKey> selectedKeys的實(shí)現(xiàn)類型為HashSet類型。因?yàn)槲覀冞@里首先介紹的是JDK NIO 原生實(shí)現(xiàn)。
通過獲取HashSet的迭代器,開始逐個(gè)處理IO就緒的Channel。
Iterator<SelectionKey> i = selectedKeys.iterator();
final SelectionKey k = i.next();
final Object a = k.attachment();
大家還記得這個(gè)SelectionKey中的attachment屬性里存放的是什么嗎??
在上篇文章《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》中我們?cè)谥vNioServerSocketChannel向Main Reactor注冊(cè)的時(shí)候,通過this指針將自己作為SelectionKey的attachment屬性注冊(cè)到Selector中。這一步完成了Netty自定義Channel和JDK NIO Channel的綁定。
public abstract class AbstractNioChannel extends AbstractChannel {
//channel注冊(cè)到Selector后獲得的SelectKey
volatile SelectionKey selectionKey;
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrAppedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...............省略....................
}
}
}
}
而我們也提到SelectionKey就相當(dāng)于是Channel在Selector中的一種表示,當(dāng)Channel上有IO就緒事件時(shí),Selector會(huì)將Channel對(duì)應(yīng)的SelectionKey返回給Reactor線程,我們可以通過返回的這個(gè)SelectionKey里的attachment屬性獲取到對(duì)應(yīng)的Netty自定義Channel。
對(duì)于客戶端連接事件(OP_ACCEPT)活躍時(shí),這里的Channel類型為NioServerSocketChannel。
對(duì)于客戶端讀寫事件(Read,Write)活躍時(shí),這里的Channel類型為NioSocketChannel。
當(dāng)我們通過k.attachment()獲取到Netty自定義的Channel時(shí),就需要把這個(gè)Channel對(duì)應(yīng)的SelectionKey從Selector的就緒集合Set<SelectionKey> selectedKeys中刪除。因?yàn)镾elector自己不會(huì)主動(dòng)刪除已經(jīng)處理完的SelectionKey,需要調(diào)用者自己主動(dòng)刪除,這樣當(dāng)這個(gè)Channel再次IO就緒時(shí),Selector會(huì)再次將這個(gè)Channel對(duì)應(yīng)的SelectionKey放入就緒集合Set<SelectionKey> selectedKeys中。
i.remove();
3.1.2 處理Channel上的IO事件
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
從這里我們可以看出Netty向SelectionKey中的attachment屬性附加的對(duì)象分為兩種:
- 一種是我們熟悉的Channel,無論是服務(wù)端使用的NioServerSocketChannel還是客戶端使用的NioSocketChannel都屬于AbstractNioChannel 。Channel上的IO事件是由Netty框架負(fù)責(zé)處理,也是本小節(jié)我們要重點(diǎn)介紹的
- 另一種就是NioTask,這種類型是Netty提供給用戶可以自定義一些當(dāng)Channel上發(fā)生IO就緒事件時(shí)的自定義處理。
public interface NioTask<C extends SelectableChannel> {
/**
* Invoked when the {@link SelectableChannel} has been selected by the {@link Selector}.
*/
void channelReady(C ch, SelectionKey key) throws Exception;
/**
* Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus
* this {@link NioTask} will not be notified anymore.
*
* @param cause the cause of the unregistration. {@code null} if a user called {@link SelectionKey#cancel()} or
* the event loop has been shut down.
*/
void channelUnregistered(C ch, Throwable cause) throws Exception;
}
NioTask和Channel其實(shí)本質(zhì)上是一樣的都是負(fù)責(zé)處理Channel上的IO就緒事件,只不過一個(gè)是用戶自定義處理,一個(gè)是Netty框架處理。這里我們重點(diǎn)關(guān)注Channel的IO處理邏輯
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//獲取Channel的底層操作類Unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
......如果SelectionKey已經(jīng)失效則關(guān)閉對(duì)應(yīng)的Channel......
}
try {
//獲取IO就緒事件
int readyOps = k.readyOps();
//處理Connect事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
//移除對(duì)Connect事件的監(jiān)聽,否則Selector會(huì)一直通知
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//觸發(fā)channelActive事件處理Connect事件
unsafe.finishConnect();
}
//處理Write事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//處理Read事件或者Accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
- 首先我們需要獲取IO就緒Channel底層的操作類Unsafe,用于對(duì)具體IO就緒事件的處理。
這里可以看出,Netty對(duì)IO就緒事件的處理全部封裝在Unsafe類中。比如:對(duì)OP_ACCEPT事件的具體處理邏輯是封裝在NioServerSocketChannel中的UnSafe類中。對(duì)OP_READ或者OP_WRITE事件的處理是封裝在NioSocketChannel中的Unsafe類中。
- 從Selectionkey中獲取具體IO就緒事件 readyOps 。
SelectonKey中關(guān)于IO事件的集合有兩個(gè)。一個(gè)是interestOps,用于記錄Channel感興趣的IO事件,在Channel向Selector注冊(cè)完畢后,通過pipeline中的HeadContext節(jié)點(diǎn)的ChannelActive事件回調(diào)中添加。下面這段代碼就是在ChannelActive事件回調(diào)中Channel在向Selector注冊(cè)自己感興趣的IO事件。
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
/**
* 1:ServerSocketChannel 初始化時(shí) readInterestOp設(shè)置的是OP_ACCEPT事件
* 2:SocketChannel 初始化時(shí) readInterestOp設(shè)置的是OP_READ事件
* */
if ((interestOps & readInterestOp) == 0) {
//注冊(cè)監(jiān)聽OP_ACCEPT或者OP_READ事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
另一個(gè)就是這里的readyOps,用于記錄在Channel感興趣的IO事件中具體哪些IO事件就緒了。
Netty中將各種事件的集合用一個(gè)int型變量來保存。
- 用&操作判斷,某個(gè)事件是否在事件集合中:(readyOps & SelectionKey.OP_CONNECT) != 0,這里就是判斷Channel是否對(duì)Connect事件感興趣。
- 用|操作向事件集合中添加事件:interestOps | readInterestOp
- 從事件集合中刪除某個(gè)事件,是通過先將要?jiǎng)h除事件取反~,然后在和事件集合做&操作:ops &= ~SelectionKey.OP_CONNECT
Netty這種對(duì)空間的極致利用思想,很值得我們平時(shí)在日常開發(fā)中學(xué)習(xí)~~
現(xiàn)在我們已經(jīng)知道哪些Channel現(xiàn)在處于IO就緒狀態(tài),并且知道了具體哪些類型的IO事件已經(jīng)就緒。
下面就該針對(duì)Channel上的不同IO就緒事件做出相應(yīng)的處理了。
3.1.2.1 處理Connect事件
Netty客戶端向服務(wù)端發(fā)起連接,并向客戶端的Reactor注冊(cè)Connect事件,當(dāng)連接建立成功后,客戶端的NioSocketChannel就會(huì)產(chǎn)生Connect就緒事件,通過前面內(nèi)容我們講的Reactor的運(yùn)行框架,最終流程會(huì)走到這里。
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//觸發(fā)channelActive事件
unsafe.finishConnect();
}
如果IO就緒的事件是Connect事件,那么就調(diào)用對(duì)應(yīng)客戶端NioSocketChannel中的Unsafe操作類中的finishConnect方法處理Connect事件。這時(shí)會(huì)在Netty客戶端NioSocketChannel中的pipeline中傳播ChannelActive事件。
最后需要將OP_CONNECT事件從客戶端NioSocketChannel所關(guān)心的事件集合interestOps中刪除。否則Selector會(huì)一直通知Connect事件就緒。
3.1.2.2 處理Write事件
關(guān)于Reactor線程處理Netty中的Write事件的流程,筆者后續(xù)會(huì)專門用一篇文章來為大家介紹。本文我們重點(diǎn)關(guān)注Reactor線程的整體運(yùn)行框架。
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
這里大家只需要記住,OP_WRITE事件的注冊(cè)是由用戶來完成的,當(dāng)Socket發(fā)送緩沖區(qū)已滿無法繼續(xù)寫入數(shù)據(jù)時(shí),用戶會(huì)向Reactor注冊(cè)O(shè)P_WRITE事件,等到Socket發(fā)送緩沖區(qū)變得可寫時(shí),Reactor會(huì)收到OP_WRITE事件活躍通知,隨后在這里調(diào)用客戶端NioSocketChannel中的forceFlush方法將剩余數(shù)據(jù)發(fā)送出去。
3.1.2.3 處理Read事件或者Accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
這里可以看出Netty中處理Read事件和Accept事件都是由對(duì)應(yīng)Channel中的Unsafe操作類中的read方法處理。
服務(wù)端NioServerSocketChannel中的Read方法處理的是Accept事件,客戶端NioSocketChannel中的Read方法處理的是Read事件。
這里大家只需記住各個(gè)IO事件在對(duì)應(yīng)Channel中的處理入口,后續(xù)文章我們會(huì)詳細(xì)分析這些入口函數(shù)。
3.1.3 從Selector中移除失效的SelectionKey
//用于及時(shí)從selectedKeys中清除失效的selectKey 比如 socketChannel從selector上被用戶移除
private boolean needsToSelectAgain;
//目的是再次進(jìn)入for循環(huán) 移除失效的selectKey(socketChannel可能被用戶從selector上移除)
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
在前邊介紹Reactor運(yùn)行框架的時(shí)候,我們看到在每次Reactor線程輪詢結(jié)束,準(zhǔn)備處理IO就緒事件以及異步任務(wù)的時(shí)候,都會(huì)將needsToSelectAgain 設(shè)置為false。
那么這個(gè)needsToSelectAgain 究竟是干嘛的?以及為什么我們需要去“Select Again”呢?
首先我們來看下在什么情況下會(huì)將needsToSelectAgain 這個(gè)變量設(shè)置為true,通過這個(gè)設(shè)置的過程,我們是否能夠從中找到一些線索?
我們知道Channel可以將自己注冊(cè)到Selector上,那么當(dāng)然也可以將自己從Selector上取消移除。
在上篇文章中我們也花了大量的篇幅講解了這個(gè)注冊(cè)的過程,現(xiàn)在我們來看下Channel的取消注冊(cè)。
public abstract class AbstractNioChannel extends AbstractChannel {
//channel注冊(cè)到Selector后獲得的SelectKey
volatile SelectionKey selectionKey;
@Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
protected SelectionKey selectionKey() {
assert selectionKey != null;
return selectionKey;
}
}
Channel取消注冊(cè)的過程很簡單,直接調(diào)用NioChannel的doDeregister 方法,Channel綁定的Reactor會(huì)將其從Selector中取消并停止監(jiān)聽Channel上的IO事件。
public final class NioEventLoop extends SingleThreadEventLoop {
//記錄Selector上移除socketChannel的個(gè)數(shù) 達(dá)到256個(gè) 則需要將無效的selectKey從SelectedKeys集合中清除掉
private int cancelledKeys;
private static final int CLEANUP_INTERVAL = 256;
/**
* 將socketChannel從selector中移除 取消監(jiān)聽IO事件
* */
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
// 當(dāng)從selector中移除的socketChannel數(shù)量達(dá)到256個(gè),設(shè)置needsToSelectAgain為true
// 在io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain 中重新做一次輪詢,將失效的selectKey移除,
// 以保證selectKeySet的有效性
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
}
- 調(diào)用JDK NIO SelectionKey的API cancel方法,將Channel從Selector中取消掉。SelectionKey#cancel方法調(diào)用完畢后,此時(shí)調(diào)用SelectionKey#isValid將會(huì)返回false。SelectionKey#cancel方法調(diào)用后,Selector會(huì)將要取消的這個(gè)SelectionKey加入到Selector中的cancelledKeys集合中。
public abstract class AbstractSelector extends Selector {
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
void cancel(SelectionKey k) {
synchronized (cancelledKeys) {
cancelledKeys.add(k);
}
}
}
- 當(dāng)Channel對(duì)應(yīng)的SelectionKey取消完畢后,Channel取消計(jì)數(shù)器cancelledKeys 會(huì)加1,當(dāng)cancelledKeys = 256時(shí),將needsToSelectAgain 設(shè)置為true。
- 隨后在Selector的下一次輪詢過程中,會(huì)將cancelledKeys集合中的SelectionKey從Selector中所有的KeySet中移除。這里的KeySet包括Selector用于存放就緒SelectionKey的selectedKeys集合,以及用于存放所有注冊(cè)的Channel對(duì)應(yīng)的SelectionKey的keys集合。
public abstract class SelectorImpl extends AbstractSelector {
protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
.....................省略...............
}
我們看到Reactor線程中對(duì)needsToSelectAgain 的判斷是在processSelectedKeysPlain方法處理IO就緒的SelectionKey的循環(huán)體中進(jìn)行判斷的。
之所以這里特別提到needsToSelectAgain 判斷的位置,是要讓大家注意到此時(shí)Reactor正在處理本次輪詢的IO就緒事件。
而前邊也說了,當(dāng)調(diào)用SelectionKey#cancel方法后,需要等到下次輪詢的過程中Selector才會(huì)將這些取消的SelectionKey從Selector中的所有KeySet集合中移除,當(dāng)然這里也包括就緒集合selectedKeys 。
當(dāng)在本次輪詢期間,假如大量的Channel從Selector中取消,Selector中的就緒集合selectedKeys 中依然會(huì)保存這些Channel對(duì)應(yīng)SelectionKey直到下次輪詢。那么當(dāng)然會(huì)影響本次輪詢結(jié)果selectedKeys的有效性。
所以為了保證Selector中所有KeySet的有效性,需要在Channel取消個(gè)數(shù)達(dá)到256時(shí),觸發(fā)一次selectNow,目的是清除無效的SelectionKey。
private void selectAgain() {
needsToSelectAgain = false;
try {
selector.selectNow();
} catch (Throwable t) {
logger.warn("Failed to update SelectionKeys.", t);
}
}
到這里,我們就對(duì)JDK 原生 Selector的處理方式processSelectedKeysPlain方法就介紹完了,其實(shí) 對(duì)IO就緒事件的處理邏輯都是一樣的,在我們理解了processSelectedKeysPlain方法后,
processSelectedKeysOptimized方法對(duì)IO就緒事件的處理,我們理解起來就非常輕松了。
3.2 processSelectedKeysOptimized
Netty默認(rèn)會(huì)采用優(yōu)化過的Selector對(duì)IO就緒事件的處理。但是處理邏輯是大同小異的。下面我們主要介紹一下這兩個(gè)方法的不同之處。
private void processSelectedKeysOptimized() {
// 在openSelector的時(shí)候?qū)DK中selector實(shí)現(xiàn)類中得selectedKeys和publicSelectKeys字段類型
// 由原來的HashSet類型替換為 Netty優(yōu)化后的數(shù)組實(shí)現(xiàn)的SelectedSelectionKeySet類型
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// 對(duì)應(yīng)迭代器中得remove selector不會(huì)自己清除selectedKey
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
- JDK NIO 原生 Selector存放IO就緒的SelectionKey的集合為HashSet類型的selectedKeys 。而Netty為了優(yōu)化對(duì)selectedKeys 集合的遍歷效率采用了自己實(shí)現(xiàn)的SelectedSelectionKeySet類型,從而用對(duì)數(shù)組的遍歷代替用HashSet的迭代器遍歷。
- Selector會(huì)在每次輪詢到IO就緒事件時(shí),將IO就緒的Channel對(duì)應(yīng)的SelectionKey插入到selectedKeys集合,但是Selector只管向selectedKeys集合放入IO就緒的SelectionKey,當(dāng)SelectionKey被處理完畢后,Selector是不會(huì)自己主動(dòng)將其從selectedKeys集合中移除的,典型的管殺不管埋。所以需要Netty自己在遍歷到IO就緒的 SelectionKey后,將其刪除。
- 在processSelectedKeysPlain中是直接將其從迭代器中刪除。
- 在processSelectedKeysOptimized中將其在數(shù)組中對(duì)應(yīng)的位置置為Null,方便垃圾回收。
- 在最后清除無效的SelectionKey時(shí),在processSelectedKeysPlain中由于采用的是JDK NIO 原生的Selector,所以只需要執(zhí)行SelectAgain就可以,Selector會(huì)自動(dòng)清除無效Key。
但是在processSelectedKeysOptimized 中由于是Netty自己實(shí)現(xiàn)的優(yōu)化類型,所以需要Netty自己將SelectedSelectionKeySet數(shù)組中的SelectionKey全部清除,最后在執(zhí)行SelectAgain。
好了,到這里,我們就將Reactor線程如何處理IO就緒事件的整個(gè)過程講述完了,下面我們就該到了介紹Reactor線程如何處理Netty框架中的異步任務(wù)了。
4. Reactor線程處理異步任務(wù)
Netty關(guān)于處理異步任務(wù)的方法有兩個(gè):
- 一個(gè)是無超時(shí)時(shí)間限制的runAllTasks()方法。當(dāng)ioRatio設(shè)置為100時(shí),Reactor線程會(huì)先一股腦的處理IO就緒事件,然后在一股腦的執(zhí)行異步任務(wù),并沒有時(shí)間的限制。
- 另一個(gè)是有超時(shí)時(shí)間限制的runAllTasks(long timeoutNanos)方法。當(dāng)ioRatio != 100時(shí),Reactor線程執(zhí)行異步任務(wù)會(huì)有時(shí)間限制,優(yōu)先一股腦的處理完IO就緒事件統(tǒng)計(jì)出執(zhí)行IO任務(wù)耗時(shí)ioTime。根據(jù)公式ioTime * (100 - ioRatio) / ioRatio)計(jì)算出Reactor線程執(zhí)行異步任務(wù)的超時(shí)時(shí)間。在超時(shí)時(shí)間限定范圍內(nèi),執(zhí)行有限的異步任務(wù)。
下面我們來分別看下這兩個(gè)執(zhí)行異步任務(wù)的方法處理邏輯:
4.1 runAllTasks()
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
//將到達(dá)執(zhí)行時(shí)間的定時(shí)任務(wù)轉(zhuǎn)存到普通任務(wù)隊(duì)列taskQueue中,統(tǒng)一由Reactor線程從taskQueue中取出執(zhí)行
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
//執(zhí)行尾部隊(duì)列任務(wù)
afterRunningAllTasks();
return ranAtLeastOne;
}
Reactor線程執(zhí)行異步任務(wù)的核心邏輯就是:
- 先將到期的定時(shí)任務(wù)一股腦的從定時(shí)任務(wù)隊(duì)列scheduledTaskQueue中取出并轉(zhuǎn)存到普通任務(wù)隊(duì)列taskQueue中。
- 由Reactor線程統(tǒng)一從普通任務(wù)隊(duì)列taskQueue中取出任務(wù)執(zhí)行。
- 在Reactor線程執(zhí)行完定時(shí)任務(wù)和普通任務(wù)后,開始執(zhí)行存儲(chǔ)于尾部任務(wù)隊(duì)列tailTasks中的尾部任務(wù)。
下面我們來分別看下上述幾個(gè)核心步驟的實(shí)現(xiàn):
4.1.1 fetchFromScheduledTaskQueue
/**
* 從定時(shí)任務(wù)隊(duì)列中取出達(dá)到deadline執(zhí)行時(shí)間的定時(shí)任務(wù)
* 將定時(shí)任務(wù) 轉(zhuǎn)存到 普通任務(wù)隊(duì)列taskQueue中,統(tǒng)一由Reactor線程從taskQueue中取出執(zhí)行
*
* */
private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
//從定時(shí)任務(wù)隊(duì)列中取出到達(dá)執(zhí)行deadline的定時(shí)任務(wù) deadline <= nanoTime
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
if (!taskQueue.offer(scheduledTask)) {
// taskQueue沒有空間容納 則在將定時(shí)任務(wù)重新塞進(jìn)定時(shí)任務(wù)隊(duì)列中等待下次執(zhí)行
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}
- 獲取當(dāng)前要執(zhí)行異步任務(wù)的時(shí)間點(diǎn)nanoTime
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
private static final long START_TIME = System.nanoTime();
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
}
- 從定時(shí)任務(wù)隊(duì)列中找出deadline <= nanoTime的異步任務(wù)。也就是說找出所有到期的定時(shí)任務(wù)。
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
//從定時(shí)隊(duì)列中取出要執(zhí)行的定時(shí)任務(wù) deadline <= nanoTime
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
return null;
}
//符合取出條件 則取出
scheduledTaskQueue.remove();
scheduledTask.setConsumed();
return scheduledTask;
}
- 將到期的定時(shí)任務(wù)插入到普通任務(wù)隊(duì)列taskQueue中,如果taskQueue已經(jīng)沒有空間容納新的任務(wù),則將定時(shí)任務(wù)重新塞進(jìn)定時(shí)任務(wù)隊(duì)列中等待下次拉取。
if (!taskQueue.offer(scheduledTask)) {
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
- fetchFromScheduledTaskQueue方法的返回值為true時(shí)表示到期的定時(shí)任務(wù)已經(jīng)全部拉取出來并轉(zhuǎn)存到普通任務(wù)隊(duì)列中。
返回值為false時(shí)表示到期的定時(shí)任務(wù)只拉取出來一部分,因?yàn)檫@時(shí)普通任務(wù)隊(duì)列已經(jīng)滿了,當(dāng)執(zhí)行完普通任務(wù)時(shí),還需要在進(jìn)行一次拉取。
當(dāng)?shù)狡诘亩〞r(shí)任務(wù)從定時(shí)任務(wù)隊(duì)列中拉取完畢或者當(dāng)普通任務(wù)隊(duì)列已滿時(shí),這時(shí)就會(huì)停止拉取,開始執(zhí)行普通任務(wù)隊(duì)列中的異步任務(wù)。
4.1.2 runAllTasksFrom
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
- 首先runAllTasksFrom 方法的返回值表示是否執(zhí)行了至少一個(gè)異步任務(wù)。后面會(huì)賦值給ranAtLeastOne變量,這個(gè)返回值我們后續(xù)會(huì)用到。
- 從普通任務(wù)隊(duì)列中拉取異步任務(wù)。
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
Runnable task = taskQueue.poll();
if (task != WAKEUP_TASK) {
return task;
}
}
}
- Reactor線程執(zhí)行異步任務(wù)。
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
4.1.3 afterRunningAllTasks
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
//執(zhí)行尾部隊(duì)列任務(wù)
afterRunningAllTasks();
return ranAtLeastOne;
如果Reactor線程執(zhí)行了至少一個(gè)異步任務(wù),那么設(shè)置lastExecutionTime,并將ranAtLeastOne標(biāo)識(shí)返回。這里的ranAtLeastOne標(biāo)識(shí)就是runAllTasksFrom方法的返回值。
最后執(zhí)行收尾任務(wù),也就是執(zhí)行尾部任務(wù)隊(duì)列中的尾部任務(wù)。
@Override
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}
4.2 runAllTasks(long timeoutNanos)
這里在處理異步任務(wù)的核心邏輯還是和之前一樣的,只不過就是多了對(duì)超時(shí)時(shí)間的控制。
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
//普通隊(duì)列中沒有任務(wù)時(shí) 執(zhí)行隊(duì)尾隊(duì)列的任務(wù)
afterRunningAllTasks();
return false;
}
//異步任務(wù)執(zhí)行超時(shí)deadline
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
//每運(yùn)行64個(gè)異步任務(wù) 檢查一下 是否達(dá)到 執(zhí)行deadline
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
//到達(dá)異步任務(wù)執(zhí)行超時(shí)deadline,停止執(zhí)行異步任務(wù)
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
- 首先還是通過fetchFromScheduledTaskQueue 方法從Reactor中的定時(shí)任務(wù)隊(duì)列中拉取到期的定時(shí)任務(wù),轉(zhuǎn)存到普通任務(wù)隊(duì)列中。當(dāng)普通任務(wù)隊(duì)列已滿或者到期定時(shí)任務(wù)全部拉取完畢時(shí),停止拉取。
- 將ScheduledFutureTask.nanoTime() + timeoutNanos 作為Reactor線程執(zhí)行異步任務(wù)的超時(shí)時(shí)間點(diǎn)deadline。
- 由于系統(tǒng)調(diào)用System.nanoTime()需要一定的系統(tǒng)開銷,所以每執(zhí)行完64個(gè)異步任務(wù)的時(shí)候才會(huì)去檢查一下執(zhí)行時(shí)間是否到達(dá)了deadline。如果到達(dá)了執(zhí)行截止時(shí)間deadline則退出停止執(zhí)行異步任務(wù)。如果沒有到達(dá)deadline則繼續(xù)從普通任務(wù)隊(duì)列中取出任務(wù)循環(huán)執(zhí)行下去。
從這個(gè)細(xì)節(jié)又可以看出Netty對(duì)性能的考量還是相當(dāng)講究的
流程走到這里,我們就對(duì)Reactor的整個(gè)運(yùn)行框架以及如何輪詢IO就緒事件,如何處理IO就緒事件,如何執(zhí)行異步任務(wù)的具體實(shí)現(xiàn)邏輯就剖析完了。
下面還有一個(gè)小小的尾巴,就是Netty是如何解決文章開頭提到的JDK NIO Epoll 的空輪詢BUG的,讓我們一起來看下吧~~~
5. 解決JDK Epoll空輪詢BUG
前邊提到,由于JDK NIO Epoll的空輪詢BUG存在,這樣會(huì)導(dǎo)致Reactor線程在沒有任何事情可做的情況下被意外喚醒,導(dǎo)致CPU空轉(zhuǎn)。
其實(shí)Netty也沒有從根本上解決這個(gè)JDK BUG,而是選擇巧妙的繞過這個(gè)BUG。
下面我們來看下Netty是如何做到的。
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
//既沒有IO就緒事件,也沒有異步任務(wù),Reactor線程從Selector上被異常喚醒 觸發(fā)JDK Epoll空輪訓(xùn)BUG
//重新構(gòu)建Selector,selectCnt歸零
selectCnt = 0;
}
在Reactor線程處理完IO就緒事件和異步任務(wù)后,會(huì)檢查這次Reactor線程被喚醒有沒有執(zhí)行過異步任務(wù)和有沒有IO就緒的Channel。
- boolean ranTasks 這時(shí)候就派上了用場(chǎng),這個(gè)ranTasks正是前邊我們?cè)谥vrunAllTasks方法時(shí)提到的返回值。用來表示是否執(zhí)行過至少一次異步任務(wù)。
- int strategy 正是JDK NIO Selector的select方法的返回值,用來表示IO就緒的Channel個(gè)數(shù)。
如果ranTasks = false 并且 strategy = 0這代表Reactor線程本次既沒有異步任務(wù)執(zhí)行也沒有IO就緒的Channel需要處理卻被意外的喚醒。等于是空轉(zhuǎn)了一圈啥也沒干。
這種情況下Netty就會(huì)認(rèn)為可能已經(jīng)觸發(fā)了JDK NIO Epoll的空輪詢BUG
int SELECTOR_AUTO_REBUILD_THRESHOLD = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
private boolean unexpectedSelectorWakeup(int selectCnt) {
..................省略...............
/**
* 走到這里的條件是 既沒有IO就緒事件,也沒有異步任務(wù),Reactor線程從Selector上被異常喚醒
* 這種情況可能是已經(jīng)觸發(fā)了JDK Epoll的空輪詢BUG,如果這種情況持續(xù)512次 則認(rèn)為可能已經(jīng)觸發(fā)BUG,于是重建Selector
*
* */
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
return true;
}
return false;
}
- 如果Reactor這種意外喚醒的次數(shù)selectCnt 超過了配置的次數(shù)SELECTOR_AUTO_REBUILD_THRESHOLD ,那么Netty就會(huì)認(rèn)定這種情況可能已經(jīng)觸發(fā)了JDK NIO Epoll空輪詢BUG,則重建Selector(將之前注冊(cè)的所有Channel重新注冊(cè)到新的Selector上并關(guān)閉舊的Selector),selectCnt計(jì)數(shù)歸0。
SELECTOR_AUTO_REBUILD_THRESHOLD 默認(rèn)為512,可以通過系統(tǒng)變量-D io.netty.selectorAutoRebuildThreshold指定自定義數(shù)值。
- 如果selectCnt小于SELECTOR_AUTO_REBUILD_THRESHOLD ,則返回不做任何處理,selectCnt繼續(xù)計(jì)數(shù)。
Netty就這樣通過計(jì)數(shù)Reactor被意外喚醒的次數(shù),如果計(jì)數(shù)selectCnt達(dá)到了512次,則通過重建Selector 巧妙的繞開了JDK NIO Epoll空輪詢BUG。
我們?cè)谌粘i_發(fā)中也可以借鑒Netty這種處理問題的思路,比如在項(xiàng)目開發(fā)中,當(dāng)我們發(fā)現(xiàn)我們無法保證徹底的解決一個(gè)問題時(shí),或者為了解決這個(gè)問題導(dǎo)致我們的投入產(chǎn)出比不高時(shí),我們就該考慮是不是應(yīng)該換一種思路去繞過這個(gè)問題,從而達(dá)到同樣的效果。*解決問題的最高境界就是不解決它,巧妙的繞過去~~~~~!!*
總結(jié)
本文花了大量的篇幅介紹了Reactor整體的運(yùn)行框架,并深入介紹了Reactor核心的工作模塊的具體實(shí)現(xiàn)邏輯。
通過本文的介紹我們知道了Reactor如何輪詢注冊(cè)在其上的所有Channel上感興趣的IO事件,以及Reactor如何去處理IO就緒的事件,如何執(zhí)行Netty框架中提交的異步任務(wù)和定時(shí)任務(wù)。
最后介紹了Netty如何巧妙的繞過JDK NIO Epoll空輪詢的BUG,達(dá)到解決問題的目的。