File | Doc | Category | Size | Date | Package
ArrayBlockingQueue.java | API Doc | Java SE 6 API | 25433 | Tue Jun 10 00:25:56 BST 2008 | java.util.concurrent

ArrayBlockingQueue

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable
A bounded {@linkplain BlockingQueue blocking queue} backed by an array. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.

This is a classic "bounded buffer", in which a fixed-sized array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be increased. Attempts to put an element into a full queue will result in the operation blocking; attempts to take an element from an empty queue will similarly block.
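
The bounded-buffer behaviour described above can be illustrated with a minimal sketch. The class name BoundedBufferDemo and the method transfer are hypothetical names chosen for this example; only ArrayBlockingQueue, put and take come from the documented API. With a single producer and a single consumer, the values should arrive in insertion order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of java.util.concurrent.
final class BoundedBufferDemo {

    // Passes the integers 0..n-1 from one producer thread to the calling
    // thread through a queue of the given capacity.
    static List<Integer> transfer(int n, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++)
                    queue.put(i);              // blocks while the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++)
                received.add(queue.take());    // blocks while the queue is empty
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(10, 2));
    }
}
```

Because the capacity (2) is smaller than the number of items (10), the producer is forced to wait for the consumer several times during the run.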

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.

This class and its iterator implement all of the optional methods of the {@link Collection} and {@link Iterator} interfaces.

This class is a member of the Java Collections Framework.

since
1.5
author
Doug Lea
param
<E> the type of elements held in this collection

Fields Summary
private static final long
serialVersionUID
Serialization ID. This class relies on default serialization even for the items array, which is default-serialized, even if it is empty. Otherwise it could not be declared final, which is necessary here.
private final E[]
items
The queued items
private int
takeIndex
items index for next take, poll or remove
private int
putIndex
items index for next put, offer, or add.
private int
count
Number of items in the queue
private final ReentrantLock
lock
Main lock guarding all access
private final Condition
notEmpty
Condition for waiting takes
private final Condition
notFull
Condition for waiting puts
Constructors Summary
public ArrayBlockingQueue(int capacity)
Creates an ArrayBlockingQueue with the given (fixed) capacity and default access policy.

param
capacity the capacity of this queue
throws
IllegalArgumentException if capacity is less than 1

        this(capacity, false);
    
public ArrayBlockingQueue(int capacity, boolean fair)
Creates an ArrayBlockingQueue with the given (fixed) capacity and the specified access policy.

param
capacity the capacity of this queue
param
fair if true, queue accesses for threads blocked on insertion or removal are processed in FIFO order; if false, the access order is unspecified.
throws
IllegalArgumentException if capacity is less than 1

        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
Creates an ArrayBlockingQueue with the given (fixed) capacity, the specified access policy and initially containing the elements of the given collection, added in traversal order of the collection's iterator.

param
capacity the capacity of this queue
param
fair if true, queue accesses for threads blocked on insertion or removal are processed in FIFO order; if false, the access order is unspecified.
param
c the collection of elements to initially contain
throws
IllegalArgumentException if capacity is less than c.size(), or less than 1.
throws
NullPointerException if the specified collection or any of its elements are null

        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();

        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    
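As a rough illustration of the three constructors' argument checks, the sketch below seeds a fair queue from a collection and probes the rejected cases. ConstructorDemo and rejects are hypothetical names introduced for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of java.util.concurrent.
final class ConstructorDemo {

    // Returns true if the three-argument constructor rejects these arguments
    // with IllegalArgumentException.
    static boolean rejects(int capacity, List<String> initial) {
        try {
            new ArrayBlockingQueue<String>(capacity, true, initial);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> seed = Arrays.asList("a", "b", "c");

        // Capacity 5, fair ordering, pre-populated in the list's iteration order.
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(5, true, seed);
        System.out.println(q + " remaining=" + q.remainingCapacity());

        System.out.println(rejects(2, seed));   // capacity smaller than seed.size()
        System.out.println(rejects(0, seed));   // capacity less than 1
    }
}
```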
Methods Summary
public boolean add(E e)
Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning true upon success and throwing an IllegalStateException if this queue is full.

param
e the element to add
return
true (as specified by {@link Collection#add})
throws
IllegalStateException if this queue is full
throws
NullPointerException if the specified element is null

        return super.add(e);
    
public void clear()
Atomically removes all of the elements from this queue. The queue will be empty after this call returns.

        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = count;
            while (k-- > 0) {
                items[i] = null;
                i = inc(i);
            }
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    
public boolean contains(java.lang.Object o)
Returns true if this queue contains the specified element. More formally, returns true if and only if this queue contains at least one element e such that o.equals(e).

param
o object to be checked for containment in this queue
return
true if this queue contains the specified element

        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            while (k++ < count) {
                if (o.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    
public int drainTo(java.util.Collection<? super E> c)

throws
UnsupportedOperationException {@inheritDoc}
throws
ClassCastException {@inheritDoc}
throws
NullPointerException {@inheritDoc}
throws
IllegalArgumentException {@inheritDoc}

        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int n = 0;
            int max = count;
            while (n < max) {
                c.add(items[i]);
                items[i] = null;
                i = inc(i);
                ++n;
            }
            if (n > 0) {
                count = 0;
                putIndex = 0;
                takeIndex = 0;
                notFull.signalAll();
            }
            return n;
        } finally {
            lock.unlock();
        }
    
public int drainTo(java.util.Collection<? super E> c, int maxElements)

throws
UnsupportedOperationException {@inheritDoc}
throws
ClassCastException {@inheritDoc}
throws
NullPointerException {@inheritDoc}
throws
IllegalArgumentException {@inheritDoc}

        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int n = 0;
            int sz = count;
            int max = (maxElements < count)? maxElements : count;
            while (n < max) {
                c.add(items[i]);
                items[i] = null;
                i = inc(i);
                ++n;
            }
            if (n > 0) {
                count -= n;
                takeIndex = i;
                notFull.signalAll();
            }
            return n;
        } finally {
            lock.unlock();
        }
    
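A short sketch of both drainTo variants follows. DrainDemo and drainBatch are hypothetical names for this example; the queue contents are arbitrary sample data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of java.util.concurrent.
final class DrainDemo {

    // Moves at most maxElements from the head of the queue into a new list.
    static List<Integer> drainBatch(ArrayBlockingQueue<Integer> queue, int maxElements) {
        List<Integer> batch = new ArrayList<>();
        queue.drainTo(batch, maxElements);
        return batch;
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue =
            new ArrayBlockingQueue<>(5, false, Arrays.asList(1, 2, 3, 4, 5));

        System.out.println(drainBatch(queue, 2));   // [1, 2]
        System.out.println(queue);                  // [3, 4, 5]

        List<Integer> rest = new ArrayList<>();
        System.out.println(queue.drainTo(rest));    // 3 (number of elements moved)
    }
}
```

Draining in one call takes the lock once, which is usually cheaper than polling the same elements one at a time.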
private E extract()
Extracts element at current take position, advances, and signals. Call only when holding lock.

        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    
final int inc(int i)
Circularly increment i.

        return (++i == items.length)? 0 : i;
    
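The wrap-around rule used by inc can be traced with a small standalone sketch. RingIndexDemo and walk are hypothetical names; the array length is passed as a parameter here (an assumption made so the example does not depend on the queue's private items field).

```java
import java.util.Arrays;

// Hypothetical demo class illustrating circular index arithmetic.
final class RingIndexDemo {

    // Same rule as inc(int) above, with the array length made explicit.
    static int inc(int i, int length) {
        return (++i == length) ? 0 : i;
    }

    // Records the slots visited when starting at 'start' and taking 'steps' steps.
    static int[] walk(int start, int steps, int length) {
        int[] visited = new int[steps];
        int i = start;
        for (int k = 0; k < steps; k++) {
            visited[k] = i;
            i = inc(i, length);
        }
        return visited;
    }

    public static void main(String[] args) {
        // In a 4-slot array, index 3 is followed by index 0.
        System.out.println(Arrays.toString(walk(2, 5, 4)));  // [2, 3, 0, 1, 2]
    }
}
```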
private void insert(E x)
Inserts element at current put position, advances, and signals. Call only when holding lock.

        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    
public java.util.Iterator<E> iterator()
Returns an iterator over the elements in this queue in proper sequence. The returned Iterator is a "weakly consistent" iterator that will never throw {@link ConcurrentModificationException}, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.

return
an iterator over the elements in this queue in proper sequence

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr();
        } finally {
            lock.unlock();
        }
    
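The following sketch modifies the queue while an iterator is in use, to illustrate that no ConcurrentModificationException is raised. WeakIteratorDemo is a hypothetical name. Which later elements the iterator still reports is deliberately not asserted here, since the documentation above only says that later modifications may or may not be reflected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of java.util.concurrent.
final class WeakIteratorDemo {

    // Iterates over the queue while removing its head on every step and
    // returns the elements the iterator actually reported.
    static List<String> iterateWhileModifying() {
        ArrayBlockingQueue<String> queue =
            new ArrayBlockingQueue<>(4, false, Arrays.asList("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        Iterator<String> it = queue.iterator();
        while (it.hasNext()) {
            seen.add(it.next());
            queue.poll();   // structural change during traversal; no exception expected
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileModifying());
    }
}
```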
public boolean offer(E e, long timeout, java.util.concurrent.TimeUnit unit)
Inserts the specified element at the tail of this queue, waiting up to the specified wait time for space to become available if the queue is full.

throws
InterruptedException {@inheritDoc}
throws
NullPointerException {@inheritDoc}


        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != items.length) {
                    insert(e);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    
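A minimal sketch of the timed offer on a full queue is shown below. TimedOfferDemo and offerToFullQueue are hypothetical names; the timeout value is arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of java.util.concurrent.
final class TimedOfferDemo {

    // Fills a capacity-1 queue, then makes a timed offer that cannot succeed
    // because nothing ever removes the first element.
    static boolean offerToFullQueue(long timeoutMillis) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        try {
            return queue.offer("second", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // restore the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerToFullQueue(50));  // false once the wait elapses
    }
}
```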
public boolean offer(E e)
Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning true upon success and false if this queue is full. This method is generally preferable to method {@link #add}, which can fail to insert an element only by throwing an exception.

throws
NullPointerException if the specified element is null

        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    
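The difference between offer and add on a full queue can be sketched as follows. AddVersusOfferDemo and addThrowsWhenFull are hypothetical names used only in this example.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of java.util.concurrent.
final class AddVersusOfferDemo {

    // Returns true if add() signals a full queue by throwing IllegalStateException.
    static boolean addThrowsWhenFull(Queue<String> queue) {
        try {
            queue.add("overflow");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println(queue.offer("a"));          // true: there was room
        System.out.println(queue.offer("b"));          // false: queue is full
        System.out.println(addThrowsWhenFull(queue));  // true: add() throws instead
    }
}
```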
public E peek()

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    
public E poll()

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    
public E poll(long timeout, java.util.concurrent.TimeUnit unit)

        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != 0) {
                    E x = extract();
                    return x;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }

            }
        } finally {
            lock.unlock();
        }
    
public void put(E e)
Inserts the specified element at the tail of this queue, waiting for space to become available if the queue is full.

throws
InterruptedException {@inheritDoc}
throws
NullPointerException {@inheritDoc}

        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    
public int remainingCapacity()
Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking. This is always equal to the initial capacity of this queue less the current size of this queue.

Note that you cannot always tell if an attempt to insert an element will succeed by inspecting remainingCapacity because it may be the case that another thread is about to insert or remove an element.

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    
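As a single-threaded illustration of the relationship described above, the sketch below checks that size plus remaining capacity equals the fixed capacity. CapacityDemo and invariantHolds are hypothetical names. With other threads active the two calls take the lock separately, so the sum is only a snapshot.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of java.util.concurrent.
final class CapacityDemo {

    // Checks size() + remainingCapacity() == capacity. Only reliable when
    // no other thread is modifying the queue between the two calls.
    static boolean invariantHolds(ArrayBlockingQueue<?> queue, int capacity) {
        return queue.size() + queue.remainingCapacity() == capacity;
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        System.out.println(queue.remainingCapacity());   // 3
        queue.offer("a");
        System.out.println(queue.remainingCapacity());   // 2
        System.out.println(invariantHolds(queue, 3));    // true
    }
}
```

To decide whether an insertion will succeed, relying on the return value of offer is generally safer than checking remainingCapacity first.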
public boolean remove(java.lang.Object o)
Removes a single instance of the specified element from this queue, if it is present. More formally, removes an element e such that o.equals(e), if this queue contains one or more such elements. Returns true if this queue contained the specified element (or equivalently, if this queue changed as a result of the call).

param
o element to be removed from this queue, if present
return
true if this queue changed as a result of the call

        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            for (;;) {
                if (k++ >= count)
                    return false;
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }

        } finally {
            lock.unlock();
        }
    
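The sketch below shows that remove(Object) deletes only the first matching element and keeps the remaining elements in FIFO order. RemoveDemo and contentsAfterRemoving are hypothetical names; the sample data is arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of java.util.concurrent.
final class RemoveDemo {

    // Builds a queue holding a, b, a, c, removes one occurrence of 'target',
    // and returns the remaining contents in queue order.
    static List<String> contentsAfterRemoving(String target) {
        ArrayBlockingQueue<String> queue =
            new ArrayBlockingQueue<>(4, false, Arrays.asList("a", "b", "a", "c"));
        queue.remove(target);
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(contentsAfterRemoving("a"));  // [b, a, c]
        System.out.println(contentsAfterRemoving("z"));  // [a, b, a, c]
    }
}
```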
void removeAt(int i)
Utility for remove and iterator.remove: Delete item at position i. Call only when holding lock.

        final E[] items = this.items;
        // if removing front item, just advance
        if (i == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    
public int size()
Returns the number of elements in this queue.

return
the number of elements in this queue

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    
public E take()

        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    
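Because take acquires the lock with lockInterruptibly and waits on a condition, a blocked consumer can be released by interrupting it. A minimal sketch, assuming a hypothetical class InterruptedTakeDemo:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of java.util.concurrent.
final class InterruptedTakeDemo {

    // Starts a consumer that calls take() on an empty queue, interrupts it,
    // and reports whether take() ended with InterruptedException.
    static boolean takeIsInterruptible() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread consumer = new Thread(() -> {
            try {
                queue.take();               // nothing is ever inserted
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        consumer.start();
        consumer.interrupt();               // works before or during the wait

        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(takeIsInterruptible());
    }
}
```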
public java.lang.Object[] toArray()
Returns an array containing all of the elements in this queue, in proper sequence.

The returned array will be "safe" in that no references to it are maintained by this queue. (In other words, this method must allocate a new array). The caller is thus free to modify the returned array.

This method acts as a bridge between array-based and collection-based APIs.

return
an array containing all of the elements in this queue

        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] a = new Object[count];
            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = items[i];
                i = inc(i);
            }
            return a;
        } finally {
            lock.unlock();
        }
    
public <T> T[] toArray(T[] a)
Returns an array containing all of the elements in this queue, in proper sequence; the runtime type of the returned array is that of the specified array. If the queue fits in the specified array, it is returned therein. Otherwise, a new array is allocated with the runtime type of the specified array and the size of this queue.

If this queue fits in the specified array with room to spare (i.e., the array has more elements than this queue), the element in the array immediately following the end of the queue is set to null.

Like the {@link #toArray()} method, this method acts as a bridge between array-based and collection-based APIs. Further, this method allows precise control over the runtime type of the output array, and may, under certain circumstances, be used to save allocation costs.

Suppose x is a queue known to contain only strings. The following code can be used to dump the queue into a newly allocated array of String:

    String[] y = x.toArray(new String[0]);

Note that toArray(new Object[0]) is identical in function to toArray().

param
a the array into which the elements of the queue are to be stored, if it is big enough; otherwise, a new array of the same runtime type is allocated for this purpose
return
an array containing all of the elements in this queue
throws
ArrayStoreException if the runtime type of the specified array is not a supertype of the runtime type of every element in this queue
throws
NullPointerException if the specified array is null

        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (a.length < count)
                a = (T[])java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(),
                    count
                    );

            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = (T)items[i];
                i = inc(i);
            }
            if (a.length > count)
                a[count] = null;
            return a;
        } finally {
            lock.unlock();
        }
    
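The two cases described for toArray(T[]), an array that is too small and one with room to spare, can be sketched as follows. ToArrayDemo and copyInto are hypothetical names for this example.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of java.util.concurrent.
final class ToArrayDemo {

    // Copies a two-element queue ("x", "y") into the supplied array and
    // returns whatever toArray(T[]) returns.
    static String[] copyInto(String[] target) {
        ArrayBlockingQueue<String> queue =
            new ArrayBlockingQueue<>(2, false, Arrays.asList("x", "y"));
        return queue.toArray(target);
    }

    public static void main(String[] args) {
        // Too small: a new String[2] is allocated.
        System.out.println(Arrays.toString(copyInto(new String[0])));

        // Room to spare: the same array is reused and the slot just past the
        // last element is set to null; later slots are left untouched.
        String[] roomy = {"1", "2", "3", "4"};
        System.out.println(Arrays.toString(copyInto(roomy)));  // [x, y, null, 4]
    }
}
```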
public java.lang.String toString()

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }