背景
Tomcat 源碼中多處用了JAVA.util.concurrent 包中的類,用以處理多線程環(huán)境下的流程控制。近日分析了下NioEndpoint 源碼,本文將以此類為背景,膜拜下 Java 大神們使用 CountDownLatch 并發(fā)控制的手法,其實(shí)也就是簡單的實(shí)際應(yīng)用,算不上高深。
類圖框架
NIO tailored thread pool, providing the following services:
Socket acceptor thread:Acceptor
Socket poller thread:Poller
Worker threads pool:Executor
以上是該類的注釋,結(jié)合源碼我們知道 NioEndpoint 就是一個(gè)定制線程池,管理了三種線程:Acceptor、Poller、Worker。
(百來的一張很清晰的結(jié)構(gòu)圖如下:)
初始化
NioEndpoint 類維護(hù)了一個(gè) stopLatch 的變量,其類型就是 CountDownLatch。它根據(jù) Poller 線程的個(gè)數(shù)進(jìn)行初始化的,源碼如下:
public void bind() throws Exception {
....
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
acceptorThreadCount = 1;
}
if (pollerThreadCount <= 0) {
//minimum one poller thread
pollerThreadCount = 1;
}
stopLatch = new CountDownLatch(pollerThreadCount);
....
}
NioEndpont 類初始化時(shí)指定了 Poller 和 Accetpor 線程數(shù),而且從上面代碼的注釋信息來看 acceptorThreadCount 的固定 是 1,即 Tomcat 的 NIO 并不支持多個(gè) Accepor 線程,此外也沒有可以修改該屬性的途徑。
stopLatch 控制流程
stopLatch,顧名思義,是控制 Tomcat 的組件停止時(shí)使用的鎖,利用 CountDownLatch ,主線程等待一組線程到達(dá)某個(gè)狀態(tài)后,才進(jìn)行后面的處理。NioEndpoint 的 stopInternal() 方法的流程如下:
public void stopInternal() {
releaseConnectionLatch();
if (!paused) {
pause();
}
if (running) {
running = false;
unlockAccept();
for (int i=0; pollers!=null && i<pollers.length; i++) {
if (pollers[i]==null) continue;
pollers[i].destroy();
pollers[i] = null;
}
try {
stopLatch.await(selectorTimeout + 100, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
shutdownExecutor();
eventCache.clear();
nioChannels.clear();
processorCache.clear();
}
}
該方法將導(dǎo)致所有的處理線程都停止工作,其流程為:
- 首先,通知Poller線程停止工作,調(diào)用其 destroy,設(shè)置 Poller 的 close 標(biāo)識(shí)為 true。
- 其次,設(shè)置 running 為 false,通知 Acceptor 線程終止 run 方法的循環(huán)處理。
- 第三,當(dāng)前線程 stopLatch.await ,等待所有的 Poller 線程的 run方法結(jié)束。Poller 的 run 方法最后一句是 stopLatch.countDown(),當(dāng) stopInternal 的 await 方法被喚醒時(shí),說明所有的Poller 線程都結(jié)束了。
- 第四,此處調(diào)用 await 操作的超時(shí)時(shí)間設(shè)置為 selectorTimeout,這個(gè)值也是Poller處理時(shí)的阻塞時(shí)間,也就是說:如果Poller的在輪詢過程中調(diào)用了selector.select(selectorTimeout);的話,最多等待這么長時(shí)間,就能保證所有的Poller都及時(shí)結(jié)束了。
此處,之所以不用考慮 Acceptor 的結(jié)束問題,是因?yàn)?Acceptor 線程只有一個(gè),而且它沒有阻塞處理,所以一旦 running 標(biāo)識(shí)為 false,它就會(huì)立即結(jié)束。
所有的處理線程都結(jié)束之后,shutdownExecutor() 操作會(huì)關(guān)閉工作線程池的調(diào)度器,至此,所有的線程都被關(guān)閉了。
啟示錄
開發(fā)中,如何需要自定義線程池框架,就可以參照這個(gè)流程對(duì)線程池資源進(jìn)行關(guān)閉,用 JUC 包中的并發(fā)工具類,比自己寫同步計(jì)數(shù)器方便多了!Tomcat 教我們的這一招,你學(xué)會(huì)了嗎?






