首页 / 操作系统 / Linux / Java生产消费模式实例代码[已测试]
/////////////////////////////////////////////////////////// Test.java 测试代码 public class Test1 { /**
* @param args
*/
public static void main(String[] args) { //如有多出生产此处q必须为单例模式,如最后那个源文件!
QueueThread q = new QueueThread();
// 启动10条消费线程
for (int i = 0; i < 5; i++) {
Consumer c = new Consumer(q);
c.start();
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// for (int i = 0; i < 50; i++) { Producer p = new Producer(q);
p.setStr("生产线程产生ggggggggg ");
p.start();
// }
// 非生产线程产生数据
for (int i = 0; i < 50; i++) { q.put(" 非生产线程产生数据 " + i);
} }}/////////////////////////////////////////// QueueThread.java 线程池import java.util.LinkedList;public class QueueThread { final static int MaxLength = 10;// 等待队列大小 private static LinkedList thread_pool = null; public QueueThread() {
synchronized (this) {
if (thread_pool == null) {
System.out.println("初始化");
thread_pool = new LinkedList();
}
}
} /**
* 判断是否已满
*
* @return
*/
public boolean isFull() { return thread_pool.size() >= MaxLength ? true : false; } /**
* 判断是否为空
*
* @return
*/
public boolean isEmpty() { return thread_pool.isEmpty(); } /**
* 生产模式:插入数据到队列
*
* @param obj
*/
public synchronized void put(Object obj) {
while (isFull()) {
try {
System.out.println("满了等待。。。。。。");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 如果没满,插入
// 插入到队尾
System.out.println("没满。。。。。");
thread_pool.addLast(obj);
notify();// notifyAll();?
} /**
* 消费模式:从队列里面取出一个对象
*
* @return
*/
public synchronized Object get() {
// 如果是空的则等待
while (isEmpty()) {
System.out.println("没数据等待。。。。。");
try {
wait();
} catch (InterruptedException e) { e.printStackTrace();
}
}
System.out.println("有数据,干活。。。。。");
notify();
return thread_pool.removeFirst(); }}///////////////////////////////////////// Consumer.java/**
* 消费线程
*
* @author Mygia.mai
*
*/
public class Consumer extends Thread {
private QueueThread q;
public Consumer(QueueThread q){
this.q=q;
} public void run() {
while (true) {
System.out.println(getName() + " 开始执行消费!");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(getName() + q.get());
System.out.println(getName() + " 消费结束!");
}
}
}////////////////////////////////// Producer.java 生产者线程 public class Producer extends Thread { QueueThread q; private String str; Producer(QueueThread q) {
this.q = q;
} public String getStr() {
return str;
} public void setStr(String str) {
this.str = str;
} public void run() {
for (int i = 0; i < 50; i++)
q.put(str + i);
}}////////////////////////////////////////// InitQueue.java 初始化消费线程池public class InitQueue { private static QueueThread queue = null; public InitQueue() {
synchronized (this) {
if (queue == null) {
System.out.println("初始化消费线程队列!启动5条");
queue = new QueueThread();
// 启动5条消费线程
for (int i = 0; i < 5; i++) {
Consumer c = new Consumer(queue);
c.start();
}
}
}
} /**
* 获取
* @return
*/
public QueueThread getQueue() {
return queue;
}}