java线程生产者消费者
⑴ java 生产者/消费者问题
/**
* 公共资源类
*/
public class PublicResource {
private int number = 0;
/**
* 增加公共资源
*/
public synchronized void increace() {
while (number != 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number++;
System.out.println(number);
notify();
}
/**
* 减少公共资源
*/
public synchronized void decreace() {
while (number == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number--;
System.out.println(number);
notify();
}
}
/**
* 生产者线程,负责生产公共资源
*/
public class ProcerThread implements Runnable {
private PublicResource resource;
public ProcerThread(PublicResource resource) {
this.resource = resource;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.increace();
}
}
}
/**
* 消费者线程,负责消费公共资源
*/
public class ConsumerThread implements Runnable {
private PublicResource resource;
public ConsumerThread(PublicResource resource) {
this.resource = resource;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.decreace();
}
}
}
public class ProcerConsumerTest {
public static void main(String[] args) {
PublicResource resource = new PublicResource();
new Thread(new ProcerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
new Thread(new ProcerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
new Thread(new ProcerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
}
}
⑵ 由生产者/消费者问题看JAVA多线程
生产者消费者问题是研究多线程程序时绕不开的问题 它的描述是有一块生产者和消费者共享的有界缓冲区 生产者往缓冲区放入产品 消费者从缓冲区取走产品 这个过程可以无休止的执行 不能因缓冲区满生产者放不进产品而终止 也不能因缓冲区空消费者无产品可取而终止
解决生产者消费者问题的方法有两种 一种是采用某种机制保持生产者和消费者之间的同步 一种是在生产者和消费者之间建立一个管道 前一种有较高的效率并且可控制性较好 比较常用 后一种由于管道缓冲区不易控制及被传输数据对象不易封装等原因 比较少用
同步问题的核心在于 CPU是按时间片轮询的方式执行程序 我们无法知道某一个线程是否被执行 是否被抢占 是否结束等 因此生产者完全可能当缓冲区已满的时候还在放入产品 消费者也完全可能当缓冲区为空时还在取出产品
现在同步问题的解决方法一般是采用信号或者加锁机制 即生产者线程当缓冲区已满时放弃自己的执行权 进入等待状态 并通知消费者线程执行 消费者线程当缓冲区已空时放弃自己的执行权 进入等待状态 并通知生产者线程执行 这样一来就保持了线程的同步 并避免了线程间互相等待而进入死锁状态
JAVA语言提供了独立于平台的线程机制 保持了 write once run anywhere 的特色 同时也提供了对同步机制的良好支持
在JAVA中 一共有四种方法支持同步 其中三个是同步方法 一个是管道方法
方法wait()/notify()
方法await()/signal()
阻塞队列方法BlockingQueue
管道方法PipedInputStream/PipedOutputStream
下面我们看各个方法的实现
方法wait()/notify()
wait()和notify()是根类Object的两个方法 也就意味着所有的JAVA类都会具有这个两个方法 为什么会被这样设计呢?我们可以认为所有的对象默认都具有一个锁 虽然我们看不到 也没有办法直接操作 但它是存在的
wait()方法表示 当缓冲区已满或空时 生产者或消费者线程停止自己的执行 放弃锁 使自己处于等待状态 让另一个线程开始执行
notify()方法表示 当生产者或消费者对缓冲区放入或取出一个产品时 向另一个线程发出可执行通知 同时放弃锁 使自己处于等待状态
下面是一个例子代码
import java util LinkedList;
public class Sycn {
private LinkedList<Object> myList =new LinkedList<Object>();
private int MAX = ;
public Sycn (){
}
public void start(){
new Procer() start();
new Consumer() start();
}
public static void main(String[] args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Procer extends Thread{
public void run(){
while(true){
synchronized(myList){
try{
while(myList size() == MAX){
System out println( warning: it s full! );
myList wait();
}
Object o = new Object();
if(myList add(o)){
System out println( Procer: + o);
myList notify();
}
}catch(InterruptedException ie){
System out println( procer is interrupted! );
}
}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
synchronized(myList){
try{
while(myList size() == ){
System out println( warning: it s empty! );
myList wait();
}
Object o = myList removeLast();
System out println( Consumer: + o);
myList notify();
}catch(InterruptedException ie){
System out println( consumer is interrupted! );
}
}
}
}
}
}
方法await()/signal()
在JDK 以后 JAVA提供了新的更加健壮的线程处理机制 包括了同步 锁定 线程池等等 它们可以实现更小粒度上的控制 await()和signal()就是其中用来做同步的两种方法 它们的功能基本上和wait()/notify()相同 完全可以取代它们 但是它们和新引入的锁定机制Lock直接挂钩 具有更大的灵活性
下面是一个例子代码
import java util LinkedList;
import ncurrent locks *;
public class Sycn {
private LinkedList<Object> myList = new LinkedList<Object>();
private int MAX = ;
private final Lock lock = new ReentrantLock();
private final Condition full = lock newCondition();
private final Condition empty = lock newCondition();
public Sycn (){
}
public void start(){
new Procer() start();
new Consumer() start();
}
public static void main(String[] args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Procer extends Thread{
public void run(){
while(true){
lock lock();
try{
while(myList size() == MAX){
System out println( warning: it s full! );
full await();
}
Object o = new Object();
if(myList add(o)){
System out println( Procer: + o);
empty signal();
}
}catch(InterruptedException ie){
System out println( procer is interrupted! );
}finally{
lock unlock();
}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
lock lock();
try{
while(myList size() == ){
System out println( warning: it s empty! );
empty await();
}
Object o = myList removeLast();
System out println( Consumer: + o);
full signal();
}catch(InterruptedException ie){
System out println( consumer is interrupted! );
}finally{
lock unlock();
}
}
}
}
}
阻塞队列方法BlockingQueue
BlockingQueue也是JDK 的一部分 它是一个已经在内部实现了同步的队列 实现方式采用的是我们的第 种await()/signal()方法 它可以在生成对象时指定容量大小
它用于阻塞操作的是put()和take()方法
put()方法类似于我们上面的生产者线程 容量最大时 自动阻塞
take()方法类似于我们上面的消费者线程 容量为 时 自动阻塞
下面是一个例子代码
import ncurrent *;
public class Sycn {
private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>( );
private int MAX = ;
public Sycn (){
}
public void start(){
new Procer() start();
new Consumer() start();
}
public static void main(String[] args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Procer extends Thread{
public void run(){
while(true){
//synchronized(this){
try{
if(queue size() == MAX)
System out println( warning: it s full! );
Object o = new Object();
queue put(o);
System out println( Procer: + o);
}catch(InterruptedException e){
System out println( procer is interrupted! );
}
//}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
//synchronized(this){
try{
if(queue size() == )
System out println( warning: it s empty! );
Object o = queue take();
System out println( Consumer: + o);
}catch(InterruptedException e){
System out println( procer is interrupted! );
}
//}
}
}
}
}
你发现这个例子中的问题了吗?
如果没有 我建议你运行一下这段代码 仔细观察它的输出 是不是有下面这个样子的?为什么会这样呢?
…
warning: it s full!
Procer: java lang object@ e a
…
你可能会说这是因为put()和System out println()之间没有同步造成的 我也这样认为 我也这样认为 但是你把run()中的synchronized前面的注释去掉 重新编译运行 有改观吗?没有 为什么?
这是因为 当缓冲区已满 生产者在put()操作时 put()内部调用了await()方法 放弃了线程的执行 然后消费者线程执行 调用take()方法 take()内部调用了signal()方法 通知生产者线程可以执行 致使在消费者的println()还没运行的情况下生产者的println()先被执行 所以有了上面的输出 run()中的synchronized其实并没有起什么作用
对于BlockingQueue大家可以放心使用 这可不是它的问题 只是在它和别的对象之间的同步有问题
对于这种多重嵌套同步的问题 以后再谈吧 欢迎大家讨论啊!
管道方法PipedInputStream/PipedOutputStream
这个类位于java io包中 是解决同步问题的最简单的办法 一个线程将数据写入管道 另一个线程从管道读取数据 这样便构成了一种生产者/消费者的缓冲区编程模式
下面是一个例子代码 在这个代码我没有使用Object对象 而是简单的读写字节值 这是因为PipedInputStream/PipedOutputStream不允许传输对象 这是JAVA本身的一个bug 具体的大家可以看sun的解释 _bug do?bug_id=
import java io *;
public class Sycn {
private PipedOutputStream pos;
private PipedInputStream pis;
//private ObjectOutputStream oos;
//private ObjectInputStream ois;
public Sycn (){
try{
pos = new PipedOutputStream();
pis = new PipedInputStream(pos);
//oos = new ObjectOutputStream(pos);
//ois = new ObjectInputStream(pis);
}catch(IOException e){
System out println(e);
}
}
public void start(){
new Procer() start();
new Consumer() start();
}
public static void main(String[] args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Procer extends Thread{
public void run() {
try{
while(true){
int b = (int) (Math random() * );
System out println( Procer: a byte the value is + b);
pos write(b);
pos flush();
//Object o = new MyObject();
//oos writeObject(o);
//oos flush();
//System out println( Procer: + o);
}
}catch(Exception e){
//System out println(e);
e printStackTrace();
}finally{
try{
pos close();
pis close();
//oos close();
//ois close();
}catch(IOException e){
System out println(e);
}
}
}
}
class Consumer extends Thread{
public void run(){
try{
while(true){
int b = pis read();
System out println( Consumer: a byte the value is + String valueOf(b));
//Object o = ois readObject();
//if(o != null)
//System out println( Consumer: + o);
}
}catch(Exception e){
//System out println(e);
e printStackTrace();
}finally{
try{
pos close();
pis close();
//oos close();
//ois close();
}catch(IOException e){
System out println(e);
}
}
}
}
//class MyObject implements Serializable {
//}
lishixin/Article/program/Java/gj/201311/27617
⑶ JAVA模拟生产者与消费者实例
使用的生产者和消费者模型具有如下特点:
(1)本实验的多个缓冲区不是环形循环的,也不要求按顺序访问。生产者可以把产品放到目前某一个空缓冲区中。
(2)消费者只消费指定生产者的产品。
(3)在测试用例文件中指定了所有的生产和消费的需求,只有当共享缓冲区的数据满足了所有关于它的消费需求后,此共享缓冲区才可以作为空闲空间允许新的生产者使用。
(4)本实验在为生产者分配缓冲区时各生产者间必须互斥,此后各个生产者的具体生产活动可以并发。而消费者之间只有在对同一产品进行消费时才需要互斥,同时它们在消费过程结束时需要判断该消费对象是否已经消费完毕并清除该产品。
Windows
用来实现同步和互斥的实体。在Windows
中,常见的同步对象有:信号量(Semaphore)、
互斥量(Mutex)、临界段(CriticalSection)和事件(Event)等。本程序中用到了前三个。使用这些对象都分
为三个步骤,一是创建或者初始化:接着请求该同步对象,随即进入临界区,这一步对应于互斥量的
上锁;最后释放该同步对象,这对应于互斥量的解锁。这些同步对象在一个线程中创建,在其他线程
中都可以使用,从而实现同步互斥。当然,在进程间使用这些同步对象实现同步的方法是类似的。
1.用锁操作原语实现互斥
为解决进程互斥进人临界区的问题,可为每类临界区设置一把锁,该锁有打开和关闭两种状态,进程执行临界区程序的操作按下列步骤进行:
①关锁。先检查锁的状态,如为关闭状态,则等待其打开;如已打开了,则将其关闭,继续执行步骤②的操作。
②执行临界区程序。
③开锁。将锁打开,退出临界区。
2.信号量及WAIT,SIGNAL操作原语
信号量的初值可以由系统根据资源情况和使用需要来确定。在初始条件下信号量的指针项可以置为0,表示队列为空。信号量在使用过程中它的值是可变的,但只能由WAIT,SIGNAL操作来改变。设信号量为S,对S的WAIT操作记为WAIT(S),对它的SIGNAL操作记为SIGNAL(S)。
WAIT(S):顺序执行以下两个动作:
①信号量的值减1,即S=S-1;
②如果S≥0,则该进程继续执行;
如果
S(0,则把该进程的状态置为阻塞态,把相应的WAITCB连人该信号量队列的末尾,并放弃处理机,进行等待(直至其它进程在S上执行SIGNAL操作,把它释放出来为止)。
SIGNAL(S):顺序执行以下两个动作
①S值加
1,即
S=S+1;
②如果S)0,则该进程继续运行;
如果S(0则释放信号量队列上的第一个PCB(既信号量指针项所指向的PCB)所对应的进程(把阻塞态改为就绪态),执行SIGNAL操作的进程继续运行。
在具体实现时注意,WAIT,SIGNAL操作都应作为一个整体实施,不允许分割或相互穿插执行。也就是说,WAIT,SIGNAL操作各自都好像对应一条指令,需要不间断地做下去,否则会造成混乱。
从物理概念上讲,信号量S)时,S值表示可用资源的数量。执行一次WAIT操作意味着请求分配一个单位资源,因此S值减1;当S<0时,表示已无可用资源,请求者必须等待别的进程释放了该类资源,它才能运行下去。所以它要排队。而执行一次SIGNAL操作意味着释放一个单位资源,因此S值加1;若S(0时,表示有某些进程正在等待该资源,因而要把队列头上的进程唤醒,释放资源的进程总是可以运行下去的。
---------------
/**
*
生产者
*
*/
public
class
Procer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Procer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
empty.p();
mutex.p();
System.out.println(name+"
inserts
a
new
proct
into
"+buf.nextEmptyIndex);
buf.nextEmptyIndex
=
(buf.nextEmptyIndex+1)%buf.size;
mutex.v();
full.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
---------------
/**
*
消费者
*
*/
public
class
Customer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Customer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
full.p();
mutex.p();
System.out.println(name+"
gets
a
proct
from
"+buf.nextFullIndex);
buf.nextFullIndex
=
(buf.nextFullIndex+1)%buf.size;
mutex.v();
empty.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
-------------------------
/**
*
缓冲区
*
*/
public
class
Buffer{
public
Buffer(int
size,int
nextEmpty,int
nextFull){
this.nextEmptyIndex
=
nextEmpty;
this.nextFullIndex
=
nextFull;
this.size
=
size;
}
public
int
size;
public
int
nextEmptyIndex;
public
int
nextFullIndex;
}
-----------------
/**
*
此类用来模拟信号量
*
*/
public
class
Semaphore{
private
int
semValue;
public
Semaphore(int
semValue){
this.semValue
=
semValue;
}
public
synchronized
void
p(){
semValue--;
if(semValue<0){
try
{
this.wait();
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
public
synchronized
void
v(){
semValue++;
if(semValue<=0){
this.notify();
}
}
}
------------------------
public
class
Test
extends
Thread
{
public
static
void
main(String[]
args)
{
Buffer
bf=new
Buffer(10,0,0);
Semaphore
mutex=new
Semaphore(1);
Semaphore
full=new
Semaphore(0);
Semaphore
empty=new
Semaphore(10);
//new
Thread(new
Procer("p001",mutex,full,empty,bf)).start();
Procer
p=new
Procer("p001",mutex,full,empty,bf);
new
Thread(new
Procer("p002",mutex,full,empty,bf)).start();
new
Thread(new
Procer("p003",mutex,full,empty,bf)).start();
new
Thread(new
Procer("p004",mutex,full,empty,bf)).start();
new
Thread(new
Procer("p005",mutex,full,empty,bf)).start();
try{
sleep(3000);
}
catch(Exception
ex)
{
ex.printStackTrace();
}
new
Thread(new
Customer("c001",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c002",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c003",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c004",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c005",mutex,full,empty,bf)).start();
}
}
--------------------------------------------