ListenerManager
public class ListenerManager extends Object
This class exists to support the invocation of listeners while *not* synchronized.
This is important because, in general, it is a bad idea to invoke an "external" component
whilst holding a lock on something, as unexpected deadlocks can result.
It has been introduced to reduce the likelihood of such deadlocks. |
Fields Summary |
---|
protected String | name | protected ListenerManagerDispatcher | target | protected ListenerManagerDispatcherWithException | target_with_exception | protected boolean | async | protected Thread | async_thread | protected List | listeners | protected List | dispatch_queue | protected AESemaphore | dispatch_sem |
Constructors Summary |
---|
protected ListenerManager(String _name, ListenerManagerDispatcher _target, boolean _async)
    // Capture the configuration handed over by the factory methods.
    name = _name;
    target = _target;
    async = _async;

    // A dispatcher that may throw is additionally kept under its more specific
    // type; the dispatch methods test this field to pick the dispatch path.
    if ( target instanceof ListenerManagerDispatcherWithException ){
        target_with_exception = (ListenerManagerDispatcherWithException)target;
    }

    // A synchronous manager needs no queueing infrastructure.
    if ( !async ){
        return;
    }

    // Asynchronous mode: events are queued and signalled through a semaphore.
    dispatch_sem = new AESemaphore("ListenerManager::"+name);
    dispatch_queue = new LinkedList();

    // Asynchronous delivery cannot be combined with an exception-throwing dispatcher.
    if ( target_with_exception != null ){
        throw( new RuntimeException( "Can't have an async manager with exceptions!"));
    }
|
Methods Summary |
---|
public void | addListener(java.lang.Object listener)
    // Registers a listener. The listener list is never modified in place: a
    // fresh copy is built and swapped in while holding this manager's monitor.
    synchronized( this ){
        final ArrayList updated = new ArrayList( listeners );

        // a duplicate registration is reported, but the listener is still added
        final boolean already_present = updated.contains( listener );
        if ( already_present ){
            Logger.log(new LogEvent(LogIDs.CORE, LogEvent.LT_WARNING,
                    "addListener called but listener already added for " + name
                    + "\n\t" + Debug.getStackTrace(true, false)));
        }

        updated.add( listener );

        // more than 50 listeners on one manager is reported as a warning
        final boolean suspiciously_many = updated.size() > 50;
        if ( suspiciously_many ){
            Logger.log(new LogEvent(LogIDs.CORE, LogEvent.LT_WARNING,
                    "addListener: over 50 listeners added for " + name
                    + "\n\t" + Debug.getStackTrace(true, false)));
        }

        listeners = updated;

        // an async manager starts its dispatch thread on demand
        final boolean needs_thread = async && async_thread == null;
        if ( needs_thread ){
            async_thread = new AEThread( name )
                {
                    public void
                    runSupport()
                    {
                        dispatchLoop();
                    }
                };
            async_thread.setDaemon( true );
            async_thread.start();
        }
    }
| public void | clear()
    // Drops every registered listener. For an async manager the dispatch thread
    // is also detached and woken up so that it can notice and terminate.
    synchronized( this ){
        listeners = new ArrayList();

        if ( !async ){
            return;
        }

        async_thread = null;

        // wake the dispatch thread; it exits once it sees it is no longer current
        dispatch_sem.release();
    }
| public static org.gudy.azureus2.core3.util.ListenerManager | createAsyncManager(java.lang.String name, ListenerManagerDispatcher target)
return( new ListenerManager( name, target, true ));
| public static org.gudy.azureus2.core3.util.ListenerManager | createManager(java.lang.String name, ListenerManagerDispatcher target)
return( new ListenerManager( name, target, false ));
| public void | dispatch(java.lang.Object listener, int type, java.lang.Object value)
dispatch( listener, type, value, false );
| public void | dispatch(java.lang.Object listener, int type, java.lang.Object value, boolean blocking)
    // Delivers one event to one specific listener. A synchronous manager calls
    // the dispatcher directly; an asynchronous one queues the event for the
    // dispatch thread and, when 'blocking' is set, waits until it has been handled.
    if ( !async ){
        if ( target_with_exception != null ){
            throw( new RuntimeException( "call dispatchWithException, not dispatch"));
        }

        try{
            target.dispatch( listener, type, value );
        }catch( Throwable e ){
            // a failing listener is reported, never propagated to the caller
            Debug.printStackTrace( e );
        }

        return;
    }

    // only a blocking caller needs a semaphore to wait on
    final AESemaphore blocker = blocking ? new AESemaphore( "ListenerManager:blocker") : null;

    synchronized( this ){
        // a five element entry marks a single-listener event for the dispatch loop
        final Object[] entry = new Object[]{ listener, new Integer(type), value, blocker, null };
        dispatch_queue.add( entry );

        // make sure there is a dispatch thread to pick the entry up
        if ( async_thread == null ){
            async_thread = new AEThread( name )
                {
                    public void
                    runSupport()
                    {
                        dispatchLoop();
                    }
                };
            async_thread.setDaemon( true );
            async_thread.start();
        }
    }

    // signal the new entry after leaving the monitor
    dispatch_sem.release();

    // a blocking caller waits here until the dispatch loop releases the blocker
    if ( blocker != null ){
        blocker.reserve();
    }
| public void | dispatch(int type, java.lang.Object value)
dispatch( type, value, false );
| public void | dispatch(int type, java.lang.Object value, boolean blocking)
if ( async ){
AESemaphore sem = null;
if ( blocking ){
sem = new AESemaphore( "ListenerManager:blocker");
}
synchronized( this ){
// if there's nobody listening then no point in queueing
if ( listeners.size() == 0 ){
return;
}
// listeners are "copy on write" updated, hence we grab a reference to the
// current listeners here. Any subsequent change won't affect our listeners
dispatch_queue.add(new Object[]{listeners, new Integer(type), value, sem });
}
dispatch_sem.release();
if ( sem != null ){
sem.reserve();
}
}else{
if ( target_with_exception != null ){
throw( new RuntimeException( "call dispatchWithException, not dispatch"));
}
List listeners_ref;
synchronized( this ){
listeners_ref = listeners;
}
try{
dispatchInternal( listeners_ref, type, value );
}catch( Throwable e ){
Debug.printStackTrace( e );
}
}
| protected void | dispatchInternal(java.util.List listeners_ref, int type, java.lang.Object value)
    // Delivers the event to each listener of the supplied list, in list order.
    for ( int index = 0; index < listeners_ref.size(); index++ ){

        if ( target_with_exception == null ){
            // plain dispatcher: a failing listener is reported and the loop carries on
            try{
                target.dispatch( listeners_ref.get( index ), type, value );
            }catch( Throwable e ){
                Debug.printStackTrace( e );
            }
            continue;
        }

        // Exception-aware dispatcher: exceptions are permitted here and are
        // deliberately NOT caught, so they reach the caller.
        target_with_exception.dispatchWithException( listeners_ref.get( index ), type, value );
    }
| protected void | dispatchInternal(java.lang.Object listener, int type, java.lang.Object value)
    // Delivers the event to one listener through whichever dispatcher is configured.
    if ( target_with_exception == null ){
        // plain dispatcher: a failure is reported rather than propagated
        try{
            target.dispatch( listener, type, value );
        }catch( Throwable e ){
            Debug.printStackTrace( e );
        }
        return;
    }

    // Exception-aware dispatcher: exceptions are permitted here and are
    // deliberately NOT caught, so they reach the caller.
    target_with_exception.dispatchWithException( listener, type, value );
| public void | dispatchLoop()
// Body of the asynchronous dispatch thread: waits on dispatch_sem, takes the
// next queued entry and hands it to dispatchInternal, and keeps doing so until
// the calling thread is no longer the manager's current async_thread.
// System.out.println( "ListenerManager::dispatch thread '" + Thread.currentThread() + "' starts");
while(true){
// block until a permit is released (by a dispatch call, clear or removeListener)
dispatch_sem.reserve();
Object[] data = null;
synchronized( this ){
if ( async_thread != Thread.currentThread()){
// we've been asked to close. this sem reservation must be
// "returned" to the pool in case it represents a valid entry
// to be picked up by another thread
dispatch_sem.release();
break;
}
// the queue may be empty here; data then stays null and the loop waits again
if ( dispatch_queue.size() > 0 ){
data = (Object[])dispatch_queue.remove(0);
}
}
// the entry is processed after the monitor has been left
if ( data != null ){
try{
// four element entries carry a listener list, all others a single listener
if ( data.length == 4 ){
dispatchInternal((List)data[0], ((Integer)data[1]).intValue(), data[2] );
}else{
dispatchInternal( data[0], ((Integer)data[1]).intValue(), data[2] );
}
}catch( Throwable e ){
Debug.printStackTrace( e );
}finally{
// element 3 is the semaphore of a blocking caller, if any; it is
// released even when the dispatch itself failed
if ( data[3] != null ){
((AESemaphore)data[3]).release();
}
}
}
}
// System.out.println( "ListenerManager::dispatch thread '" + Thread.currentThread() + "' ends");
| public void | dispatchWithException(int type, java.lang.Object value)
List listeners_ref;
synchronized( this ){
listeners_ref = listeners;
}
dispatchInternal( listeners_ref, type, value );
| public static void | dispatchWithTimeout(java.util.List _listeners, ListenerManagerDispatcher _dispatcher, long _timeout)
    // Invokes the dispatcher once per listener (type -1, null value), each call
    // on its own thread, and waits for them for at most _timeout in total.
    // Listeners whose call has not finished by then are reported through Debug.out.
    final List targets = new ArrayList( _listeners );
    final int target_count = targets.size();
    final boolean[] completed = new boolean[target_count];
    final AESemaphore finished_sem = new AESemaphore("ListenerManager:dwt:timeout");

    // one worker thread per listener; each marks its slot and releases a permit when done
    for ( int slot = 0; slot < target_count; slot++ ){
        final int f_slot = slot;

        new AEThread( "ListenerManager:dwt:dispatcher", true ){
            public void
            runSupport()
            {
                try{
                    _dispatcher.dispatch( targets.get(f_slot), -1, null );
                }catch( Throwable e ){
                    Debug.printStackTrace(e);
                }finally{
                    completed[f_slot] = true;
                    finished_sem.release();
                }
            }
        }.start();
    }

    // collect one permit per listener, charging the time spent against the budget
    boolean timed_out = false;
    int outstanding = target_count;

    while ( outstanding > 0 ){
        if ( _timeout <= 0 ){
            timed_out = true;
            break;
        }

        final long wait_started = SystemTime.getCurrentTime();
        final boolean got_permit = finished_sem.reserve( _timeout );

        if ( !got_permit ){
            timed_out = true;
            break;
        }

        final long wait_ended = SystemTime.getCurrentTime();

        // only deduct when the clock actually moved forwards
        if ( wait_ended > wait_started ){
            _timeout -= ( wait_ended - wait_started );
        }

        outstanding--;
    }

    if ( !timed_out ){
        return;
    }

    // build a comma separated list of the listeners that have not completed
    final StringBuffer pending = new StringBuffer();

    for ( int slot = 0; slot < completed.length; slot++ ){
        if ( completed[slot] ){
            continue;
        }
        if ( pending.length() > 0 ){
            pending.append( "," );
        }
        pending.append( targets.get( slot ));
    }

    if ( pending.length() > 0 ){
        Debug.out( "Listener dispatch timeout: failed = " + pending );
    }
| public java.util.List | getListenersCopy()
    // Returns the current listener list.
    //
    // addListener and removeListener build a new list and swap it in rather
    // than modifying the existing one, so the returned list is not affected by
    // later registrations or removals and no copy needs to be made here.
    //
    // The field is only ever written while holding this manager's monitor, so
    // it is read under the same monitor (as the dispatch methods do) to make
    // sure the most recently published list is the one returned.
    //
    // NOTE(review): the internal list object itself is handed out; callers
    // presumably must treat it as read-only - confirm against callers.
    synchronized( this ){
        return( listeners );
    }
| public void | removeListener(java.lang.Object listener)
synchronized( this ){
ArrayList new_listeners = new ArrayList( listeners );
new_listeners.remove( listener );
listeners = new_listeners;
if ( async && listeners.size() == 0 ){
async_thread = null;
// try and wake up the thread so it kills itself
dispatch_sem.release();
}
}
| public long | size()
if (listeners == null)
return 0;
return listeners.size();
|
|