FileDocCategorySizeDatePackage
RxTaskPool.javaAPI DocApache Tomcat 6.0.145005Fri Jul 20 04:20:36 BST 2007org.apache.catalina.tribes.transport

RxTaskPool.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.catalina.tribes.transport;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadFactory;

/**
 * @author not attributable
 * @version 1.0
 */

public class RxTaskPool
{
    /**
     * A very simple thread pool class.  The pool size is set at
     * construction time and remains fixed.  Threads are cycled
     * through a FIFO idle queue.
     */

    List idle = new LinkedList();
    List used = new LinkedList();
    
    Object mutex = new Object();
    boolean running = true;
    
    private static int counter = 1;
    private int maxTasks;
    private int minTasks;
    
    private TaskCreator creator = null;

    private static synchronized int inc() {
        return counter++;
    }

    
    public RxTaskPool (int maxTasks, int minTasks, TaskCreator creator) throws Exception {
        // fill up the pool with worker threads
        this.maxTasks = maxTasks;
        this.minTasks = minTasks;
        this.creator = creator;
    }
    
    protected void configureTask(AbstractRxTask task) {
        synchronized (task) {
            task.setTaskPool(this);
//            task.setName(task.getClass().getName() + "[" + inc() + "]");
//            task.setDaemon(true);
//            task.setPriority(Thread.MAX_PRIORITY);
//            task.start();
        }
    }

    /**
     * Find an idle worker thread, if any.  Could return null.
     */
    public AbstractRxTask getRxTask()
    {
        AbstractRxTask worker = null;
        synchronized (mutex) {
            while ( worker == null && running ) {
                if (idle.size() > 0) {
                    try {
                        worker = (AbstractRxTask) idle.remove(0);
                    } catch (java.util.NoSuchElementException x) {
                        //this means that there are no available workers
                        worker = null;
                    }
                } else if ( used.size() < this.maxTasks && creator != null) {
                    worker = creator.createRxTask();
                    configureTask(worker);
                } else {
                    try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
                }
            }//while
            if ( worker != null ) used.add(worker);
        }
        return (worker);
    }
    
    public int available() {
        return idle.size();
    }

    /**
     * Called by the worker thread to return itself to the
     * idle pool.
     */
    public void returnWorker (AbstractRxTask worker) {
        if ( running ) {
            synchronized (mutex) {
                used.remove(worker);
                //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
                if ( idle.size() < maxTasks && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
                else {
                    worker.setDoRun(false);
                    synchronized (worker){worker.notify();}
                }
                mutex.notify();
            }
        }else {
            worker.setDoRun(false);
            synchronized (worker){worker.notify();}
        }
    }

    public int getMaxThreads() {
        return maxTasks;
    }

    public int getMinThreads() {
        return minTasks;
    }

    public void stop() {
        running = false;
        synchronized (mutex) {
            Iterator i = idle.iterator();
            while ( i.hasNext() ) {
                AbstractRxTask worker = (AbstractRxTask)i.next();
                returnWorker(worker);
                i.remove();
            }
        }
    }

    public void setMaxTasks(int maxThreads) {
        this.maxTasks = maxThreads;
    }

    public void setMinTasks(int minThreads) {
        this.minTasks = minThreads;
    }

    public TaskCreator getTaskCreator() {
        return this.creator;
    }
    
    public static interface TaskCreator  {
        public AbstractRxTask createRxTask();
    }
}