當前位置:首頁 » 編程語言 » javapool

javapool

發布時間: 2023-09-04 03:46:07

1. java實現通用線程池

線程池通俗的描述就是預先創建若干空閑線程 等到需要用多線程去處理事務的時候去喚醒某些空閑線程執行處理任務 這樣就省去了頻繁創建線程的時間 因為頻 繁創建線程是要耗費大量的CPU資源的 如果一個應用程序需要頻繁地處理大量並發事務 不斷的創建銷毀線程往往會大大地降低系統的效率 這時候線程池就派 上用場了

本文旨在使用Java語言編寫一個通用的線程池 當需要使用線程池處理事務時 只需按照指定規范封裝好事務處理對象 然後用已有的線程池對象去自動選擇空 閑線程自動調用事務處理對象即可 並實現線程池的動態修改(修改當前線程數 最大線程數等) 下面是實現代碼

//ThreadTask java

package polarman threadpool;

/** *//**

*線程任務

* @author ryang

*

*/

public interface ThreadTask {

public void run();

}

//PooledThread java

package polarman threadpool;

import java util Collection; import java util Vector;

/** *//**

*接受線程池管理的線程

* @author ryang

*

*/

public class PooledThread extends Thread {

protected Vector tasks = new Vector();

protected boolean running = false;

protected boolean stopped = false;

protected boolean paused = false;

protected boolean killed = false;

private ThreadPool pool;

public PooledThread(ThreadPool pool) { this pool = pool;

}

public void putTask(ThreadTask task) { tasks add(task);

}

public void putTasks(ThreadTask[] tasks) { for(int i= ; i<tasks length; i++) this tasks add(tasks[i]);

}

public void putTasks(Collection tasks) { this tasks addAll(tasks);

}

protected ThreadTask popTask() { if(tasks size() > ) return (ThreadTask)tasks remove( );

else

return null;

}

public boolean isRunning() {

return running;

}

public void stopTasks() {

stopped = true;

}

public void stopTasksSync() {

stopTasks();

while(isRunning()) { try {

sleep( );

} catch (InterruptedException e) {

}

}

}

public void pauseTasks() {

paused = true;

}

public void pauseTasksSync() {

pauseTasks();

while(isRunning()) { try {

sleep( );

} catch (InterruptedException e) {

}

}

}

public void kill() { if(!running)

interrupt();

else

killed = true;

}

public void killSync() {

kill();

while(isAlive()) { try {

sleep( );

} catch (InterruptedException e) {

}

}

}

public synchronized void startTasks() {

running = true;

this notify();

}

public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread(); //System out println(Thread currentThread() getId() + : 空閑 ); this wait(); }else {

ThreadTask task;

while((task = popTask()) != null) { task run(); if(stopped) {

stopped = false;

if(tasks size() > ) { tasks clear(); System out println(Thread currentThread() getId() + : Tasks are stopped );

break;

}

}

if(paused) {

paused = false;

if(tasks size() > ) { System out println(Thread currentThread() getId() + : Tasks are paused );

break;

}

}

}

running = false;

}

if(killed) {

killed = false;

break;

}

}

}catch(InterruptedException e) {

return;

}

//System out println(Thread currentThread() getId() + : Killed );

}

}

//ThreadPool java

package polarman threadpool;

import java util Collection; import java util Iterator; import java util Vector;

/** *//**

*線程池

* @author ryang

*

*/

public class ThreadPool {

protected int maxPoolSize;

protected int initPoolSize;

protected Vector threads = new Vector();

protected boolean initialized = false;

protected boolean hasIdleThread = false;

public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSize; this initPoolSize = initPoolSize;

}

public void init() {

initialized = true;

for(int i= ; i<initPoolSize; i++) {

PooledThread thread = new PooledThread(this);

thread start(); threads add(thread);

}

//System out println( 線程池初始化結束 線程數= + threads size() + 最大線程數= + maxPoolSize);

}

public void setMaxPoolSize(int maxPoolSize) { //System out println( 重設最大線程數 最大線程數= + maxPoolSize); this maxPoolSize = maxPoolSize;

if(maxPoolSize < getPoolSize())

setPoolSize(maxPoolSize);

}

/** *//**

*重設當前線程數

* 若需殺掉某線程 線程不會立刻殺掉 而會等到線程中的事務處理完成* 但此方法會立刻從線程池中移除該線程 不會等待事務處理結束

* @param size

*/

public void setPoolSize(int size) { if(!initialized) {

initPoolSize = size;

return;

}else if(size > getPoolSize()) { for(int i=getPoolSize(); i<size && i<maxPoolSize; i++) {

PooledThread thread = new PooledThread(this);

thread start(); threads add(thread);

}

}else if(size < getPoolSize()) { while(getPoolSize() > size) { PooledThread th = (PooledThread)threads remove( ); th kill();

}

}

//System out println( 重設線程數 線程數= + threads size());

}

public int getPoolSize() { return threads size();

}

protected void notifyForIdleThread() {

hasIdleThread = true;

}

protected boolean waitForIdleThread() {

hasIdleThread = false;

while(!hasIdleThread && getPoolSize() >= maxPoolSize) { try { Thread sleep( ); } catch (InterruptedException e) {

return false;

}

}

return true;

}

public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator(); itr hasNext();) { PooledThread th = (PooledThread)itr next(); if(!th isRunning())

return th;

}

if(getPoolSize() < maxPoolSize) {

PooledThread thread = new PooledThread(this);

thread start(); threads add(thread);

return thread;

}

//System out println( 線程池已滿 等待 );

if(waitForIdleThread() == false)

return null;

}

}

public void processTask(ThreadTask task) {

PooledThread th = getIdleThread();

if(th != null) { th putTask(task); th startTasks();

}

}

public void processTasksInSingleThread(ThreadTask[] tasks) {

PooledThread th = getIdleThread();

if(th != null) { th putTasks(tasks); th startTasks();

}

}

public void processTasksInSingleThread(Collection tasks) {

PooledThread th = getIdleThread();

if(th != null) { th putTasks(tasks); th startTasks();

}

}

}

下面是線程池的測試程序

//ThreadPoolTest java

import java io BufferedReader; import java io IOException; import java io InputStreamReader;

import polarman threadpool ThreadPool; import polarman threadpool ThreadTask;

public class ThreadPoolTest {

public static void main(String[] args) { System out println( quit 退出 ); System out println( task A 啟動任務A 時長為 秒 ); System out println( size 設置當前線程池大小為 ); System out println( max 設置線程池最大線程數為 ); System out println();

final ThreadPool pool = new ThreadPool( ); pool init();

Thread cmdThread = new Thread() { public void run() {

BufferedReader reader = new BufferedReader(new InputStreamReader(System in));

while(true) { try { String line = reader readLine(); String words[] = line split( ); if(words[ ] equalsIgnoreCase( quit )) { System exit( ); }else if(words[ ] equalsIgnoreCase( size ) && words length >= ) { try { int size = Integer parseInt(words[ ]); pool setPoolSize(size); }catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( max ) && words length >= ) { try { int max = Integer parseInt(words[ ]); pool setMaxPoolSize(max); }catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( task ) && words length >= ) { try { int timelen = Integer parseInt(words[ ]); SimpleTask task = new SimpleTask(words[ ] timelen * ); pool processTask(task); }catch(Exception e) {

}

}

} catch (IOException e) { e printStackTrace();

}

}

}

};

cmdThread start();

/**//*

for(int i= ; i< ; i++){

SimpleTask task = new SimpleTask( Task + i (i+ )* ); pool processTask(task);

}*/

}

}

class SimpleTask implements ThreadTask {

private String taskName;

private int timeLen;

public SimpleTask(String taskName int timeLen) { this taskName = taskName; this timeLen = timeLen;

}

public void run() { System out println(Thread currentThread() getId() +

: START TASK + taskName + );

try { Thread sleep(timeLen); } catch (InterruptedException e) {

}

System out println(Thread currentThread() getId() +

: END TASK + taskName + );

}

}

使用此線程池相當簡單 下面兩行代碼初始化線程池

ThreadPool pool = new ThreadPool( ); pool init();

要處理的任務實現ThreadTask 介面即可(如測試代碼里的SimpleTask) 這個介面只有一個方法run()

兩行代碼即可調用

lishixin/Article/program/Java/hx/201311/27203

2. 什麼是java線程池

多線程是為了能夠讓計算機資源合理的分配,對於處理不同的任務創建不同的線程進行處理,但是計算機創建一個線程或者銷毀一個線程所花費的也是比較昂貴的,有時候需要同時處理的事情比較多,就需要我們頻繁的進行線程的創建和銷毀,這樣花費的時間也是比較多的。為了解決這一問題,我們就可以引用線程池的概念。
所謂線程池就是將線程集中管理起來,當需要線程的時候,可以從線程池中獲取空閑的線程,這樣可以減少線程的頻繁創建與銷毀,節省很大的時間和減少很多不必要的操作。
在java中提供了ThreadPoolExecutor類來進行線程的管理,這個類繼承於AbstractExecutorService,而AbstractExecutorService實現了ExecutorService介面,我們可以使用ThreadPoolExecutor來進行線程池的創建。
在ThreadPoolExecutor的構造方法中,有多個參數,可以配置不同的參數來進行優化。這個類的源碼構造方法為:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)其中每個參數代表的意義分別為
corePoolSize : 線程池中的核心線程數量,當線程池中當前的線程數小於這個配置的時候,如果有一個新的任務到來,即使線程池中還存在空閑狀態的線程,程序也會繼續創建一個新的線程放進線程池當中
maximumPoolSize: 線程池中的線程最大數量
keepAliveTime:當線程池中的線程數量大於配置的核心線程數量(corePoolSize)的時候,如果當前有空閑的線程,則當這個空閑線程可以存在的時間,如果在keepAliveTime這個時間點內沒有新的任務使用這個線程,那麼這個線程將會結束,核心線程不會結束,但是如果配置了allowCoreThreadTimeOut = true,則當空閑時間超過keepAliveTime之後,線程也會被結束調,默認allowCoreThreadTimeOut = false,即表示默認情況下,核心線程會一直存在於線程池當中。
unit : 空閑線程保持連接時間(keepAliveTime)的時間單位
workQueue:阻塞的任務隊列,用來保存等待需要執行的任務。
threadFactory :線程工廠,可以根據自己的需求去創建線程的對象,設置線程的名稱,優先順序等屬性信息。
handler:當線程池中存在的線程數超過設置的最大值之後,新的任務就會被拒絕,可以自己定義一個拒絕的策略,當新任務被拒絕之後,就會使用hander方法進行處理。
在java中也提供了Executors工具類,在這個工具類中提供了多個創建線程池的靜態方法,其中包含newCachedThreadPool、newFixedThreadPool、newScheledThreadPool、newSingleThreadExecutor等。但是他們每個方法都是創建了ThreadPoolExecutor對象,不同的是,每個對象的初始 參數值不一樣;

3. java 線程池ThreadPoolExecutor 共同完成一個任務

線程池可以解決兩個不同問題:由於減少了每個任務調用的開銷,它們通常可以在執行大量非同步任務時提供增強的性能,並且還可以提供綁定和管理資源(包括執行集合任務時使用的線程)的方法。每個ThreadPoolExecutor 還維護著一些基本的統計數據,如完成的任務數。

為了便於跨大量上下文使用,此類提供了很多可調整的參數和擴展掛鉤。但是,強烈建議程序員使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)和 Executors.newSingleThreadExecutor()(單個後台線程),它們均為大多數使用場景預定義了設置。否則,在手動配置和調整此類時,使用以下指導:

核心和最大池大小
ThreadPoolExecutor 將根據 corePoolSize(參見 getCorePoolSize())和 maximumPoolSize(參見getMaximumPoolSize())設置的邊界自動調整池大小。當新任務在方法 execute(java.lang.Runnable) 中提交時,如果運行的線程少於 corePoolSize,則創建新線程來處理請求,即使其他輔助線程是空閑的。如果運行的線程多於corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才創建新線程。如果設置的 corePoolSize 和 maximumPoolSize相同,則創建了固定大小的線程池。如果將 maximumPoolSize 設置為基本的無界值(如 Integer.MAX_VALUE),則允許池適應任意數量的並發任務。在大多數情況下,核心和最大池大小僅基於構造來設置,不過也可以使用setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。

按需構造
默認情況下,即使核心線程最初只是在新任務需要時才創建和啟動的,也可以使用方法 prestartCoreThread()或 prestartAllCoreThreads() 對其進行動態重寫。

創建新線程
使用 ThreadFactory 創建新線程。如果沒有另外說明,則在同一個 ThreadGroup 中一律使用Executors.defaultThreadFactory() 創建線程,並且這些線程具有相同的 NORM_PRIORITY 優先順序和非守護進程狀態。通過提供不同的 ThreadFactory,可以改變線程的名稱、線程組、優先順序、守護進程狀態,等等。如果從 newThread返回 null 時 ThreadFactory 未能創建線程,則執行程序將繼續運行,但不能執行任何任務。
保持活動時間
如果池中當前有多於 corePoolSize 的線程,則這些多出的線程在空閑時間超過 keepAliveTime 時將會終止(參見getKeepAliveTime(java.util.concurrent.TimeUnit))。這提供了當池處於非活動狀態時減少資源消耗的方法。如果池後來變得更為活動,則可以創建新的線程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 動態地更改此參數。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在關閉前有效地從以前的終止狀態禁用空閑線程。

排隊
所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此隊列與池大小進行交互:
A. 如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。
B. 如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
C. 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。

排隊有三種通用策略:
直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集合時出現鎖定。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙的情況下將新任務加入隊列。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。
被拒絕的任務

當 Executor 已經關閉,並且 Executor 將有限邊界用於最大線程和工作隊列容量,且已經飽和時,在方法execute(java.lang.Runnable) 中提交的新任務將被拒絕。在以上兩種情況下,execute 方法都將調用其RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預定義的處理程序策略:
A. 在默認的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。
B. 在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調用運行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
C. 在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。
D. 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然後重試執行程序(如果再次失敗,則重復此過程)。
定義和使用其他種類的 RejectedExecutionHandler 類也是可能的,但這樣做需要非常小心,尤其是當策略僅用於特定容量或排隊策略時。

掛鉤方法
此類提供 protected 可重寫的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,這兩種方法分別在執行每個任務之前和之後調用。它們可用於操縱執行環境;例如,重新初始化ThreadLocal、搜集統計信息或添加日誌條目。此外,還可以重寫方法 terminated() 來執行 Executor 完全終止後需要完成的所有特殊處理。

如果掛鉤或回調方法拋出異常,則內部輔助線程將依次失敗並突然終止。

隊列維護
方法 getQueue() 允許出於監控和調試目的而訪問工作隊列。強烈反對出於其他任何目的而使用此方法。remove(java.lang.Runnable) 和 purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行存儲回收。

一、例子

創建 TestThreadPool 類:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThreadPool {

private static int proceTaskSleepTime = 2;

private static int proceTaskMaxNumber = 10;

public static void main(String[] args) {

// 構造一個線程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
new ThreadPoolExecutor.DiscardOldestPolicy());

for (int i = 1; i <= proceTaskMaxNumber; i++) {
try {
String task = "task@ " + i;
System.out.println("創建任務並提交到線程池中:" + task);
threadPool.execute(new ThreadPoolTask(task));

Thread.sleep(proceTaskSleepTime);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
view plain
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThreadPool {

private static int proceTaskSleepTime = 2;

private static int proceTaskMaxNumber = 10;

public static void main(String[] args) {

// 構造一個線程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
new ThreadPoolExecutor.DiscardOldestPolicy());

for (int i = 1; i <= proceTaskMaxNumber; i++) {
try {
String task = "task@ " + i;
System.out.println("創建任務並提交到線程池中:" + task);
threadPool.execute(new ThreadPoolTask(task));

Thread.sleep(proceTaskSleepTime);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

創建 ThreadPoolTask類:
view plain to clipboardprint?
import java.io.Serializable;

public class ThreadPoolTask implements Runnable, Serializable {

private Object attachData;

ThreadPoolTask(Object tasks) {
this.attachData = tasks;
}

public void run() {

System.out.println("開始執行任務:" + attachData);

attachData = null;
}

public Object getTask() {
return this.attachData;
}
}
view plain
import java.io.Serializable;

public class ThreadPoolTask implements Runnable, Serializable {

private Object attachData;

ThreadPoolTask(Object tasks) {
this.attachData = tasks;
}

public void run() {

System.out.println("開始執行任務:" + attachData);

attachData = null;
}

public Object getTask() {
return this.attachData;
}
}

執行結果:
創建任務並提交到線程池中:task@ 1
開始執行任務:task@ 1
創建任務並提交到線程池中:task@ 2
開始執行任務:task@ 2
創建任務並提交到線程池中:task@ 3
創建任務並提交到線程池中:task@ 4
開始執行任務:task@ 3
創建任務並提交到線程池中:task@ 5
開始執行任務:task@ 4
創建任務並提交到線程池中:task@ 6
創建任務並提交到線程池中:task@ 7
創建任務並提交到線程池中:task@ 8
開始執行任務:task@ 5
開始執行任務:task@ 6
創建任務並提交到線程池中:task@ 9
開始執行任務:task@ 7
創建任務並提交到線程池中:task@ 10
開始執行任務:task@ 8
開始執行任務:task@ 9
開始執行任務:task@ 10

ThreadPoolExecutor配置
一、ThreadPoolExcutor為一些Executor提供了基本的實現,這些Executor是由Executors中的工廠 newCahceThreadPool、newFixedThreadPool和newScheledThreadExecutor返回的。 ThreadPoolExecutor是一個靈活的健壯的池實現,允許各種各樣的用戶定製。
二、線程的創建與銷毀
1、核心池大小、最大池大小和存活時間共同管理著線程的創建與銷毀。
2、核心池的大小是目標的大小;線程池的實現試圖維護池的大小;即使沒有任務執行,池的大小也等於核心池的大小,並直到工作隊列充滿前,池都不會創建更多的線程。如果當前池的大小超過了核心池的大小,線程池就會終止它。
3、最大池的大小是可同時活動的線程數的上限。
4、如果一個線程已經閑置的時間超過了存活時間,它將成為一個被回收的候選者。
5、newFixedThreadPool工廠為請求的池設置了核心池的大小和最大池的大小,而且池永遠不會超時
6、newCacheThreadPool工廠將最大池的大小設置為Integer.MAX_VALUE,核心池的大小設置為0,超時設置為一分鍾。這樣創建了無限擴大的線程池,會在需求量減少的情況下減少線程數量。
三、管理
1、 ThreadPoolExecutor允許你提供一個BlockingQueue來持有等待執行的任務。任務排隊有3種基本方法:無限隊列、有限隊列和同步移交。
2、 newFixedThreadPool和newSingleThreadExectuor默認使用的是一個無限的 LinkedBlockingQueue。如果所有的工作者線程都處於忙碌狀態,任務會在隊列中等候。如果任務持續快速到達,超過了它們被執行的速度,隊列也會無限制地增加。穩妥的策略是使用有限隊列,比如ArrayBlockingQueue或有限的LinkedBlockingQueue以及 PriorityBlockingQueue。
3、對於龐大或無限的池,可以使用SynchronousQueue,完全繞開隊列,直接將任務由生產者交給工作者線程
4、可以使用PriorityBlockingQueue通過優先順序安排任務

4. Java 中幾種常用的線程池

一:newCachedThreadPool
(1)緩存型池子,先查看池中有沒有以前建立的線程,如果有,就reuse,如果沒有,就建立一個新的線程加入池中;
(2)緩存型池子,通常用於執行一些生存周期很短的非同步型任務;因此一些面向連接的daemon型server中用得不多;
(3)能reuse的線程,必須是timeout IDLE內的池中線程,預設timeout是60s,超過這個IDLE時長,線程實例將被終止及移出池。
(4)注意,放入CachedThreadPool的線程不必擔心其結束,超過TIMEOUT不活動,其會自動被終止

二:newFixedThreadPool
(1)newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的線程
(2)其獨特之處胡清賀:任意時間點,最多隻能有固定數目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子
(3)和正殲cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),所以FixedThreadPool多數針對一些很褲派穩定很固定的正規並發線程,多用於伺服器
(4)從方法的源代碼看,cache池和fixed 池調用的是同一個底層池,只不過參數不同:
fixed池線程數固定,並且是0秒IDLE(無IDLE)
cache池線程數支持0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE

三:ScheledThreadPool
(1)調度型線程池
(2)這個池子里的線程可以按schele依次delay執行,或周期執行
四:SingleThreadExecutor
(1)單例線程,任意時間池中只能有一個線程
(2)用的是和cache池和fixed池相同的底層池,但線程數目是1-1,0秒IDLE(無IDLE)

5. java線程池怎麼實現

要想理解清楚java線程池實現原理,明白下面幾個問題就可以了:

(1):線程池存在哪些狀態,這些狀態之間是如何進行切換的呢?

(2):線程池的種類有哪些?

(3):創建線程池需要哪些參數,這些參數的具體含義是什麼?

(4):將任務添加到線程池之後運行流程?

(5):線程池是怎麼做到重用線程的呢?

(6):線程池的關閉

首先回答第一個問題:線程池存在哪些狀態;

查看ThreadPoolExecutor源碼便知曉:

[java]view plain

  • //runStateisstoredinthehigh-orderbits

  • privatestaticfinalintRUNNING=-1<<COUNT_BITS;

  • privatestaticfinalintSHUTDOWN=0<<COUNT_BITS;

  • privatestaticfinalintSTOP=1<<COUNT_BITS;

  • privatestaticfinalintTIDYING=2<<COUNT_BITS;

  • =3<<COUNT_BITS;

  • 存在5種狀態:

    <1>Running:可以接受新任務,同時也可以處理阻塞隊列裡面的任務;

    <2>Shutdown:不可以接受新任務,但是可以處理阻塞隊列裡面的任務;

    <3>Stop:不可以接受新任務,也不處理阻塞隊列裡面的任務,同時還中斷正在處理的任務;

    <4>Tidying:屬於過渡階段,在這個階段表示所有的任務已經執行結束了,當前線程池中是不存在有效的線程的,並且將要調用terminated方法;

    <5>Terminated:終止狀態,這個狀態是在調用完terminated方法之後所處的狀態;

    那麼這5種狀態之間是如何進行轉換的呢?查看ThreadPoolExecutor源碼裡面的注釋便可以知道啦:

    [java]view plain

  • *RUNNING->SHUTDOWN

  • *Oninvocationofshutdown(),perhapsimplicitlyinfinalize()

  • *(RUNNINGorSHUTDOWN)->STOP

  • *OninvocationofshutdownNow()

  • *SHUTDOWN->TIDYING

  • *Whenbothqueueandpoolareempty

  • *STOP->TIDYING

  • *Whenpoolisempty

  • *TIDYING->TERMINATED

  • *Whentheterminated()hookmethodhascompleted

  • 從上面可以看到,在調用shutdown方法的時候,線程池狀態會從Running轉換成Shutdown;在調用shutdownNow方法的時候,線程池狀態會從Running/Shutdown轉換成Stop;在阻塞隊列為空同時線程池為空的情況下,線程池狀態會從Shutdown轉換成Tidying;在線程池為空的情況下,線程池狀態會從Stop轉換成Tidying;當調用terminated方法之後,線程池狀態會從Tidying轉換成Terminate;

    在明白了線程池的各個狀態以及狀態之間是怎麼進行切換之後,我們來看看第二個問題,線程池的種類:

    (1):CachedThreadPool:緩存線程池,該類線程池中線程的數量是不確定的,理論上可以達到Integer.MAX_VALUE個,這種線程池中的線程都是非核心線程,既然是非核心線程,那麼就存在超時淘汰機制了,當裡面的某個線程空閑時間超過了設定的超時時間的話,就會回收掉該線程;

    (2):FixedThreadPool:固定線程池,這類線程池中是只存在核心線程的,對於核心線程來說,如果我們不設置allowCoreThreadTimeOut屬性的話是不存在超時淘汰機制的,這類線程池中的corePoolSize的大小是等於maximumPoolSize大小的,也就是說,如果線程池中的線程都處於活動狀態的話,如果有新任務到來,他是不會開辟新的工作線程來處理這些任務的,只能將這些任務放到阻塞隊列裡面進行等到,直到有核心線程空閑為止;

    (3):ScheledThreadPool:任務線程池,這種線程池中核心線程的數量是固定的,而對於非核心線程的數量是不限制的,同時對於非核心線程是存在超時淘汰機制的,主要適用於執行定時任務或者周期性任務的場景;

    (4):SingleThreadPool:單一線程池,線程池裡面只有一個線程,同時也不存在非核心線程,感覺像是FixedThreadPool的特殊版本,他主要用於確保任務在同一線程中的順序執行,有點類似於進行同步吧;

    接下來我們來看第三個問題,創建線程池需要哪些參數:

    同樣查看ThreadPoolExecutor源碼,查看創建線程池的構造函數:

    [java]view plain

  • publicThreadPoolExecutor(intcorePoolSize,

  • intmaximumPoolSize,

  • longkeepAliveTime,

  • TimeUnitunit,

  • BlockingQueue<Runnable>workQueue,

  • ThreadFactorythreadFactory,

  • )

  • 不管你調用的是ThreadPoolExecutor的哪個構造函數,最終都會執行到這個構造函數的,這個構造函數有7個參數,正是由於對這7個參數值的賦值不同,造成生成不同類型的線程池,比如我們常見的CachedThreadPoolExecutor、FixedThreadPoolExecutor

    SingleThreadPoolExecutor、ScheledThreadPoolExecutor,我們老看看這幾個參數的具體含義:

    <1>corePoolSize:線程池中核心線程的數量;當提交一個任務到線程池的時候,線程池會創建一個線程來執行執行任務,即使有其他空閑的線程存在,直到線程數達到corePoolSize時不再創建,這時候會把提交的新任務放入到阻塞隊列中,如果調用了線程池的preStartAllCoreThreads方法,則會在創建線程池的時候初始化出來核心線程;

    <2>maximumPoolSize:線程池允許創建的最大線程數;如果阻塞隊列已經滿了,同時已經創建的線程數小於最大線程數的話,那麼會創建新的線程來處理阻塞隊列中的任務;

    <3>keepAliveTime:線程活動保持時間,指的是工作線程空閑之後繼續存活的時間,默認情況下,這個參數只有線程數大於corePoolSize的時候才會起作用,即當線程池中的線程數目大於corePoolSize的時候,如果某一個線程的空閑時間達到keepAliveTime,那麼這個線程是會被終止的,直到線程池中的線程數目不大於corePoolSize;如果調用allowCoreThreadTimeOut的話,在線程池中線程數量不大於corePoolSize的時候,keepAliveTime參數也可以起作用的,知道線程數目為0為止;

    <4>unit:參數keepAliveTime的時間單位;

    <5>workQueue:阻塞隊列;用於存儲等待執行的任務,有四種阻塞隊列類型,ArrayBlockingQueue(基於數組的有界阻塞隊列)、LinkedBlockingQueue(基於鏈表結構的阻塞隊列)、SynchronousQueue(不存儲元素的阻塞隊列)、PriorityBlockingQueue(具有優先順序的阻塞隊列);

    <6>threadFactory:用於創建線程的線程工廠;

    <7>handler:當阻塞隊列滿了,且沒有空閑線程的情況下,也就是說這個時候,線程池中的線程數目已經達到了最大線程數量,處於飽和狀態,那麼必須採取一種策略來處理新提交的任務,我們可以自己定義處理策略,也可以使用系統已經提供給我們的策略,先來看看系統為我們提供的4種策略,AbortPolicy(直接拋出異常)、CallerRunsPolicy(只有調用者所在的線程來運行任務)、DiscardOldestPolicy(丟棄阻塞隊列中最近的一個任務,並執行當前任務)、Discard(直接丟棄);

    接下來就是將任務添加到線程池之後的運行流程了;

    我們可以調用submit或者execute方法,兩者最大的區別在於,調用submit方法的話,我們可以傳入一個實現Callable介面的對象,進而能在當前任務執行結束之後通過Future對象獲得任務的返回值,submit內部實際上還是執行的execute方法;而調用execute方法的話,是不能獲得任務執行結束之後的返回值的;此外,調用submit方法的話是可以拋出異常的,但是調用execute方法的話,異常在其內部得到了消化,也就是說異常在其內部得到了處理,不會向外傳遞的;

    因為submit方法最終也是會執行execute方法的,因此我們只需要了解execute方法就可以了:

    在execute方法內部會分三種情況來進行處理:

    <1>:首先判斷當前線程池中的線程數量是否小於corePoolSize,如果小於的話,則直接通過addWorker方法創建一個新的Worker對象來執行我們當前的任務;

    <2>:如果說當前線程池中的線程數量大於corePoolSize的話,那麼會嘗試將當前任務添加到阻塞隊列中,然後第二次檢查線程池的狀態,如果線程池不在Running狀態的話,會將剛剛添加到阻塞隊列中的任務移出,同時拒絕當前任務請求;如果第二次檢查發現當前線程池處於Running狀態的話,那麼會查看當前線程池中的工作線程數量是否為0,如果為0的話,就會通過addWorker方法創建一個Worker對象出來處理阻塞隊列中的任務;

    <3>:如果原先線程池就不處於Running狀態或者我們剛剛將當前任務添加到阻塞隊列的時候出現錯誤的話,那麼會去嘗試通過addWorker創建新的Worker來處理當前任務,如果添加失敗的話,則拒絕當前任務請求;

    可以看到在上面的execute方法中,我們僅僅只是檢查了當前線程池中的線程數量有沒有超過corePoolSize的情況,那麼當前線程池中的線程數量有沒有超過maximumPoolSize是在哪裡檢測的呢?實際上是在addWorker方法裡面了,我們可以看下addWorker裡面的一段代碼:

    [java]view plain

  • if(wc>=CAPACITY||

  • wc>=(core?corePoolSize:maximumPoolSize))

  • returnfalse;

  • 如果當前線程數量超過maximumPoolSize的話,直接就會調用return方法,返回false;

    其實到這里我們很明顯可以知道,一個線程池中線程的數量實際上就是這個線程池中Worker的數量,如果Worker的大小超過了corePoolSize,那麼任務都在阻塞隊列裡面了,Worker是Java對我們任務的一個封裝類,他的聲明是醬紫的:

    [java]view plain

  • privatefinalclassWorker

  • implementsRunnable

  • 可以看到他實現了Runnable介面,他是在addWorker方法裡面通過new Worker(firstTask)創建的,我們來看看他的構造函數就知道了:

    [java]view plain

  • Worker(RunnablefirstTask){

  • setState(-1);//

  • this.firstTask=firstTask;

  • this.thread=getThreadFactory().newThread(this);

  • }

  • 而這里的firstTask其實就是我們調用execute或者submit的時候傳入的那個參數罷了,一般來說這些參數是實現Callable或者Runnable介面的;

    在通過addWorker方法創建出來Worker對象之後,這個方法的最後會執行Worker內部thread屬性的start方法,而這個thread屬性實際上就是封裝了Worker的Thread,執行他的start方法實際上執行的是Worker的run方法,因為Worker是實現了Runnable介面的,在run方法裡面就會執行runWorker方法,而runWorker方法裡面首先會判斷當前我們傳入的任務是否為空,不為空的話直接就會執行我們通過execute或者submit方法提交的任務啦,注意一點就是我們雖然會通過submit方法提交實現了Callable介面的對象,但是在調用submit方法的時候,其實是會將Callable對象封裝成實現了Runnable介面對象的,不信我們看看submit方法源碼是怎麼實現的:

    [java]view plain

  • public<T>Future<T>submit(Callable<T>task){

  • if(task==null)thrownewNullPointerException();

  • RunnableFuture<T>ftask=newTaskFor(task);

  • execute(ftask);

  • returnftask;

  • }

  • 看到沒有呢,實際上在你傳入實現了Callable介面對象的時候,在submit方法裡面是會將其封裝成RunnableFuture對象的,而RunnableFuture介面是繼承了Runnable介面的;那麼說白了其實就是直接執行我們提交任務的run方法了;如果為空的話,則會通過getTask方法從阻塞隊列裡面拿出一個任務去執行;在任務執行結束之後繼續從阻塞隊列裡面拿任務,直到getTask的返回值為空則退出runWorker內部循環,那麼什麼情況下getTask返回為空呢?查看getTask方法的源碼注釋可以知道:在Worker必須需要退出的情況下getTask會返回空,具體什麼情況下Worker會退出呢?(1):當Worker的數量超過maximumPoolSize的時候;(2):當線程池狀態為Stop的時候;(3):當線程池狀態為Shutdown並且阻塞隊列為空的時候;(4):使用等待超時時間從阻塞隊列中拿數據,但是超時之後仍然沒有拿到數據;

    如果runWorker方法退出了它裡面的循環,那麼就說明當前阻塞隊列裡面是沒有任務可以執行的了,你可以看到在runWorker方法內部的finally語句塊中執行了processWorkerExit方法,用來對Worker對象進行回收操作,這個方法會傳入一個參數表示需要刪除的Worker對象;在進行Worker回收的時候會調用tryTerminate方法來嘗試關閉線程池,在tryTerminate方法裡面會檢查是否有Worker在工作,檢查線程池的狀態,沒問題的話就會將當前線程池的狀態過渡到Tidying,之後調用terminated方法,將線程池狀態更新到Terminated;

    從上面的分析中,我們可以看出線程池運行的4個階段:

    (1):poolSize < corePoolSize,則直接創建新的線程(核心線程)來執行當前提交的任務;

    (2):poolSize = corePoolSize,並且此時阻塞隊列沒有滿,那麼會將當前任務添加到阻塞隊列中,如果此時存在工作線程(非核心線程)的話,那麼會由工作線程來處理該阻塞隊列中的任務,如果此時工作線程數量為0的話,那麼會創建一個工作線程(非核心線程)出來;

    (3):poolSize = corePoolSize,並且此時阻塞隊列已經滿了,那麼會直接創建新的工作線程(非核心線程)來處理阻塞隊列中的任務;

    (4):poolSize = maximumPoolSize,並且此時阻塞隊列也滿了的話,那麼會觸發拒絕機制,具體決絕策略採用的是什麼就要看我們創建ThreadPoolExecutor的時候傳入的RejectExecutionHandler參數了;

    接下來就是線程池是怎麼做到重用線程的呢?

    個人認為線程池裡面重用線程的工作是在getTask裡面實現的,在getTask裡面是存在兩個for死循環嵌套的,他會不斷的從阻塞對列裡面取出需要執行的任務,返回給我們的runWorker方法裡面,而在runWorker方法裡面只要getTask返回的任務不是空就會執行該任務的run方法來處理它,這樣一直執行下去,直到getTask返回空為止,此時的情況就是阻塞隊列裡面沒有任務了,這樣一個線程處理完一個任務之後接著再處理阻塞隊列中的另一個任務,當然在線程池中的不同線程是可以並發處理阻塞隊列中的任務的,最後在阻塞隊列內部不存在任務的時候會去判斷是否需要回收Worker對象,其實Worker對象的個數就是線程池中線程的個數,至於什麼情況才需要回收,上面已經說了,就是四種情況了;

    最後就是線程池是怎樣被關閉的呢?

    涉及到線程池的關閉,需要用到兩個方法,shutdown和shutdownNow,他們都是位於ThreadPoolExecutor裡面的,對於shutdown的話,他會將線程池狀態切換成Shutdown,此時是不會影響對阻塞隊列中任務執行的,但是會拒絕執行新加進來的任務,同時會回收閑置的Worker;而shutdownNow方法會將線程池狀態切換成Stop,此時既不會再去處理阻塞隊列裡面的任務,也不會去處理新加進來的任務,同時會回收所有Worker;

熱點內容
滑板鞋腳本視頻 發布:2025-02-02 09:48:54 瀏覽:433
群暉怎麼玩安卓模擬器 發布:2025-02-02 09:45:23 瀏覽:558
三星安卓12彩蛋怎麼玩 發布:2025-02-02 09:44:39 瀏覽:744
電腦顯示連接伺服器錯誤 發布:2025-02-02 09:24:10 瀏覽:537
瑞芯微開發板編譯 發布:2025-02-02 09:22:54 瀏覽:147
linux虛擬機用gcc編譯時顯示錯誤 發布:2025-02-02 09:14:01 瀏覽:240
java駝峰 發布:2025-02-02 09:13:26 瀏覽:652
魔獸腳本怎麼用 發布:2025-02-02 09:10:28 瀏覽:538
linuxadobe 發布:2025-02-02 09:09:43 瀏覽:212
sql2000資料庫連接 發布:2025-02-02 09:09:43 瀏覽:726