FileDocCategorySizeDatePackage
SelectorImpl.javaAPI DocJava SE 5 API14229Fri Aug 26 14:54:34 BST 2005com.sun.corba.se.impl.transport

SelectorImpl

public class SelectorImpl extends Thread implements com.sun.corba.se.pept.transport.Selector
author
Harold Carr

Fields Summary
private com.sun.corba.se.spi.orb.ORB
orb
private Selector
selector
private long
timeout
private List
deferredRegistrations
private List
interestOpsList
private HashMap
listenerThreads
private HashMap
readerThreads
private boolean
selectorStarted
private boolean
closed
private com.sun.corba.se.impl.logging.ORBUtilSystemException
wrapper
Constructors Summary
public SelectorImpl(com.sun.corba.se.spi.orb.ORB orb)

	this.orb = orb;
	selector = null;
	selectorStarted = false;
	timeout = 60000;
	deferredRegistrations = new ArrayList();
	interestOpsList = new ArrayList();
	listenerThreads = new HashMap();
	readerThreads = new HashMap();
	closed = false;
        wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);
    
Methods Summary
public voidclose()

	if (orb.transportDebugFlag) {
	    dprint(".close");
	}

	if (isClosed()) {
	    if (orb.transportDebugFlag) {
		dprint(".close: already closed");
	    }
	    return;
	}

	setClosed(true);

	Iterator i;

	// Kill listeners.

	i = listenerThreads.values().iterator();
	while (i.hasNext()) {
	    ListenerThread listenerThread = (ListenerThread) i.next();
	    listenerThread.close();
	}

	// Kill readers.

	i = readerThreads.values().iterator();
	while (i.hasNext()) {
	    ReaderThread readerThread = (ReaderThread) i.next();
	    readerThread.close();
	}

	// Selector

	try {
	    if (selector != null) {
		// wakeup Selector thread to process close request
		selector.wakeup();
	    }
	} catch (Throwable t) {
	    if (orb.transportDebugFlag) {
		dprint(".close: selector.close: " + t);
	    }
	}
    
private voidcreateListenerThread(com.sun.corba.se.pept.transport.EventHandler eventHandler)

	if (orb.transportDebugFlag) {
	    dprint(".createListenerThread: " + eventHandler);
	}
	Acceptor acceptor = eventHandler.getAcceptor();
	ListenerThread listenerThread =
	    new ListenerThreadImpl(orb, acceptor, this);
	listenerThreads.put(eventHandler, listenerThread);
	Throwable throwable = null;
	try {
	    orb.getThreadPoolManager().getThreadPool(0)
		.getWorkQueue(0).addWork((Work)listenerThread);
	} catch (NoSuchThreadPoolException e) {
	    throwable = e;
	} catch (NoSuchWorkQueueException e) {
	    throwable = e;
	}
	if (throwable != null) {
	    RuntimeException rte = new RuntimeException(throwable.toString());
	    rte.initCause(throwable);
	    throw rte;
	}
    
private voidcreateReaderThread(com.sun.corba.se.pept.transport.EventHandler eventHandler)

	if (orb.transportDebugFlag) {
	    dprint(".createReaderThread: " + eventHandler);
	}
	Connection connection = eventHandler.getConnection();
	ReaderThread readerThread = 
	    new ReaderThreadImpl(orb, connection, this);
	readerThreads.put(eventHandler, readerThread);
	Throwable throwable = null;
	try {
	    orb.getThreadPoolManager().getThreadPool(0)
		.getWorkQueue(0).addWork((Work)readerThread);
	} catch (NoSuchThreadPoolException e) {
	    throwable = e;
	} catch (NoSuchWorkQueueException e) {
	    throwable = e;
	}
	if (throwable != null) {
	    RuntimeException rte = new RuntimeException(throwable.toString());
	    rte.initCause(throwable);
	    throw rte;
	}
    
private voiddestroyListenerThread(com.sun.corba.se.pept.transport.EventHandler eventHandler)

	if (orb.transportDebugFlag) {
	    dprint(".destroyListenerThread: " + eventHandler);
	}
	ListenerThread listenerThread = (ListenerThread)
	    listenerThreads.get(eventHandler);
	if (listenerThread == null) {
	    if (orb.transportDebugFlag) {
		dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
	    }
	    return;
	}
	listenerThreads.remove(eventHandler);
	listenerThread.close();
    
private voiddestroyReaderThread(com.sun.corba.se.pept.transport.EventHandler eventHandler)

	if (orb.transportDebugFlag) {
	    dprint(".destroyReaderThread: " + eventHandler);
	}
	ReaderThread readerThread = (ReaderThread)
	    readerThreads.get(eventHandler);
	if (readerThread == null) {
	    if (orb.transportDebugFlag) {
		dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
	    }
	    return;
	}
	readerThreads.remove(eventHandler);
	readerThread.close();
    
private voiddprint(java.lang.String msg)

	ORBUtility.dprint("SelectorImpl", msg);
    
protected voiddprint(java.lang.String msg, java.lang.Throwable t)

	dprint(msg);
	t.printStackTrace(System.out);
    
private voidenableInterestOps()

	synchronized (interestOpsList) {
	    int listSize = interestOpsList.size();
	    if (listSize > 0) {
                if (orb.transportDebugFlag) {
                    dprint(".enableInterestOps:->");
                }
                SelectionKey selectionKey = null;
		SelectionKeyAndOp keyAndOp = null;
		int keyOp, selectionKeyOps = 0;
		for (int i = 0; i < listSize; i++) {
		    keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i);
		    selectionKey = keyAndOp.selectionKey;

		    // Need to check if the SelectionKey is valid because a
		    // connection's SelectionKey could be put on the list to
		    // have its OP enabled and before it's enabled be reclaimed.
		    // Otherwise, the enabling of the OP will throw an exception
		    // here and exit this method an potentially not enable all
		    // registered ops.
		    //
		    // So, we ignore SelectionKeys that are invalid. They will get
		    // cleaned up on the next Selector.select() call.

		    if (selectionKey.isValid()) {
                        if (orb.transportDebugFlag) {
                            dprint(".enableInterestOps: " + keyAndOp);
                        }
		        keyOp = keyAndOp.keyOp;
		        selectionKeyOps = selectionKey.interestOps();
		        selectionKey.interestOps(selectionKeyOps | keyOp);
		    }
		}
		interestOpsList.clear();
                if (orb.transportDebugFlag) {
                    dprint(".enableInterestOps:<-");
                }
	    }
	}
    
public longgetTimeout()

	return timeout;
    
private voidhandleDeferredRegistrations()

	synchronized (deferredRegistrations) {
            int deferredListSize = deferredRegistrations.size();
            for (int i = 0; i < deferredListSize; i++) {
                EventHandler eventHandler = 
		    (EventHandler)deferredRegistrations.get(i);
                if (orb.transportDebugFlag) {
                    dprint(".handleDeferredRegistrations: " + eventHandler);
                }
                SelectableChannel channel = eventHandler.getChannel();
                SelectionKey selectionKey = null;
                try {
                    selectionKey =
                        channel.register(selector,
                                         eventHandler.getInterestOps(),
                                         (Object)eventHandler);
                } catch (ClosedChannelException e) {
                    if (orb.transportDebugFlag) {
                        dprint(".handleDeferredRegistrations: " + e);
                    }
                }
                eventHandler.setSelectionKey(selectionKey);
            }
            deferredRegistrations.clear();
        }
    
private synchronized booleanisClosed()

	return closed;
    
public voidregisterForEvent(com.sun.corba.se.pept.transport.EventHandler eventHandler)

	if (orb.transportDebugFlag) {
	    dprint(".registerForEvent: " + eventHandler);
	}

	if (isClosed()) {
	    if (orb.transportDebugFlag) {
		dprint(".registerForEvent: closed: " + eventHandler);
	    }
	    return;
	}

	if (eventHandler.shouldUseSelectThreadToWait()) {
	    synchronized (deferredRegistrations) {
		deferredRegistrations.add(eventHandler);
	    }
	    if (! selectorStarted) {
		startSelector();
	    }
	    selector.wakeup();
	    return;
	}

	switch (eventHandler.getInterestOps()) {
	case SelectionKey.OP_ACCEPT :
	    createListenerThread(eventHandler);
	    break;
	case SelectionKey.OP_READ :
	    createReaderThread(eventHandler);
	    break;
	default:
	    if (orb.transportDebugFlag) {
		dprint(".registerForEvent: default: " + eventHandler);
	    }
	    throw new RuntimeException(
                "SelectorImpl.registerForEvent: unknown interest ops");
	}
    
public voidregisterInterestOps(com.sun.corba.se.pept.transport.EventHandler eventHandler)

	if (orb.transportDebugFlag) {
	    dprint(".registerInterestOps:-> " + eventHandler);
	}

	SelectionKey selectionKey = eventHandler.getSelectionKey();
	if (selectionKey.isValid()) {
            int ehOps = eventHandler.getInterestOps();
            SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
	    synchronized(interestOpsList) {
		interestOpsList.add(keyAndOp);
	    }
            // tell Selector Thread there's an update to a SelectorKey's Ops
            selector.wakeup();
	}
	else {
            wrapper.selectionKeyInvalid(eventHandler.toString());
	    if (orb.transportDebugFlag) {
		dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
	    }
	}

	if (orb.transportDebugFlag) {
	    dprint(".registerInterestOps:<- ");
	}
    
public voidrun()

	setName("SelectorThread");
	while (!closed) {
	    try {
		int n = 0;
		if (timeout == 0 && orb.transportDebugFlag) {
		    dprint(".run: Beginning of selection cycle");
		}
		handleDeferredRegistrations();
		enableInterestOps();
		try {
		    n = selector.select(timeout);
		} catch (IOException  e) {
		    if (orb.transportDebugFlag) {
			dprint(".run: selector.select: " + e);
		    }
		}
		if (closed) {
		    selector.close();
		    if (orb.transportDebugFlag) {
			dprint(".run: closed - .run return");
		    }
		    return;
		}
		/*
		  if (timeout == 0 && orb.transportDebugFlag) {
		  dprint(".run: selector.select() returned: " + n);
		  }
		  if (n == 0) {
		  continue;
		  }
		*/
		Iterator iterator = selector.selectedKeys().iterator();
		if (orb.transportDebugFlag) {
		    if (iterator.hasNext()) {
			dprint(".run: n = " + n);
		    }
		}
		while (iterator.hasNext()) {
		    SelectionKey selectionKey = (SelectionKey) iterator.next();
		    iterator.remove();
		    EventHandler eventHandler = (EventHandler)
			selectionKey.attachment();
		    try {
			eventHandler.handleEvent();
		    } catch (Throwable t) {
			if (orb.transportDebugFlag) {
			    dprint(".run: eventHandler.handleEvent", t);
			}
		    }
		}
		if (timeout == 0 && orb.transportDebugFlag) {
		    dprint(".run: End of selection cycle");
		}
	    } catch (Throwable t) {
		// IMPORTANT: ignore all errors so the select thread keeps running.
		// Otherwise a guaranteed hang.
		if (orb.transportDebugFlag) {
		    dprint(".run: ignoring", t);
		}
	    }
	}
    
private synchronized voidsetClosed(boolean closed)

	this.closed = closed;
    
public voidsetTimeout(long timeout)

	this.timeout = timeout;
    
private voidstartSelector()

	try {
	    selector = Selector.open();
	} catch (IOException e) {
	    if (orb.transportDebugFlag) {
		dprint(".startSelector: Selector.open: IOException: " + e);
	    }
	    // REVISIT - better handling/reporting
	    RuntimeException rte =
		new RuntimeException(".startSelector: Selector.open exception");
	    rte.initCause(e);
	    throw rte;
	}
	setDaemon(true);
	start();
	selectorStarted = true;
	if (orb.transportDebugFlag) {
	    dprint(".startSelector: selector.start completed.");
	}
    
public voidunregisterForEvent(com.sun.corba.se.pept.transport.EventHandler eventHandler)

	if (orb.transportDebugFlag) {
	    dprint(".unregisterForEvent: " + eventHandler);
	}

	if (isClosed()) {
	    if (orb.transportDebugFlag) {
		dprint(".unregisterForEvent: closed: " + eventHandler);
	    }
	    return;
	}

	if (eventHandler.shouldUseSelectThreadToWait()) {
	    SelectionKey selectionKey = eventHandler.getSelectionKey();
	    selectionKey.cancel();
	    selector.wakeup();
	    return;
	}

	switch (eventHandler.getInterestOps()) {
	case SelectionKey.OP_ACCEPT :
	    destroyListenerThread(eventHandler);
	    break;
	case SelectionKey.OP_READ :
	    destroyReaderThread(eventHandler);
	    break;
	default:
	    if (orb.transportDebugFlag) {
		dprint(".unregisterForEvent: default: " + eventHandler);
	    }
	    throw new RuntimeException(
                "SelectorImpl.uregisterForEvent: unknown interest ops");
	}