File: Exchanger.java
Doc: API Doc
Category: Java SE 5 API
Size: 8665
Date: Fri Aug 26 14:57:26 BST 2005
Package: java.util.concurrent

Exchanger

public class Exchanger<V> extends Object
A synchronization point at which two threads can exchange objects. Each thread presents some object on entry to the exchange method, and receives the object presented by the other thread on return.
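As a minimal sketch of the pairing described above, the following runs two threads that each offer one string and records what each receives. The class name SwapDemo and the helper methods swap and offer are hypothetical names chosen for illustration; only Exchanger and its exchange method come from the API.

```java
import java.util.concurrent.Exchanger;

class SwapDemo {
    // Hypothetical helper: two threads each offer one string; the returned
    // array holds what thread A received at index 0 and thread B at index 1.
    static String[] swap(final String fromA, final String fromB) {
        final Exchanger<String> exchanger = new Exchanger<String>();
        final String[] received = new String[2];

        Thread a = new Thread(() -> { received[0] = offer(exchanger, fromA); });
        Thread b = new Thread(() -> { received[1] = offer(exchanger, fromB); });
        a.start();
        b.start();
        try {
            // join() makes the writes to 'received' visible to the caller.
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return received;
    }

    // Blocks until the partner thread arrives, then returns the partner's item.
    private static String offer(Exchanger<String> exchanger, String item) {
        try {
            return exchanger.exchange(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore status for callers
            return null;
        }
    }

    public static void main(String[] args) {
        String[] r = swap("from-A", "from-B");
        System.out.println(r[0] + " " + r[1]); // prints "from-B from-A"
    }
}
```

Whichever thread reaches the exchange point first simply waits; the result is the same regardless of arrival order.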

Sample Usage: Here are the highlights of a class that uses an Exchanger to swap buffers between threads so that the thread filling the buffer gets a freshly emptied one when it needs it, handing off the filled one to the thread emptying the buffer.

class FillAndEmpty {
    Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
    DataBuffer initialEmptyBuffer = ... a made-up type
    DataBuffer initialFullBuffer = ...

    class FillingLoop implements Runnable {
        public void run() {
            DataBuffer currentBuffer = initialEmptyBuffer;
            try {
                while (currentBuffer != null) {
                    addToBuffer(currentBuffer);
                    if (currentBuffer.full())
                        currentBuffer = exchanger.exchange(currentBuffer);
                }
            } catch (InterruptedException ex) { ... handle ... }
        }
    }

    class EmptyingLoop implements Runnable {
        public void run() {
            DataBuffer currentBuffer = initialFullBuffer;
            try {
                while (currentBuffer != null) {
                    takeFromBuffer(currentBuffer);
                    if (currentBuffer.empty())
                        currentBuffer = exchanger.exchange(currentBuffer);
                }
            } catch (InterruptedException ex) { ... handle ... }
        }
    }

    void start() {
        new Thread(new FillingLoop()).start();
        new Thread(new EmptyingLoop()).start();
    }
}
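The sample above leaves DataBuffer and the fill and drain helpers unspecified. A runnable variant, offered only as a sketch, can stand in a List<Integer> for the buffer. The class name BufferSwapDemo, the CAPACITY constant, and the fixed number of rounds are assumptions made for this illustration and are not part of the original sample.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

class BufferSwapDemo {
    static final int CAPACITY = 3; // assumed buffer size, for illustration only

    // Runs a filler and an emptier for a fixed number of exchanges and
    // returns everything the emptier drained, in order.
    static List<Integer> run(final int rounds) {
        final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();
        final List<Integer> consumed = new ArrayList<Integer>();

        Thread filler = new Thread(() -> {
            List<Integer> buffer = new ArrayList<Integer>();
            int next = 0;
            try {
                for (int r = 0; r < rounds; r++) {
                    while (buffer.size() < CAPACITY)
                        buffer.add(next++);
                    // Hand off the full buffer, get an emptied one back.
                    buffer = exchanger.exchange(buffer);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        Thread emptier = new Thread(() -> {
            List<Integer> buffer = new ArrayList<Integer>();
            try {
                for (int r = 0; r < rounds; r++) {
                    // Hand off the empty buffer, get a full one back.
                    buffer = exchanger.exchange(buffer);
                    consumed.addAll(buffer);
                    buffer.clear();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        filler.start();
        emptier.start();
        try {
            filler.join();
            emptier.join(); // after join, 'consumed' is safe to read here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", ex);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(2)); // prints [0, 1, 2, 3, 4, 5]
    }
}
```

Only two list instances are ever allocated; they circulate between the two threads, which is the point of swapping buffers instead of copying their contents.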
since
1.5
author
Doug Lea
param
<V> The type of objects that may be exchanged

Fields Summary
private final ReentrantLock lock
private final Condition taken
private V item
Holder for the item being exchanged
private int arrivalCount
Arrival count transitions from 0 to 1 to 2 then back to 0 during an exchange.
Constructors Summary
public Exchanger()
Creates a new Exchanger.

    
Methods Summary
private V doExchange(V x, boolean timed, long nanos)
Main exchange function, handling the different policy variants.


                 
               
        lock.lock();
        try {
            V other;

            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count;
            while (arrivalCount == 2) {
                if (!timed)
                    taken.await();
                else if (nanos > 0) 
                    nanos = taken.awaitNanos(nanos);
                else 
                    throw new TimeoutException();
            }

            int count = ++arrivalCount;

            // If item is already waiting, replace it and signal other thread
            if (count == 2) { 
                other = item;
                item = x;
                taken.signal();
                return other;
            }

            // Otherwise, set item and wait for another thread to
            // replace it and signal us.

            item = x;
            InterruptedException interrupted = null;
            try { 
                while (arrivalCount != 2) {
                    if (!timed)
                        taken.await();
                    else if (nanos > 0) 
                        nanos = taken.awaitNanos(nanos);
                    else 
                        break; // timed out
                }
            } catch (InterruptedException ie) {
                interrupted = ie;
            }

            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0; 
            taken.signal();
            
            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {
                if (interrupted != null)
                    Thread.currentThread().interrupt();
                return other;
            }

            // If no one is waiting for us, we can back out
            if (interrupted != null) 
                throw interrupted;
            else  // must be timeout
                throw new TimeoutException();
        } finally {
            lock.unlock();
        }
    
public V exchange(V x)
Waits for another thread to arrive at this exchange point (unless it is interrupted), and then transfers the given object to it, receiving its object in return.

If another thread is already waiting at the exchange point then it is resumed for thread scheduling purposes and receives the object passed in by the current thread. The current thread returns immediately, receiving the object passed to the exchange by that other thread.

If no other thread is already waiting at the exchange then the current thread is disabled for thread scheduling purposes and lies dormant until one of two things happens:

  • Some other thread enters the exchange; or
  • Some other thread interrupts the current thread.

If the current thread:

  • has its interrupted status set on entry to this method; or
  • is interrupted while waiting for the exchange,
then InterruptedException is thrown and the current thread's interrupted status is cleared.

param
x the object to exchange
return
the object provided by the other thread.
throws
InterruptedException if the current thread was interrupted while waiting

        try {
            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) { 
            throw new Error(cannotHappen);
        }
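The interruption behaviour documented for exchange(V x) can be observed with a thread that waits for a partner that never arrives. This is a sketch only: the class name InterruptDemo and the method interruptedAndCleared are hypothetical, and the check relies on the documented contract that the interrupted status is cleared when InterruptedException is thrown.

```java
import java.util.concurrent.Exchanger;

class InterruptDemo {
    // Returns true if the waiting thread got InterruptedException and found
    // its interrupted status cleared inside the catch block.
    static boolean interruptedAndCleared() {
        final Exchanger<String> exchanger = new Exchanger<String>();
        final boolean[] result = new boolean[1];

        Thread waiter = new Thread(() -> {
            try {
                exchanger.exchange("never collected"); // no partner ever arrives
            } catch (InterruptedException e) {
                // Per the documented contract, the status is cleared on throw.
                result[0] = !Thread.currentThread().isInterrupted();
            }
        });

        waiter.start();
        // The interrupt may land before or during the wait; the documentation
        // covers both cases (status set on entry, or interrupted while waiting).
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptedAndCleared());
    }
}
```

Code that catches InterruptedException and cannot rethrow it usually restores the status with Thread.currentThread().interrupt() so that callers further up can still see the interruption.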
    
public V exchange(V x, long timeout, java.util.concurrent.TimeUnit unit)
Waits for another thread to arrive at this exchange point (unless it is interrupted, or the specified waiting time elapses), and then transfers the given object to it, receiving its object in return.

If another thread is already waiting at the exchange point then it is resumed for thread scheduling purposes and receives the object passed in by the current thread. The current thread returns immediately, receiving the object passed to the exchange by that other thread.

If no other thread is already waiting at the exchange then the current thread is disabled for thread scheduling purposes and lies dormant until one of three things happens:

  • Some other thread enters the exchange; or
  • Some other thread interrupts the current thread; or
  • The specified waiting time elapses.

If the current thread:

  • has its interrupted status set on entry to this method; or
  • is interrupted while waiting for the exchange,
then InterruptedException is thrown and the current thread's interrupted status is cleared.

If the specified waiting time elapses then TimeoutException is thrown. If the time is less than or equal to zero, the method will not wait at all.

param
x the object to exchange
param
timeout the maximum time to wait
param
unit the time unit of the timeout argument.
return
the object provided by the other thread.
throws
InterruptedException if the current thread was interrupted while waiting
throws
TimeoutException if the specified waiting time elapses before another thread enters the exchange.

        return doExchange(x, true, unit.toNanos(timeout));
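A caller of the timed variant has to handle both TimeoutException and InterruptedException. The following sketch wraps the call and maps each outcome to a string; the class name TimedExchangeDemo, the method exchangeOrFallback, and the fallback strings are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedExchangeDemo {
    // Offers 'offer' for at most 'millis' milliseconds. Returns the partner's
    // item, or a marker string describing why no exchange took place.
    static String exchangeOrFallback(Exchanger<String> exchanger,
                                     String offer, long millis) {
        try {
            return exchanger.exchange(offer, millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";      // no partner arrived within the wait
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore status for callers
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        Exchanger<String> lonely = new Exchanger<String>();
        // No other thread uses this exchanger, so the wait elapses.
        System.out.println(exchangeOrFallback(lonely, "x", 50)); // prints "timed out"
        // A non-positive timeout does not wait at all.
        System.out.println(exchangeOrFallback(lonely, "x", 0));  // prints "timed out"
    }
}
```

A thread that times out withdraws its offer, so the same Exchanger can be reused for later exchanges.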