Tuesday, 2 March 2010

Java Concurrency - Task Cancellation.

In a multi-threaded application, it is often necessary to cancel a task on user request or when some event occurs. In the following example, there are two points in each loop iteration where interruption may be detected: in the blocking put call, and by explicitly polling the interrupted status in the loop header. The explicit test is not strictly necessary here because of the blocking put call, but it makes PrimeProducer more responsive to interruption because it checks for interruption before starting the lengthy task of searching for a prime, rather than after. When calls to interruptible blocking methods are not frequent enough to deliver the desired responsiveness, explicitly testing the interrupted status can help.




import java.math.BigInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestTaskCancellation {

/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
TestTaskCancellation tc = new TestTaskCancellation();
tc.demo();

}

private void demo() throws InterruptedException {

BlockingQueue<BigInteger> queue = new ArrayBlockingQueue<BigInteger>(
100);
PrimeProducer pp = new PrimeProducer(queue);

ExecutorService es = Executors.newFixedThreadPool(1);
es.execute(pp);
Thread.sleep(1000);
es.shutdownNow();
// OR
/*
* If you do not want to use ExecutorService then uncomment the
* following code and comment the above 4 lines
*/
// Thread t1 = new Thread(pp);
// t1.start();
// Thread.sleep(1000);
// t1.interrupt();

}

}

class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;

PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted()) {
p = p.nextProbablePrime();
System.out.println("Next Prime Number is : "+p.intValue() + "-> Is interrupted "
+ Thread.currentThread().isInterrupted());
System.out.println(queue.remainingCapacity());
if (0 == queue.remainingCapacity()) {
System.out.println("Now Blocked.");
}
queue.put(p);

}
System.out.println("Now Unblocked");
} catch (InterruptedException consumed) {
System.out.println("Interrupted......");
/* Allow thread to exit */
}
}

}
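
The last point above, polling the interrupted status when the loop contains no blocking call at all, can be sketched roughly as follows. This is only an illustration: the class PollingCancellationDemo and its method names are made up for this sketch and are not part of the example above.

```java
import java.math.BigInteger;

// Hypothetical demo class, introduced only for this sketch.
class PollingCancellationDemo {

    // Searches for primes until the running thread is interrupted and
    // returns how many were found. The loop contains no blocking call,
    // so the interrupted status is the only cancellation signal.
    static int countPrimesUntilInterrupted() {
        int found = 0;
        BigInteger p = BigInteger.ONE;
        while (!Thread.currentThread().isInterrupted()) {
            p = p.nextProbablePrime();
            found++;
        }
        return found;
    }

    // Runs the search on a worker thread, interrupts it after the given
    // delay and reports whether the worker stopped within one second.
    static boolean cancelAfter(long delayMillis) {
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                int found = countPrimesUntilInterrupted();
                System.out.println("Stopped after finding " + found + " primes");
            }
        });
        worker.start();
        try {
            Thread.sleep(delayMillis);
            worker.interrupt();
            worker.join(1000);
        } catch (InterruptedException e) {
            worker.interrupt();                 // still cancel the worker
            Thread.currentThread().interrupt(); // preserve the caller's interrupt
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker terminated: " + cancelAfter(100));
    }
}
```

Without the check in the loop header this worker would never notice the interrupt, because nothing inside the loop throws InterruptedException.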




Saturday, 27 February 2010

Java Concurrency - CyclicBarrier

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

Suppose you have a matrix of N rows, and you want to sum each row in a separate thread, with every thread waiting until the sums for all the rows are complete. That waiting point is the ‘Barrier’.

Once all the rows have been processed, the barrier opens and the program shows the grand total.
There is an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released.


import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestCyclicBarrier {

public static void main(String[] args) {
TestCyclicBarrier cb = new TestCyclicBarrier();
cb.demo();
}

private void demo() {
int a1[] = { 1, 2, 3 };
int a2[] = { 4, 5, 6 };
int a3[] = { 7, 8, 9 };
int a4[] = { 10, 11, 12 };
int a5[] = { 13, 14, 15 };

int aa[][] = { a1, a2, a3, a4, a5 };
new Solver(aa);
}
}

class Solver {

int data[][];
int N;
CyclicBarrier barrier = null;

public Solver(int[][] data) {
this.data = data;
N = data.length;
barrier = new CyclicBarrier(N, new WhenLastOneInBarrier());
ExecutorService ex = Executors.newFixedThreadPool(N);
Callable<Integer> task = null;
List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();

for (int i = 0; i < N; i++) {
task = new Worker(data, i, barrier);
Future<Integer> result = ex.submit(task);
resultList.add(result);
}

int grandTotal = 0;
for (Future<Integer> f : resultList) {
try {
grandTotal += f.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("Is Barrier broken : " + barrier.isBroken());
System.out.println("Grand total is : " + grandTotal);
ex.shutdown();
}
}

class Worker implements Callable<Integer> {

CyclicBarrier barrier;
int data[][];
int rowNum;

public Worker(int[][] data, int rowNum, CyclicBarrier barrier) {
this.data = data;
this.barrier = barrier;
this.rowNum = rowNum;
}

@Override
public Integer call() throws Exception {
Random random = new Random();
try {
Thread.sleep((random.nextInt(10) * 1000));
} catch (InterruptedException e1) {
e1.printStackTrace();
}
int d[] = data[rowNum];
int total = 0;
for (int i : d) {
total += i;
}
System.out.println("processed..total for the row number " + rowNum
+ " is : " + total);

try {
System.out.println("Here is barrier, Waiting here for other threads to complete.");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("Barrier opened");
return total;
}
}

class WhenLastOneInBarrier implements Runnable {

@Override
public void run() {
System.out.println("Last worker in the barrier...");
}
}
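
The example above trips the barrier only once. The "cyclic" part, re-using the same barrier after the waiting threads are released, can be sketched like this. BarrierReuseDemo and runRounds are hypothetical names used only for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, introduced only for this sketch.
class BarrierReuseDemo {

    // Sends 'parties' threads through the same barrier 'rounds' times and
    // returns how many times the barrier action ran (once per trip).
    static int runRounds(final int parties, final int rounds) {
        final AtomicInteger trips = new AtomicInteger();
        final CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable() {
            @Override
            public void run() {
                trips.incrementAndGet(); // runs after the last party arrives
            }
        });

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int r = 0; r < rounds; r++) {
                            barrier.await(); // same barrier object in every round
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // another party failed or was interrupted; give up
                    }
                }
            });
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier tripped " + runRounds(3, 4) + " times");
    }
}
```

With 3 parties and 4 rounds the barrier action runs 4 times, once each time all three threads have arrived.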


Java Concurrency - Use of CountDownLatch

CountDownLatch allows one or more threads to wait until a set of operations being performed in other threads completes. A CountDownLatch is initialized with a given count.
E.g. suppose you have a birthday party, and the following activities must happen in a given order:
1. Get the cake.
2. Put the (N) candles on it and light them.
3. Blow out the candles and sing a song.

In such a situation, we can use CountDownLatch to perform the above tasks in order.

To achieve this, I have created three Runnables:

1. GetCake – counts down the gotTheCake latch, which is initialized with count 1. It acts as an ON/OFF switch.
2. PutCandles – waits for gotTheCake, then counts down the candlesReady latch, which is initialized with count 10, as we are putting 10 candles on the cake.
3. BlowAndSing – waits for candlesReady. It does not need a latch of its own, as it is the last task in the sequence.

Once the GetCake task is done, it notifies PutCandles. Once PutCandles is done (for all 10 candles), it notifies BlowAndSing.
The following is the code sample:



import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class TestCountDownLatch1 {

public static void main(String[] args) {
TestCountDownLatch1 cdl1 = new TestCountDownLatch1();
cdl1.test();
}

private void test() {
CountDownLatch gotTheCake = new CountDownLatch(1);
CountDownLatch candlesReady = new CountDownLatch(10);

ExecutorService es = Executors.newFixedThreadPool(15);
Runnable task = null;

task = new GetCake(gotTheCake);
es.execute(task);

for(int i=1; i<= 10; i++) {
task = new PutCandles(i, gotTheCake, candlesReady, i);
es.execute(task);
}

task = new BlowAndSing(candlesReady);
es.execute(task);

es.shutdown();
}

}

class PutCandles implements Runnable {

CountDownLatch gotTheCake;
CountDownLatch candlesReady;
private int candleNo;
int sleepTime;

public PutCandles(int candleNo, CountDownLatch gotTheCake, CountDownLatch candlesReady, int sleepTime) {
this.candleNo = candleNo;
this.gotTheCake = gotTheCake;
this.candlesReady = candlesReady;
this.sleepTime = sleepTime;
}
@Override
public void run() {
try {
gotTheCake.await();
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

System.out.println("Putting candle no : "+ candleNo+ " and lighting it.");
candlesReady.countDown();
}
}

class GetCake implements Runnable {

CountDownLatch gotTheCake;

public GetCake(CountDownLatch gotTheCake) {
this.gotTheCake = gotTheCake;
}
@Override
public void run() {
System.out.println("Got the Cake : ");
gotTheCake.countDown();
}
}

class BlowAndSing implements Runnable {

CountDownLatch candlesReady;

public BlowAndSing(CountDownLatch candlesReady) {
this.candlesReady = candlesReady;
}
@Override
public void run() {
try {
candlesReady.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Fuuuuuuuuuuu..Happy B'day Soham...:) ");
}
}
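
To check that the latches, and not the submission order, decide the sequence, here is a smaller sketch that records the events in a list and submits the tasks in reverse order. LatchOrderDemo, runParty and the event names are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, introduced only for this sketch.
class LatchOrderDemo {

    // Runs the party with the given number of candles and returns the
    // events in the order in which they actually happened.
    static List<String> runParty(int candles) {
        final List<String> events =
                Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch gotTheCake = new CountDownLatch(1);
        final CountDownLatch candlesReady = new CountDownLatch(candles);
        // One thread per task, so waiting tasks cannot starve the
        // task they are waiting for.
        ExecutorService es = Executors.newFixedThreadPool(candles + 2);

        // Submitted first on purpose; it still finishes last.
        es.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    candlesReady.await();
                    events.add("song");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        for (int i = 0; i < candles; i++) {
            es.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        gotTheCake.await();
                        events.add("candle");
                        candlesReady.countDown();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        // Submitted last, but nothing else can proceed before it runs.
        es.execute(new Runnable() {
            @Override
            public void run() {
                events.add("cake");
                gotTheCake.countDown();
            }
        });

        es.shutdown();
        try {
            es.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runParty(3));
    }
}
```

The list always starts with "cake" and ends with "song", with the candle events in between.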



Tuesday, 17 November 2009

@Transactional annotation on interface or concrete class?

The Spring team's recommendation is that you only annotate concrete classes with the @Transactional annotation, as opposed to annotating interfaces. You certainly can place the @Transactional annotation on an interface (or an interface method), but this will only work as you would expect it to if you are using interface-based proxies. The fact that annotations are not inherited means that if you are using class-based proxies (proxy-target-class="true") or the weaving-based aspect (mode="aspectj") then the transaction settings will not be recognised by the proxying/weaving infrastructure and the object will not be wrapped in a transactional proxy (which would be decidedly bad). So please do take the Spring team's advice and only annotate concrete classes (and the methods of concrete classes) with the @Transactional annotation.

Note: In proxy mode (which is the default), only 'external' method calls coming in through the proxy will be intercepted. This means that 'self-invocation', i.e. a method within the target object calling some other method of the target object, won't lead to an actual transaction at runtime even if the invoked method is marked with @Transactional!

Consider the use of AspectJ mode if you expect self-invocations to be wrapped with transactions as well. In this case, there won't be a proxy in the first place; instead, the target class will be 'weaved' (i.e. its byte code will be modified) in order to turn @Transactional into runtime behavior on any kind of method.
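
The self-invocation effect can be reproduced without Spring. The following sketch uses a plain JDK dynamic proxy as a stand-in for Spring's transactional proxy. It is an analogy only, not Spring's actual implementation, and AccountService, AccountServiceImpl and SelfInvocationDemo are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical service interface, introduced only for this sketch.
interface AccountService {
    void transfer();
    void audit();
}

class AccountServiceImpl implements AccountService {
    @Override
    public void transfer() {
        audit(); // self-invocation: called on 'this', not through the proxy
    }

    @Override
    public void audit() {
        // imagine this method were marked @Transactional
    }
}

class SelfInvocationDemo {

    // Calls transfer() through a proxy and returns the names of the
    // methods that the proxy actually intercepted.
    static List<String> interceptedCalls() {
        final List<String> intercepted = new ArrayList<String>();
        final AccountService target = new AccountServiceImpl();

        AccountService proxy = (AccountService) Proxy.newProxyInstance(
                AccountService.class.getClassLoader(),
                new Class<?>[] { AccountService.class },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object p, Method method, Object[] args)
                            throws Throwable {
                        // stand-in for "begin transaction"
                        intercepted.add(method.getName());
                        return method.invoke(target, args);
                    }
                });

        proxy.transfer(); // external call: goes through the proxy
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls()); // prints [transfer]
    }
}
```

Only transfer shows up in the list. The nested audit() call never reaches the handler, which is why a @Transactional method invoked from within the same object gets no transaction in proxy mode.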

Wednesday, 8 July 2009

Writing a JVM shutdown hook

The following class exits when some condition is met. If we need to carry out some task before the JVM exits, we can register a shutdown hook (TestShutdownHook.java) with the JVM.


// Test.java
package com.my;

public class Test {

public static void main(String args[]){

Test test = new Test();
TestShutdownHook testshutdownHook = new TestShutdownHook(test);
Runtime.getRuntime().addShutdownHook(testshutdownHook);
test.startTest();

}

private void startTest() {

int counter = 0;
while(true){
if(counter == 100){
System.exit(0);
}
System.out.println("Running..");
counter++;
}
}

public void beforeShutdownDothis() {
System.out.println("Doing something before shutdown");
}
}

// TestShutdownHook.java
package com.my;

public class TestShutdownHook extends Thread {

private Test test;

public TestShutdownHook(Test test){
this.test = test;
}

public void run(){
test.beforeShutdownDothis();
}
}

Useful tips for Spring beans initialization

Referring to constants in the bean initialization







Using a static factory method to create non-singleton beans. Even though the parameters are given in 'constructor-arg' elements, they are treated as ordinary arguments to the factory method call.



constructor-arg values: 0, test, 0



Write the following code in a class that implements the BeanFactoryAware and InitializingBean interfaces.

String testFileSystemNames[] = {"/","/usr","/exports"};
Object args[] = new Object[3];
List fileSystemObjects = new ArrayList();
for(int i=0; i< testFileSystemNames.length; i++ ){
args[0]=i+1;
args[1]=testFileSystemNames[i];
args[2]=i;
FileSystem fileSystem = (FileSystem)beanFactory.getBean("fileSystem", args);
fileSystemObjects.add(fileSystem);
}

Monday, 1 June 2009

Quick Tip for using SimpleJdbcTemplate

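When using the SimpleJdbcTemplate.queryForMap method, remember that the keys of the returned Map are the column names as reported by the database, which are typically in all capital letters (the exact case depends on the database), e.g.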

Map results = template.queryForMap("select id, balance from Account where id=?", account.getId());

In the above case, to retrieve the values of id and balance, use results.get("ID") and results.get("BALANCE") instead of results.get("id") and results.get("balance").
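
If you would rather not depend on the key case at all, one option is to copy the result into a case-insensitive map. The sketch below uses only the JDK. The HashMap in main is a stand-in for what queryForMap might return, and CaseInsensitiveKeysDemo and caseInsensitiveCopy are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper class, introduced only for this sketch.
class CaseInsensitiveKeysDemo {

    // Copies a result row into a map whose String keys ignore case.
    static Map<String, Object> caseInsensitiveCopy(Map<String, Object> row) {
        Map<String, Object> copy =
                new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER);
        copy.putAll(row);
        return copy;
    }

    public static void main(String[] args) {
        // Stand-in for a row returned with upper-case keys.
        Map<String, Object> row = new HashMap<String, Object>();
        row.put("ID", 42);
        row.put("BALANCE", 100.5);

        Map<String, Object> results = caseInsensitiveCopy(row);
        System.out.println(results.get("id"));      // prints 42
        System.out.println(results.get("Balance")); // prints 100.5
    }
}
```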

:)