April 17, 2017

#Collections: Part 2-All about BlockingQueue!


The java.util.concurrent.BlockingQueue interface was added in Java 1.5 along with various other concurrent utility classes such as ConcurrentHashMap, Semaphore (a counting semaphore) and CopyOnWriteArrayList.

  • A BlockingQueue in Java is a type of queue that supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element. 
  • An element is inserted only when the queue is not completely full; otherwise the thread performing the operation is blocked. It waits until space becomes available in the queue to accommodate the new element. Likewise, when an element has to be dequeued, the operation blocks if the queue is empty and waits until an element is inserted into the queue.
  • In simple words, the take() method of BlockingQueue blocks if the queue is empty and the put() method blocks if the queue is full. This gives you flow control for free: a thread that tries to dequeue from an empty queue or enqueue into a full queue simply waits instead of failing or spinning.
  • Because of this property, BlockingQueue is an ideal choice for implementing the Producer-Consumer design pattern, where one thread inserts elements into the BlockingQueue and another thread consumes them.
  • BlockingQueue in Java doesn't allow null elements. Implementations of BlockingQueue like ArrayBlockingQueue and LinkedBlockingQueue throw NullPointerException when you try to add null to the queue.
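
The null restriction mentioned above is easy to check. The following is only a small sketch; the class name NullElementDemo and the helper rejectsNull() are made up for this illustration. It also hints at why null is disallowed: poll() uses null as its "queue is empty" return value.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class NullElementDemo {

    // Hypothetical helper: returns true if the queue refuses a null element.
    static boolean rejectsNull(BlockingQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        System.out.println("null rejected: " + rejectsNull(queue));
        // poll() returns null to signal an empty queue, so null cannot be a real element
        System.out.println("poll on empty queue: " + queue.poll());
    }
}
```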
Types Of Blocking Queue

Unbounded Queue: An unbounded queue is one that is created without specifying a capacity; by default its capacity is Integer.MAX_VALUE. With an unbounded blocking queue, put() practically never blocks, because the queue keeps growing as you add elements. take() still blocks when the queue is empty.

Syntax:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
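
As a rough illustration of the unbounded case, the sketch below (the class name UnboundedQueueDemo and its helper are invented for this example) checks the default capacity and shows that only the consuming side can still end up waiting.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {

    // Hypothetical helper: puts 'count' elements into an unbounded queue and returns its size.
    static int fill(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < count; i++) {
            queue.put(i); // does not block: capacity is Integer.MAX_VALUE
        }
        return queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        System.out.println(queue.remainingCapacity() == Integer.MAX_VALUE); // true
        System.out.println(fill(1000));                                     // 1000
        // The consumer side still waits on an empty queue; a timed poll gives up with null
        System.out.println(queue.poll(100, TimeUnit.MILLISECONDS));         // null
    }
}
```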

Bounded Queue: A bounded queue is created by passing the capacity of the queue to the constructor. A call to put() will block if the BlockingQueue is full.

Syntax:
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(5);

e.g:

/* Bounded BlockingQueue with room for 3 elements */
BlockingQueue<String> namebQueue = new ArrayBlockingQueue<String>(3);
namebQueue.put("Catherine Bell");
System.out.println("Catherine Bell is added in the list");
namebQueue.put("James Denton");
System.out.println("James Denton is added in the list");
namebQueue.put("Bailee Madison");
System.out.println("Bailee Madison is added in the list");
// queue is full, so put() blocks here until another thread removes an element
namebQueue.put("Rhys Matthew Bond");
// not reached unless space becomes available
System.out.println("Rhys Matthew Bond is added in the list");

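Output will be (the fourth put() then blocks forever because the queue is full and no thread removes anything, so the last message is never printed):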
Catherine Bell is added in the list
James Denton is added in the list
Bailee Madison is added in the list
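
If you would rather not hang on a full queue, one option is the timed offer(). The sketch below reuses the names from the example above; the class BoundedQueueDemo, the helper addFourth() and the 200 ms timeout are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    // Hypothetical helper: fills a queue of capacity 3, then tries to add a fourth element.
    static boolean addFourth() throws InterruptedException {
        BlockingQueue<String> names = new ArrayBlockingQueue<>(3);
        names.put("Catherine Bell");
        names.put("James Denton");
        names.put("Bailee Madison");
        // Waits at most 200 ms for space, then gives up instead of blocking forever
        return names.offer("Rhys Matthew Bond", 200, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Fourth element added: " + addFourth()); // false
    }
}
```

Here nothing ever removes an element, so the timed offer() returns false once the timeout elapses.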

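Hierarchy of BlockingQueue

BlockingQueue extends Queue, which extends Collection, which in turn extends Iterable. Its sub-interfaces are BlockingDeque and TransferQueue, and its implementations in java.util.concurrent include ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, DelayQueue, SynchronousQueue, LinkedBlockingDeque and LinkedTransferQueue.
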
Basic Operations Of BlockingQueue

BlockingQueue implementations like ArrayBlockingQueue, LinkedBlockingQueue and PriorityBlockingQueue are thread-safe. All queuing methods use internal locks or other forms of concurrency control to perform their operations atomically.
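
A quick way to see the thread safety in action is to let several threads insert at the same time and count the result. This is just a sketch; the class ThreadSafetyDemo, the helper concurrentInserts() and the thread and element counts are arbitrary choices for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadSafetyDemo {

    // Hypothetical helper: 'threads' workers each insert 'perThread' elements concurrently.
    static int concurrentInserts(int threads, int perThread) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    queue.offer(i); // no external synchronization needed
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join(); // wait until every worker has finished
        }
        return queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(concurrentInserts(4, 1000)); // 4000, no insert is lost
    }
}
```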

The blocking queue interface has a number of methods, some of which are described here:
  • The BlockingQueue interface extends Queue, which in turn extends Collection and Iterable, so it provides all Collection and Queue related methods like poll() and peek().
  • The take() method retrieves and removes the head of the queue, waiting for an element to become available if the queue is empty.
  • Unlike take(), the peek() method returns the head of the queue without removing it (or null if the queue is empty). 'E poll(long timeout, TimeUnit unit)' also retrieves and removes the head, but waits only up to the specified time if the queue is empty.
  • Since BlockingQueue also extends Collection, bulk Collection operations like addAll() and containsAll() are not necessarily performed atomically unless a BlockingQueue implementation specifically supports it. So a call to addAll() may fail (throw an exception) after inserting only some of the elements.
  • The contains(value) method checks whether a specific element is present in the blocking queue. It returns true if the element is present, otherwise false.
  • The remainingCapacity() method returns the remaining space in the BlockingQueue, i.e. the number of elements that can be added without blocking.
  • The offer(E e) method inserts the object into the queue if possible and returns true on success and false on failure, unlike add(E e), which throws IllegalStateException if it fails to insert the object. With a capacity-restricted queue, prefer offer() over add().
  • offer(E e, long timeout, TimeUnit unit) inserts the specified element into the queue. If the queue is full, it waits up to the specified time for space to become available and returns false if the time elapses first.
  • The int drainTo(Collection < ? super E > c) method removes all available elements from the queue and adds them to the specified collection, i.e. it moves the contents of the blocking queue into the other collection, leaving the blocking queue empty. It returns the number of elements transferred.
  • Whereas the int drainTo(Collection < ? super E > c, int maxElements) method removes at most the given number of available elements from the queue and adds them to the specified collection.
  • The remove() method (inherited from Queue) retrieves and removes the head of the queue, i.e. in a FIFO queue the element that was inserted first is removed first. It returns the removed element and throws NoSuchElementException if the queue is empty.
  • The boolean remove(Object o) method removes a single instance of the specified element from the queue if it is present, and returns true if an element was removed.
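
Several of the operations listed above can be tried out on a small bounded queue. The snippet below is only a sketch: the class BasicOperationsDemo and its two helper methods are invented for this illustration, and the capacities are arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BasicOperationsDemo {

    // Hypothetical helper: fills a queue of capacity 3, then drains it in two steps.
    static List<Integer> drainInTwoSteps() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
        queue.addAll(Arrays.asList(1, 2, 3));
        List<Integer> target = new ArrayList<>();
        queue.drainTo(target, 2); // moves at most 2 elements: 1 and 2
        queue.drainTo(target);    // moves whatever is left: 3
        return target;
    }

    // Hypothetical helper: bulk-adds 4 elements to a queue with room for 3
    // and returns how many actually made it in.
    static int partialAddAll() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
        try {
            queue.addAll(Arrays.asList(10, 20, 30, 40));
        } catch (IllegalStateException queueFull) {
            // thrown when the 4th element does not fit; the first 3 stay in the queue
        }
        return queue.size();
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        System.out.println(queue.offer(1));                   // true
        System.out.println(queue.remainingCapacity());        // 1
        System.out.println(queue.peek());                     // 1 (not removed)
        System.out.println(queue.offer(2));                   // true
        System.out.println(queue.offer(3));                   // false, queue is full
        System.out.println(queue.contains(2));                // true
        System.out.println(queue.remove(Integer.valueOf(1))); // true
        System.out.println(drainInTwoSteps());                // [1, 2, 3]
        System.out.println(partialAddAll());                  // 3
    }
}
```

The last line shows the non-atomic nature of addAll(): the call fails, yet three of the four elements are already in the queue.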
How can we implement a Blocking Queue in Java?

To implement a blocking queue we need to make sure:
  • It is always thread-safe.
  • It can hold arbitrary data.
  • A producer has to wait if the queue is already full.
  • A consumer has to wait if no item is present in the queue.
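
The four requirements above can be sketched with plain synchronized, wait() and notifyAll(). This is a minimal teaching version under the assumption that a single monitor is good enough; the class name SimpleBlockingQueue is made up, and this is not how the JDK implementations are written (they use explicit locks and conditions).

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class SimpleBlockingQueue<T> {
    private final Queue<T> items = new ArrayDeque<>(); // generic, so it holds arbitrary data
    private final int capacity;

    public SimpleBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Producer side: waits while the queue is full.
    public synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();
        }
        items.add(item); // ArrayDeque rejects null with NullPointerException
        notifyAll();     // wake up consumers waiting for an element
    }

    // Consumer side: waits while the queue is empty.
    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll();     // wake up producers waiting for space
        return item;
    }

    public synchronized int size() {
        return items.size();
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    queue.put(i); // blocks whenever the single slot is taken
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 3; i++) {
            System.out.println(queue.take()); // prints 1, 2, 3 in order
        }
        producer.join();
    }
}
```

The while loops around wait() matter: a thread must re-check the condition after waking up, because another thread may have changed the queue in the meantime.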
Why and when do we need to use BlockingQueue?

BlockingQueue implementations in Java are thread-safe collections, which makes them useful for handing data from one thread to another in multi-threaded code.

Let's assume, one thread is inserting elements to the queue and another thread is removing elements. If the first thread is slow, then the blocking queue can make the second thread wait until the first thread completes its operation. 

Because of its behavior, we can use BlockingQueue to solve the producer-consumer problem.

How can we Implement producer consumer using BlockingQueue?

The Producer-Consumer problem is one of the classic problems in the multithreading world. Because of the inter-thread communication involved, it's a bit tricky to implement. There are many ways to solve it, for example by using the wait() and notify() methods or by using a Semaphore. The simplest way to solve this problem is by using BlockingQueue.

For this we need to create two kinds of threads that simulate the producer and the consumer. They share a BlockingQueue instead of a hand-synchronized shared object.

If we use BlockingQueue we don't need to write any explicit thread synchronization code, because put() will block if the queue has reached its capacity and take() will block if the queue is empty.

GIT URL: Producer-Consumer using BQueue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerBQ {
    public static void main(String[] args) {
        // shared bounded queue with room for 10 items
        BlockingQueue<Item> queue = new ArrayBlockingQueue<>(10);

        // producer
        final Runnable producer = () -> {
            try {
                while (true) {
                    // thread will be blocked when queue is full
                    queue.put(createItem());
                }
            } catch (InterruptedException e) {
                // restore the interrupt flag and let the thread exit
                Thread.currentThread().interrupt();
            }
        };
        // creating two producers
        new Thread(producer).start();
        new Thread(producer).start();

        // consumer
        final Runnable consumer = () -> {
            try {
                while (true) {
                    // thread will be blocked when queue is empty
                    Item item = queue.take();
                    // do something with item
                }
            } catch (InterruptedException e) {
                // restore the interrupt flag and let the thread exit
                Thread.currentThread().interrupt();
            }
        };
        // creating two consumers
        new Thread(consumer).start();
        new Thread(consumer).start();
    }

    public static Item createItem() {
        return new Item();
    }
}

class Item {}
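
The program above runs forever. One common way to let it finish is a "poison pill": a special marker element that tells the consumer to stop. The sketch below is a variation with one producer and one consumer; the class PoisonPillDemo, the sentinel value and the helper produceAndConsume() are assumptions made for this example and are not part of the program above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillDemo {
    // Hypothetical sentinel value that marks the end of the stream
    private static final Integer POISON_PILL = Integer.MIN_VALUE;

    // Produces 1..itemCount on one thread, sums them on another, and returns the sum.
    static int produceAndConsume(int itemCount) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        int[] sum = new int[1];

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= itemCount; i++) {
                    queue.put(i);        // blocks while the queue is full
                }
                queue.put(POISON_PILL);  // tell the consumer we are done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer item = queue.take(); // blocks while the queue is empty
                    if (item.equals(POISON_PILL)) {
                        break;
                    }
                    sum[0] += item;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // join() makes the consumer's writes to sum visible here
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceAndConsume(10)); // 55
    }
}
```

With several consumers you would need one poison pill per consumer, since each pill is taken by exactly one thread.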


Enqueue methods of BlockingQueue, which can be used to add elements
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Enqueue methods of BlockingQueue, which can be used to add elements.
 * The queue here is unbounded, so none of these calls actually blocks or fails.
 */
public class EnqueueMethodsBQ {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        try {
            // put() inserts the specified element into the queue if space is
            // available, or blocks if the queue is currently full. It waits
            // indefinitely until space becomes available.
            queue.put(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            // add() throws IllegalStateException if no space is available
            // in the BlockingQueue.
            queue.add(2);
        } catch (IllegalStateException e) {
            e.printStackTrace();
        }
        // offer() inserts the specified element into the queue if space is
        // available, returning true if successful. If the queue is full, it
        // returns false immediately without blocking.
        boolean wasEnqueued = queue.offer(3);
        // The timed offer() inserts the specified element if space becomes
        // available within the specified waiting time. It returns true if
        // successful, or false if the waiting time elapses first.
        try {
            // waits up to 10 seconds to add the element
            boolean added = queue.offer(1, 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The dequeue methods of BlockingQueue, which are used to retrieve and remove elements.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * The dequeue methods of BlockingQueue, which are used to retrieve and remove
 * elements.
 */
public class DequeueMethodsBQ {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        // Pre-fill the queue. Without this, take() below would block forever,
        // because no other thread ever adds an element.
        for (int i = 1; i <= 4; i++) {
            queue.add(i);
        }
        // take() retrieves and removes the head of the queue, waiting if
        // necessary until an element becomes available. It blocks the thread
        // indefinitely while the queue is empty.
        try {
            Integer element = queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // poll() retrieves and removes the head of the queue. Unlike take(),
        // it does not block: it returns null immediately if the queue is empty.
        Integer element = queue.poll();
        // The timed poll() retrieves and removes the head of the queue, waiting
        // up to the specified time for an element to become available. It
        // returns null if the waiting time elapses first.
        try {
            Integer element1 = queue.poll(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // remove(Object) removes the element only if it is present
        // (4 is the one element still left in the queue at this point).
        boolean wasRemoved = queue.remove(4);
    }
}
Keep learning and growing!

-K Himaanshu Shuklaa..
