import java.util.*;
import java.io.IOException;
import java.util.concurrent.*;

/**
 * Demonstrates a producer/consumer pair communicating through a
 * {@link BlockingQueue}. The producer enqueues a {@link QueueItem} every
 * 5-9 seconds, the consumer processes items as they arrive, and both are
 * asked to stop once the user presses Enter.
 */
public class BlockingQueueDemo1 {

    /**
     * Starts one consumer and one producer thread, blocks until a byte can be
     * read from standard input, then stops both workers and waits for them.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        BlockingQueue<QueueItem> queue = new LinkedBlockingQueue<>();
        Consumer consumer = new Consumer(queue);
        Producer producer = new Producer(queue);
        System.out.println("Press <Enter> to stop");

        Thread consumerThread = new Thread(consumer);
        consumerThread.start();
        Thread producerThread = new Thread(producer);
        producerThread.start();

        try {
            System.in.read();
        } catch (IOException e) {
            // Standard input is unusable; fall through and shut down anyway.
            e.printStackTrace();
        }

        producer.stop();
        consumer.stop();
        awaitExit(producerThread);
        awaitExit(consumerThread);
        System.out.println("Exiting...");
    }

    /**
     * Waits for {@code worker} to terminate. If the calling thread is
     * interrupted, the interruption is reported and the interrupt status is
     * restored, so any later blocking call on this thread returns promptly
     * instead of silently ignoring the request.
     */
    private static void awaitExit(Thread worker) {
        try {
            worker.join();
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Takes {@link QueueItem}s off a shared queue and runs them until
 * {@link #stop()} is called or the running thread is interrupted.
 */
class Consumer implements Runnable {
    /** Upper bound, in milliseconds, on how long a stop request can go unnoticed. */
    private static final long POLL_INTERVAL_MS = 200;

    private final BlockingQueue<QueueItem> queue;

    /**
     * Volatile so a stop() from another thread is visible to run(); starts out
     * true (rather than being set inside run()) so that a stop() issued
     * before the thread is scheduled is not overwritten.
     */
    private volatile boolean isAlive = true;

    /**
     * @param queue the queue to consume work items from
     */
    Consumer(BlockingQueue<QueueItem> queue) {
        this.queue = queue;
    }

    /**
     * Processes items until stopped. Uses a timed poll instead of take() so
     * the stop flag is re-checked even while the queue stays empty; take()
     * would block forever once the producer has gone away.
     */
    @Override
    public void run() {
        while (isAlive) {
            try {
                QueueItem item = queue.poll(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
                if (item != null) {
                    item.doWork();
                }
            } catch (InterruptedException e) {
                System.out.println("Consumer interrupted");
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.println("Consumer exits");
    }

    /**
     * Requests that the consumer finish. The item currently being processed,
     * if any, is completed first; items still in the queue are left there.
     */
    void stop() {
        isAlive = false;
    }
}

/**
 * Enqueues a new {@link QueueItem} and then pauses for a random 5-9 seconds,
 * repeating until {@link #stop()} is called or the running thread is
 * interrupted.
 */
class Producer implements Runnable {
    private final BlockingQueue<QueueItem> queue;

    /**
     * Volatile because run() reads it outside the monitor; starts out true so
     * a stop() issued before run() begins is honoured.
     */
    private volatile boolean isAlive = true;

    /**
     * @param queue the queue to publish work items to
     */
    Producer(BlockingQueue<QueueItem> queue) {
        this.queue = queue;
    }

    /**
     * Produces items until stopped. The item that is logged is the very item
     * that was queued, and the pause lasts for the duration that was logged.
     */
    @Override
    public void run() {
        while (isAlive) {
            QueueItem item = new QueueItem();
            queue.add(item);
            System.out.println("Queued work item " + item);

            long sleepTime = (ThreadLocalRandom.current().nextInt(5) + 5) * 1000L;
            System.out.println("Producer sleeping for " + (int) (sleepTime / 1000) + " s");
            try {
                pause(sleepTime);
            } catch (InterruptedException e) {
                System.out.println("Producer interrupted");
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.println("Producer exits");
    }

    /**
     * Waits up to {@code millis} milliseconds, returning early once stop() is
     * called. The flag is checked while holding the monitor, so a stop() that
     * arrives just before the wait cannot be missed, and the deadline loop
     * absorbs spurious wakeups.
     */
    private synchronized void pause(long millis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
        long remaining = millis;
        // remaining must stay > 0: wait(0) would mean "wait indefinitely".
        while (isAlive && remaining > 0) {
            wait(remaining);
            remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
        }
    }

    /**
     * Requests that the producer finish and wakes it if it is pausing.
     */
    synchronized void stop() {
        isAlive = false;
        notify();
    }
}

/**
 * A unit of simulated work identified by a process-wide sequential id.
 */
class QueueItem {
    /** Next id to hand out; guarded by the QueueItem.class monitor. */
    private static int idCount;

    private final int id;

    /**
     * Creates an item carrying the next sequential id.
     */
    QueueItem() {
        synchronized (QueueItem.class) {
            this.id = idCount++;
        }
    }

    /**
     * Simulates processing by sleeping for a random 1-3 seconds.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         the simulated work is in progress
     */
    public void doWork() throws InterruptedException {
        System.out.println("Processing job id=" + id);
        // Thread.sleep rather than wait(): nobody ever notifies this object,
        // and wait() may return early on a spurious wakeup.
        Thread.sleep((ThreadLocalRandom.current().nextInt(3) + 1) * 1000L);
        System.out.println("Done processing job id=" + id);
    }

    @Override
    public String toString() {
        return "QueueItem id=" + id;
    }
}