FileDocCategorySizeDatePackage
ClientOutboundSequence.javaAPI DocExample24078Tue May 29 16:56:42 BST 2007com.sun.xml.ws.rm.jaxws.runtime.client

ClientOutboundSequence.java

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 * 
 * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
 * 
 * The contents of this file are subject to the terms of either the GNU
 * General Public License Version 2 only ("GPL") or the Common Development
 * and Distribution License("CDDL") (collectively, the "License").  You
 * may not use this file except in compliance with the License. You can obtain
 * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 * language governing permissions and limitations under the License.
 * 
 * When distributing the software, include this License Header Notice in each
 * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 * Sun designates this particular file as subject to the "Classpath" exception
 * as provided by Sun in the GPL Version 2 section of the License file that
 * accompanied this code.  If applicable, add the following below the License
 * Header, with the fields enclosed by brackets [] replaced by your own
 * identifying information: "Portions Copyrighted [year]
 * [name of copyright owner]"
 * 
 * Contributor(s):
 * 
 * If you wish your version of this file to be governed by only the CDDL or
 * only the GPL Version 2, indicate your decision by adding "[Contributor]
 * elects to include this software in this distribution under the [CDDL or GPL
 * Version 2] license."  If you don't indicate a single choice of license, a
 * recipient has the option to distribute your version of this file under
 * either the CDDL, the GPL Version 2 or to extend the choice of license to
 * its licensees as provided above.  However, if you add GPL Version 2 code
 * and therefore, elected the GPL Version 2 license, then the option applies
 * only if the new code is made subject to such option by the copyright
 * holder.
 */

// ClientOutboundSequence.java
//
//
// @author Mike Grogan
// Created on October 15, 2005, 3:13 PM
//
package com.sun.xml.ws.rm.jaxws.runtime.client;
import com.sun.xml.ws.api.SOAPVersion;
import com.sun.xml.ws.api.addressing.AddressingVersion;
import com.sun.xml.ws.api.addressing.WSEndpointReference;
import com.sun.xml.ws.rm.InvalidMessageNumberException;
import com.sun.xml.ws.rm.Message;
import com.sun.xml.ws.rm.RMException;
import com.sun.xml.ws.rm.jaxws.runtime.OutboundSequence;
import com.sun.xml.ws.rm.jaxws.runtime.SequenceConfig;
import com.sun.xml.ws.rm.protocol.*;
import com.sun.xml.ws.security.secext10.SecurityTokenReferenceType;

import javax.xml.ws.Service;
import javax.xml.bind.JAXBElement;
import javax.xml.transform.Source;
import javax.xml.ws.wsaddressing.W3CEndpointReference;
import java.net.URI;
import java.util.UUID;
import java.util.logging.Logger;
import java.util.logging.Level;
import com.sun.xml.ws.rm.jaxws.util.LoggingHelper;
import com.sun.xml.ws.api.rm.client.ClientSequence;
import com.sun.xml.ws.api.rm.AcknowledgementListener;
import com.sun.xml.ws.api.rm.SequenceSettings;
import com.sun.xml.ws.rm.jaxws.runtime.InboundSequence;
/**
 * ClientOutboundSequence represents the set of all messages from a single BindingProvider instance.
 * It includes methods that connect and disconnect to a remote RMDestination using
 * a client for a WebService that uses CreateSequence and TerminateSequence as its request messages.
 */

public class ClientOutboundSequence extends OutboundSequence
                                    implements ClientSequence {

    private static final Logger logger =
        Logger.getLogger(LoggingHelper.getLoggerName(ClientOutboundSequence.class));

    /**
     * Current value of receive buffer read from incoming SequenceAcknowledgement
     * messages if RM Destination implements properietary Indigo Flow Control feature.
     */
    protected int receiveBufferSize;


    /**
     * The helper class used to send protocol messages
     * <code>CreateSequenceElement</code>
     * <code>CreateSequenceResponseElement</code>
     * <code>LastMessage</code>
     * <code>AckRequestedElement</code>
     *
     */
    protected ProtocolMessageSender protocolMessageSender ;


    private SOAPVersion version ;

    /**
     * Flag to indicate if secureReliableMessaging is on
     */
    private boolean secureReliableMessaging;



    /**
     * The SecurityTokenReference to pass to CreateSequence
     */
    private JAXBElement<SecurityTokenReferenceType> str = null;


    /**
     * Indicates whether the sequence uses anonymous acksTo
     */
    private boolean isAnonymous = false;


    /*
     * Flag which indicates whether sequence is active (disconnect() has not
     * been called.
     */
    private boolean isActive = true;

    /**
     * Time after which resend of messages in sequences is attempted at
     * next opportunity.
     */
    private long resendDeadline;

    /**
     * Time after which Ack is requested at next opportunity.
     */
    private long ackRequestDeadline;
    
    /**
     * Can be registered to listen for sequence acknowledgements.
     */
    private AcknowledgementListener ackListener;
    
    /**
     * Service using this sequence (if known)
     */
    private Service service;


    /**
     * This field is used only as a hack to test Server-side
     * timeout functionality.  It is not intended to be used
     * for any other purpose.
     */
    private static boolean sendHeartbeats = true;

    
    public ClientOutboundSequence(SequenceConfig config) {
        this.config = config;

        //for now
        this.version = config.getSoapVersion();
        this.ackHandler = new AcknowledgementHandler(config);
        this.rmConstants = config.getRMConstants();
        this.bufferRemaining = config.getBufferSize();

    }

    /**
     * Accessor for the sequenceConfig field
     *
     * @return The value of the field.
     */
    public SequenceConfig getSequenceConfig() {
        return config;
    }

    /**
     * Mutator for the <code>receiveBufferSize</code> field.
     *
     * @param receiveBufferSize The new value for the field.
     */
    public void setReceiveBufferSize(int receiveBufferSize) {
        this.receiveBufferSize = receiveBufferSize;
    }

    /**
     * Accessor for the <code>receiveBufferSize</code> field.
     *
     * @return The value for the field.
     */
    public int getReceiveBufferSize() {
        return receiveBufferSize;
    }

    public boolean isSecureReliableMessaging() {
        return secureReliableMessaging;
    }

    /**
     * Return the hoped-for limit to number of stored messages.  Currently
     * the limit is not enforced, but as the number of stored messages approaches
     * the limit, resends and ackRequests occur more frequently.
     */
    public int getTransferWindowSize() {
       //Use server size receive buffer size for now.  Might
       //want to make this configurable.
       return config.getBufferSize();
    }
    
    /**
     * Registers a <code>AcknowledgementListener</code> for this
     * sequence
     *
     * @param listener The <code>AcknowledgementListener</code>
     */
  
    public void setAcknowledgementListener(AcknowledgementListener listener) {
        this.ackListener = listener;
    }
    
   
    /**
     * Implementation of the getSequenceSettings method in
     * com.sun.xml.ws.rm.api.client.ClientSequence.  Need
     * to populate the sequence ids in the returned SequenceSettings
     * object, since in general, they will not be set in the underlying
     * SequenceConfig object.
     */
    public SequenceSettings getSequenceSettings() {
        
        SequenceSettings settings = getSequenceConfig();
        settings.sequenceId = getId();
        
        InboundSequence iseq = getInboundSequence();
        
        settings.companionSequenceId = (iseq != null) ?
                                       iseq.getId() :
                                       null;
        return settings;
    }
    
    /**
     * Accessor for the AcknowledgementListener field.
     *
     * @return The AcknowledgementListener.
     */
    public AcknowledgementListener getAcknowledgementListener() {
        return ackListener;
    }

    public void setSecureReliableMessaging(boolean secureReliableMessaging) {
        this.secureReliableMessaging = secureReliableMessaging;
    }
    
    /**
     * Accessor for the service field.
     *
     * @returns The value of the service field.  May be null if not known.
     */
    public Service getService() {
        return service;
    }
    
    /**
     * Sets the value of the service field.
     *
     * @param service The service using the sequence.
     */
    public void setService(Service service) {
        this.service = service;
    }

    /**
     * Connects to remote RM Destination by sending request through the proxy
     * stored in the <code>port</code> field.
     *
     * @param destination Destination URI for RM Destination
     * @param acksTo reply to EPR for protocol responses.  The null value indicates
     *          use of the WS-Addressing anonymous EPR
     * @throws RMException wrapper for all exceptions thrown during execution of method.
     */
    public void connect(URI destination,
                        URI acksTo,
                        boolean twoWay) throws RMException {
        try {

            this.destination = destination;
            this.acksTo = acksTo;

            String anonymous = rmConstants.getAnonymousURI().toString();
            String acksToString;

            if (acksTo == null) {
                acksToString = anonymous;
            } else {
                acksToString = acksTo.toString();

            }

            this.isAnonymous = acksToString.equals(anonymous);


            CreateSequenceElement cs = new CreateSequenceElement();

            /**
             * ADDRESSING_FIXME
             * This needs to be fixed commenting temporarily to get the compilation
             * problems fixed
             */
            /*if (RMConstants.getAddressingVersion() == AddressingVersion.W3C){
                cs.setAcksTo(new W3CAcksToImpl(new URI(acksToString)));
            }    else {
                cs.setAcksTo(new MemberSubmissionAcksToImpl(new URI(acksToString)));
                
            }*/
            W3CEndpointReference endpointReference = null;
            AddressingVersion addressingVersion = rmConstants.getAddressingVersion();
            if ( addressingVersion == AddressingVersion.W3C){
                  //WSEndpointReference wsepr = new WSEndpointReference(getClass().getResourceAsStream("w3c-anonymous-acksTo.xml"), addressingVersion);
                  WSEndpointReference epr = AddressingVersion.W3C.anonymousEpr;
                  Source s = epr.asSource("AcksTo");
                  endpointReference = new W3CEndpointReference(s);
            }/*else {
                  WSEndpointReference wsepr = new WSEndpointReference(getClass().getResourceAsStream("member-anonymous-acksTo.xml"), addressingVersion);
                  Source s = wsepr.asSource("AcksTo");
                  endpointReference = new MemberSubmissionEndpointReference(s);
            }*/
            cs.setAcksTo(endpointReference);

            String incomingID = "uuid:" + UUID.randomUUID();

            if (twoWay) {
                Identifier id = new Identifier();
                id.setValue(incomingID);
                OfferType offer = new OfferType();
                offer.setIdentifier(id);

                cs.setOffer(offer);
            }

            if (secureReliableMessaging) {
                JAXBElement<SecurityTokenReferenceType> str = getSecurityTokenReference();
                if (str != null) {
                    cs.setSecurityTokenReference(str.getValue());
                }   else {
                    throw new RMException("SecurityTokenReference is null");
                }
            }

            CreateSequenceResponseElement csr = protocolMessageSender.sendCreateSequence(cs,destination,
                    acksTo,version);
           
            
            if (csr != null ) {
                Identifier idOutbound = csr.getIdentifier();
                this.id = idOutbound.getValue();
                
                AcceptType accept = csr.getAccept();
                
                if (accept != null) {
                    /**
                     * ADDRESSING_FIXME Needs to be fixes once
                     * AcksTO issue is resolved
                     */
                   /* URI uriAccept = accept.getAcksTo();*/
                   URI uriAccept = null;

                    inboundSequence = new ClientInboundSequence(this,
                            incomingID,
                            uriAccept);
                } else {
                    inboundSequence = new ClientInboundSequence(this,
                            incomingID, null);
                }
                
                //start the inactivity clock
                resetLastActivityTime();

            } else {
                //maybe a non-anonymous AcksTo
                //Handle CreateSequenceRefused fault
            }
        } catch (Exception e) {
            throw new RMException(e);
        }
    }

   
    /**
     * Disconnect from the RMDestination by invoking <code>TerminateSequence</code> on
     * the proxy stored in the <code>port</code> field. State of 
     * sequence is set to inactive.
     *
     * @throws RMException wrapper for all exceptions thrown during execution of method.
     */
    public void disconnect() throws RMException {
        disconnect(false);
    }


    /**
     * Disconnect from the RMDestination by invoking <code>TerminateSequence</code> on
     * the proxy stored in the <code>port</code> field.
     *
     * @param keepAlive If true, state of sequence is kept in
     * active atate allowing the reuse of the sequence.
     *
     * @throws RMException wrapper for all exceptions thrown during execution of method.
     */
    public void disconnect(boolean keepAlive) throws RMException {

        //FIXME - find another check for connectiveness.. want to get rid of
        //unnecessary InboundSequences.
        if (inboundSequence == null) {
            throw new IllegalStateException("Not connected.");
        }
        
        isActive = keepAlive;
 
        //TODO 
        //Move this after waitForAcks to obviate  problems caused by
        //the LastMessage Protocol message being processed concurrently with
        //application messages.  At the moment, this may cause problems in
        //Glassfish container with ordered delivery configured.  This will
        //probably no longer be the case when the Tube/Fibre architecture
        //is used.
        sendLast();
         
        //this will block until all messages are complete
        waitForAcks();
                 
        TerminateSequenceElement ts = new TerminateSequenceElement();
        Identifier idTerminate = new Identifier();
        idTerminate.setValue(id);
        ts.setIdentifier(idTerminate);
        protocolMessageSender.sendTerminateSequence(ts,this,version);

    }

    private void sendLast() throws RMException{
        protocolMessageSender.sendLast(this,version);
    }

    /**
     * Causes the specified message number to be resent.
     *
     * @param messageNumber The message number to resend
     */
    public void resend(int messageNumber) throws RMException {
        Message mess = get(messageNumber);
        mess.resume();
    }

    /**
     * Forces an ack request on next message
     */
    public synchronized void requestAck() {
        ackRequestDeadline = System.currentTimeMillis();
    }

    
    /**
     * Checks whether an ack should be requested.  Currently checks whether the
     * The algorithm checks whether the ackRequest deadline has elapsed.  
     * The ackRequestDeadline is determined by the ackRequestInterval in the 
     * SequenceConfig member for this sequence.
     *
     */
    protected synchronized boolean isAckRequested(){
      
        long time = System.currentTimeMillis();
        if (time > ackRequestDeadline) {
            //reset the clock
            ackRequestDeadline = time + getAckRequestInterval();
            return true;
        } else {
            return false;
        }
    }
    
    /**
     * Checks whether a resend should happen.  The algorithm checks whether 
     * the resendDeadline has elapsed.  
     * The resendDeadline is determined by the resendInterval in the 
     * SequenceConfig member for this sequence.
     *
     */
    public synchronized boolean isResendDue() {
        long time = System.currentTimeMillis();
        if (time > resendDeadline) {
            //reset the clock
            resendDeadline = time + getResendInterval();
            return true;
        } else {
            return false;
        }
    }
    
    private long getResendInterval() {
        
        //do a resend at every opportunity under these conditions
        //1. Sequence has been terminated
        //2. Number of stored messages exceeds 1/2 available space.
        
        if (!isActive || 
            storedMessages > (getTransferWindowSize() / 2) ) {
            return 0;
        }
        return config.getResendInterval();
    }
    
    /**
     * Returns true if TransferWindow is full.  In this case, we 
     * hold off on sending messages.
     */
    public boolean isTransferWindowFull() {
        return getTransferWindowSize() == storedMessages;
    }
    
    private long getAckRequestInterval() {
        //send an ackRequest at every opportunity under these conditions
        //1. Sequence has been terminated
        //2. Number of stored messages exceeds 1/2 available space.
        //3. Number of stored messages at endpoint exceeds 1/2
        //   available space.
        if (!isActive || 
            storedMessages > (getTransferWindowSize() / 2) ||
            getReceiveBufferSize() > (config.getBufferSize() / 2)) {
            return 0;
        }
        return config.getAckRequestInterval();
    }
    
    /**
     * Implementation of acknowledge defers discarding stored messages when
     * the AcksTo endpoint is anonymous and the message is a two-way request.
     * In this case, the actual work usually done by acknowledge() needs to
     * wait until the response is received.  The RMClientPipe invokes 
     * <code>acknowledgeResponse</code> at that time.
     *
     * @param i The index to acknowledge
     * @throws InvalidMessageNumberException
     */
    public synchronized void acknowledge(int i) 
            throws InvalidMessageNumberException {
        
        Message mess = get(i);
        if (isAnonymous() && mess.isTwoWayRequest) {
            return;
        } else {
            super.acknowledge(i);
                    
            if (ackListener != null) {
                ackListener.notify(this, i);
            }
            //if this acknowledgement is not on the protocol
            //response for the one-way message (endpoint behaved
            //unkindly, or possibly dropped the request), the sending
            //thread is waiting in the resend loop in RMClientPipe.
            mess.resume();
        }
    }
    
    /**
     * Acknowledges that a response to a two-way operation has been
     * received. See Javadoc for <code>acknowledge</code>
     *
     * @param i The index to acknowledge
     * @throws InvalidMessageNumberException
     */
    public synchronized void acknowledgeResponse(int i) 
            throws InvalidMessageNumberException {
            
        super.acknowledge(i);
        if (ackListener != null) {
                ackListener.notify(this, i);
        }
    }
    
    /**
     * Return value is determined by whether the destination endpoint is the
     * anonymous URI.
     * 
     * @return <code>true</code> if the destination is the anonymous URI.
     *         <code>false</code> otherwise.
     */
    public boolean isAnonymous() {
        return isAnonymous;
    }


    public void registerProtocolMessageSender(ProtocolMessageSender pms) {
        this.protocolMessageSender = pms;

    }

    public JAXBElement<SecurityTokenReferenceType> getSecurityTokenReference() {
        return str;
    }

    public void setSecurityTokenReference(JAXBElement<SecurityTokenReferenceType> str) {
        this.str = str;
    }
    
    /**
     * Handler periodically invoked by RMSource.MaintenanceThread.
     * Has two duties:<p>
     * <ul><li>Resend incomplete messages.</li>
     *     <li>Send AckRequested message down the pipeline if Inactivity 
     *      timeout is approaching.</li>
     * </ul>
     *
     * @throws RMException 
     */
    public synchronized void doMaintenanceTasks() throws RMException {
         
        if (storedMessages > 0 && isResendDue()) {
            int top = getNextIndex();
            for (int i = 1; i < top; i++) {
                Message mess = get(i);
                if (mess != null && !mess.isComplete()) {
                    logger.fine("resending "  + getId() + ":" + i);
                    resend(i);
                }
            }
        } else {
            //check whether we need to prime the pump
            if (isGettingClose(System.currentTimeMillis() - getLastActivityTime(),
                                config.getInactivityTimeout())) { 
                //send an AckRequested down the pipe.  Need to use a background
                //Thread.  This is being called by the RMSource maintenance thread
                //whose health we have to be very careful with.  If the heartbeat
                //message takes inordinately long to process, the maintenance thread
                //could miss many assignments.
                new AckRequestedSender(this).start();
            }
        }
    }
    
    
  
    private class AckRequestedSender extends Thread {
        
        private ClientOutboundSequence sequence;
        
        AckRequestedSender(ClientOutboundSequence sequence) {
            this.sequence = sequence;
        }
        public void run() {
            try {
               
                if (sendHeartbeats) {
                    
                    logger.fine(Messages.HEARTBEAT_MESSAGE_MESSAGE
                            .format(sequence.getId(), System.currentTimeMillis()));
                    
                    protocolMessageSender.sendAckRequested(sequence, 
                                                       version);
                }
                
            } catch (Exception e) {
                //We get here in at least two cases.
                //1. Client running in Webapp that is undeployed, 
                //2. SequenceFault from AckRequested message.
                //
                //In both cases the sequence is of no further use.  We
                //will assume for now that this is already the case.
                logger.log(Level.FINE, 
                           Messages.HEARTBEAT_MESSAGE_EXCEPTION.format() + " " +
                           sequence.getId(), e);
                try {
                    RMSource.getRMSource().removeOutboundSequence(sequence);
                } catch (Exception ex){
                }         
            }
        }
    }

}