java线程安全的队列
㈠ 璇剧▼璁捐¢樼洰锛屽氱嚎绋嬬紪绋嬶细鍖婚櫌闂ㄨ瘖妯℃嫙锛屾兂鐢╦ava瀹炵幇锛屾眰澶х炴寚镣
鍏稿瀷镄勭敓浜ц呮秷璐硅呮ā鍨嬨
浜呜Вj5镄勫苟鍙戝簱锛岄偅涓骞跺彂搴扑腑链夐傚悎缁勪欢瀹炵幇銆
濡傛灉涓崭简瑙o纴杩欎箞𨱒ワ细
鍒涘缓涓涓阒熷垪锛屾ら槦鍒楄佹眰绾跨▼瀹夊叏锛屽傛灉阒熷垪涓虹┖鍒欐秷璐硅呴樆濉炪傚傛灉阒熷垪杈惧埌镆愪釜链澶у硷纴鍒欓樆濉炵敓浜ц呫
阒熷垪鐢锛屾櫘阃氱殑list鎴栧疄鐜板ソ镄勯槦鍒楀寘瑁呮垚绾跨▼瀹夊叏镄勚
鐢╯ynchronized钖屾ュ师鏂规硶鎴栦唬镰佸潡銆
鍐欎竴涓鎴杗涓绾跨▼锛屾ā𨰾熺梾浜猴纴鎺挜槦锷炵悊涓氩姟锛屽线涓婇溃镄勯槦鍒椾腑娣诲姞鏁版嵁銆
褰撹揪鍒伴槦鍒楃殑链澶у圭Н锛岄樆濉烇纴绛夊緟鐢熶骇钥呯嚎绋嫔彇鏁版嵁銆
阒诲烇细makerLock.wait();//铏氭嫙链轰细鍑鸿╃嚎绋嬫寕璧凤纴鍏跺疄灏辨槸镎崭綔绯荤粺锛屼缭瀛桦綋鍓岖嚎绋嫔湪cpu涓婄殑杩愯岀姸镐併傚啀鍑鸿╃嚎绋嬫e湪浣跨敤镄刢pu璧勬簮锛屽崰鐢ㄧ殑鍐呭瓨涓崭细閲婃斁銆
寰阒熷垪鎻掑叆鏁版嵁镄勬椂鍊欙纴锲犱负涓岖煡阆撴槸钖︽湁娑堣垂钥呭勪簬绛夊緟鐘舵侊纴阃氱煡娑堣垂钥咃细
customerLock.notifyAll();//铏氭嫙链鸿皟搴︽秷璐硅呯嚎绋嬭繍琛岋纴瀹为檯涓婃槸镎崭綔绯荤粺锛屾妸淇濆瓨镄勬秷璐硅呯嚎绋嬬姸镐侊纴浠庢柊锷犺浇鍒瘫pu涓鎺ョ潃杩愯屻傛帴镌杩愯岀嚎绋嬫槸浠绘剰镄勶纴鍙栧喅浜庝笉钖屾搷浣灭郴缁熺殑绾跨▼璋冨害绠楁硶銆
娑堣垂钥呯嚎绋嬭诲彇涓涓鏁版嵁钖庯纴瑕侀氱煡鐢熶骇钥咃纴鍙浠ョ户缁锛岄亾鐞嗗悓涓婏细
makerLock.notifyAll();
阒熷垪涓锛屾棤鏁版嵁鍙璇荤殑镞跺欙细
customerLock.wait();//铡熺悊钖屼笂锛
链钖庢敞镒忥纴鐢熶骇钥呰窡娑堣垂钥呬娇鐢ㄤ简涓や釜涓嶅悓镄勫硅薄阌併俵ock.wait()镄勪娇鐢ㄦ柟娉曟槸杩欐牱镄勶细
synchronized(lock){
......
while(condition==true){
lock.wait();
}
......
Objecto=queen.pop();
lock.notifyAll();
}
链钖庡惎锷╪涓绾跨▼璇婚槦鍒楋纴妯℃嫙锷炵悊涓氩姟镄勭獥鍙o绂n涓绾跨▼鍐欓槦鍒楋纴妯℃嫙䦅呬汉鎺挜槦銆
鏂扮嚎绋嫔簱涔熸湁璺熻佺嚎绋嫔簱瀵瑰簲镄勬柟娉曪纴鏂扮嚎绋嫔簱链夌嚎绋嫔畨鍏ㄧ殑楂樻晥阒熷垪銆傛病链変笂闱㈤夯鐑︼纴浣嗕笂闱㈠啓镄勬槸鐞呜В鏂扮嚎绋嬫暟鎹缁撴瀯涓庡疄鐜扮殑锘虹銆
java">packagecom.;
importjava.util.LinkedList;
importjava.util.List;
importjava.util.Random;
publicclassTestThread2{
//缂揿啿涓婇檺
privatelongbufsize;
//缂揿啿
privateList<String>buf;
publicTestThread2(){
bufsize=5;
buf=newLinkedList<String>();
}
//鐢熶骇钥呰皟鐢
publicvoidput(Strings){
//妯℃嫙鐢熶骇钥呰窡涓崭笂娑堣垂钥
/*
try{
Thread.sleep(100);
}catch(InterruptedExceptione){
}
*/
synchronized(this){
//瓒呰繃阒熷垪闄愬埗灏辩瓑寰
while(buf.size()==bufsize){
System.out.println("阒熷垪宸叉弧锛岀敓浜ц:"+Thread.currentThread().getId()+"寮濮嬬瓑寰呫");
try{
this.wait();
}catch(InterruptedExceptione){
}
}
buf.add(s);
//阃氱煡娑堣垂钥
this.notifyAll();
}
}
//娑堣垂钥呰皟鐢
synchronizedpublicStringtake(){
//妯℃嫙娑堣垂钥呰窡涓崭笂鐢熶骇钥
try{
Thread.sleep(100);
}catch(InterruptedExceptione){
}
Strings=null;
synchronized(this){
while(buf.size()==0){
System.out.println("阒熷垪涓虹┖锛屾秷璐硅:"+Thread.currentThread().getId()+"寮濮嬬瓑寰呫");
try{
this.wait();
}catch(InterruptedExceptione){
}
}
//鍙栧厛鏀惧叆镄勫厓绱狅纴骞剁Щ闄
s=buf.get(0);
buf.remove(0);
//阃氱煡鐢熶骇钥
this.notifyAll();
}
returns;
}
publicstaticvoidmain(String[]args){
//镊宸卞疄鐜扮殑锛屽畨鍏ㄩ槦鍒
finalTestThread2tt=newTestThread2();
//鐢熶骇钥
Threadp=newThread(newRunnable(){
@Override
publicvoidrun(){
while(!Thread.currentThread().isInterrupted()){
Randomr=newRandom();
tt.put(String.valueOf(r.nextInt(10)));
}
}
});
//娑堣垂钥
Threadc1=newThread(newRunnable(){
@Override
publicvoidrun(){
while(!Thread.currentThread().isInterrupted()){
System.out.println("绾跨▼锛"+Thread.currentThread().getId()+"銮峰彇鍒版暟鎹"+tt.take());
}
}
});
Threadc2=newThread(newRunnable(){
@Override
publicvoidrun(){
while(!Thread.currentThread().isInterrupted()){
System.out.println("绾跨▼锛"+Thread.currentThread().getId()+"銮峰彇鍒版暟鎹"+tt.take());
}
}
});
p.start();
c1.start();
c2.start();
try{
p.join();
c1.join();
c2.join();
}catch(InterruptedExceptione){
}
}
}
㈡ java 线程池 工作队列是如何工作的
使用线程池的好处
1、降低资源消耗
可以重复利用已创建的线程降低线程创建和销毁造成的消耗。
2、提高响应速度
当任务到达时,任务可以不需要等到线程创建就能立即执行。
3、提高线程的可管理性
线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控
线程池的工作原理
首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的
1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步
3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务
线程池饱和策略
这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:
AbortPolicy
为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
DiscardPolicy
直接抛弃,任务不执行,空方法
DiscardOldestPolicy
从队列里面抛弃head的一个任务,并再次execute 此task。
CallerRunsPolicy
在调用execute的线程里面执行此command,会阻塞入口
用户自定义拒绝策略(最常用)
实现RejectedExecutionHandler,并自己定义策略模式
下我们以ThreadPoolExecutor为例展示下线程池的工作流程图
3.jpg
关键方法源码分析
我们看看核心方法添加到线程池方法execute的源码如下:
// //Executes the given task sometime in the future. The task //may execute in a new thread or in an existing pooled thread. // // If the task cannot be submitted for execution, either because this // executor has been shutdown or because its capacity has been reached, // the task is handled by the current {@code RejectedExecutionHandler}. // // @param command the task to execute // @throws RejectedExecutionException at discretion of // {@code RejectedExecutionHandler}, if the task // cannot be accepted for execution // @throws NullPointerException if {@code command} is null // public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // // Proceed in 3 steps: // // 1. If fewer than corePoolSize threads are running, try to // start a new thread with the given command as its first // task. The call to addWorker atomically checks runState and // workerCount, and so prevents false alarms that would add // threads when it shouldn't, by returning false. // 翻译如下: // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程, // 如果能完成新线程创建exexute方法结束,成功提交任务 // 2. If a task can be successfully queued, then we still need // to double-check whether we should have added a thread // (because existing ones died since last checking) or that // the pool shut down since entry into this method. So we // recheck state and if necessary roll back the enqueuing if // stopped, or start a new thread if there are none. // 翻译如下: // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态 // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要 // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程; // 3. If we cannot queue task, then we try to add a new // thread. If it fails, we know we are shut down or saturated // and so reject the task. // 翻译如下: // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池 // 已经达到饱和状态,所以reject这个他任务 // int c = ctl.get(); // 工作线程数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize if (addWorker(command, true)) return; c = ctl.get(); } // 如果工作线程数大于等于核心线程数 // 线程的的状态未RUNNING并且队列notfull if (isRunning(c) && workQueue.offer(command)) { // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 移除成功,拒绝该非运行的任务 reject(command); else if (workerCountOf(recheck) == 0) // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。 // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务 addWorker(null, false); } // 如果队列满了或者是非运行的任务都拒绝执行 else if (!addWorker(command, false)) reject(command); }
㈢ java 什么情况下使用 并发队列
并发队列是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael
& Scott算法上进行了一些修改。
入队列
入队列就是将入队节点添加到队列的尾部。为了方便理解入队时队列的变化,以及head节点和tair节点的变化,每添加一个节点我就做了一个队列的快照图。
publicEpoll(){
Node</e><e>h=head;
//p表示头节点,需要出队的节点
Node</e><e>p=h;
for(inthops=0;;hops++){
//获取p节点的元素
Eitem=p.getItem();
//如果p节点的元素不为空,使用CAS设置p节点引用的元素为null,如果成功则返回p节点的元素。
if(item!=null&&p.casItem(item,null)){
if(hops>=HOPS){
//将p节点下一个节点设置成head节点
Node</e><e>q=p.getNext();
updateHead(h,(q!=null)?q:p);
}
returnitem;
}
//如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。那么获取p节点的下一个节点
Node</e><e>next=succ(p);
//如果p的下一个节点也为空,说明这个队列已经空了
if(next==null){
//更新头节点。
updateHead(h,p);
break;
}
//如果下一个元素不为空,则将头节点的下一个节点设置成头节点
p=next;
}
returnnull;
}
首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。
㈣ JAVA中哪个能同时满足 先进先出(增减操作非常频繁)和同步安全的容器集合并在性能上不至于太差
public class ConcurrentLinkedQueue<E>
extends AbstractQueue<E>
implements Queue<E>, Serializable
一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。
此实现采用了有效的“无等待 (wait-free)”算法,该算法基于 Maged M. Michael 和 Michael L. Scott 合着的 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 中描述的算法。
需要小心的是,与大多数 collection 不同,size 方法不是 一个固定时间操作。由于这些队列的异步特性,确定当前元素的数量需要遍历这些元素。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法。
内存一致性效果:当存在其他并发 collection 时,将对象放入 ConcurrentLinkedQueue 之前的线程中的操作 happen-before 随后通过另一线程从 ConcurrentLinkedQueue 访问或移除该元素的操作。
注意:ConcurrentLinkedQueue的size()是要遍历一遍集合的!因此,若不能满足你,可以基于 LinkedList(先进先出),自己加上同步,要性能控制住,需要尽可能小力度加同步 。