生产者与消费者java
⑴ java实现生产者和消费者问题的几种方式
生产者消费者问题是多线程的一个经典问题,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。
解决生产者/消费者问题的方法可分为两类:
采用某种机制保护生产者和消费者之间的同步;
在生产者和消费者之间建立一个管道。
第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。
在Java中有四种方法支持同步,其中前三个是同步方法,一个是管道方法。
wait()
/
notify()方法
await()
/
signal()方法
BlockingQueue阻塞队列方法
PipedInputStream
/
PipedOutputStream
通过
wait()
/
notify()方法实现:
wait()
/
nofity()方法是基类Object的两个方法:
wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。
notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
通过await()
/
signal()方法实现:
await()和signal()的功能基本上和wait()
/
nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。
通过BlockingQueue方法实现:
它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()
/
signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法:
put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
⑵ JAVA模拟生产者与消费者实例
使用的生产者和消费者模型具有如下特点:
(1)本实验的多个缓冲区不是环形循环的,也不要求按顺序访问。生产者可以把产品放到目前某一个空缓冲区中。
(2)消费者只消费指定生产者的产品。
(3)在测试用例文件中指定了所有的生产和消费的需求,只有当共享缓冲区的数据满足了所有关于它的消费需求后,此共享缓冲区才可以作为空闲空间允许新的生产者使用。
(4)本实验在为生产者分配缓冲区时各生产者间必须互斥,此后各个生产者的具体生产活动可以并发。而消费者之间只有在对同一产品进行消费时才需要互斥,同时它们在消费过程结束时需要判断该消费对象是否已经消费完毕并清除该产品。
Windows
用来实现同步和互斥的实体。在Windows
中,常见的同步对象有:信号量(Semaphore)、
互斥量(Mutex)、临界段(CriticalSection)和事件(Event)等。本程序中用到了前三个。使用这些对象都分
为三个步骤,一是创建或者初始化:接着请求该同步对象,随即进入临界区,这一步对应于互斥量的
上锁;最后释放该同步对象,这对应于互斥量的解锁。这些同步对象在一个线程中创建,在其他线程
中都可以使用,从而实现同步互斥。当然,在进程间使用这些同步对象实现同步的方法是类似的。
1.用锁操作原语实现互斥
为解决进程互斥进人临界区的问题,可为每类临界区设置一把锁,该锁有打开和关闭两种状态,进程执行临界区程序的操作按下列步骤进行:
①关锁。先检查锁的状态,如为关闭状态,则等待其打开;如已打开了,则将其关闭,继续执行步骤②的操作。
②执行临界区程序。
③开锁。将锁打开,退出临界区。
2.信号量及WAIT,SIGNAL操作原语
信号量的初值可以由系统根据资源情况和使用需要来确定。在初始条件下信号量的指针项可以置为0,表示队列为空。信号量在使用过程中它的值是可变的,但只能由WAIT,SIGNAL操作来改变。设信号量为S,对S的WAIT操作记为WAIT(S),对它的SIGNAL操作记为SIGNAL(S)。
WAIT(S):顺序执行以下两个动作:
①信号量的值减1,即S=S-1;
②如果S≥0,则该进程继续执行;
如果
S(0,则把该进程的状态置为阻塞态,把相应的WAITCB连人该信号量队列的末尾,并放弃处理机,进行等待(直至其它进程在S上执行SIGNAL操作,把它释放出来为止)。
SIGNAL(S):顺序执行以下两个动作
①S值加
1,即
S=S+1;
②如果S)0,则该进程继续运行;
如果S(0则释放信号量队列上的第一个PCB(既信号量指针项所指向的PCB)所对应的进程(把阻塞态改为就绪态),执行SIGNAL操作的进程继续运行。
在具体实现时注意,WAIT,SIGNAL操作都应作为一个整体实施,不允许分割或相互穿插执行。也就是说,WAIT,SIGNAL操作各自都好像对应一条指令,需要不间断地做下去,否则会造成混乱。
从物理概念上讲,信号量S)时,S值表示可用资源的数量。执行一次WAIT操作意味着请求分配一个单位资源,因此S值减1;当S<0时,表示已无可用资源,请求者必须等待别的进程释放了该类资源,它才能运行下去。所以它要排队。而执行一次SIGNAL操作意味着释放一个单位资源,因此S值加1;若S(0时,表示有某些进程正在等待该资源,因而要把队列头上的进程唤醒,释放资源的进程总是可以运行下去的。
---------------
/**
*
生产者
*
*/
public
class
Procer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Procer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
empty.p();
mutex.p();
System.out.println(name+"
inserts
a
new
proct
into
"+buf.nextEmptyIndex);
buf.nextEmptyIndex
=
(buf.nextEmptyIndex+1)%buf.size;
mutex.v();
full.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
---------------
/**
*
消费者
*
*/
public
class
Customer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Customer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
full.p();
mutex.p();
System.out.println(name+"
gets
a
proct
from
"+buf.nextFullIndex);
buf.nextFullIndex
=
(buf.nextFullIndex+1)%buf.size;
mutex.v();
empty.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
-------------------------
/**
*
缓冲区
*
*/
public
class
Buffer{
public
Buffer(int
size,int
nextEmpty,int
nextFull){
this.nextEmptyIndex
=
nextEmpty;
this.nextFullIndex
=
nextFull;
this.size
=
size;
}
public
int
size;
public
int
nextEmptyIndex;
public
int
nextFullIndex;
}
-----------------
/**
*
此类用来模拟信号量
*
*/
public
class
Semaphore{
private
int
semValue;
public
Semaphore(int
semValue){
this.semValue
=
semValue;
}
public
synchronized
void
p(){
semValue--;
if(semValue<0){
try
{
this.wait();
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
public
synchronized
void
v(){
semValue++;
if(semValue<=0){
this.notify();
}
}
}
------------------------
public
class
Test
extends
Thread
{
public
static
void
main(String[]
args)
{
Buffer
bf=new
Buffer(10,0,0);
Semaphore
mutex=new
Semaphore(1);
Semaphore
full=new
Semaphore(0);
Semaphore
empty=new
Semaphore(10);
//new
Thread(new
Procer("p001",mutex,full,empty,bf)).start();
Procer
p=new
Procer("p001",mutex,full,empty,bf);
new
Thread(new
Procer("p002",mutex,full,empty,bf)).start();
new
Thread(new
Procer("p003",mutex,full,empty,bf)).start();
new
Thread(new
Procer("p004",mutex,full,empty,bf)).start();
new
Thread(new
Procer("p005",mutex,full,empty,bf)).start();
try{
sleep(3000);
}
catch(Exception
ex)
{
ex.printStackTrace();
}
new
Thread(new
Customer("c001",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c002",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c003",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c004",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c005",mutex,full,empty,bf)).start();
}
}
--------------------------------------------
⑶ java中生产者和消费者问题
好不好,要看情况,单这样,提不提都问题不大。
类的架构,由设计者决定。
this 是指当前类的当前实例
⑷ JAVA多生产者多消费者问题。希望用wait()和notify().谢谢!
publicclassProceConsumerDemo{
publicstaticvoidmain(String[]args){
//1.创建资源
Resourceresource=newResource();
//2.创建两个任务
Procerprocer=newProcer(resource);
Consumerconsumer=newConsumer(resource);
//3.创建线程
/*
*多生产多消费产生的问题:重复生产、重复消费
*/
Threadthread0=newThread(procer);
Threadthread1=newThread(procer);
thread0.setName("生产者(NO0)");
thread1.setName("生产者(NO1)");
Threadthread2=newThread(consumer);
Threadthread3=newThread(consumer);
thread2.setName("消费者(NO2)");
thread3.setName("消费者(NO3)");
thread0.start();
thread1.start();
thread2.start();
thread3.start();
}
}
classResource{
privateStringname;
privateintcount=1;
//定义标记
privatebooleanflag;
//提供给商品赋值的方法
publicsynchronizedvoidsetName(Stringname){//thread0,thread1在这里运行
while(flag)//判断标记为true,执行wait等待,为false则生产
/*
*这里使用while,而不使用if的理由如下:
*
*thread0有可能第二次也抢到锁的执行权,判断为真,则有面包不生产,所以接下来执行等待,此时thread0在线程池中。
*接下来活的线程有3个(除了thread0),这三个线程都有可能获取到执行权.
*假设thread1获得了执行权,判断为真,则有面包不生产,执行等待。此时thread1又进入到了线程池中。
*接下来有两个活的线程thread2和thread3。假设thread2又抢到了执行权,所以程序转到了消费get处……
*/
try{
this.wait();//这里wait语句必须包含在try/catch块中,抛出异常。
}catch(InterruptedExceptione){
e.printStackTrace();
}
this.name=name+count;//第一个面包
count++;//2
System.out.println(Thread.currentThread().getName()+this.name);//thread0线程生产了面包1
//生产完毕,将标记改成true.
flag=true;//thread0第一次生产完面包以后,将标记改为真,表示有面包了
//唤醒消费者(这里使用notifyAll而不使用notify的原因在下面)
this.notifyAll();//第一次在这里是空唤醒,没有意义
}
/*
*通过同步,解决了没生产就消费的问题
*生产完以后,生产者释放了this锁,此时,生产者和消费者同时去抢锁,又是生产者抢到了锁,所以就出现了一直生产的情况。
*与“生产一个就消费一个的需求不符合”等待唤醒机制wait();该方法可以使线程处于冻结状态,并将线程临时存储到线程池
*notify();唤醒指定线程池中的任意一个线程。notifyAll();唤醒指定线程池中的所有线程
*这些方法必须使用在同步函数中,因为他们用来操作同步锁上的线程上的状态的。
*在使用这些方法时候,必须标识他们所属于的锁,标识方式就是锁对象.wait();锁对象.notify();锁对象.notifyAll();
*相同锁的notify()可以获取相同锁的wait();
*/
publicsynchronizedvoidgetName(){//thread2,thread3在这里运行
while(!flag)
/*
*……接着上面的程序执行分析thread2拿到锁获取执行权之后,判断!flag为假,则不等待,直接消费面包1,输出一次.
*消费完成之后将flag改为假接下来又唤醒了thread0或者thread1生产者中的一个
*假设又唤醒了thread0线程,现在活的线程有thread0,thread2,thread3三个线程
*假设接下来thread2又抢到了执行权,判断!flag为真,没面包了,停止消费,所以thread2执行等待.
*此时活着的线程有thread0和thread3。
*假设thread3得到了执行权,拿到锁之后进来执行等待,此时活着的线程只有thread0.
*所以thread0只能抢到执行权之后,生产面包2,将标记改为true告诉消费者有面包可以消费了。
*接下来执行notify唤醒,此时唤醒休眠中的3个线程中的任何一个都有可能。
*如果唤醒了消费者thread2或者thread3中的任何一个,程序都是正常。如果此时唤醒thread1则不正常。
*如果唤醒了thread1,此时活着的线程有thread0和thread1两个线程。
*假设thread0又获得了执行权,判读为真有面包,则又一次执行等待。
*接下来只有thread1线程有执行权(此时没有判断标记直接生产了,出错了),所以又生产了面包3。在这个过程中,面包2没有被消费。
*这就是连续生产和消费容易出现的问题。
*
*原因:被唤醒的线程没有判断标记就开始执行了,导致了重复的生产和消费发生。
*
*解决:被唤醒的线程必须判断标记,使用while循环标记,而不使用if判断的理由。
*
*但是接下来会出现死锁,原因在于:
*上面的程序中thread0在执行notify的时候唤醒了thread1,而此时thread2和thread3两个消费者线程都处于等待状态
*thread1在执行while判断语句之后判断为真,则执行等待,此时所有的线程都处于冻结等待状态了。
*
*原因:本方线程在执行唤醒的时候又一次唤醒了本方线程,而本方线程循环判断标记又继续等待,而导致所有的线程都等待。
*
*解决:本方线程唤醒对方线程,可以使用notifyAll()方法
* 唤醒之后,既有本方,又有对方,但是本方线程判断标记之后,会继续等待,这样就有对方线程在执行。
*/
try{
this.wait();
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+this.name);
//将标记改为false
flag=false;
//唤醒生产者
this.notify();
}
}
//生产者
classProcerimplementsRunnable{
privateResourceresource;
publicProcer(Resourceresource){
this.resource=resource;
}
publicvoidrun(){
while(true){
resource.setName("面包");
}
}
}
//消费者
{
privateResourceresource;
publicConsumer(Resourceresource){
this.resource=resource;
}
@Override
publicvoidrun(){
while(true){
resource.getName();
}
}
}
⑸ java生产者消费者问题
这是因为你的日志并不同步所导致的。生产确实在消费之前,但是生产日志却在消费之后皮派纯。你生产的push方法和消费的pop方法是加了synchronized同步的,但是打印却不在同步的块中。所以很有可能是在生产线程执行完push之后cpu就被消费者抢占了,燃咐直羡州到打印出了消费信息之后才还给生产者打印生产信息。
⑹ 由生产者/消费者问题看JAVA多线程
生产者消费者问题是研究多线程程序时绕不开的问题 它的描述是有一块生产者和消费者共享的有界缓冲区 生产者往缓冲区放入产品 消费者从缓冲区取走产品 这个过程可以无休止的执行 不能因缓冲区满生产者放不进产品而终止 也不能因缓冲区空消费者无产品可取而终止
解决生产者消费者问题的方法有两种 一种是采用某种机制保持生产者和消费者之间的同步 一种是在生产者和消费者之间建立一个管道 前一种有较高的效率并且可控制性较好 比较常用 后一种由于管道缓冲区不易控制及被传输数据对象不易封装等原因 比较少用
同步问题的核心在于 CPU是按时间片轮询的方式执行程序 我们无法知道某一个线程是否被执行 是否被抢占 是否结束等 因此生产者完全可能当缓冲区已满的时候还在放入产品 消费者也完全可能当缓冲区为空时还在取出产品
现在同步问题的解决方法一般是采用信号或者加锁机制 即生产者线程当缓冲区已满时放弃自己的执行权 进入等待状态 并通知消费者线程执行 消费者线程当缓冲区已空时放弃自己的执行权 进入等待状态 并通知生产者线程执行 这样一来就保持了线程的同步 并避免了线程间互相等待而进入死锁状态
JAVA语言提供了独立于平台的线程机制 保持了 write once run anywhere 的特色 同时也提供了对同步机制的良好支持
在JAVA中 一共有四种方法支持同步 其中三个是同步方法 一个是管道方法
方法wait()/notify()
方法await()/signal()
阻塞队列方法BlockingQueue
管道方法PipedInputStream/PipedOutputStream
下面我们看各个方法的实现
方法wait()/notify()
wait()和notify()是根类Object的两个方法 也就意味着所有的JAVA类都会具有这个两个方法 为什么会被这样设计呢?我们可以认为所有的对象默认都具有一个锁 虽然我们看不到 也没有办法直接操作 但它是存在的
wait()方法表示 当缓冲区已满或空时 生产者或消费者线程停止自己的执行 放弃锁 使自己处于等待状态 让另一个线程开始执行
notify()方法表示 当生产者或消费者对缓冲区放入或取出一个产品时 向另一个线程发出可执行通知 同时放弃锁 使自己处于等待状态
下面是一个例子代码
import java util LinkedList;
public class Sycn {
private LinkedList<Object> myList =new LinkedList<Object>();
private int MAX = ;
public Sycn (){
}
public void start(){
new Procer() start();
new Consumer() start();
}
public static void main(String[] args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Procer extends Thread{
public void run(){
while(true){
synchronized(myList){
try{
while(myList size() == MAX){
System out println( warning: it s full! );
myList wait();
}
Object o = new Object();
if(myList add(o)){
System out println( Procer: + o);
myList notify();
}
}catch(InterruptedException ie){
System out println( procer is interrupted! );
}
}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
synchronized(myList){
try{
while(myList size() == ){
System out println( warning: it s empty! );
myList wait();
}
Object o = myList removeLast();
System out println( Consumer: + o);
myList notify();
}catch(InterruptedException ie){
System out println( consumer is interrupted! );
}
}
}
}
}
}
方法await()/signal()
在JDK 以后 JAVA提供了新的更加健壮的线程处理机制 包括了同步 锁定 线程池等等 它们可以实现更小粒度上的控制 await()和signal()就是其中用来做同步的两种方法 它们的功能基本上和wait()/notify()相同 完全可以取代它们 但是它们和新引入的锁定机制Lock直接挂钩 具有更大的灵活性
下面是一个例子代码
import java util LinkedList;
import ncurrent locks *;
public class Sycn {
private LinkedList<Object> myList = new LinkedList<Object>();
private int MAX = ;
private final Lock lock = new ReentrantLock();
private final Condition full = lock newCondition();
private final Condition empty = lock newCondition();
public Sycn (){
}
public void start(){
new Procer() start();
new Consumer() start();
}
public static void main(String[] args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Procer extends Thread{
public void run(){
while(true){
lock lock();
try{
while(myList size() == MAX){
System out println( warning: it s full! );
full await();
}
Object o = new Object();
if(myList add(o)){
System out println( Procer: + o);
empty signal();
}
}catch(InterruptedException ie){
System out println( procer is interrupted! );
}finally{
lock unlock();
}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
lock lock();
try{
while(myList size() == ){
System out println( warning: it s empty! );
empty await();
}
Object o = myList removeLast();
System out println( Consumer: + o);
full signal();
}catch(InterruptedException ie){
System out println( consumer is interrupted! );
}finally{
lock unlock();
}
}
}
}
}
阻塞队列方法BlockingQueue
BlockingQueue也是JDK 的一部分 它是一个已经在内部实现了同步的队列 实现方式采用的是我们的第 种await()/signal()方法 它可以在生成对象时指定容量大小
它用于阻塞操作的是put()和take()方法
put()方法类似于我们上面的生产者线程 容量最大时 自动阻塞
take()方法类似于我们上面的消费者线程 容量为 时 自动阻塞
下面是一个例子代码
import ncurrent *;
public class Sycn {
private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>( );
private int MAX = ;
public Sycn (){
}
public void start(){
new Procer() start();
new Consumer() start();
}
public static void main(String[] args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Procer extends Thread{
public void run(){
while(true){
//synchronized(this){
try{
if(queue size() == MAX)
System out println( warning: it s full! );
Object o = new Object();
queue put(o);
System out println( Procer: + o);
}catch(InterruptedException e){
System out println( procer is interrupted! );
}
//}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
//synchronized(this){
try{
if(queue size() == )
System out println( warning: it s empty! );
Object o = queue take();
System out println( Consumer: + o);
}catch(InterruptedException e){
System out println( procer is interrupted! );
}
//}
}
}
}
}
你发现这个例子中的问题了吗?
如果没有 我建议你运行一下这段代码 仔细观察它的输出 是不是有下面这个样子的?为什么会这样呢?
…
warning: it s full!
Procer: java lang object@ e a
…
你可能会说这是因为put()和System out println()之间没有同步造成的 我也这样认为 我也这样认为 但是你把run()中的synchronized前面的注释去掉 重新编译运行 有改观吗?没有 为什么?
这是因为 当缓冲区已满 生产者在put()操作时 put()内部调用了await()方法 放弃了线程的执行 然后消费者线程执行 调用take()方法 take()内部调用了signal()方法 通知生产者线程可以执行 致使在消费者的println()还没运行的情况下生产者的println()先被执行 所以有了上面的输出 run()中的synchronized其实并没有起什么作用
对于BlockingQueue大家可以放心使用 这可不是它的问题 只是在它和别的对象之间的同步有问题
对于这种多重嵌套同步的问题 以后再谈吧 欢迎大家讨论啊!
管道方法PipedInputStream/PipedOutputStream
这个类位于java io包中 是解决同步问题的最简单的办法 一个线程将数据写入管道 另一个线程从管道读取数据 这样便构成了一种生产者/消费者的缓冲区编程模式
下面是一个例子代码 在这个代码我没有使用Object对象 而是简单的读写字节值 这是因为PipedInputStream/PipedOutputStream不允许传输对象 这是JAVA本身的一个bug 具体的大家可以看sun的解释 _bug do?bug_id=
import java io *;
public class Sycn {
private PipedOutputStream pos;
private PipedInputStream pis;
//private ObjectOutputStream oos;
//private ObjectInputStream ois;
public Sycn (){
try{
pos = new PipedOutputStream();
pis = new PipedInputStream(pos);
//oos = new ObjectOutputStream(pos);
//ois = new ObjectInputStream(pis);
}catch(IOException e){
System out println(e);
}
}
public void start(){
new Procer() start();
new Consumer() start();
}
public static void main(String[] args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Procer extends Thread{
public void run() {
try{
while(true){
int b = (int) (Math random() * );
System out println( Procer: a byte the value is + b);
pos write(b);
pos flush();
//Object o = new MyObject();
//oos writeObject(o);
//oos flush();
//System out println( Procer: + o);
}
}catch(Exception e){
//System out println(e);
e printStackTrace();
}finally{
try{
pos close();
pis close();
//oos close();
//ois close();
}catch(IOException e){
System out println(e);
}
}
}
}
class Consumer extends Thread{
public void run(){
try{
while(true){
int b = pis read();
System out println( Consumer: a byte the value is + String valueOf(b));
//Object o = ois readObject();
//if(o != null)
//System out println( Consumer: + o);
}
}catch(Exception e){
//System out println(e);
e printStackTrace();
}finally{
try{
pos close();
pis close();
//oos close();
//ois close();
}catch(IOException e){
System out println(e);
}
}
}
}
//class MyObject implements Serializable {
//}
lishixin/Article/program/Java/gj/201311/27617