FileDocCategorySizeDatePackage
SelectSocketsThreadPool.javaAPI DocExample6777Mon May 20 00:24:28 BST 2002com.ronsoft.books.nio.channels

SelectSocketsThreadPool.java

package com.ronsoft.books.nio.channels;

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.util.List;
import java.util.LinkedList;
import java.io.IOException;

/**
 * Specialization of the SelectSockets class which uses a thread pool
 * to service channels.  The thread pool is an ad-hoc implementation
 * quicky lashed togther in a few hours for demonstration purposes.
 * It's definitely not production quality.
 *
 * Created May 2002
 * @author Ron Hitchens (ron@ronsoft.com)
 * @version $Id: SelectSocketsThreadPool.java,v 1.5 2002/05/20 07:24:29 ron Exp $
 */
public class SelectSocketsThreadPool extends SelectSockets
{
	private static final int MAX_THREADS = 5;

	private ThreadPool pool = new ThreadPool (MAX_THREADS);

	// -------------------------------------------------------------

	public static void main (String [] argv)
		throws Exception
	{
		new SelectSocketsThreadPool().go (argv);
	}

	// -------------------------------------------------------------

	/**
	 * Sample data handler method for a channel with data ready to read.
	 * This method is invoked from the go() method in the parent class.
	 * This handler delegates to a worker thread in a thread pool to
	 * service the channel, then returns immediately.
	 * @param key A SelectionKey object representing a channel
	 *  determined by the selector to be ready for reading.  If the
	 *  channel returns an EOF condition, it is closed here, which
	 *  automatically invalidates the associated key.  The selector
	 *  will then de-register the channel on the next select call.
	 */
	protected void readDataFromSocket (SelectionKey key)
		throws Exception
	{
		WorkerThread worker = pool.getWorker();

		if (worker == null) {
			// No threads available, do nothing, the selection
			// loop will keep calling this method until a
			// thread becomes available.  This design could
			// be improved.
			return;	
		}

		// invoking this wakes up the worker thread then returns
		worker.serviceChannel (key);
	}

	// ---------------------------------------------------------------

	/**
	 * A very simple thread pool class.  The pool size is set at
	 * construction time and remains fixed.  Threads are cycled
	 * through a FIFO idle queue.
	 */
	private class ThreadPool
	{
		List idle = new LinkedList();

		ThreadPool (int poolSize)
		{
			// fill up the pool with worker threads
			for (int i = 0; i < poolSize; i++) {
				WorkerThread thread = new WorkerThread (this);

				// set thread name for debugging, start it
				thread.setName ("Worker" + (i + 1));
				thread.start();

				idle.add (thread);
			}
		}

		/**
		 * Find an idle worker thread, if any.  Could return null.
		 */
		WorkerThread getWorker()
		{
			WorkerThread worker = null;

			synchronized (idle) {
				if (idle.size() > 0) {
					worker = (WorkerThread) idle.remove (0);
				}
			}

			return (worker);
		}

		/**
		 * Called by the worker thread to return itself to the
		 * idle pool.
		 */
		void returnWorker (WorkerThread worker)
		{
			synchronized (idle) {
				idle.add (worker);
			}
		}
	}

	/**
	 * A worker thread class which can drain channels and echo-back
	 * the input.  Each instance is constructed with a reference to
	 * the owning thread pool object. When started, the thread loops
	 * forever waiting to be awakened to service the channel associated
	 * with a SelectionKey object.
	 * The worker is tasked by calling its serviceChannel() method
	 * with a SelectionKey object.  The serviceChannel() method stores
	 * the key reference in the thread object then calls notify()
	 * to wake it up.  When the channel has been drained, the worker
	 * thread returns itself to its parent pool.
	 */
	private class WorkerThread extends Thread
	{
		private ByteBuffer buffer = ByteBuffer.allocate (1024);
		private ThreadPool pool;
		private SelectionKey key;

		WorkerThread (ThreadPool pool)
		{
			this.pool = pool;
		}

		// loop forever waiting for work to do
		public synchronized void run()
		{
			System.out.println (this.getName() + " is ready");

			while (true) {
				try {
					// sleep and release object lock
					this.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
					// clear interrupt status
					this.interrupted();
				}

				if (key == null) {
					continue;	// just in case
				}

				System.out.println (this.getName()
					+ " has been awakened");

				try {
					drainChannel (key);
				} catch (Exception e) {
					System.out.println ("Caught '"
						+ e + "' closing channel");

					// close channel and nudge selector
					try {
						key.channel().close();
					} catch (IOException ex) {
						ex.printStackTrace();
					}

					key.selector().wakeup();
				}

				key = null;

				// done, ready for more, return to pool
				this.pool.returnWorker (this);
			}
		}

		/**
		 * Called to initiate a unit of work by this worker thread
		 * on the provided SelectionKey object.  This method is
		 * synchronized, as is the run() method, so only one key
		 * can be serviced at a given time.
		 * Before waking the worker thread, and before returning
		 * to the main selection loop, this key's interest set is
		 * updated to remove OP_READ.  This will cause the selector
		 * to ignore read-readiness for this channel while the
		 * worker thread is servicing it.
		 */
		synchronized void serviceChannel (SelectionKey key)
		{
			this.key = key;

			key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));

			this.notify();		// awaken the thread
		}

		/**
		 * The actual code which drains the channel associated with
		 * the given key.  This method assumes the key has been
		 * modified prior to invocation to turn off selection
		 * interest in OP_READ.  When this method completes it
		 * re-enables OP_READ and calls wakeup() on the selector
		 * so the selector will resume watching this channel.
		 */
		void drainChannel (SelectionKey key)
			throws Exception
		{
			SocketChannel channel = (SocketChannel) key.channel();
			int count;

			buffer.clear();			// make buffer empty

			// loop while data available, channel is non-blocking
			while ((count = channel.read (buffer)) > 0) {
				buffer.flip();		// make buffer readable

				// send the data, may not go all at once
				while (buffer.hasRemaining()) {
					channel.write (buffer);
				}
				// WARNING: the above loop is evil.
				// See comments in superclass.

				buffer.clear();		// make buffer empty
			}

			if (count < 0) {
				// close channel on EOF, invalidates the key
				channel.close();
				return;
			}

			// resume interest in OP_READ
			key.interestOps (key.interestOps() | SelectionKey.OP_READ);

			// cycle the selector so this key is active again
			key.selector().wakeup();
		}
	}
}