FileDocCategorySizeDatePackage
CorbaResponseWaitingRoomImpl.javaAPI DocJava SE 5 API9470Fri Aug 26 14:54:32 BST 2005com.sun.corba.se.impl.transport

CorbaResponseWaitingRoomImpl.java

/*
 * @(#)CorbaResponseWaitingRoomImpl.java	1.29 04/03/01
 * 
 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
 * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

package com.sun.corba.se.impl.transport;

import java.util.Hashtable;

import org.omg.CORBA.CompletionStatus;
import org.omg.CORBA.SystemException;

import com.sun.corba.se.pept.encoding.InputObject;
import com.sun.corba.se.pept.encoding.OutputObject;
import com.sun.corba.se.pept.protocol.MessageMediator;

import com.sun.corba.se.spi.logging.CORBALogDomains;
import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
import com.sun.corba.se.spi.transport.CorbaConnection;
import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;

import com.sun.corba.se.impl.encoding.BufferManagerReadStream;
import com.sun.corba.se.impl.encoding.CDRInputObject;
import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.orbutil.ORBUtility;
import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage;
import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage;

/**
 * @author Harold Carr
 */
public class CorbaResponseWaitingRoomImpl
    implements
	CorbaResponseWaitingRoom
{
    final static class OutCallDesc
    {
        java.lang.Object done = new java.lang.Object();
        Thread thread;
	MessageMediator messageMediator;
        SystemException exception;
        InputObject inputObject;
    }

    private ORB orb;
    private ORBUtilSystemException wrapper ;

    private CorbaConnection connection;
    // Maps requestId to an OutCallDesc.
    private Hashtable out_calls = null; // REVISIT - use int hastable/map

    public CorbaResponseWaitingRoomImpl(ORB orb, CorbaConnection connection)
    {
	this.orb = orb;
	wrapper = ORBUtilSystemException.get( orb, 
	    CORBALogDomains.RPC_TRANSPORT ) ;
	this.connection = connection;
        out_calls = new Hashtable();
    }

    ////////////////////////////////////////////////////
    //
    // pept.transport.ResponseWaitingRoom
    //

    public void registerWaiter(MessageMediator mediator)
    {
	CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;

	if (orb.transportDebugFlag) {
	    dprint(".registerWaiter: " + opAndId(messageMediator));
	}

	Integer requestId = messageMediator.getRequestIdInteger();
        
	OutCallDesc call = new OutCallDesc();
	call.thread = Thread.currentThread();
	call.messageMediator = messageMediator;
	out_calls.put(requestId, call);
    }

    public void unregisterWaiter(MessageMediator mediator)
    {
	CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;

	if (orb.transportDebugFlag) {
	    dprint(".unregisterWaiter: " + opAndId(messageMediator));
	}

	Integer requestId = messageMediator.getRequestIdInteger();

        out_calls.remove(requestId);
    }

    public InputObject waitForResponse(MessageMediator mediator)
    {
      CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;

      try {

        InputObject returnStream = null;

	if (orb.transportDebugFlag) {
	    dprint(".waitForResponse->: " + opAndId(messageMediator));
	}

	Integer requestId = messageMediator.getRequestIdInteger();

        if (messageMediator.isOneWay()) {
            // The waiter is removed in releaseReply in the same
            // way as a normal request.

	    if (orb.transportDebugFlag) {
		dprint(".waitForResponse: one way - not waiting: "
		       + opAndId(messageMediator));
	    }

            return null;
        }

        OutCallDesc call = (OutCallDesc)out_calls.get(requestId);
        if (call == null) {
	    throw wrapper.nullOutCall(CompletionStatus.COMPLETED_MAYBE);
	}

        synchronized(call.done) {

            while (call.inputObject == null && call.exception == null) {
                // Wait for the reply from the server.
                // The ReaderThread reads in the reply IIOP message
                // and signals us.
                try {
		    if (orb.transportDebugFlag) {
			dprint(".waitForResponse: waiting: "
			       + opAndId(messageMediator));
		    }
                    call.done.wait();
                } catch (InterruptedException ie) {};
            }

            if (call.exception != null) {
		if (orb.transportDebugFlag) {
		    dprint(".waitForResponse: exception: " 
			   + opAndId(messageMediator));
		}
                throw call.exception;
            }

            returnStream = call.inputObject;
        }

	// REVISIT -- exceptions from unmarshaling code will
        // go up through this client thread!

        if (returnStream != null) {
	    // On fragmented streams the header MUST be unmarshaled here
	    // (in the client thread) in case it blocks.
	    // If the header was already unmarshaled, this won't
	    // do anything
	    // REVISIT: cast - need interface method.
	    ((CDRInputObject)returnStream).unmarshalHeader();
	}

        return returnStream;

      } finally {
	if (orb.transportDebugFlag) {
	    dprint(".waitForResponse<-: " + opAndId(messageMediator));
	}
      }
    }

    public void responseReceived(InputObject is) 
    {
	CDRInputObject inputObject = (CDRInputObject) is;
	LocateReplyOrReplyMessage header = (LocateReplyOrReplyMessage)
	    inputObject.getMessageHeader();
        Integer requestId = new Integer(header.getRequestId());
        OutCallDesc call = (OutCallDesc) out_calls.get(requestId);

	if (orb.transportDebugFlag) {
	    dprint(".responseReceived: id/"
		   + requestId  + ": "
		   + header);
	}

        // This is an interesting case.  It could mean that someone sent us a
        // reply message, but we don't know what request it was for.  That
        // would probably call for an error.  However, there's another case
        // that's normal and we should think about --
        //
        // If the unmarshaling thread does all of its work inbetween the time
        // the ReaderThread gives it the last fragment and gets to the
        // out_calls.get line, then it will also be null, so just return;
        if (call == null) {
	    if (orb.transportDebugFlag) {
		dprint(".responseReceived: id/" 
		       + requestId
		       + ": no waiter: "
		       + header);
	    }
            return;
	}

        // Set the reply InputObject and signal the client thread
        // that the reply has been received.
        // The thread signalled will remove outcall descriptor if appropriate.
        // Otherwise, it'll be removed when last fragment for it has been put on
        // BufferManagerRead's queue.
        synchronized (call.done) {
	    CorbaMessageMediator messageMediator = (CorbaMessageMediator)
		call.messageMediator;

	    if (orb.transportDebugFlag) {
		dprint(".responseReceived: "
		       + opAndId(messageMediator)
		       + ": notifying waiters");
	    }

	    messageMediator.setReplyHeader(header);
	    messageMediator.setInputObject(is);
	    inputObject.setMessageMediator(messageMediator);
            call.inputObject = is;
            call.done.notify();
        }
    }

    public int numberRegistered()
    {
        // Note: Hashtable.size() is not synchronized
	return out_calls.size();
    }

    //////////////////////////////////////////////////
    //
    // CorbaResponseWaitingRoom
    //

    public void signalExceptionToAllWaiters(SystemException systemException)
    {

	if (orb.transportDebugFlag) {
	    dprint(".signalExceptionToAllWaiters: " + systemException);
	}

        OutCallDesc call;
        java.util.Enumeration e = out_calls.elements();
        while(e.hasMoreElements()) {
            call = (OutCallDesc) e.nextElement();
        
            synchronized(call.done){
                // anything waiting for BufferManagerRead's fragment queue
                // needs to be cancelled
                CorbaMessageMediator corbaMsgMediator =
                             (CorbaMessageMediator)call.messageMediator;
                CDRInputObject inputObject =
                           (CDRInputObject)corbaMsgMediator.getInputObject();
                // IMPORTANT: If inputObject is null, then no need to tell
                //            BufferManagerRead to cancel request processing. 
                if (inputObject != null) {
                    BufferManagerReadStream bufferManager =
                        (BufferManagerReadStream)inputObject.getBufferManager();
                    int requestId = corbaMsgMediator.getRequestId();
                    bufferManager.cancelProcessing(requestId);
                }
                call.inputObject = null;
                call.exception = systemException;
                call.done.notify();
            }
        }
    }

    public MessageMediator getMessageMediator(int requestId)
    {
        Integer id = new Integer(requestId);
        OutCallDesc call = (OutCallDesc) out_calls.get(id);
	if (call == null) {
	    // This can happen when getting early reply fragments for a
	    // request which has completed (e.g., client marshaling error).
	    return null;
	}
	return call.messageMediator;
    }

    ////////////////////////////////////////////////////
    //
    // Implementation.
    //

    protected void dprint(String msg)
    {
	ORBUtility.dprint("CorbaResponseWaitingRoomImpl", msg);
    }

    protected String opAndId(CorbaMessageMediator mediator)
    {
	return ORBUtility.operationNameAndRequestId(mediator);
    }
}

// End of file.