CondVar
public class CondVar extends Object
This class is designed for fans of POSIX pthreads programming.
If you restrict yourself to Mutexes and CondVars, you can
use most of your favorite constructions. Don't randomly mix them
with synchronized methods or blocks though.
Method names and behavior are as close as is reasonable to
those in POSIX.
Sample Usage. Here is a full version of a bounded buffer
that implements the BoundedChannel interface, written in
a style reminiscent of that in POSIX programming books.
/**
 * Bounded FIFO buffer written in POSIX style: a single Mutex guards all
 * state, and two CondVars built on that mutex signal the "not full" and
 * "not empty" conditions.
 *
 * All fields are read and written only while the mutex is held.
 */
class CVBuffer implements BoundedChannel {
  private final Mutex mutex;
  private final CondVar notFull;
  private final CondVar notEmpty;
  private int count = 0;     // number of elements currently stored
  private int takePtr = 0;   // index of the next element to remove
  private int putPtr = 0;    // index of the next free slot
  private final Object[] array;

  /**
   * Creates a buffer holding at most capacity elements.
   *
   * @param capacity maximum number of buffered elements
   */
  public CVBuffer(int capacity) {
    array = new Object[capacity];
    mutex = new Mutex();
    notFull = new CondVar(mutex);
    notEmpty = new CondVar(mutex);
  }

  /** Returns the maximum number of elements the buffer can hold. */
  public int capacity() { return array.length; }

  /**
   * Inserts x, blocking for as long as the buffer is full.
   *
   * @throws InterruptedException if interrupted while acquiring the mutex or waiting
   */
  public void put(Object x) throws InterruptedException {
    mutex.acquire();
    try {
      // Re-test in a loop: the condition may no longer hold by the time
      // this thread has re-acquired the mutex after being woken.
      while (count == array.length) {
        notFull.await();
      }
      insert(x);
    }
    finally {
      mutex.release();
    }
  }

  /**
   * Removes and returns the oldest element, blocking for as long as the
   * buffer is empty.
   *
   * @throws InterruptedException if interrupted while acquiring the mutex or waiting
   */
  public Object take() throws InterruptedException {
    mutex.acquire();
    try {
      while (count == 0) {
        notEmpty.await();
      }
      return extract();
    }
    finally {
      mutex.release();
    }
  }

  /**
   * Inserts x if space becomes available within msecs milliseconds.
   *
   * @return true if x was inserted, false if the buffer was still full
   *         once the time budget was used up
   * @throws InterruptedException if interrupted while acquiring the mutex or waiting
   */
  public boolean offer(Object x, long msecs) throws InterruptedException {
    mutex.acquire();
    try {
      if (count == array.length) {
        // A single timedwait is not enough: the thread can be woken while
        // the buffer is still full (another producer got there first, or
        // the wakeup was spurious). Keep waiting for whatever time is left
        // instead of reporting failure early.
        long start = System.currentTimeMillis();
        long remaining = msecs;
        do {
          notFull.timedwait(remaining);
          remaining = msecs - (System.currentTimeMillis() - start);
        } while (count == array.length && remaining > 0);
        if (count == array.length)
          return false;
      }
      insert(x);
      return true;
    }
    finally {
      mutex.release();
    }
  }

  /**
   * Removes and returns the oldest element if one becomes available within
   * msecs milliseconds.
   *
   * @return the element, or null if the buffer was still empty once the
   *         time budget was used up
   * @throws InterruptedException if interrupted while acquiring the mutex or waiting
   */
  public Object poll(long msecs) throws InterruptedException {
    mutex.acquire();
    try {
      if (count == 0) {
        // Same reasoning as in offer(): re-wait for the remaining time
        // when woken while the buffer is still empty.
        long start = System.currentTimeMillis();
        long remaining = msecs;
        do {
          notEmpty.timedwait(remaining);
          remaining = msecs - (System.currentTimeMillis() - start);
        } while (count == 0 && remaining > 0);
        if (count == 0)
          return null;
      }
      return extract();
    }
    finally {
      mutex.release();
    }
  }

  /** Stores x at putPtr and signals notEmpty. Caller must hold the mutex and have checked for space. */
  private void insert(Object x) {
    array[putPtr] = x;
    putPtr = (putPtr + 1) % array.length;
    ++count;
    notEmpty.signal();
  }

  /** Removes the element at takePtr and signals notFull. Caller must hold the mutex and have checked count > 0. */
  private Object extract() {
    Object x = array[takePtr];
    array[takePtr] = null;  // drop the reference so the slot does not pin the object
    takePtr = (takePtr + 1) % array.length;
    --count;
    notFull.signal();
    return x;
  }
}
|
Fields Summary |
---|
protected boolean | debug_ | protected final Sync | mutex_ The mutex | protected final ReentrantMutex | remutex_ |
Constructors Summary |
---|
public CondVar(Sync mutex, boolean debug)Create a new CondVar that relies on the given mutual
exclusion lock.
// debug_ gates the "enter"/"exit" trace calls made by await() and timedwait().
debug_ = debug ;
mutex_ = mutex;
// Keep a typed reference when the lock is a ReentrantMutex so that
// releaseMutex()/acquireMutex() can call releaseAll()/acquireAll() on it.
// A null remutex_ selects the plain Sync release()/acquire() path instead.
if (mutex instanceof ReentrantMutex)
remutex_ = (ReentrantMutex)mutex;
else
remutex_ = null;
| public CondVar(Sync mutex)
// Convenience form: delegates to the two-argument constructor with debug set to false.
this( mutex, false ) ;
|
Methods Summary |
---|
private void | acquireMutex(int count)
// Re-acquires the lock that releaseMutex() gave up.
// count is the value releaseMutex() returned. It is only used on the
// ReentrantMutex path, where it is handed to acquireAll() -- presumably to
// restore the original hold depth; confirm against ReentrantMutex.
// On the plain Sync path the lock is acquired once and count is ignored.
if (remutex_!=null)
remutex_.acquireAll( count ) ;
else
mutex_.acquire() ;
| public void | await()Wait for notification. This operation at least momentarily
releases the mutex. The mutex is always held upon return,
even if interrupted.
// Value returned by releaseMutex(); passed back to acquireMutex() in the finally block.
int count = 0 ;
// If the thread is already interrupted, fail before the mutex is released.
// Thread.interrupted() also clears the interrupt flag.
if (Thread.interrupted())
throw new InterruptedException();
try {
if (debug_)
ORBUtility.dprintTrace( this, "await enter" ) ;
// Enter this object's monitor BEFORE releasing the mutex. signal() and
// broadcast() are synchronized on this object, so neither can run between
// the release and the wait() below.
synchronized(this) {
count = releaseMutex() ;
try {
wait();
} catch (InterruptedException ex) {
// Wake another waiter before propagating -- presumably so that a
// notification delivered to this interrupted thread is not lost.
notify();
throw ex;
}
}
} finally {
// Must ignore interrupt on re-acquire
// The mutex has to be held on return, so keep retrying the acquire until it
// succeeds. Any interrupt seen meanwhile is remembered and re-asserted on
// the thread afterwards instead of being thrown from here.
boolean interrupted = false;
for (;;) {
try {
acquireMutex( count );
break;
} catch (InterruptedException ex) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
if (debug_)
ORBUtility.dprintTrace( this, "await exit" ) ;
}
| public synchronized void | broadcast()Notify all waiting threads
// Wakes every thread blocked in wait() on this object's monitor (see await() and timedwait()).
// The method is synchronized, so the caller owns the monitor as notifyAll() requires.
notifyAll();
| private int | releaseMutex()
// Releases the lock before a wait and returns a value for acquireMutex().
// Plain Sync: release() is called once and 1 is returned.
// ReentrantMutex: returns whatever releaseAll() reports -- presumably the
// number of holds given up; confirm against ReentrantMutex.
int count = 1 ;
if (remutex_!=null)
count = remutex_.releaseAll() ;
else
mutex_.release() ;
return count ;
| public synchronized void | signal()Notify a waiting thread.
If one exists, a non-interrupted thread will return
normally (i.e., not via InterruptedException) from await or timedwait.
// Wakes a single thread blocked in wait() on this object's monitor.
// The method is synchronized, so the caller owns the monitor as notify() requires.
notify();
| public boolean | timedwait(long msecs)Wait for at most msecs for notification.
This operation at least momentarily
releases the mutex. The mutex is always held upon return,
even if interrupted.
// If the thread is already interrupted, fail before the mutex is released.
// Thread.interrupted() also clears the interrupt flag.
if (Thread.interrupted())
throw new InterruptedException();
// Return value; stays false when msecs <= 0 because no wait is performed.
boolean success = false;
// Value returned by releaseMutex(); passed back to acquireMutex() in the finally block.
int count = 0;
try {
if (debug_)
ORBUtility.dprintTrace( this, "timedwait enter" ) ;
// Enter this object's monitor BEFORE releasing the mutex. signal() and
// broadcast() are synchronized on this object, so neither can run between
// the release and the wait() below.
synchronized(this) {
count = releaseMutex() ;
try {
if (msecs > 0) {
long start = System.currentTimeMillis();
wait(msecs);
// wait(long) does not report why it returned, so success is inferred
// from the clock: true when the elapsed time did not exceed msecs.
success = System.currentTimeMillis() - start <= msecs;
}
} catch (InterruptedException ex) {
// Wake another waiter before propagating -- presumably so that a
// notification delivered to this interrupted thread is not lost.
notify();
throw ex;
}
}
} finally {
// Must ignore interrupt on re-acquire
// The mutex has to be held on return, so keep retrying the acquire until it
// succeeds. Any interrupt seen meanwhile is remembered and re-asserted on
// the thread afterwards instead of being thrown from here.
boolean interrupted = false;
for (;;) {
try {
acquireMutex( count ) ;
break;
} catch (InterruptedException ex) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
if (debug_)
ORBUtility.dprintTrace( this, "timedwait exit" ) ;
}
return success;
|
|