Saturday, 27 February 2010

Java Concurrency - CyclicBarrier

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

Suppose you have matix of N rows, and you want to process summation of each row in different thread and the thread should wait till the processing of summations is complete for all the rows. That’s the ‘Barrier’.

Once processing for all the rows is done, barrier will be opened and program shows the GrandTotal.
There is a optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released.


import java.util.ArrayList;

import java.util.List;

import java.util.Random;

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.Callable;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;



public class TestCyclicBarrier {



public static void main(String[] args) {

TestCyclicBarrier cb = new TestCyclicBarrier();

cb.demo();

}



private void demo() {



int a1[] = { 1, 2, 3 };

int a2[] = { 4, 5, 6 };

int a3[] = { 7, 8, 9 };

int a4[] = { 10, 11, 12 };

int a5[] = { 13, 14, 15 };



int aa[][] = { a1, a2, a3, a4, a5 };

new Solver(aa);

}

}



class Solver {



int data[][];

int N;

CyclicBarrier barrier = null;



public Solver(int[][] data) {

this.data = data;

N = data.length;

barrier = new CyclicBarrier(N, new WhenLastOneInBarrier());

ExecutorService ex = Executors.newFixedThreadPool(N);

Callable<Integer> task = null;

List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();



for (int i = 0; i < N; i++) {

task = new Worker(data, i, barrier);

Future<Integer> result = ex.submit(task);

resultList.add(result);

}



int grandTotal = 0;

for (Future<Integer> f : resultList) {

try {

grandTotal += f.get();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

}

System.out.println("Is Barrier broken : " + barrier.isBroken());

System.out.println("Grand total is : " + grandTotal);

ex.shutdown();

}

}



class Worker implements Callable<Integer> {



CyclicBarrier barrier;

int data[][];

int rowNum;



public Worker(int[][] data, int rowNum, CyclicBarrier barrier) {

this.data = data;

this.barrier = barrier;

this.rowNum = rowNum;

}



@Override

public Integer call() throws Exception {

Random random = new Random();

try {

Thread.sleep((random.nextInt(10) * 1000));

} catch (InterruptedException e1) {

e1.printStackTrace();

}

int d[] = data[rowNum];

int total = 0;

for (int i : d) {

total += i;

}

System.out.println("processed..total for the row number " + rowNum

+ " is : " + total);



try {

System.out.println("Here is barrier, Waiting here for other threads to complete.");

barrier.await();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (BrokenBarrierException e) {

e.printStackTrace();

}

System.out.println("Barrier opened");

return total;

}



}



class WhenLastOneInBarrier implements Runnable {



@Override

public void run() {

System.out.println("Last worker in the barrier...");

}



}


Java Concurrency - Use of CountDownLatch

CountDownLatch allows one or more threads to wait until a set of operations being performed in other threads completes. A CountDownLatch is initialized with a given count.
E.g. suppose you have Birthday party, and then the following activities must happen in a given order.
1. Get the Cake.
2. Put the (N) candles and light them.
3. Blow and Sing a song.

So in such situation, we can utilize CountDownLatch to perform above tasks in order.

To achive this, I have created three Runables

1. GetCake – Initialized with count 1. It acts as ON/OFF switch
2. PutCandles - Initialized with count 10, as we are putting 10 candles on Cake
3. BlowAndSing – Does not require as it is the last task in the sequence.

Once GetCake task is done, it notifies PutCandles . Once PutCandles is done(for 10 candles), it notifies BlowAndSing .
Following is the code sample :



import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class TestCountDownLatch1 {

public static void main(String[] args) {
TestCountDownLatch1 cdl1 = new TestCountDownLatch1();
cdl1.test();
}

private void test() {
CountDownLatch gotTheCake = new CountDownLatch(1);
CountDownLatch candlesReady = new CountDownLatch(10);

ExecutorService es = Executors.newFixedThreadPool(15);
Runnable task = null;

task = new GetCake(gotTheCake);
es.execute(task);

for(int i=1; i<= 10; i++) {
task = new PutCandles(i, gotTheCake, candlesReady, i);
es.execute(task);
}

task = new BlowAndSing(candlesReady);
es.execute(task);

es.shutdown();
}

}

class PutCandles implements Runnable {

CountDownLatch gotTheCake;
CountDownLatch candlesReady;
private int candleNo;
int sleepTime;

public PutCandles(int candleNo, CountDownLatch gotTheCake, CountDownLatch candlesReady, int sleepTime) {
this.candleNo = candleNo;
this.gotTheCake = gotTheCake;
this.candlesReady = candlesReady;
this.sleepTime = sleepTime;
}
@Override
public void run() {
try {
gotTheCake.await();
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

System.out.println("Putting candle no : "+ candleNo+ " and lighting it.");
candlesReady.countDown();
}
}

class GetCake implements Runnable {

CountDownLatch gotTheCake;

public GetCake(CountDownLatch gotTheCake) {
this.gotTheCake = gotTheCake;
}
@Override
public void run() {
System.out.println("Got the Cake : ");
gotTheCake.countDown();
}
}

class BlowAndSing implements Runnable {

CountDownLatch candlesReady;

public BlowAndSing(CountDownLatch candlesReady) {
this.candlesReady = candlesReady;
}
@Override
public void run() {
try {
candlesReady.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Fuuuuuuuuuuu..Happy B'day Soham...:) ");
}
}