Saturday, 27 February 2010

Java Concurrency - CyclicBarrier

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

Suppose you have matix of N rows, and you want to process summation of each row in different thread and the thread should wait till the processing of summations is complete for all the rows. That’s the ‘Barrier’.

Once processing for all the rows is done, barrier will be opened and program shows the GrandTotal.
There is a optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released.


import java.util.ArrayList;

import java.util.List;

import java.util.Random;

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.Callable;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;



public class TestCyclicBarrier {



public static void main(String[] args) {

TestCyclicBarrier cb = new TestCyclicBarrier();

cb.demo();

}



private void demo() {



int a1[] = { 1, 2, 3 };

int a2[] = { 4, 5, 6 };

int a3[] = { 7, 8, 9 };

int a4[] = { 10, 11, 12 };

int a5[] = { 13, 14, 15 };



int aa[][] = { a1, a2, a3, a4, a5 };

new Solver(aa);

}

}



class Solver {



int data[][];

int N;

CyclicBarrier barrier = null;



public Solver(int[][] data) {

this.data = data;

N = data.length;

barrier = new CyclicBarrier(N, new WhenLastOneInBarrier());

ExecutorService ex = Executors.newFixedThreadPool(N);

Callable<Integer> task = null;

List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();



for (int i = 0; i < N; i++) {

task = new Worker(data, i, barrier);

Future<Integer> result = ex.submit(task);

resultList.add(result);

}



int grandTotal = 0;

for (Future<Integer> f : resultList) {

try {

grandTotal += f.get();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

}

System.out.println("Is Barrier broken : " + barrier.isBroken());

System.out.println("Grand total is : " + grandTotal);

ex.shutdown();

}

}



class Worker implements Callable<Integer> {



CyclicBarrier barrier;

int data[][];

int rowNum;



public Worker(int[][] data, int rowNum, CyclicBarrier barrier) {

this.data = data;

this.barrier = barrier;

this.rowNum = rowNum;

}



@Override

public Integer call() throws Exception {

Random random = new Random();

try {

Thread.sleep((random.nextInt(10) * 1000));

} catch (InterruptedException e1) {

e1.printStackTrace();

}

int d[] = data[rowNum];

int total = 0;

for (int i : d) {

total += i;

}

System.out.println("processed..total for the row number " + rowNum

+ " is : " + total);



try {

System.out.println("Here is barrier, Waiting here for other threads to complete.");

barrier.await();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (BrokenBarrierException e) {

e.printStackTrace();

}

System.out.println("Barrier opened");

return total;

}



}



class WhenLastOneInBarrier implements Runnable {



@Override

public void run() {

System.out.println("Last worker in the barrier...");

}



}


No comments: