FileDocCategorySizeDatePackage
ThreadPoolImpl.javaAPI DocJava SE 5 API15750Fri Aug 26 14:54:30 BST 2005com.sun.corba.se.impl.orbutil.threadpool

ThreadPoolImpl.java

/*
 * Copyright 2005 Sun Microsystems, Inc. All rights reserved.
 * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

package com.sun.corba.se.impl.orbutil.threadpool;

import java.security.AccessController;
import java.security.PrivilegedAction;

import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;

import com.sun.corba.se.impl.orbutil.ORBConstants;
import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;

import com.sun.corba.se.spi.monitoring.MonitoringConstants;
import com.sun.corba.se.spi.monitoring.MonitoredObject;
import com.sun.corba.se.spi.monitoring.MonitoringFactories;
import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;

public class ThreadPoolImpl implements ThreadPool
{
    private static int threadCounter = 0; // serial counter useful for debugging

    private WorkQueue workQueue;
    
    // Stores the number of available worker threads
    private int availableWorkerThreads = 0;
    
    // Stores the number of threads in the threadpool currently
    private int currentThreadCount = 0;
    
    // Minimum number of worker threads created at instantiation of the threadpool
    private int minWorkerThreads = 0;
    
    // Maximum number of worker threads in the threadpool
    private int maxWorkerThreads = 0;
    
    // Inactivity timeout value for worker threads to exit and stop running
    private long inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT ;
    
    // Indicates if the threadpool is bounded or unbounded
    private boolean boundedThreadPool = false;
    
    // Running count of the work items processed
    // Set the value to 1 so that divide by zero is avoided in 
    // averageWorkCompletionTime()
    private long processedCount = 1;
    
    // Running aggregate of the time taken in millis to execute work items
    // processed by the threads in the threadpool
    private long totalTimeTaken = 0;

    // Lock for protecting state when required
    private Object lock = new Object();

    // Name of the ThreadPool
    private String name;

    // MonitoredObject for ThreadPool
    private MonitoredObject threadpoolMonitoredObject;
    
    // ThreadGroup in which threads should be created
    private final ThreadGroup threadGroup ;

    /**
     * This constructor is used to create an unbounded threadpool
     */
    public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {
        maxWorkerThreads = Integer.MAX_VALUE;
        workQueue = new WorkQueueImpl(this);
	threadGroup = tg ;
	name = threadpoolName;
	initializeMonitoring();
    }
 
    /**
     * This constructor is used to create an unbounded threadpool
     * in the ThreadGroup of the current thread
     */
    public ThreadPoolImpl(String threadpoolName) {
	this( Thread.currentThread().getThreadGroup(), threadpoolName ) ; 
    }

    /**
     * This constructor is used to create bounded threadpool
     */
    public ThreadPoolImpl(int minSize, int maxSize, long timeout, 
					    String threadpoolName) 
    {
        inactivityTimeout = timeout;
        minWorkerThreads = minSize;
        maxWorkerThreads = maxSize;
        boundedThreadPool = true;
        workQueue = new WorkQueueImpl(this);
	threadGroup = Thread.currentThread().getThreadGroup() ;
	name = threadpoolName;
        for (int i = 0; i < minWorkerThreads; i++) {
            createWorkerThread();
        }
	initializeMonitoring();
    }

    // Setup monitoring for this threadpool
    private void initializeMonitoring() {
	// Get root monitored object
	MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory().
		createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null).
		getRootMonitoredObject();

	// Create the threadpool monitoring root
	MonitoredObject threadPoolMonitoringObjectRoot = root.getChild(
		    MonitoringConstants.THREADPOOL_MONITORING_ROOT);
	if (threadPoolMonitoringObjectRoot == null) {
	    threadPoolMonitoringObjectRoot =  MonitoringFactories.
		    getMonitoredObjectFactory().createMonitoredObject(
		    MonitoringConstants.THREADPOOL_MONITORING_ROOT,
		    MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION);
	    root.addChild(threadPoolMonitoringObjectRoot);
	}
	threadpoolMonitoredObject = MonitoringFactories.
		    getMonitoredObjectFactory().
		    createMonitoredObject(name,
		    MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION);

	threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject);

	LongMonitoredAttributeBase b1 = new 
	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS, 
		    MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
		public Object getValue() {
		    return new Long(ThreadPoolImpl.this.currentNumberOfThreads());
		}
	    };
	threadpoolMonitoredObject.addAttribute(b1);
	LongMonitoredAttributeBase b2 = new 
	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS, 
		    MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
		public Object getValue() {
		    return new Long(ThreadPoolImpl.this.numberOfAvailableThreads());
		}
	    };
	threadpoolMonitoredObject.addAttribute(b2);
	LongMonitoredAttributeBase b3 = new 
	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS, 
		    MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) {
		public Object getValue() {
		    return new Long(ThreadPoolImpl.this.numberOfBusyThreads());
		}
	    };
	threadpoolMonitoredObject.addAttribute(b3);
	LongMonitoredAttributeBase b4 = new 
	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME, 
		    MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) {
		public Object getValue() {
		    return new Long(ThreadPoolImpl.this.averageWorkCompletionTime());
		}
	    };
	threadpoolMonitoredObject.addAttribute(b4);
	LongMonitoredAttributeBase b5 = new 
	    LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT, 
		    MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) {
		public Object getValue() {
		    return new Long(ThreadPoolImpl.this.currentProcessedCount());
		}
	    };
	threadpoolMonitoredObject.addAttribute(b5);

	// Add the monitored object for the WorkQueue
	
	threadpoolMonitoredObject.addChild(
		((WorkQueueImpl)workQueue).getMonitoredObject());
    }

    // Package private method to get the monitored object for this
    // class
    MonitoredObject getMonitoredObject() {
	return threadpoolMonitoredObject;
    }
    
    public WorkQueue getAnyWorkQueue()
    {
	return workQueue;
    }

    public WorkQueue getWorkQueue(int queueId)
	throws NoSuchWorkQueueException
    {
	if (queueId != 0)
	    throw new NoSuchWorkQueueException();
	return workQueue;
    }

    /**
     * To be called from the workqueue when work is added to the
     * workQueue. This method would create new threads if required
     * or notify waiting threads on the queue for available work
     */
    void notifyForAvailableWork(WorkQueue aWorkQueue) {
	synchronized (lock) {
	    if (availableWorkerThreads == 0) {
		createWorkerThread();
	    } else {
		aWorkQueue.notify();
	    }
	}
    }
    

    /**
     * To be called from the workqueue to create worker threads when none
     * available.
     */
    void createWorkerThread() {
	synchronized (lock) {
	    final String name = getName() ;
	      
	    if (boundedThreadPool) {
		if (currentThreadCount < maxWorkerThreads) {
		    currentThreadCount++;
		} else {
		    // REVIST - Need to create a thread to monitor the
		    // the state for deadlock i.e. all threads waiting for
		    // something which can be got from the item in the 
		    // workqueue, but there is no thread available to
		    // process that work item - DEADLOCK !!
		    return;
		}
	    } else {
		currentThreadCount++;
	    }

	    // If we get here, we need to create a thread.
	    AccessController.doPrivileged( 
		new PrivilegedAction() {
		    public Object run() {
			// Thread creation needs to be in a doPrivileged block
			// for two reasons:
			// 1. The creation of a thread in a specific ThreadGroup
			//    is a privileged operation.  Lack of a doPrivileged
			//    block here causes an AccessControlException 
			//    (see bug 6268145).
			// 2. We want to make sure that the permissions associated 
			//    with this thread do NOT include the permissions of
			//    the current thread that is calling this method.
			//    This leads to problems in the app server where
			//    some threads in the ThreadPool randomly get 
			//    bad permissions, leading to unpredictable 
			//    permission errors.
			WorkerThread thread = new WorkerThread(threadGroup, name);
			    
			// The thread must be set to a daemon thread so the
			// VM can exit if the only threads left are PooledThreads
			// or other daemons.  We don't want to rely on the
			// calling thread always being a daemon.
			// Note that no exception is possible here since we
			// are inside the doPrivileged block.
			thread.setDaemon(true);

			thread.start();
			
			return null ; 
		    }
		} 
	    ) ;
	} 
    }
    
    /** 
    * This method will return the minimum number of threads maintained 
    * by the threadpool. 
    */ 
    public int minimumNumberOfThreads() {
        return minWorkerThreads;
    }
    
    /** 
    * This method will return the maximum number of threads in the 
    * threadpool at any point in time, for the life of the threadpool 
    */ 
    public int maximumNumberOfThreads() {
        return maxWorkerThreads;
    }
    
    /** 
    * This method will return the time in milliseconds when idle 
    * threads in the threadpool are removed. 
    */ 
    public long idleTimeoutForThreads() {
        return inactivityTimeout;
    }
    
    /** 
    * This method will return the total number of threads currently in the 
    * threadpool. This method returns a value which is not synchronized. 
    */ 
    public int currentNumberOfThreads() {
	synchronized (lock) {
	    return currentThreadCount;
	}
    }
    
    /** 
    * This method will return the number of available threads in the 
    * threadpool which are waiting for work. This method returns a 
    * value which is not synchronized. 
    */ 
    public int numberOfAvailableThreads() {
	synchronized (lock) {
	    return availableWorkerThreads;
	}
    }
    
    /** 
    * This method will return the number of busy threads in the threadpool 
    * This method returns a value which is not synchronized. 
    */ 
    public int numberOfBusyThreads() {
	synchronized (lock) {
	    return (currentThreadCount - availableWorkerThreads);
	}
    }
    
    /**
     * This method returns the average elapsed time taken to complete a Work
     * item in milliseconds.
     */
    public long averageWorkCompletionTime() {
	synchronized (lock) {
	    return (totalTimeTaken / processedCount);
	}
    }
    
    /**
     * This method returns the number of Work items processed by the threadpool
     */
    public long currentProcessedCount() {
	synchronized (lock) {
	    return processedCount;
	}
    }

    public String getName() {
        return name;
    }

    /** 
    * This method will return the number of WorkQueues serviced by the threadpool. 
    */ 
    public int numberOfWorkQueues() {
        return 1;
    } 


    private static synchronized int getUniqueThreadId() {
        return ThreadPoolImpl.threadCounter++;
    }


    private class WorkerThread extends Thread
    {
        private Work currentWork;
        private int threadId = 0; // unique id for the thread
        // thread pool this WorkerThread belongs too
        private String threadPoolName;
	// name seen by Thread.getName()
	private StringBuffer workerThreadName = new StringBuffer();

        WorkerThread(ThreadGroup tg, String threadPoolName) {
	    super(tg, "Idle");
	    this.threadId = ThreadPoolImpl.getUniqueThreadId();
            this.threadPoolName = threadPoolName;
	    setName(composeWorkerThreadName(threadPoolName, "Idle"));
        }
        
        public void run() {
            while (true) {
                try {

		    synchronized (lock) {
			availableWorkerThreads++;
		    }
                    
                    // Get some work to do
                    currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout);

		    synchronized (lock) {
			availableWorkerThreads--;
			// It is possible in notifyForAvailableWork that the
			// check for availableWorkerThreads = 0 may return
			// false, because the availableWorkerThreads has not been
			// decremented to zero before the producer thread added 
			// work to the queue. This may create a deadlock, if the
			// executing thread needs information which is in the work
			// item queued in the workqueue, but has no thread to work
			// on it since none was created because availableWorkerThreads = 0
			// returned false.
			// The following code will ensure that a thread is always available
			// in those situations
			if  ((availableWorkerThreads == 0) && 
				(workQueue.workItemsInQueue() > 0)) {
			    createWorkerThread();
			}
		    }

                    // Set the thread name for debugging.
	            setName(composeWorkerThreadName(threadPoolName,
				      Integer.toString(this.threadId)));

                    long start = System.currentTimeMillis();
                    
		    try {
			// Do the work
			currentWork.doWork();
		    } catch (Throwable t) {
			// Ignore all errors.
			;
		    }
                    
                    long end = System.currentTimeMillis();
                    

		    synchronized (lock) {
			totalTimeTaken += (end - start);
			processedCount++;
		    }

		    // set currentWork to null so that the work item can be 
		    // garbage collected
		    currentWork = null;

	            setName(composeWorkerThreadName(threadPoolName, "Idle"));

                } catch (TimeoutException e) {
                    // This thread timed out waiting for something to do.

		    synchronized (lock) {
			availableWorkerThreads--;

			// This should for both bounded and unbounded case
			if (currentThreadCount > minWorkerThreads) {
			    currentThreadCount--;
			    // This thread can exit.
			    return;
			} else {
			    // Go back to waiting on workQueue
			    continue;
			}
		    }
                } catch (InterruptedException ie) {
                    // InterruptedExceptions are
                    // caught here.  Thus, threads can be forced out of
                    // requestWork and so they have to reacquire the lock.
                    // Other options include ignoring or
                    // letting this thread die.
                    // Ignoring for now. REVISIT
		    synchronized (lock) {
			availableWorkerThreads--;
		    }

                } catch (Throwable e) {

                    // Ignore any exceptions that currentWork.process
                    // accidently lets through, but let Errors pass.
                    // Add debugging output?  REVISIT
		    synchronized (lock) {
			availableWorkerThreads--;
		    }

                }
            }
        }

	private String composeWorkerThreadName(String poolName, String workerName) {
            workerThreadName.setLength(0);
	    workerThreadName.append("p: ").append(poolName);
	    workerThreadName.append("; w: ").append(workerName);
	    return workerThreadName.toString();
	}
    } // End of WorkerThread class

}

// End of file.