Methods Summary |
---|
public void | clear()Atomically removes all of the elements from this queue.
The queue will be empty after this call returns.
fullyLock();
try {
head.next = null;
assert head.item == null;
last = head;
if (count.getAndSet(0) == capacity)
notFull.signalAll();
} finally {
fullyUnlock();
}
|
public int | drainTo(java.util.Collection c)
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
Node<E> first;
fullyLock();
try {
first = head.next;
head.next = null;
assert head.item == null;
last = head;
if (count.getAndSet(0) == capacity)
notFull.signalAll();
} finally {
fullyUnlock();
}
// Transfer the elements outside of locks
int n = 0;
for (Node<E> p = first; p != null; p = p.next) {
c.add(p.item);
p.item = null;
++n;
}
return n;
|
public int | drainTo(java.util.Collection c, int maxElements)
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
fullyLock();
try {
int n = 0;
Node<E> p = head.next;
while (p != null && n < maxElements) {
c.add(p.item);
p.item = null;
p = p.next;
++n;
}
if (n != 0) {
head.next = p;
assert head.item == null;
if (p == null)
last = head;
if (count.getAndAdd(-n) == capacity)
notFull.signalAll();
}
return n;
} finally {
fullyUnlock();
}
|
private E | extract()Removes a node from head of queue,
Node<E> first = head.next;
head = first;
E x = first.item;
first.item = null;
return x;
|
private void | fullyLock()Lock to prevent both puts and takes.
putLock.lock();
takeLock.lock();
|
private void | fullyUnlock()Unlock to allow both puts and takes.
takeLock.unlock();
putLock.unlock();
|
private void | insert(E x)Creates a node and links it at end of queue.
last = last.next = new Node<E>(x);
|
public java.util.Iterator | iterator()Returns an iterator over the elements in this queue in proper sequence.
The returned Iterator is a "weakly consistent" iterator that
will never throw {@link ConcurrentModificationException},
and guarantees to traverse elements as they existed upon
construction of the iterator, and may (but is not guaranteed to)
reflect any modifications subsequent to construction.
return new Itr();
|
public boolean | offer(E e, long timeout, java.util.concurrent.TimeUnit unit)Inserts the specified element at the tail of this queue, waiting if
necessary up to the specified wait time for space to become available.
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
for (;;) {
if (count.get() < capacity) {
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
break;
}
if (nanos <= 0)
return false;
try {
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
|
public boolean | offer(E e)Inserts the specified element at the tail of this queue if it is
possible to do so immediately without exceeding the queue's capacity,
returning true upon success and false if this queue
is full.
When using a capacity-restricted queue, this method is generally
preferable to method {@link BlockingQueue#add add}, which can fail to
insert an element only by throwing an exception.
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
|
public E | peek()
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
|
public E | poll(long timeout, java.util.concurrent.TimeUnit unit)
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
for (;;) {
if (count.get() > 0) {
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
break;
}
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
}
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
|
public E | poll()
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
|
public void | put(E e)Inserts the specified element at the tail of this queue, waiting if
necessary for space to become available.
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset
// local var holding count negative to indicate failure unless set.
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from
* capacity. Similarly for all other uses of count in
* other wait guards.
*/
try {
while (count.get() == capacity)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
|
private void | readObject(java.io.ObjectInputStream s)Reconstitute this queue instance from a stream (that is,
deserialize it).
// Read in capacity, and any hidden stuff
s.defaultReadObject();
count.set(0);
last = head = new Node<E>(null);
// Read in all elements and place in queue
for (;;) {
E item = (E)s.readObject();
if (item == null)
break;
add(item);
}
|
public int | remainingCapacity()Returns the number of additional elements that this queue can ideally
(in the absence of memory or resource constraints) accept without
blocking. This is always equal to the initial capacity of this queue
less the current size of this queue.
Note that you cannot always tell if an attempt to insert
an element will succeed by inspecting remainingCapacity
because it may be the case that another thread is about to
insert or remove an element.
return capacity - count.get();
|
public boolean | remove(java.lang.Object o)Removes a single instance of the specified element from this queue,
if it is present. More formally, removes an element e such
that o.equals(e), if this queue contains one or more such
elements.
Returns true if this queue contained the specified element
(or equivalently, if this queue changed as a result of the call).
if (o == null) return false;
boolean removed = false;
fullyLock();
try {
Node<E> trail = head;
Node<E> p = head.next;
while (p != null) {
if (o.equals(p.item)) {
removed = true;
break;
}
trail = p;
p = p.next;
}
if (removed) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signalAll();
}
} finally {
fullyUnlock();
}
return removed;
|
private void | signalNotEmpty()Signals a waiting take. Called only from put/offer (which do not
otherwise ordinarily lock takeLock.)
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
|
private void | signalNotFull()Signals a waiting put. Called only from take/poll.
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
|
public int | size()Returns the number of elements in this queue.
return count.get();
|
public E | take()
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
try {
while (count.get() == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
}
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
|
public java.lang.Object[] | toArray()Returns an array containing all of the elements in this queue, in
proper sequence.
The returned array will be "safe" in that no references to it are
maintained by this queue. (In other words, this method must allocate
a new array). The caller is thus free to modify the returned array.
This method acts as bridge between array-based and collection-based
APIs.
fullyLock();
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
fullyUnlock();
}
|
public T[] | toArray(T[] a)Returns an array containing all of the elements in this queue, in
proper sequence; the runtime type of the returned array is that of
the specified array. If the queue fits in the specified array, it
is returned therein. Otherwise, a new array is allocated with the
runtime type of the specified array and the size of this queue.
If this queue fits in the specified array with room to spare
(i.e., the array has more elements than this queue), the element in
the array immediately following the end of the queue is set to
null.
Like the {@link #toArray()} method, this method acts as bridge between
array-based and collection-based APIs. Further, this method allows
precise control over the runtime type of the output array, and may,
under certain circumstances, be used to save allocation costs.
Suppose x is a queue known to contain only strings.
The following code can be used to dump the queue into a newly
allocated array of String:
String[] y = x.toArray(new String[0]);
Note that toArray(new Object[0]) is identical in function to
toArray().
fullyLock();
try {
int size = count.get();
if (a.length < size)
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), size);
int k = 0;
for (Node p = head.next; p != null; p = p.next)
a[k++] = (T)p.item;
if (a.length > k)
a[k] = null;
return a;
} finally {
fullyUnlock();
}
|
public java.lang.String | toString()
fullyLock();
try {
return super.toString();
} finally {
fullyUnlock();
}
|
private void | writeObject(java.io.ObjectOutputStream s)Save the state to a stream (that is, serialize it).
fullyLock();
try {
// Write out any hidden stuff, plus capacity
s.defaultWriteObject();
// Write out all elements in the proper order.
for (Node<E> p = head.next; p != null; p = p.next)
s.writeObject(p.item);
// Use trailing null as sentinel
s.writeObject(null);
} finally {
fullyUnlock();
}
|