Exchangerpublic class Exchanger extends Object A synchronization point at which threads can pair and swap elements
within pairs. Each thread presents some object on entry to the
{@link #exchange exchange} method, matches with a partner thread,
and receives its partner's object on return. An Exchanger may be
viewed as a bidirectional form of a {@link SynchronousQueue}.
Exchangers may be useful in applications such as genetic algorithms
and pipeline designs.
Sample Usage:
Here are the highlights of a class that uses an {@code Exchanger}
to swap buffers between threads so that the thread filling the
buffer gets a freshly emptied one when it needs it, handing off the
filled one to the thread emptying the buffer.
{@code
class FillAndEmpty {
Exchanger exchanger = new Exchanger();
DataBuffer initialEmptyBuffer = ... a made-up type
DataBuffer initialFullBuffer = ...
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.isFull())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ... }
}
}
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.isEmpty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}
}
Memory consistency effects: For each pair of threads that
successfully exchange objects via an {@code Exchanger}, actions
prior to the {@code exchange()} in each thread
happen-before
those subsequent to a return from the corresponding {@code exchange()}
in the other thread. |
Fields Summary |
---|
private static final int | NCPUThe number of CPUs, for sizing and spin control | private static final int | CAPACITYThe capacity of the arena. Set to a value that provides more
than enough space to handle contention. On small machines
most slots won't be used, but it is still not wasted because
the extra space provides some machine-level address padding
to minimize interference with heavily CAS'ed Slot locations.
And on very large machines, performance eventually becomes
bounded by memory bandwidth, not numbers of threads/CPUs.
This constant cannot be changed without also modifying
indexing and hashing algorithms. | private static final int | FULLThe value of "max" that will hold all threads without
contention. When this value is less than CAPACITY, some
otherwise wasted expansion can be avoided. | private static final int | SPINSThe number of times to spin (doing nothing except polling a
memory location) before blocking or giving up while waiting to
be fulfilled. Should be zero on uniprocessors. On
multiprocessors, this value should be large enough so that two
threads exchanging items as fast as possible block only when
one of them is stalled (due to GC or preemption), but not much
longer, to avoid wasting CPU resources. Seen differently, this
value is a little over half the number of cycles of an average
context switch time on most systems. The value here is
approximately the average of those across a range of tested
systems. | private static final int | TIMED_SPINSThe number of times to spin before blocking in timed waits.
Timed waits spin more slowly because checking the time takes
time. The best value relies mainly on the relative rate of
System.nanoTime vs memory accesses. The value is empirically
derived to work well across a variety of systems. | private static final Object | CANCELSentinel item representing cancellation of a wait due to
interruption, timeout, or elapsed spin-waits. This value is
placed in holes on cancellation, and used as a return value
from waiting methods to indicate failure to set or get hole. | private static final Object | NULL_ITEMValue representing null arguments/returns from public
methods. This disambiguates from internal requirement that
holes start out as null to mean they are not yet set. | private volatile Slot[] | arenaSlot array. Elements are lazily initialized when needed.
Declared volatile to enable double-checked lazy construction. | private final AtomicInteger | maxThe maximum slot index being used. The value sometimes
increases when a thread experiences too many CAS contentions,
and sometimes decreases when a spin-wait elapses. Changes
are performed only via compareAndSet, to avoid stale values
when a thread happens to stall right before setting. |
Constructors Summary |
---|
public Exchanger()Creates a new Exchanger.
|
Methods Summary |
---|
private static java.lang.Object | await(java.util.concurrent.Exchanger$Node node, java.util.concurrent.Exchanger$Slot slot)Waits for (by spinning and/or blocking) and gets the hole
filled in by another thread. Fails if interrupted before
hole filled.
When a node/thread is about to block, it sets its waiter field
and then rechecks state at least one more time before actually
parking, thus covering race vs fulfiller noticing that waiter
is non-null so should be woken.
Thread interruption status is checked only surrounding calls to
park. The caller is assumed to have checked interrupt status
on entry.
Thread w = Thread.currentThread();
int spins = SPINS;
for (;;) {
Object v = node.get();
if (v != null)
return v;
else if (spins > 0) // Spin-wait phase
--spins;
else if (node.waiter == null) // Set up to block next
node.waiter = w;
else if (w.isInterrupted()) // Abort on interrupt
tryCancel(node, slot);
else // Block
LockSupport.park(node);
}
| private java.lang.Object | awaitNanos(java.util.concurrent.Exchanger$Node node, java.util.concurrent.Exchanger$Slot slot, long nanos)Waits for (at index 0) and gets the hole filled in by another
thread. Fails if timed out or interrupted before hole filled.
Same basic logic as untimed version, but a bit messier.
int spins = TIMED_SPINS;
long lastTime = 0;
Thread w = null;
for (;;) {
Object v = node.get();
if (v != null)
return v;
long now = System.nanoTime();
if (w == null)
w = Thread.currentThread();
else
nanos -= now - lastTime;
lastTime = now;
if (nanos > 0) {
if (spins > 0)
--spins;
else if (node.waiter == null)
node.waiter = w;
else if (w.isInterrupted())
tryCancel(node, slot);
else
LockSupport.parkNanos(node, nanos);
}
else if (tryCancel(node, slot) && !w.isInterrupted())
return scanOnTimeout(node);
}
| private void | createSlot(int index)Creates a new slot at given index. Called only when the slot
appears to be null. Relies on double-check using builtin
locks, since they rarely contend. This in turn relies on the
arena array being declared volatile.
// Create slot outside of lock to narrow sync region
Slot newSlot = new Slot();
Slot[] a = arena;
synchronized (a) {
if (a[index] == null)
a[index] = newSlot;
}
| private java.lang.Object | doExchange(java.lang.Object item, boolean timed, long nanos)Main exchange function, handling the different policy variants.
Uses Object, not "V" as argument and return value to simplify
handling of sentinel values. Callers from public methods decode
and cast accordingly.
Node me = new Node(item); // Create in case occupying
int index = hashIndex(); // Index of current slot
int fails = 0; // Number of CAS failures
for (;;) {
Object y; // Contents of current slot
Slot slot = arena[index];
if (slot == null) // Lazily initialize slots
createSlot(index); // Continue loop to reread
else if ((y = slot.get()) != null && // Try to fulfill
slot.compareAndSet(y, null)) {
Node you = (Node)y; // Transfer item
if (you.compareAndSet(null, item)) {
LockSupport.unpark(you.waiter);
return you.item;
} // Else cancelled; continue
}
else if (y == null && // Try to occupy
slot.compareAndSet(null, me)) {
if (index == 0) // Blocking wait for slot 0
return timed? awaitNanos(me, slot, nanos): await(me, slot);
Object v = spinWait(me, slot); // Spin wait for non-0
if (v != CANCEL)
return v;
me = new Node(item); // Throw away cancelled node
int m = max.get();
if (m > (index >>>= 1)) // Decrease index
max.compareAndSet(m, m - 1); // Maybe shrink table
}
else if (++fails > 1) { // Allow 2 fails on 1st slot
int m = max.get();
if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
index = m + 1; // Grow on 3rd failed slot
else if (--index < 0)
index = m; // Circularly traverse
}
}
| public V | exchange(V x)Waits for another thread to arrive at this exchange point (unless
the current thread is {@linkplain Thread#interrupt interrupted}),
and then transfers the given object to it, receiving its object
in return.
If another thread is already waiting at the exchange point then
it is resumed for thread scheduling purposes and receives the object
passed in by the current thread. The current thread returns immediately,
receiving the object passed to the exchange by that other thread.
If no other thread is already waiting at the exchange then the
current thread is disabled for thread scheduling purposes and lies
dormant until one of two things happens:
- Some other thread enters the exchange; or
- Some other thread {@linkplain Thread#interrupt interrupts} the current
thread.
If the current thread:
- has its interrupted status set on entry to this method; or
- is {@linkplain Thread#interrupt interrupted} while waiting
for the exchange,
then {@link InterruptedException} is thrown and the current thread's
interrupted status is cleared.
if (!Thread.interrupted()) {
Object v = doExchange(x == null? NULL_ITEM : x, false, 0);
if (v == NULL_ITEM)
return null;
if (v != CANCEL)
return (V)v;
Thread.interrupted(); // Clear interrupt status on IE throw
}
throw new InterruptedException();
| public V | exchange(V x, long timeout, java.util.concurrent.TimeUnit unit)Waits for another thread to arrive at this exchange point (unless
the current thread is {@linkplain Thread#interrupt interrupted} or
the specified waiting time elapses), and then transfers the given
object to it, receiving its object in return.
If another thread is already waiting at the exchange point then
it is resumed for thread scheduling purposes and receives the object
passed in by the current thread. The current thread returns immediately,
receiving the object passed to the exchange by that other thread.
If no other thread is already waiting at the exchange then the
current thread is disabled for thread scheduling purposes and lies
dormant until one of three things happens:
- Some other thread enters the exchange; or
- Some other thread {@linkplain Thread#interrupt interrupts}
the current thread; or
- The specified waiting time elapses.
If the current thread:
- has its interrupted status set on entry to this method; or
- is {@linkplain Thread#interrupt interrupted} while waiting
for the exchange,
then {@link InterruptedException} is thrown and the current thread's
interrupted status is cleared.
If the specified waiting time elapses then {@link
TimeoutException} is thrown. If the time is less than or equal
to zero, the method will not wait at all.
if (!Thread.interrupted()) {
Object v = doExchange(x == null? NULL_ITEM : x,
true, unit.toNanos(timeout));
if (v == NULL_ITEM)
return null;
if (v != CANCEL)
return (V)v;
if (!Thread.interrupted())
throw new TimeoutException();
}
throw new InterruptedException();
| private final int | hashIndex()Returns a hash index for the current thread. Uses a one-step
FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
based on the current thread's Thread.getId(). These hash codes
have more uniform distribution properties with respect to small
moduli (here 1-31) than do other simple hashing functions.
To return an index between 0 and max, we use a cheap
approximation to a mod operation, that also corrects for bias
due to non-power-of-2 remaindering (see {@link
java.util.Random#nextInt}). Bits of the hashcode are masked
with "nbits", the ceiling power of two of table size (looked up
in a table packed into three ints). If too large, this is
retried after rotating the hash by nbits bits, while forcing new
top bit to 0, which guarantees eventual termination (although
with a non-random-bias). This requires an average of less than
2 tries for all table sizes, and has a maximum 2% difference
from perfectly uniform slot probabilities when applied to all
possible hash codes for sizes less than 32.
long id = Thread.currentThread().getId();
int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
int m = max.get();
int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1))
((0x000001f8 >>> m) & 2) | // The constants hold
((0xffff00f2 >>> m) & 1)); // a lookup table
int index;
while ((index = hash & ((1 << nbits) - 1)) > m) // May retry on
hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
return index;
| private java.lang.Object | scanOnTimeout(java.util.concurrent.Exchanger$Node node)Sweeps through arena checking for any waiting threads. Called
only upon return from timeout while waiting in slot 0. When a
thread gives up on a timed wait, it is possible that a
previously-entered thread is still waiting in some other
slot. So we scan to check for any. This is almost always
overkill, but decreases the likelihood of timeouts when there
are other threads present to far less than that in lock-based
exchangers in which earlier-arriving threads may still be
waiting on entry locks.
Object y;
for (int j = arena.length - 1; j >= 0; --j) {
Slot slot = arena[j];
if (slot != null) {
while ((y = slot.get()) != null) {
if (slot.compareAndSet(y, null)) {
Node you = (Node)y;
if (you.compareAndSet(null, node.item)) {
LockSupport.unpark(you.waiter);
return you.item;
}
}
}
}
}
return CANCEL;
| private static java.lang.Object | spinWait(java.util.concurrent.Exchanger$Node node, java.util.concurrent.Exchanger$Slot slot)Spin-waits for hole for a non-0 slot. Fails if spin elapses
before hole filled. Does not check interrupt, relying on check
in public exchange method to abort if interrupted on entry.
int spins = SPINS;
for (;;) {
Object v = node.get();
if (v != null)
return v;
else if (spins > 0)
--spins;
else
tryCancel(node, slot);
}
| private static boolean | tryCancel(java.util.concurrent.Exchanger$Node node, java.util.concurrent.Exchanger$Slot slot)Tries to cancel a wait for the given node waiting in the given
slot, if so, helping clear the node from its slot to avoid
garbage retention.
if (!node.compareAndSet(null, CANCEL))
return false;
if (slot.get() == node) // pre-check to minimize contention
slot.compareAndSet(node, null);
return true;
|
|