FileDocCategorySizeDatePackage
TxServerPipe.javaAPI DocExample21700Tue May 29 16:57:18 BST 2007com.sun.xml.ws.tx.service

TxServerPipe

public class TxServerPipe extends com.sun.xml.ws.tx.common.TxBasePipe
Process transactional context for incoming message to server.

Supports the following WS-Coordination protocols: 2004 WS-Atomic Transaction protocol.

version
$Revision: 1.17.2.1 $
since
1.0

Fields Summary
private static com.sun.xml.ws.tx.common.TxLogger
logger
private final Unmarshaller
unmarshaller
private final com.sun.xml.ws.api.model.wsdl.WSDLPort
port
private final com.sun.xml.ws.api.WSBinding
wsbinding
private Map
opPolicyCache
Cache of operation name to its WS-AT policies computed in constructor. An operation is not inserted into cache if its WS-AT policies are the default values. This is a space/time tradeoff. Saves time in process call by taking up more memory for pipe. Makes sense for server-side, probably not for client-side
private static OperationATPolicy
DEFAULT
Constructors Summary
public TxServerPipe(com.sun.xml.ws.api.model.wsdl.WSDLPort port, com.sun.xml.ws.api.WSBinding wsbinding, com.sun.xml.ws.policy.PolicyMap map, com.sun.xml.ws.api.pipe.Pipe next)
Deployment-time computations for WS-Atomic Transaction processing.

Computes WS-Atomic Policy Assertions for all wsdl bound operations for this wsbinding.

param
port WSDL port for this pipe
param
map PolicyMap
param
next Next pipe to be executed.



                                               
      
                         
                         
                          
        super(next);
        // Store the WSDL port and binding first: cacheOperationToPolicyMappings()
        // below reads this.port while computing the per-operation policies.
        this.port = port;
        this.wsbinding = wsbinding;
        this.unmarshaller = TxJAXBContext.createUnmarshaller();
        // Compute and cache the WS-AT policy assertions of every operation bound to this port.
        cacheOperationToPolicyMappings(map, port.getBinding());
    
private TxServerPipe(TxServerPipe from, com.sun.xml.ws.api.pipe.PipeCloner cloner)

        super(cloner.copy(from.next));
        // Tell the cloner that 'this' is the copy made for 'from'.
        cloner.add(from, this);
        // The copy gets a freshly created unmarshaller of its own ...
        this.unmarshaller = TxJAXBContext.createUnmarshaller();
        // ... while the policy cache, binding and port references are taken from the original.
        this.opPolicyCache = from.opPolicyCache;
        this.wsbinding = from.wsbinding;
        this.port = from.port;
    
Methods Summary
private voidassertNoCurrentTransaction(java.lang.String message)

        // Ask the transaction manager for the transaction it currently reports, if any.
        final Transaction current;
        try {
            current = txnMgr.getTransaction();
        } catch (SystemException ex) {
            final String handlerMsg = LocalizationMessages.TXN_MGR_OPERATION_FAILED_5011("getTransaction");
            // Hand the exception to the logger as well, as beginTransaction() and
            // commitTransaction() do, so the log entry carries the failure details.
            logger.warning("assertNoCurrentTransaction", handlerMsg, ex);
            throw new WebServiceException(handlerMsg, ex);
        }

        if (current == null) {
            // Expected state: no transaction present, nothing to report or clean up.
            return;
        }

        // A transaction is unexpectedly present. Report it with the caller-supplied message
        // (logged under the process() method name, since that is where the check is made),
        // then suspend it.
        logger.severe("TxServerPipe.process", message + " " + current.toString());
        try {
            txnMgr.suspend();
        } catch (SystemException ex) {
            final String handlerMsg = LocalizationMessages.TXN_MGR_OPERATION_FAILED_5011("suspend");
            logger.warning("assertNoCurrentTransaction", handlerMsg, ex);
            throw new WebServiceException(handlerMsg, ex);
        }
    
private voidbeginTransaction()
Create a transaction context for pipe to be processed within.

        try {
           txnMgr.getUserTransaction().begin();
        } catch (NotSupportedException ex) {
            String handlerMsg = LocalizationMessages.TXN_MGR_OPERATION_FAILED_5011("getUserTransaction().begin()");
            logger.warning("beginTransaction", handlerMsg, ex);
            throw new WebServiceException(handlerMsg, ex);
        } catch (SystemException ex) {
            String handlerMsg = LocalizationMessages.TXN_MGR_OPERATION_FAILED_5011("getUserTransaction().begin()");
            logger.warning("beginTransaction", handlerMsg, ex);
            throw new WebServiceException(handlerMsg, ex);
        }
    
private voidcacheOperationToPolicyMappings(com.sun.xml.ws.policy.PolicyMap pmap, com.sun.xml.ws.api.model.wsdl.WSDLBoundPortType binding)
This method caches WS-AT policy assertion for all binding operations for the pipe. If an operation has the default WS-AT policy assertions, nothing is inserted in cache for method, the getATPolicy() method handles this case.

        
        // Cache wsat policy for each wsdl:binding/wsdl:operation for binding
        for (WSDLBoundOperation bindingOp : binding.getBindingOperations()) {
            TxBasePipe.OperationATPolicy opat = DEFAULT;
            try {
                opat = getOperationATPolicy(pmap, port, bindingOp);
                opPolicyCache.put(bindingOp.getName(), opat);
            } catch (WebServiceException wse) {
                logger.warning("cacheOperationToPolicyMappings", 
                        LocalizationMessages.WSAT_POLICY_PROCESSING_FAILURE_5017(
                                                       binding.getName(), bindingOp.getName()), 
                        wse);
            }
            if (logger.isLogging(Level.FINE)) {
                logger.fine("cacheOperationToPolicyMappings", "Operation: " +
                        binding.getName() + ":" +
                        bindingOp.getName() + " WS-AT Policy Assertions: ATAssertion:" + opat.atAssertion +
                        " ATAlwaysCapability:" + opat.ATAlwaysCapability);
            }
            
        }
    
private voidcommitTransaction()
Complete transaction context for pipe to be processed within.

        try {
            txnMgr.getUserTransaction().commit();
        } catch (Exception ex) {
            String commitExceptionMsg = LocalizationMessages.EXCEPTION_DURING_COMMIT_5008();
            logger.warning("commitTransaction", commitExceptionMsg, ex);
            throw new WebServiceException(commitExceptionMsg, ex);
        }
    
public com.sun.xml.ws.api.pipe.Pipecopy(com.sun.xml.ws.api.pipe.PipeCloner cloner)
Creates an identical clone of this Pipe.

        // Delegate to the private copy constructor, which copies the next pipe through the
        // cloner and registers the new instance with it.
        return new TxServerPipe(this, cloner);
    
private OperationATPolicygetATPolicy(javax.xml.namespace.QName operationName)
Return the WS-AT policy assertions for the WSDL-bound operation named operationName.

param
operationName wsdl bound operation
return
WS-AT policy assertions for operationName

        // Use the policy cached for this operation; an operation with no cache entry
        // gets the default WS-AT policy.
        final OperationATPolicy cached = opPolicyCache.get(operationName);
        return (cached != null) ? cached : getDefaultATPolicy();
    
private OperationATPolicygetDefaultATPolicy()

    
       
        // Shared fallback policy; getATPolicy() returns it for operations that have no
        // entry in opPolicyCache.
        return DEFAULT;
    
private booleanisServlet(com.sun.xml.ws.api.message.Packet pkt)
Returns true if pkt was sent to a servlet.

param
pkt packet representing a request message

        ServletContext sCtx = pkt.endpoint.getContainer().getSPI(javax.servlet.ServletContext.class);
        return sCtx != null;
    
public com.sun.xml.ws.api.message.Packetprocess(com.sun.xml.ws.api.message.Packet pkt)
Process WS-AT transactional context in incoming request message.

Transactional context processing is driven by ws-at policy assertions associated with wsdl:binding/wsdl:operation of parameter pkt.

param
pkt a packet is an incoming request message with JAX-WS properties
return
processing of pkt by next pipe in pipeline

         final String METHOD_NAME = "TxServerPipe.process";
         if (logger.isLogging(Level.FINER)) {
             logger.entering(METHOD_NAME, new Object[] {pkt});
         }

        // Resolve the invoked operation and the WS-AT policy assertions that apply to it.
        com.sun.xml.ws.tx.common.Message msg = new Message(pkt.getMessage(), wsbinding);
        WSDLBoundOperation msgOp = msg.getOperation(port);
        QName msgOperation = msgOp.getName();
        OperationATPolicy msgOpATPolicy = getATPolicy(msgOperation);
        String bindingName = port.getBinding().getName().toString();
        CoordinationContextInterface coordTxnCtx = null;

        // Precondition check: no transaction should be present on entry.
        assertNoCurrentTransaction(
           LocalizationMessages.INVALID_JTA_TRANSACTION_ENTERING_5002(bindingName, msgOperation.getLocalPart()));

        // Read the CoordinationContext from the request, if one was sent.
        try {
            coordTxnCtx = msg.getCoordinationContext(unmarshaller);
        } catch (JAXBException je) {
            String invalidMsg =
                    LocalizationMessages.INVALID_COORDINATION_CONTEXT_5006(
                                    msg.getMessageID(),
                                    bindingName,
                                    msgOperation.getLocalPart(),
                                    je.getLocalizedMessage());
            logger.warning(METHOD_NAME, invalidMsg, je);

            // send fault S4.3 wscoor:InvalidParameters

            // DO NOT LOCALIZE englishReason
            // 2004 WS-Coordination, S4. Coordination Faults "[Reason] The English language reason element"
            String englishReason = "WSTX-SERVICE-5006: Unable to read CoordinationContext in request message, msgId=" +
                                msg.getMessageID() + ", sent to endpoint:operation " + bindingName + ":" +
                                msgOperation.getLocalPart() + " due to JAXException " + je.getMessage();
            WSEndpointReference replyTo = msg.getReplyTo();
            if (replyTo != null) {
                WsaHelper.sendFault(
                        msg.getFaultTo(),
                        replyTo.toSpec(),
                        SOAPVersion.SOAP_11,
                        TxFault.InvalidParameters,
                        englishReason,
                        msg.getMessageID());
            }
            // Processing continues below with coordTxnCtx still null, i.e. as if no
            // coordination context had flowed with the message.
        }

        // verify coordination type protocol
        if (coordTxnCtx != null) {
            if (coordTxnCtx.getCoordinationType().equals(WSAT_2004_PROTOCOL)) {
                if (logger.isLogging(Level.FINEST)) {
                    // Separators added before "coordId=" and "operation:" so the fields do not run together.
                    logger.finest(METHOD_NAME, "Processing wscoor:CoordinationContext(protocol=" + coordTxnCtx.getCoordinationType() +
                                  " coordId=" + coordTxnCtx.getIdentifier() + ") for binding:" + bindingName +
                                  " operation:"  + msgOp.getName());
                }

                //  let jax-ws runtime know that coordination context header was understood.
                msg.setCoordCtxUnderstood();
            } // else check for other supported protocols in future.
            else {  // unrecognized ws coordination protocol type
                // Another pipe *may* process CoordinationContext with this unknown protocol type so just log this.
                logger.info(METHOD_NAME,
                        LocalizationMessages.IGNORING_UNRECOGNIZED_PROTOCOL_5005(
                        coordTxnCtx.getCoordinationType(),
                        coordTxnCtx.getIdentifier(),
                        bindingName,
                        msgOperation.getLocalPart()));
                coordTxnCtx = null;
            }
        }

        // wsat:ATAssertion policy assertion check.  Note 2004 WS-Atomic Transaction does not require this check.
        if (msgOpATPolicy.atAssertion == MANDATORY && coordTxnCtx == null) {
            String inconsistencyMsg =
                    LocalizationMessages.MUST_FLOW_WSAT_COORDINATION_CONTEXT_5000(
                                bindingName, msgOperation.getLocalPart(), msg.getMessageID());
            logger.warning(METHOD_NAME, inconsistencyMsg);
            // TODO:  complete evaluation if desire to throw this as an exception or not.
            //        Since WS-AT specification does not require this, might be best not to throw exception.
            throw new WebServiceException(inconsistencyMsg);
        }

        /*
         * Absence of a ws-at ATAssertion does not forbid WS-AT CoordinationContext from flowing.
         * OASIS WS-AT refers to this case as no claims made. Rules external to WS-AT may have
         * caused this situation to occur.
         */
        if (coordTxnCtx != null && msgOpATPolicy.atAssertion == NOT_ALLOWED) {

            // Not an error; just log that this occurred since it is helpful to know about.
            logger.info(METHOD_NAME,
                    LocalizationMessages.UNEXPECTED_FLOWED_TXN_CONTEXT_5004(
                                  bindingName, msgOperation.getLocalPart(), msg.getMessageID(),
                                  coordTxnCtx.getIdentifier()));
        }

        Packet responsePkt = null;
        Transaction jtaTxn = null;
        // Exception raised by the next pipe; rethrown (wrapped) after the postcondition check.
        Exception rethrow = null;
        ATCoordinator coord = null;

        if (coordTxnCtx != null) {
            coord = (ATCoordinator) CoordinationManager.getInstance().lookupOrCreateCoordinator(coordTxnCtx);
            assert coord != null;

            jtaTxn = coord.getTransaction();
            if (jtaTxn != null) {
                // Case 1: the coordinator already has a JTA transaction; run the next pipe inside it.
                if (logger.isLogging(Level.FINER)) {
                    logger.finer(METHOD_NAME, "Resume JTA Txn already associated with coordId=" +
                                  coordTxnCtx.getIdentifier());
                }
                coord.resumeTransaction();
                try {
                    responsePkt = next.process(pkt);
                } catch (Exception e) {
                    logger.warning(METHOD_NAME,
                                   LocalizationMessages.HANDLE_EXCEPTION_TO_COMPLETE_TRANSACTION_5012(coordTxnCtx.getIdentifier(),
                                                                                                 jtaTxn.toString()),
                                   e);
                    rethrow = e;
                    txnMgr.setRollbackOnly();
                } finally {
                    // Always undo resumeTransaction(), including when an Error propagates or
                    // setRollbackOnly() itself fails.
                    coord.suspendTransaction();
                }
            } else if (coord.isSubordinateCoordinator()) {
                // Case 2: subordinate coordinator without a JTA transaction yet; import the external one.
                if (logger.isLogging(Level.FINER)) {
                    logger.finer(METHOD_NAME, "importing ws-at activity id:" + coordTxnCtx.getIdentifier() +
                                              " from external WS-AT coordinator");
                }
                ((ATSubCoordinator)coord).beginImportTransaction();
                try {
                    responsePkt = next.process(pkt);
                } catch (Exception e) {
                    jtaTxn = coord.getTransaction();
                    final String jtaTxnString = jtaTxn == null ? "" : jtaTxn.toString();
                    logger.warning(METHOD_NAME,
                                   LocalizationMessages.HANDLE_EXCEPTION_TO_RELEASE_IMPORTED_TXN_5013(coordTxnCtx.getIdentifier(),
                                                                                                 jtaTxnString),
                                   e);
                    rethrow = e;
                    txnMgr.setRollbackOnly();
                } finally {
                    // Sun App Server 9.1 does not support suspend/resume with TransactionInflow,
                    // so the imported txn is released here rather than suspended. Done in finally
                    // so the import is always ended, whatever the next pipe throws.
                    ((ATSubCoordinator)coord).endImportTransaction();
                }
            } else {
                // Case 3: no JTA transaction and not a subordinate coordinator; just invoke the next pipe.
                responsePkt = next.process(pkt);
            }
        } else if (msgOpATPolicy.ATAlwaysCapability) {

            // no Transaction context flowed with message but WS-AT policy assertion requests auto creation of txn
            // context on server side invocation of method.
            final boolean servletEndpoint = isServlet(pkt);
            if (servletEndpoint) {
                beginTransaction();

                if (logger.isLogging(Level.FINER)) {
                    logger.finer(METHOD_NAME, "create JTA Transaction, no CoordinationContext flowed with operation and wsat:ATAlwaysCapability is enabled");
                }
            }    // else allow Java EE EJB container to create transaction context
                 // for Sun Application Server:  see BaseContainer.preInvokeTx
            try {
                responsePkt = next.process(pkt);
            } catch (Exception e) {
                rethrow = e;
                logger.warning(METHOD_NAME,
                               LocalizationMessages.HANDLE_EXCEPTION_TO_COMMIT_CREATED_TXN_5015(), e);
                txnMgr.setRollbackOnly();
            }
            if (servletEndpoint) {
                try {
                    commitTransaction();
                } catch (WebServiceException commitFailure) {
                    // When the next pipe already failed, the transaction was marked rollback-only
                    // above, so a failing commit is expected (and has been logged by
                    // commitTransaction()). Let the original failure be reported below instead of
                    // masking it with the commit failure.
                    if (rethrow == null) {
                        throw commitFailure;
                    }
                }
            } // else allow Java EE EJB container to finish transaction context
              // for Sun Application Server:  see BaseContainer.postInvokeTx
        } else {
            // No coordination context and no auto-created transaction requested.
            responsePkt = next.process(pkt);
        }

        // Postcondition check: no transaction should be left behind on return.
        assertNoCurrentTransaction(
                LocalizationMessages.INVALID_JTA_TRANSACTION_RETURNING_5003(bindingName,
                                                                            msgOperation.getLocalPart()));
        if (rethrow != null) {
            // Report the failure raised by the next pipe, keeping it as the cause.
            String exMsg = LocalizationMessages.WSTXRETHROW_5014(bindingName,
                                                                 msgOperation.toString());
            throw new WebServiceException(exMsg, rethrow);
        }

        logger.exiting(METHOD_NAME, responsePkt);
        return responsePkt;