File | Doc | Category | Size | Date | Package
SynchronousQueue.java | API Doc | Android 1.5 API | 21002 | Wed May 06 22:41:02 BST 2009 | java.util.concurrent

SynchronousQueue

public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable
A blocking queue (see BlockingQueue) in which each put must wait for a take, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to take it; you cannot add an element (using any method) unless another thread is trying to remove it; you cannot iterate because there is nothing to iterate over. The head of the queue is the element that the first queued thread is trying to add to the queue; if there are no queued threads then no element is being added and the head is null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.

Synchronous queues are similar to rendezvous channels used in CSP and Ada. They are well suited for handoff designs, in which an object running in one thread must sync up with an object running in another thread in order to hand it some information, event, or task.
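
The handoff described above can be sketched with one producer thread and one consumer thread. This is a minimal illustration, assuming Java 8 or later for the lambda syntax; the class and method names (HandoffDemo, handOff) are hypothetical and not part of this API.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; the names are illustrative, not part of the API.
final class HandoffDemo {

    // Hands one message from a producer thread to the calling (consumer) thread.
    static String handOff(String message) {
        SynchronousQueue<String> channel = new SynchronousQueue<String>();

        Thread producer = new Thread(() -> {
            try {
                channel.put(message); // blocks until the consumer calls take()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            String received = channel.take(); // blocks until the producer calls put()
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during handoff", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff("task-1"));
    }
}
```

Neither side proceeds until the other arrives, so the queue never stores the message; it only pairs the two calls.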

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.
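
The effect of the fairness flag can be observed with a small experiment: queue several producers one at a time, then take repeatedly. The sketch below is illustrative only. FairnessDemo and its methods are hypothetical names, Java 8 lambdas are assumed, and the code assumes that a producer reported as Thread.State.WAITING has already queued itself inside put.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; names are illustrative and not part of the API.
final class FairnessDemo {

    // Starts a producer that blocks in put(value) and returns once it is parked.
    private static Thread startBlockedProducer(SynchronousQueue<Integer> queue, int value)
            throws InterruptedException {
        Thread producer = new Thread(() -> {
            try {
                queue.put(value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        // Assumption: a producer in the WAITING state has already queued itself.
        while (producer.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
        return producer;
    }

    // Queues producers 1, 2, 3 one after another, then takes three times.
    static List<Integer> takeFromFairQueue() throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>(true); // fair
        List<Thread> producers = new ArrayList<Thread>();
        for (int value = 1; value <= 3; value++) {
            producers.add(startBlockedProducer(queue, value));
        }
        List<Integer> taken = new ArrayList<Integer>();
        for (int i = 0; i < 3; i++) {
            taken.add(queue.take());
        }
        for (Thread producer : producers) {
            producer.join();
        }
        return taken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(takeFromFairQueue());
    }
}
```

With fairness enabled the values should come back in arrival order. Passing false (or using the no-argument constructor) leaves the order unspecified.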

This class implements all of the optional methods of the Collection and Iterator interfaces.

since
1.5
author
Doug Lea
param
<E> the type of elements held in this collection

Fields Summary
private static final long
serialVersionUID
private final ReentrantLock
qlock
Lock protecting both wait queues
private final WaitQueue
waitingProducers
Queue holding waiting puts
private final WaitQueue
waitingConsumers
Queue holding waiting takes
Constructors Summary
public SynchronousQueue()
Creates a SynchronousQueue with nonfair access policy.


                
      
        this(false);
    
public SynchronousQueue(boolean fair)
Creates a SynchronousQueue with specified fairness policy.

param
fair if true, threads contend in FIFO order for access; otherwise the order is unspecified.

        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
    
Methods Summary
public void clear()
Does nothing. A SynchronousQueue has no internal capacity.

public boolean contains(java.lang.Object o)
Always returns false. A SynchronousQueue has no internal capacity.

param
o the element
return
false

        return false;
    
public boolean containsAll(java.util.Collection<?> c)
Returns false unless given collection is empty. A SynchronousQueue has no internal capacity.

param
c the collection
return
false unless given collection is empty

        return c.isEmpty();
    
public int drainTo(java.util.Collection<? super E> c)

        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
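
The drainTo body above simply polls until no producer is waiting, so it only collects elements from threads that are already blocked in put or a timed offer. A rough sketch of that behaviour follows. DrainToDemo is a hypothetical name, Java 8 lambdas are assumed, and the code assumes that a producer reported as Thread.State.WAITING has already queued its element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; names are illustrative and not part of the API.
final class DrainToDemo {

    // Blocks producerCount producers in put(), then drains all of them at once.
    static List<String> drainBlockedProducers(int producerCount) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        List<Thread> producers = new ArrayList<Thread>();
        for (int i = 0; i < producerCount; i++) {
            String item = "item-" + i;
            Thread producer = new Thread(() -> {
                try {
                    queue.put(item); // waits for a consumer (here: drainTo)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            producers.add(producer);
        }
        // Assumption: a producer in the WAITING state has already queued itself.
        for (Thread producer : producers) {
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
        }
        List<String> sink = new ArrayList<String>();
        int drained = queue.drainTo(sink); // one poll() per waiting producer
        for (Thread producer : producers) {
            producer.join();
        }
        if (drained != sink.size()) {
            throw new IllegalStateException("count and sink size disagree");
        }
        return sink;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drainBlockedProducers(3).size());
    }
}
```

If no producer is waiting, drainTo returns 0 immediately and leaves the target collection unchanged.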
    
public int drainTo(java.util.Collection<? super E> c, int maxElements)

        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    
public boolean isEmpty()
Always returns true. A SynchronousQueue has no internal capacity.

return
true

        return true;
    
public java.util.Iterator<E> iterator()
Returns an empty iterator in which hasNext always returns false.

return
an empty iterator

        return new EmptyIterator<E>();
    
public boolean offer(E o, long timeout, java.util.concurrent.TimeUnit unit)
Inserts the specified element into this queue, waiting if necessary up to the specified wait time for another thread to receive it.

param
o the element to add
param
timeout how long to wait before giving up, in units of unit
param
unit a TimeUnit determining how to interpret the timeout parameter
return
true if successful, or false if the specified waiting time elapses before a consumer appears.
throws
InterruptedException if interrupted while waiting.
throws
NullPointerException if the specified element is null.

        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;
            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingProducers.enq(o);
            } finally {
                qlock.unlock();
            }

            if (mustWait) 
                return node.waitForTake(nanos);

            else if (node.setItem(o))
                return true;

            // else consumer cancelled, so retry
        }
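
A short usage sketch of the timed offer, covering both outcomes named in the return description. TimedOfferDemo and its method names are hypothetical, Java 8 lambdas are assumed, and the timeouts are arbitrary values chosen for illustration.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and timeouts are illustrative only.
final class TimedOfferDemo {

    // No consumer ever arrives, so the offer gives up after the timeout.
    static boolean offerWithoutConsumer() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        return queue.offer("unclaimed", 50, TimeUnit.MILLISECONDS);
    }

    // A consumer arrives within the timeout, so the offer succeeds.
    static boolean offerWithConsumer() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        Thread consumer = new Thread(() -> {
            try {
                // Timed poll so the consumer cannot hang if the offer fails.
                queue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        boolean accepted = queue.offer("claimed", 5, TimeUnit.SECONDS);
        consumer.join();
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerWithoutConsumer());
        System.out.println(offerWithConsumer());
    }
}
```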
    
public boolean offer(E o)
Inserts the specified element into this queue, if another thread is waiting to receive it.

param
o the element to add.
return
true if it was possible to add the element to this queue, else false
throws
NullPointerException if the specified element is null

        if (o == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingConsumers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return false;

            else if (node.setItem(o))
                return true;
            // else retry
        }
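
Because the untimed offer never waits, its result depends entirely on whether a consumer is already blocked. The sketch below shows both cases. OfferDemo is a hypothetical name, Java 8 lambdas are assumed, and the code assumes that a consumer reported as Thread.State.WAITING has already queued itself inside take.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; names are illustrative and not part of the API.
final class OfferDemo {

    // Returns {result with no consumer waiting, result with a consumer waiting}.
    static boolean[] tryOffers() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();

        // Nobody is waiting in take(), so the element is rejected at once.
        boolean withoutConsumer = queue.offer("dropped");

        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        // Assumption: a consumer in the WAITING state has already queued itself.
        while (consumer.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }

        // A consumer is now blocked in take(), so the element is handed over.
        boolean withConsumer = queue.offer("delivered");
        consumer.join();
        return new boolean[] {withoutConsumer, withConsumer};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] results = tryOffers();
        System.out.println(results[0] + " " + results[1]);
    }
}
```

A false return means the element was not transferred, so the caller still owns it and must decide whether to retry, block with put, or discard it.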
    
public E peek()
Always returns null. A SynchronousQueue does not return elements unless actively waited on.

return
null

        return null;
    
public E poll(long timeout, java.util.concurrent.TimeUnit unit)
Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time, for another thread to insert it.

param
timeout how long to wait before giving up, in units of unit
param
unit a TimeUnit determining how to interpret the timeout parameter
return
the head of this queue, or null if the specified waiting time elapses before an element is present.
throws
InterruptedException if interrupted while waiting.

        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;

            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingProducers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingConsumers.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                Object x = node.waitForPut(nanos);
                return (E)x;
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return (E)x;
                // else cancelled, so retry
            }
        }
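
The timed poll mirrors the timed offer: it returns null when no producer shows up in time and the element otherwise. A minimal sketch, with hypothetical names (TimedPollDemo and its methods), arbitrary timeouts, and Java 8 lambdas assumed:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and timeouts are illustrative only.
final class TimedPollDemo {

    // No producer ever arrives, so poll returns null after the timeout.
    static String pollWithoutProducer() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        return queue.poll(50, TimeUnit.MILLISECONDS);
    }

    // A producer arrives within the timeout, so poll returns its element.
    static String pollWithProducer() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        Thread producer = new Thread(() -> {
            try {
                // Timed offer so the producer cannot hang if the poll gives up.
                queue.offer("ready", 5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        String item = queue.poll(5, TimeUnit.SECONDS);
        producer.join();
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pollWithoutProducer());
        System.out.println(pollWithProducer());
    }
}
```

Since null signals a timeout, it cannot also be a legal element, which is consistent with the queue rejecting null in put and offer.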
    
public E poll()
Retrieves and removes the head of this queue, if another thread is currently making an element available.

return
the head of this queue, or null if no element is available.

        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingProducers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return null;

            else {
                Object x = node.getItem();
                if (x != null)
                    return (E)x;
                // else retry
            }
        }
    
public void put(E o)
Adds the specified element to this queue, waiting if necessary for another thread to receive it.

param
o the element to add
throws
InterruptedException if interrupted while waiting.
throws
NullPointerException if the specified element is null.

        if (o == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;
            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingProducers.enq(o);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                node.waitForTake();
                return;
            }

            else if (node.setItem(o))
                return;

            // else consumer cancelled, so retry
        }
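
Since put can block indefinitely, interruption is the usual way to cancel it. The sketch below interrupts a producer that has no consumer and reports whether put threw InterruptedException. InterruptedPutDemo is a hypothetical name and Java 8 lambdas are assumed.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; names are illustrative and not part of the API.
final class InterruptedPutDemo {

    // Returns true if the blocked put() was cancelled by the interrupt.
    static boolean putIsCancelledByInterrupt() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread producer = new Thread(() -> {
            try {
                queue.put("never taken"); // no consumer exists, so this cannot complete
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        producer.start();

        // The interrupt is honoured whether it arrives before or during the wait.
        producer.interrupt();
        producer.join();
        return sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(putIsCancelledByInterrupt());
    }
}
```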
    
public int remainingCapacity()
Always returns zero. A SynchronousQueue has no internal capacity.

return
zero.

        return 0;
    
public boolean remove(java.lang.Object o)
Always returns false. A SynchronousQueue has no internal capacity.

param
o the element to remove
return
false

        return false;
    
public boolean removeAll(java.util.Collection<?> c)
Always returns false. A SynchronousQueue has no internal capacity.

param
c the collection
return
false

        return false;
    
public boolean retainAll(java.util.Collection<?> c)
Always returns false. A SynchronousQueue has no internal capacity.

param
c the collection
return
false

        return false;
    
public int size()
Always returns zero. A SynchronousQueue has no internal capacity.

return
zero.

        return 0;
    
public E take()
Retrieves and removes the head of this queue, waiting if necessary for another thread to insert it.

return
the head of this queue
throws
InterruptedException if interrupted while waiting.

        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;

            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingProducers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingConsumers.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                Object x = node.waitForPut();
                return (E)x;
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return (E)x;
                // else cancelled, so retry
            }
        }
    
public java.lang.Object[] toArray()
Returns a zero-length array.

return
a zero-length array

        return new Object[0];
    
public <T> T[] toArray(T[] a)
Sets the zeroth element of the specified array to null (if the array has non-zero length) and returns it.

param
a the array
return
the specified array

        if (a.length > 0)
            a[0] = null;
        return a;
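
Taken together, the Collection-style methods above make the queue look permanently empty. The following sketch checks that view in one place, including the way toArray(T[]) overwrites only the first slot of the supplied array. EmptyViewDemo is a hypothetical name used for illustration.

```java
import java.util.Collections;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; names are illustrative and not part of the API.
final class EmptyViewDemo {

    // Returns true if every Collection-style query reports an empty queue.
    static boolean looksEmpty() {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();

        String[] buffer = {"stale", "kept"};
        String[] returned = queue.toArray(buffer); // nulls buffer[0], keeps buffer[1]

        return queue.isEmpty()
                && queue.size() == 0
                && queue.remainingCapacity() == 0
                && queue.peek() == null
                && !queue.contains("anything")
                && queue.containsAll(Collections.<String>emptyList())
                && !queue.iterator().hasNext()
                && queue.toArray().length == 0
                && returned == buffer
                && buffer[0] == null
                && "kept".equals(buffer[1]);
    }

    public static void main(String[] args) {
        System.out.println(looksEmpty());
    }
}
```

None of these calls interact with waiting threads, so they give no indication of whether a producer or consumer is currently blocked.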