File | Doc | Category | Size | Date | Package
ThreadPool.java | API Doc | Azureus 3.0.3.4 | 14696 | Fri Aug 31 15:46:26 BST 2007 | org.gudy.azureus2.core3.util

ThreadPool

public class ThreadPool extends Object
author
parg

Fields Summary
private static final int
IDLE_LINGER_TIME
private static final boolean
LOG_WARNINGS
private static final int
WARN_TIME
private static List
busy_pools
private static boolean
busy_pool_timer_set
private static boolean
debug_thread_pool
private static boolean
debug_thread_pool_log_on
private static ThreadLocal
tls
private String
name
private int
max_size
private int
thread_name_index
private long
execution_limit
private Stack
thread_pool
private List
busy
private boolean
queue_when_full
private List
task_queue
private AESemaphore
thread_sem
private int
thread_priority
private boolean
warn_when_full
private long
task_total
private long
task_total_last
private Average
task_average
Constructors Summary
/**
 * Creates a pool with queueing disabled.
 *
 * <p>Simply forwards to the three-argument constructor, passing {@code false}
 * for {@code _queue_when_full}.
 *
 * @param _name      pool name
 * @param _max_size  maximum pool size
 */
public ThreadPool(String _name, int _max_size)
{
	this( _name, _max_size, false );
}
	
/**
 * Creates a pool, recording its configuration and building the structures
 * that track its workers.
 *
 * @param _name             pool name; also embedded in the semaphore's name
 * @param _max_size         maximum pool size; passed to the semaphore and used
 *                          as the initial capacity of the busy list
 * @param _queue_when_full  stored in {@code queue_when_full}
 */
public ThreadPool(String _name, int _max_size, boolean _queue_when_full)
{
	this.name				= _name;
	this.max_size			= _max_size;
	this.queue_when_full	= _queue_when_full;

		// semaphore is named after the pool and constructed with the pool size

	this.thread_sem		= new AESemaphore( "ThreadPool::" + this.name, _max_size );

		// workers held for re-use, and workers currently running a task

	this.thread_pool	= new Stack();
	this.busy			= new ArrayList( _max_size );
}
	
Methods Summary
protected static voidcheckAllTimeouts()

		
	  
	
	
		List	pools;	
	
			// copy the busy pools to avoid potential deadlock due to synchronization
			// nestings
		
		synchronized( busy_pools ){
			
			pools	= new ArrayList( busy_pools );
		}
		
		for (int i=0;i<pools.size();i++){
			
			((ThreadPool)pools.get(i)).checkTimeouts();
		}
	
protected voidcheckTimeouts()

		synchronized( ThreadPool.this ){
		
			long	diff = task_total - task_total_last;
			
			task_average.addValue( diff );
			
			task_total_last = task_total;
			
			if ( debug_thread_pool_log_on ){
				
				System.out.println( "ThreadPool '" + getName() + "'/" + thread_name_index + ": max=" + max_size + ",sem=[" + thread_sem.getString() + "],pool=" + thread_pool.size() + ",busy=" + busy.size() + ",queue=" + task_queue.size());
			}
			
			long	now = SystemTime.getCurrentTime();
			
			for (int i=0;i<busy.size();i++){
					
				threadPoolWorker	x = (threadPoolWorker)busy.get(i);
			
				long	elapsed = now - x.run_start_time ;
					
				if ( elapsed > ( WARN_TIME * (x.warn_count+1))){
		
					x.warn_count++;
					
					if ( LOG_WARNINGS ){
						
						DebugLight.out( x.getWorkerName() + ": running, elapsed = " + elapsed + ", state = " + x.state );
					}
					
					if ( execution_limit > 0 && elapsed > execution_limit ){
						
						if ( LOG_WARNINGS ){
							
							DebugLight.out( x.getWorkerName() + ": interrupting" );
						}
						
						AERunnable r = x.runnable;

						try{
							if ( r instanceof ThreadPoolTask ){
								
								((ThreadPoolTask)r).interruptTask();
								
							}else{
								
								x.worker_thread.interrupt();
							}
						}catch( Throwable e ){
							
							DebugLight.printStackTrace( e );
						}
					}
				}
			}
		}
	
protected voidcheckWarning()

		if ( warn_when_full ){
			
			Debug.out( "Thread pool '" + getName() + "' is full" );
			
			warn_when_full	= false;
		}
	
private voidgenerateEvidence(IndentWriter writer)

		writer.println( name + ": max=" + max_size +",qwf=" + queue_when_full + ",queue=" + task_queue.size() + ",busy=" + busy.size() + ",pool=" + thread_pool.size() + ",total=" + task_total + ":" + DisplayFormatters.formatDecimal(task_average.getDoubleAverage(),2) + "/sec");
	
public intgetMaxThreads()

		return( max_size );
	
public java.lang.StringgetName()

		return( name );
	
public intgetQueueSize()

		synchronized( this ){
			
			return( task_queue.size());
		}
	
public AERunnable[]getQueuedTasks()

		synchronized( this ){

			AERunnable[]	res = new AERunnable[task_queue.size()];
			
			task_queue.toArray(res);
			
			return( res );
		}
	
public AERunnable[]getRunningTasks()

		List	runnables	= new ArrayList();
		
		synchronized( this ){

			Iterator	it = busy.iterator();
			
			while( it.hasNext()){
				
				threadPoolWorker	worker = (threadPoolWorker)it.next();
				
				AERunnable	runnable = worker.getRunnable();
				
				if ( runnable != null ){
					
					runnables.add( runnable );
				}
			}
		}
		
		AERunnable[]	res = new AERunnable[runnables.size()];
			
		runnables.toArray(res);
			
		return( res );
	
public booleanisQueued(AERunnable task)

		synchronized( this ){

			return( task_queue.contains( task ));
		}
	
public org.gudy.azureus2.core3.util.ThreadPool$threadPoolWorkerrun(AERunnable runnable, boolean high_priority)

param
runnable
param
high_priority inserts at front if tasks queueing
return

		// System.out.println( "Thread pool:" + name + " - sem = " + thread_sem.getValue() + ", queue = " + task_queue.size());
		
			// not queueing, grab synchronous sem here
		
		if ( !queue_when_full ){
		
			if ( !thread_sem.reserveIfAvailable()){
			
					// defend against recursive entry when in queuing mode (yes, it happens)
				
				threadPoolWorker	recursive_worker = (threadPoolWorker)tls.get();
				
				if ( recursive_worker == null || recursive_worker.getOwner() != this ){
	
						// do a blocking reserve here, not recursive 
					
					checkWarning();
					
					thread_sem.reserve();
	
				}else{
						// run immediately
							
					if ( runnable instanceof ThreadPoolTask ){
						
						ThreadPoolTask task = (ThreadPoolTask)runnable;
						                        
						task.worker = recursive_worker;
						
						try{
							task.taskStarted();
							
							task.run();
							
						}finally{
							
							task.taskCompleted();  
						}
					}else{
					
						runnable.runSupport();
					}
					
					return( recursive_worker );
				}
			}
		}
						
		threadPoolWorker allocated_worker;
						
		synchronized( this ){
		
				// reserve if available is non-blocking
			
			if ( queue_when_full && !thread_sem.reserveIfAvailable()){
			
				allocated_worker	= null;
			
				checkWarning();
				
				if ( high_priority ){
					
					task_queue.add( 0, runnable );
					
				}else{
				
					task_queue.add( runnable );
				}
			}else{
				
				if ( thread_pool.isEmpty()){
							
					allocated_worker = new threadPoolWorker();	
		
				}else{
									
					allocated_worker = (threadPoolWorker)thread_pool.pop();
				}
				
				if ( runnable instanceof ThreadPoolTask ){
					
					((ThreadPoolTask)runnable).worker = allocated_worker;
				}
				
				allocated_worker.run( runnable );
			}
		}
		
		return( queue_when_full?null:allocated_worker );
	
public org.gudy.azureus2.core3.util.ThreadPool$threadPoolWorkerrun(AERunnable runnable)

		return( run( runnable, false ));
	
public voidsetExecutionLimit(long millis)

		execution_limit	= millis;
	
public voidsetThreadPriority(int _priority)

		thread_priority	= _priority;
	
public voidsetWarnWhenFull()

		warn_when_full	= true;