java線程安全的隊列
㈠ 璇劇▼璁捐¢樼洰錛屽氱嚎紼嬬紪紼嬶細鍖婚櫌闂ㄨ瘖妯℃嫙錛屾兂鐢╦ava瀹炵幇錛屾眰澶х炴寚鐐
鍏稿瀷鐨勭敓浜ц呮秷璐硅呮ā鍨嬨
浜嗚Вj5鐨勫苟鍙戝簱錛岄偅涓騫跺彂搴撲腑鏈夐傚悎緇勪歡瀹炵幇銆
濡傛灉涓嶄簡瑙o紝榪欎箞鏉ワ細
鍒涘緩涓涓闃熷垪錛屾ら槦鍒楄佹眰綰跨▼瀹夊叏錛屽傛灉闃熷垪涓虹┖鍒欐秷璐硅呴樆濉炪傚傛灉闃熷垪杈懼埌鏌愪釜鏈澶у礆紝鍒欓樆濉炵敓浜ц呫
闃熷垪鐢錛屾櫘閫氱殑list鎴栧疄鐜板ソ鐨勯槦鍒楀寘瑁呮垚綰跨▼瀹夊叏鐨勩
鐢╯ynchronized鍚屾ュ師鏂規硶鎴栦唬鐮佸潡銆
鍐欎竴涓鎴杗涓綰跨▼錛屾ā鎷熺棶浜猴紝鎺掗槦鍔炵悊涓氬姟錛屽線涓婇潰鐨勯槦鍒椾腑娣誨姞鏁版嵁銆
褰撹揪鍒伴槦鍒楃殑鏈澶у圭Н錛岄樆濉烇紝絳夊緟鐢熶駭鑰呯嚎紼嬪彇鏁版嵁銆
闃誨烇細makerLock.wait();//鉶氭嫙鏈轟細鍑鴻╃嚎紼嬫寕璧鳳紝鍏跺疄灝辨槸鎿嶄綔緋葷粺錛屼繚瀛樺綋鍓嶇嚎紼嬪湪cpu涓婄殑榪愯岀姸鎬併傚啀鍑鴻╃嚎紼嬫e湪浣跨敤鐨刢pu璧勬簮錛屽崰鐢ㄧ殑鍐呭瓨涓嶄細閲婃斁銆
寰闃熷垪鎻掑叆鏁版嵁鐨勬椂鍊欙紝鍥犱負涓嶇煡閬撴槸鍚︽湁娑堣垂鑰呭勪簬絳夊緟鐘舵侊紝閫氱煡娑堣垂鑰咃細
customerLock.notifyAll();//鉶氭嫙鏈鴻皟搴︽秷璐硅呯嚎紼嬭繍琛岋紝瀹為檯涓婃槸鎿嶄綔緋葷粺錛屾妸淇濆瓨鐨勬秷璐硅呯嚎紼嬬姸鎬侊紝浠庢柊鍔犺澆鍒癱pu涓鎺ョ潃榪愯屻傛帴鐫榪愯岀嚎紼嬫槸浠繪剰鐨勶紝鍙栧喅浜庝笉鍚屾搷浣滅郴緇熺殑綰跨▼璋冨害綆楁硶銆
娑堣垂鑰呯嚎紼嬭誨彇涓涓鏁版嵁鍚庯紝瑕侀氱煡鐢熶駭鑰咃紝鍙浠ョ戶緇錛岄亾鐞嗗悓涓婏細
makerLock.notifyAll();
闃熷垪涓錛屾棤鏁版嵁鍙璇葷殑鏃跺欙細
customerLock.wait();//鍘熺悊鍚屼笂錛
鏈鍚庢敞鎰忥紝鐢熶駭鑰呰窡娑堣垂鑰呬嬌鐢ㄤ簡涓や釜涓嶅悓鐨勫硅薄閿併俵ock.wait()鐨勪嬌鐢ㄦ柟娉曟槸榪欐牱鐨勶細
synchronized(lock){
......
while(condition==true){
lock.wait();
}
......
Objecto=queen.pop();
lock.notifyAll();
}
鏈鍚庡惎鍔╪涓綰跨▼璇婚槦鍒楋紝妯℃嫙鍔炵悊涓氬姟鐨勭獥鍙o紱n涓綰跨▼鍐欓槦鍒楋紝妯℃嫙鐥呬漢鎺掗槦銆
鏂扮嚎紼嬪簱涔熸湁璺熻佺嚎紼嬪簱瀵瑰簲鐨勬柟娉曪紝鏂扮嚎紼嬪簱鏈夌嚎紼嬪畨鍏ㄧ殑楂樻晥闃熷垪銆傛病鏈変笂闈㈤夯鐑︼紝浣嗕笂闈㈠啓鐨勬槸鐞嗚В鏂扮嚎紼嬫暟鎹緇撴瀯涓庡疄鐜扮殑鍩虹銆
java">packagecom.;
importjava.util.LinkedList;
importjava.util.List;
importjava.util.Random;
publicclassTestThread2{
//緙撳啿涓婇檺
privatelongbufsize;
//緙撳啿
privateList<String>buf;
publicTestThread2(){
bufsize=5;
buf=newLinkedList<String>();
}
//鐢熶駭鑰呰皟鐢
publicvoidput(Strings){
//妯℃嫙鐢熶駭鑰呰窡涓嶄笂娑堣垂鑰
/*
try{
Thread.sleep(100);
}catch(InterruptedExceptione){
}
*/
synchronized(this){
//瓚呰繃闃熷垪闄愬埗灝辯瓑寰
while(buf.size()==bufsize){
System.out.println("闃熷垪宸叉弧錛岀敓浜ц:"+Thread.currentThread().getId()+"寮濮嬬瓑寰呫");
try{
this.wait();
}catch(InterruptedExceptione){
}
}
buf.add(s);
//閫氱煡娑堣垂鑰
this.notifyAll();
}
}
//娑堣垂鑰呰皟鐢
synchronizedpublicStringtake(){
//妯℃嫙娑堣垂鑰呰窡涓嶄笂鐢熶駭鑰
try{
Thread.sleep(100);
}catch(InterruptedExceptione){
}
Strings=null;
synchronized(this){
while(buf.size()==0){
System.out.println("闃熷垪涓虹┖錛屾秷璐硅:"+Thread.currentThread().getId()+"寮濮嬬瓑寰呫");
try{
this.wait();
}catch(InterruptedExceptione){
}
}
//鍙栧厛鏀懼叆鐨勫厓緔狅紝騫剁Щ闄
s=buf.get(0);
buf.remove(0);
//閫氱煡鐢熶駭鑰
this.notifyAll();
}
returns;
}
publicstaticvoidmain(String[]args){
//鑷宸卞疄鐜扮殑錛屽畨鍏ㄩ槦鍒
finalTestThread2tt=newTestThread2();
//鐢熶駭鑰
Threadp=newThread(newRunnable(){
@Override
publicvoidrun(){
while(!Thread.currentThread().isInterrupted()){
Randomr=newRandom();
tt.put(String.valueOf(r.nextInt(10)));
}
}
});
//娑堣垂鑰
Threadc1=newThread(newRunnable(){
@Override
publicvoidrun(){
while(!Thread.currentThread().isInterrupted()){
System.out.println("綰跨▼錛"+Thread.currentThread().getId()+"鑾峰彇鍒版暟鎹"+tt.take());
}
}
});
Threadc2=newThread(newRunnable(){
@Override
publicvoidrun(){
while(!Thread.currentThread().isInterrupted()){
System.out.println("綰跨▼錛"+Thread.currentThread().getId()+"鑾峰彇鍒版暟鎹"+tt.take());
}
}
});
p.start();
c1.start();
c2.start();
try{
p.join();
c1.join();
c2.join();
}catch(InterruptedExceptione){
}
}
}
㈡ java 線程池 工作隊列是如何工作的
使用線程池的好處
1、降低資源消耗
可以重復利用已創建的線程降低線程創建和銷毀造成的消耗。
2、提高響應速度
當任務到達時,任務可以不需要等到線程創建就能立即執行。
3、提高線程的可管理性
線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控
線程池的工作原理
首先我們看下當一個新的任務提交到線程池之後,線程池是如何處理的
1、線程池判斷核心線程池裡的線程是否都在執行任務。如果不是,則創建一個新的工作線程來執行任務。如果核心線程池裡的線程都在執行任務,則執行第二步。
2、線程池判斷工作隊列是否已經滿。如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列里進行等待。如果工作隊列滿了,則執行第三步
3、線程池判斷線程池的線程是否都處於工作狀態。如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務
線程池飽和策略
這里提到了線程池的飽和策略,那我們就簡單介紹下有哪些飽和策略:
AbortPolicy
為Java線程池默認的阻塞策略,不執行此任務,而且直接拋出一個運行時異常,切記ThreadPoolExecutor.execute需要try catch,否則程序會直接退出。
DiscardPolicy
直接拋棄,任務不執行,空方法
DiscardOldestPolicy
從隊列裡面拋棄head的一個任務,並再次execute 此task。
CallerRunsPolicy
在調用execute的線程裡面執行此command,會阻塞入口
用戶自定義拒絕策略(最常用)
實現RejectedExecutionHandler,並自己定義策略模式
下我們以ThreadPoolExecutor為例展示下線程池的工作流程圖
3.jpg
關鍵方法源碼分析
我們看看核心方法添加到線程池方法execute的源碼如下:
// //Executes the given task sometime in the future. The task //may execute in a new thread or in an existing pooled thread. // // If the task cannot be submitted for execution, either because this // executor has been shutdown or because its capacity has been reached, // the task is handled by the current {@code RejectedExecutionHandler}. // // @param command the task to execute // @throws RejectedExecutionException at discretion of // {@code RejectedExecutionHandler}, if the task // cannot be accepted for execution // @throws NullPointerException if {@code command} is null // public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // // Proceed in 3 steps: // // 1. If fewer than corePoolSize threads are running, try to // start a new thread with the given command as its first // task. The call to addWorker atomically checks runState and // workerCount, and so prevents false alarms that would add // threads when it shouldn't, by returning false. // 翻譯如下: // 判斷當前的線程數是否小於corePoolSize如果是,使用入參任務通過addWord方法創建一個新的線程, // 如果能完成新線程創建exexute方法結束,成功提交任務 // 2. If a task can be successfully queued, then we still need // to double-check whether we should have added a thread // (because existing ones died since last checking) or that // the pool shut down since entry into this method. So we // recheck state and if necessary roll back the enqueuing if // stopped, or start a new thread if there are none. // 翻譯如下: // 在第一步沒有完成任務提交;狀態為運行並且能否成功加入任務到工作隊列後,再進行一次check,如果狀態 // 在任務加入隊列後變為了非運行(有可能是在執行到這里線程池shutdown了),非運行狀態下當然是需要 // reject;然後再判斷當前線程數是否為0(有可能這個時候線程數變為了0),如是,新增一個線程; // 3. If we cannot queue task, then we try to add a new // thread. If it fails, we know we are shut down or saturated // and so reject the task. // 翻譯如下: // 如果不能加入任務到工作隊列,將嘗試使用任務新增一個線程,如果失敗,則是線程池已經shutdown或者線程池 // 已經達到飽和狀態,所以reject這個他任務 // int c = ctl.get(); // 工作線程數小於核心線程數 if (workerCountOf(c) < corePoolSize) { // 直接啟動新線程,true表示會再次檢查workerCount是否小於corePoolSize if (addWorker(command, true)) return; c = ctl.get(); } // 如果工作線程數大於等於核心線程數 // 線程的的狀態未RUNNING並且隊列notfull if (isRunning(c) && workQueue.offer(command)) { // 再次檢查線程的運行狀態,如果不是RUNNING直接從隊列中移除 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 移除成功,拒絕該非運行的任務 reject(command); else if (workerCountOf(recheck) == 0) // 防止了SHUTDOWN狀態下沒有活動線程了,但是隊列里還有任務沒執行這種特殊情況。 // 添加一個null任務是因為SHUTDOWN狀態下,線程池不再接受新任務 addWorker(null, false); } // 如果隊列滿了或者是非運行的任務都拒絕執行 else if (!addWorker(command, false)) reject(command); }
㈢ java 什麼情況下使用 並發隊列
並發隊列是一個基於鏈接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當我們添加一個元素的時候,它會添加到隊列的尾部,當我們獲取一個元素時,它會返回隊列頭部的元素。它採用了「wait-free」演算法來實現,該演算法在Michael
& Scott演算法上進行了一些修改。
入隊列
入隊列就是將入隊節點添加到隊列的尾部。為了方便理解入隊時隊列的變化,以及head節點和tair節點的變化,每添加一個節點我就做了一個隊列的快照圖。
publicEpoll(){
Node</e><e>h=head;
//p表示頭節點,需要出隊的節點
Node</e><e>p=h;
for(inthops=0;;hops++){
//獲取p節點的元素
Eitem=p.getItem();
//如果p節點的元素不為空,使用CAS設置p節點引用的元素為null,如果成功則返回p節點的元素。
if(item!=null&&p.casItem(item,null)){
if(hops>=HOPS){
//將p節點下一個節點設置成head節點
Node</e><e>q=p.getNext();
updateHead(h,(q!=null)?q:p);
}
returnitem;
}
//如果頭節點的元素為空或頭節點發生了變化,這說明頭節點已經被另外一個線程修改了。那麼獲取p節點的下一個節點
Node</e><e>next=succ(p);
//如果p的下一個節點也為空,說明這個隊列已經空了
if(next==null){
//更新頭節點。
updateHead(h,p);
break;
}
//如果下一個元素不為空,則將頭節點的下一個節點設置成頭節點
p=next;
}
returnnull;
}
首先獲取頭節點的元素,然後判斷頭節點元素是否為空,如果為空,表示另外一個線程已經進行了一次出隊操作將該節點的元素取走,如果不為空,則使用CAS的方式將頭節點的引用設置成null,如果CAS成功,則直接返回頭節點的元素,如果不成功,表示另外一個線程已經進行了一次出隊操作更新了head節點,導致元素發生了變化,需要重新獲取頭節點。
㈣ JAVA中哪個能同時滿足 先進先出(增減操作非常頻繁)和同步安全的容器集合並在性能上不至於太差
public class ConcurrentLinkedQueue<E>
extends AbstractQueue<E>
implements Queue<E>, Serializable
一個基於鏈接節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素。當多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。此隊列不允許使用 null 元素。
此實現採用了有效的「無等待 (wait-free)」演算法,該演算法基於 Maged M. Michael 和 Michael L. Scott 合著的 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 中描述的演算法。
需要小心的是,與大多數 collection 不同,size 方法不是 一個固定時間操作。由於這些隊列的非同步特性,確定當前元素的數量需要遍歷這些元素。
此類及其迭代器實現了 Collection 和 Iterator 介面的所有可選 方法。
內存一致性效果:當存在其他並發 collection 時,將對象放入 ConcurrentLinkedQueue 之前的線程中的操作 happen-before 隨後通過另一線程從 ConcurrentLinkedQueue 訪問或移除該元素的操作。
注意:ConcurrentLinkedQueue的size()是要遍歷一遍集合的!因此,若不能滿足你,可以基於 LinkedList(先進先出),自己加上同步,要性能控制住,需要盡可能小力度加同步 。