Friday, September 9, 2016

JAVA MULTITHREADING: Producer-Consumer Pattern

So, imagine that we have a buffer.

The buffer can hold 10 items at once & is thread-safe in nature.

Now, a producer() method constantly populates it, another method called consumer() pulls one value out of it, every 1 second in average.

The code below is pretty interesting as this will cause both the threads to run and halt, synchronously, depending upon the state of 'queue' object.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class App{
 //This is like a buffer which can hold 10 objects; here 10 integers
 //This is a thread-safe data-structure
 private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
 
 //Main method where 2 threads are run
 //1st method runs producer() method
 //2nd method runs consumer() method
 public static void main(String[] args) throws InterruptedException {
  
  Thread t1 = new Thread(new Runnable(){//<--To run producer() method
   @Override
   public void run() {
    try {
     producer();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   } 
  });
  
  Thread t2 = new Thread(new Runnable(){//<--To run consumer() method

   @Override
   public void run() {
    try {
     consumer();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  });
  
  t1.start();
  t2.start();
  
  t1.join();
  t2.join();//<--Actually, only one join() is enough here.
  
  System.out.println("Program ended!");
  
  
 }
 
 private static void producer() throws InterruptedException{
  Random random = new Random();
  while(true){//<--This run infinitely, to keep the queue full at all times
   queue.put(random.nextInt(100));//<--Generates integers within 100
  }
 }
 
 private static void consumer() throws InterruptedException{
  Random random = new Random();
  
  while(true){
   Thread.sleep(100);//<--Guarantees thread sleeping for 0.1s
   
   if(random.nextInt(10) == 0){//<--Generates a zero in 1 second statistically
    Integer value = queue.take();
    
    System.out.println("Taken value: " + value + "; Queue size: " + queue.size());
   }
   
  }
 }
}

No comments:

Post a Comment