File | Doc | Category | Size | Date | Package
LinkedBlockingQueue.java | API Doc | Java SE 6 API | 26316 | Tue Jun 10 00:25:56 BST 2008 | java.util.concurrent

LinkedBlockingQueue

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable
An optionally-bounded {@linkplain BlockingQueue blocking queue} based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

The optional capacity bound constructor argument serves as a way to prevent excessive queue expansion. The capacity, if unspecified, is equal to {@link Integer#MAX_VALUE}. Linked nodes are dynamically created upon each insertion unless this would bring the queue above capacity.

This class and its iterator implement all of the optional methods of the {@link Collection} and {@link Iterator} interfaces.

This class is a member of the Java Collections Framework.

since
1.5
author
Doug Lea
param
<E> the type of elements held in this collection

Fields Summary
private static final long
serialVersionUID
private final int
capacity
The capacity bound, or Integer.MAX_VALUE if none
private final AtomicInteger
count
Current number of elements
private transient Node
head
Head of linked list
private transient Node
last
Tail of linked list
private final ReentrantLock
takeLock
Lock held by take, poll, etc
private final Condition
notEmpty
Wait queue for waiting takes
private final ReentrantLock
putLock
Lock held by put, offer, etc
private final Condition
notFull
Wait queue for waiting puts
Constructors Summary
public LinkedBlockingQueue()
Creates a LinkedBlockingQueue with a capacity of {@link Integer#MAX_VALUE}.

        // Delegate to the capacity constructor, using the largest int as the bound.
        this(Integer.MAX_VALUE);
    
public LinkedBlockingQueue(int capacity)
Creates a LinkedBlockingQueue with the given (fixed) capacity.

param
capacity the capacity of this queue
throws
IllegalArgumentException if capacity is not greater than zero

        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    
public LinkedBlockingQueue(Collection c)
Creates a LinkedBlockingQueue with a capacity of {@link Integer#MAX_VALUE}, initially containing the elements of the given collection, added in traversal order of the collection's iterator.

param
c the collection of elements to initially contain
throws
NullPointerException if the specified collection or any of its elements are null

        this(Integer.MAX_VALUE);
        // Append every element in the order the collection's iterator yields them.
        for (E element : c) {
            add(element);
        }
    
Methods Summary
public voidclear()
Atomically removes all of the elements from this queue. The queue will be empty after this call returns.

        // Hold both locks so that no put or take can run while the list is reset.
        fullyLock();
        try {
            // Drop every node after the item-less head node and make head the tail again.
            head.next = null;
	    assert head.item == null;
	    last = head;
            // Zero the count; if the queue was full beforehand, wake all threads
            // waiting on notFull.
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
    
public intdrainTo(java.util.Collection c)

throws
UnsupportedOperationException {@inheritDoc}
throws
ClassCastException {@inheritDoc}
throws
NullPointerException {@inheritDoc}
throws
IllegalArgumentException {@inheritDoc}

        // Reject a null target, and reject draining the queue into itself.
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        Node<E> first;
        // Detach the whole chain of nodes while both locks are held.
        fullyLock();
        try {
            first = head.next;
            head.next = null;
	    assert head.item == null;
	    last = head;
            // Zero the count; if the queue was full beforehand, wake all threads
            // waiting on notFull.
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
        // Transfer the elements outside of locks
        // NOTE(review): the chain is already detached at this point, so if c.add
        // throws, the elements not yet transferred are no longer in this queue.
        int n = 0;
        for (Node<E> p = first; p != null; p = p.next) {
            c.add(p.item);
            p.item = null;
            ++n;
        }
        // n is the number of elements handed to c.
        return n;
    
public intdrainTo(java.util.Collection c, int maxElements)

throws
UnsupportedOperationException {@inheritDoc}
throws
ClassCastException {@inheritDoc}
throws
NullPointerException {@inheritDoc}
throws
IllegalArgumentException {@inheritDoc}

        // Moves up to maxElements elements from the front of this queue into c
        // and returns how many were moved. A null target raises
        // NullPointerException; draining the queue into itself raises
        // IllegalArgumentException. Any exception thrown by c.add propagates to
        // the caller.
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        // Hold both locks so that no put or take can run during the transfer.
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            try {
                while (p != null && n < maxElements) {
                    c.add(p.item);
                    p.item = null;
                    p = p.next;
                    ++n;
                }
            } finally {
                // Restore the queue invariants even if c.add() threw: the n nodes
                // already handed to c have had their items cleared, so they must
                // be unlinked and the count reduced. Otherwise they would stay in
                // the list as item-less nodes that later takes would return.
                if (n != 0) {
                    // p is the first node that was NOT transferred (or null).
                    head.next = p;
                    assert head.item == null;
                    if (p == null)
                        last = head;
                    // If the queue was full beforehand, wake all waiting puts.
                    if (count.getAndAdd(-n) == capacity)
                        notFull.signalAll();
                }
            }
            return n;
        } finally {
            fullyUnlock();
        }
    
private Eextract()
Removes a node from the head of the queue.

return
the item taken from the head of the queue

        // The node after the current head carries the item to hand out.
        Node<E> first = head.next;
        // That node becomes the new head ...
        head = first;
        E x = first.item;
        // ... and its item is cleared, so the head node never holds an item.
        first.item = null;
        return x;
    
private voidfullyLock()
Lock to prevent both puts and takes.

        // Acquire putLock first, then takeLock; fullyUnlock releases them in
        // the reverse order.
        putLock.lock();
        takeLock.lock();
    
private voidfullyUnlock()
Unlock to allow both puts and takes.

        // Release in the reverse of the order used by fullyLock
        // (putLock was taken first, so it is released last).
        takeLock.unlock();
        putLock.unlock();
    
private voidinsert(E x)
Creates a node and links it at end of queue.

param
x the item

        last = last.next = new Node<E>(x);
    
public java.util.Iteratoriterator()
Returns an iterator over the elements in this queue in proper sequence. The returned Iterator is a "weakly consistent" iterator that will never throw {@link ConcurrentModificationException}, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.

return
an iterator over the elements in this queue in proper sequence

      // Each call hands back a fresh Itr instance.
      return new Itr();
    
public booleanoffer(E e, long timeout, java.util.concurrent.TimeUnit unit)
Inserts the specified element at the tail of this queue, waiting if necessary up to the specified wait time for space to become available.

return
true if successful, or false if the specified waiting time elapses before space is available.
throws
InterruptedException {@inheritDoc}
throws
NullPointerException {@inheritDoc}


        // Null elements are rejected up front.
        if (e == null) throw new NullPointerException();
        // Remaining wait budget, in nanoseconds.
        long nanos = unit.toNanos(timeout);
        // c stays -1 unless an insert happens; afterwards it holds the count
        // from just before the insert.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(e);
                    c = count.getAndIncrement();
                    // Still room after this insert: pass the signal on to
                    // another waiting put.
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                // Queue is full and the wait budget is used up: give up.
                if (nanos <= 0)
                    return false;
                try {
                    // awaitNanos hands back the updated budget, which is
                    // re-checked on the next pass.
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insert, so signal a waiting take.
        if (c == 0)
            signalNotEmpty();
        return true;
    
public booleanoffer(E e)
Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning true upon success and false if this queue is full. When using a capacity-restricted queue, this method is generally preferable to method {@link BlockingQueue#add add}, which can fail to insert an element only by throwing an exception.

throws
NullPointerException if the specified element is null

        // Null elements are rejected up front.
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Fast path: fail without locking when the queue reads as full.
        if (count.get() == capacity)
            return false;
        // c stays -1 unless an insert happens; afterwards it holds the count
        // from just before the insert.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under the lock before inserting.
            if (count.get() < capacity) {
                insert(e);
                c = count.getAndIncrement();
                // Still room after this insert: signal another waiting put.
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insert, so signal a waiting take.
        if (c == 0)
            signalNotEmpty();
        // c is non-negative exactly when the element was inserted.
        return c >= 0;
    
public Epeek()

        // Fast path: nothing to look at when the count reads zero.
        if (count.get() == 0) {
            return null;
        }
        final ReentrantLock lock = this.takeLock;
        lock.lock();
        try {
            // Look again under the lock; there may be no node after head by now.
            final Node<E> front = head.next;
            return (front == null) ? null : front.item;
        } finally {
            lock.unlock();
        }
    
public Epoll(long timeout, java.util.concurrent.TimeUnit unit)

        E x = null;
        // c stays -1 unless an element is taken; afterwards it holds the count
        // from just before the removal.
        int c = -1;
        // Remaining wait budget, in nanoseconds.
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    // More than one element was present: signal another
                    // waiting take.
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                // Queue is empty and the wait budget is used up: give up.
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos hands back the updated budget, which is
                    // re-checked on the next pass.
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal, so signal a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    
public Epoll()

        final AtomicInteger count = this.count;
        // Fast path: return without locking when the queue reads as empty.
        if (count.get() == 0)
            return null;
        E x = null;
        // c stays -1 unless an element is taken; afterwards it holds the count
        // from just before the removal.
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // Re-check under the lock before extracting.
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                // More than one element was present: signal another waiting take.
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal, so signal a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    
public voidput(E e)
Inserts the specified element at the tail of this queue, waiting if necessary for space to become available.

throws
InterruptedException {@inheritDoc}
throws
NullPointerException {@inheritDoc}

        // Null elements are rejected up front.
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                // Block for as long as the queue is at capacity.
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(e);
            // c receives the count from just before this insert.
            c = count.getAndIncrement();
            // Still room after this insert: signal another waiting put.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insert, so signal a waiting take.
        if (c == 0)
            signalNotEmpty();
    
private voidreadObject(java.io.ObjectInputStream s)
Reconstitute this queue instance from a stream (that is, deserialize it).

param
s the stream

        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    
public intremainingCapacity()
Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking. This is always equal to the initial capacity of this queue less the current size of this queue.

Note that you cannot always tell if an attempt to insert an element will succeed by inspecting remainingCapacity because it may be the case that another thread is about to insert or remove an element.

        // Fixed bound minus the current element count (read without locking).
        return capacity - count.get();
    
public booleanremove(java.lang.Object o)
Removes a single instance of the specified element from this queue, if it is present. More formally, removes an element e such that o.equals(e), if this queue contains one or more such elements. Returns true if this queue contained the specified element (or equivalently, if this queue changed as a result of the call).

param
o element to be removed from this queue, if present
return
true if this queue changed as a result of the call

        // A null argument never matches anything.
        if (o == null) return false;
        boolean removed = false;
        // Hold both locks so that no put or take can run during the scan and unlink.
        fullyLock();
        try {
            // trail follows one node behind p so the match can be unlinked.
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                // Clear the item and splice the node out of the list.
                p.item = null;
                trail.next = p.next;
                // If the tail node was removed, its predecessor becomes the tail.
                if (last == p)
                    last = trail;
                // If the queue was full beforehand, wake all threads waiting on notFull.
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    
private voidsignalNotEmpty()
Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)


                        
       
        // notEmpty is signalled only while takeLock is held, so take the lock
        // just for the duration of the signal.
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    
private voidsignalNotFull()
Signals a waiting put. Called only from take/poll.

        // notFull is signalled only while putLock is held, so take the lock
        // just for the duration of the signal.
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    
public intsize()
Returns the number of elements in this queue.

return
the number of elements in this queue

        // Read the atomic counter directly; no lock is taken.
        return count.get();
    
public Etake()

        E x;
        // c stays -1 unless an element is taken; afterwards it holds the count
        // from just before the removal.
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                // Block for as long as the queue is empty.
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            // More than one element was present: signal another waiting take.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal, so signal a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    
public java.lang.Object[]toArray()
Returns an array containing all of the elements in this queue, in proper sequence.

The returned array will be "safe" in that no references to it are maintained by this queue. (In other words, this method must allocate a new array). The caller is thus free to modify the returned array.

This method acts as bridge between array-based and collection-based APIs.

return
an array containing all of the elements in this queue

        // Hold both locks so the count and the list cannot change during the copy.
        fullyLock();
        try {
            final Object[] snapshot = new Object[count.get()];
            int idx = 0;
            // Walk from the node after head to the end, copying each item in order.
            Node<E> node = head.next;
            while (node != null) {
                snapshot[idx] = node.item;
                idx++;
                node = node.next;
            }
            return snapshot;
        } finally {
            fullyUnlock();
        }
    
public T[]toArray(T[] a)
Returns an array containing all of the elements in this queue, in proper sequence; the runtime type of the returned array is that of the specified array. If the queue fits in the specified array, it is returned therein. Otherwise, a new array is allocated with the runtime type of the specified array and the size of this queue.

If this queue fits in the specified array with room to spare (i.e., the array has more elements than this queue), the element in the array immediately following the end of the queue is set to null.

Like the {@link #toArray()} method, this method acts as bridge between array-based and collection-based APIs. Further, this method allows precise control over the runtime type of the output array, and may, under certain circumstances, be used to save allocation costs.

Suppose x is a queue known to contain only strings. The following code can be used to dump the queue into a newly allocated array of String:

String[] y = x.toArray(new String[0]);
Note that toArray(new Object[0]) is identical in function to toArray().

param
a the array into which the elements of the queue are to be stored, if it is big enough; otherwise, a new array of the same runtime type is allocated for this purpose
return
an array containing all of the elements in this queue
throws
ArrayStoreException if the runtime type of the specified array is not a supertype of the runtime type of every element in this queue
throws
NullPointerException if the specified array is null

        // Hold both locks so the count and the list cannot change during the copy.
        fullyLock();
        try {
            int size = count.get();
            // Caller's array is too small: allocate one of the same component
            // type with exactly size slots.
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            // Copy items in order, starting from the node after head.
            for (Node p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            // Spare room in the array: mark the end of the elements with a null.
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    
public java.lang.StringtoString()

        // Build the string via the superclass while both locks are held, so no
        // put or take can run in the meantime.
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    
private voidwriteObject(java.io.ObjectOutputStream s)
Save the state to a stream (that is, serialize it).

serialData
The capacity is emitted (int), followed by all of its elements (each an Object) in the proper order, followed by a null
param
s the stream


        // Hold both locks so that no put or take can run while the state is written.
        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            // (readObject stops reading elements when it reads this null).
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }