當前位置:首頁 » 存儲配置 » Java線程池多線程存儲數據

Java線程池多線程存儲數據

發布時間: 2023-04-23 23:05:56

1. java多線程有幾種實現方法線程之間如何同步

一、為什麼要線程同步

因為當我們有多個線程要同時訪問一個變數或對象時,如果這些線程中既有讀又有寫操作時,就會導致變數值或對象的狀態出現混亂,從而導致程序異常。舉個例子,如果一個銀行賬戶同時被兩個線程操作,一個取100塊,一個存錢100塊。假設賬戶原本有0塊,如果取錢線程和存錢線程同時發生,會出現什麼結果呢?取錢不成功,賬戶余額是100.取錢成功了,賬戶余額是0.那到底是哪個呢?很難說清楚。因此多線程同步就是要解決這個問題。

二、不同步時的代碼

Bank.Java

packagethreadTest;

/**
*@authorww
*
*/
publicclassBank{

privateintcount=0;//賬戶余額

//存錢
publicvoidaddMoney(intmoney){
count+=money;
System.out.println(System.currentTimeMillis()+"存進:"+money);
}

//取錢
publicvoidsubMoney(intmoney){
if(count-money<0){
System.out.println("余額不足");
return;
}
count-=money;
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}

//查詢
publicvoidlookMoney(){
System.out.println("賬戶余額:"+count);
}
}

SyncThreadTest.java


packagethreadTest;


publicclassSyncThreadTest{

publicstaticvoidmain(Stringargs[]){
finalBankbank=newBank();

Threadtadd=newThread(newRunnable(){

@Override
publicvoidrun(){
//TODOAuto-generatedmethodstub
while(true){
try{
Thread.sleep(1000);
}catch(InterruptedExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}
bank.addMoney(100);
bank.lookMoney();
System.out.println(" ");

}
}
});

Threadtsub=newThread(newRunnable(){

@Override
publicvoidrun(){
//TODOAuto-generatedmethodstub
while(true){
bank.subMoney(100);
bank.lookMoney();
System.out.println(" ");
try{
Thread.sleep(1000);
}catch(InterruptedExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}
}
}
});
tsub.start();

tadd.start();
}



}

余額不足
賬戶余額:0


余額不足
賬戶余額:100


1441790503354存進:100
賬戶余額:100


1441790504354存進:100
賬戶余額:100


1441790504354取出:100
賬戶余額:100


1441790505355存進:100
賬戶余額:100


1441790505355取出:100
賬戶余額:100

三、使用同步時的代碼

(1)同步方法:


即有synchronized關鍵字修飾的方法。由於java的每個對象都有一個內置鎖,當用此關鍵字修飾方法時,內置鎖會保護整個方法。在調用該方法前,需要獲得內置鎖,否則就處於阻塞狀態。

修改後的Bank.java


packagethreadTest;

/**
*@authorww
*
*/
publicclassBank{

privateintcount=0;//賬戶余額

//存錢
(intmoney){
count+=money;
System.out.println(System.currentTimeMillis()+"存進:"+money);
}

//取錢
(intmoney){
if(count-money<0){
System.out.println("余額不足");
return;
}
count-=money;
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}

//查詢
publicvoidlookMoney(){
System.out.println("賬戶余額:"+count);
}
}

再看看運行結果:


余額不足
賬戶余額:0


余額不足
賬戶余額:0


1441790837380存進:100
賬戶余額:100


1441790838380取出:100
賬戶余額:0
1441790838380存進:100
賬戶余額:100
1441790839381取出:100
賬戶余額:0

瞬間感覺可以理解了吧。


註: synchronized關鍵字也可以修飾靜態方法,此時如果調用該靜態方法,將會鎖住整個類

(2)同步代碼塊

即有synchronized關鍵字修飾的語句塊。被該關鍵字修飾的語句塊會自動被加上內置鎖,從而實現同步

Bank.java代碼如下:


packagethreadTest;

/**
*@authorww
*
*/
publicclassBank{

privateintcount=0;//賬戶余額

//存錢
publicvoidaddMoney(intmoney){

synchronized(this){
count+=money;
}
System.out.println(System.currentTimeMillis()+"存進:"+money);
}

//取錢
publicvoidsubMoney(intmoney){

synchronized(this){
if(count-money<0){
System.out.println("余額不足");
return;
}
count-=money;
}
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}

//查詢
publicvoidlookMoney(){
System.out.println("賬戶余額:"+count);
}
}

運行結果如下:

余額不足
賬戶余額:0


1441791806699存進:100
賬戶余額:100


1441791806700取出:100
賬戶余額:0


1441791807699存進:100
賬戶余額:100

效果和方法一差不多。

註:同步是一種高開銷的操作,因此應該盡量減少同步的內容。通常沒有必要同步整個方法,使用synchronized代碼塊同步關鍵代碼即可。

(3)使用特殊域變數(volatile)實現線程同步


a.volatile關鍵字為域變數的訪問提供了一種免鎖機制
b.使用volatile修飾域相當於告訴虛擬機該域可能會被其他線程更新
c.因此每次使用該域就要重新計算,而不是使用寄存器中的值
d.volatile不會提供任何原子操作,它也不能用來修飾final類型的變數

Bank.java代碼如下:


packagethreadTest;

/**
*@authorww
*
*/
publicclassBank{

privatevolatileintcount=0;//賬戶余額

//存錢
publicvoidaddMoney(intmoney){

count+=money;
System.out.println(System.currentTimeMillis()+"存進:"+money);
}

//取錢
publicvoidsubMoney(intmoney){

if(count-money<0){
System.out.println("余額不足");
return;
}
count-=money;
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}

//查詢
publicvoidlookMoney(){
System.out.println("賬戶余額:"+count);
}
}

運行效果怎樣呢?


余額不足
賬戶余額:0


余額不足
賬戶余額:100


1441792010959存進:100
賬戶余額:100


1441792011960取出:100
賬戶余額:0


1441792011961存進:100
賬戶余額:100

是不是又看不懂了,又亂了。這是為什麼呢?就是因為volatile不能保證原子操作導致的,因此volatile不能代替synchronized。此外volatile會組織編譯器對代碼優化,因此能不使用它就不適用它吧。它的原理是每次要線程要訪問volatile修飾的變數時都是從內存中讀取,而不是存緩存當中讀取,因此每個線程訪問到的變數值都是一樣的。這樣就保證了同步。


(4)使用重入鎖實現線程同步

在JavaSE5.0中新增了一個java.util.concurrent包來支持同步。ReentrantLock類是可重入、互斥、實現了Lock介面的鎖,它與使用synchronized方法和快具有相同的基本行為和語義,並且擴展了其能力。
ReenreantLock類的常用方法有:
ReentrantLock() : 創建一個ReentrantLock實例
lock() : 獲得鎖
unlock() : 釋放鎖
註:ReentrantLock()還有一個可以創建公平鎖的構造方法,但由於能大幅度降低程序運行效率,不推薦使用
Bank.java代碼修改如下:


packagethreadTest;

importjava.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;

/**
*@authorww
*
*/
publicclassBank{

privateintcount=0;//賬戶余額

//需要聲明這個鎖
privateLocklock=newReentrantLock();

//存錢
publicvoidaddMoney(intmoney){
lock.lock();//上鎖
try{
count+=money;
System.out.println(System.currentTimeMillis()+"存進:"+money);

}finally{
lock.unlock();//解鎖
}
}

//取錢
publicvoidsubMoney(intmoney){
lock.lock();
try{

if(count-money<0){
System.out.println("余額不足");
return;
}
count-=money;
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}finally{
lock.unlock();
}
}

//查詢
publicvoidlookMoney(){
System.out.println("賬戶余額:"+count);
}
}

運行效果怎麼樣呢?


余額不足
賬戶余額:0


余額不足
賬戶余額:0


1441792891934存進:100
賬戶余額:100


1441792892935存進:100
賬戶余額:200


1441792892954取出:100
賬戶余額:100

效果和前兩種方法差不多。


如果synchronized關鍵字能滿足用戶的需求,就用synchronized,因為它能簡化代碼 。如果需要更高級的功能,就用ReentrantLock類,此時要注意及時釋放鎖,否則會出現死鎖,通常在finally代碼釋放鎖

(5)使用局部變數實現線程同步


Bank.java代碼如下:


packagethreadTest;


/**
*@authorww
*
*/
publicclassBank{

privatestaticThreadLocal<Integer>count=newThreadLocal<Integer>(){

@Override
protectedIntegerinitialValue(){
//TODOAuto-generatedmethodstub
return0;
}

};


//存錢
publicvoidaddMoney(intmoney){
count.set(count.get()+money);
System.out.println(System.currentTimeMillis()+"存進:"+money);

}

//取錢
publicvoidsubMoney(intmoney){
if(count.get()-money<0){
System.out.println("余額不足");
return;
}
count.set(count.get()-money);
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}

//查詢
publicvoidlookMoney(){
System.out.println("賬戶余額:"+count.get());
}
}

運行效果:


余額不足
賬戶余額:0


余額不足
賬戶余額:0


1441794247939存進:100
賬戶余額:100


余額不足
1441794248940存進:100
賬戶余額:0


賬戶余額:200


余額不足
賬戶余額:0


1441794249941存進:100
賬戶余額:300

看了運行效果,一開始一頭霧水,怎麼只讓存,不讓取啊?看看ThreadLocal的原理:


如果使用ThreadLocal管理變數,則每一個使用該變數的線程都獲得該變數的副本,副本之間相互獨立,這樣每一個線程都可以隨意修改自己的變數副本,而不會對其他線程產生影響。現在明白了吧,原來每個線程運行的都是一個副本,也就是說存錢和取錢是兩個賬戶,知識名字相同而已。所以就會發生上面的效果。


ThreadLocal與同步機制
a.ThreadLocal與同步機制都是為了解決多線程中相同變數的訪問沖突問題

b.前者採用以"空間換時間"的方法,後者採用以"時間換空間"的方式

2. java多線程訪問資料庫怎麼優化啊,並發很大

個人覺得高寫入並發的話先用緩存緩沖一下,可以合並的寫入合並成批量寫入可以管一些用但終歸寫入量很大的話還是要在資料庫端優化了,把並發寫均衡到多台伺服器上,應該沒有別的辦法了。如果瓶頸不再資料庫那就是應用伺服器處理能力不足,升級應用伺服器。

3. Java實現通用線程池

線程池通俗的描述就是預先創建若干空閑線程 等到需要用多線程去處理事務的時候去喚醒某些空閑線程執行處理任務 這樣就省去了頻繁創建線程的時間 因為頻 繁創建線程是要耗費大量的CPU資源的 如果一個應用程序需要頻繁地處理大量並發事務 不斷的創建銷毀線程往往會大大地降低系統的效率 這時候線程池就派 上用場了

本文旨在使用Java語言編寫一個通用的線程池 當需要使用線程池處理事務時 只需按照指定規范封裝好事務處理對象 然後用已有的線程池對象去自動選擇空 閑線程自動調用事務處理對象即可 並實現線程池的動態修改(修改當前線程數 最大線程數等) 下面是實現代碼

//ThreadTask java

package polarman threadpool;

/** *//**

*線程任務

* @author ryang

*

*/

public interface ThreadTask {

public void run();

}

//PooledThread java

package polarman threadpool;

import java util Collection; import java util Vector;

/** *//**

*接受線程池管理的線程

* @author ryang

*

*/

public class PooledThread extends Thread {

protected Vector tasks = new Vector();

protected boolean running = false;

protected boolean stopped = false;

protected boolean paused = false;

protected boolean killed = false;

private ThreadPool pool;

public PooledThread(ThreadPool pool) { this pool = pool;

}

public void putTask(ThreadTask task) { tasks add(task);

}

public void putTasks(ThreadTask[] tasks) { for(int i= ; i<tasks length; i++) this tasks add(tasks[i]);

}

public void putTasks(Collection tasks) { this tasks addAll(tasks);

}

protected ThreadTask popTask() { if(tasks size() > ) return (ThreadTask)tasks remove( );

else

return null;

}

public boolean isRunning() {

return running;

}

public void stopTasks() {

stopped = true;

}

public void stopTasksSync() {

stopTasks();

while(isRunning()) { try {

sleep( );

} catch (InterruptedException e) {

}

}

}

public void pauseTasks() {

paused = true;

}

public void pauseTasksSync() {

pauseTasks();

while(isRunning()) { try {

sleep( );

} catch (InterruptedException e) {

}

}

}

public void kill() { if(!running)

interrupt();

else

killed = true;

}

public void killSync() {

kill();

while(isAlive()) { try {

sleep( );

} catch (InterruptedException e) {

}

}

}

public synchronized void startTasks() {

running = true;

this notify();

}

public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread(); //System out println(Thread currentThread() getId() + : 空閑 ); this wait(); }else {

ThreadTask task;

while((task = popTask()) != null) { task run(); if(stopped) {

stopped = false;

if(tasks size() > ) { tasks clear(); System out println(Thread currentThread() getId() + : Tasks are stopped );

break;

}

}

if(paused) {

paused = false;

if(tasks size() > ) { System out println(Thread currentThread() getId() + : Tasks are paused );

break;

}

}

}

running = false;

}

if(killed) {

killed = false;

break;

}

}

}catch(InterruptedException e) {

return;

}

//System out println(Thread currentThread() getId() + : Killed );

}

}

//ThreadPool java

package polarman threadpool;

import java util Collection; import java util Iterator; import java util Vector;

/** *//**

*線程池

* @author ryang

*

*/

public class ThreadPool {

protected int maxPoolSize;

protected int initPoolSize;

protected Vector threads = new Vector();

protected boolean initialized = false;

protected boolean hasIdleThread = false;

public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSize; this initPoolSize = initPoolSize;

}

public void init() {

initialized = true;

for(int i= ; i<initPoolSize; i++) {

PooledThread thread = new PooledThread(this);

thread start(); threads add(thread);

}

//System out println( 線程池初始化結束 線程數= + threads size() + 最大線程數= + maxPoolSize);

}

public void setMaxPoolSize(int maxPoolSize) { //System out println( 重設最大線程數 最大線程數= + maxPoolSize); this maxPoolSize = maxPoolSize;

if(maxPoolSize < getPoolSize())

setPoolSize(maxPoolSize);

}

/** *//**

*重設當前線程數

* 若需殺掉某線程 線程不會立刻殺掉 而會等到線程中的事務處理完成* 但此方法會立刻從線程池中移除該線程 不會等待事務處理結束

* @param size

*/

public void setPoolSize(int size) { if(!initialized) {

initPoolSize = size;

return;

}else if(size > getPoolSize()) { for(int i=getPoolSize(); i<size && i<maxPoolSize; i++) {

PooledThread thread = new PooledThread(this);

thread start(); threads add(thread);

}

}else if(size < getPoolSize()) { while(getPoolSize() > size) { PooledThread th = (PooledThread)threads remove( ); th kill();

}

}

//System out println( 重設線程數 線程數= + threads size());

}

public int getPoolSize() { return threads size();

}

protected void notifyForIdleThread() {

hasIdleThread = true;

}

protected boolean waitForIdleThread() {

hasIdleThread = false;

while(!hasIdleThread && getPoolSize() >= maxPoolSize) { try { Thread sleep( ); } catch (InterruptedException e) {

return false;

}

}

return true;

}

public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator(); itr hasNext();) { PooledThread th = (PooledThread)itr next(); if(!th isRunning())

return th;

}

if(getPoolSize() < maxPoolSize) {

PooledThread thread = new PooledThread(this);

thread start(); threads add(thread);

return thread;

}

//System out println( 線程池已滿 等待 );

if(waitForIdleThread() == false)

return null;

}

}

public void processTask(ThreadTask task) {

PooledThread th = getIdleThread();

if(th != null) { th putTask(task); th startTasks();

}

}

public void processTasksInSingleThread(ThreadTask[] tasks) {

PooledThread th = getIdleThread();

if(th != null) { th putTasks(tasks); th startTasks();

}

}

public void processTasksInSingleThread(Collection tasks) {

PooledThread th = getIdleThread();

if(th != null) { th putTasks(tasks); th startTasks();

}

}

}

下面是線程池的測試程序

//ThreadPoolTest java

import java io BufferedReader; import java io IOException; import java io InputStreamReader;

import polarman threadpool ThreadPool; import polarman threadpool ThreadTask;

public class ThreadPoolTest {

public static void main(String[] args) { System out println( quit 退出 ); System out println( task A 啟動任務A 時長為 秒 ); System out println( size 設置當前線程池大小為 ); System out println( max 設置線程池最大線程數為 ); System out println();

final ThreadPool pool = new ThreadPool( ); pool init();

Thread cmdThread = new Thread() { public void run() {

BufferedReader reader = new BufferedReader(new InputStreamReader(System in));

while(true) { try { String line = reader readLine(); String words[] = line split( ); if(words[ ] equalsIgnoreCase( quit )) { System exit( ); }else if(words[ ] equalsIgnoreCase( size ) && words length >= ) { try { int size = Integer parseInt(words[ ]); pool setPoolSize(size); }catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( max ) && words length >= ) { try { int max = Integer parseInt(words[ ]); pool setMaxPoolSize(max); }catch(Exception e) {

}

}else if(words[ ] equalsIgnoreCase( task ) && words length >= ) { try { int timelen = Integer parseInt(words[ ]); SimpleTask task = new SimpleTask(words[ ] timelen * ); pool processTask(task); }catch(Exception e) {

}

}

} catch (IOException e) { e printStackTrace();

}

}

}

};

cmdThread start();

/**//*

for(int i= ; i< ; i++){

SimpleTask task = new SimpleTask( Task + i (i+ )* ); pool processTask(task);

}*/

}

}

class SimpleTask implements ThreadTask {

private String taskName;

private int timeLen;

public SimpleTask(String taskName int timeLen) { this taskName = taskName; this timeLen = timeLen;

}

public void run() { System out println(Thread currentThread() getId() +

: START TASK + taskName + );

try { Thread sleep(timeLen); } catch (InterruptedException e) {

}

System out println(Thread currentThread() getId() +

: END TASK + taskName + );

}

}

使用此線程池相當簡單 下面兩行代碼初始化線程池

ThreadPool pool = new ThreadPool( ); pool init();

要處理的任務實現ThreadTask 介面即可(如測試代碼里的SimpleTask) 這個介面只有一個方法run()

兩行代碼即可調用

lishixin/Article/program/Java/hx/201311/27203

4. javase線程怎麼存儲到容器

Java 並發重要知識點
java 線程池
ThreadPoolExecutor 類分析
ThreadPoolExecutor 類中提供的四個構造方法。我們來看最長的那個,其餘三個都是在這個構造方法的基礎上產生(其他幾個構造方法說白點都是給定某些默認參數的構造方法比如默認制定拒絕策略是什麼)。

/**
* 用給定的初始參數創建一個新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//線程池的核心線程數量
int maximumPoolSize,//線程池的最大線程數
long keepAliveTime,//當線程數大於核心線程數時,多餘的空閑線程存活的最長時間
TimeUnit unit,//時間單位
BlockingQueue<Runnable> workQueue,//任務隊列,用來儲存等待執行任務的隊列
ThreadFactory threadFactory,//線程工廠,用來創建線程,一般默認即可
RejectedExecutionHandler handler//拒絕策略,當提交的任務過多而不能及時處理時,我們可以定製策略來處理任務
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
下面這些對創建非常重要,在後面使用線程池的過程中你一定會用到!所以,務必拿著小本本記清楚。

ThreadPoolExecutor 3 個最重要的參數:

corePoolSize : 核心線程數線程數定義了最小可以同時運行的線程數量。
maximumPoolSize : 當隊列中存放的任務達到隊列容量的時候,當前可以同時運行的線程數量變為最大線程數。
workQueue: 當新任務來的時候會先判斷當前運行的線程數量是否達到核心線程數,如果達到的話,新任務就會被存放在隊列中。
ThreadPoolExecutor其他常見參數 :

keepAliveTime:當線程池中的線程數量大於 corePoolSize 的時候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了 keepAliveTime才會被回收銷毀;
unit : keepAliveTime 參數的時間單位。
threadFactory :executor 創建新線程的時候會用到。
handler :飽和策略。關於飽和策略下面單獨介紹一下。
下面這張圖可以加深你對線程池中各個參數的相互關系的理解(圖片來源:《Java 性能調優實戰》):

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-nzbqGRz9-1654600571133)(https://javaguide.cn/assets/%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%90%84%E4%B8%AA%E5%8F%82%E6%95%B0%E4%B9%8B%E9%97%B4%E7%9A%84%E5%85%B3%E7%B3%BB.d65f3309.png)]

ThreadPoolExecutor 飽和策略定義:

如果當前同時運行的線程數量達到最大線程數量並且隊列也已經被放滿了任務時,ThreadPoolTaskExecutor 定義一些策略:

ThreadPoolExecutor.AbortPolicy :拋出 RejectedExecutionException來拒絕新任務的處理。
ThreadPoolExecutor.CallerRunsPolicy :調用執行自己的線程運行任務,也就是直接在調用execute方法的線程中運行(run)被拒絕的任務,如果執行程序已關閉,則會丟棄該任務。因此這種策略會降低對於新任務提交速度,影響程序的整體性能。如果您的應用程序可以承受此延遲並且你要求任何一個任務請求都要被執行的話,你可以選擇這個策略。
ThreadPoolExecutor.DiscardPolicy :不處理新任務,直接丟棄掉。
ThreadPoolExecutor.DiscardOldestPolicy : 此策略將丟棄最早的未處理的任務請求。
舉個例子:

Spring 通過 ThreadPoolTaskExecutor 或者我們直接通過 ThreadPoolExecutor 的構造函數創建線程池的時候,當我們不指定 RejectedExecutionHandler 飽和策略的話來配置線程池的時候默認使用的是 ThreadPoolExecutor.AbortPolicy。在默認情況下,ThreadPoolExecutor 將拋出 RejectedExecutionException 來拒絕新來的任務 ,這代表你將丟失對這個任務的處理。 對於可伸縮的應用程序,建議使用 ThreadPoolExecutor.CallerRunsPolicy。當最大池被填滿時,此策略為我們提供可伸縮隊列。(這個直接查看 ThreadPoolExecutor 的構造函數源碼就可以看出,比較簡單的原因,這里就不貼代碼了。)

推薦使用 ThreadPoolExecutor 構造函數創建線程池
在《阿里巴巴 Java 開發手冊》「並發處理」這一章節,明確指出線程資源必須通過線程池提供,不允許在應用中自行顯式創建線程。

為什麼呢?

使用線程池的好處是減少在創建和銷毀線程上所消耗的時間以及系統資源開銷,解決資源不足的問題。如果不使用線程池,有可能會造成系統創建大量同類線程而導致消耗完內存或者「過度切換」的問題。

另外,《阿里巴巴 Java 開發手冊》中強制線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 構造函數的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險

Executors 返回線程池對象的弊端如下(後文會詳細介紹到):

FixedThreadPool 和 SingleThreadExecutor : 允許請求的隊列長度為 Integer.MAX_VALUE,可能堆積大量的請求,從而導致 OOM。
CachedThreadPool 和 ScheledThreadPool : 允許創建的線程數量為 Integer.MAX_VALUE ,可能會創建大量線程,從而導致 OOM。
方式一:通過ThreadPoolExecutor構造函數實現(推薦)通過構造方法實現

方式二:通過 Executor 框架的工具類 Executors 來實現 我們可以創建三種類型的 ThreadPoolExecutor:

FixedThreadPool
SingleThreadExecutor
CachedThreadPool
對應 Executors 工具類中的方法如圖所示:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-YGd4ygZu-1654600571136)(https://javaguide.cn/assets/Executors%E5%B7%A5%E5%85%B7%E7%B1%BB.4b0cbd16.png)]

正確配置線程池參數
說到如何給線程池配置參數,美團的騷操作至今讓我難忘(後面會提到)!

我們先來看一下各種書籍和博客上一般推薦的配置線程池參數的方式,可以作為參考!

常規操作
很多人甚至可能都會覺得把線程池配置過大一點比較好!我覺得這明顯是有問題的。就拿我們生活中非常常見的一例子來說:並不是人多就能把事情做好,增加了溝通交流成本。你本來一件事情只需要 3 個人做,你硬是拉來了 6 個人,會提升做事效率嘛?我想並不會。 線程數量過多的影響也是和我們分配多少人做事情一樣,對於多線程這個場景來說主要是增加了上下文切換成本。不清楚什麼是上下文切換的話,可以看我下面的介紹。

上下文切換:

多線程編程中一般線程的個數都大於 CPU 核心的個數,而一個 CPU 核心在任意時刻只能被一個線程使用,為了讓這些線程都能得到有效執行,CPU 採取的策略是為每個線程分配時間片並輪轉的形式。當一個線程的時間片用完的時候就會重新處於就緒狀態讓給其他線程使用,這個過程就屬於一次上下文切換。概括來說就是:當前任務在執行完 CPU 時間片切換到另一個任務之前會先保存自己的狀態,以便下次再切換回這個任務時,可以再載入這個任務的狀態。任務從保存到再載入的過程就是一次上下文切換。

上下文切換通常是計算密集型的。也就是說,它需要相當可觀的處理器時間,在每秒幾十上百次的切換中,每次切換都需要納秒量級的時間。所以,上下文切換對系統來說意味著消耗大量的 CPU 時間,事實上,可能是操作系統中時間消耗最大的操作。

Linux 相比與其他操作系統(包括其他類 Unix 系統)有很多的優點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。

類比於實現世界中的人類通過合作做某件事情,我們可以肯定的一點是線程池大小設置過大或者過小都會有問題,合適的才是最好。

如果我們設置的線程池數量太小的話,如果同一時間有大量任務/請求需要處理,可能會導致大量的請求/任務在任務隊列中排隊等待執行,甚至會出現任務隊列滿了之後任務/請求無法處理的情況,或者大量任務堆積在任務隊列導致 OOM。這樣很明顯是有問題的! CPU 根本沒有得到充分利用。

但是,如果我們設置線程數量太大,大量線程可能會同時在爭取 CPU 資源,這樣會導致大量的上下文切換,從而增加線程的執行時間,影響了整體執行效率。

有一個簡單並且適用面比較廣的公式:

CPU 密集型任務(N+1): 這種任務消耗的主要是 CPU 資源,可以將線程數設置為 N(CPU 核心數)+1,比 CPU 核心數多出來的一個線程是為了防止線程偶發的缺頁中斷,或者其它原因導致的任務暫停而帶來的影響。一旦任務暫停,CPU 就會處於空閑狀態,而在這種情況下多出來的一個線程就可以充分利用 CPU 的空閑時間。
I/O 密集型任務(2N): 這種任務應用起來,系統會用大部分的時間來處理 I/O 交互,而線程在處理 I/O 的時間段內不會佔用 CPU 來處理,這時就可以將 CPU 交出給其它線程使用。因此在 I/O 密集型任務的應用中,我們可以多配置一些線程,具體的計算方法是 2N。
如何判斷是 CPU 密集任務還是 IO 密集任務?

CPU 密集型簡單理解就是利用 CPU 計算能力的任務比如你在內存中對大量數據進行排序。但凡涉及到網路讀取,文件讀取這類都是 IO 密集型,這類任務的特點是 CPU 計算耗費時間相比於等待 IO 操作完成的時間來說很少,大部分時間都花在了等待 IO 操作完成上。

美團的騷操作
美團技術團隊在《Java線程池實現原理及其在美團業務中的實踐》open in new window這篇文章中介紹到對線程池參數實現可自定義配置的思路和方法。

美團技術團隊的思路是主要對線程池的核心參數實現自定義可配置。這三個核心參數是:

corePoolSize : 核心線程數線程數定義了最小可以同時運行的線程數量。
maximumPoolSize : 當隊列中存放的任務達到隊列容量的時候,當前可以同時運行的線程數量變為最大線程數。
workQueue: 當新任務來的時候會先判斷當前運行的線程數量是否達到核心線程數,如果達到的話,新任務就會被存放在隊列中。
為什麼是這三個參數?

我在這篇《新手也能看懂的線程池學習總結》open in new window 中就說過這三個參數是 ThreadPoolExecutor 最重要的參數,它們基本決定了線程池對於任務的處理策略。

如何支持參數動態配置? 且看 ThreadPoolExecutor 提供的下面這些方法。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Sm89qdJZ-1654600571137)(https://javaguide.cn/assets/b6fd95a7-4c9d-4fc6-ad26-890adb3f6c4c.5ff332dc.png)]

格外需要注意的是corePoolSize, 程序運行期間的時候,我們調用 setCorePoolSize() 這個方法的話,線程池會首先判斷當前工作線程數是否大於corePoolSize,如果大於的話就會回收工作線程。

另外,你也看到了上面並沒有動態指定隊列長度的方法,美團的方式是自定義了一個叫做 的隊列(主要就是把LinkedBlockingQueue的capacity 欄位的final關鍵字修飾給去掉了,讓它變為可變的)。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-cmNN5yAL-1654600571138)(https://javaguide.cn/assets/19a0255a-6ef3-4835-98d1-a839d1983332.b334d1e9.png)]

還沒看夠?推薦 why神的[《如何設置線程池參數?美團給出了一個讓面試官虎軀一震的回答。》open in new window](如何設置線程池參數?美團給出了一個讓面試官虎軀一震的回答。 (qq.com))這篇文章,深度剖析,很不錯哦!

Java 常見並發容器
JDK 提供的這些容器大部分在 java.util.concurrent 包中。

ConcurrentHashMap : 線程安全的 HashMap
CopyOnWriteArrayList : 線程安全的 List,在讀多寫少的場合性能非常好,遠遠好於 Vector。
ConcurrentLinkedQueue : 高效的並發隊列,使用鏈表實現。可以看做一個線程安全的 LinkedList,這是一個非阻塞隊列。
BlockingQueue : 這是一個介面,JDK 內部通過鏈表、數組等方式實現了這個介面。表示阻塞隊列,非常適合用於作為數據共享的通道。
ConcurrentSkipListMap : 跳錶的實現。這是一個 Map,使用跳錶的數據結構進行快速查找。
ConcurrentHashMap
我們知道 HashMap 不是線程安全的,在並發場景下如果要保證一種可行的方式是使用 Collections.synchronizedMap() 方法來包裝我們的 HashMap。但這是通過使用一個全局的鎖來同步不同線程間的並發訪問,因此會帶來不可忽視的性能問題。

所以就有了 HashMap 的線程安全版本—— ConcurrentHashMap 的誕生。

在 ConcurrentHashMap 中,無論是讀操作還是寫操作都能保證很高的性能:在進行讀操作時(幾乎)不需要加鎖,而在寫操作時通過鎖分段技術只對所操作的段加鎖而不影響客戶端對其它段的訪問。

CopyOnWriteArrayList
CopyOnWriteArrayList 簡介
public class CopyOnWriteArrayList<E>
extends Object
implements List<E>, RandomAccess, Cloneable, Serializable
在很多應用場景中,讀操作可能會遠遠大於寫操作。由於讀操作根本不會修改原有的數據,因此對於每次讀取都進行加鎖其實是一種資源浪費。我們應該允許多個線程同時訪問 List 的內部數據,畢竟讀取操作是安全的。

這和我們之前在多線程章節講過 ReentrantReadWriteLock 讀寫鎖的思想非常類似,也就是讀讀共享、寫寫互斥、讀寫互斥、寫讀互斥。JDK 中提供了 CopyOnWriteArrayList 類比相比於在讀寫鎖的思想又更進一步。為了將讀取的性能發揮到極致,CopyOnWriteArrayList 讀取是完全不用加鎖的,並且更厲害的是:寫入也不會阻塞讀取操作。只有寫入和寫入之間需要進行同步等待。這樣一來,讀操作的性能就會大幅度提升。那它是怎麼做的呢?

CopyOnWriteArrayList 是如何做到的?
CopyOnWriteArrayList 類的所有可變操作(add,set 等等)都是通過創建底層數組的新副本來實現的。當 List 需要被修改的時候,我並不修改原有內容,而是對原有數據進行一次復制,將修改的內容寫入副本。寫完之後,再將修改完的副本替換原來的數據,這樣就可以保證寫操作不會影響讀操作了。

從 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是滿足 CopyOnWrite 的。所謂 CopyOnWrite 也就是說:在計算機,如果你想要對一塊內存進行修改時,我們不在原有內存塊中進行寫操作,而是將內存拷貝一份,在新的內存中進行寫操作,寫完之後呢,就將指向原來內存指針指向新的內存,原來的內存就可以被回收掉了。

CopyOnWriteArrayList 讀取和寫入源碼簡單分析
CopyOnWriteArrayList 讀取操作的實現
讀取操作沒有任何同步控制和鎖操作,理由就是內部數組 array 不會發生修改,只會被另外一個 array 替換,因此可以保證數據安全。

/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
public E get(int index) {
return get(getArray(), index);
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
final Object[] getArray() {
return array;
}

CopyOnWriteArrayList 寫入操作的實現
CopyOnWriteArrayList 寫入操作 add()方法在添加集合的時候加了鎖,保證了同步,避免了多線程寫的時候會 出多個副本出來。

/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//加鎖
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.Of(elements, len + 1);//拷貝新數組
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();//釋放鎖
}
}

ConcurrentLinkedQueue
Java 提供的線程安全的 Queue 可以分為阻塞隊列和非阻塞隊列,其中阻塞隊列的典型例子是 BlockingQueue,非阻塞隊列的典型例子是 ConcurrentLinkedQueue,在實際應用中要根據實際需要選用阻塞隊列或者非阻塞隊列。 阻塞隊列可以通過加鎖來實現,非阻塞隊列可以通過 CAS 操作實現。

從名字可以看出,ConcurrentLinkedQueue這個隊列使用鏈表作為其數據結構.ConcurrentLinkedQueue 應該算是在高並發環境中性能最好的隊列了。它之所有能有很好的性能,是因為其內部復雜的實現。

ConcurrentLinkedQueue 內部代碼我們就不分析了,大家知道 ConcurrentLinkedQueue 主要使用 CAS 非阻塞演算法來實現線程安全就好了。

ConcurrentLinkedQueue 適合在對性能要求相對較高,同時對隊列的讀寫存在多個線程同時進行的場景,即如果對隊列加鎖的成本較高則適合使用無鎖的 ConcurrentLinkedQueue 來替代。

BlockingQueue
BlockingQueue 簡介
上面我們己經提到了 ConcurrentLinkedQueue 作為高性能的非阻塞隊列。下面我們要講到的是阻塞隊列——BlockingQueue。阻塞隊列(BlockingQueue)被廣泛使用在「生產者-消費者」問題中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。當隊列容器已滿,生產者線程會被阻塞,直到隊列未滿;當隊列容器為空時,消費者線程會被阻塞,直至隊列非空時為止。

BlockingQueue 是一個介面,繼承自 Queue,所以其實現類也可以作為 Queue 的實現來使用,而 Queue 又繼承自 Collection 介面。下面是 BlockingQueue 的相關實現類:

BlockingQueue 的實現類

下面主要介紹一下 3 個常見的 BlockingQueue 的實現類:ArrayBlockingQueue、LinkedBlockingQueue 、PriorityBlockingQueue 。

ArrayBlockingQueue
ArrayBlockingQueue 是 BlockingQueue 介面的有界隊列實現類,底層採用數組來實現。

public class ArrayBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable{}
ArrayBlockingQueue 一旦創建,容量不能改變。其並發控制採用可重入鎖 ReentrantLock ,不管是插入操作還是讀取操作,都需要獲取到鎖才能進行操作。當隊列容量滿時,嘗試將元素放入隊列將導致操作阻塞;嘗試從一個空隊列中取一個元素也會同樣阻塞。

ArrayBlockingQueue 默認情況下不能保證線程訪問隊列的公平性,所謂公平性是指嚴格按照線程等待的絕對時間順序,即最先等待的線程能夠最先訪問到 ArrayBlockingQueue。而非公平性則是指訪問 ArrayBlockingQueue 的順序不是遵守嚴格的時間順序,有可能存在,當 ArrayBlockingQueue 可以被訪問時,長時間阻塞的線程依然無法訪問到 ArrayBlockingQueue。如果保證公平性,通常會降低吞吐量。如果需要獲得公平性的 ArrayBlockingQueue,可採用如下代碼:

private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
1
1
LinkedBlockingQueue
LinkedBlockingQueue 底層基於單向鏈表實現的阻塞隊列,可以當做無界隊列也可以當做有界隊列來使用,同樣滿足 FIFO 的特性,與 ArrayBlockingQueue 相比起來具有更高的吞吐量,為了防止 LinkedBlockingQueue 容量迅速增,損耗大量內存。通常在創建 LinkedBlockingQueue 對象時,會指定其大小,如果未指定,容量等於 Integer.MAX_VALUE 。

相關構造方法:

/**
*某種意義上的無界隊列
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

/**
*有界隊列
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

PriorityBlockingQueue
PriorityBlockingQueue 是一個支持優先順序的無界阻塞隊列。默認情況下元素採用自然順序進行排序,也可以通過自定義類實現 compareTo() 方法來指定元素排序規則,或者初始化時通過構造器參數 Comparator 來指定排序規則。

PriorityBlockingQueue 並發控制採用的是可重入鎖 ReentrantLock,隊列為無界隊列(ArrayBlockingQueue 是有界隊列,LinkedBlockingQueue 也可以通過在構造函數中傳入 capacity 指定隊列最大的容量,但是 PriorityBlockingQueue 只能指定初始的隊列大小,後面插入元素的時候,如果空間不夠的話會自動擴容)。

簡單地說,它就是 PriorityQueue 的線程安全版本。不可以插入 null 值,同時,插入隊列的對象必須是可比較大小的(comparable),否則報 ClassCastException 異常。它的插入操作 put 方法不會 block,因為它是無界隊列(take 方法在隊列為空的時候會阻塞)。

推薦文章: 《解讀 Java 並發隊列 BlockingQueue》open in new window

ConcurrentSkipListMap
下面這部分內容參考了極客時間專欄《數據結構與演算法之美》open in new window以及《實戰 Java 高並發程序設計》。

為了引出 ConcurrentSkipListMap,先帶著大家簡單理解一下跳錶。

對於一個單鏈表,即使鏈表是有序的,如果我們想要在其中查找某個數據,也只能從頭到尾遍歷鏈表,這樣效率自然就會很低,跳錶就不一樣了。跳錶是一種可以用來快速查找的數據結構,有點類似於平衡樹。它們都可以對元素進行快速的查找。但一個重要的區別是:對平衡樹的插入和刪除往往很可能導致平衡樹進行一次全局的調整。而對跳錶的插入和刪除只需要對整個數據結構的局部進行操作即可。這樣帶來的好處是:在高並發的情況下,你會需要一個全局鎖來保證整個平衡樹的線程安全。而對於跳錶,你只需要部分鎖即可。這樣,在高並發環境下,你就可以擁有更好的性能。而就查詢的性能而言,跳錶的時間復雜度也是 O(logn) 所以在並發數據結構中,JDK 使用跳錶來實現一個 Map。

跳錶的本質是同時維護了多個鏈表,並且鏈表是分層的,

2級索引跳錶

最低層的鏈表維護了跳錶內所有的元素,每上面一層鏈表都是下面一層的子集。

跳錶內的所有鏈表的元素都是排序的。查找時,可以從頂級鏈表開始找。一旦發現被查找的元素大於當前鏈表中的取值,就會轉入下一層鏈表繼續找。這也就是說在查找過程中,搜索是跳躍式的。如上圖所示,在跳錶中查找元素 18。

在跳錶中查找元素18

查找 18 的時候原來需要遍歷 18 次,現在只需要 7 次即可。針對鏈表長度比較大的時候,構建索引查找效率的提升就會非常明顯。

從上面很容易看出,跳錶是一種利用空間換時間的演算法。

使用跳錶實現 Map 和使用哈希演算法實現 Map 的另外一個不同之處是:哈希並不會保存元素的順序,而跳錶內所有的元素都是排序的。因此在對跳錶進行遍歷時,你會得到一個有序的結果。所以,如果你的應用需要有序性,那麼跳錶就是你不二的選擇。JDK 中實現這一數據結構的類是 ConcurrentSkipListMap。

5. Java多線程為什麼使用線程池

1:提高效率 創建好一定數量的線程放在池中,等需要使用的時候就從池中拿一個,這要比需要的時候創建一個線程對象要快的多。

2:方便管理 可以編寫線程池管理代碼對池中的線程統一進行管理,比如說系統啟動時由該程序創建100個線程,每當有請求的時候,就分配一個線程去工作, 如果剛好並發有101個請求,那多出的這一個請求可以排隊等候,避免因無休止的創建線程導致系統崩潰

6. java中線程池如何管理多個線程

ExecutorService threadPoll = Executors.newCachedThreadPool(); /陵沖/創建線程池
threadPoll.execute(線程1);//執行線程一

線程池根據程序需求創建新線程亂簡的,需求多時,創建的就多,需求少時,JVM自己會慢慢的釋放掉多餘的線嘩汪褲程

不需求程序員去做什麼,JVM自己會處理,程序員調用就行了..

7. Java 中幾種常用的線程池

一:newCachedThreadPool
(1)緩存型池子,先查看池中有沒有以前建立的線程,如果有,就reuse,如果沒有,就建立一個新的線程加入池中;
(2)緩存型池子,通常用於執行一些生存周期很短的非同步型任務;因此一些面向連接的daemon型server中用得不多;
(3)能reuse的線程,必須是timeout IDLE內的池中線程,預設timeout是60s,超過這個IDLE時長,線程實例將被終止及移出池。
(4)注意,放入CachedThreadPool的線程不必擔心其結束,超過TIMEOUT不活動,其會自動被終止

二:newFixedThreadPool
(1)newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的線程
(2)其獨特之處胡清賀:任意時間點,最多隻能有固定數目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子
(3)和正殲cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),所以FixedThreadPool多數針對一些很褲派穩定很固定的正規並發線程,多用於伺服器
(4)從方法的源代碼看,cache池和fixed 池調用的是同一個底層池,只不過參數不同:
fixed池線程數固定,並且是0秒IDLE(無IDLE)
cache池線程數支持0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE

三:ScheledThreadPool
(1)調度型線程池
(2)這個池子里的線程可以按schele依次delay執行,或周期執行
四:SingleThreadExecutor
(1)單例線程,任意時間池中只能有一個線程
(2)用的是和cache池和fixed池相同的底層池,但線程數目是1-1,0秒IDLE(無IDLE)

8. java線程池newCachedThreadPool怎麼使用

你設定了10個線程,就可以跑10個線程,多於10個的時候會啟用默認策略

9. 【Java基礎】線程池的原理是什麼

什麼是線程池?

總歸為:池化技術 ---》資料庫連接池 緩存架構 緩存池 線程池 內存池,連接池,這種思想演變成緩存架構技術---> JDK設計思想有千絲萬縷的聯系

首先我們從最核心的ThreadPoolExecutor類中的方法講起,然後再講述它的實現原理,接著給出了它的使用示例,最後討論了一下如何合理配置線程池的大小。

Java 中的 ThreadPoolExecutor 類

java.uitl.concurrent.ThreadPoolExecutor 類是線程池中最核心的一個類,因此如果要透徹地了解Java 中的線程池,必須先了解這個類。下面我們來看一下 ThreadPoolExecutor 類的具體實現源碼。

在 ThreadPoolExecutor 類中提供了四個構造方法:

10. java 多線程並發請求數據,只要有一條線程獲得數據,則其他線程終止運行,並列印出獲得的數據

這邊我寫了一個例子,兩個線程同時獲取隨機數,當獲取的值為68的時候則停止所有進程。

熱點內容
整個伺服器搭建教程 發布:2025-02-12 11:48:16 瀏覽:579
我的世界伺服器人多的 發布:2025-02-12 11:48:12 瀏覽:347
為實現分頁存儲管理需要哪些硬體支持 發布:2025-02-12 11:46:34 瀏覽:539
編程下載線 發布:2025-02-12 11:41:48 瀏覽:210
json存儲數據 發布:2025-02-12 11:41:39 瀏覽:219
天龍八部腳本免費 發布:2025-02-12 11:30:12 瀏覽:501
卡羅拉的配置一般買哪個好一點 發布:2025-02-12 11:20:03 瀏覽:743
沒有伺服器的IP怎麼連上 發布:2025-02-12 11:19:55 瀏覽:80
編程sqs 發布:2025-02-12 11:09:55 瀏覽:239
electron脫離編譯環境 發布:2025-02-12 11:08:21 瀏覽:69