當前位置:首頁 » 編程語言 » java線程生產者消費者

java線程生產者消費者

發布時間: 2023-02-03 20:58:55

java 生產者/消費者問題

/**
* 公共資源類
*/
public class PublicResource {
private int number = 0;
/**
* 增加公共資源
*/
public synchronized void increace() {
while (number != 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number++;
System.out.println(number);
notify();
}
/**
* 減少公共資源
*/
public synchronized void decreace() {
while (number == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number--;
System.out.println(number);
notify();
}
}

/**

* 生產者線程,負責生產公共資源
*/
public class ProcerThread implements Runnable {
private PublicResource resource;
public ProcerThread(PublicResource resource) {
this.resource = resource;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.increace();
}
}
}
/**
* 消費者線程,負責消費公共資源
*/
public class ConsumerThread implements Runnable {
private PublicResource resource;
public ConsumerThread(PublicResource resource) {
this.resource = resource;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.decreace();
}
}
}

public class ProcerConsumerTest {
public static void main(String[] args) {
PublicResource resource = new PublicResource();
new Thread(new ProcerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
new Thread(new ProcerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
new Thread(new ProcerThread(resource)).start();
new Thread(new ConsumerThread(resource)).start();
}
}

⑵ 由生產者/消費者問題看JAVA多線程

生產者消費者問題是研究多線程程序時繞不開的問題 它的描述是有一塊生產者和消費者共享的有界緩沖區 生產者往緩沖區放入產品 消費者從緩沖區取走產品 這個過程可以無休止的執行 不能因緩沖區滿生產者放不進產品而終止 也不能因緩沖區空消費者無產品可取而終止

解決生產者消費者問題的方法有兩種 一種是採用某種機制保持生產者和消費者之間的同步 一種是在生產者和消費者之間建立一個管道 前一種有較高的效率並且可控制性較好 比較常用 後一種由於管道緩沖區不易控制及被傳輸數據對象不易封裝等原因 比較少用

同步問題的核心在於 CPU是按時間片輪詢的方式執行程序 我們無法知道某一個線程是否被執行 是否被搶占 是否結束等 因此生產者完全可能當緩沖區已滿的時候還在放入產品 消費者也完全可能當緩沖區為空時還在取出產品

現在同步問題的解決方法一般是採用信號或者加鎖機制 即生產者線程當緩沖區已滿時放棄自己的執行權 進入等待狀態 並通知消費者線程執行 消費者線程當緩沖區已空時放棄自己的執行權 進入等待狀態 並通知生產者線程執行 這樣一來就保持了線程的同步 並避免了線程間互相等待而進入死鎖狀態

JAVA語言提供了獨立於平台的線程機制 保持了 write once run anywhere 的特色 同時也提供了對同步機制的良好支持

在JAVA中 一共有四種方法支持同步 其中三個是同步方法 一個是管道方法

方法wait()/notify()

方法await()/signal()

阻塞隊列方法BlockingQueue

管道方法PipedInputStream/PipedOutputStream

下面我們看各個方法的實現

方法wait()/notify()

wait()和notify()是根類Object的兩個方法 也就意味著所有的JAVA類都會具有這個兩個方法 為什麼會被這樣設計呢?我們可以認為所有的對象默認都具有一個鎖 雖然我們看不到 也沒有辦法直接操作 但它是存在的

wait()方法表示 當緩沖區已滿或空時 生產者或消費者線程停止自己的執行 放棄鎖 使自己處於等待狀態 讓另一個線程開始執行

notify()方法表示 當生產者或消費者對緩沖區放入或取出一個產品時 向另一個線程發出可執行通知 同時放棄鎖 使自己處於等待狀態

下面是一個例子代碼

import java util LinkedList;

public class Sycn {

private LinkedList<Object> myList =new LinkedList<Object>();

private int MAX = ;

public Sycn (){

}

public void start(){

new Procer() start();

new Consumer() start();

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ();

s start();

}

class Procer extends Thread{

public void run(){

while(true){

synchronized(myList){

try{

while(myList size() == MAX){

System out println( warning: it s full! );

myList wait();

}

Object o = new Object();

if(myList add(o)){

System out println( Procer: + o);

myList notify();

}

}catch(InterruptedException ie){

System out println( procer is interrupted! );

}

}

}

}

}

class Consumer extends Thread{

public void run(){

while(true){

synchronized(myList){

try{

while(myList size() == ){

System out println( warning: it s empty! );

myList wait();

}

Object o = myList removeLast();

System out println( Consumer: + o);

myList notify();

}catch(InterruptedException ie){

System out println( consumer is interrupted! );

}

}

}

}

}

}

方法await()/signal()

在JDK 以後 JAVA提供了新的更加健壯的線程處理機制 包括了同步 鎖定 線程池等等 它們可以實現更小粒度上的控制 await()和signal()就是其中用來做同步的兩種方法 它們的功能基本上和wait()/notify()相同 完全可以取代它們 但是它們和新引入的鎖定機制Lock直接掛鉤 具有更大的靈活性

下面是一個例子代碼

import java util LinkedList;

import ncurrent locks *;

public class Sycn {

private LinkedList<Object> myList = new LinkedList<Object>();

private int MAX = ;

private final Lock lock = new ReentrantLock();

private final Condition full = lock newCondition();

private final Condition empty = lock newCondition();

public Sycn (){

}

public void start(){

new Procer() start();

new Consumer() start();

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ();

s start();

}

class Procer extends Thread{

public void run(){

while(true){

lock lock();

try{

while(myList size() == MAX){

System out println( warning: it s full! );

full await();

}

Object o = new Object();

if(myList add(o)){

System out println( Procer: + o);

empty signal();

}

}catch(InterruptedException ie){

System out println( procer is interrupted! );

}finally{

lock unlock();

}

}

}

}

class Consumer extends Thread{

public void run(){

while(true){

lock lock();

try{

while(myList size() == ){

System out println( warning: it s empty! );

empty await();

}

Object o = myList removeLast();

System out println( Consumer: + o);

full signal();

}catch(InterruptedException ie){

System out println( consumer is interrupted! );

}finally{

lock unlock();

}

}

}

}

}

阻塞隊列方法BlockingQueue

BlockingQueue也是JDK 的一部分 它是一個已經在內部實現了同步的隊列 實現方式採用的是我們的第 種await()/signal()方法 它可以在生成對象時指定容量大小

它用於阻塞操作的是put()和take()方法

put()方法類似於我們上面的生產者線程 容量最大時 自動阻塞

take()方法類似於我們上面的消費者線程 容量為 時 自動阻塞

下面是一個例子代碼

import ncurrent *;

public class Sycn {

private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>( );

private int MAX = ;

public Sycn (){

}

public void start(){

new Procer() start();

new Consumer() start();

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ();

s start();

}

class Procer extends Thread{

public void run(){

while(true){

//synchronized(this){

try{

if(queue size() == MAX)

System out println( warning: it s full! );

Object o = new Object();

queue put(o);

System out println( Procer: + o);

}catch(InterruptedException e){

System out println( procer is interrupted! );

}

//}

}

}

}

class Consumer extends Thread{

public void run(){

while(true){

//synchronized(this){

try{

if(queue size() == )

System out println( warning: it s empty! );

Object o = queue take();

System out println( Consumer: + o);

}catch(InterruptedException e){

System out println( procer is interrupted! );

}

//}

}

}

}

}

你發現這個例子中的問題了嗎?

如果沒有 我建議你運行一下這段代碼 仔細觀察它的輸出 是不是有下面這個樣子的?為什麼會這樣呢?

warning: it s full!

Procer: java lang object@ e a

你可能會說這是因為put()和System out println()之間沒有同步造成的 我也這樣認為 我也這樣認為 但是你把run()中的synchronized前面的注釋去掉 重新編譯運行 有改觀嗎?沒有 為什麼?

這是因為 當緩沖區已滿 生產者在put()操作時 put()內部調用了await()方法 放棄了線程的執行 然後消費者線程執行 調用take()方法 take()內部調用了signal()方法 通知生產者線程可以執行 致使在消費者的println()還沒運行的情況下生產者的println()先被執行 所以有了上面的輸出 run()中的synchronized其實並沒有起什麼作用

對於BlockingQueue大家可以放心使用 這可不是它的問題 只是在它和別的對象之間的同步有問題

對於這種多重嵌套同步的問題 以後再談吧 歡迎大家討論啊!

管道方法PipedInputStream/PipedOutputStream

這個類位於java io包中 是解決同步問題的最簡單的辦法 一個線程將數據寫入管道 另一個線程從管道讀取數據 這樣便構成了一種生產者/消費者的緩沖區編程模式

下面是一個例子代碼 在這個代碼我沒有使用Object對象 而是簡單的讀寫位元組值 這是因為PipedInputStream/PipedOutputStream不允許傳輸對象 這是JAVA本身的一個bug 具體的大家可以看sun的解釋 _bug do?bug_id=

import java io *;

public class Sycn {

private PipedOutputStream pos;

private PipedInputStream pis;

//private ObjectOutputStream oos;

//private ObjectInputStream ois;

public Sycn (){

try{

pos = new PipedOutputStream();

pis = new PipedInputStream(pos);

//oos = new ObjectOutputStream(pos);

//ois = new ObjectInputStream(pis);

}catch(IOException e){

System out println(e);

}

}

public void start(){

new Procer() start();

new Consumer() start();

}

public static void main(String[] args) throws Exception{

Sycn s = new Sycn ();

s start();

}

class Procer extends Thread{

public void run() {

try{

while(true){

int b = (int) (Math random() * );

System out println( Procer: a byte the value is + b);

pos write(b);

pos flush();

//Object o = new MyObject();

//oos writeObject(o);

//oos flush();

//System out println( Procer: + o);

}

}catch(Exception e){

//System out println(e);

e printStackTrace();

}finally{

try{

pos close();

pis close();

//oos close();

//ois close();

}catch(IOException e){

System out println(e);

}

}

}

}

class Consumer extends Thread{

public void run(){

try{

while(true){

int b = pis read();

System out println( Consumer: a byte the value is + String valueOf(b));

//Object o = ois readObject();

//if(o != null)

//System out println( Consumer: + o);

}

}catch(Exception e){

//System out println(e);

e printStackTrace();

}finally{

try{

pos close();

pis close();

//oos close();

//ois close();

}catch(IOException e){

System out println(e);

}

}

}

}

//class MyObject implements Serializable {

//}

lishixin/Article/program/Java/gj/201311/27617

⑶ JAVA模擬生產者與消費者實例

使用的生產者和消費者模型具有如下特點:
(1)本實驗的多個緩沖區不是環形循環的,也不要求按順序訪問。生產者可以把產品放到目前某一個空緩沖區中。
(2)消費者只消費指定生產者的產品。
(3)在測試用例文件中指定了所有的生產和消費的需求,只有當共享緩沖區的數據滿足了所有關於它的消費需求後,此共享緩沖區才可以作為空閑空間允許新的生產者使用。
(4)本實驗在為生產者分配緩沖區時各生產者間必須互斥,此後各個生產者的具體生產活動可以並發。而消費者之間只有在對同一產品進行消費時才需要互斥,同時它們在消費過程結束時需要判斷該消費對象是否已經消費完畢並清除該產品。
Windows
用來實現同步和互斥的實體。在Windows
中,常見的同步對象有:信號量(Semaphore)、
互斥量(Mutex)、臨界段(CriticalSection)和事件(Event)等。本程序中用到了前三個。使用這些對象都分
為三個步驟,一是創建或者初始化:接著請求該同步對象,隨即進入臨界區,這一步對應於互斥量的
上鎖;最後釋放該同步對象,這對應於互斥量的解鎖。這些同步對象在一個線程中創建,在其他線程
中都可以使用,從而實現同步互斥。當然,在進程間使用這些同步對象實現同步的方法是類似的。
1.用鎖操作原語實現互斥
為解決進程互斥進人臨界區的問題,可為每類臨界區設置一把鎖,該鎖有打開和關閉兩種狀態,進程執行臨界區程序的操作按下列步驟進行:
①關鎖。先檢查鎖的狀態,如為關閉狀態,則等待其打開;如已打開了,則將其關閉,繼續執行步驟②的操作。
②執行臨界區程序。
③開鎖。將鎖打開,退出臨界區。
2.信號量及WAIT,SIGNAL操作原語
信號量的初值可以由系統根據資源情況和使用需要來確定。在初始條件下信號量的指針項可以置為0,表示隊列為空。信號量在使用過程中它的值是可變的,但只能由WAIT,SIGNAL操作來改變。設信號量為S,對S的WAIT操作記為WAIT(S),對它的SIGNAL操作記為SIGNAL(S)。
WAIT(S):順序執行以下兩個動作:
①信號量的值減1,即S=S-1;
②如果S≥0,則該進程繼續執行;
如果
S(0,則把該進程的狀態置為阻塞態,把相應的WAITCB連人該信號量隊列的末尾,並放棄處理機,進行等待(直至其它進程在S上執行SIGNAL操作,把它釋放出來為止)。
SIGNAL(S):順序執行以下兩個動作
①S值加
1,即
S=S+1;
②如果S)0,則該進程繼續運行;
如果S(0則釋放信號量隊列上的第一個PCB(既信號量指針項所指向的PCB)所對應的進程(把阻塞態改為就緒態),執行SIGNAL操作的進程繼續運行。
在具體實現時注意,WAIT,SIGNAL操作都應作為一個整體實施,不允許分割或相互穿插執行。也就是說,WAIT,SIGNAL操作各自都好像對應一條指令,需要不間斷地做下去,否則會造成混亂。
從物理概念上講,信號量S)時,S值表示可用資源的數量。執行一次WAIT操作意味著請求分配一個單位資源,因此S值減1;當S<0時,表示已無可用資源,請求者必須等待別的進程釋放了該類資源,它才能運行下去。所以它要排隊。而執行一次SIGNAL操作意味著釋放一個單位資源,因此S值加1;若S(0時,表示有某些進程正在等待該資源,因而要把隊列頭上的進程喚醒,釋放資源的進程總是可以運行下去的。
---------------
/**
*
生產者
*
*/
public
class
Procer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Procer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
empty.p();
mutex.p();
System.out.println(name+"
inserts
a
new
proct
into
"+buf.nextEmptyIndex);
buf.nextEmptyIndex
=
(buf.nextEmptyIndex+1)%buf.size;
mutex.v();
full.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
---------------
/**
*
消費者
*
*/
public
class
Customer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Customer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
full.p();
mutex.p();
System.out.println(name+"
gets
a
proct
from
"+buf.nextFullIndex);
buf.nextFullIndex
=
(buf.nextFullIndex+1)%buf.size;
mutex.v();
empty.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
-------------------------
/**
*
緩沖區
*
*/
public
class
Buffer{
public
Buffer(int
size,int
nextEmpty,int
nextFull){
this.nextEmptyIndex
=
nextEmpty;
this.nextFullIndex
=
nextFull;
this.size
=
size;
}
public
int
size;
public
int
nextEmptyIndex;
public
int
nextFullIndex;
}
-----------------
/**
*
此類用來模擬信號量
*
*/
public
class
Semaphore{
private
int
semValue;
public
Semaphore(int
semValue){
this.semValue
=
semValue;
}
public
synchronized
void
p(){
semValue--;
if(semValue<0){
try
{
this.wait();
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
public
synchronized
void
v(){
semValue++;
if(semValue<=0){
this.notify();
}
}
}
------------------------
public
class
Test
extends
Thread
{
public
static
void
main(String[]
args)
{
Buffer
bf=new
Buffer(10,0,0);
Semaphore
mutex=new
Semaphore(1);
Semaphore
full=new
Semaphore(0);
Semaphore
empty=new
Semaphore(10);
//new
Thread(new
Procer("p001",mutex,full,empty,bf)).start();
Procer
p=new
Procer("p001",mutex,full,empty,bf);
new
Thread(new
Procer("p002",mutex,full,empty,bf)).start();
new
Thread(new
Procer("p003",mutex,full,empty,bf)).start();
new
Thread(new
Procer("p004",mutex,full,empty,bf)).start();
new
Thread(new
Procer("p005",mutex,full,empty,bf)).start();
try{
sleep(3000);
}
catch(Exception
ex)
{
ex.printStackTrace();
}
new
Thread(new
Customer("c001",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c002",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c003",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c004",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c005",mutex,full,empty,bf)).start();
}
}
--------------------------------------------

熱點內容
sqlserver默認實例 發布:2024-11-01 22:23:42 瀏覽:959
sort排序java 發布:2024-11-01 22:23:26 瀏覽:47
解壓後的apk無法安裝 發布:2024-11-01 22:22:10 瀏覽:665
公司的pop伺服器地址 發布:2024-11-01 22:22:07 瀏覽:118
朵唯m30手機配置是真的嗎如何 發布:2024-11-01 22:16:56 瀏覽:680
夢幻西遊怎麼清理緩存 發布:2024-11-01 22:15:52 瀏覽:344
如何配置fcm 發布:2024-11-01 22:08:15 瀏覽:853
原裝電腦配置哪個好 發布:2024-11-01 22:05:49 瀏覽:728
r910伺服器能上什麼cpu 發布:2024-11-01 22:04:54 瀏覽:531
postgetphp 發布:2024-11-01 22:03:40 瀏覽:787