In the previous tutorial, we learned what SynchronousQueue is and how to use it. This tutorial explains what DelayQueue is and how to use java.util.concurrent.DelayQueue to solve the producer-consumer problem.

What is DelayQueue?

The java.util.concurrent.DelayQueue class is an implementation of BlockingQueue, so before reading about DelayQueue I strongly suggest reading about BlockingQueue first. Our BlockingQueue Tutorial is one place to start.
DelayQueue is an unbounded BlockingQueue of delayed elements, unlike ArrayBlockingQueue, which has a fixed capacity. You can put an element into the queue at any time, but it can be taken only after its delay has expired. The queue is not First-In-First-Out: elements are ordered by their delay, and the head is the element whose delay expired furthest in the past. Null elements are not allowed.
If no element's delay has expired yet, poll() returns null.
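
The behaviour of poll() and the null restriction can be checked with a small sketch. The Item class and the 200 ms delay below are assumptions made up for this illustration; they are not part of the tutorial's classes.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class PollDemo {

    // Hypothetical minimal Delayed element, used only for this illustration.
    static final class Item implements Delayed {
        final String name;
        final long expiryNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expiryNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiryNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Polls once right after insertion and once after the delay has passed.
    static String[] pollBeforeAndAfterExpiry() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item("basket", 200));
        Item early = queue.poll();          // delay still running
        try {
            Thread.sleep(400);              // wait well past the 200 ms delay
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Item late = queue.poll();           // delay has expired by now
        return new String[] { early == null ? null : early.name, late == null ? null : late.name };
    }

    // Returns true if the queue refuses a null element.
    static boolean rejectsNull() {
        try {
            new DelayQueue<Item>().offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        String[] result = pollBeforeAndAfterExpiry();
        System.out.println("before expiry: " + result[0]);
        System.out.println("after expiry:  " + result[1]);
        System.out.println("rejects null:  " + rejectsNull());
    }
}
```

The first poll() should come back empty because the element is still delayed, while the second one, made after the sleep, should return the element.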

Whether an element has expired is determined by the return value of its getDelay(TimeUnit unit) method: if getDelay() returns zero or a negative value, the delay has expired and the element can be taken from the queue.
To add an element to a DelayQueue, its class must implement the Delayed interface. That means implementing public long getDelay(TimeUnit unit) and, to define the ordering, public int compareTo(Delayed o).
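
One detail of getDelay(TimeUnit unit) that is easy to get wrong is the unit conversion: in unit.convert(value, sourceUnit), the second argument names the unit the value is currently expressed in. The sketch below assumes the expiry time is tracked in milliseconds; the remaining() helper is a hypothetical name used only here.

```java
import java.util.concurrent.TimeUnit;

public class DelayConversionDemo {

    // Converts a remaining delay held in milliseconds into the requested unit.
    static long remaining(long expiryMillis, long nowMillis, TimeUnit unit) {
        return unit.convert(expiryMillis - nowMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println(remaining(1_500, 1_000, TimeUnit.MILLISECONDS)); // 500
        System.out.println(remaining(1_500, 1_000, TimeUnit.NANOSECONDS));  // 500000000
        System.out.println(remaining(3_000, 1_000, TimeUnit.SECONDS));      // 2
        System.out.println(remaining(1_000, 1_500, TimeUnit.MILLISECONDS)); // -500, already expired
    }
}
```

Passing the wrong source unit (for example TimeUnit.SECONDS for a millisecond value) would overstate the remaining delay by a factor of 1000.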

Let's look at an example. We are going to add DelayBasket objects to the DelayQueue, so DelayBasket implements the Delayed interface and provides the getDelay(TimeUnit unit) and compareTo(Delayed o) methods.
DelayBasket.java

package com.javamakeuse.poc;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author subodh.ray
 *
 */
public class DelayBasket implements Delayed {

 private String basket;
 private Long expiryTime;

 public DelayBasket(String basket, long delay) {
  this.basket = basket;
  expiryTime = System.currentTimeMillis()+delay;
 }
 public String getBasket() {
  return basket;
 }
 public void setBasket(String basket) {
  this.basket = basket;
 }
 public Long getExpiryTime() {
  return expiryTime;
 }
 public void setExpiryTime(Long expiryTime) {
  this.expiryTime = expiryTime;
 }
 @Override
 public int compareTo(Delayed o) {
  if (null == o) {
   return -1;
  } else {
   return this.expiryTime.compareTo(((DelayBasket) o).getExpiryTime());
  }
 }
 @Override
 public long getDelay(TimeUnit unit) {
  // Remaining delay in milliseconds, converted to the unit the caller asked for
  long diffInMs = expiryTime-System.currentTimeMillis();
  return unit.convert(diffInMs, TimeUnit.MILLISECONDS);
 }
 @Override
 public String toString() {
  return "DelayBasket [basket = " + basket + "]";
 }

}
The compareTo method keeps the queue ordered by expiry time, so if several elements have expired, the one that expired first is taken first.
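
To see the effect of this ordering, here is a minimal sketch that inserts three elements out of expiry order and takes them back. The Item class, its names and its delays are assumptions chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class ExpiryOrderDemo {

    // Hypothetical minimal Delayed element, used only for this illustration.
    static final class Item implements Delayed {
        final String name;
        final long expiryNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expiryNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiryNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Shorter remaining delay sorts first
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Inserts three items out of expiry order and records the order take() returns them in.
    static List<String> drainInExpiryOrder() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item("slow", 300));
        queue.put(new Item("fast", 100));
        queue.put(new Item("medium", 200));

        List<String> order = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                order.add(queue.take().name); // blocks until the head has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInExpiryOrder());
    }
}
```

Although "slow" is inserted first, it is taken last, because take() always waits for the element with the earliest expiry.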

Producer class Chef.java
package com.javamakeuse.poc;

import java.util.concurrent.BlockingQueue;

/**
 * Chef class to produce pizza
 * 
 * @author subodh.ray
 *
 */
public class Chef implements Runnable {

 private final BlockingQueue<DelayBasket> bQueue;
 private final DelayBasket delayBasket;

 public Chef(BlockingQueue<DelayBasket> bQueue, DelayBasket delayBasket) {
  this.bQueue = bQueue;
  this.delayBasket = delayBasket;
 }
 @Override
 public void run() {
  bQueue.add(delayBasket);
 }

}
Consumer class Consumer.java
package com.javamakeuse.poc;

import java.util.concurrent.BlockingQueue;

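public class Consumer implements Runnable {

 private final BlockingQueue<DelayBasket> drop;

 public Consumer(BlockingQueue<DelayBasket> drop) {
  this.drop = drop;
 }
 @Override
 public void run() {
  try {
   // take() blocks until the delay of the head element has expired
   System.out.println(drop.take());
  } catch (InterruptedException e) {
   // restore the interrupt status so the interruption is not lost
   Thread.currentThread().interrupt();
  }
 }

}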

ProducerConsumer Test class - Restaurant.java
package com.javamakeuse.poc;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;

/**
 * Restaurant class where pizza is produced and consumed
 * 
 * @author subodh.ray
 *
 */
public class Restaurant {

 public static void main(String[] args){

  BlockingQueue<DelayBasket> bQueue = new DelayQueue<DelayBasket>();

  // producing in queue using thread-1
  (new Thread(new Chef(bQueue, new DelayBasket("basket1",10)))).start();
  
  // Consuming from the queue
  (new Thread(new Consumer(bQueue))).start();
  
  // producing in queue using thread-2
  (new Thread(new Chef(bQueue, new DelayBasket("basket2",5)))).start();
  (new Thread(new Consumer(bQueue))).start();
 }
 
 
}
The two baskets are created with delays of 10 and 5 milliseconds. A consumer thread cannot get an element via take() until that element's delay has expired, so each consumer blocks until a basket expires. Because elements leave the queue in order of expiry rather than insertion, the order of the two lines can vary from run to run, depending on how much time passes between creating the two baskets.

Output
DelayBasket [basket = basket1]
DelayBasket [basket = basket2]
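
The blocking behaviour of take() can also be observed by timing it. The following is a rough sketch rather than a benchmark: the Item class and the 300 ms delay are assumptions for the illustration, and the timed poll(timeout, unit) call shows that a wait shorter than the delay comes back empty.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class TakeTimingDemo {

    // Hypothetical minimal Delayed element, used only for this illustration.
    static final class Item implements Delayed {
        final long expiryNanos;

        Item(long delayMillis) {
            this.expiryNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiryNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns true if a timed poll shorter than the delay gives up with null.
    static boolean timedPollGivesUp() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item(300));
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Measures the time from just before the element is created until take() returns it.
    static long millisUntilTaken(long delayMillis) {
        DelayQueue<Item> queue = new DelayQueue<>();
        long start = System.nanoTime();
        queue.put(new Item(delayMillis));
        try {
            queue.take(); // blocks until the delay has expired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("timed poll gave up: " + timedPollGivesUp());
        System.out.println("take() returned after ms: " + millisUntilTaken(300));
    }
}
```

The measured time should be at least the configured delay. How far above it depends on thread scheduling.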

That's it.
