CyclicBarrier
public class CyclicBarrier extends Object
A synchronization aid that allows a set of threads to all wait for
each other to reach a common barrier point. CyclicBarriers are
useful in programs involving a fixed-size party of threads that
must occasionally wait for each other. The barrier is called
cyclic because it can be re-used after the waiting threads
are released.
A CyclicBarrier supports an optional {@link Runnable} command
that is run once per barrier point, after the last thread in the party
arrives, but before any threads are released.
This barrier action is useful
for updating shared state before any of the parties continue.
Sample usage: Here is an example of
using a barrier in a parallel decomposition design:
class Solver {
    final int N;
    final float[][] data;
    final CyclicBarrier barrier;
    class Worker implements Runnable {
        int myRow;
        Worker(int row) { myRow = row; }
        public void run() {
            while (!done()) {
                processRow(myRow);
                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }
    }
    public Solver(float[][] matrix) {
        data = matrix;
        N = matrix.length;
        barrier = new CyclicBarrier(N,
            new Runnable() {
                public void run() {
                    mergeRows(...);
                }
            });
        for (int i = 0; i < N; ++i)
            new Thread(new Worker(i)).start();
        waitUntilDone();
    }
}
Here, each worker thread processes a row of the matrix, then waits at the
barrier until all rows have been processed. When all rows are processed,
the supplied {@link Runnable} barrier action is executed and merges the
rows. If the merger determines that a solution has been found, then
done() will return true and each worker will terminate.
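The sample above leaves processRow, mergeRows, done and waitUntilDone undefined. A minimal self-contained sketch of the same pattern follows, under stated assumptions: the class name BarrierActionDemo, the arrays partial and state, and the "work" each thread does (writing id + 1 into its slot) are hypothetical stand-ins, not part of CyclicBarrier. It relies only on the documented behavior that the barrier action runs once per cycle, before the waiting threads are released.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class BarrierActionDemo {
    /**
     * Runs `workers` threads for `rounds` barrier cycles.
     * Returns {completed rounds, merged total}.
     */
    static int[] run(int workers, int rounds) {
        final int[] partial = new int[workers]; // one slot per worker
        final int[] state = new int[2];         // state[0] = rounds done, state[1] = merged total
        // The barrier action runs once per cycle, in the last thread to arrive,
        // before any waiting worker is released.
        final CyclicBarrier barrier = new CyclicBarrier(workers, () -> {
            for (int p : partial) state[1] += p; // stand-in for mergeRows(...)
            state[0]++;
        });
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                while (state[0] < rounds) {      // plays the role of done()
                    partial[id] = id + 1;        // stand-in for processRow(myRow)
                    try {
                        barrier.await();
                    } catch (InterruptedException | BrokenBarrierException ex) {
                        return;
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        int[] result = run(3, 4);
        System.out.println(result[0] + " rounds, total " + result[1]);
    }
}
```

With 3 workers and 4 rounds, each cycle merges 1 + 2 + 3 = 6, so the sketch reports 4 rounds and a total of 24. Plain arrays are enough here because the barrier orders each worker's writes before the barrier action, and the action's writes before the workers' next reads.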
If the barrier action does not rely on the parties being suspended when
it is executed, then any of the threads in the party could execute that
action when it is released. To facilitate this, each invocation of
{@link #await} returns the arrival index of that thread at the barrier.
You can then choose which thread should execute the barrier action, for
example:
    if (barrier.await() == 0) {
        // log the completion of this iteration
    }
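To make the arrival index concrete, here is a small sketch that collects the value returned by await() in every thread of a party. The class ArrivalIndexDemo and the method collectArrivalIndices are hypothetical names chosen for illustration. Which thread receives which index depends on scheduling, so the sketch sorts the indices before returning them.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class ArrivalIndexDemo {
    /** Has `parties` threads await once each and returns their arrival indices, sorted. */
    static int[] collectArrivalIndices(int parties) {
        final CyclicBarrier barrier = new CyclicBarrier(parties);
        final int[] indices = new int[parties];
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            final int slot = i;
            threads[i] = new Thread(() -> {
                try {
                    // parties - 1 for the first thread to arrive, 0 for the last
                    indices[slot] = barrier.await();
                } catch (InterruptedException | BrokenBarrierException ex) {
                    indices[slot] = -1; // marks an abnormal exit
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        Arrays.sort(indices);
        return indices;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(collectArrivalIndices(3)));
    }
}
```

Each index from 0 to parties - 1 is handed out exactly once per cycle, so testing for 0 selects exactly one thread, the last to arrive.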
The CyclicBarrier uses an all-or-none breakage model
for failed synchronization attempts: If a thread leaves a barrier
point prematurely because of interruption, failure, or timeout, all
other threads waiting at that barrier point will also leave
abnormally via {@link BrokenBarrierException} (or
{@link InterruptedException} if they too were interrupted at about
the same time).
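The all-or-none model can be observed with a party that never fills. In the sketch below (BreakageDemo and its outcome array are hypothetical names), a barrier for three parties is joined by only two threads. One of them waits with a timeout, and when that timeout elapses the other thread is expected to leave with BrokenBarrierException instead of waiting forever.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BreakageDemo {
    /** Returns {how the untimed waiter left, how the timed waiter left, isBroken()}. */
    static String[] run() {
        final CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        final String[] outcome = new String[3];
        Thread waiter = new Thread(() -> {
            try {
                barrier.await(); // untimed wait
                outcome[0] = "released";
            } catch (BrokenBarrierException ex) {
                outcome[0] = "BrokenBarrierException";
            } catch (InterruptedException ex) {
                outcome[0] = "InterruptedException";
            }
        });
        waiter.start();
        try {
            barrier.await(100, TimeUnit.MILLISECONDS); // gives up and breaks the barrier
            outcome[1] = "released";
        } catch (TimeoutException ex) {
            outcome[1] = "TimeoutException";
        } catch (BrokenBarrierException ex) {
            outcome[1] = "BrokenBarrierException";
        } catch (InterruptedException ex) {
            outcome[1] = "InterruptedException";
        }
        try {
            waiter.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        outcome[2] = String.valueOf(barrier.isBroken());
        return outcome;
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", run()));
    }
}
```

The untimed waiter sees BrokenBarrierException whether it reached the barrier before or after the timeout, because a broken barrier also rejects new arrivals.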
Memory consistency effects: Actions in a thread prior to calling
{@code await()}
happen-before
actions that are part of the barrier action, which in turn
happen-before actions following a successful return from the
corresponding {@code await()} in other threads. |
Fields Summary |
---|
private final ReentrantLock | lock: The lock for guarding barrier entry |
private final Condition | trip: Condition to wait on until tripped |
private final int | parties: The number of parties |
private final Runnable | barrierCommand |
private Generation | generation: The current generation |
private int | count: Number of parties still waiting. Counts down from parties to 0
on each generation. It is reset to parties on each new
generation or when broken. |
Constructors Summary |
---|
public CyclicBarrier(int parties, Runnable barrierAction)
Creates a new CyclicBarrier that will trip when the
given number of parties (threads) are waiting upon it, and which
will execute the given barrier action when the barrier is tripped,
performed by the last thread entering the barrier.
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
| public CyclicBarrier(int parties)
Creates a new CyclicBarrier that will trip when the
given number of parties (threads) are waiting upon it, and
does not perform a predefined action when the barrier is tripped.
    this(parties, null);
|
Methods Summary |
---|
public int | await()
Waits until all {@linkplain #getParties parties} have invoked
await on this barrier.
If the current thread is not the last to arrive then it is
disabled for thread scheduling purposes and lies dormant until
one of the following things happens:
- The last thread arrives; or
- Some other thread {@linkplain Thread#interrupt interrupts}
the current thread; or
- Some other thread {@linkplain Thread#interrupt interrupts}
one of the other waiting threads; or
- Some other thread times out while waiting for the barrier; or
- Some other thread invokes {@link #reset} on this barrier.
If the current thread:
- has its interrupted status set on entry to this method; or
- is {@linkplain Thread#interrupt interrupted} while waiting
then {@link InterruptedException} is thrown and the current thread's
interrupted status is cleared.
If the barrier is {@link #reset} while any thread is waiting,
or if the barrier {@linkplain #isBroken is broken} when
await is invoked, or while any thread is waiting, then
{@link BrokenBarrierException} is thrown.
If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
then all other waiting threads will throw
{@link BrokenBarrierException} and the barrier is placed in the broken
state.
If the current thread is the last thread to arrive, and a
non-null barrier action was supplied in the constructor, then the
current thread runs the action before allowing the other threads to
continue.
If an exception occurs during the barrier action then that exception
will be propagated in the current thread and the barrier is placed in
the broken state.
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
| public int | await(long timeout, java.util.concurrent.TimeUnit unit)
Waits until all {@linkplain #getParties parties} have invoked
await on this barrier, or the specified waiting time elapses.
If the current thread is not the last to arrive then it is
disabled for thread scheduling purposes and lies dormant until
one of the following things happens:
- The last thread arrives; or
- The specified timeout elapses; or
- Some other thread {@linkplain Thread#interrupt interrupts}
the current thread; or
- Some other thread {@linkplain Thread#interrupt interrupts}
one of the other waiting threads; or
- Some other thread times out while waiting for the barrier; or
- Some other thread invokes {@link #reset} on this barrier.
If the current thread:
- has its interrupted status set on entry to this method; or
- is {@linkplain Thread#interrupt interrupted} while waiting
then {@link InterruptedException} is thrown and the current thread's
interrupted status is cleared.
If the specified waiting time elapses then {@link TimeoutException}
is thrown. If the time is less than or equal to zero, the
method will not wait at all.
If the barrier is {@link #reset} while any thread is waiting,
or if the barrier {@linkplain #isBroken is broken} when
await is invoked, or while any thread is waiting, then
{@link BrokenBarrierException} is thrown.
If any thread is {@linkplain Thread#interrupt interrupted} while
waiting, then all other waiting threads will throw {@link
BrokenBarrierException} and the barrier is placed in the broken
state.
If the current thread is the last thread to arrive, and a
non-null barrier action was supplied in the constructor, then the
current thread runs the action before allowing the other threads to
continue.
If an exception occurs during the barrier action then that exception
will be propagated in the current thread and the barrier is placed in
the broken state.
    return dowait(true, unit.toNanos(timeout));
| private void | breakBarrier()
Sets current barrier generation as broken and wakes up everyone.
Called only while holding lock.
    generation.broken = true;
    count = parties;
    trip.signalAll();
| private int | dowait(boolean timed, long nanos)
Main barrier code, covering the various policies.
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        int index = --count;
        if (index == 0) { // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            if (g != generation)
                return index;
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
| public int | getNumberWaiting()
Returns the number of parties currently waiting at the barrier.
This method is primarily useful for debugging and assertions.
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
| public int | getParties()
Returns the number of parties required to trip this barrier.
    return parties;
| public boolean | isBroken()
Queries if this barrier is in a broken state.
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
| private void | nextGeneration()
Updates state on barrier trip and wakes up everyone.
Called only while holding lock.
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
| public void | reset()
Resets the barrier to its initial state. If any parties are
currently waiting at the barrier, they will return with a
{@link BrokenBarrierException}. Note that resets after
a breakage has occurred for other reasons can be complicated to
carry out; threads need to re-synchronize in some other way,
and choose one to perform the reset. It may be preferable to
instead create a new barrier for subsequent use.
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier(); // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
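As a rough illustration of what reset() does to a broken barrier, the single-threaded sketch below breaks a two-party barrier and then resets it. ResetDemo is a hypothetical class name. The barrier is broken by calling the timed await with a zero timeout, which does not wait at all and therefore times out immediately when the other party is absent.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ResetDemo {
    /** Returns {isBroken() after the timeout, isBroken() after reset()}. */
    static boolean[] run() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        try {
            // Zero timeout: no waiting, so with one of two parties present
            // the call times out and leaves the barrier broken.
            barrier.await(0, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            // expected: this is how the sketch breaks the barrier
        } catch (InterruptedException | BrokenBarrierException ex) {
            throw new IllegalStateException(ex);
        }
        boolean brokenBefore = barrier.isBroken();
        barrier.reset(); // breaks the current generation, then starts a fresh one
        boolean brokenAfter = barrier.isBroken();
        return new boolean[] { brokenBefore, brokenAfter };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("broken before reset: " + r[0] + ", after reset: " + r[1]);
    }
}
```

No other thread is waiting here, which avoids the re-synchronization problem described above. With live waiters, creating a new barrier may be the simpler option.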
|
|