
作者 | 阿文,責(zé)編 | 郭芮
頭圖 | CSDN 下載自東方IC
在多線程和高并發(fā)場景中,需要?jiǎng)?chuàng)建大量的線程來進(jìn)行業(yè)務(wù)處理,我們通常創(chuàng)建線程有兩種方法,一種是通過繼承Thread類,另一種是實(shí)現(xiàn)Runnable的接口,但是我們創(chuàng)建這兩種線程在運(yùn)行結(jié)束后都會(huì)被虛擬機(jī)銷毀,如果數(shù)量多的話,頻繁的創(chuàng)建和銷毀線程會(huì)大大浪費(fèi)時(shí)間和效率,更重要的是浪費(fèi)內(nèi)存,線程執(zhí)行完畢后變?yōu)樗劳鰻顟B(tài),線程對象變?yōu)槔@個(gè)需要依靠虛擬機(jī)進(jìn)行監(jiān)督和回收,影響系統(tǒng)的性能。這種問題使用線程池便可以很好的解決。通過線程池線程,銷毀及回收等交由線程池進(jìn)行管理,就可以避免以上的問題。
我們在使用過程中經(jīng)常會(huì)直接使用newSingleThreadExecutor,newCachedThreadPool,newFixedThreadPool(int Threads)等已經(jīng)封裝好的線程池,但這些都是通過ThreadPoolExecutor類中通過構(gòu)造函數(shù)傳入不同的參數(shù)封裝的對象,所以想要了解線程池,我們就要認(rèn)真研究一下線程池中最重要的ThreadPoolExecutor類。
ThreadPoolExecutor類最重要的構(gòu)造函數(shù):
publicThreadPoolExecutor( intcorePoolSize,
intmaximumPoolSize,
longkeepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
函數(shù)的參數(shù)含義如下:
corePoolSize:核心池大小,指定了線程池中的線程數(shù)量。
maximumPoolSize :最大池大小,指定了線程池中的最大線程數(shù)量。
keepAliveTime :存活時(shí)間,當(dāng)線程池?cái)?shù)量超過corePoolSize時(shí),多余的空閑線程的存活時(shí)間,即超過corePoolSize的空閑線程,在多長時(shí)間內(nèi)會(huì)被銷毀。
unit :keepAliveTime的單位。
workQueue :任務(wù)隊(duì)列,被提交單尚未被執(zhí)行的任務(wù)。
threadFactory :線程工廠,用于創(chuàng)建線程,一般用于默認(rèn)的即可。
handler:拒絕策略。當(dāng)任務(wù)太多來不及處理時(shí),如何拒絕任務(wù)。
核心池大小,最大池大小和存活時(shí)間共同管理這線程的創(chuàng)建與銷毀。核心池大小是目標(biāo)大小;線程池的實(shí)現(xiàn)試圖維護(hù)線程池的大小,即是沒有任務(wù)執(zhí)行,池的大小也等于核心池的大小,并且在工作隊(duì)列充滿前,線程池都不會(huì)創(chuàng)建更多的線程。最大池的大小是可同時(shí)活動(dòng)的線程數(shù)的上限。如果一個(gè)線程已經(jīng)閑置的時(shí)間超過了存活時(shí)間,它將被線程池回收。
構(gòu)造函數(shù)的參數(shù)中大部分都很簡單,只有參數(shù)workQueue和handler需要進(jìn)行詳細(xì)說明,下面對這兩個(gè)參數(shù)進(jìn)行詳細(xì)的說明:
參數(shù)workQueue指被提交但未執(zhí)行的任務(wù)隊(duì)列,它是一個(gè)BlockingQueue接口的對象,僅用于存放Runnable對象,根據(jù)隊(duì)列功能分類,在ThreadPoolExecutor類的構(gòu)造函數(shù)中可以使用以下幾種BlockingQueue接口。
1.直接提交的隊(duì)列:該功能由SynchronousQueue對象提供。SynchronousQueue是一個(gè)特殊的BlockingQueue。SynchronousQueue沒有容量,每一個(gè)插入操作都要等待一個(gè)相應(yīng)的刪除操作,反之,每一個(gè)刪除操作都要等待對應(yīng)的插入操作。如果使用SynchronousQueue,則提交的任務(wù)不會(huì)被真實(shí)的保存,而總是將新任務(wù)提交給線程執(zhí)行,如果沒有空閑線程,則嘗試創(chuàng)建新的線程,如果進(jìn)程數(shù)量已經(jīng)達(dá)到最大值,則執(zhí)行拒絕策略。因此,使用SynchronousQueue隊(duì)列,通常要設(shè)置很大的maximumPoolSize值,否則很容易執(zhí)行拒絕策略。
2.有界的任務(wù)隊(duì)列:有界的任務(wù)隊(duì)列可以使用ArrayBlockingQueue類實(shí)現(xiàn)。ArrayBlockingQueue類的構(gòu)造函數(shù)必須帶一個(gè)容量參數(shù),表示該隊(duì)列的最大容量:
當(dāng)使用有界的任務(wù)隊(duì)列時(shí),若有新的任務(wù)需要執(zhí)行,如果線程池的實(shí)際線程數(shù)小于corePoolSize,則會(huì)優(yōu)先創(chuàng)建新的線程,若大于corePoolSize,則會(huì)將新任務(wù)加入等待隊(duì)列。若等待隊(duì)列已滿,無法加入。則在總線程數(shù)不大于maximumPoolSize的前提下,創(chuàng)建新的進(jìn)程執(zhí)行任務(wù)。若大于maximumPoolSize,則執(zhí)行拒絕策略。可見,有界隊(duì)列僅當(dāng)在任務(wù)隊(duì)列裝滿時(shí),才可能將線程數(shù)提升到corePoolSize以上,換言之,除非系統(tǒng)非常繁忙,否則要確保核心線程數(shù)維持在corePoolSize。
3.無界的任務(wù)隊(duì)列:無界任務(wù)隊(duì)列可以通過LinkedBlockingQueue類實(shí)現(xiàn)。與有界隊(duì)列相比,除非系統(tǒng)資源耗盡,否則無界的任務(wù)隊(duì)列不存在任務(wù)入隊(duì)失敗的情況。當(dāng)有新的任務(wù)到來,系統(tǒng)的線程數(shù)小于corePoolSize時(shí),線程池會(huì)生成新的線程執(zhí)行任務(wù),但當(dāng)系統(tǒng)的線程數(shù)達(dá)到corePoolSize時(shí),線程就不會(huì)繼續(xù)增加了。若后續(xù)任由新的任務(wù)加入,而又沒有空閑的線程資源,則任務(wù)直接進(jìn)入隊(duì)列等待。若任務(wù)創(chuàng)建和處理的速度差異很大,無界隊(duì)列會(huì)保持快速增長,直到耗盡系統(tǒng)內(nèi)存。
4.優(yōu)先任務(wù)隊(duì)列:優(yōu)先任務(wù)隊(duì)列是帶有執(zhí)行優(yōu)先級(jí)的任務(wù)隊(duì)列。它通過PriorityBlockingQueue類實(shí)現(xiàn),可以控制任務(wù)的執(zhí)行先后順序。他是一個(gè)特殊的無界隊(duì)列。無論是有界隊(duì)列ArrayBlockingQueue類,還是未指定大小的無界隊(duì)列LinkedBlockingQueue類都是按照先進(jìn)先出算法處理任務(wù)的。而PriorityBlockingQueue類則可以根據(jù)任務(wù)自身的優(yōu)先級(jí)順序先后執(zhí)行,在確保系統(tǒng)性能的同時(shí),也能有很好的質(zhì)量保證(總是確保高優(yōu)先級(jí)的任務(wù)先執(zhí)行)。
拒絕策略:
ThreadPoolExecutor類的最后一個(gè)參數(shù)指定了拒絕策略。也就是當(dāng)任務(wù)數(shù)量超過系統(tǒng)實(shí)際承載能力時(shí),就要用到拒絕策略了。拒絕策略可以說是系統(tǒng)超負(fù)荷運(yùn)行時(shí)的補(bǔ)救措施,通常由于壓力太大而引起的,也就是線程池中的線程已經(jīng)用完了,無法繼續(xù)為新任務(wù)服務(wù),同時(shí),等待隊(duì)列中也已經(jīng)排滿了,再也放不下新任務(wù)了。這時(shí),我們就需要有一套機(jī)制合理的處理這個(gè)問題。
jdk在ThreadPoolExecutor類中定義了四種內(nèi)置的拒絕策略,其均實(shí)現(xiàn)RejectedExecutionHandler接口。其四種拒絕策略為:
1.AbortPolicy策略:該策略會(huì)直接拋出異常,阻止系統(tǒng)正常工作。
2.CallRunsPolicy策略:只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前被丟棄的任務(wù)。顯然這樣做不會(huì)真的丟棄任務(wù),但是,任務(wù)提交線程的性能極有可能會(huì)急劇下降。
3.DiscardOldestPolicy策略:該策略將丟棄最老的一個(gè)請求,也就是即將被執(zhí)行的一個(gè)任務(wù),并嘗試再次提交當(dāng)前任務(wù)。
4.DiscardPolicy策略:該策略默默地丟棄無法處理的任務(wù),不予任何處理。如果允許任務(wù)丟失,我覺得這可能是最好的一種方案了吧。
線程池的主要作用是為了線程復(fù)用,也就是避免了線程的頻繁創(chuàng)建。但是,最開始的那些線程從何而來呢?答案就是ThreadFactory。ThreadFactory是一個(gè)接口,它只有一個(gè)用來創(chuàng)建線程的方法:
當(dāng)線程池需要新建線程時(shí),就會(huì)調(diào)用這個(gè)方法。
對于核心的幾個(gè)線程池,無論是newFixedThreadPool方法,newSingleThreadExecutor方法,還是newCacheThreadPool方法,雖然看起來創(chuàng)建的線程有著完全不同的功能特點(diǎn),但其內(nèi)部實(shí)現(xiàn)均使用了ThreadPoolExecutor類,下面給出這三個(gè)線程池的實(shí)現(xiàn)方式
publicstaticExecutorService newFixedThreadPool( intnThreads) {
returnnewThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
newLinkedBlockingQueue<Runnable>);
}
newFixedThreadPool 方法的實(shí)現(xiàn),它返回了一個(gè)corePoolSize和maximumPoolSize大小一樣的,并且使用了LinkedBlockingQueue任務(wù)隊(duì)列的線程池。因?yàn)閷潭ù笮〉木€程池而言,不存在線程數(shù)量的動(dòng)態(tài)變化,因此corePoreSize和maximumPoolSize相等。同時(shí),它使用無界隊(duì)列存放無法立即執(zhí)行的任務(wù),當(dāng)任務(wù)提交非常頻繁的時(shí)候,該隊(duì)列可能迅速膨脹,從而耗盡系統(tǒng)資源。
publicstaticExecutorService newSingleThreadExecutor{
returnnewFinalizableDelegatedExecutorService
( newThreadPoolExecutor( 1, 1,
0L, TimeUnit.MILLISECONDS,
newLinkedBlockingQueue<Runnable>));
}
newSingleThreadExecutor方法返回的單線程線程池,是newFixedThreadPool方法的一種退化,只是簡單的將線程池線程數(shù)量設(shè)置為1。它的特點(diǎn)在于工作線程數(shù)目被限制為1,操作一個(gè)無界的工作隊(duì)列,所以他能保證了所有任務(wù)都是被順序執(zhí)行,最多會(huì)有一個(gè)任務(wù)處于活動(dòng)狀態(tài),并且不允許使用者改動(dòng)線程池實(shí)例,因此可以避免其改變線程數(shù)目。
publicstaticExecutorService newCachedThreadPool{
returnnewThreadPoolExecutor( 0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
newSynchronousQueue<Runnable>);
}
newCacheThreadPool方法返回corePoolSize為0,maximumPoolSize無窮大的線程池,這意味著在沒有任務(wù)時(shí),該線程池內(nèi)無線程,而當(dāng)任務(wù)被提交時(shí),該線程池會(huì)使用空閑的線程執(zhí)行任務(wù),若無空閑線程,則將任務(wù)加入SynchronousQueue隊(duì)列,而SynchronousQueue隊(duì)列時(shí)一種直接提交的隊(duì)列,它總會(huì)迫使線程池增加新的線程執(zhí)行任務(wù)。當(dāng)任務(wù)執(zhí)行完畢后,由于corePoolSize為0,因此空閑線程又會(huì)在指定時(shí)間內(nèi)60秒內(nèi)被回收。它是一種用來處理大量短時(shí)間工作任務(wù)的線程池,具有幾個(gè)鮮明特點(diǎn):它會(huì)試圖緩存線程并重用,當(dāng)無緩存線程可用時(shí),就會(huì)創(chuàng)建新的工作線程;如果線程閑置的時(shí)間超過60秒,則被終止并移除緩存,長時(shí)間閑置時(shí),這種線程池,不會(huì)消耗什么資源,其內(nèi)部使用SynchronousQueue作為工作隊(duì)列,無界線程池,可以進(jìn)行自動(dòng)線程回收。
在使用自定義線程池時(shí),要根據(jù)應(yīng)用的具體情況,選擇合適的并發(fā)隊(duì)列作為任務(wù)的緩沖。當(dāng)線程資源緊張時(shí),不同的并發(fā)隊(duì)列對系統(tǒng)行為和性能的影響也不相同。
importJAVA.util.concurrent.LinkedBlockingQueue;
importjava.util.concurrent.ThreadFactory;
importjava.util.concurrent.ThreadPoolExecutor;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.atomic.AtomicInteger;
publicclassThreadPoolDefinedTest{
publicstaticvoidmain(String[] args){
LinkedBlockingQueue<Runnable> blockingQueue = newLinkedBlockingQueue<>( 100);
ThreadFactory threadFactory = newThreadFactory {
// int i = 0; 用并發(fā)安全的包裝類
AtomicInteger atomicInteger = newAtomicInteger( 1);
@Override
publicThread newThread(Runnable r){
//創(chuàng)建線程任務(wù)傳進(jìn)來
Thread thread = newThread(r);
// 給線程起個(gè)名字
thread.setName( "MyThread"+ atomicInteger.getAndIncrement);
returnthread;
}
};
ThreadPoolExecutor pool = newThreadPoolExecutor( 10, 10, 1, TimeUnit.SECONDS, blockingQueue, threadFactory);
for( inti = 0; i < 5; i++) {
pool.execute( newRunnable {
@Override
publicvoidrun{
try{
method;
} catch(InterruptedException e) {
e.printStackTrace;
}
}
});
}
}
privatestaticvoidmethodthrowsInterruptedException {
System.out.println( "ThreadName"+ Thread.currentThread.getName + "進(jìn)來了");
Thread.sleep( 2000);
System.out.println( "ThreadName"+ Thread.currentThread.getName + "出去了");
}
}
通過探究ThreadPoolExecutor類中封裝的線程池的構(gòu)造函數(shù),可以有效的理解創(chuàng)建線程池時(shí)的各個(gè)參數(shù)的作用,從而選擇適合我們業(yè)務(wù)場景所需要的線程池類型。線程池涵蓋的內(nèi)容很多很豐富,我們需要不斷通過學(xué)習(xí)和實(shí)踐,增強(qiáng)我們對線程,線程池的理解,希望通過本篇文章對你能有所幫助。






