// ========================================================================
// Copyright (c) 1999 Mort Bay Consulting (Australia) Pty. Ltd.
// $Id: ThreadPool.java,v 1.15.2.9 2003/06/04 04:47:59 starksm Exp $
// ========================================================================
package org.mortbay.util;
import java.io.Serializable;
/* ------------------------------------------------------------ */
/** A pool of threads.
*
* Avoids the expense of thread creation by pooling threads after
* their run methods exit for reuse.
*
* If the maximum pool size is reached, jobs wait for a free thread.
* By default there is no maximum pool size. Idle threads timeout
* and terminate until the minimum number of threads are running.
*
* This implementation uses the run(Object) method to place a
* job on a queue, which is read by the getJob(timeout) method.
* Derived implementations may specialize getJob(timeout) to
* obtain jobs from other sources without queing overheads.
*
* @version $Id: ThreadPool.java,v 1.15.2.9 2003/06/04 04:47:59 starksm Exp $
* @author Juancarlo Aņez
* @author Greg Wilkins
*/
public class ThreadPool
implements LifeCycle, Serializable
{
public static final String __DAEMON="org.mortbay.util.ThreadPool.daemon";
/* ------------------------------------------------------------------- */
private String _name;
private Pool _pool;
private Object _join="";
private transient boolean _started;
/* ------------------------------------------------------------------- */
/* Construct
*/
public ThreadPool()
{
_pool=new Pool();
_pool.setPoolClass(ThreadPool.PoolThread.class);
_name=this.getClass().getName();
int dot=_name.lastIndexOf('.');
if (dot>=0)
_name=_name.substring(dot+1);
}
/* ------------------------------------------------------------ */
/**
* @return The name of the ThreadPool.
*/
public String getName()
{
return _name;
}
/* ------------------------------------------------------------ */
/**
* @param name Name of the ThreadPool to use when naming Threads.
*/
public void setName(String name)
{
_name=name;
}
/* ------------------------------------------------------------ */
/**
* @return Name of the Pool instance this ThreadPool uses or null for
* an anonymous private pool.
*/
public String getPoolName()
{
return _pool.getPoolName();
}
/* ------------------------------------------------------------ */
/** Set the Pool name.
* All ThreadPool instances with the same Pool name will share the
* same Pool instance. Thus they will share the same max, min and
* available Threads. The field values of the first ThreadPool to call
* setPoolName with a specific name are used for the named
* Pool. Subsequent ThreadPools that join the name pool will loose their
* private values.
* @param name Name of the Pool instance this ThreadPool uses or null for
* an anonymous private pool.
*/
public void setPoolName(String name)
{
synchronized(Pool.class)
{
if (isStarted())
{
if ((name==null && _pool.getPoolName()!=null) ||
(name!=null && !name.equals(_pool.getPoolName())))
throw new IllegalStateException("started");
return;
}
if (name==null)
{
if (_pool.getPoolName()!=null)
_pool=new Pool();
}
else
{
Pool pool=Pool.getPool(name);
if (pool==null)
_pool.setPoolName(name);
else
_pool=pool;
}
}
}
/* ------------------------------------------------------------ */
/**
* Delegated to the named or anonymous Pool.
*/
public boolean isDaemon()
{
return _pool.getAttribute(__DAEMON)!=null;
}
/* ------------------------------------------------------------ */
/**
* Delegated to the named or anonymous Pool.
*/
public void setDaemon(boolean daemon)
{
_pool.setAttribute(__DAEMON,daemon?"true":null);
}
/* ------------------------------------------------------------ */
/** Is the pool running jobs.
* @return True if start() has been called.
*/
public boolean isStarted()
{
return _started;
}
/* ------------------------------------------------------------ */
/** Get the number of threads in the pool.
* Delegated to the named or anonymous Pool.
* @see #getIdleThreads
* @return Number of threads
*/
public int getThreads()
{
return _pool.size();
}
/* ------------------------------------------------------------ */
/** Get the number of idle threads in the pool.
* Delegated to the named or anonymous Pool.
* @see #getThreads
* @return Number of threads
*/
public int getIdleThreads()
{
return _pool.available();
}
/* ------------------------------------------------------------ */
/** Get the minimum number of threads.
* Delegated to the named or anonymous Pool.
* @see #setMinThreads
* @return minimum number of threads.
*/
public int getMinThreads()
{
return _pool.getMinSize();
}
/* ------------------------------------------------------------ */
/** Set the minimum number of threads.
* Delegated to the named or anonymous Pool.
* @see #getMinThreads
* @param minThreads minimum number of threads
*/
public void setMinThreads(int minThreads)
{
_pool.setMinSize(minThreads);
}
/* ------------------------------------------------------------ */
/** Set the maximum number of threads.
* Delegated to the named or anonymous Pool.
* @see #setMaxThreads
* @return maximum number of threads.
*/
public int getMaxThreads()
{
return _pool.getMaxSize();
}
/* ------------------------------------------------------------ */
/** Set the maximum number of threads.
* Delegated to the named or anonymous Pool.
* @see #getMaxThreads
* @param maxThreads maximum number of threads.
*/
public void setMaxThreads(int maxThreads)
{
_pool.setMaxSize(maxThreads);
}
/* ------------------------------------------------------------ */
/** Get the maximum thread idle time.
* Delegated to the named or anonymous Pool.
* @see #setMaxIdleTimeMs
* @return Max idle time in ms.
*/
public int getMaxIdleTimeMs()
{
return _pool.getMaxIdleTimeMs();
}
/* ------------------------------------------------------------ */
/** Set the maximum thread idle time.
* Threads that are idle for longer than this period may be
* stopped.
* Delegated to the named or anonymous Pool.
* @see #getMaxIdleTimeMs
* @param maxIdleTimeMs Max idle time in ms.
*/
public void setMaxIdleTimeMs(int maxIdleTimeMs)
{
_pool.setMaxIdleTimeMs(maxIdleTimeMs);
}
/* ------------------------------------------------------------ */
/** Set Max Read Time.
* @deprecated maxIdleTime is used instead.
*/
public void setMaxStopTimeMs(int ms)
{
Code.warning("setMaxStopTimeMs is deprecated. No longer required.");
}
/* ------------------------------------------------------------ */
/* Start the ThreadPool.
* Construct the minimum number of threads.
*/
public void start()
throws Exception
{
_started=true;
_pool.start();
}
/* ------------------------------------------------------------ */
/** Stop the ThreadPool.
* New jobs are no longer accepted,idle threads are interrupted
* and stopJob is called on active threads.
* The method then waits
* min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
* stop, at which time killJob is called.
*/
public void stop()
throws InterruptedException
{
_started=false;
_pool.stop();
synchronized(_join)
{
_join.notifyAll();
}
}
/* ------------------------------------------------------------ */
public void join()
{
while(isStarted() && _pool!=null)
{
synchronized(_join)
{
try{if (isStarted() && _pool!=null)_join.wait(30000);}
catch (Exception e)
{
e.printStackTrace();
Code.ignore(e);
}
}
}
}
/* ------------------------------------------------------------ */
public void shrink()
throws InterruptedException
{
_pool.shrink();
}
/* ------------------------------------------------------------ */
/** Run job.
* Give a job to the pool.
* @param job If the job is derived from Runnable, the run method
* is called, otherwise it is passed as the argument to the handle
* method.
*/
public void run(Object job)
throws InterruptedException
{
if (job==null)
return;
try
{
PoolThread thread=(PoolThread)_pool.get(getMaxIdleTimeMs());
if (thread!=null)
thread.run(this,job);
else
{
Code.warning("No thread for "+job);
stopJob(null,job);
}
}
catch (InterruptedException e) {throw e;}
catch (Exception e){Code.warning(e);}
}
/* ------------------------------------------------------------ */
/** Handle a job.
* Called by the allocated thread to handle a job. If the job is a
* Runnable, it's run method is called. Otherwise this method needs to be
* specialized by a derived class to provide specific handling.
* @param job The job to execute.
* @exception InterruptedException
*/
protected void handle(Object job)
throws InterruptedException
{
if (job!=null && job instanceof Runnable)
((Runnable)job).run();
else
Code.warning("Invalid job: "+job);
}
/* ------------------------------------------------------------ */
/** Stop a Job.
* This method is called by the Pool if a job needs to be stopped.
* The default implementation does nothing and should be extended by a
* derived thread pool class if special action is required.
* @param thread The thread allocated to the job, or null if no thread allocated.
* @param job The job object passed to run.
*/
protected void stopJob(Thread thread, Object job)
{
}
/* ------------------------------------------------------------ */
/** Pool Thread class.
* The PoolThread allows the threads job to be
* retrieved and active status to be indicated.
*/
public static class PoolThread extends Thread implements Pool.PondLife
{
ThreadPool _threadPool;
Pool _pool;
Object _job;
int _id;
String _name;
/* ------------------------------------------------------------ */
public void enterPool(Pool pool,int id)
{
_pool=pool;
_id=id;
_name=_pool.getPoolName()==null
?("PoolThread-"+id):(_pool.getPoolName()+"-"+id);
this.setName(_name);
this.setDaemon(pool.getAttribute(__DAEMON)!=null);
this.start();
if (Code.verbose())Code.debug("enterPool ",this," -> ",pool);
}
/* ------------------------------------------------------------ */
public int getID()
{
return _id;
}
/* ------------------------------------------------------------ */
public void poolClosing()
{
synchronized(this)
{
_pool=null;
if (_job==null)
notify();
else
interrupt();
}
}
/* ------------------------------------------------------------ */
public void leavePool()
{
if (Code.verbose())Code.debug("leavePool ",this," <- ",_pool);
synchronized(this)
{
_pool=null;
if (_job==null || _threadPool==null)
notify();
else
{
_threadPool.stopJob(this,_job);
_job=null;
}
}
}
/* ------------------------------------------------------------ */
public void run(ThreadPool pool, Object job)
{
synchronized(this)
{
_threadPool=pool;
_job=job;
notify();
}
}
/* ------------------------------------------------------------ */
/** ThreadPool run.
* Loop getting jobs and handling them until idle or stopped.
*/
public void run()
{
while (_pool!=null && _pool.isStarted())
{
try
{
synchronized(this)
{
// Wait for a job.
if (_pool!=null && _pool.isStarted() && _job==null)
wait(_pool.getMaxIdleTimeMs());
}
// handle
if (_job!=null)
_threadPool.handle(_job);
}
catch (InterruptedException e)
{
Code.ignore(e);
}
finally
{
synchronized(this)
{
boolean got=_job!=null;
_job=null;
_threadPool=null;
try
{
if (got&&_pool!=null)
_pool.put(this);
}
catch (InterruptedException e){Code.ignore(e);}
}
}
}
}
public String toString()
{
return _name;
}
}
}