FileDocCategorySizeDatePackage
LinkedBlockingQueue.javaAPI DocAndroid 1.5 API22682Wed May 06 22:41:02 BST 2009java.util.concurrent

LinkedBlockingQueue

public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, Serializable
An optionally-bounded {@linkplain BlockingQueue blocking queue} based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

The optional capacity bound constructor argument serves as a way to prevent excessive queue expansion. The capacity, if unspecified, is equal to {@link Integer#MAX_VALUE}. Linked nodes are dynamically created upon each insertion unless this would bring the queue above capacity.

This class implements all of the optional methods of the {@link Collection} and {@link Iterator} interfaces.

since
1.5
author
Doug Lea
param
the type of elements held in this collection

Fields Summary
private static final long
serialVersionUID
private final int
capacity
The capacity bound, or Integer.MAX_VALUE if none
private final AtomicInteger
count
Current number of elements
private transient Node
head
Head of linked list
private transient Node
last
Tail of linked list
private final ReentrantLock
takeLock
Lock held by take, poll, etc
private final Condition
notEmpty
Wait queue for waiting takes
private final ReentrantLock
putLock
Lock held by put, offer, etc
private final Condition
notFull
Wait queue for waiting puts
Constructors Summary
public LinkedBlockingQueue()
Creates a LinkedBlockingQueue with a capacity of {@link Integer#MAX_VALUE}.

        this(Integer.MAX_VALUE);
    
public LinkedBlockingQueue(int capacity)
Creates a LinkedBlockingQueue with the given (fixed) capacity.

param
capacity the capacity of this queue.
throws
IllegalArgumentException if capacity is not greater than zero.

        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    
public LinkedBlockingQueue(Collection c)
Creates a LinkedBlockingQueue with a capacity of {@link Integer#MAX_VALUE}, initially containing the elements of the given collection, added in traversal order of the collection's iterator.

param
c the collection of elements to initially contain
throws
NullPointerException if c or any element within it is null

        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    
Methods Summary
public voidclear()

        fullyLock();
        try {
            head.next = null;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
    
public intdrainTo(java.util.Collection c)

        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        Node first;
        fullyLock();
        try {
            first = head.next;
            head.next = null;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
        // Transfer the elements outside of locks
        int n = 0;
        for (Node<E> p = first; p != null; p = p.next) {
            c.add(p.item);
            p.item = null;
            ++n;
        }
        return n;
    
public intdrainTo(java.util.Collection c, int maxElements)

        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            while (p != null && n < maxElements) {
                c.add(p.item);
                p.item = null;
                p = p.next;
                ++n;
            }
            if (n != 0) {
                head.next = p;
                if (count.getAndAdd(-n) == capacity)
                    notFull.signalAll();
            }
            return n;
        } finally {
            fullyUnlock();
        }
    
private Eextract()
Remove a node from head of queue,

return
the node

        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    
private voidfullyLock()
Lock to prevent both puts and takes.

        putLock.lock();
        takeLock.lock();
    
private voidfullyUnlock()
Unlock to allow both puts and takes.

        takeLock.unlock();
        putLock.unlock();
    
private voidinsert(E x)
Create a node and link it at end of queue

param
x the item

        last = last.next = new Node<E>(x);
    
public java.util.Iteratoriterator()
Returns an iterator over the elements in this queue in proper sequence. The returned Iterator is a "weakly consistent" iterator that will never throw {@link java.util.ConcurrentModificationException}, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.

return
an iterator over the elements in this queue in proper sequence.

      return new Itr();
    
public booleanoffer(E o, long timeout, java.util.concurrent.TimeUnit unit)
Inserts the specified element at the tail of this queue, waiting if necessary up to the specified wait time for space to become available.

param
o the element to add
param
timeout how long to wait before giving up, in units of unit
param
unit a TimeUnit determining how to interpret the timeout parameter
return
true if successful, or false if the specified waiting time elapses before space is available.
throws
InterruptedException if interrupted while waiting.
throws
NullPointerException if the specified element is null.


        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    
public booleanoffer(E o)
Inserts the specified element at the tail of this queue if possible, returning immediately if this queue is full.

param
o the element to add.
return
true if it was possible to add the element to this queue, else false
throws
NullPointerException if the specified element is null

        if (o == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    
public Epeek()

        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    
public Epoll(long timeout, java.util.concurrent.TimeUnit unit)

        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    
public Epoll()

        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    
public voidput(E o)
Adds the specified element to the tail of this queue, waiting if necessary for space to become available.

param
o the element to add
throws
InterruptedException if interrupted while waiting.
throws
NullPointerException if the specified element is null.

        if (o == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    
private voidreadObject(java.io.ObjectInputStream s)
Reconstitute this queue instance from a stream (that is, deserialize it).

param
s the stream

        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    
public intremainingCapacity()
Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking. This is always equal to the initial capacity of this queue less the current size of this queue.

Note that you cannot always tell if an attempt to add an element will succeed by inspecting remainingCapacity because it may be the case that a waiting consumer is ready to take an element out of an otherwise full queue.

return
the remaining capacity

        return capacity - count.get();
    
public booleanremove(java.lang.Object o)

        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    
private voidsignalNotEmpty()
Signal a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)


                        
       
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    
private voidsignalNotFull()
Signal a waiting put. Called only from take/poll.

        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    
public intsize()
Returns the number of elements in this queue.

return
the number of elements in this queue.

        return count.get();
    
public Etake()

        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    
public java.lang.Object[]toArray()

        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    
public T[]toArray(T[] a)

        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    
public java.lang.StringtoString()

        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    
private voidwriteObject(java.io.ObjectOutputStream s)
Save the state to a stream (that is, serialize it).

serialData
The capacity is emitted (int), followed by all of its elements (each an Object) in the proper order, followed by a null
param
s the stream


        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }