JAVA多線程的實現(xiàn)方式
Java程序中,常見有4種方式實現(xiàn)多線程
①繼承Thread類
②實現(xiàn)Runnable接口
③實現(xiàn)Callable接口
④使用Executor框架
在JDK5之前,創(chuàng)建線程有2種方式,一種是繼承Thread類,另外一種是實現(xiàn)Runnable接口。這2種方式在執(zhí)行完任務之后都無法獲取執(zhí)行結(jié)果,如果需要獲取執(zhí)行結(jié)果,就必須通過共享變量或者使用線程通信的方式來達到效果,這樣使用起來就比較麻煩。自Java 5起,就提供了Callable和Future,通過它們可以在任務執(zhí)行完畢之后得到任務執(zhí)行結(jié)果。
方式①舉例:繼承Thread類,實現(xiàn)run()方法,調(diào)用start()方法啟動線程
public class ThreadSample extends Thread {
@Override
public void run() {
System.out.println(this.getName() + " do some work...");
}
public static void main(String[] args) {
ThreadSample threadSample = new ThreadSample();
threadSample.setName("thread-a");
threadSample.start();
}
}
start()方法調(diào)用后并不是立即執(zhí)行多線程代碼,而是使得該線程變?yōu)镽eady狀態(tài),等待CPU分配執(zhí)行時間。
方式②舉例:實現(xiàn)Runnable接口,實現(xiàn)run()方法,將實例對象傳入Thread構(gòu)造方法
public class ThreadSample implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " do some work...");
}
public static void main(String[] args) {
Thread threadSample = new Thread(new ThreadSample(), "thread-b");
threadSample.start();
}
}
方式③舉例:實現(xiàn)Callable接口和FutureTask對象組合
public class ThreadSample implements Callable<Integer> {
@Override
public Integer call() {
int result = 0;
for (int i = 0; i <= 10; i++) {
result++;
}
return result;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
//1、實例化Callable對象
ThreadSample callableSample = new ThreadSample();
//2、創(chuàng)建裝載線程的FutureTask對象
FutureTask<Integer> ft = new FutureTask<Integer>(callableSample);
//3、啟動線程
Thread thread = new Thread(ft, "thread-callable");
thread.start();
//4、獲取返回結(jié)果
Integer result = ft.get();
System.out.println("result = " + result);
}
}
與使用Runnable相比,Callable功能更強大
- 可以有返回值,支持泛型的返回值,借助FutureTask類獲取返回值;
- 可以捕獲程序執(zhí)行過程中的異常。
方式④舉例:線程池實現(xiàn)
public class ThreadSample implements Callable<Integer> {
@Override
public Integer call() {
int result = 0;
for (int i = 0; i <= 10; i++) {
result++;
}
return result;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService services = Executors.newSingleThreadExecutor();
Future<Integer> future = services.submit(new ThreadSample());
System.out.println("result = " + future.get());
services.shutdown();
}
}
以上繼承Thread類、實現(xiàn)Runnable接口、實現(xiàn)Callable接口三種方式中,理論上優(yōu)先選用Runnable接口和Callable接口,如果有需要返回值則選用Callable接口實現(xiàn)方式。此外,無論何時,當看到這種形式的代碼:
new Thread(runnable).start()
并且最終希望有一個更加靈活的執(zhí)行策略時,都可以認真考慮使用Executor代替Thread。
使用線程池的好處
在線程池中執(zhí)行任務線程,比起每個任務創(chuàng)建一個線程,有很多優(yōu)勢。
- 減少系統(tǒng)開銷。重用存在的線程,而不是創(chuàng)建新的線程,這可以在處理多請求時抵消線程創(chuàng)建、銷毀產(chǎn)生的開銷。
- 提升請求響應性。在請求到達時,工作者線程已經(jīng)存在,可以立即執(zhí)行,因此提高了響應性。
- 增強線程的可管理性。通過調(diào)整線程池的大小,可以充分利用CPU資源,同時可以防止過多的線程相互競爭資源,導致應用程序耗盡內(nèi)存或者失敗。線程池可以對線程資源進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。
線程池的工作流程
Java線程池的核心實現(xiàn)類是ThreadPoolExecutor類,任務提交到線程池時,具體處理由ThreadPoolExecutor類的execute()方法執(zhí)行。當一個新任務提交到線程池時,線程池的處理流程如下:
①判斷核心線程池里的線程是否都在執(zhí)行任務,如果不是,創(chuàng)建一個新的工作線程來執(zhí)行任務。如果是,則進行下一步流程。
②判斷阻塞隊列是否已滿,如果沒滿,則將新提交的任務存儲在阻塞隊列中。如果滿,則進行下一步流程。
③判斷線程池中的線程是否都處于工作狀態(tài),如果沒有,則創(chuàng)建一個新的工作線程來執(zhí)行任務。如果線程池中所有的線程都處于工作狀態(tài),則交給飽和策略來處理這個任務。
ThreadPoolExecutor通用的構(gòu)造函數(shù)為:
參數(shù)說明如下
corePoolSize
- 線程池基本大小。提交一個任務到線程池時,線程池會創(chuàng)建一個新的線程來執(zhí)行任務。新任務提交時,當目前線程數(shù)小于corePoolSize,即使有空閑的線程可以執(zhí)行該任務,也會創(chuàng)建新的線程。
- 如果線程池中的線程數(shù)已經(jīng)大于或等于corePoolSize,則不會創(chuàng)建新的線程。
- 當一個ThreadPoolExecutor被初始創(chuàng)建后,所有核心線程并非立即開始,而是等到有任務提交的時刻。但如果調(diào)用了prestartAllCoreThreads()方法,所有核心線程會立即啟動。
maximuPoolSize
- 線程池允許創(chuàng)建的最大線程數(shù)。當阻塞隊列滿,且線程數(shù)小于maximumPoolSize時,便可以創(chuàng)建新的線程執(zhí)行任務。
- 如果使用無界阻塞隊列,該參數(shù)無效。
keepAliveTime
- 線程池的工作線程空閑后,保持存活的時間。如果任務多且任務執(zhí)行時間較短,可以調(diào)大該值,提高線程利用率。
- 如果一個線程已經(jīng)閑置的時間超過了存活時間,它將成為一個被回收的候選者,如果當前的池的大小超過了的corePoolSize,線程池會終止它。
unit
- 與keepAliveTime相關(guān)聯(lián)的時間單位。可選值有DAYS、HOURS、MINUTES、毫秒、微妙、納秒。
workQueue
- 用于保存等待執(zhí)行的任務的阻塞隊列。ThreadPoolExecutor允許提供一個BlockingQueue來持有等待執(zhí)行的任務。任務排隊有3種基本方法:無限隊列、有限隊列、同步移交。隊列的選擇和很多其他的配置參數(shù)都有關(guān)系,比如池的大小等。ThreadPoolExecutor推薦以下幾種阻塞隊列。
- LinkedBlockingQueue:線程安全的阻塞隊列,先進先出(FIFO)。可以指定容量(有限隊列),也可以不指定(無限隊列),不指定的話默認最大是Integer.MAX_VALUE。如果所有的工作者線程都處于忙碌狀態(tài),新提交的任務將會在隊列中等候。如果新的任務持續(xù)快速到達,超過了它們被執(zhí)行的速度,隊列會無限制地增加。線程池中能創(chuàng)建的最大線程數(shù)為corePoolSize指定的值。
- ArrayBlockingQueue:數(shù)組實現(xiàn)的有界阻塞隊列,先進先出(FIFO)。線程池中能創(chuàng)建的最大線程數(shù)為maximumPoolSize指定的值。有界隊列有助于避免資源耗盡,當隊列滿時,如果還有新的任務到達,將根據(jù)飽和策略(也稱拒絕策略)進行處理。對于一個有界隊列,隊列的長度與池的長度必須一起調(diào)節(jié)。一個大隊列和一個小池,可以控制對內(nèi)存和CPU的使用,也可以減少上下文切換,但相應的吞吐量也會減小。
- SynchronousQueue:SynchronousQueue并不是一個真正的隊列,而是一種管理直接在線程間移交信息的機制。當新任務到達時,如果所有工作線程都處于忙碌狀態(tài),且線程池數(shù)量小于maximumPoolSize,就會創(chuàng)建一個新的線程。否則根據(jù)飽和策略處理。只有池是無限的,或者可以接受任務被拒絕,SynchronousQueue才是一個有實際價值的選擇。
- PriorityBlokingQueue: 一個支持優(yōu)先級的無界阻塞隊列 。使用該隊列,線程池中能創(chuàng)建的最大線程數(shù)為corePoolSize。
threadFactory
- 線程池創(chuàng)建線程時使用的線程工廠,可以不指定該參數(shù),使用默認的線程工廠Executors.defaultThreadFactory()。
handler
飽和策略,也稱拒絕策略。當有限隊列滿且線程池滿的情況下,新的任務到達后,飽和策略將進行處理。有以下幾種:
- ThreadPoolExecutor.AbortPolicy()
拋出RejectedExecutionException異常。默認策略。調(diào)用者可以捕獲拋出的異常,進行相應的處理。
- ThreadPoolExecutor.CallerRunsPolicy()
不會丟棄任務,也不會拋出異常,由向線程池提交任務的線程來執(zhí)行該任務。
- ThreadPoolExecutor.DiscardPolicy()
丟棄當前的任務
- ThreadPoolExecutor.DiscardOldestPolicy()
丟棄最舊的任務(最先提交而沒有得到執(zhí)行的任務),并執(zhí)行當前任務。
ThreadPoolExecutor執(zhí)行流程如下
①當新任務提交時,如果當前線程池中的線程數(shù)小于corePoolSize,則創(chuàng)建新的線程處理。
②如果線程池中的線程大于或等于corePoolSize,且BlockingQueue未滿,則將新任務加入BlockingQueue。
③如果BlockingQueue已滿,且線程池中的線程數(shù)小于maximumPoolSize,則創(chuàng)建新的工作線程來執(zhí)行任務。
④如果當前運行的線程大于或等于maximumPoolSize,將執(zhí)行飽和策略。即調(diào)用
RejectedExecutionHandler.rejectExecution()方法。
幾種常見的線程池
Executors提供了一些靜態(tài)工廠方法創(chuàng)建的常見線程池。
- newFixedThreadPool
創(chuàng)建一個固定長度的線程池,每當提交一個任務就創(chuàng)建一個線程,直到達到線程池最大長度,這時線程池長度不再變化。如果一個線程異常退出,線程池會補充一個新的線程。
- newCachedThreadPool
創(chuàng)建一個可緩存的線程池,不會對池的長度做限制。如果線程池長度超過需求,它可以靈活地回收空閑的線程;當需求增加時,它可以靈活地添加新的線程。
- newSingleThreadExecutor
創(chuàng)建一個單線程的executor,只創(chuàng)建唯一的工作者線程來執(zhí)行任務,如果這個線程異常結(jié)束,會有一個新的取代它。
- newScheduledThreadPool
創(chuàng)建一個定長的線程池,支持定時執(zhí)行任務。
這幾種線程池中,newFixedThreadPool和newSingleThreadExecutor默認使用無線隊列LinkedBlockingQueue。newCachedThreadPool使用了同步移交隊列SynchronousQueue。newScheduledThreadPool使用了DelayedWorkQueue阻塞隊列。
newCachedThreadPool的corePoolSize為0,maximumPoolSize為Integer.MAX_VALUE,其他幾種線程池corePoolSize與maximumPoolSize一樣大。
線程池的狀態(tài)與生命周期
線程池有5種狀態(tài),在ThreadPoolExecutor 源碼中有定義。
- RUNNING : 線程池最初創(chuàng)建后的初始狀態(tài),該狀態(tài)的線程池既能接受新提交的任務 ,又能處理阻塞隊列中任務。
- SHUTDOWN: 調(diào)用shutdown()方法后進入該狀態(tài)。該狀態(tài)的線程池不能接收新提交的任務 ,但是能處理阻塞隊列中的任務。
- STOP: 調(diào)用shutdownNow()方法后進入該狀態(tài)。該狀態(tài)的線程池不接受新提交的任務 ,也不處理在阻塞隊列中的任務 ,還會中斷正在執(zhí)行的任務。
- TIDYING: 當所有的任務都已終止,工作線程數(shù)為0的狀態(tài)。線程池進入該狀態(tài)后會調(diào)用 terminated() 鉤子方法進入TERMINATED 狀態(tài)。
- TERMINATED: 在terminated()鉤子方法執(zhí)行完后進入該狀態(tài)。
調(diào)用線程池的shutdown()或者shutdownNow()方法可以關(guān)閉線程池,遍歷線程池中工作線程,逐個調(diào)用interrupt方法來中斷線程。
Shutdown()方法與shutdownNow()的特點:
Shutdown()方法將線程池的狀態(tài)設置為SHUTDOWN狀態(tài),只會中斷空閑的工作線程。
shutdownNow()方法將線程池的狀態(tài)設置為STOP狀態(tài),會中斷所有工作線程,不管工作線程是否空閑。
調(diào)用兩者中任何一種方法,都會使isShutdown()方法的返回值為true;線程池中所有的任務都關(guān)閉后,isTerminated()方法的返回值為true。
通常使用shutdown()方法關(guān)閉線程池,如果不要求任務一定要執(zhí)行完,則可以調(diào)用shutdownNow()方法。
確定線程池的大小
線程池合理的長度取決于所要執(zhí)行的任務特征以及程序所部署的系統(tǒng)環(huán)境,一般根據(jù)這二者因素使用配置文件提供或者通過CPU核數(shù)N:
N = Runtime.getRuntime().availableProcessors();
動態(tài)計算而不是硬編碼在代碼中。主要是避免線程池過大或過小這兩種極端情況。如果線程池過大,會導致CPU和內(nèi)存資源競爭,頻繁的上下文切換,任務延遲,甚至資源耗盡。如果線程池過小,會造成CPU和內(nèi)存資源未充分利用,任務處理的吞吐量減小。
對于任務特征來說,需要分清楚是計算密集型任務,還是IO密集型任務或是混合型任務。
計算密集型也稱為CPU密集型,意思就是該任務需要大量運算,而沒有阻塞,CPU一直全速運行。CPU密集型任務只有在多核CPU上才可能得到加速,即scale up,通過多線程程序享受到增加CPU核數(shù)帶來的好處。
IO密集型,即該任務需要大量的IO操作,例如網(wǎng)絡連接、數(shù)據(jù)庫連接等等,執(zhí)行任務過程中會有大量的阻塞。在單線程上運行IO密集型任務會導致浪費大量的CPU運算能力浪費在等待。
對于計算密集型任務,原則是配置盡可能少的線程數(shù),通常建議以下計算方式設置線程池大小來獲得最優(yōu)利用率:
N(線程數(shù)) = N(CPU核數(shù))+ 1
對于IO密集型任務,考慮的因素會多一些,原則是因為較多的時間處于IO阻塞,不能處理新的任務,所有線程數(shù)盡可能大一些,通常建議是:
N(線程數(shù)) = 2 x N(CPU核數(shù)) + 1
或者更精確的:
N(線程數(shù)) = N(CPU核數(shù)) x U x (1 + W/C)
其中U表示CPU使用率,W/C表示IO等待時間與計算時間的比率,這個不需要太精確,只需要一個估算值。例如,4核CPU,CPU使用率80%,IO等待時間1秒,計算時間0.1秒,那么線程數(shù)為:4.8 x 11≈53。一些文章中還提到這種計算方式:
N(線程數(shù)) = N(CPU核數(shù)) x U / (1 - f)
其中U表示CPU使用率,f表示阻塞系數(shù),即IO等待時間與任務執(zhí)行總時間的比率:W/(W + C)。根據(jù)上面的例子計算出線程數(shù)為:4.8/0.09≈53。兩種計算方式的結(jié)果是很相近的。
以上的計算方式和建議盡可以作為理論參考值,實際業(yè)務中可能并不完全按照這個計算值來設置。可以根據(jù)對線程池各項參數(shù)的監(jiān)控,來確定一個合理的值。ThreadPoolExecutor提供的一些可用于獲取監(jiān)控的參數(shù)方法如下:
- getTaskCount():線程池需要執(zhí)行的任務數(shù)量,包括已經(jīng)執(zhí)行完的、未執(zhí)行的和正在執(zhí)行的。
- getCompletedTaskCount():線程池在運行過程中已完成的任務數(shù)量 ,completedTaskCount <= taskCount。
- getLargestPoolSize():線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量 ,通過這個數(shù)據(jù)可以知道線程池是否滿過。如等于線程池的最大大小 ,則表示線程池曾經(jīng)滿了。
- getPoolSize(): 線程池的線程數(shù)量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以線程池的線程數(shù)量只增不減 。
- getActiveCount():獲取活動的線程數(shù)。
此外,可以通過繼承ThreadPoolExecutor并重寫它的 beforeExecute(),afterExecute() 和 terminated()方法,我們可以在任務執(zhí)行前,執(zhí)行后和線程池關(guān)閉前做一些統(tǒng)計、日志輸出等等操作,以幫助我們更好地監(jiān)控到線程池的運行狀態(tài)。






