/*
* JBoss, Home of Professional Open Source.
* Copyright 2006, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.aspects.asynchronous.concurrent;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactoryUser;
import org.jboss.aspects.asynchronous.AsynchronousConstants;
import org.jboss.aspects.asynchronous.AsynchronousParameters;
import org.jboss.aspects.asynchronous.AsynchronousTask;
import org.jboss.aspects.asynchronous.AsynchronousUserTask;
import org.jboss.aspects.asynchronous.ThreadManager;
import org.jboss.aspects.asynchronous.ThreadManagerRequest;
import org.jboss.aspects.asynchronous.ThreadManagerResponse;
import org.jboss.aspects.asynchronous.common.AsynchronousEmptyTask;
import org.jboss.aspects.asynchronous.common.ThreadManagerResponseImpl;
/**
* @author <a href="mailto:chussenet@yahoo.com">{Claude Hussenet Independent Consultant}</a>.
* @version <tt>$Revision: 57186 $</tt>
*/
public class ThreadManagerImpl
   extends ThreadFactoryUser
   implements AsynchronousConstants, ThreadManager
{
   /** Executor that runs tasks when pooling is enabled; all pool settings delegate to it. */
   protected PooledExecutor _pooledExecutor = null;

   /** Last policy passed to {@link #setWaitWhenPoolSizeIsFull(boolean)}. */
   protected boolean waitWhenPoolSizeIsFull = true;

   /** When {@code true} tasks go through the pool; otherwise each task gets a thread from the thread factory. */
   protected boolean isPooling = true;

   /**
    * Create a new pool with all default settings. The blocked-execution
    * policy is set to abort (see {@link #setWaitWhenPoolSizeIsFull(boolean)}).
    */
   public ThreadManagerImpl()
   {
      _pooledExecutor = new PooledExecutor();
      setWaitWhenPoolSizeIsFull(false);
   }

   /**
    * Create a new pool with all default settings except for the maximum
    * pool size. The blocked-execution policy is set to abort.
    *
    * @param maximumPoolSize value handed to the {@code PooledExecutor} constructor
    */
   public ThreadManagerImpl(int maximumPoolSize)
   {
      _pooledExecutor = new PooledExecutor(maximumPoolSize);
      setWaitWhenPoolSizeIsFull(false);
   }

   /**
    * Set the maximum number of threads to use. Delegates to the
    * underlying pooled executor.
    *
    * @param maximumPoolSize the new maximum pool size
    */
   public void setMaximumPoolSize(int maximumPoolSize)
   {
      _pooledExecutor.setMaximumPoolSize(maximumPoolSize);
   }

   /**
    * Select the policy applied when execution is blocked because the pool
    * is full.
    *
    * @param value {@code true} to wait until a thread is available
    *              ({@code waitWhenBlocked}); {@code false} to abort
    *              ({@code abortWhenBlocked}), which makes the executor
    *              throw a RuntimeException
    */
   public void setWaitWhenPoolSizeIsFull(boolean value)
   {
      if (value)
      {
         _pooledExecutor.waitWhenBlocked();
      }
      else
      {
         _pooledExecutor.abortWhenBlocked();
      }
      waitWhenPoolSizeIsFull = value;
   }

   /**
    * Return the policy applied when the pool is full.
    *
    * @return the value last given to {@link #setWaitWhenPoolSizeIsFull(boolean)}
    */
   public boolean getWaitWhenPoolSizeIsFull()
   {
      return waitWhenPoolSizeIsFull;
   }

   /**
    * Return the maximum number of threads to simultaneously execute.
    */
   public int getMaximumPoolSize()
   {
      return _pooledExecutor.getMaximumPoolSize();
   }

   /**
    * Set the minimum number of threads to use. Delegates to the
    * underlying pooled executor.
    *
    * @throws IllegalArgumentException if less than zero. (It is not
    *         considered an error to set the minimum to be greater than the
    *         maximum. However, in this case there are no guarantees about
    *         behavior.)
    */
   public void setMinimumPoolSize(int minimumPoolSize)
   {
      _pooledExecutor.setMinimumPoolSize(minimumPoolSize);
   }

   /**
    * Return the minimum number of threads to simultaneously execute.
    * (Default value is 1.) If fewer than the minimum number are running
    * upon reception of a new request, a new thread is started to handle
    * this request.
    */
   public int getMinimumPoolSize()
   {
      return _pooledExecutor.getMinimumPoolSize();
   }

   /**
    * Set the number of milliseconds to keep threads alive waiting for new
    * commands. A negative value means to wait forever. A zero value means
    * not to wait at all.
    */
   public void setKeepAliveTime(long time)
   {
      _pooledExecutor.setKeepAliveTime(time);
   }

   /**
    * Return the number of milliseconds to keep threads alive waiting for
    * new commands. A negative value means to wait forever. A zero value
    * means not to wait at all.
    */
   public long getKeepAliveTime()
   {
      return _pooledExecutor.getKeepAliveTime();
   }

   /**
    * Return the current number of active threads in the pool. This number
    * is just a snapshot, and may change immediately upon returning.
    */
   public long getPoolSize()
   {
      return _pooledExecutor.getPoolSize();
   }

   /**
    * Return the response from a single asynchronous task by delegating to
    * {@link #waitForResponses(AsynchronousTask[])}.
    *
    * @param input the task to query; a {@code null} task yields a
    *              {@code null} response
    * @return the value of {@code input.getResponse()}, or {@code null}
    */
   public ThreadManagerResponse waitForResponse(AsynchronousTask input)
   {
      AsynchronousTask[] tTask = {input};
      return waitForResponses(tTask)[0];
   }

   /**
    * Return an array of responses, one per task, in the same order as the
    * input array. Each response is obtained with {@code getResponse()}.
    *
    * @param inputImpl the tasks to query
    * @return {@code null} when {@code inputImpl} is {@code null} (a message
    *         is written to {@code System.err}); otherwise an array of the
    *         same length, holding {@code null} at the index of any
    *         {@code null} task
    */
   public ThreadManagerResponse[] waitForResponses(AsynchronousTask[] inputImpl)
   {
      if (inputImpl == null)
      {
         System.err.println("PPMImpl:waitForResponses NULL PARAMETER");
         return null;
      }
      // Allocate with the interface type: an array created as
      // ThreadManagerResponseImpl[] would raise ArrayStoreException as soon
      // as a task returned any other ThreadManagerResponse implementation.
      ThreadManagerResponse[] response =
         new ThreadManagerResponse[inputImpl.length];
      for (int i = 0; i < inputImpl.length; i++)
      {
         AsynchronousTask fr = inputImpl[i];
         // A null task leaves a null slot instead of failing the whole batch.
         response[i] = (fr == null) ? null : fr.getResponse();
      }
      return response;
   }

   /**
    * Create, start and return a new asynchronous task built from the id,
    * task class, input parameters and timeout carried by the request.
    *
    * @param ppmRequest the request describing the task; must not be {@code null}
    * @return the started task, or an {@code AsynchronousEmptyTask} flagged
    *         {@code CAN_NOT_PROCESS} when the task could not be submitted
    */
   public AsynchronousTask process(ThreadManagerRequest ppmRequest)
   {
      return process(ppmRequest.getId(),
         ppmRequest.getTaskClassImpl(),
         ppmRequest.getInputParameters(),
         ppmRequest.getTimeout());
   }

   /**
    * Create, start and return a new asynchronous task.
    *
    * @param id                  identifier of the task
    * @param taskImpl            class instance defining the task to process
    * @param inputParametersImpl class instance defining the input parameters
    * @param timeout             the given time limit to process the task
    * @return the submitted task; if creation or submission throws, an
    *         {@code AsynchronousEmptyTask} flagged {@code CAN_NOT_PROCESS}
    *         that carries the exception
    */
   private AsynchronousTask process(String id,
                                    AsynchronousUserTask taskImpl,
                                    AsynchronousParameters inputParametersImpl,
                                    long timeout)
   {
      org.jboss.aspects.asynchronous.concurrent.AsynchronousTask ft;
      try
      {
         if (this.getPoolSize() + 1 > this.getMaximumPoolSize())
         {
            System.err.println("process: The maximum pool size defined at "
               + this.getMaximumPoolSize()
               + " is reached before processing task["
               + id
               + "] !");
         }
         ft = new AsynchronousTaskImpl(id,
            taskImpl,
            inputParametersImpl,
            timeout);
         Runnable cmd = ft.add();
         if (isPooling())
         {
            _pooledExecutor.execute(cmd);
         }
         else
         {
            Thread thread = getThreadFactory().newThread(cmd);
            thread.start();
         }
      }
      catch (Exception e)
      {
         if (e instanceof InterruptedException)
         {
            // Do not swallow the caller's interruption.
            Thread.currentThread().interrupt();
         }
         return new AsynchronousEmptyTask(id,
            AsynchronousConstants.CAN_NOT_PROCESS,
            e,
            e.getMessage(),
            System.currentTimeMillis());
      }

      // Scheduling hint only. It is kept outside the submission try block:
      // the task is already running at this point, so an interruption here
      // must not be reported as CAN_NOT_PROCESS nor discard the task handle.
      Thread.yield();
      try
      {
         Thread.sleep(0);
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
      }
      Thread.yield();
      return ft;
   }

   /**
    * Tell whether tasks are executed through the pooled executor.
    *
    * @return {@code true} when pooling is enabled
    */
   public boolean isPooling()
   {
      return isPooling;
   }

   /**
    * Enable or disable pooling.
    *
    * @param isPooling {@code true} to run tasks through the pooled executor,
    *                  {@code false} to start a thread from the thread
    *                  factory for each task
    */
   public void setPooling(boolean isPooling)
   {
      this.isPooling = isPooling;
   }
}
|