Methods Summary |
---|
public void | close()
// Shuts this selector down. The first call marks the instance closed,
// closes every ListenerThread and ReaderThread currently tracked, and then
// wakes the selector so the select loop can process the close request.
// Any later call only emits a debug message and returns.
if (orb.transportDebugFlag) {
    dprint(".close");
}

if (isClosed()) {
    if (orb.transportDebugFlag) {
        dprint(".close: already closed");
    }
    return;
}
setClosed(true);

// Shut down every listener thread.
for (Iterator listeners = listenerThreads.values().iterator(); listeners.hasNext(); ) {
    ((ListenerThread) listeners.next()).close();
}

// Shut down every reader thread.
for (Iterator readers = readerThreads.values().iterator(); readers.hasNext(); ) {
    ((ReaderThread) readers.next()).close();
}

// No selector was ever opened: nothing to wake up.
if (selector == null) {
    return;
}

try {
    // Wake the selector thread so it notices the close request.
    selector.wakeup();
} catch (Throwable t) {
    // Best effort: a failed wakeup must not make close() fail.
    if (orb.transportDebugFlag) {
        dprint(".close: selector.close: " + t);
    }
}
|
private void | createListenerThread(com.sun.corba.se.pept.transport.EventHandler eventHandler)
// Creates a ListenerThreadImpl for the handler's acceptor, records it in
// listenerThreads under the handler, and queues it as Work on work queue 0
// of thread pool 0.
//
// Throws RuntimeException (with the original exception as its cause) when
// the thread pool or the work queue cannot be found.
if (orb.transportDebugFlag) {
    dprint(".createListenerThread: " + eventHandler);
}

ListenerThread worker =
    new ListenerThreadImpl(orb, eventHandler.getAcceptor(), this);
listenerThreads.put(eventHandler, worker);

try {
    orb.getThreadPoolManager().getThreadPool(0)
        .getWorkQueue(0).addWork((Work) worker);
} catch (NoSuchThreadPoolException e) {
    throw new RuntimeException(e.toString(), e);
} catch (NoSuchWorkQueueException e) {
    throw new RuntimeException(e.toString(), e);
}
|
private void | createReaderThread(com.sun.corba.se.pept.transport.EventHandler eventHandler)
// Creates a ReaderThreadImpl for the handler's connection, records it in
// readerThreads under the handler, and queues it as Work on work queue 0
// of thread pool 0.
//
// Throws RuntimeException (with the original exception as its cause) when
// the thread pool or the work queue cannot be found.
if (orb.transportDebugFlag) {
    dprint(".createReaderThread: " + eventHandler);
}

ReaderThread worker =
    new ReaderThreadImpl(orb, eventHandler.getConnection(), this);
readerThreads.put(eventHandler, worker);

try {
    orb.getThreadPoolManager().getThreadPool(0)
        .getWorkQueue(0).addWork((Work) worker);
} catch (NoSuchThreadPoolException e) {
    throw new RuntimeException(e.toString(), e);
} catch (NoSuchWorkQueueException e) {
    throw new RuntimeException(e.toString(), e);
}
|
private void | destroyListenerThread(com.sun.corba.se.pept.transport.EventHandler eventHandler)
// Stops the ListenerThread recorded for the given handler: the entry is
// dropped from listenerThreads and the thread is closed. A handler with no
// recorded thread is ignored (only a debug message is printed).
if (orb.transportDebugFlag) {
    dprint(".destroyListenerThread: " + eventHandler);
}

ListenerThread tracked = (ListenerThread) listenerThreads.get(eventHandler);
if (tracked != null) {
    listenerThreads.remove(eventHandler);
    tracked.close();
    return;
}

if (orb.transportDebugFlag) {
    dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
}
|
private void | destroyReaderThread(com.sun.corba.se.pept.transport.EventHandler eventHandler)
// Stops the ReaderThread recorded for the given handler: the entry is
// dropped from readerThreads and the thread is closed. A handler with no
// recorded thread is ignored (only a debug message is printed).
if (orb.transportDebugFlag) {
    dprint(".destroyReaderThread: " + eventHandler);
}

ReaderThread tracked = (ReaderThread) readerThreads.get(eventHandler);
if (tracked != null) {
    readerThreads.remove(eventHandler);
    tracked.close();
    return;
}

if (orb.transportDebugFlag) {
    dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
}
|
private void | dprint(java.lang.String msg)
// Forwards msg to ORBUtility.dprint, tagged with the fixed label
// "SelectorImpl". Callers in this class guard the call with
// orb.transportDebugFlag; this method itself does not check the flag.
ORBUtility.dprint("SelectorImpl", msg);
|
protected void | dprint(java.lang.String msg, java.lang.Throwable t)
// Prints msg through dprint(String) and then writes the stack trace of t
// to System.out. t must not be null: it is dereferenced unconditionally.
dprint(msg);
t.printStackTrace(System.out);
|
private void | enableInterestOps()
// Applies every queued (SelectionKey, op) request from interestOpsList by
// OR-ing the requested op into the key's current interest set, then empties
// the list. The whole pass runs while holding the interestOpsList monitor.
synchronized (interestOpsList) {
    final int pending = interestOpsList.size();
    if (pending <= 0) {
        // Nothing queued: leave without any debug output.
        return;
    }

    if (orb.transportDebugFlag) {
        dprint(".enableInterestOps:->");
    }

    for (int index = 0; index < pending; index++) {
        SelectionKeyAndOp request =
            (SelectionKeyAndOp) interestOpsList.get(index);
        SelectionKey key = request.selectionKey;

        // A connection's key can be queued here and then be reclaimed
        // before its op gets enabled. Touching such a key would throw and
        // abort this method, potentially leaving the remaining requests
        // unprocessed, so invalid keys are skipped. They get cleaned up on
        // the next Selector.select() call.
        if (!key.isValid()) {
            continue;
        }

        if (orb.transportDebugFlag) {
            dprint(".enableInterestOps: " + request);
        }
        key.interestOps(key.interestOps() | request.keyOp);
    }

    interestOpsList.clear();

    if (orb.transportDebugFlag) {
        dprint(".enableInterestOps:<-");
    }
}
|
public long | getTimeout()
// Returns the current timeout field. run() passes this value to
// selector.select(timeout) on every selection cycle.
return timeout;
|
private void | handleDeferredRegistrations()
// Registers the channel of every queued EventHandler with the selector,
// using the handler's interest ops and the handler itself as attachment,
// and stores the resulting SelectionKey on the handler. The queue is
// emptied afterwards. Runs while holding the deferredRegistrations monitor.
//
// If a channel is already closed, the failure is only reported in debug
// mode and the handler receives a null SelectionKey.
synchronized (deferredRegistrations) {
    final int pending = deferredRegistrations.size();

    for (int index = 0; index < pending; index++) {
        EventHandler handler =
            (EventHandler) deferredRegistrations.get(index);
        if (orb.transportDebugFlag) {
            dprint(".handleDeferredRegistrations: " + handler);
        }

        SelectionKey registeredKey;
        try {
            registeredKey = handler.getChannel().register(
                selector, handler.getInterestOps(), handler);
        } catch (ClosedChannelException e) {
            if (orb.transportDebugFlag) {
                dprint(".handleDeferredRegistrations: " + e);
            }
            registeredKey = null;
        }
        handler.setSelectionKey(registeredKey);
    }

    deferredRegistrations.clear();
}
|
private synchronized boolean | isClosed()
// Returns the closed flag. Synchronized on this instance, like
// setClosed(boolean), which is the method that writes the flag.
return closed;
|
public void | registerForEvent(com.sun.corba.se.pept.transport.EventHandler eventHandler)
if (orb.transportDebugFlag) {
dprint(".registerForEvent: " + eventHandler);
}
if (isClosed()) {
if (orb.transportDebugFlag) {
dprint(".registerForEvent: closed: " + eventHandler);
}
return;
}
if (eventHandler.shouldUseSelectThreadToWait()) {
synchronized (deferredRegistrations) {
deferredRegistrations.add(eventHandler);
}
if (! selectorStarted) {
startSelector();
}
selector.wakeup();
return;
}
switch (eventHandler.getInterestOps()) {
case SelectionKey.OP_ACCEPT :
createListenerThread(eventHandler);
break;
case SelectionKey.OP_READ :
createReaderThread(eventHandler);
break;
default:
if (orb.transportDebugFlag) {
dprint(".registerForEvent: default: " + eventHandler);
}
throw new RuntimeException(
"SelectorImpl.registerForEvent: unknown interest ops");
}
|
public void | registerInterestOps(com.sun.corba.se.pept.transport.EventHandler eventHandler)
// Queues a request to enable the handler's interest ops on its
// SelectionKey and wakes the selector; the selector thread applies the
// request in enableInterestOps().
//
// A handler whose SelectionKey is missing or no longer valid is not
// queued: the condition is reported through wrapper.selectionKeyInvalid
// (and a debug message) instead. The key can be null because
// handleDeferredRegistrations() stores null on the handler when the
// channel registration fails with ClosedChannelException; dereferencing
// it here would otherwise throw a NullPointerException.
if (orb.transportDebugFlag) {
    dprint(".registerInterestOps:-> " + eventHandler);
}

SelectionKey selectionKey = eventHandler.getSelectionKey();
if (selectionKey != null && selectionKey.isValid()) {
    int ehOps = eventHandler.getInterestOps();
    SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
    synchronized (interestOpsList) {
        interestOpsList.add(keyAndOp);
    }
    // Tell the selector thread there is an update to a SelectionKey's ops.
    selector.wakeup();
} else {
    wrapper.selectionKeyInvalid(eventHandler.toString());
    if (orb.transportDebugFlag) {
        dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
    }
}

if (orb.transportDebugFlag) {
    dprint(".registerInterestOps:<- ");
}
|
public void | run()
// Select loop executed by the selector thread. Each cycle:
//   1. registers handlers queued by registerForEvent(),
//   2. applies interest-op requests queued by registerInterestOps(),
//   3. blocks in selector.select(timeout),
//   4. dispatches handleEvent() on the attachment of every selected key.
//
// The loop ends once the closed flag is set. The selector is closed in the
// finally block so that it is released on every exit path, including the
// case where close() is called while events are being dispatched and the
// loop condition (rather than the check right after select()) ends the
// loop.
//
// The closed flag is read through the synchronized isClosed() accessor so
// the read pairs with the synchronized setClosed() writer.
setName("SelectorThread");
try {
    while (!isClosed()) {
        try {
            int n = 0;
            if (timeout == 0 && orb.transportDebugFlag) {
                dprint(".run: Beginning of selection cycle");
            }
            handleDeferredRegistrations();
            enableInterestOps();
            try {
                n = selector.select(timeout);
            } catch (IOException e) {
                if (orb.transportDebugFlag) {
                    dprint(".run: selector.select: " + e);
                }
            }
            if (isClosed()) {
                // close() woke us up: stop before touching any keys.
                if (orb.transportDebugFlag) {
                    dprint(".run: closed - .run return");
                }
                return;
            }
            Iterator iterator = selector.selectedKeys().iterator();
            if (orb.transportDebugFlag) {
                if (iterator.hasNext()) {
                    dprint(".run: n = " + n);
                }
            }
            while (iterator.hasNext()) {
                SelectionKey selectionKey = (SelectionKey) iterator.next();
                // Selected keys must be removed explicitly, otherwise they
                // stay in the selected set for the next cycle.
                iterator.remove();
                EventHandler eventHandler = (EventHandler)
                    selectionKey.attachment();
                try {
                    eventHandler.handleEvent();
                } catch (Throwable t) {
                    // One failing handler must not stop the others.
                    if (orb.transportDebugFlag) {
                        dprint(".run: eventHandler.handleEvent", t);
                    }
                }
            }
            if (timeout == 0 && orb.transportDebugFlag) {
                dprint(".run: End of selection cycle");
            }
        } catch (Throwable t) {
            // IMPORTANT: ignore all errors so the select thread keeps
            // running. Otherwise a guaranteed hang.
            if (orb.transportDebugFlag) {
                dprint(".run: ignoring", t);
            }
        }
    }
} finally {
    // Release the selector no matter how the loop was left.
    if (selector != null) {
        try {
            selector.close();
        } catch (Throwable t) {
            if (orb.transportDebugFlag) {
                dprint(".run: selector.close: " + t);
            }
        }
    }
}
|
private synchronized void | setClosed(boolean closed)
// Stores the given value in the closed flag. Synchronized on this
// instance, like isClosed(). close() calls this with true.
this.closed = closed;
|
public void | setTimeout(long timeout)
// Stores the timeout that run() passes to selector.select(timeout).
// The assignment is not synchronized and does not wake the selector.
this.timeout = timeout;
|
private void | startSelector()
// Opens the NIO selector and starts this object as a daemon thread, then
// records that the selector has been started.
//
// Throws RuntimeException (cause: the IOException) when the selector
// cannot be opened; in that case the thread is not started.
try {
    selector = Selector.open();
} catch (IOException e) {
    if (orb.transportDebugFlag) {
        dprint(".startSelector: Selector.open: IOException: " + e);
    }
    // REVISIT - better handling/reporting
    throw new RuntimeException(".startSelector: Selector.open exception", e);
}

setDaemon(true);
start();
selectorStarted = true;

if (orb.transportDebugFlag) {
    dprint(".startSelector: selector.start completed.");
}
|
public void | unregisterForEvent(com.sun.corba.se.pept.transport.EventHandler eventHandler)
// Stops event delivery for the given handler.
//
// - If this selector is closed, the request is ignored.
// - If the handler uses the select thread, its SelectionKey (when it has
//   one) is cancelled and the selector is woken up.
// - Otherwise the dedicated thread is destroyed: the listener thread for
//   OP_ACCEPT, the reader thread for OP_READ.
//
// Throws RuntimeException when a handler that does not use the select
// thread reports interest ops other than OP_ACCEPT or OP_READ.
if (orb.transportDebugFlag) {
    dprint(".unregisterForEvent: " + eventHandler);
}
if (isClosed()) {
    if (orb.transportDebugFlag) {
        dprint(".unregisterForEvent: closed: " + eventHandler);
    }
    return;
}
if (eventHandler.shouldUseSelectThreadToWait()) {
    SelectionKey selectionKey = eventHandler.getSelectionKey();
    // The key is null when handleDeferredRegistrations() could not
    // register the channel (ClosedChannelException); presumably also while
    // the registration is still queued - TODO confirm. There is nothing to
    // cancel in that case.
    if (selectionKey != null) {
        selectionKey.cancel();
    }
    selector.wakeup();
    return;
}
switch (eventHandler.getInterestOps()) {
case SelectionKey.OP_ACCEPT :
    destroyListenerThread(eventHandler);
    break;
case SelectionKey.OP_READ :
    destroyReaderThread(eventHandler);
    break;
default:
    if (orb.transportDebugFlag) {
        dprint(".unregisterForEvent: default: " + eventHandler);
    }
    throw new RuntimeException(
        "SelectorImpl.unregisterForEvent: unknown interest ops");
}
|