Fields Summary |
---|
private static final long | serialVersionUID |
private final int | capacityThe capacity bound, or Integer.MAX_VALUE if none |
private final AtomicInteger | countCurrent number of elements |
private transient Node | headHead of linked list |
private transient Node | lastTail of linked list |
private final ReentrantLock | takeLockLock held by take, poll, etc |
private final Condition | notEmptyWait queue for waiting takes |
private final ReentrantLock | putLockLock held by put, offer, etc |
private final Condition | notFullWait queue for waiting puts |
Methods Summary |
---|
public void | clear()
fullyLock();
try {
head.next = null;
if (count.getAndSet(0) == capacity)
notFull.signalAll();
} finally {
fullyUnlock();
}
|
public int | drainTo(java.util.Collection c)
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
Node first;
fullyLock();
try {
first = head.next;
head.next = null;
if (count.getAndSet(0) == capacity)
notFull.signalAll();
} finally {
fullyUnlock();
}
// Transfer the elements outside of locks
int n = 0;
for (Node<E> p = first; p != null; p = p.next) {
c.add(p.item);
p.item = null;
++n;
}
return n;
|
public int | drainTo(java.util.Collection c, int maxElements)
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
fullyLock();
try {
int n = 0;
Node<E> p = head.next;
while (p != null && n < maxElements) {
c.add(p.item);
p.item = null;
p = p.next;
++n;
}
if (n != 0) {
head.next = p;
if (count.getAndAdd(-n) == capacity)
notFull.signalAll();
}
return n;
} finally {
fullyUnlock();
}
|
private E | extract()Remove a node from head of queue,
Node<E> first = head.next;
head = first;
E x = first.item;
first.item = null;
return x;
|
private void | fullyLock()Lock to prevent both puts and takes.
putLock.lock();
takeLock.lock();
|
private void | fullyUnlock()Unlock to allow both puts and takes.
takeLock.unlock();
putLock.unlock();
|
private void | insert(E x)Create a node and link it at end of queue
last = last.next = new Node<E>(x);
|
public java.util.Iterator | iterator()Returns an iterator over the elements in this queue in proper sequence.
The returned Iterator is a "weakly consistent" iterator that
will never throw {@link java.util.ConcurrentModificationException},
and guarantees to traverse elements as they existed upon
construction of the iterator, and may (but is not guaranteed to)
reflect any modifications subsequent to construction.
return new Itr();
|
public boolean | offer(E o, long timeout, java.util.concurrent.TimeUnit unit)Inserts the specified element at the tail of this queue, waiting if
necessary up to the specified wait time for space to become available.
if (o == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
for (;;) {
if (count.get() < capacity) {
insert(o);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
break;
}
if (nanos <= 0)
return false;
try {
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
|
public boolean | offer(E o)Inserts the specified element at the tail of this queue if possible,
returning immediately if this queue is full.
if (o == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
insert(o);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
|
public E | peek()
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
|
public E | poll(long timeout, java.util.concurrent.TimeUnit unit)
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
for (;;) {
if (count.get() > 0) {
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
break;
}
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
}
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
|
public E | poll()
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
|
public void | put(E o)Adds the specified element to the tail of this queue, waiting if
necessary for space to become available.
if (o == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset
// local var holding count negative to indicate failure unless set.
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from
* capacity. Similarly for all other uses of count in
* other wait guards.
*/
try {
while (count.get() == capacity)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
insert(o);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
|
private void | readObject(java.io.ObjectInputStream s)Reconstitute this queue instance from a stream (that is,
deserialize it).
// Read in capacity, and any hidden stuff
s.defaultReadObject();
count.set(0);
last = head = new Node<E>(null);
// Read in all elements and place in queue
for (;;) {
E item = (E)s.readObject();
if (item == null)
break;
add(item);
}
|
public int | remainingCapacity()Returns the number of elements that this queue can ideally (in
the absence of memory or resource constraints) accept without
blocking. This is always equal to the initial capacity of this queue
less the current size of this queue.
Note that you cannot always tell if
an attempt to add an element will succeed by
inspecting remainingCapacity because it may be the
case that a waiting consumer is ready to take an
element out of an otherwise full queue.
return capacity - count.get();
|
public boolean | remove(java.lang.Object o)
if (o == null) return false;
boolean removed = false;
fullyLock();
try {
Node<E> trail = head;
Node<E> p = head.next;
while (p != null) {
if (o.equals(p.item)) {
removed = true;
break;
}
trail = p;
p = p.next;
}
if (removed) {
p.item = null;
trail.next = p.next;
if (count.getAndDecrement() == capacity)
notFull.signalAll();
}
} finally {
fullyUnlock();
}
return removed;
|
private void | signalNotEmpty()Signal a waiting take. Called only from put/offer (which do not
otherwise ordinarily lock takeLock.)
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
|
private void | signalNotFull()Signal a waiting put. Called only from take/poll.
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
|
public int | size()Returns the number of elements in this queue.
return count.get();
|
public E | take()
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
try {
while (count.get() == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
}
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
|
public java.lang.Object[] | toArray()
fullyLock();
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
fullyUnlock();
}
|
public T[] | toArray(T[] a)
fullyLock();
try {
int size = count.get();
if (a.length < size)
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), size);
int k = 0;
for (Node p = head.next; p != null; p = p.next)
a[k++] = (T)p.item;
return a;
} finally {
fullyUnlock();
}
|
public java.lang.String | toString()
fullyLock();
try {
return super.toString();
} finally {
fullyUnlock();
}
|
private void | writeObject(java.io.ObjectOutputStream s)Save the state to a stream (that is, serialize it).
fullyLock();
try {
// Write out any hidden stuff, plus capacity
s.defaultWriteObject();
// Write out all elements in the proper order.
for (Node<E> p = head.next; p != null; p = p.next)
s.writeObject(p.item);
// Use trailing null as sentinel
s.writeObject(null);
} finally {
fullyUnlock();
}
|