File | Doc | Category | Size | Date | Package
SelectorImpl.java | API Doc | Java SE 5 API | 14229 | Fri Aug 26 14:54:34 BST 2005 | com.sun.corba.se.impl.transport

SelectorImpl.java

/*
 * @(#)SelectorImpl.java	1.17 04/04/07
 * 
 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
 * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

package com.sun.corba.se.impl.transport;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import com.sun.corba.se.pept.broker.Broker;
import com.sun.corba.se.pept.transport.Acceptor;
import com.sun.corba.se.pept.transport.Connection;
import com.sun.corba.se.pept.transport.EventHandler;
import com.sun.corba.se.pept.transport.ListenerThread;
import com.sun.corba.se.pept.transport.ReaderThread;

import com.sun.corba.se.spi.logging.CORBALogDomains;
import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;

import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.orbutil.ORBUtility;

/**
 * @author Harold Carr
 */
public class SelectorImpl
    extends
	Thread
    implements
	com.sun.corba.se.pept.transport.Selector
{
    // ORB this selector serves; read for transportDebugFlag and the thread pool manager.
    private ORB orb;
    // NIO selector; null until startSelector() opens it.
    private Selector selector;
    // Value passed to Selector.select(long) by run(); the constructor sets 60000.
    private long timeout;
    // EventHandlers queued by registerForEvent() until the selector thread
    // registers their channels in handleDeferredRegistrations().
    // Guarded by synchronizing on the list itself.
    private List deferredRegistrations;
    // SelectionKeyAndOp entries queued by registerInterestOps() until the
    // selector thread applies them in enableInterestOps().
    // Guarded by synchronizing on the list itself.
    private List interestOpsList;
    // EventHandler -> ListenerThread, filled by createListenerThread().
    private HashMap listenerThreads;
    // EventHandler -> ReaderThread, filled by createReaderThread().
    private HashMap readerThreads;
    // Set to true by startSelector() after this thread has been started.
    private boolean selectorStarted;
    // Set through setClosed(); close() sets it to true.
    private boolean closed;
    // Log wrapper obtained in the constructor for the RPC_TRANSPORT domain.
    private ORBUtilSystemException wrapper ;


    /**
     * Creates a selector for the given ORB.
     *
     * The NIO selector is not opened and the thread is not started here;
     * both happen in startSelector().
     *
     * @param orb the ORB whose debug flag, thread pools and log wrapper
     *            this selector uses
     */
    public SelectorImpl(ORB orb)
    {
        this.orb = orb;
        wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);

        // Selector thread state: nothing opened, nothing started, not closed.
        selector = null;
        selectorStarted = false;
        closed = false;
        timeout = 60000;

        // Work queues handed over to the selector thread.
        deferredRegistrations = new ArrayList();
        interestOpsList = new ArrayList();

        // Dedicated threads for handlers that do not use the selector thread.
        listenerThreads = new HashMap();
        readerThreads = new HashMap();
    }

    /**
     * Sets the value that run() passes to Selector.select(long).
     *
     * @param timeout the new select timeout
     */
    public void setTimeout(long timeout)
    {
        this.timeout = timeout;
    }

    /**
     * Returns the value that run() passes to Selector.select(long).
     *
     * @return the current select timeout
     */
    public long getTimeout()
    {
        return this.timeout;
    }

    /**
     * Queues a request to add the handler's interest ops to its selection
     * key, then wakes the selector thread so it applies the request in
     * enableInterestOps().
     *
     * If the handler has no selection key, or its key is no longer valid,
     * nothing is queued and the condition is reported through the log
     * wrapper instead.
     *
     * @param eventHandler handler whose selection key and interest ops are
     *                     queued
     */
    public void registerInterestOps(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".registerInterestOps:-> " + eventHandler);
        }

        SelectionKey selectionKey = eventHandler.getSelectionKey();
        // The key can be null: handleDeferredRegistrations() stores null
        // when registering the channel fails, and the key is not set at all
        // until the deferred registration has been processed. Treat that
        // like an invalid key rather than failing with an NPE.
        if (selectionKey != null && selectionKey.isValid()) {
            int ehOps = eventHandler.getInterestOps();
            SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
            synchronized(interestOpsList) {
                interestOpsList.add(keyAndOp);
            }
            // tell Selector Thread there's an update to a SelectorKey's Ops
            selector.wakeup();
        }
        else {
            wrapper.selectionKeyInvalid(eventHandler.toString());
            if (orb.transportDebugFlag) {
                dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
            }
        }

        if (orb.transportDebugFlag) {
            dprint(".registerInterestOps:<- ");
        }
    }

    /**
     * Registers an event handler.
     *
     * Does nothing once this selector is closed. Handlers that ask to use
     * the select thread are queued for deferred registration; the selector
     * is started on first use and then woken up. All other handlers get a
     * dedicated listener thread (OP_ACCEPT) or reader thread (OP_READ).
     *
     * @param eventHandler the handler to register
     * @throws RuntimeException if the handler does not use the select
     *         thread and its interest ops are neither OP_ACCEPT nor OP_READ
     */
    public void registerForEvent(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".registerForEvent: " + eventHandler);
        }

        if (isClosed()) {
            if (orb.transportDebugFlag) {
                dprint(".registerForEvent: closed: " + eventHandler);
            }
            return;
        }

        if (eventHandler.shouldUseSelectThreadToWait()) {
            // Hand the handler to the selector thread; it performs the
            // actual channel registration at the start of its next cycle.
            synchronized (deferredRegistrations) {
                deferredRegistrations.add(eventHandler);
            }
            if (! selectorStarted) {
                startSelector();
            }
            selector.wakeup();
            return;
        }

        // Handler wants its own blocking thread; pick it by interest op.
        final int interestOps = eventHandler.getInterestOps();
        if (interestOps == SelectionKey.OP_ACCEPT) {
            createListenerThread(eventHandler);
        } else if (interestOps == SelectionKey.OP_READ) {
            createReaderThread(eventHandler);
        } else {
            if (orb.transportDebugFlag) {
                dprint(".registerForEvent: default: " + eventHandler);
            }
            throw new RuntimeException(
                "SelectorImpl.registerForEvent: unknown interest ops");
        }
    }

    /**
     * Unregisters an event handler.
     *
     * Does nothing once this selector is closed. For handlers that use the
     * select thread, the handler's selection key (if it has one) is
     * cancelled and the selector is woken up. For all other handlers the
     * dedicated listener thread (OP_ACCEPT) or reader thread (OP_READ) is
     * destroyed.
     *
     * @param eventHandler the handler to unregister
     * @throws RuntimeException if the handler does not use the select
     *         thread and its interest ops are neither OP_ACCEPT nor OP_READ
     */
    public void unregisterForEvent(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".unregisterForEvent: " + eventHandler);
        }

        if (isClosed()) {
            if (orb.transportDebugFlag) {
                dprint(".unregisterForEvent: closed: " + eventHandler);
            }
            return;
        }

        if (eventHandler.shouldUseSelectThreadToWait()) {
            SelectionKey selectionKey;
            // handleDeferredRegistrations() sets the key while holding this
            // lock, so read it under the same lock.
            synchronized (deferredRegistrations) {
                selectionKey = eventHandler.getSelectionKey();
            }
            // The key is null if the deferred registration has not been
            // processed yet, or if registering the channel failed.
            if (selectionKey != null) {
                selectionKey.cancel();
            }
            // The selector is null if it was never started; then there is
            // no thread to wake.
            if (selector != null) {
                selector.wakeup();
            }
            return;
        }

        switch (eventHandler.getInterestOps()) {
        case SelectionKey.OP_ACCEPT :
            destroyListenerThread(eventHandler);
            break;
        case SelectionKey.OP_READ :
            destroyReaderThread(eventHandler);
            break;
        default:
            if (orb.transportDebugFlag) {
                dprint(".unregisterForEvent: default: " + eventHandler);
            }
            throw new RuntimeException(
                "SelectorImpl.unregisterForEvent: unknown interest ops");
        }
    }

    /**
     * Closes this selector.
     *
     * Marks the selector closed, closes every listener thread and reader
     * thread it created, and wakes the selector thread (if a selector was
     * opened) so that run() can see the closed flag. Calling this again
     * after the selector is closed does nothing.
     */
    public void close()
    {
        if (orb.transportDebugFlag) {
            dprint(".close");
        }

        if (isClosed()) {
            if (orb.transportDebugFlag) {
                dprint(".close: already closed");
            }
            return;
        }

        setClosed(true);

        // Kill listeners.
        for (Iterator listeners = listenerThreads.values().iterator();
             listeners.hasNext(); ) {
            ((ListenerThread) listeners.next()).close();
        }

        // Kill readers.
        for (Iterator readers = readerThreads.values().iterator();
             readers.hasNext(); ) {
            ((ReaderThread) readers.next()).close();
        }

        // Selector: only wake it here; the selector thread itself closes
        // the selector once it observes the closed flag.
        try {
            if (selector != null) {
                selector.wakeup();
            }
        } catch (Throwable t) {
            if (orb.transportDebugFlag) {
                dprint(".close: selector.close: " + t);
            }
        }
    }

    ///////////////////////////////////////////////////
    //
    // Thread methods.
    //

    /**
     * Selector thread main loop.
     *
     * Each cycle processes deferred registrations, applies queued interest
     * op changes, blocks in select(timeout), and then dispatches every
     * selected key to the EventHandler attached to it. Errors inside a
     * cycle are swallowed (and printed when transport debugging is on) so
     * the thread keeps running.
     *
     * The loop ends when the closed flag is observed, either right after
     * select() returns or at the top of the next cycle. The selector is
     * closed on both exit paths.
     */
    public void run()
    {
        setName("SelectorThread");
        // Read the flag through isClosed() so the read uses the same lock
        // as setClosed().
        while (!isClosed()) {
            try {
                int n = 0;
                if (timeout == 0 && orb.transportDebugFlag) {
                    dprint(".run: Beginning of selection cycle");
                }
                handleDeferredRegistrations();
                enableInterestOps();
                try {
                    n = selector.select(timeout);
                } catch (IOException  e) {
                    if (orb.transportDebugFlag) {
                        dprint(".run: selector.select: " + e);
                    }
                }
                if (isClosed()) {
                    // close() woke us up; skip dispatch and shut down.
                    break;
                }
                Iterator iterator = selector.selectedKeys().iterator();
                if (orb.transportDebugFlag) {
                    if (iterator.hasNext()) {
                        dprint(".run: n = " + n);
                    }
                }
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = (SelectionKey) iterator.next();
                    // Selected keys must be removed explicitly, otherwise
                    // they stay in the selected set for the next cycle.
                    iterator.remove();
                    EventHandler eventHandler = (EventHandler)
                        selectionKey.attachment();
                    try {
                        eventHandler.handleEvent();
                    } catch (Throwable t) {
                        if (orb.transportDebugFlag) {
                            dprint(".run: eventHandler.handleEvent", t);
                        }
                    }
                }
                if (timeout == 0 && orb.transportDebugFlag) {
                    dprint(".run: End of selection cycle");
                }
            } catch (Throwable t) {
                // IMPORTANT: ignore all errors so the select thread keeps running.
                // Otherwise a guaranteed hang.
                if (orb.transportDebugFlag) {
                    dprint(".run: ignoring", t);
                }
            }
        }

        // Release the selector on every exit path, including the one where
        // the closed flag is first seen by the loop condition.
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (Throwable t) {
            if (orb.transportDebugFlag) {
                dprint(".run: ignoring", t);
            }
        }
        if (orb.transportDebugFlag) {
            dprint(".run: closed - .run return");
        }
    }

    /////////////////////////////////////////////////////
    //
    // Implementation.
    //

    /**
     * Reads the closed flag while holding this object's monitor.
     *
     * @return true once setClosed(true) has been called
     */
    private boolean isClosed ()
    {
        synchronized (this) {
            return closed;
        }
    }

    /**
     * Writes the closed flag while holding this object's monitor.
     *
     * @param closed the new value of the flag
     */
    private void setClosed(boolean closed)
    {
        synchronized (this) {
            this.closed = closed;
        }
    }

    /**
     * Opens the NIO selector and starts this object as a daemon thread.
     *
     * Callers test selectorStarted without holding a lock, so two threads
     * can get here at the same time. The method is synchronized and
     * re-checks the flag, so only the first caller opens the selector and
     * starts the thread. A second start would overwrite the selector and
     * make Thread.start() throw IllegalThreadStateException.
     *
     * @throws RuntimeException wrapping the IOException if the selector
     *         cannot be opened
     */
    private synchronized void startSelector()
    {
        if (selectorStarted) {
            // Another caller already started the selector thread.
            return;
        }
        try {
            selector = Selector.open();
        } catch (IOException e) {
            if (orb.transportDebugFlag) {
                dprint(".startSelector: Selector.open: IOException: " + e);
            }
            // REVISIT - better handling/reporting
            RuntimeException rte =
                new RuntimeException(".startSelector: Selector.open exception");
            rte.initCause(e);
            throw rte;
        }
        setDaemon(true);
        start();
        selectorStarted = true;
        if (orb.transportDebugFlag) {
            dprint(".startSelector: selector.start completed.");
        }
    }

    /**
     * Registers every queued EventHandler's channel with the selector.
     *
     * Runs on the selector thread at the start of each cycle. Each handler
     * is attached to its selection key and is given that key through
     * setSelectionKey(). If the channel is already closed, the handler
     * receives a null key. The queue is emptied afterwards.
     */
    private void handleDeferredRegistrations()
    {
        synchronized (deferredRegistrations) {
            Iterator pending = deferredRegistrations.iterator();
            while (pending.hasNext()) {
                EventHandler handler = (EventHandler) pending.next();
                if (orb.transportDebugFlag) {
                    dprint(".handleDeferredRegistrations: " + handler);
                }
                SelectableChannel channel = handler.getChannel();
                SelectionKey key = null;
                try {
                    // The handler itself is the key's attachment; run()
                    // reads it back to dispatch events.
                    key = channel.register(selector,
                                           handler.getInterestOps(),
                                           handler);
                } catch (ClosedChannelException e) {
                    // Leave key null; the handler is told below.
                    if (orb.transportDebugFlag) {
                        dprint(".handleDeferredRegistrations: " + e);
                    }
                }
                handler.setSelectionKey(key);
            }
            deferredRegistrations.clear();
        }
    }

    /**
     * Applies the interest op changes queued by registerInterestOps().
     *
     * Runs on the selector thread at the start of each cycle. For every
     * queued entry whose key is still valid, the requested op is OR-ed into
     * the key's current interest set. The queue is emptied afterwards.
     */
    private void enableInterestOps()
    {
        synchronized (interestOpsList) {
            if (interestOpsList.isEmpty()) {
                return;
            }
            if (orb.transportDebugFlag) {
                dprint(".enableInterestOps:->");
            }
            for (Iterator queued = interestOpsList.iterator(); queued.hasNext(); ) {
                SelectionKeyAndOp request = (SelectionKeyAndOp) queued.next();
                SelectionKey key = request.selectionKey;

                // A connection's key can be queued here and then be
                // reclaimed before its op is enabled. Enabling an op on
                // such a key would throw and leave this method early,
                // potentially without enabling the remaining queued ops.
                // So invalid keys are skipped; they get cleaned up on the
                // next Selector.select() call.
                if (!key.isValid()) {
                    continue;
                }
                if (orb.transportDebugFlag) {
                    dprint(".enableInterestOps: " + request);
                }
                key.interestOps(key.interestOps() | request.keyOp);
            }
            interestOpsList.clear();
            if (orb.transportDebugFlag) {
                dprint(".enableInterestOps:<-");
            }
        }
    }

    /**
     * Creates a ListenerThread for the handler's acceptor, records it in
     * listenerThreads and queues it on work queue 0 of thread pool 0.
     *
     * If the work cannot be queued, the map entry is removed again so no
     * entry is left behind for a listener that never ran.
     *
     * @param eventHandler handler whose acceptor the listener serves
     * @throws RuntimeException wrapping NoSuchThreadPoolException or
     *         NoSuchWorkQueueException if the pool or queue does not exist
     */
    private void createListenerThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".createListenerThread: " + eventHandler);
        }
        Acceptor acceptor = eventHandler.getAcceptor();
        ListenerThread listenerThread =
            new ListenerThreadImpl(orb, acceptor, this);
        listenerThreads.put(eventHandler, listenerThread);
        Throwable throwable = null;
        boolean queued = false;
        try {
            orb.getThreadPoolManager().getThreadPool(0)
                .getWorkQueue(0).addWork((Work)listenerThread);
            queued = true;
        } catch (NoSuchThreadPoolException e) {
            throwable = e;
        } catch (NoSuchWorkQueueException e) {
            throwable = e;
        } finally {
            if (!queued) {
                // Undo the bookkeeping for any failure, checked or not.
                listenerThreads.remove(eventHandler);
            }
        }
        if (throwable != null) {
            RuntimeException rte = new RuntimeException(throwable.toString());
            rte.initCause(throwable);
            throw rte;
        }
    }

    /**
     * Removes the ListenerThread recorded for the handler and closes it.
     * Does nothing if no ListenerThread is recorded for the handler.
     *
     * @param eventHandler handler whose listener thread is destroyed
     */
    private void destroyListenerThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".destroyListenerThread: " + eventHandler);
        }
        // remove() returns the mapped thread, or null when there is none,
        // so lookup and removal are done in one step.
        ListenerThread removed =
            (ListenerThread) listenerThreads.remove(eventHandler);
        if (removed != null) {
            removed.close();
            return;
        }
        if (orb.transportDebugFlag) {
            dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
        }
    }

    /**
     * Creates a ReaderThread for the handler's connection, records it in
     * readerThreads and queues it on work queue 0 of thread pool 0.
     *
     * If the work cannot be queued, the map entry is removed again so no
     * entry is left behind for a reader that never ran.
     *
     * @param eventHandler handler whose connection the reader serves
     * @throws RuntimeException wrapping NoSuchThreadPoolException or
     *         NoSuchWorkQueueException if the pool or queue does not exist
     */
    private void createReaderThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".createReaderThread: " + eventHandler);
        }
        Connection connection = eventHandler.getConnection();
        ReaderThread readerThread =
            new ReaderThreadImpl(orb, connection, this);
        readerThreads.put(eventHandler, readerThread);
        Throwable throwable = null;
        boolean queued = false;
        try {
            orb.getThreadPoolManager().getThreadPool(0)
                .getWorkQueue(0).addWork((Work)readerThread);
            queued = true;
        } catch (NoSuchThreadPoolException e) {
            throwable = e;
        } catch (NoSuchWorkQueueException e) {
            throwable = e;
        } finally {
            if (!queued) {
                // Undo the bookkeeping for any failure, checked or not.
                readerThreads.remove(eventHandler);
            }
        }
        if (throwable != null) {
            RuntimeException rte = new RuntimeException(throwable.toString());
            rte.initCause(throwable);
            throw rte;
        }
    }

    /**
     * Removes the ReaderThread recorded for the handler and closes it.
     * Does nothing if no ReaderThread is recorded for the handler.
     *
     * @param eventHandler handler whose reader thread is destroyed
     */
    private void destroyReaderThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".destroyReaderThread: " + eventHandler);
        }
        // remove() returns the mapped thread, or null when there is none,
        // so lookup and removal are done in one step.
        ReaderThread removed =
            (ReaderThread) readerThreads.remove(eventHandler);
        if (removed != null) {
            removed.close();
            return;
        }
        if (orb.transportDebugFlag) {
            dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
        }
    }

    /**
     * Prints a debug message through ORBUtility.dprint, tagged with this
     * class's name.
     *
     * @param msg the message to print
     */
    private void dprint(String msg)
    {
        final String tag = "SelectorImpl";
        ORBUtility.dprint(tag, msg);
    }

    /**
     * Prints a debug message tagged with this class's name, followed by
     * the stack trace of the given throwable on System.out.
     *
     * @param msg the message to print
     * @param t   the throwable whose stack trace is printed
     */
    protected void dprint(String msg, Throwable t)
    {
        ORBUtility.dprint("SelectorImpl", msg);
        t.printStackTrace(System.out);
    }

    // Private class to contain a SelectionKey and a SelectionKey op.
    // Used only by SelectorImpl to register and enable SelectionKey
    // Op.
    // REVISIT - Could do away with this class and use the EventHandler
    //           directly.
    private class SelectionKeyAndOp
    {
        // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]
        public int keyOp;
        public SelectionKey selectionKey;

        // constructor
        public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {
	    this.selectionKey = selectionKey;
	    this.keyOp = keyOp;
	}
    }

// End of file.
}