FileDocCategorySizeDatePackage
ListenerManager.javaAPI DocAzureus 3.0.3.411163Sun Apr 30 05:07:56 BST 2006org.gudy.azureus2.core3.util

ListenerManager

public class ListenerManager extends Object
This class exists to support the invocation of listeners while *not* synchronized. This is important as in general it is a bad idea to invoke an "external" component whilst holding a lock on something, as unexpected deadlocks can result. It has been introduced to reduce the likelihood of such deadlocks.

Fields Summary
protected String
name
protected ListenerManagerDispatcher
target
protected ListenerManagerDispatcherWithException
target_with_exception
protected boolean
async
protected Thread
async_thread
protected List
listeners
protected List
dispatch_queue
protected AESemaphore
dispatch_sem
Constructors Summary
/**
 * Creates a manager that forwards events to listeners through the supplied dispatcher.
 *
 * @param _name		name of this manager; also used to label the dispatch semaphore
 * @param _target	dispatcher that delivers an event to one listener
 * @param _async	true to queue events for delivery on a separate thread
 * @throws RuntimeException if an asynchronous manager is requested with a dispatcher
 *			that implements ListenerManagerDispatcherWithException
 */
protected ListenerManager(String _name, ListenerManagerDispatcher _target, boolean _async)

		name	= _name;
		target	= _target;
		async	= _async;

			// keep a typed reference when the dispatcher is the exception-propagating variant

		if ( _target instanceof ListenerManagerDispatcherWithException ){

			target_with_exception = (ListenerManagerDispatcherWithException)_target;
		}

			// synchronous managers need no queueing infrastructure

		if ( !async ){

			return;
		}

		dispatch_sem	= new AESemaphore( "ListenerManager::" + name );
		dispatch_queue	= new LinkedList();

			// the async path logs dispatch failures rather than propagating them, so an
			// exception-propagating dispatcher is rejected here

		if ( target_with_exception != null ){

			throw( new RuntimeException( "Can't have an async manager with exceptions!"));
		}
	
Methods Summary
/**
 * Registers a listener. The listener list is replaced by an extended copy rather than
 * modified in place, so references to the previous list taken by dispatchers stay stable.
 * For asynchronous managers the dispatch thread is started if it is not already running.
 *
 * A warning is logged if the listener was already registered (it is still added again)
 * or if more than 50 listeners are now registered. The warnings are emitted after the
 * lock has been released so that the logging code is never invoked while this manager's
 * monitor is held.
 *
 * @param listener	the listener to add
 */
public voidaddListener(java.lang.Object listener)

		boolean	already_registered;
		boolean	too_many;

		synchronized( this ){

			ArrayList	new_listeners	= new ArrayList( listeners );

				// note the duplicate before adding, otherwise contains() is always true

			already_registered = new_listeners.contains( listener );

			new_listeners.add( listener );

			too_many = new_listeners.size() > 50;

			listeners	= new_listeners;

			if ( async && async_thread == null ){

				async_thread = new AEThread( name )
					{
						public void
						runSupport()
						{
							dispatchLoop();
						}
					};

				async_thread.setDaemon( true );

				async_thread.start();
			}
		}

			// logging is deliberately done outside the synchronized block: it calls into
			// external code and must not run while we hold the lock

		if ( already_registered ){

			Logger.log(new LogEvent(LogIDs.CORE, LogEvent.LT_WARNING,
					"addListener called but listener already added for " + name
							+ "\n\t" + Debug.getStackTrace(true, false)));
		}

		if ( too_many ){

			Logger.log(new LogEvent(LogIDs.CORE, LogEvent.LT_WARNING,
					"addListener: over 50 listeners added for " + name
							+ "\n\t" + Debug.getStackTrace(true, false)));
		}
	
/**
 * Removes every listener by installing a fresh empty list. For asynchronous managers
 * the dispatch thread reference is also cleared and the dispatch semaphore released so
 * that the thread can notice and terminate.
 */
public voidclear()

		synchronized( this ){

			listeners	= new ArrayList();

			if ( !async ){

				return;
			}

				// dispatchLoop exits once it sees it is no longer the registered thread;
				// the release wakes it up so that it can perform that check

			async_thread = null;

			dispatch_sem.release();
		}
	
/**
 * Factory for a manager that delivers events asynchronously.
 *
 * @param name		name given to the manager
 * @param target	dispatcher used to deliver events to listeners
 * @return a new manager constructed with the async flag set
 */
public static org.gudy.azureus2.core3.util.ListenerManagercreateAsyncManager(java.lang.String name, ListenerManagerDispatcher target)

		ListenerManager	manager = new ListenerManager( name, target, true );

		return( manager );
	
/**
 * Factory for a manager that delivers events on the calling thread.
 *
 * @param name		name given to the manager
 * @param target	dispatcher used to deliver events to listeners
 * @return a new manager constructed with the async flag cleared
 */
public static org.gudy.azureus2.core3.util.ListenerManagercreateManager(java.lang.String name, ListenerManagerDispatcher target)

		ListenerManager	manager = new ListenerManager( name, target, false );

		return( manager );
	
public voiddispatch(java.lang.Object listener, int type, java.lang.Object value)

		dispatch( listener, type, value, false );
	
/**
 * Delivers an event to one specific listener.
 *
 * Synchronous managers call the dispatcher directly on the calling thread; anything it
 * throws is printed via Debug rather than propagated. Asynchronous managers queue the
 * event, start the dispatch thread if none is registered, and signal it.
 *
 * @param listener	the listener to notify
 * @param type		event type passed to the dispatcher
 * @param value		event payload passed to the dispatcher
 * @param blocking	async only: when true, reserve a per-call semaphore that the dispatch
 *			loop releases once it has processed this entry
 * @throws RuntimeException if this is a synchronous manager whose dispatcher is the
 *			exception-propagating variant
 */
public voiddispatch(java.lang.Object listener, int type, java.lang.Object value, boolean blocking)

		if ( !async ){

			if ( target_with_exception != null ){

				throw( new RuntimeException( "call dispatchWithException, not dispatch"));
			}

			try{
				target.dispatch( listener, type, value );

			}catch( Throwable e ){

				Debug.printStackTrace( e );
			}

			return;
		}

		AESemaphore	blocker = blocking ? new AESemaphore( "ListenerManager:blocker") : null;

			// a 5 element entry marks a single-listener dispatch; the trailing null only
			// pads the array so its length differs from the 4 element list entries

		Object[]	entry = new Object[]{ listener, new Integer(type), value, blocker, null };

		synchronized( this ){

			dispatch_queue.add( entry );

			if ( async_thread == null ){

				async_thread = new AEThread( name )
					{
						public void
						runSupport()
						{
							dispatchLoop();
						}
					};

				async_thread.setDaemon( true );

				async_thread.start();
			}
		}

			// signal the dispatch thread that an entry is available

		dispatch_sem.release();

		if ( blocker != null ){

			blocker.reserve();
		}
	
public voiddispatch(int type, java.lang.Object value)

		dispatch( type, value, false );
	
/**
 * Delivers an event to all registered listeners.
 *
 * Synchronous managers take a reference to the current listener list under the lock and
 * then dispatch outside it; anything thrown is printed via Debug. Asynchronous managers
 * queue the event together with the current listener list and signal the dispatch
 * thread; nothing is queued when there are no listeners.
 *
 * @param type		event type passed to the dispatcher
 * @param value		event payload passed to the dispatcher
 * @param blocking	async only: when true, reserve a per-call semaphore that the dispatch
 *			loop releases once it has processed this entry
 * @throws RuntimeException if this is a synchronous manager whose dispatcher is the
 *			exception-propagating variant
 */
public voiddispatch(int type, java.lang.Object value, boolean blocking)

		if ( !async ){

			if ( target_with_exception != null ){

				throw( new RuntimeException( "call dispatchWithException, not dispatch"));
			}

			List	snapshot;

			synchronized( this ){

				snapshot = listeners;
			}

			try{
				dispatchInternal( snapshot, type, value );

			}catch( Throwable e ){

				Debug.printStackTrace( e );
			}

			return;
		}

		AESemaphore	blocker = blocking ? new AESemaphore( "ListenerManager:blocker") : null;

		synchronized( this ){

				// nobody listening, so nothing worth queueing

			if ( listeners.isEmpty()){

				return;
			}

				// the listener list is replaced, never modified, on update, so the
				// reference queued here is unaffected by later add/remove calls.
				// 4 entries denote a dispatch to a list of listeners

			dispatch_queue.add(new Object[]{ listeners, new Integer(type), value, blocker });
		}

		dispatch_sem.release();

		if ( blocker != null ){

			blocker.reserve();
		}
	
/**
 * Invokes the dispatcher once for every element of the supplied list, in index order.
 *
 * With an exception-propagating dispatcher, anything it throws escapes from this method
 * and the remaining listeners are not visited. Otherwise each failure is printed via
 * Debug and iteration carries on with the next listener.
 *
 * @param listeners_ref	the listeners to notify
 * @param type		event type passed to the dispatcher
 * @param value		event payload passed to the dispatcher
 */
protected voiddispatchInternal(java.util.List listeners_ref, int type, java.lang.Object value)

		int	index = 0;

		while ( index < listeners_ref.size()){

			if ( target_with_exception == null ){

				try{
					target.dispatch( listeners_ref.get( index ), type, value );

				}catch( Throwable e ){

					Debug.printStackTrace( e );
				}
			}else{

					// deliberately no catch here: exceptions from this kind of
					// dispatcher are allowed to reach the caller

				target_with_exception.dispatchWithException( listeners_ref.get( index ), type, value );
			}

			index++;
		}
	
/**
 * Invokes the dispatcher for a single listener.
 *
 * With an exception-propagating dispatcher, anything it throws escapes from this method.
 * Otherwise a failure is printed via Debug and swallowed.
 *
 * @param listener	the listener to notify
 * @param type		event type passed to the dispatcher
 * @param value		event payload passed to the dispatcher
 */
protected voiddispatchInternal(java.lang.Object listener, int type, java.lang.Object value)

		if ( target_with_exception == null ){

			try{
				target.dispatch( listener, type, value );

			}catch( Throwable e ){

				Debug.printStackTrace( e );
			}

			return;
		}

			// deliberately no catch here: exceptions from this kind of dispatcher are
			// allowed to reach the caller

		target_with_exception.dispatchWithException( listener, type, value );
	
/**
 * Body of the asynchronous dispatch thread. Repeatedly reserves the dispatch semaphore,
 * takes the oldest entry (if any) off the dispatch queue while holding the lock, and
 * then delivers it with the lock released. The loop ends when the running thread is no
 * longer the one recorded in async_thread, which is how clear() and removeListener()
 * ask it to stop.
 */
public voiddispatchLoop()

		// System.out.println( "ListenerManager::dispatch thread '" + Thread.currentThread() + "' starts");
		
		while(true){
			
				// the semaphore is released once per queued entry by the dispatch methods,
				// and also by clear()/removeListener() purely to wake this thread up
				// (reserve() presumably blocks until a release is available - confirm
				// against AESemaphore)
			
			dispatch_sem.reserve();
			
			Object[] data = null;
			
			synchronized( this ){
				
				if ( async_thread != Thread.currentThread()){
					
						// we've been asked to close. this sem reservation must be
						// "returned" to the pool in case it represents a valid  entry
						// to be picked up by another thread
					
					dispatch_sem.release();
					
					break;
				}
				
					// the queue can be empty here when the wake-up was not paired
					// with a queued entry
				
				if ( dispatch_queue.size() > 0 ){
					
					data = (Object[])dispatch_queue.remove(0);
				}
			}
			
				// delivery happens outside the synchronized block so that listeners
				// are never invoked while the lock is held
			
			if ( data != null ){
			
				try{						
						// entry layout: { target, type, value, blocker [, padding] }
						// 4 elements - target is a List of listeners
						// 5 elements - target is a single listener
					
					if ( data.length == 4 ){
					
						dispatchInternal((List)data[0], ((Integer)data[1]).intValue(), data[2] );
						
					}else{
						
						dispatchInternal( data[0], ((Integer)data[1]).intValue(), data[2] );
					}
					
				}catch( Throwable e ){
					
					Debug.printStackTrace( e );
					
				}finally{
					
						// data[3] is the semaphore of a blocking dispatch call, if any;
						// release it even on failure so the waiting caller is not left
						// blocked
					
					if ( data[3] != null ){
						
						((AESemaphore)data[3]).release();
					}
				}
			}
		}
		
		// System.out.println( "ListenerManager::dispatch thread '" + Thread.currentThread() + "' ends");
	
public voiddispatchWithException(int type, java.lang.Object value)

		List	listeners_ref;
		
		synchronized( this ){
			
			listeners_ref = listeners;			
		}
		
		dispatchInternal( listeners_ref, type, value );
	
/**
 * Notifies every listener on its own thread and waits, up to an overall time budget,
 * for all of them to finish. Each listener is dispatched with type -1 and a null value.
 * If the budget runs out first, the listeners that have not yet finished are reported
 * through Debug.out; their threads are not interrupted by this method.
 *
 * @param _listeners	listeners to notify; copied on entry
 * @param _dispatcher	dispatcher used to deliver to each listener
 * @param _timeout	total time allowed for all listeners, in the units used by
 *			SystemTime.getCurrentTime() (presumably milliseconds - confirm)
 */
public static voiddispatchWithTimeout(java.util.List _listeners, ListenerManagerDispatcher _dispatcher, long _timeout)

		final List		targets		= new ArrayList( _listeners );

		final boolean[]		finished	= new boolean[targets.size()];

		final AESemaphore	done_sem	= new AESemaphore("ListenerManager:dwt:timeout");

			// one thread per listener so that a slow listener cannot hold up the others

		for (int i=0;i<targets.size();i++){

			final int	slot = i;

			new AEThread( "ListenerManager:dwt:dispatcher", true ){
				public void
				runSupport()
				{
					try{
						_dispatcher.dispatch( targets.get(slot), -1, null );

					}catch( Throwable e ){

						Debug.printStackTrace(e);

					}finally{

							// flag completion and signal the waiter whether or not
							// the dispatch succeeded

						finished[slot]	= true;

						done_sem.release();
					}
				}
			}.start();
		}

			// collect one completion signal per listener, charging the time spent
			// waiting against the remaining budget

		boolean	timed_out = false;

		for (int awaited=0;awaited<targets.size();awaited++){

			if ( _timeout <= 0 ){

				timed_out	= true;

				break;
			}

			long	wait_began = SystemTime.getCurrentTime();

			if ( !done_sem.reserve( _timeout )){

				timed_out	= true;

				break;
			}

			long	waited = SystemTime.getCurrentTime() - wait_began;

				// only shrink the budget when the clock has moved forwards

			if ( waited > 0 ){

				_timeout = _timeout - waited;
			}
		}

		if ( !timed_out ){

			return;
		}

			// build a comma separated list of the listeners that have not completed

		StringBuffer	stragglers = new StringBuffer();

		for (int i=0;i<finished.length;i++){

			if ( finished[i] ){

				continue;
			}

			if ( stragglers.length() > 0 ){

				stragglers.append( "," );
			}

			stragglers.append( targets.get(i));
		}

		if ( stragglers.length() > 0 ){

			Debug.out( "Listener dispatch timeout: failed = " + stragglers );
		}
	
/**
 * Returns the current listener list. No copy is made here: the list is replaced by a
 * new one whenever listeners are added or removed, so the returned reference is not
 * affected by later registrations.
 *
 * @return the list currently held in the listeners field
 */
public java.util.ListgetListenersCopy()

			// updates swap in a new list, so handing out the reference is sufficient

		List	current = listeners;

		return( current );
	
/**
 * Unregisters a listener by installing a copy of the listener list with the first
 * matching entry removed. When an asynchronous manager is left with no listeners, the
 * dispatch thread reference is cleared and the dispatch semaphore released so that the
 * thread can notice and terminate.
 *
 * @param listener	the listener to remove
 */
public voidremoveListener(java.lang.Object listener)

		synchronized( this ){

			ArrayList	remaining = new ArrayList( listeners );

			remaining.remove( listener );

			listeners	= remaining;

			if ( async && remaining.isEmpty()){

					// dispatchLoop exits once it sees it is no longer the registered
					// thread; the release wakes it up so that it can perform that check

				async_thread = null;

				dispatch_sem.release();
			}
		}
	
public longsize()

		if (listeners == null)
			return 0;

		return listeners.size();