FileDocCategorySizeDatePackage
ArrayBlockingQueue.javaAPI DocAndroid 1.5 API21249Wed May 06 22:41:02 BST 2009java.util.concurrent

ArrayBlockingQueue

public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, Serializable
A bounded {@linkplain BlockingQueue blocking queue} backed by an array. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.

This is a classic "bounded buffer", in which a fixed-sized array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be increased. Attempts to offer an element to a full queue will result in the offer operation blocking; attempts to retrieve an element from an empty queue will similarly block.

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.

This class implements all of the optional methods of the {@link Collection} and {@link Iterator} interfaces.

since
1.5
author
Doug Lea
param
the type of elements held in this collection

Fields Summary
private static final long
serialVersionUID
Serialization ID. This class relies on default serialization even for the items array, which is default-serialized, even if it is empty. Otherwise it could not be declared final, which is necessary here.
private final E[]
items
The queued items
private transient int
takeIndex
items index for next take, poll or remove
private transient int
putIndex
items index for next put, offer, or add.
private int
count
Number of items in the queue
private final ReentrantLock
lock
Main lock guarding all access
private final Condition
notEmpty
Condition for waiting takes
private final Condition
notFull
Condition for waiting puts
Constructors Summary
public ArrayBlockingQueue(int capacity)
Creates an ArrayBlockingQueue with the given (fixed) capacity and default access policy.

param
capacity the capacity of this queue
throws
IllegalArgumentException if capacity is less than 1

        this(capacity, false);
    
public ArrayBlockingQueue(int capacity, boolean fair)
Creates an ArrayBlockingQueue with the given (fixed) capacity and the specified access policy.

param
capacity the capacity of this queue
param
fair if true then queue accesses for threads blocked on insertion or removal, are processed in FIFO order; if false the access order is unspecified.
throws
IllegalArgumentException if capacity is less than 1

        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    
public ArrayBlockingQueue(int capacity, boolean fair, Collection c)
Creates an ArrayBlockingQueue with the given (fixed) capacity, the specified access policy and initially containing the elements of the given collection, added in traversal order of the collection's iterator.

param
capacity the capacity of this queue
param
fair if true then queue accesses for threads blocked on insertion or removal, are processed in FIFO order; if false the access order is unspecified.
param
c the collection of elements to initially contain
throws
IllegalArgumentException if capacity is less than c.size(), or less than 1.
throws
NullPointerException if c or any element within it is null

        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();

        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    
Methods Summary
public voidclear()

        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = count;
            while (k-- > 0) {
                items[i] = null;
                i = inc(i);
            }
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    
public booleancontains(java.lang.Object o)

        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            while (k++ < count) {
                if (o.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    
public intdrainTo(java.util.Collection c)

        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int n = 0;
            int max = count;
            while (n < max) {
                c.add(items[i]);
                items[i] = null;
                i = inc(i);
                ++n;
            }
            if (n > 0) {
                count = 0;
                putIndex = 0;
                takeIndex = 0;
                notFull.signalAll();
            }
            return n;
        } finally {
            lock.unlock();
        }
    
public intdrainTo(java.util.Collection c, int maxElements)

        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int n = 0;
            int sz = count;
            int max = (maxElements < count)? maxElements : count;
            while (n < max) {
                c.add(items[i]);
                items[i] = null;
                i = inc(i);
                ++n;
            }
            if (n > 0) {
                count -= n;
                takeIndex = i;
                notFull.signalAll();
            }
            return n;
        } finally {
            lock.unlock();
        }
    
private Eextract()
Extract element at current take position, advance, and signal. Call only when holding lock.

        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    
final intinc(int i)
Circularly increment i.


    // Internal helper methods

            
        
        return (++i == items.length)? 0 : i;
    
private voidinsert(E x)
Insert element at current put position, advance, and signal. Call only when holding lock.

        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    
public java.util.Iteratoriterator()
Returns an iterator over the elements in this queue in proper sequence. The returned Iterator is a "weakly consistent" iterator that will never throw {@link java.util.ConcurrentModificationException}, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.

return
an iterator over the elements in this queue in proper sequence.

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr();
        } finally {
            lock.unlock();
        }
    
public booleanoffer(E o)
Inserts the specified element at the tail of this queue if possible, returning immediately if this queue is full.

param
o the element to add.
return
true if it was possible to add the element to this queue, else false
throws
NullPointerException if the specified element is null

        if (o == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(o);
                return true;
            }
        } finally {
            lock.unlock();
        }
    
public booleanoffer(E o, long timeout, java.util.concurrent.TimeUnit unit)
Inserts the specified element at the tail of this queue, waiting if necessary up to the specified wait time for space to become available.

param
o the element to add
param
timeout how long to wait before giving up, in units of unit
param
unit a TimeUnit determining how to interpret the timeout parameter
return
true if successful, or false if the specified waiting time elapses before space is available.
throws
InterruptedException if interrupted while waiting.
throws
NullPointerException if the specified element is null.


        if (o == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != items.length) {
                    insert(o);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    
public Epeek()

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    
public Epoll()

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    
public Epoll(long timeout, java.util.concurrent.TimeUnit unit)

        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != 0) {
                    E x = extract();
                    return x;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }

            }
        } finally {
            lock.unlock();
        }
    
public voidput(E o)
Adds the specified element to the tail of this queue, waiting if necessary for space to become available.

param
o the element to add
throws
InterruptedException if interrupted while waiting.
throws
NullPointerException if the specified element is null.

        if (o == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(o);
        } finally {
            lock.unlock();
        }
    
public intremainingCapacity()
Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking. This is always equal to the initial capacity of this queue less the current size of this queue.

Note that you cannot always tell if an attempt to add an element will succeed by inspecting remainingCapacity because it may be the case that a waiting consumer is ready to take an element out of an otherwise full queue.

return
the remaining capacity

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    
public booleanremove(java.lang.Object o)

        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            for (;;) {
                if (k++ >= count)
                    return false;
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }

        } finally {
            lock.unlock();
        }
    
voidremoveAt(int i)
Utility for remove and iterator.remove: Delete item at position i. Call only when holding lock.

        final E[] items = this.items;
        // if removing front item, just advance
        if (i == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    
public intsize()
Returns the number of elements in this queue.

return
the number of elements in this queue.

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    
public Etake()

        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    
public java.lang.Object[]toArray()

        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] a = new Object[count];
            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = items[i];
                i = inc(i);
            }
            return a;
        } finally {
            lock.unlock();
        }
    
public T[]toArray(T[] a)

        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (a.length < count)
                a = (T[])java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(),
                    count
                    );

            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = (T)items[i];
                i = inc(i);
            }
            if (a.length > count)
                a[count] = null;
            return a;
        } finally {
            lock.unlock();
        }
    
public java.lang.StringtoString()

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }