File: CyclicBarrier.java
Doc: API Doc
Category: Android 1.5 API
Size: 15672
Date: Wed May 06 22:41:02 BST 2009
Package: java.util.concurrent

CyclicBarrier

public class CyclicBarrier extends Object
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed-size party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

A CyclicBarrier supports an optional {@link Runnable} command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared state before any of the parties continue.
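As a minimal sketch of the two properties just described (the barrier is re-usable, and the action runs once per barrier point), the following counts how often the action fires. The class name BarrierActionDemo and the method countActionRuns are hypothetical names chosen for illustration, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of java.util.concurrent.
class BarrierActionDemo {
    // Sends `parties` threads through `rounds` barrier points and
    // returns how many times the barrier action ran.
    static int countActionRuns(int parties, int rounds) {
        AtomicInteger actionRuns = new AtomicInteger();
        // The action is executed by the last thread to arrive in each round.
        CyclicBarrier barrier = new CyclicBarrier(parties, actionRuns::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object is re-used each round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party failed; stop participating
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return actionRuns.get();
    }

    public static void main(String[] args) {
        System.out.println(countActionRuns(3, 4)); // one action run per round
    }
}
```

With three parties and four rounds the action runs four times, once per trip, regardless of which thread happens to arrive last.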

Sample usage: Here is an example of using a barrier in a parallel decomposition design:

class Solver {
    final int N;
    final float[][] data;
    final CyclicBarrier barrier;

    class Worker implements Runnable {
        int myRow;
        Worker(int row) { myRow = row; }
        public void run() {
            while (!done()) {
                processRow(myRow);

                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }
    }

    public Solver(float[][] matrix) {
        data = matrix;
        N = matrix.length;
        barrier = new CyclicBarrier(N,
                                    new Runnable() {
                                        public void run() {
                                            mergeRows(...);
                                        }
                                    });
        for (int i = 0; i < N; ++i)
            new Thread(new Worker(i)).start();

        waitUntilDone();
    }
}
Here, each worker thread processes a row of the matrix then waits at the barrier until all rows have been processed. When all rows are processed the supplied {@link Runnable} barrier action is executed and merges the rows. If the merger determines that a solution has been found then done() will return true and each worker will terminate.

If the barrier action does not rely on the parties being suspended when it is executed, then any of the threads in the party could execute that action when it is released. To facilitate this, each invocation of {@link #await} returns the arrival index of that thread at the barrier. You can then choose which thread should execute the barrier action, for example:

if (barrier.await() == 0) {
    // log the completion of this iteration
}
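To make the arrival index concrete, here is a small sketch in which every party records the value returned by await. ArrivalIndexDemo and arrivalIndices are hypothetical names; the -1 marker for a failed wait is likewise an assumption of this example.

```java
import java.util.Arrays;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, not part of java.util.concurrent.
class ArrivalIndexDemo {
    // Each of `parties` threads awaits once; returns the sorted arrival indices.
    static int[] arrivalIndices(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        int[] indices = new int[parties];
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            final int slot = i;
            threads[i] = new Thread(() -> {
                try {
                    indices[slot] = barrier.await();
                } catch (Exception e) {
                    indices[slot] = -1; // marks a failed wait (not expected here)
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // join also makes the workers' writes visible here
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        Arrays.sort(indices);
        return indices;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(arrivalIndices(4)));
    }
}
```

Every index from 0 to parties - 1 is handed out exactly once per trip, so testing for index 0 selects exactly one thread (the last to arrive) per iteration.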

The CyclicBarrier uses a fast-fail all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads, even those that have not yet resumed from a previous {@link #await}, will also leave abnormally via {@link BrokenBarrierException} (or InterruptedException if they too were interrupted at about the same time).
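The breakage model can be observed with a sketch like the one below, in which one party is interrupted and the outcome for an uninvolved party is recorded. BreakageDemo and outcomeForBystander are hypothetical names; the third party deliberately never arrives so that the barrier cannot trip normally.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of java.util.concurrent.
class BreakageDemo {
    // Returns what happened to the thread that was NOT interrupted.
    static String outcomeForBystander() {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        AtomicReference<String> bystander = new AtomicReference<>("none");
        Thread victim = new Thread(() -> {
            try {
                barrier.await();
            } catch (Exception e) {
                // InterruptedException is expected for this thread
            }
        });
        Thread other = new Thread(() -> {
            try {
                barrier.await();
                bystander.set("released");
            } catch (Exception e) {
                bystander.set(e.getClass().getSimpleName());
            }
        });
        victim.start();
        other.start();
        victim.interrupt(); // breaks the barrier whether or not victim is already waiting
        try {
            victim.join();
            other.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return bystander.get();
    }

    public static void main(String[] args) {
        System.out.println(outcomeForBystander());
    }
}
```

The uninterrupted thread leaves with BrokenBarrierException whether it was already waiting when the interrupt happened or reached the barrier afterwards.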

since
1.5
see
CountDownLatch
author
Doug Lea

Fields Summary
private final ReentrantLock lock
    The lock for guarding barrier entry
private final Condition trip
    Condition to wait on until tripped
private final int parties
    The number of parties
private final Runnable barrierCommand
private long generation
    The generation number. Incremented upon barrier trip. Retracted upon reset.
private boolean broken
    Breakage indicator.
private int count
    Number of parties still waiting. Counts down from parties to 0 on each cycle.
Constructors Summary
public CyclicBarrier(int parties, Runnable barrierAction)
Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and which will execute the given barrier action when the barrier is tripped, performed by the last thread entering the barrier.

param
parties the number of threads that must invoke {@link #await} before the barrier is tripped.
param
barrierAction the command to execute when the barrier is tripped, or null if there is no action.
throws
IllegalArgumentException if parties is less than 1.

        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties; 
        this.count = parties;
        this.barrierCommand = barrierAction;
    
public CyclicBarrier(int parties)
Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and does not perform a predefined action upon each barrier.

param
parties the number of threads that must invoke {@link #await} before the barrier is tripped.
throws
IllegalArgumentException if parties is less than 1.

        this(parties, null);
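
The constructor contract above can be checked with a short sketch. ConstructorDemo, rejects, and awaitAlone are hypothetical names used only for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, not part of java.util.concurrent.
class ConstructorDemo {
    // True if constructing a barrier with this party count is rejected.
    static boolean rejects(int parties) {
        try {
            new CyclicBarrier(parties);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    // A one-party barrier with a null action trips as soon as its single party arrives.
    static int awaitAlone() {
        CyclicBarrier barrier = new CyclicBarrier(1, null);
        try {
            return barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(0) + " " + rejects(1) + " " + awaitAlone());
    }
}
```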
    
Methods Summary
public int await()
Waits until all {@link #getParties parties} have invoked await on this barrier.

If the current thread is not the last to arrive then it is disabled for thread scheduling purposes and lies dormant until one of the following things happens:

  • The last thread arrives; or
  • Some other thread {@link Thread#interrupt interrupts} the current thread; or
  • Some other thread {@link Thread#interrupt interrupts} one of the other waiting threads; or
  • Some other thread times out while waiting for the barrier; or
  • Some other thread invokes {@link #reset} on this barrier.

If the current thread:

  • has its interrupted status set on entry to this method; or
  • is {@link Thread#interrupt interrupted} while waiting
then {@link InterruptedException} is thrown and the current thread's interrupted status is cleared.

If the barrier is {@link #reset} while any thread is waiting, or if the barrier {@link #isBroken is broken} when await is invoked, or while any thread is waiting, then {@link BrokenBarrierException} is thrown.

If any thread is {@link Thread#interrupt interrupted} while waiting, then all other waiting threads will throw {@link BrokenBarrierException} and the barrier is placed in the broken state.

If the current thread is the last thread to arrive, and a non-null barrier action was supplied in the constructor, then the current thread runs the action before allowing the other threads to continue. If an exception occurs during the barrier action then that exception will be propagated in the current thread and the barrier is placed in the broken state.

return
the arrival index of the current thread, where index {@link #getParties()} - 1 indicates the first to arrive and zero indicates the last to arrive.
throws
InterruptedException if the current thread was interrupted while waiting
throws
BrokenBarrierException if another thread was interrupted while the current thread was waiting, or the barrier was reset, or the barrier was broken when await was called, or the barrier action (if present) failed due to an exception.

        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
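
The failure path for the barrier action can be illustrated as follows: an unchecked exception thrown by the action surfaces in the thread that ran it, and the barrier ends up broken. FailingActionDemo and actionFailureBreaksBarrier are hypothetical names, and a one-party barrier is used only to keep the sketch single-threaded.

```java
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, not part of java.util.concurrent.
class FailingActionDemo {
    // True if the action's exception propagated out of await() and left the barrier broken.
    static boolean actionFailureBreaksBarrier() {
        CyclicBarrier barrier = new CyclicBarrier(1, () -> {
            throw new IllegalStateException("merge failed");
        });
        try {
            barrier.await(); // the single party is also the last to arrive, so it runs the action
            return false;
        } catch (IllegalStateException expected) {
            return barrier.isBroken();
        } catch (Exception e) {
            return false;    // any other outcome is unexpected in this sketch
        }
    }

    public static void main(String[] args) {
        System.out.println(actionFailureBreaksBarrier());
    }
}
```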
    
public int await(long timeout, java.util.concurrent.TimeUnit unit)
Waits until all {@link #getParties parties} have invoked await on this barrier.

If the current thread is not the last to arrive then it is disabled for thread scheduling purposes and lies dormant until one of the following things happens:

  • The last thread arrives; or
  • The specified timeout elapses; or
  • Some other thread {@link Thread#interrupt interrupts} the current thread; or
  • Some other thread {@link Thread#interrupt interrupts} one of the other waiting threads; or
  • Some other thread times out while waiting for the barrier; or
  • Some other thread invokes {@link #reset} on this barrier.

If the current thread:

  • has its interrupted status set on entry to this method; or
  • is {@link Thread#interrupt interrupted} while waiting
then {@link InterruptedException} is thrown and the current thread's interrupted status is cleared.

If the barrier is {@link #reset} while any thread is waiting, or if the barrier {@link #isBroken is broken} when await is invoked, or while any thread is waiting, then {@link BrokenBarrierException} is thrown.

If any thread is {@link Thread#interrupt interrupted} while waiting, then all other waiting threads will throw {@link BrokenBarrierException} and the barrier is placed in the broken state.

If the current thread is the last thread to arrive, and a non-null barrier action was supplied in the constructor, then the current thread runs the action before allowing the other threads to continue. If an exception occurs during the barrier action then that exception will be propagated in the current thread and the barrier is placed in the broken state.

param
timeout the time to wait for the barrier
param
unit the time unit of the timeout parameter
return
the arrival index of the current thread, where index {@link #getParties()} - 1 indicates the first to arrive and zero indicates the last to arrive.
throws
InterruptedException if the current thread was interrupted while waiting
throws
TimeoutException if the specified timeout elapses.
throws
BrokenBarrierException if another thread was interrupted while the current thread was waiting, or the barrier was reset, or the barrier was broken when await was called, or the barrier action (if present) failed due to an exception.

        return dowait(true, unit.toNanos(timeout));
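
A timed wait that expires can be sketched like this. TimedAwaitDemo and awaitAlone are hypothetical names, and the second party is intentionally missing so that the timeout is the only way out.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of java.util.concurrent.
class TimedAwaitDemo {
    // Waits alone on a two-party barrier and reports how the wait ended.
    static String awaitAlone(long millis) {
        CyclicBarrier barrier = new CyclicBarrier(2); // the second party never arrives
        try {
            barrier.await(millis, TimeUnit.MILLISECONDS);
            return "released";
        } catch (TimeoutException e) {
            // Timing out also breaks the barrier for every other party.
            return "timeout, broken=" + barrier.isBroken();
        } catch (InterruptedException | BrokenBarrierException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitAlone(50));
    }
}
```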
    
private void breakBarrier()
Sets the barrier as broken and wakes up everyone.

        broken = true;
        trip.signalAll();
    
private int dowait(boolean timed, long nanos)
Main barrier code, covering the various policies.

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int index = --count;
            long g = generation;

            if (broken) 
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            if (index == 0) {  // tripped
                nextGeneration();
                boolean ranAction = false;
                try {
                    Runnable command = barrierCommand;
                    if (command != null) 
                        command.run();
                    ranAction = true;
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            for (;;) {
                try {
                    if (!timed) 
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    breakBarrier();
                    throw ie;
                }
                
                if (broken || 
                    g > generation) // true if a reset occurred while waiting
                    throw new BrokenBarrierException();

                if (g < generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }

        } finally {
            lock.unlock();
        }
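
The core of the listing above is the comparison between the generation a thread saw on entry and the current generation. The sketch below isolates just that idea. It is a deliberately simplified, hypothetical TinyBarrier, not the library's implementation: it has no breakage handling, timeout, reset, or barrier action, and an interrupted waiter would leave the count inconsistent.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified barrier for illustration only.
class TinyBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private int count;        // parties still to arrive in this cycle
    private long generation;  // incremented on every trip

    TinyBarrier(int parties) {
        this.parties = parties;
        this.count = parties;
    }

    int await() throws InterruptedException {
        lock.lock();
        try {
            long g = generation;       // remember which cycle this thread joined
            int index = --count;
            if (index == 0) {          // last arrival: start the next cycle
                count = parties;
                ++generation;
                trip.signalAll();
                return 0;
            }
            while (g == generation) {  // loop guards against spurious wakeups
                trip.await();
            }
            return index;
        } finally {
            lock.unlock();
        }
    }

    // Drives `parties` threads through `rounds` cycles; returns the sum of all arrival indices.
    static int sumOfIndices(int parties, int rounds) {
        TinyBarrier barrier = new TinyBarrier(parties);
        AtomicInteger sum = new AtomicInteger();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        sum.addAndGet(barrier.await());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return sum.get();
    }
}
```

Because a waiter compares against the generation it captured on entry, a fast thread that has already re-entered the barrier for the next cycle cannot be confused with a slow thread still waking up from the previous one.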
    
public int getNumberWaiting()
Returns the number of parties currently waiting at the barrier. This method is primarily useful for debugging and assertions.

return
the number of parties currently blocked in {@link #await}

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
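
As a sketch of the debugging use mentioned above, the following samples getNumberWaiting while one party is blocked and again after the barrier has tripped. NumberWaitingDemo and observe are hypothetical names, and the polling loop is only a simple way to wait for the other thread to arrive.

```java
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, not part of java.util.concurrent.
class NumberWaitingDemo {
    // Returns { waiting count while one party is blocked, waiting count after the trip }.
    static int[] observe() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
            } catch (Exception e) {
                // not expected in this sketch
            }
        });
        waiter.start();
        while (barrier.getNumberWaiting() < 1) {
            Thread.yield(); // poll until the waiter has arrived at the barrier
        }
        int during = barrier.getNumberWaiting();
        try {
            barrier.await(); // second party arrives and trips the barrier
            waiter.join();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        int after = barrier.getNumberWaiting();
        return new int[] { during, after };
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println(r[0] + " " + r[1]);
    }
}
```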
    
public int getParties()
Returns the number of parties required to trip this barrier.

return
the number of parties required to trip this barrier.

        return parties;
    
public boolean isBroken()
Queries if this barrier is in a broken state.

return
true if one or more parties broke out of this barrier due to interruption or timeout since construction or the last reset, or a barrier action failed due to an exception; and false otherwise.

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return broken;
        } finally {
            lock.unlock();
        }
    
private void nextGeneration()
Updates state on barrier trip and wakes up everyone.

        count = parties;
        ++generation;
        trip.signalAll();
    
public void reset()
Resets the barrier to its initial state. If any parties are currently waiting at the barrier, they will return with a {@link BrokenBarrierException}. Note that resets after a breakage has occurred for other reasons can be complicated to carry out; threads need to re-synchronize in some other way, and choose one to perform the reset. It may be preferable to instead create a new barrier for subsequent use.

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            /*
             * Retract generation number enough to cover threads
             * currently waiting on current and still resuming from
             * previous generation, plus similarly accommodating spans
             * after the reset.
             */
            generation -= 4;
            broken = false;
            trip.signalAll();
        } finally {
            lock.unlock();
        }
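
The effect of reset on a waiting party can be sketched as follows. ResetDemo and resetWhileWaiting are hypothetical names; the example assumes a single waiter and uses polling only to make sure it is blocked before the reset.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of java.util.concurrent.
class ResetDemo {
    // Resets a barrier while one party is waiting and reports the result.
    static String resetWhileWaiting() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicReference<String> outcome = new AtomicReference<>("none");
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
                outcome.set("released");
            } catch (Exception e) {
                outcome.set(e.getClass().getSimpleName());
            }
        });
        waiter.start();
        while (barrier.getNumberWaiting() < 1) {
            Thread.yield(); // poll until the waiter is blocked at the barrier
        }
        barrier.reset();    // the waiter leaves abnormally; the barrier itself is usable again
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return outcome.get() + ", broken=" + barrier.isBroken();
    }

    public static void main(String[] args) {
        System.out.println(resetWhileWaiting());
    }
}
```

The waiter sees BrokenBarrierException, yet isBroken reports false afterwards because the reset returns the barrier to its initial state.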