Tuesday 2 March 2010

Java Concurrency - Task Cancellation.

In a multi-threaded application, it is often necessary to cancel a task, either on user request or when some event occurs. In the following example, there are two points in each loop iteration where interruption may be detected: in the blocking put call, and by explicitly polling the interrupted status in the loop header. The explicit test is not strictly necessary here because of the blocking put call, but it makes PrimeProducer more responsive to interruption because it checks for interruption before starting the lengthy task of searching for a prime, rather than after. When calls to interruptible blocking methods are not frequent enough to deliver the desired responsiveness, explicitly testing the interrupted status can help.




import java.math.BigInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demonstrates cancelling a running task by interrupting the thread that
 * executes it.
 */
public class TestTaskCancellation {

    /**
     * Program entry point; runs the cancellation demo once.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the main thread is interrupted while
     *         the demo is waiting
     */
    public static void main(String[] args) throws InterruptedException {
        TestTaskCancellation tc = new TestTaskCancellation();
        tc.demo();
    }

    /**
     * Starts a {@code PrimeProducer} on a single-thread executor, lets it run
     * for one second, then cancels it with {@code shutdownNow()}, which
     * interrupts the worker thread.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         sleeping
     */
    private void demo() throws InterruptedException {
        // Bounded queue with no consumer: once 100 primes are queued the
        // producer blocks in put() until it is interrupted.
        BlockingQueue<BigInteger> queue = new ArrayBlockingQueue<BigInteger>(100);
        PrimeProducer pp = new PrimeProducer(queue);

        ExecutorService es = Executors.newFixedThreadPool(1);
        try {
            es.execute(pp);
            Thread.sleep(1000);
        } finally {
            // Always cancel, even if sleep() was interrupted; otherwise the
            // pool thread would stay blocked in put() and keep the JVM alive.
            es.shutdownNow();
        }

        // Alternative without an ExecutorService: replace the executor code
        // above (creation, execute, sleep, shutdownNow) with:
        //   Thread t1 = new Thread(pp);
        //   t1.start();
        //   Thread.sleep(1000);
        //   t1.interrupt();
    }

}

/**
 * Task that generates successive probable primes, starting after 1, and puts
 * them on a blocking queue until the executing thread is interrupted.
 *
 * <p>Interruption is detected at two points in every iteration: the explicit
 * status check in the loop header and the blocking {@code put} call.
 */
class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    /**
     * @param queue destination for the generated primes; {@code run()} blocks
     *        in {@code put} while the queue is full
     */
    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    /**
     * Produces primes until the current thread is interrupted. On exit the
     * thread's interrupted status is left set, whichever way the interruption
     * was detected.
     */
    @Override
    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            // Poll the executing thread (not "this"): the task may be run as a
            // plain Runnable on another thread, e.g. when handed to an executor.
            while (!Thread.currentThread().isInterrupted()) {
                p = p.nextProbablePrime();
                // Print the BigInteger itself; intValue() would silently
                // truncate once the primes exceed Integer.MAX_VALUE.
                System.out.println("Next Prime Number is : " + p + "-> Is interrupted "
                        + Thread.currentThread().isInterrupted());
                // Read the capacity once so the printed value and the
                // "blocked" notice are based on the same observation.
                int remaining = queue.remainingCapacity();
                System.out.println(remaining);
                if (remaining == 0) {
                    System.out.println("Now Blocked.");
                }
                queue.put(p);
            }
            System.out.println("Now Unblocked");
        } catch (InterruptedException consumed) {
            System.out.println("Interrupted......");
            // put() cleared the interrupted status when it threw; restore it so
            // the owner of this thread can still observe the cancellation,
            // then fall through and let the task end.
            Thread.currentThread().interrupt();
        }
    }

}