1 import java.util.*; 2 import java.io.IOException; 3 import java.util.concurrent.*; 4 public class BlockingQueueDemo2 { 5 6 public static void main(String[] args) { 7 BlockingQueue queue = new LinkedBlockingQueue(); 8 Consumer consumer = new Consumer(queue); 9 Producer producer = new Producer(queue); 10 System.out.println("Press to stop"); 11 Thread consumerThread = new Thread(consumer); 12 consumerThread.start(); 13 Thread producerThread = new Thread(producer); 14 producerThread.start(); 15 try { 16 System.in.read(); 17 } catch (IOException e) { 18 e.printStackTrace(); 19 } 20 producer.stop(); 21 consumer.stop(); 22 queue.add(new PoisonQueueItem()); 23 try { 24 producerThread.join(); 25 } catch(InterruptedException e) { 26 System.out.println("Interrupted"); 27 e.printStackTrace(); 28 } 29 try { 30 consumerThread.join(); 31 } catch(InterruptedException e) { 32 System.out.println("Interrupted"); 33 e.printStackTrace(); 34 } 35 System.out.println("Exiting..."); 36 } 37 } 38 class Consumer implements Runnable { 39 private final BlockingQueue queue; 40 private boolean isAlive; 41 Consumer(BlockingQueue queue) { 42 this.queue = queue; 43 } 44 public void run() { 45 isAlive = true; 46 while(isAlive) { 47 try { 48 QueueItem item = queue.take(); 49 if (item instanceof PoisonQueueItem) { 50 System.out.println("Got poison item"); 51 break; 52 } 53 item.doWork(); 54 } catch (InterruptedException e) { 55 System.out.println("Consumer interrupted"); 56 break; 57 } 58 } 59 System.out.println("Consumer exits"); 60 } 61 synchronized void stop() { 62 isAlive = false; 63 notify(); 64 } 65 } 66 class Producer implements Runnable { 67 private final BlockingQueue queue; 68 private boolean isAlive; 69 Producer(BlockingQueue queue) { 70 this.queue = queue; 71 } 72 public void run() { 73 isAlive = true; 74 Random r = new Random(); 75 while(isAlive) { 76 QueueItem item = new QueueItem(); 77 queue.add(new QueueItem()); 78 System.out.println("Queued work item " + item); 79 try { 80 synchronized(this) { 81 long sleepTime = (r.nextInt(5) + 5) * 1000; 82 System.out.println("Producer sleeping for " + 83 (int)(sleepTime/1000) + " s"); 84 wait((r.nextInt(5) + 5) * 1000); 85 } 86 } catch (InterruptedException e) { 87 System.out.println("Producer interrupted"); 88 break; 89 } 90 } 91 System.out.println("Producer exits"); 92 } 93 synchronized void stop() { 94 isAlive = false; 95 notify(); 96 } 97 } 98 class QueueItem { 99 private static int idCount; 100 private int id; 101 QueueItem() { 102 synchronized(QueueItem.class) { 103 this.id = idCount++; 104 } 105 } 106 public void doWork() throws InterruptedException { 107 System.out.println("Processing job id=" + id); 108 Random r = new Random(); 109 synchronized(this) { 110 wait((r.nextInt(3) + 1) * 1000); 111 } 112 System.out.println("Done processing job id=" + id); 113 } 114 public String toString() { 115 return "QueueItem id=" + id; 116 } 117 } 118 class PoisonQueueItem extends QueueItem { 119 }