線程池java源碼
Ⅰ java 線程池 工作隊列是如何工作的
使用線程池的好處
1、降低資源消耗
可以重復利用已創建的線程降低線程創建和銷毀造成的消耗。
2、提高響應速度
當任務到達時,任務可以不需要等到線程創建就能立即執行。
3、提高線程的可管理性
線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控
線程池的工作原理
首先我們看下當一個新的任務提交到線程池之後,線程池是如何處理的
1、線程池判斷核心線程池裡的線程是否都在執行任務。如果不是,則創建一個新的工作線程來執行任務。如果核心線程池裡的線程都在執行任務,則執行第二步。
2、線程池判斷工作隊列是否已經滿。如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列里進行等待。如果工作隊列滿了,則執行第三步
3、線程池判斷線程池的線程是否都處於工作狀態。如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務
線程池飽和策略
這里提到了線程池的飽和策略,那我們就簡單介紹下有哪些飽和策略:
AbortPolicy
為Java線程池默認的阻塞策略,不執行此任務,而且直接拋出一個運行時異常,切記ThreadPoolExecutor.execute需要try catch,否則程序會直接退出。
DiscardPolicy
直接拋棄,任務不執行,空方法
DiscardOldestPolicy
從隊列裡面拋棄head的一個任務,並再次execute 此task。
CallerRunsPolicy
在調用execute的線程裡面執行此command,會阻塞入口
用戶自定義拒絕策略(最常用)
實現RejectedExecutionHandler,並自己定義策略模式
下我們以ThreadPoolExecutor為例展示下線程池的工作流程圖
3.jpg
關鍵方法源碼分析
我們看看核心方法添加到線程池方法execute的源碼如下:
// //Executes the given task sometime in the future. The task //may execute in a new thread or in an existing pooled thread. // // If the task cannot be submitted for execution, either because this // executor has been shutdown or because its capacity has been reached, // the task is handled by the current {@code RejectedExecutionHandler}. // // @param command the task to execute // @throws RejectedExecutionException at discretion of // {@code RejectedExecutionHandler}, if the task // cannot be accepted for execution // @throws NullPointerException if {@code command} is null // public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // // Proceed in 3 steps: // // 1. If fewer than corePoolSize threads are running, try to // start a new thread with the given command as its first // task. The call to addWorker atomically checks runState and // workerCount, and so prevents false alarms that would add // threads when it shouldn't, by returning false. // 翻譯如下: // 判斷當前的線程數是否小於corePoolSize如果是,使用入參任務通過addWord方法創建一個新的線程, // 如果能完成新線程創建exexute方法結束,成功提交任務 // 2. If a task can be successfully queued, then we still need // to double-check whether we should have added a thread // (because existing ones died since last checking) or that // the pool shut down since entry into this method. So we // recheck state and if necessary roll back the enqueuing if // stopped, or start a new thread if there are none. // 翻譯如下: // 在第一步沒有完成任務提交;狀態為運行並且能否成功加入任務到工作隊列後,再進行一次check,如果狀態 // 在任務加入隊列後變為了非運行(有可能是在執行到這里線程池shutdown了),非運行狀態下當然是需要 // reject;然後再判斷當前線程數是否為0(有可能這個時候線程數變為了0),如是,新增一個線程; // 3. If we cannot queue task, then we try to add a new // thread. If it fails, we know we are shut down or saturated // and so reject the task. // 翻譯如下: // 如果不能加入任務到工作隊列,將嘗試使用任務新增一個線程,如果失敗,則是線程池已經shutdown或者線程池 // 已經達到飽和狀態,所以reject這個他任務 // int c = ctl.get(); // 工作線程數小於核心線程數 if (workerCountOf(c) < corePoolSize) { // 直接啟動新線程,true表示會再次檢查workerCount是否小於corePoolSize if (addWorker(command, true)) return; c = ctl.get(); } // 如果工作線程數大於等於核心線程數 // 線程的的狀態未RUNNING並且隊列notfull if (isRunning(c) && workQueue.offer(command)) { // 再次檢查線程的運行狀態,如果不是RUNNING直接從隊列中移除 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 移除成功,拒絕該非運行的任務 reject(command); else if (workerCountOf(recheck) == 0) // 防止了SHUTDOWN狀態下沒有活動線程了,但是隊列里還有任務沒執行這種特殊情況。 // 添加一個null任務是因為SHUTDOWN狀態下,線程池不再接受新任務 addWorker(null, false); } // 如果隊列滿了或者是非運行的任務都拒絕執行 else if (!addWorker(command, false)) reject(command); }
Ⅱ java線程池怎麼實現
要想理解清楚java線程池實現原理,明白下面幾個問題就可以了:
(1):線程池存在哪些狀態,這些狀態之間是如何進行切換的呢?
(2):線程池的種類有哪些?
(3):創建線程池需要哪些參數,這些參數的具體含義是什麼?
(4):將任務添加到線程池之後運行流程?
(5):線程池是怎麼做到重用線程的呢?
(6):線程池的關閉
首先回答第一個問題:線程池存在哪些狀態;
查看ThreadPoolExecutor源碼便知曉:
[java]view plain
//runStateisstoredinthehigh-orderbits
privatestaticfinalintRUNNING=-1<<COUNT_BITS;
privatestaticfinalintSHUTDOWN=0<<COUNT_BITS;
privatestaticfinalintSTOP=1<<COUNT_BITS;
privatestaticfinalintTIDYING=2<<COUNT_BITS;
=3<<COUNT_BITS;
*RUNNING->SHUTDOWN
*Oninvocationofshutdown(),perhapsimplicitlyinfinalize()
*(RUNNINGorSHUTDOWN)->STOP
*OninvocationofshutdownNow()
*SHUTDOWN->TIDYING
*Whenbothqueueandpoolareempty
*STOP->TIDYING
*Whenpoolisempty
*TIDYING->TERMINATED
*Whentheterminated()hookmethodhascompleted
publicThreadPoolExecutor(intcorePoolSize,
intmaximumPoolSize,
longkeepAliveTime,
TimeUnitunit,
BlockingQueue<Runnable>workQueue,
ThreadFactorythreadFactory,
)
if(wc>=CAPACITY||
wc>=(core?corePoolSize:maximumPoolSize))
returnfalse;
privatefinalclassWorker
implementsRunnable
Worker(RunnablefirstTask){
setState(-1);//
this.firstTask=firstTask;
this.thread=getThreadFactory().newThread(this);
}
public<T>Future<T>submit(Callable<T>task){
if(task==null)thrownewNullPointerException();
RunnableFuture<T>ftask=newTaskFor(task);
execute(ftask);
returnftask;
}
存在5種狀態:
<1>Running:可以接受新任務,同時也可以處理阻塞隊列裡面的任務;
<2>Shutdown:不可以接受新任務,但是可以處理阻塞隊列裡面的任務;
<3>Stop:不可以接受新任務,也不處理阻塞隊列裡面的任務,同時還中斷正在處理的任務;
<4>Tidying:屬於過渡階段,在這個階段表示所有的任務已經執行結束了,當前線程池中是不存在有效的線程的,並且將要調用terminated方法;
<5>Terminated:終止狀態,這個狀態是在調用完terminated方法之後所處的狀態;
那麼這5種狀態之間是如何進行轉換的呢?查看ThreadPoolExecutor源碼裡面的注釋便可以知道啦:
[java]view plain
從上面可以看到,在調用shutdown方法的時候,線程池狀態會從Running轉換成Shutdown;在調用shutdownNow方法的時候,線程池狀態會從Running/Shutdown轉換成Stop;在阻塞隊列為空同時線程池為空的情況下,線程池狀態會從Shutdown轉換成Tidying;在線程池為空的情況下,線程池狀態會從Stop轉換成Tidying;當調用terminated方法之後,線程池狀態會從Tidying轉換成Terminate;
在明白了線程池的各個狀態以及狀態之間是怎麼進行切換之後,我們來看看第二個問題,線程池的種類:
(1):CachedThreadPool:緩存線程池,該類線程池中線程的數量是不確定的,理論上可以達到Integer.MAX_VALUE個,這種線程池中的線程都是非核心線程,既然是非核心線程,那麼就存在超時淘汰機制了,當裡面的某個線程空閑時間超過了設定的超時時間的話,就會回收掉該線程;
(2):FixedThreadPool:固定線程池,這類線程池中是只存在核心線程的,對於核心線程來說,如果我們不設置allowCoreThreadTimeOut屬性的話是不存在超時淘汰機制的,這類線程池中的corePoolSize的大小是等於maximumPoolSize大小的,也就是說,如果線程池中的線程都處於活動狀態的話,如果有新任務到來,他是不會開辟新的工作線程來處理這些任務的,只能將這些任務放到阻塞隊列裡面進行等到,直到有核心線程空閑為止;
(3):ScheledThreadPool:任務線程池,這種線程池中核心線程的數量是固定的,而對於非核心線程的數量是不限制的,同時對於非核心線程是存在超時淘汰機制的,主要適用於執行定時任務或者周期性任務的場景;
(4):SingleThreadPool:單一線程池,線程池裡面只有一個線程,同時也不存在非核心線程,感覺像是FixedThreadPool的特殊版本,他主要用於確保任務在同一線程中的順序執行,有點類似於進行同步吧;
接下來我們來看第三個問題,創建線程池需要哪些參數:
同樣查看ThreadPoolExecutor源碼,查看創建線程池的構造函數:
[java]view plain
不管你調用的是ThreadPoolExecutor的哪個構造函數,最終都會執行到這個構造函數的,這個構造函數有7個參數,正是由於對這7個參數值的賦值不同,造成生成不同類型的線程池,比如我們常見的CachedThreadPoolExecutor、FixedThreadPoolExecutor
SingleThreadPoolExecutor、ScheledThreadPoolExecutor,我們老看看這幾個參數的具體含義:
<1>corePoolSize:線程池中核心線程的數量;當提交一個任務到線程池的時候,線程池會創建一個線程來執行執行任務,即使有其他空閑的線程存在,直到線程數達到corePoolSize時不再創建,這時候會把提交的新任務放入到阻塞隊列中,如果調用了線程池的preStartAllCoreThreads方法,則會在創建線程池的時候初始化出來核心線程;
<2>maximumPoolSize:線程池允許創建的最大線程數;如果阻塞隊列已經滿了,同時已經創建的線程數小於最大線程數的話,那麼會創建新的線程來處理阻塞隊列中的任務;
<3>keepAliveTime:線程活動保持時間,指的是工作線程空閑之後繼續存活的時間,默認情況下,這個參數只有線程數大於corePoolSize的時候才會起作用,即當線程池中的線程數目大於corePoolSize的時候,如果某一個線程的空閑時間達到keepAliveTime,那麼這個線程是會被終止的,直到線程池中的線程數目不大於corePoolSize;如果調用allowCoreThreadTimeOut的話,在線程池中線程數量不大於corePoolSize的時候,keepAliveTime參數也可以起作用的,知道線程數目為0為止;
<4>unit:參數keepAliveTime的時間單位;
<5>workQueue:阻塞隊列;用於存儲等待執行的任務,有四種阻塞隊列類型,ArrayBlockingQueue(基於數組的有界阻塞隊列)、LinkedBlockingQueue(基於鏈表結構的阻塞隊列)、SynchronousQueue(不存儲元素的阻塞隊列)、PriorityBlockingQueue(具有優先順序的阻塞隊列);
<6>threadFactory:用於創建線程的線程工廠;
<7>handler:當阻塞隊列滿了,且沒有空閑線程的情況下,也就是說這個時候,線程池中的線程數目已經達到了最大線程數量,處於飽和狀態,那麼必須採取一種策略來處理新提交的任務,我們可以自己定義處理策略,也可以使用系統已經提供給我們的策略,先來看看系統為我們提供的4種策略,AbortPolicy(直接拋出異常)、CallerRunsPolicy(只有調用者所在的線程來運行任務)、DiscardOldestPolicy(丟棄阻塞隊列中最近的一個任務,並執行當前任務)、Discard(直接丟棄);
接下來就是將任務添加到線程池之後的運行流程了;
我們可以調用submit或者execute方法,兩者最大的區別在於,調用submit方法的話,我們可以傳入一個實現Callable介面的對象,進而能在當前任務執行結束之後通過Future對象獲得任務的返回值,submit內部實際上還是執行的execute方法;而調用execute方法的話,是不能獲得任務執行結束之後的返回值的;此外,調用submit方法的話是可以拋出異常的,但是調用execute方法的話,異常在其內部得到了消化,也就是說異常在其內部得到了處理,不會向外傳遞的;
因為submit方法最終也是會執行execute方法的,因此我們只需要了解execute方法就可以了:
在execute方法內部會分三種情況來進行處理:
<1>:首先判斷當前線程池中的線程數量是否小於corePoolSize,如果小於的話,則直接通過addWorker方法創建一個新的Worker對象來執行我們當前的任務;
<2>:如果說當前線程池中的線程數量大於corePoolSize的話,那麼會嘗試將當前任務添加到阻塞隊列中,然後第二次檢查線程池的狀態,如果線程池不在Running狀態的話,會將剛剛添加到阻塞隊列中的任務移出,同時拒絕當前任務請求;如果第二次檢查發現當前線程池處於Running狀態的話,那麼會查看當前線程池中的工作線程數量是否為0,如果為0的話,就會通過addWorker方法創建一個Worker對象出來處理阻塞隊列中的任務;
<3>:如果原先線程池就不處於Running狀態或者我們剛剛將當前任務添加到阻塞隊列的時候出現錯誤的話,那麼會去嘗試通過addWorker創建新的Worker來處理當前任務,如果添加失敗的話,則拒絕當前任務請求;
可以看到在上面的execute方法中,我們僅僅只是檢查了當前線程池中的線程數量有沒有超過corePoolSize的情況,那麼當前線程池中的線程數量有沒有超過maximumPoolSize是在哪裡檢測的呢?實際上是在addWorker方法裡面了,我們可以看下addWorker裡面的一段代碼:
[java]view plain
如果當前線程數量超過maximumPoolSize的話,直接就會調用return方法,返回false;
其實到這里我們很明顯可以知道,一個線程池中線程的數量實際上就是這個線程池中Worker的數量,如果Worker的大小超過了corePoolSize,那麼任務都在阻塞隊列裡面了,Worker是Java對我們任務的一個封裝類,他的聲明是醬紫的:
[java]view plain
可以看到他實現了Runnable介面,他是在addWorker方法裡面通過new Worker(firstTask)創建的,我們來看看他的構造函數就知道了:
[java]view plain
而這里的firstTask其實就是我們調用execute或者submit的時候傳入的那個參數罷了,一般來說這些參數是實現Callable或者Runnable介面的;
在通過addWorker方法創建出來Worker對象之後,這個方法的最後會執行Worker內部thread屬性的start方法,而這個thread屬性實際上就是封裝了Worker的Thread,執行他的start方法實際上執行的是Worker的run方法,因為Worker是實現了Runnable介面的,在run方法裡面就會執行runWorker方法,而runWorker方法裡面首先會判斷當前我們傳入的任務是否為空,不為空的話直接就會執行我們通過execute或者submit方法提交的任務啦,注意一點就是我們雖然會通過submit方法提交實現了Callable介面的對象,但是在調用submit方法的時候,其實是會將Callable對象封裝成實現了Runnable介面對象的,不信我們看看submit方法源碼是怎麼實現的:
[java]view plain
看到沒有呢,實際上在你傳入實現了Callable介面對象的時候,在submit方法裡面是會將其封裝成RunnableFuture對象的,而RunnableFuture介面是繼承了Runnable介面的;那麼說白了其實就是直接執行我們提交任務的run方法了;如果為空的話,則會通過getTask方法從阻塞隊列裡面拿出一個任務去執行;在任務執行結束之後繼續從阻塞隊列裡面拿任務,直到getTask的返回值為空則退出runWorker內部循環,那麼什麼情況下getTask返回為空呢?查看getTask方法的源碼注釋可以知道:在Worker必須需要退出的情況下getTask會返回空,具體什麼情況下Worker會退出呢?(1):當Worker的數量超過maximumPoolSize的時候;(2):當線程池狀態為Stop的時候;(3):當線程池狀態為Shutdown並且阻塞隊列為空的時候;(4):使用等待超時時間從阻塞隊列中拿數據,但是超時之後仍然沒有拿到數據;
如果runWorker方法退出了它裡面的循環,那麼就說明當前阻塞隊列裡面是沒有任務可以執行的了,你可以看到在runWorker方法內部的finally語句塊中執行了processWorkerExit方法,用來對Worker對象進行回收操作,這個方法會傳入一個參數表示需要刪除的Worker對象;在進行Worker回收的時候會調用tryTerminate方法來嘗試關閉線程池,在tryTerminate方法裡面會檢查是否有Worker在工作,檢查線程池的狀態,沒問題的話就會將當前線程池的狀態過渡到Tidying,之後調用terminated方法,將線程池狀態更新到Terminated;
從上面的分析中,我們可以看出線程池運行的4個階段:
(1):poolSize < corePoolSize,則直接創建新的線程(核心線程)來執行當前提交的任務;
(2):poolSize = corePoolSize,並且此時阻塞隊列沒有滿,那麼會將當前任務添加到阻塞隊列中,如果此時存在工作線程(非核心線程)的話,那麼會由工作線程來處理該阻塞隊列中的任務,如果此時工作線程數量為0的話,那麼會創建一個工作線程(非核心線程)出來;
(3):poolSize = corePoolSize,並且此時阻塞隊列已經滿了,那麼會直接創建新的工作線程(非核心線程)來處理阻塞隊列中的任務;
(4):poolSize = maximumPoolSize,並且此時阻塞隊列也滿了的話,那麼會觸發拒絕機制,具體決絕策略採用的是什麼就要看我們創建ThreadPoolExecutor的時候傳入的RejectExecutionHandler參數了;
接下來就是線程池是怎麼做到重用線程的呢?
個人認為線程池裡面重用線程的工作是在getTask裡面實現的,在getTask裡面是存在兩個for死循環嵌套的,他會不斷的從阻塞對列裡面取出需要執行的任務,返回給我們的runWorker方法裡面,而在runWorker方法裡面只要getTask返回的任務不是空就會執行該任務的run方法來處理它,這樣一直執行下去,直到getTask返回空為止,此時的情況就是阻塞隊列裡面沒有任務了,這樣一個線程處理完一個任務之後接著再處理阻塞隊列中的另一個任務,當然在線程池中的不同線程是可以並發處理阻塞隊列中的任務的,最後在阻塞隊列內部不存在任務的時候會去判斷是否需要回收Worker對象,其實Worker對象的個數就是線程池中線程的個數,至於什麼情況才需要回收,上面已經說了,就是四種情況了;
最後就是線程池是怎樣被關閉的呢?
涉及到線程池的關閉,需要用到兩個方法,shutdown和shutdownNow,他們都是位於ThreadPoolExecutor裡面的,對於shutdown的話,他會將線程池狀態切換成Shutdown,此時是不會影響對阻塞隊列中任務執行的,但是會拒絕執行新加進來的任務,同時會回收閑置的Worker;而shutdownNow方法會將線程池狀態切換成Stop,此時既不會再去處理阻塞隊列裡面的任務,也不會去處理新加進來的任務,同時會回收所有Worker;
Ⅲ Java 線程池
你說的這些本身就是線程池的作用,你可以去看一下ThreadPoolExecutor這個類,你說的它度能實現
Ⅳ java如何創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。求代碼
packagetest;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
{
publicstaticvoidmain(String[]args){
=Executors.newFixedThreadPool(3);
for(inti=0;i<10;i++){
finalintindex=i;
fixedThreadPool.execute(newRunnable(){
publicvoidrun(){
try{
System.out.println(index);
Thread.sleep(2000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
});
}
}
}
因為線程池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字。
定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()
Ⅳ 【Java基礎】線程池的原理是什麼
什麼是線程池?
總歸為:池化技術 ---》資料庫連接池 緩存架構 緩存池 線程池 內存池,連接池,這種思想演變成緩存架構技術---> JDK設計思想有千絲萬縷的聯系
首先我們從最核心的ThreadPoolExecutor類中的方法講起,然後再講述它的實現原理,接著給出了它的使用示例,最後討論了一下如何合理配置線程池的大小。
Java 中的 ThreadPoolExecutor 類
java.uitl.concurrent.ThreadPoolExecutor 類是線程池中最核心的一個類,因此如果要透徹地了解Java 中的線程池,必須先了解這個類。下面我們來看一下 ThreadPoolExecutor 類的具體實現源碼。
在 ThreadPoolExecutor 類中提供了四個構造方法:
Ⅵ Java線程池中的核心線程是如何被重復利用的
Java線程池中的核心線程是如何被重復利用的?
引言
在Java開發中,經常需要創建線程去執行一些任務,實現起來也非常方便,但如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間。此時,我們很自然會想到使用線程池來解決這個問題。
使用線程池的好處:
降低資源消耗。java中所有的池化技術都有一個好處,就是通過復用池中的對象,降低系統資源消耗。設想一下如果我們有n多個子任務需要執行,如果我們為每個子任務都創建一個執行線程,而創建線程的過程是需要一定的系統消耗的,最後肯定會拖慢整個系統的處理速度。而通過線程池我們可以做到復用線程,任務有多個,但執行任務的線程可以通過線程池來復用,這樣減少了創建線程的開銷,系統資源利用率得到了提升。
降低管理線程的難度。多線程環境下對線程的管理是最容易出現問題的,而線程池通過框架為我們降低了管理線程的難度。我們不用再去擔心何時該銷毀線程,如何最大限度的避免多線程的資源競爭。這些事情線程池都幫我們代勞了。
提升任務處理速度。線程池中長期駐留了一定數量的活線程,當任務需要執行時,我們不必先去創建線程,線程池會自己選擇利用現有的活線程來處理任務。
- // 獲取運行狀態
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- // 獲取活動線程數
- private static int workerCountOf(int c) { return c & CAPACITY; }123456
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- reject(command);
- }
- // 為分析而簡化後的代碼
- public void execute(Runnable command) {
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- // 如果當前活動線程數小於corePoolSize,則新建一個線程放入線程池中,並把任務添加到該線程中
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 如果當前活動線程數大於等於corePoolSize,則嘗試將任務放入緩存隊列
- if (workQueue.offer(command)) {
- int recheck = ctl.get();
- if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }else {
- // 緩存已滿,新建一個線程放入線程池,並把任務添加到該線程中(此時新建的線程相當於非核心線程)
- addWorker(command, false)
- }
- }22
如果 當前活動線程數 < 指定的核心線程數,則創建並啟動一個線程來執行新提交的任務(此時新建的線程相當於核心線程);
如果 當前活動線程數 >= 指定的核心線程數,且緩存隊列未滿,則將任務添加到緩存隊列中;
如果 當前活動線程數 >= 指定的核心線程數,且緩存隊列已滿,則創建並啟動一個線程來執行新提交的任務(此時新建的線程相當於非核心線程);
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- int wc = workerCountOf(c);
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- if ((c))
- break retry;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed e to workerCount change; retry inner loop
- }
- }
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int rs = runStateOf(ctl.get());
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
- // 為分析而簡化後的代碼
- private boolean addWorker(Runnable firstTask, boolean core) {
- int wc = workerCountOf(c);
- if (wc >= (core ? corePoolSize : maximumPoolSize))
- // 如果當前活動線程數 >= 指定的核心線程數,不創建核心線程
- // 如果當前活動線程數 >= 指定的最大線程數,不創建非核心線程
- return false;
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- // 新建一個Worker,將要執行的任務作為參數傳進去
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- workers.add(w);
- workerAdded = true;
- if (workerAdded) {
- // 啟動剛剛新建的那個worker持有的線程,等下要看看這個線程做了啥
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }2223242526272829303132
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable{
- // ....
- }
- Worker(Runnable firstTask) {
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }123456789101112
- public void run() {
- runWorker(this);
- }
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // If pool is stopping, ensure thread is interrupted;
- // if not, ensure thread is not interrupted. This
- // requires a recheck in second case to deal with
- // shutdownNow race while clearing interrupt
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }04142434445464748
- // 為分析而簡化後的代碼
- final void runWorker(Worker w) {
- Runnable task = w.firstTask;
- w.firstTask = null;
- while (task != null || (task = getTask()) != null) {
- try {
- task.run();
- } finally {
- task = null;
- }
- }
- }12345678910111213
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if ((c))
- return null;
- continue;
- }
- try {
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
- // 為分析而簡化後的代碼
- private Runnable getTask() {
- boolean timedOut = false;
- for (;;) {
- int c = ctl.get();
- int wc = workerCountOf(c);
- // timed變數用於判斷是否需要進行超時控制。
- // allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時;
- // wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量;
- // 對於超過核心線程數量的這些線程,需要進行超時控制
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if (timed && timedOut) {
- // 如果需要進行超時控制,且上次從緩存隊列中獲取任務時發生了超時,那麼嘗試將workerCount減1,即當前活動線程數減1,
- // 如果減1成功,則返回null,這就意味著runWorker()方法中的while循環會被退出,其對應的線程就要銷毀了,也就是線程池中少了一個線程了
- if ((c))
- return null;
- continue;
- }
- try {
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- // 注意workQueue中的poll()方法與take()方法的區別
- //poll方式取任務的特點是從緩存隊列中取任務,最長等待keepAliveTime的時長,取不到返回null
- //take方式取任務的特點是從緩存隊列中取任務,若隊列為空,則進入阻塞狀態,直到能取出對象為止
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }39
如果當前活動線程數大於核心線程數,當去緩存隊列中取任務的時候,如果緩存隊列中沒任務了,則等待keepAliveTime的時長,此時還沒任務就返回null,這就意味著runWorker()方法中的while循環會被退出,其對應的線程就要銷毀了,也就是線程池中少了一個線程了。因此只要線程池中的線程數大於核心線程數就會這樣一個一個地銷毀這些多餘的線程。
如果當前活動線程數小於等於核心線程數,同樣也是去緩存隊列中取任務,但當緩存隊列中沒任務了,就會進入阻塞狀態,直到能取出任務為止,因此這個線程是處於阻塞狀態的,並不會因為緩存隊列中沒有任務了而被銷毀。這樣就保證了線程池有N個線程是活的,可以隨時處理任務,從而達到重復利用的目的。
當有新任務來的時候,先看看當前的線程數有沒有超過核心線程數,如果沒超過就直接新建一個線程來執行新的任務,如果超過了就看看緩存隊列有沒有滿,沒滿就將新任務放進緩存隊列中,滿了就新建一個線程來執行新的任務,如果線程池中的線程數已經達到了指定的最大線程數了,那就根據相應的策略拒絕任務。
當緩存隊列中的任務都執行完了的時候,線程池中的線程數如果大於核心線程數,就銷毀多出來的線程,直到線程池中的線程數等於核心線程數。此時這些線程就不會被銷毀了,它們一直處於阻塞狀態,等待新的任務到來。
很顯然,線程池一個很顯著的特徵就是「長期駐留了一定數量的活線程」,避免了頻繁創建線程和銷毀線程的開銷,那麼它是如何做到的呢?我們知道一個線程只要執行完了run()方法內的代碼,這個線程的使命就完成了,等待它的就是銷毀。既然這是個「活線程」,自然是不能很快就銷毀的。為了搞清楚這個「活線程」是如何工作的,下面通過追蹤源碼來看看能不能解開這個疑問。
分析方法
在分析源碼之前先來思考一下要怎麼去分析,源碼往往是比較復雜的,如果知識儲備不夠豐厚,很有可能會讀不下去,或者讀岔了。一般來講要時刻緊跟著自己的目標來看代碼,跟目標關系不大的代碼可以不理會它,一些異常的處理也可以暫不理會,先看正常的流程。就我們現在要分析的源碼而言,目標就是看看線程是如何被復用的。那麼對於線程池的狀態的管理以及非正常狀態下的處理代碼就可以不理會,具體來講,在ThreadPollExcutor類中,有一個欄位private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));是對線程池的運行狀態和線程池中有效線程的數量進行控制的, 它包含兩部分信息: 線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount),還有幾個對ctl進行計算的方法:
以上兩個方法在源碼中經常用到,結合我們的目標,對運行狀態的一些判斷及處理可以不用去管,而對當前活動線程數要加以關注等等。
下面將遵循這些原則來分析源碼。
解惑
當我們要向線程池添加一個任務時是調用ThreadPollExcutor對象的execute(Runnable command)方法來完成的,所以先來看看ThreadPollExcutor類中的execute(Runnable command)方法的源碼:
按照我們在分析方法中提到的一些原則,去掉一些相關性不強的代碼,看看核心代碼是怎樣的。
這樣一看,邏輯應該清晰很多了。
接下來看addWorker(Runnable firstTask, boolean core)方法
同樣,我們也來簡化一下:
看到這里,我們大概能猜測到,addWorker方法的功能就是新建一個線程並啟動這個線程,要執行的任務應該就是在這個線程中執行。為了證實我們的這種猜測需要再來看看Worker這個類。
從上面的Worker類的聲明可以看到,它實現了Runnable介面,以及從它的構造方法中可以知道待執行的任務賦值給了它的變數firstTask,並以它自己為參數新建了一個線程賦值給它的變數thread,那麼運行這個線程的時候其實就是執行Worker的run()方法,來看一下這個方法:
在run()方法中只調了一下 runWorker(this) 方法,再來簡化一下這個 runWorker() 方法
很明顯,runWorker()方法裡面執行了我們新建Worker對象時傳進去的待執行的任務,到這里為止貌似這個worker的run()方法就執行完了,既然執行完了那麼這個線程也就沒用了,只有等待虛擬機銷毀了。那麼回顧一下我們的目標:Java線程池中的核心線程是如何被重復利用的?好像並沒有重復利用啊,新建一個線程,執行一個任務,然後就結束了,銷毀了。沒什麼特別的啊,難道有什麼地方漏掉了,被忽略了?再仔細看一下runWorker()方法的代碼,有一個while循環,當執行完firstTask後task==null了,那麼就會執行判斷條件(task = getTask()) != null,我們假設這個條件成立的話,那麼這個線程就不止只執行一個任務了,可以執行多個任務了,也就實現了重復利用了。答案呼之欲出了,接著看getTask()方法
老規矩,簡化一下代碼來看:
從以上代碼可以看出,getTask()的作用是
小結
通過以上的分析,應該算是比較清楚地解答了「線程池中的核心線程是如何被重復利用的」這個問題,同時也對線程池的實現機制有了更進一步的理解:
注意:
本文所說的「核心線程」、「非核心線程」是一個虛擬的概念,是為了方便描述而虛擬出來的概念,在代碼中並沒有哪個線程被標記為「核心線程」或「非核心線程」,所有線程都是一樣的,只是當線程池中的線程多於指定的核心線程數量時,會將多出來的線程銷毀掉,池中只保留指定個數的線程。那些被銷毀的線程是隨機的,可能是第一個創建的線程,也可能是最後一個創建的線程,或其它時候創建的線程。一開始我以為會有一些線程被標記為「核心線程」,而其它的則是「非核心線程」,在銷毀多餘線程的時候只銷毀那些「非核心線程」,而「核心線程」不被銷毀。這種理解是錯誤的。
另外還有一個重要的介面 BlockingQueue 值得去了解,它定義了一些入隊出隊同步操作的方法,還可以阻塞,作用很大。
Ⅶ java 中創建線程池的問題。
看下源碼你就明白了
/**
*,but
*
*available.
*-livedasynchronoustasks.
*Callsto<tt>execute</tt>
*threadsifavailable.Ifnoexistingthreadisavailable,anew
*.Threadsthathave
*
*thecache.Thus,
*notconsumeanyresources.Notethatpoolswithsimilar
*propertiesbutdifferentdetails(forexample,timeoutparameters)
*maybecreatesing{@linkThreadPoolExecutor}constructors.
*
*@
*/
(){
returnnewThreadPoolExecutor(0,Integer.MAX_VALUE,
60L,TimeUnit.SECONDS,
newSynchronousQueue<Runnable>());
}
這個只是初始化的數據,具體線程池了有多少線程是active的,要看你怎麼用,像你上面跑的代碼,線程池裡就只有一個線程在跑。
Ⅷ Java線程池
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
Ⅸ java 怎麼實現線程池
最簡單的可以利用java.util.concurrent.Executors
調用Executors.newCachedThreadPool()獲取緩沖式線程池
Executors.newFixedThreadPool(int nThreads)獲取固定大小的線程池
Ⅹ 什麼是java線程池
多線程是為了能夠讓計算機資源合理的分配,對於處理不同的任務創建不同的線程進行處理,但是計算機創建一個線程或者銷毀一個線程所花費的也是比較昂貴的,有時候需要同時處理的事情比較多,就需要我們頻繁的進行線程的創建和銷毀,這樣花費的時間也是比較多的。為了解決這一問題,我們就可以引用線程池的概念。
所謂線程池就是將線程集中管理起來,當需要線程的時候,可以從線程池中獲取空閑的線程,這樣可以減少線程的頻繁創建與銷毀,節省很大的時間和減少很多不必要的操作。
在java中提供了ThreadPoolExecutor類來進行線程的管理,這個類繼承於AbstractExecutorService,而AbstractExecutorService實現了ExecutorService介面,我們可以使用ThreadPoolExecutor來進行線程池的創建。
在ThreadPoolExecutor的構造方法中,有多個參數,可以配置不同的參數來進行優化。這個類的源碼構造方法為:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)其中每個參數代表的意義分別為
corePoolSize : 線程池中的核心線程數量,當線程池中當前的線程數小於這個配置的時候,如果有一個新的任務到來,即使線程池中還存在空閑狀態的線程,程序也會繼續創建一個新的線程放進線程池當中
maximumPoolSize: 線程池中的線程最大數量
keepAliveTime:當線程池中的線程數量大於配置的核心線程數量(corePoolSize)的時候,如果當前有空閑的線程,則當這個空閑線程可以存在的時間,如果在keepAliveTime這個時間點內沒有新的任務使用這個線程,那麼這個線程將會結束,核心線程不會結束,但是如果配置了allowCoreThreadTimeOut = true,則當空閑時間超過keepAliveTime之後,線程也會被結束調,默認allowCoreThreadTimeOut = false,即表示默認情況下,核心線程會一直存在於線程池當中。
unit : 空閑線程保持連接時間(keepAliveTime)的時間單位
workQueue:阻塞的任務隊列,用來保存等待需要執行的任務。
threadFactory :線程工廠,可以根據自己的需求去創建線程的對象,設置線程的名稱,優先順序等屬性信息。
handler:當線程池中存在的線程數超過設置的最大值之後,新的任務就會被拒絕,可以自己定義一個拒絕的策略,當新任務被拒絕之後,就會使用hander方法進行處理。
在java中也提供了Executors工具類,在這個工具類中提供了多個創建線程池的靜態方法,其中包含newCachedThreadPool、newFixedThreadPool、newScheledThreadPool、newSingleThreadExecutor等。但是他們每個方法都是創建了ThreadPoolExecutor對象,不同的是,每個對象的初始 參數值不一樣;