File | Doc | Category | Size | Date | Package
SocketOrChannelConnectionImpl.java | API Doc | Java SE 6 API | 47307 | Tue Jun 10 00:21:44 BST 2008 | com.sun.corba.se.impl.transport

SocketOrChannelConnectionImpl

public class SocketOrChannelConnectionImpl extends EventHandlerBase implements Work, CorbaConnection
author
Harold Carr

Fields Summary
public static boolean
dprintWriteLocks
protected long
enqueueTime
protected SocketChannel
socketChannel
protected CorbaContactInfo
contactInfo
protected Acceptor
acceptor
protected ConnectionCache
connectionCache
protected Socket
socket
protected long
timeStamp
protected boolean
isServer
protected int
requestId
protected CorbaResponseWaitingRoom
responseWaitingRoom
protected int
state
protected Object
stateEvent
protected Object
writeEvent
protected boolean
writeLocked
protected int
serverRequestCount
Map
serverRequestMap
protected boolean
postInitialContexts
protected IOR
codeBaseServerIOR
protected CachedCodeBase
cachedCodeBase
protected ORBUtilSystemException
wrapper
protected ReadTimeouts
readTimeouts
protected boolean
shouldReadGiopHeaderOnly
protected CorbaMessageMediator
partialMessageMediator
protected CodeSetComponentInfo$CodeSetContext
codeSetContext
protected MessageMediator
clientReply_1_1
protected MessageMediator
serverRequest_1_1
Constructors Summary
protected SocketOrChannelConnectionImpl(ORB orb)

        // Base constructor that every other constructor chains to. It stores
        // the ORB, obtains the RPC-transport log wrapper, registers this
        // object as its own Work item, creates the response waiting room and
        // applies the ORB's configured TCP read timeouts.
        // (Note carried over from the original listing:
        //  "Used in genericRPCMSGFramework test.")
        this.orb = orb;
        this.wrapper =
            ORBUtilSystemException.get(orb, CORBALogDomains.RPC_TRANSPORT);

        setWork(this);
        this.responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
        setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
    
protected SocketOrChannelConnectionImpl(ORB orb, boolean useSelectThreadToWait, boolean useWorkerThread)

        // Runs the base initialization, then records the two threading
        // choices: whether a select thread waits on this connection and
        // whether events are handed to a worker thread.
        this(orb);
        this.setUseSelectThreadToWait(useSelectThreadToWait);
        this.setUseWorkerThreadForEvent(useWorkerThread);
    
public SocketOrChannelConnectionImpl(ORB orb, CorbaContactInfo contactInfo, boolean useSelectThreadToWait, boolean useWorkerThread, String socketType, String hostname, int port)

        // Client-side constructor: opens a socket of the given type to
        // hostname:port through the ORB's socket factory and leaves the
        // connection in the OPENING state.
        //
        // Any Throwable raised while creating or configuring the socket is
        // converted into wrapper.connectFailure(...). Before that exception is
        // thrown, a socket that was already created is closed so that a
        // failure after createSocket() (for example in configureBlocking())
        // does not leak the underlying descriptor.
        this(orb, useSelectThreadToWait, useWorkerThread);

        this.contactInfo = contactInfo;

        try {
            socket = orb.getORBData().getSocketFactory()
                .createSocket(socketType,
                              new InetSocketAddress(hostname, port));
            socketChannel = socket.getChannel();

            if (socketChannel != null) {
                // The channel blocks exactly when no select thread waits on it.
                boolean isBlocking = !useSelectThreadToWait;
                socketChannel.configureBlocking(isBlocking);
            } else {
                // IMPORTANT: non-channel-backed sockets must use
                // dedicated reader threads.
                setUseSelectThreadToWait(false);
            }
            if (orb.transportDebugFlag) {
                dprint(".initialize: connection created: " + socket);
            }
        } catch (Throwable t) {
            if (socket != null) {
                try {
                    socket.close();
                } catch (Exception ignored) {
                    // Best effort only: the connect failure thrown below is
                    // the error the caller needs to see.
                }
            }
            throw wrapper.connectFailure(t, socketType, hostname,
                                         Integer.toString(port));
        }
        state = OPENING;
    
public SocketOrChannelConnectionImpl(ORB orb, CorbaContactInfo contactInfo, String socketType, String hostname, int port)

        // Client-side convenience constructor: takes both threading flags
        // from the ORB's configuration data and delegates to the full
        // client constructor.
        this(orb,
             contactInfo,
             orb.getORBData().connectionSocketUseSelectThreadToWait(),
             orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
             socketType,
             hostname,
             port);
    
public SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor, Socket socket, boolean useSelectThreadToWait, boolean useWorkerThread)

        // Server-side constructor: wraps an already accepted socket, marks
        // the connection as a server connection, creates the synchronized
        // request map and starts in the ESTABLISHED state.
        this(orb, useSelectThreadToWait, useWorkerThread);

        this.acceptor = acceptor;
        this.socket = socket;
        this.socketChannel = socket.getChannel();

        // REVISIT
        if (this.socketChannel != null) {
            try {
                // The channel blocks exactly when no select thread waits on it.
                this.socketChannel.configureBlocking(!useSelectThreadToWait);
            } catch (IOException ioe) {
                // Surface the checked failure as an unchecked exception
                // that carries the IOException as its cause.
                RuntimeException unchecked = new RuntimeException();
                unchecked.initCause(ioe);
                throw unchecked;
            }
        }

        this.serverRequestMap = Collections.synchronizedMap(new HashMap());
        this.isServer = true;
        this.state = ESTABLISHED;
    
public SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor, Socket socket)

        // Server-side convenience constructor. A socket without a channel
        // never uses the select thread or a worker thread; otherwise both
        // flags come from the ORB's configuration data.
        this(orb, acceptor, socket,
             socket.getChannel() != null
                 && orb.getORBData().connectionSocketUseSelectThreadToWait(),
             socket.getChannel() != null
                 && orb.getORBData().connectionSocketUseWorkerThreadForEvent());
    
Methods Summary
public com.sun.corba.se.pept.protocol.MessageMediatorclientReply_1_1_Get()

        // Returns whatever mediator is currently held in the clientReply_1_1
        // slot (null after clientReply_1_1_Remove()).
        return this.clientReply_1_1;
    
public voidclientReply_1_1_Put(com.sun.corba.se.pept.protocol.MessageMediator x)

        // Stores the given mediator in the clientReply_1_1 slot, replacing
        // any previous value.
        this.clientReply_1_1 = x;
    
public voidclientReply_1_1_Remove()

        // Empties the clientReply_1_1 slot.
        this.clientReply_1_1 = null;
    
public com.sun.corba.se.pept.protocol.MessageMediatorclientRequestMapGet(int requestId)

        // Looks the request id up in the response waiting room and returns
        // the mediator registered there.
        return this.responseWaitingRoom.getMessageMediator(requestId);
    
public synchronized voidclose()
Note:it is possible for this to be called more than once

	// Attempts an orderly shutdown of this connection:
	//   1. take the write lock;
	//   2. if the connection is busy, release the lock and return
	//      without closing anything;
	//   3. otherwise send a GIOP 1.0 CloseConnection message, move the
	//      state to CLOSE_SENT and purge outstanding calls;
	//   4. unregister from selector 0 and close the channel and socket.
	// Failures in steps 3 and 4 are only traced when transport debugging
	// is on; they never propagate to the caller.
	try {
	    if (orb.transportDebugFlag) {
		dprint(".close->: " + this);
	    }
	    writeLock();

	    // REVISIT It will be good to have a read lock on the reader thread
	    // before we proceed further, to avoid the reader thread (server side)
	    // from processing requests. This avoids the risk that a new request
	    // will be accepted by ReaderThread while the ListenerThread is 
	    // attempting to close this connection.

	    if (isBusy()) { // we are busy!
		writeUnlock();
		if (orb.transportDebugFlag) {
		    dprint(".close: isBusy so no close: " + this);
		}
		return;
	    }

	    try {
		try {
		    sendCloseConnection(GIOPVersion.V1_0);
		} catch (Throwable t) {
		    // A failed CloseConnection send is handed to the log
		    // wrapper; the close itself still proceeds.
		    wrapper.exceptionWhenSendingCloseConnection(t);
		}

		// Publish the new state and wake threads waiting on stateEvent.
		synchronized ( stateEvent ){
		    state = CLOSE_SENT;
		    stateEvent.notifyAll();
		}

		// stop the reader without causing it to do purgeCalls
		//Exception ex = new Exception();
		//reader.stop(ex); // REVISIT

		// NOTE: !!!!!!
		// This does writeUnlock().
		// (lockHeld is passed as true because writeLock() was taken above.)
		purgeCalls(wrapper.connectionRebind(), false, true);

	    } catch (Exception ex) {
		if (orb.transportDebugFlag) {
		    dprint(".close: exception: " + this, ex);
		}
	    }
	    try {
		Selector selector = orb.getTransportManager().getSelector(0);
		selector.unregisterForEvent(this);
		if (socketChannel != null) {
		    socketChannel.close();
		}
		socket.close();
	    } catch (IOException e) {
		if (orb.transportDebugFlag) {
		    dprint(".close: " + this, e);
		}
	    }
	} finally {
	    if (orb.transportDebugFlag) {
		dprint(".close<-: " + this);
	    }
	}
    
public com.sun.corba.se.pept.encoding.OutputObjectcreateOutputObject(com.sun.corba.se.pept.protocol.MessageMediator messageMediator)

	// REVISIT - remove this method from Connection and all it subclasses.
	throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called.");
    
protected booleandispatch(com.sun.corba.se.spi.protocol.CorbaMessageMediator messageMediator)

	// Hands a fully read message to its protocol handler.
	//
	// Returns the handler's result on success. If the handler throws:
	//   - ThreadDeath: calls are purged (any failure of the purge is only
	//     traced) and the ThreadDeath is rethrown;
	//   - any other Throwable: a MessageError is sent when the failure is
	//     an INTERNAL, calls are purged with a connectionAbort exception,
	//     and the method falls through to return true.
	try {
	    if (orb.transportDebugFlag) {
		dprint(".dispatch->: " + this);
	    }

	    //
	    // NOTE:
	    //
	    // This call is the transition from the transport block
	    // to the protocol block.
	    //

	    boolean result = 
		messageMediator.getProtocolHandler()
		.handleRequest(messageMediator);

	    return result;

	} catch (ThreadDeath td) {
	    if (orb.transportDebugFlag) {
		dprint(".dispatch: ThreadDeath", td );
	    }
	    try {
		purgeCalls(wrapper.connectionAbort(td), false, false);
	    } catch (Throwable t) {
		if (orb.transportDebugFlag) {
		    dprint(".dispatch: purgeCalls: Throwable", t);
		}
	    }
	    throw td;
	} catch (Throwable ex) {
	    if (orb.transportDebugFlag) {
		dprint(".dispatch: Throwable", ex ) ;
	    }

	    try {
		if (ex instanceof INTERNAL) {
		    sendMessageError(GIOPVersion.DEFAULT_VERSION);
		}
	    } catch (IOException e) {
		if (orb.transportDebugFlag) {
		    dprint(".dispatch: sendMessageError: IOException", e);
		}
	    }
	    purgeCalls(wrapper.connectionAbort(ex), false, false);
	    // REVISIT
	    //keepRunning = false;
	} finally {
	    if (orb.transportDebugFlag) {
		dprint(".dispatch<-: " + this);
	    }
	}

	// Reached only after the generic Throwable handler above.
	return true;
    
public voiddoWork()

        // Work-item entry point. In header-only mode it completes the
        // partially read message and dispatches it; otherwise it performs a
        // full read (which dispatches by itself). Every Throwable is
        // deliberately ignored and only traced when transport debugging is on.
        try {
            if (orb.transportDebugFlag) {
                dprint(".doWork->: " + this);
            }

            // IMPORTANT: Sanity checks on SelectionKeys such as
            //            SelectorKey.isValid() should not be done
            //            here.
            //

            if (shouldReadGiopHeaderOnly()) {
                // Pick up the partial mediator created by the selector
                // thread and read the remaining information it needs.
                CorbaMessageMediator completed =
                    finishReadingBits(this.getPartialMessageMediator());

                // Null can happen when client closes stream
                // causing purgecalls.
                if (completed != null) {
                    dispatch(completed);
                }
            } else {
                read();
            }
        } catch (Throwable t) {
            if (orb.transportDebugFlag) {
                dprint(".doWork: ignoring Throwable: "
                       + t
                       + " " + this);
            }
        } finally {
            if (orb.transportDebugFlag) {
                dprint(".doWork<-: " + this);
            }
        }
    
public voiddprint(java.lang.String msg)

	ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
    
protected voiddprint(java.lang.String msg, java.lang.Throwable t)

        // Emits the debug message, then the throwable's stack trace on
        // standard output.
        this.dprint(msg);
        t.printStackTrace(System.out);
    
protected com.sun.corba.se.spi.protocol.CorbaMessageMediatorfinishReadingBits(com.sun.corba.se.pept.protocol.MessageMediator messageMediator)

	// Completes a partially created message mediator by delegating to the
	// contact info (when set) or else to the acceptor (when set).
	//
	// Returns the finished mediator, or null when a non-ThreadDeath
	// failure occurred. In that case the connection has been unregistered
	// from selector 0 and its calls purged. A ThreadDeath is rethrown
	// after a best-effort purge.
	try {

	    if (orb.transportDebugFlag) {
		dprint(".finishReadingBits->: " + this);
	    }

	    // REVISIT - use common factory base class.
	    if (contactInfo != null) {
		messageMediator =
		    contactInfo.finishCreatingMessageMediator(orb, this, messageMediator);
	    } else if (acceptor != null) {
		messageMediator = 
		    acceptor.finishCreatingMessageMediator(orb, this, messageMediator);
	    } else {
		// Neither side is configured; the exception is handled by
		// the generic Throwable handler below.
		throw 
		    new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits");
	    }
	    return (CorbaMessageMediator) messageMediator;

	} catch (ThreadDeath td) {
	    if (orb.transportDebugFlag) {
		dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td);
	    }
	    try {
		purgeCalls(wrapper.connectionAbort(td), false, false);
	    } catch (Throwable t) {
		if (orb.transportDebugFlag) {
		    dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t);
		}
	    }
	    throw td;
	} catch (Throwable ex) {
	    if (orb.transportDebugFlag) {
		dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex);
	    }

	    try {
		// Only INTERNAL failures are reported to the peer.
		if (ex instanceof INTERNAL) {
		    sendMessageError(GIOPVersion.DEFAULT_VERSION);
		}
	    } catch (IOException e) {
		if (orb.transportDebugFlag) {
		    dprint(".finishReadingBits: " + this + 
			   ": sendMessageError: IOException: " + e, e);
		}
	    }
	    // REVISIT - make sure reader thread is killed.
	    orb.getTransportManager().getSelector(0).unregisterForEvent(this);
	    // Notify anyone waiting.
	    purgeCalls(wrapper.connectionAbort(ex), true, false);
	    // REVISIT
	    //keepRunning = false;
	    // REVISIT - if this is called after purgeCalls then
	    // the state of the socket is ABORT so the writeLock
	    // in close throws an exception.  It is ignored but
	    // causes IBM (screen scraping) tests to fail.
	    //close();
	} finally {
	    if (orb.transportDebugFlag) {
		dprint(".finishReadingBits<-: " + this);
	    }
	}
	// Reached only after the generic Throwable handler above.
	return null;
    
public com.sun.corba.se.pept.transport.AcceptorgetAcceptor()

        // Returns the acceptor field; only the server-side constructor
        // assigns it.
        return this.acceptor;
    
public com.sun.corba.se.spi.orb.ORBgetBroker()

        // Returns the ORB this connection was constructed with.
        return this.orb;
    
public java.nio.channels.SelectableChannelgetChannel()

        // Returns the socket channel viewed as a SelectableChannel; null
        // when the socket is not channel-backed.
        return this.socketChannel;
    
public final com.sun.org.omg.SendingContext.CodeBasegetCodeBase()

        // Returns the cached code base object held by this connection.
        return this.cachedCodeBase;
    
public final com.sun.corba.se.spi.ior.IORgetCodeBaseIOR()

        // Returns the IOR last stored through setCodeBaseIOR().
        return this.codeBaseServerIOR;
    
public com.sun.corba.se.impl.encoding.CodeSetComponentInfo$CodeSetContextgetCodeSetContext()

        // Returns the negotiated code set context, or null when none has
        // been set yet.
        //
        // Needs to be synchronized for the following case when the client
        // doesn't send the code set context twice, and we have two threads
        // in ServerRequestDispatcher processCodeSetContext.
        //
        // Thread A checks to see if there is a context, there is none, so
        //     it calls setCodeSetContext, getting the synch lock.
        // Thread B checks to see if there is a context.  If we didn't synch,
        //     it might decide to outlaw wchar/wstring.
        if (codeSetContext == null) {
            // Re-read the field while holding this object's monitor. Since
            // setCodeSetContext() is synchronized on the same object, this
            // waits for a setter that is currently running.
            synchronized(this) {
                return codeSetContext;
            }
        }

        return codeSetContext;
    
public com.sun.corba.se.pept.transport.ConnectiongetConnection()

	// This object is itself the connection.
	return this;
    
public com.sun.corba.se.pept.transport.ConnectionCachegetConnectionCache()

        // Returns the connection cache field as currently set.
        return this.connectionCache;
    
public com.sun.corba.se.pept.transport.ContactInfogetContactInfo()

        // Returns the contact info field; only the client-side constructor
        // assigns it.
        return this.contactInfo;
    
public longgetEnqueueTime()

        // Returns the stored enqueue time value.
        return this.enqueueTime;
    
public com.sun.corba.se.pept.transport.EventHandlergetEventHandler()

	// This object acts as its own event handler.
	return this;
    
public intgetInterestOps()

	return SelectionKey.OP_READ;
    
public java.lang.StringgetName()

        // The name is simply this object's string form.
        return toString();
    
public synchronized intgetNextRequestId()

	return requestId++;
    
protected com.sun.corba.se.spi.protocol.CorbaMessageMediatorgetPartialMessageMediator()

        // Returns the partially read mediator stored by handleEvent().
        return this.partialMessageMediator;
    
public com.sun.corba.se.pept.transport.ResponseWaitingRoomgetResponseWaitingRoom()

        // Returns the waiting room created in the base constructor.
        return this.responseWaitingRoom;
    
public java.net.SocketgetSocket()

        // Returns the underlying socket.
        return this.socket;
    
public java.nio.channels.SocketChannelgetSocketChannel()

        // Returns the socket's channel; null when the socket is not
        // channel-backed.
        return this.socketChannel;
    
protected java.lang.StringgetStateString(int state)

        // Maps a state constant to its printable name; unknown values yield
        // "???". The lookup runs while holding the stateEvent monitor. The
        // parameter shadows the field of the same name, so it is the
        // argument that gets translated.
        synchronized ( stateEvent ){
            if (state == OPENING) {
                return "OPENING" ;
            }
            if (state == ESTABLISHED) {
                return "ESTABLISHED" ;
            }
            if (state == CLOSE_SENT) {
                return "CLOSE_SENT" ;
            }
            if (state == CLOSE_RECVD) {
                return "CLOSE_RECVD" ;
            }
            if (state == ABORT) {
                return "ABORT" ;
            }
            return "???" ;
        }
    
public longgetTimeStamp()

        // Returns the stored time stamp value.
        return this.timeStamp;
    
public voidhandleEvent()

        // Selector callback for this connection.
        //
        // First removes this handler's interest ops from the selection key.
        // Then either
        //   - queues this connection's Work item on work queue 0 of a thread
        //     pool (pool 0, or the pool named by the partially read message
        //     when only the GIOP header is read here), or
        //   - runs the work inline on the calling thread.
        //
        // Throws INTERNAL (with the original exception as cause) when the
        // thread pool or work queue cannot be found.
        //
        // readBits() returns null after it has failed and purged the
        // connection. In that case there is nothing to hand to a worker, so
        // the method returns without queuing work instead of dereferencing
        // the null mediator.
        if (orb.transportDebugFlag) {
            dprint(".handleEvent->: " + this);
        }
        getSelectionKey().interestOps(getSelectionKey().interestOps() &
                                      (~ getInterestOps()));

        if (shouldUseWorkerThreadForEvent()) {
            Throwable throwable = null;
            try {
                int poolToUse = 0;
                if (shouldReadGiopHeaderOnly()) {
                    partialMessageMediator = readBits();
                    if (partialMessageMediator == null) {
                        if (orb.transportDebugFlag) {
                            dprint(".handleEvent: readBits returned null, no work queued: " + this);
                            dprint(".handleEvent<-: " + this);
                        }
                        return;
                    }
                    poolToUse =
                        partialMessageMediator.getThreadPoolToUse();
                }

                if (orb.transportDebugFlag) {
                    dprint(".handleEvent: addWork to pool: " + poolToUse);
                }
                orb.getThreadPoolManager().getThreadPool(poolToUse)
                    .getWorkQueue(0).addWork(getWork());
            } catch (NoSuchThreadPoolException e) {
                throwable = e;
            } catch (NoSuchWorkQueueException e) {
                throwable = e;
            }
            // REVISIT: need to close connection.
            if (throwable != null) {
                if (orb.transportDebugFlag) {
                    dprint(".handleEvent: " + throwable);
                }
                // The message text is the same for both lookup failures;
                // the real exception is available as the cause.
                INTERNAL i = new INTERNAL("NoSuchThreadPoolException");
                i.initCause(throwable);
                throw i;
            }
        } else {
            if (orb.transportDebugFlag) {
                dprint(".handleEvent: doWork");
            }
            getWork().doWork();
        }
        if (orb.transportDebugFlag) {
            dprint(".handleEvent<-: " + this);
        }
    
public booleanisBusy()

        if (serverRequestCount > 0 ||
	    getResponseWaitingRoom().numberRegistered() > 0)
        {
            return true;
	} else {
            return false;
	}
    
public synchronized booleanisPostInitialContexts()

        // Returns the postInitialContexts flag, read under this object's
        // monitor.
        return this.postInitialContexts;
    
public booleanisServer()

        // True for connections built by the server-side constructor.
        return this.isServer;
    
public voidpurgeCalls(org.omg.CORBA.SystemException systemException, boolean die, boolean lockHeld)
Wake up the outstanding requests on the connection and hand them the given system exception. Also closes the socket, removes the connection from its connection cache and releases the write lock. Within this file it is invoked from close(), dispatch(), readBits(), finishReadingBits() and sendWithoutLock().

param
systemException The exception delivered to all waiters. Its minor code selects the resulting state (CONNECTION_REBIND gives CLOSE_RECVD, anything else gives ABORT) and its completion status is overwritten here.
param
die Only reported in the debug trace by this method.
param
lockHeld True when the caller already holds the write lock; otherwise this method tries to take it.

        int minor_code = systemException.minor;

        try{
            if (orb.transportDebugFlag) {
                dprint(".purgeCalls->: "
                       + minor_code + "/" + die + "/" + lockHeld
                       + " " + this);
            }

            // If this invocation is a result of ThreadDeath caused
            // by a previous execution of this routine, just exit.

            synchronized ( stateEvent ){
                if ((state == ABORT) || (state == CLOSE_RECVD)) {
                    if (orb.transportDebugFlag) {
                        dprint(".purgeCalls: exiting since state is: "
                               + getStateString(state)
                               + " " + this);
                    }
                    return;
                }
            }

            // Grab the writeLock (freeze the calls)
            try {
                if (!lockHeld) {
                    writeLock();
                }
            } catch (SystemException ex) {
                // Failing to take the lock does not stop the purge.
                if (orb.transportDebugFlag) {
                    dprint(".purgeCalls: SystemException" + ex
                           + "; continuing " + this);
                }
            }

            // Mark the state of the connection
            // and determine the request status
            synchronized ( stateEvent ){
                if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) {
                    state = CLOSE_RECVD;
                    systemException.completed = CompletionStatus.COMPLETED_NO;
                } else {
                    state = ABORT;
                    systemException.completed = CompletionStatus.COMPLETED_MAYBE;
                }
                stateEvent.notifyAll();
            }

            // Best-effort close of both streams and the socket; a failure is
            // only traced.
            try {
                socket.getInputStream().close();
                socket.getOutputStream().close();
                socket.close();
            } catch (Exception ex) {
                if (orb.transportDebugFlag) {
                    dprint(".purgeCalls: Exception closing socket: " + ex
                           + " " + this);
                }
            }

            // Signal all threads with outstanding requests on this
            // connection and give them the SystemException;

            responseWaitingRoom.signalExceptionToAllWaiters(systemException);

            // Client connections are cached by contact info, server
            // connections by the connection itself.
            if (contactInfo != null) {
                ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo);
            } else if (acceptor != null) {
                ((InboundConnectionCache)getConnectionCache()).remove(this);
            }

            //
            // REVISIT: Stop the reader thread
            //

            // Signal all the waiters of the writeLock.
            // There are 4 types of writeLock waiters:
            // 1. Send waiters:
            // 2. SendReply waiters:
            // 3. cleanUp waiters:
            // 4. purge_call waiters:
            //

            writeUnlock();

        } finally {
            if (orb.transportDebugFlag) {
                dprint(".purgeCalls<-: "
                       + minor_code + "/" + die + "/" + lockHeld
                       + " " + this);
            }
        }
    
public booleanread()

        // Reads one message and dispatches it. Returns the dispatch result,
        // or true when no mediator was produced.
        try {
            if (orb.transportDebugFlag) {
                dprint(".read->: " + this);
            }
            CorbaMessageMediator incoming = readBits();
            if (incoming == null) {
                // Null can happen when client closes stream
                // causing purgecalls.
                return true;
            }
            return dispatch(incoming);
        } finally {
            if (orb.transportDebugFlag) {
                dprint(".read<-: " + this);
            }
        }
    
public java.nio.ByteBufferread(int size, int offset, int length, long max_wait_time)

        // Reads `length` bytes into a fresh buffer of `size` bytes, starting
        // at `offset`. In direct-buffer mode the buffer comes from the ORB's
        // ByteBufferPool and is filled from the channel; otherwise a byte
        // array is filled from the socket's input stream and wrapped.
        if (!shouldUseDirectByteBuffers()) {
            byte[] heapBytes = new byte[size];
            readFully(getSocket().getInputStream(), heapBytes,
                      offset, length, max_wait_time);
            ByteBuffer wrapped = ByteBuffer.wrap(heapBytes);
            wrapped.limit(size);
            return wrapped;
        }

        ByteBuffer pooled = orb.getByteBufferPool().getByteBuffer(size);

        if (orb.transportDebugFlag) {
            // print address of ByteBuffer gotten from pool
            dprint(".read: got ByteBuffer id ("
                   + System.identityHashCode(pooled)
                   + ") from ByteBufferPool.");
        }

        pooled.position(offset);
        pooled.limit(size);

        readFully(pooled, length, max_wait_time);

        return pooled;
    
public java.nio.ByteBufferread(java.nio.ByteBuffer byteBuffer, int offset, int length, long max_wait_time)

        // Reads `length` more bytes at `offset`.
        //
        // Without direct buffers: a direct argument is rejected, and the data
        // is read from the socket's input stream into a NEW array of
        // offset + length bytes, which is returned wrapped.
        //
        // With direct buffers: a non-direct argument is rejected. If the
        // buffer is too small it is released to the pool and replaced by a
        // larger one. The data is read from the channel and the buffer is
        // returned positioned at 0 with limit offset + length.
        final int needed = offset + length;

        if (!shouldUseDirectByteBuffers()) {
            if (byteBuffer.isDirect()) {
                throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
            }
            byte[] heapBytes = new byte[needed];
            readFully(getSocket().getInputStream(), heapBytes,
                      offset, length, max_wait_time);
            return ByteBuffer.wrap(heapBytes);
        }

        if (! byteBuffer.isDirect()) {
            throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
        }
        if (needed > byteBuffer.capacity()) {
            if (orb.transportDebugFlag) {
                // print address of ByteBuffer being released
                dprint(".read: releasing ByteBuffer id ("
                       + System.identityHashCode(byteBuffer)
                       + ") to ByteBufferPool.");
            }
            orb.getByteBufferPool().releaseByteBuffer(byteBuffer);
            byteBuffer = orb.getByteBufferPool().getByteBuffer(needed);
        }
        byteBuffer.position(offset);
        byteBuffer.limit(needed);
        readFully(byteBuffer, length, max_wait_time);
        byteBuffer.position(0);
        byteBuffer.limit(needed);
        return byteBuffer;
    
protected com.sun.corba.se.spi.protocol.CorbaMessageMediatorreadBits()

	// Creates a message mediator for the next incoming message by
	// delegating to the contact info (when set) or else to the acceptor
	// (when set).
	//
	// Returns the mediator, or null when a non-ThreadDeath failure
	// occurred. In that case the connection has been unregistered from
	// selector 0 and its calls purged. A ThreadDeath is rethrown after a
	// best-effort purge.
	try {

	    if (orb.transportDebugFlag) {
		dprint(".readBits->: " + this);
	    }

	    MessageMediator messageMediator;
	    // REVISIT - use common factory base class.
	    if (contactInfo != null) {
		messageMediator =
		    contactInfo.createMessageMediator(orb, this);
	    } else if (acceptor != null) {
		messageMediator = acceptor.createMessageMediator(orb, this);
	    } else {
		// Neither side is configured; the exception is handled by
		// the generic Throwable handler below.
		throw 
		    new RuntimeException("SocketOrChannelConnectionImpl.readBits");
	    }
	    return (CorbaMessageMediator) messageMediator;

	} catch (ThreadDeath td) {
	    if (orb.transportDebugFlag) {
		dprint(".readBits: " + this + ": ThreadDeath: " + td, td);
	    }
	    try {
		purgeCalls(wrapper.connectionAbort(td), false, false);
	    } catch (Throwable t) {
		if (orb.transportDebugFlag) {
		    dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t);
		}
	    }
	    throw td;
	} catch (Throwable ex) {
	    if (orb.transportDebugFlag) {
		dprint(".readBits: " + this + ": Throwable: " + ex, ex);
	    }

	    try {
		// Only INTERNAL failures are reported to the peer.
		if (ex instanceof INTERNAL) {
		    sendMessageError(GIOPVersion.DEFAULT_VERSION);
		}
	    } catch (IOException e) {
		if (orb.transportDebugFlag) {
		    dprint(".readBits: " + this + 
			   ": sendMessageError: IOException: " + e, e);
		}
	    }
	    // REVISIT - make sure reader thread is killed.
	    orb.getTransportManager().getSelector(0).unregisterForEvent(this);
	    // Notify anyone waiting.
	    purgeCalls(wrapper.connectionAbort(ex), true, false);
	    // REVISIT
	    //keepRunning = false;
	    // REVISIT - if this is called after purgeCalls then
	    // the state of the socket is ABORT so the writeLock
	    // in close throws an exception.  It is ignored but
	    // causes IBM (screen scraping) tests to fail.
	    //close();
	} finally {
	    if (orb.transportDebugFlag) {
		dprint(".readBits<-: " + this);
	    }
	}
	// Reached only after the generic Throwable handler above.
	return null;
    
public voidreadFully(java.nio.ByteBuffer byteBuffer, int size, long max_wait_time)

        // Reads `size` bytes from the socket channel into byteBuffer, then
        // stamps this connection in the connection cache.
        //
        // Throws IOException("End-of-stream") when the channel reports end
        // of stream, and the wrapper's transportReadTimeoutExceeded
        // exception when fewer than `size` bytes arrived before the
        // accumulated wait reached max_wait_time.
        //
        // A request for zero bytes returns immediately. Previously it still
        // performed a channel read and, when that read returned 0, slept for
        // the initial back-off interval.
        if (size == 0) {
            getConnectionCache().stampTime(this);
            return;
        }

        int n = 0;
        long time_to_wait = readTimeouts.get_initial_time_to_wait();
        long total_time_in_wait = 0;

        // The reading of data incorporates a strategy to detect a
        // rogue client. The strategy is implemented as follows. As
        // long as data is being read, at least 1 byte or more, we
        // assume we have a well behaved client. If no data is read,
        // then we sleep for a time to wait, re-calculate a new time to
        // wait which is lengthier than the previous time spent waiting.
        // Then, if the total time spent waiting does not exceed a
        // maximum time we are willing to wait, we attempt another
        // read. If the maximum amount of time we are willing to
        // spend waiting for more data is exceeded, we throw the
        // transport-read-timeout exception.

        // NOTE: Reading of GIOP headers are treated with a smaller
        //       maximum time to wait threshold. Based on extensive
        //       performance testing, all GIOP headers are being
        //       read in 1 read access.

        do {
            int bytecount = getSocketChannel().read(byteBuffer);

            if (bytecount < 0) {
                throw new IOException("End-of-stream");
            }
            else if (bytecount == 0) {
                try {
                    Thread.sleep(time_to_wait);
                    total_time_in_wait += time_to_wait;
                    time_to_wait =
                        (long)(time_to_wait*readTimeouts.get_backoff_factor());
                }
                catch (InterruptedException ie) {
                    // Deliberately ignored: an interrupted sleep is not
                    // counted as waiting time and the read is retried.
                    if (orb.transportDebugFlag) {
                        dprint("readFully(): unexpected exception "
                                + ie.toString());
                    }
                }
            }
            else {
                n += bytecount;
            }
        }
        while (n < size && total_time_in_wait < max_wait_time);

        // The loop only ends short of `size` when the wait budget is spent.
        if (n < size)
        {
            // failed to read entire message
            throw wrapper.transportReadTimeoutExceeded(Integer.valueOf(size),
                                      Integer.valueOf(n), Long.valueOf(max_wait_time),
                                      Long.valueOf(total_time_in_wait));
        }

        getConnectionCache().stampTime(this);
    
public voidreadFully(java.io.InputStream is, byte[] buf, int offset, int size, long max_wait_time)

        // Reads `size` bytes from the stream into buf starting at `offset`,
        // then stamps this connection in the connection cache.
        //
        // Throws IOException("End-of-stream") when the stream ends, and the
        // wrapper's transportReadTimeoutExceeded exception when fewer than
        // `size` bytes arrived before the accumulated wait reached
        // max_wait_time.
        //
        // A request for zero bytes returns immediately. InputStream.read
        // with a zero length returns 0, so previously such a request slept
        // for the initial back-off interval before returning.
        if (size == 0) {
            getConnectionCache().stampTime(this);
            return;
        }

        int n = 0;
        long time_to_wait = readTimeouts.get_initial_time_to_wait();
        long total_time_in_wait = 0;

        // The reading of data incorporates a strategy to detect a
        // rogue client. The strategy is implemented as follows. As
        // long as data is being read, at least 1 byte or more, we
        // assume we have a well behaved client. If no data is read,
        // then we sleep for a time to wait, re-calculate a new time to
        // wait which is lengthier than the previous time spent waiting.
        // Then, if the total time spent waiting does not exceed a
        // maximum time we are willing to wait, we attempt another
        // read. If the maximum amount of time we are willing to
        // spend waiting for more data is exceeded, we throw the
        // transport-read-timeout exception.

        // NOTE: Reading of GIOP headers are treated with a smaller
        //       maximum time to wait threshold. Based on extensive
        //       performance testing, all GIOP headers are being
        //       read in 1 read access.

        do {
            int bytecount = is.read(buf, offset + n, size - n);
            if (bytecount < 0) {
                throw new IOException("End-of-stream");
            }
            else if (bytecount == 0) {
                try {
                    Thread.sleep(time_to_wait);
                    total_time_in_wait += time_to_wait;
                    time_to_wait =
                        (long)(time_to_wait*readTimeouts.get_backoff_factor());
                }
                catch (InterruptedException ie) {
                    // Deliberately ignored: an interrupted sleep is not
                    // counted as waiting time and the read is retried.
                    if (orb.transportDebugFlag) {
                        dprint("readFully(): unexpected exception "
                                + ie.toString());
                    }
                }
            }
            else {
                n += bytecount;
            }
        }
        while (n < size && total_time_in_wait < max_wait_time);

        // The loop only ends short of `size` when the wait budget is spent.
        if (n < size)
        {
            // failed to read entire message
            throw wrapper.transportReadTimeoutExceeded(Integer.valueOf(size),
                                      Integer.valueOf(n), Long.valueOf(max_wait_time),
                                      Long.valueOf(total_time_in_wait));
        }

        getConnectionCache().stampTime(this);
    
public voidregisterWaiter(com.sun.corba.se.pept.protocol.MessageMediator messageMediator)

        // Registers the mediator with this connection's response waiting room.
        this.responseWaitingRoom.registerWaiter(messageMediator);
    
public voidsendCancelRequest(com.sun.corba.se.spi.ior.iiop.GIOPVersion giopVersion, int requestId)
Send a CancelRequest message. This does not lock the connection, so the caller needs to ensure this method is called appropriately.

exception
IOException - could be due to abortive connection closure.


        Message msg = MessageBase.createCancelRequest(giopVersion, requestId);
	sendHelper(giopVersion, msg);
    
public voidsendCancelRequestWithLock(com.sun.corba.se.spi.ior.iiop.GIOPVersion giopVersion, int requestId)

        // Same as sendCancelRequest(), but takes the write lock around the
        // send and always releases it, whether or not the send succeeds.
        writeLock();
        try {
            this.sendCancelRequest(giopVersion, requestId);
        } finally {
            writeUnlock();
        }
    
public voidsendCloseConnection(com.sun.corba.se.spi.ior.iiop.GIOPVersion giopVersion)
The following methods are for dealing with Connection cleaning for better scalability of servers in high network load conditions.

        Message msg = MessageBase.createCloseConnection(giopVersion);
	sendHelper(giopVersion, msg);
    
protected voidsendHelper(com.sun.corba.se.spi.ior.iiop.GIOPVersion giopVersion, com.sun.corba.se.impl.protocol.giopmsgheaders.Message msg)

        // Marshals the message into a fresh CDR output object (stream format
        // version 1) and writes that object to this connection.
        // REVISIT: See comments in CDROutputObject constructor.
        CDROutputObject marshalled =
            new CDROutputObject((ORB)orb, null, giopVersion, this, msg,
                                ORBConstants.STREAM_FORMAT_VERSION_1);

        msg.write(marshalled);
        marshalled.writeTo(this);
    
public voidsendMessageError(com.sun.corba.se.spi.ior.iiop.GIOPVersion giopVersion)

        Message msg = MessageBase.createMessageError(giopVersion);
	sendHelper(giopVersion, msg);
    
public voidsendWithoutLock(com.sun.corba.se.pept.encoding.OutputObject outputObject)

        // Writes the given output object (cast to CDROutputObject) to this
        // connection without taking the write lock itself.
        //
        // On IOException the error is wrapped via wrapper.writeErrorSend,
        // outstanding calls are purged and the wrapped exception is thrown.
        // NOTE(review): purgeCalls is told the lock is already held, so the
        // caller is presumably expected to hold the write lock - confirm.

        // Don't we need to check for CloseConnection
        // here?  REVISIT

        // XREVISIT - Shouldn't the MessageMediator 
        // be the one to handle writing the data here?

        try {

            // Write the fragment/message

	    CDROutputObject cdrOutputObject = (CDROutputObject) outputObject;
	    cdrOutputObject.writeTo(this);
	    // REVISIT - no flush?
            //socket.getOutputStream().flush();

        } catch (IOException e1) {

            /*
             * ADDED(Ram J) 10/13/2000 In the event of an IOException, try
             * sending a CancelRequest for regular requests / locate requests
             */

            // Since IIOPOutputStream's msgheader is set only once, and not
            // altered during sending multiple fragments, the original 
            // msgheader will always have the requestId.
	    // REVISIT This could be optimized to send a CancelRequest only
	    // if any fragments had been sent already.

	    /* REVISIT: MOVE TO SUBCONTRACT
            Message msg = os.getMessage();
            if (msg.getType() == Message.GIOPRequest ||
                    msg.getType() == Message.GIOPLocateRequest) {
                GIOPVersion requestVersion = msg.getGIOPVersion();
                int requestId = MessageBase.getRequestId(msg);
                try {
                    sendCancelRequest(requestVersion, requestId);
                } catch (IOException e2) {
                    // most likely an abortive connection closure.
                    // ignore, since nothing more can be done.
		    if (orb.transportDebugFlag) {
		    
                }
            }
	    */

            // REVISIT When a send failure happens, purgeCalls() need to be 
            // called to ensure that the connection is properly removed from
            // further usage (ie., cancelling pending requests with COMM_FAILURE 
            // with an appropriate minor_code CompletionStatus.MAY_BE).            

            // Relying on the IIOPOutputStream (as noted below) is not 
            // sufficient as it handles COMM_FAILURE only for the final 
            // fragment (during invoke processing). Note that COMM_FAILURE could 
            // happen while sending the initial fragments. 
            // Also the IIOPOutputStream does not properly close the connection.
            // It simply removes the connection from the table. An orderly
            // closure is needed (ie., cancel pending requests on the connection
            // COMM_FAILURE as well.
            
            // IIOPOutputStream will cleanup the connection info when it
            // sees this exception.
            SystemException exc = wrapper.writeErrorSend(e1); 
            purgeCalls(exc, false, true); 
            throw exc; 
        }
    
public com.sun.corba.se.spi.protocol.CorbaMessageMediatorserverRequestMapGet(int requestId)

        // Returns the mediator stored for this request id in the server
        // request map (null when the map has no entry). The map is created
        // only by the server-side constructor.
        // Integer.valueOf replaces the boxing constructor; the key's
        // equals/hashCode are unchanged, so lookups behave the same.
        return (CorbaMessageMediator)
            serverRequestMap.get(Integer.valueOf(requestId));
    
public voidserverRequestMapPut(int requestId, com.sun.corba.se.spi.protocol.CorbaMessageMediator messageMediator)

        // Stores messageMediator in serverRequestMap under the boxed
        // request id.
        //
        // Integer.valueOf replaces new Integer(...): it may reuse a cached
        // box rather than allocating a fresh key for every request. The key
        // is equal (equals/hashCode) to the one the constructor produced.
        // NOTE(review): assumes serverRequestMap is an equals()-based Map
        // (not an identity map) - confirm where the field is initialised.
        serverRequestMap.put(Integer.valueOf(requestId), messageMediator);
    
public voidserverRequestMapRemove(int requestId)

        // Drops the serverRequestMap entry keyed by the boxed request id.
        //
        // Integer.valueOf replaces new Integer(...): it may reuse a cached
        // box rather than allocating a throw-away key. The key is equal
        // (equals/hashCode) to the one the constructor produced.
        // NOTE(review): assumes serverRequestMap is an equals()-based Map
        // (not an identity map) - confirm where the field is initialised.
        serverRequestMap.remove(Integer.valueOf(requestId));
    
public synchronized voidserverRequestProcessingBegins()
A Close Connection may already have been sent by the time this is called, but that is deliberately not checked. In that case a "lazy" Exception is thrown in the Worker thread after the incoming request has been processed, even though the connection was closed before the request was processed. This is acceptable because it is a boundary condition; preventing it would require additional locks, which would reduce performance in the normal case.

        // One more server request is now in flight on this connection.
        // The method is synchronized, so the update happens under this
        // object's monitor.
        serverRequestCount += 1;
    
public synchronized voidserverRequestProcessingEnds()

        // One server request has finished; undo the increment made by
        // serverRequestProcessingBegins(). The method is synchronized, so
        // the update happens under this object's monitor.
        serverRequestCount -= 1;
    
public com.sun.corba.se.pept.protocol.MessageMediatorserverRequest_1_1_Get()

        // Hands back whatever mediator is currently held in the
        // serverRequest_1_1 slot (null once serverRequest_1_1_Remove()
        // has cleared it).
        return this.serverRequest_1_1;
    
public voidserverRequest_1_1_Put(com.sun.corba.se.pept.protocol.MessageMediator x)

        // Replaces the content of the single serverRequest_1_1 slot with
        // the supplied mediator; any previous value is overwritten.
        this.serverRequest_1_1 = x;
    
public voidserverRequest_1_1_Remove()

        // Empties the serverRequest_1_1 slot.
        this.serverRequest_1_1 = null;
    
public final voidsetCodeBaseIOR(com.sun.corba.se.spi.ior.IOR ior)

        // Remembers the given IOR in codeBaseServerIOR, replacing any
        // value stored earlier.
        this.codeBaseServerIOR = ior;
    
public synchronized voidsetCodeSetContext(com.sun.corba.se.impl.encoding.CodeSetComponentInfo$CodeSetContext csc)

        // Only the first context is ever recorded; once codeSetContext is
        // set, later calls return without looking at csc at all.
        if (codeSetContext != null) {
            return;
        }

        // Both the char and the wchar code set named by the client must be
        // known to the OSF registry. The wchar lookup is only performed
        // when the char lookup succeeded.
        boolean bothCodeSetsKnown =
            OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) != null
            && OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) != null;

        if (!bothCodeSetsKnown) {
            // If the client says it's negotiated a code set that
            // isn't a fallback and we never said we support, then
            // it has a bug.
            throw wrapper.badCodesetsFromClient() ;
        }

        codeSetContext = csc;
    
public voidsetConnectionCache(com.sun.corba.se.pept.transport.ConnectionCache connectionCache)

        // Records the cache this connection belongs to. write() later
        // calls stampTime(this) on the cache returned by
        // getConnectionCache().
        this.connectionCache = connectionCache;
    
public voidsetEnqueueTime(long timeInMillis)

        // Stores the supplied value in enqueueTime. The parameter name
        // says milliseconds; nothing here converts or validates it.
        this.enqueueTime = timeInMillis;
    
protected voidsetPartialMessageMediator(com.sun.corba.se.spi.protocol.CorbaMessageMediator messageMediator)

        // Keeps a reference to the given mediator in
        // partialMessageMediator, overwriting the previous one.
        this.partialMessageMediator = messageMediator;
    
public synchronized voidsetPostInitialContexts()

        // Raises the postInitialContexts flag. This method only ever sets
        // it to true; it is synchronized, so the write happens under this
        // object's monitor.
        this.postInitialContexts = true;
    
protected voidsetReadGiopHeaderOnly(boolean shouldReadHeaderOnly)

        // Stores the flag that shouldReadGiopHeaderOnly() reports back.
        this.shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
    
protected voidsetReadTimeouts(com.sun.corba.se.spi.transport.ReadTimeouts readTimeouts)

        // Installs the read-timeout settings for this connection,
        // replacing whatever was set before.
        this.readTimeouts = readTimeouts;
    
public voidsetState(java.lang.String stateString)

        // Only the literal "ESTABLISHED" is understood. For that value the
        // state is switched to ESTABLISHED and every thread waiting on
        // stateEvent is woken; any other string is silently ignored.
        synchronized (stateEvent) {
            if (!stateString.equals("ESTABLISHED")) {
                // REVISIT: ASSERT
                return;
            }

            state = ESTABLISHED;
            stateEvent.notifyAll();
        }
    
public voidsetTimeStamp(long time)

        // Overwrites timeStamp with the supplied value.
        this.timeStamp = time;
    
public voidsetUseSelectThreadToWait(boolean x)

	useSelectThreadToWait = x;
	// REVISIT - Reading of a GIOP header only is information
	//           that should be passed into the constructor
	//           from the SocketOrChannelConnection factory.
        setReadGiopHeaderOnly(shouldUseSelectThreadToWait());
    
public booleanshouldReadGiopHeaderOnly()

        // Reports the flag last stored by setReadGiopHeaderOnly(boolean).
        return this.shouldReadGiopHeaderOnly;
    
public booleanshouldRegisterReadEvent()

        // This implementation answers yes unconditionally.
        return true;
    
public booleanshouldRegisterServerReadEvent()

        // This implementation answers yes unconditionally.
        return true;
    
public booleanshouldUseDirectByteBuffers()

        // True exactly when getSocketChannel() returns a channel; a
        // connection without one reports false.
        boolean hasChannel = (null != getSocketChannel());
        return hasChannel;
    
public java.lang.StringtoString()

        // Renders the connection as
        //   SocketOrChannelConnectionImpl[ <endpoint> <state> <flag> <flag> <flag>]
        // where the three flags are shouldUseSelectThreadToWait(),
        // shouldUseWorkerThreadForEvent() and shouldReadGiopHeaderOnly().
        // The text is built while holding stateEvent.
        synchronized (stateEvent) {
            // Prefer the channel when there is one, otherwise describe the
            // plain socket. String.valueOf renders a null socket as "null"
            // instead of throwing NullPointerException. The debug prints in
            // this class concatenate "this" into their messages, so
            // toString() must not fail on a connection that has neither a
            // channel nor a socket. For a non-null socket the output is
            // exactly socket.toString(), as before.
            String endpoint = (socketChannel != null)
                ? socketChannel.toString()
                : String.valueOf(socket);

            return "SocketOrChannelConnectionImpl[" + " "
                + endpoint + " "
                + getStateString(state) + " "
                + shouldUseSelectThreadToWait() + " "
                + shouldUseWorkerThreadForEvent() + " "
                + shouldReadGiopHeaderOnly()
                + "]";
        }
    
public voidunregisterWaiter(com.sun.corba.se.pept.protocol.MessageMediator messageMediator)

        // Pure delegation: the response waiting room does the work.
        this.responseWaitingRoom.unregisterWaiter(messageMediator);
    
public com.sun.corba.se.pept.encoding.InputObjectwaitForResponse(com.sun.corba.se.pept.protocol.MessageMediator messageMediator)

        // Pure delegation: returns whatever the response waiting room
        // returns for this mediator.
        return this.responseWaitingRoom.waitForResponse(messageMediator);
    
public voidwrite(java.nio.ByteBuffer byteBuffer)

        // Sends the buffer to the peer - through the SocketChannel when
        // shouldUseDirectByteBuffers() says there is one, through the
        // Socket's OutputStream otherwise - and then time-stamps this
        // connection in its connection cache.
        if (shouldUseDirectByteBuffers()) {
            // NOTE: the buffer cannot be required to be direct here. If one
            // asks the pool for a ByteBuffer bigger than the size the pool
            // manages, the pool returns a HeapByteBuffer, so rejecting
            // byteBuffer.hasArray() with
            // wrapper.unexpectedNonDirectByteBufferWithChannelSocket()
            // is not possible.

            // IMPORTANT: For non-blocking SocketChannels, there's no guarantee
            //            all bytes are written on first write attempt.
            // NOTE(review): if the channel accepts no bytes this loop
            // retries immediately without waiting for writability.
            do {
                getSocketChannel().write(byteBuffer);
            }
            while (byteBuffer.hasRemaining());

        } else {
            // The stream path needs an accessible backing array.
            if (! byteBuffer.hasArray()) {
                throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
            }
            byte[] backing = byteBuffer.array();

            // Buffer index 0 lives at backing[arrayOffset()], which is not
            // necessarily backing[0]. Starting at arrayOffset() sends
            // buffer indices [0, limit) as intended; for the usual case of
            // a zero offset this is the same call as before.
            // NOTE(review): unlike the channel path, this path ignores the
            // buffer's position and does not advance it - confirm callers
            // always pass position 0.
            getSocket().getOutputStream().write(
                backing, byteBuffer.arrayOffset(), byteBuffer.limit());
            getSocket().getOutputStream().flush();
        }

        // TimeStamp connection to indicate it has been used
        // Note granularity of connection usage is assumed for
        // now to be that of a IIOP packet.
        getConnectionCache().stampTime(this);
    
public voidwriteLock()
Sets the writeLock for this connection. If the writeLock is already set by someone else, block till the writeLock is released and can set by us. IMPORTANT: this connection's lock must be acquired before setting the writeLock and must be unlocked after setting the writeLock.

      // Overview: loops on a snapshot of 'state' until either the write
      // lock has been taken (normal return) or the connection turns out
      // to be unusable (exception):
      //   OPENING       - wait on stateEvent, then look at the state again
      //   ESTABLISHED   - take writeLocked if free, else wait on writeEvent
      //   ABORT         - throw wrapper.writeErrorSend()
      //   CLOSE_RECVD   - throw wrapper.connectionCloseRebind()
      //   anything else - throw RuntimeException
      // The entry/exit traces are printed only when both dprintWriteLocks
      // and orb.transportDebugFlag are set; the exit trace is in a finally
      // block, so it is also printed when an exception leaves the method.
      try {
        if (dprintWriteLocks && orb.transportDebugFlag) {
	    dprint(".writeLock->: " + this);
	}
        // Keep looping till we can set the writeLock.
        while ( true ) {
	    // Snapshot taken without holding stateEvent; the OPENING, ABORT
	    // and CLOSE_RECVD branches re-check 'state' under the lock
	    // before acting on it.
	    int localState = state;
	    switch ( localState ) {
	    
	    case OPENING:
		synchronized (stateEvent) {
		    if (state != OPENING) {
			// somebody has changed 'state' so be careful
			// (this break leaves the switch, not the while loop,
			// so the new state is examined on the next pass)
			break;
		    }
		    try {
			// Untimed wait; setState("ESTABLISHED") calls
			// stateEvent.notifyAll().
			stateEvent.wait();
		    } catch (InterruptedException ie) {
			// Interruption is only reported in debug mode; the
			// interrupt status is not restored and the loop
			// retries.
			// NOTE(review): confirm ignoring interrupts is intended.
			if (orb.transportDebugFlag) {
			    dprint(".writeLock: OPENING InterruptedException: " + this);
			}
		    }
		}
		// Loop back
		break;
	    
	    case ESTABLISHED:
		synchronized (writeEvent) {
		    // Fast path: nobody holds the write lock, so claim it.
		    if (!writeLocked) {
			writeLocked = true;
			return;
		    }
		
		    try {
			// do not stay here too long if state != ESTABLISHED
			// Bug 4752117
			// Waits in slices of at most 100 ms so that a state
			// change is noticed even without a notification on
			// writeEvent.
			while (state == ESTABLISHED && writeLocked) {
			    writeEvent.wait(100);
			}
		    } catch (InterruptedException ie) {
			// An interrupt ends the inner wait loop; it is only
			// reported in debug mode and the outer loop retries.
			if (orb.transportDebugFlag) {
			    dprint(".writeLock: ESTABLISHED InterruptedException: " + this);
			}
		    }
		}
		// Loop back
		// (the lock is not claimed here; the next pass re-reads
		// 'state' and tries the fast path again)
		break;
	    
		//
		// XXX
		// Need to distinguish between client and server roles
		// here probably.
		//
	    case ABORT:
		synchronized ( stateEvent ){
                    // State moved on since the snapshot: leave the switch
                    // and re-evaluate on the next pass.
                    if (state != ABORT) {
                        break;
                    }
		    throw wrapper.writeErrorSend() ;
		}
		 
	    case CLOSE_RECVD:
		// the connection has been closed or closing
		// ==> throw rebind exception
		synchronized ( stateEvent ){
                    // Same re-check as in the ABORT branch.
                    if (state != CLOSE_RECVD) {
                        break;
                    }
		    throw wrapper.connectionCloseRebind() ;
		}
	    
	    default:
		// Any state without a case above is treated as an error.
		if (orb.transportDebugFlag) {
		    dprint(".writeLock: default: " + this);
		}
		// REVISIT
		throw new RuntimeException(".writeLock: bad state");
	    }
        }
      } finally {
        if (dprintWriteLocks && orb.transportDebugFlag) {
	    dprint(".writeLock<-: " + this);
	}
      }
    
public voidwriteUnlock()

        // Gives up the write lock taken by writeLock() and wakes a single
        // thread waiting on writeEvent. Entry and exit are traced when
        // both dprintWriteLocks and orb.transportDebugFlag are set.
        try {
            if (dprintWriteLocks && orb.transportDebugFlag) {
                dprint(".writeUnlock->: " + this);
            }

            synchronized (writeEvent) {
                // Clear the flag first, then signal, both under the
                // writeEvent monitor.
                writeLocked = false;
                // wake up one guy waiting to write
                writeEvent.notify();
            }
        } finally {
            // The exit trace is emitted even if something above threw.
            if (dprintWriteLocks && orb.transportDebugFlag) {
                dprint(".writeUnlock<-: " + this);
            }
        }