FileDocCategorySizeDatePackage
ATCoordinator.javaAPI DocExample48035Tue May 29 16:57:16 BST 2007com.sun.xml.ws.tx.at

ATCoordinator.java

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 * 
 * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
 * 
 * The contents of this file are subject to the terms of either the GNU
 * General Public License Version 2 only ("GPL") or the Common Development
 * and Distribution License("CDDL") (collectively, the "License").  You
 * may not use this file except in compliance with the License. You can obtain
 * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
 * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
 * language governing permissions and limitations under the License.
 * 
 * When distributing the software, include this License Header Notice in each
 * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
 * Sun designates this particular file as subject to the "Classpath" exception
 * as provided by Sun in the GPL Version 2 section of the License file that
 * accompanied this code.  If applicable, add the following below the License
 * Header, with the fields enclosed by brackets [] replaced by your own
 * identifying information: "Portions Copyrighted [year]
 * [name of copyright owner]"
 * 
 * Contributor(s):
 * 
 * If you wish your version of this file to be governed by only the CDDL or
 * only the GPL Version 2, indicate your decision by adding "[Contributor]
 * elects to include this software in this distribution under the [CDDL or GPL
 * Version 2] license."  If you don't indicate a single choice of license, a
 * recipient has the option to distribute your version of this file under
 * either the CDDL, the GPL Version 2 or to extend the choice of license to
 * its licensees as provided above.  However, if you add GPL Version 2 code
 * and therefore, elected the GPL Version 2 license, then the option applies
 * only if the new code is made subject to such option by the copyright
 * holder.
 */
package com.sun.xml.ws.tx.at;

import com.sun.xml.ws.api.SOAPVersion;
import com.sun.xml.ws.api.tx.Protocol;
import com.sun.xml.ws.api.tx.TXException;
import com.sun.xml.ws.developer.MemberSubmissionEndpointReference;
import static com.sun.xml.ws.tx.at.ATParticipant.STATE.*;
import com.sun.xml.ws.tx.common.AT_2PC_State;
import static com.sun.xml.ws.tx.common.AT_2PC_State.ABORTING;
import static com.sun.xml.ws.tx.common.AT_2PC_State.ACTIVE;
import static com.sun.xml.ws.tx.common.AT_2PC_State.COMMITTING;
import static com.sun.xml.ws.tx.common.AT_2PC_State.PREPARED_SUCCESS;
import static com.sun.xml.ws.tx.common.AT_2PC_State.PREPARING;
import com.sun.xml.ws.tx.common.AddressManager;
import com.sun.xml.ws.tx.common.TransactionManagerImpl;
import com.sun.xml.ws.tx.common.TxFault;
import com.sun.xml.ws.tx.common.TxLogger;
import com.sun.xml.ws.tx.common.WsaHelper;
import com.sun.xml.ws.tx.coordinator.CoordinationContextInterface;
import com.sun.xml.ws.tx.coordinator.Coordinator;
import com.sun.xml.ws.tx.coordinator.Registrant;
import com.sun.xml.ws.tx.webservice.member.at.CoordinatorPortType;
import com.sun.xml.ws.tx.webservice.member.at.WSATCoordinator;
import com.sun.xml.ws.tx.webservice.member.coord.CreateCoordinationContextType;

import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import javax.xml.ws.EndpointReference;
import javax.xml.ws.WebServiceContext;
import javax.xml.ws.WebServiceException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;

/**
 * Atomic Transaction Coordinator
 * <p/>
 * <p/>
 * Coordinator States: NONE, ACTIVE, Volatile2PCPrepare, Durable2PCPrepare, Committing, Aborting
 * <p/>
 * <p/>
 * <p/>
 * <p/>
 * <p/>
 * Relationship between ATCoordinator and Java Transaction Manager.
 * ATCoordinator is registered as an XAResource with Java Transaction Manager.
 * This enables Java Transaction Manager to be root transaction manager that
 * for ATCoordinator durable 2pc participants.  ATCoordinator registers for
 * Transaction Synchronization if it has volatile participants. This enables
 * volatile participants to be prepared BEFORE durable 2pc participants are prepared.
 * <p/>
 * <p/>
 * <p/>
 * Coordination Context expires  specifies the period, measured from
 * the point in time at which the context was first created or received, after which a
 * transaction MAY be terminated solely due to its length of operation.
 * From that point forward, the coordinator MAY elect to unilaterally roll back the transaction,
 * so long as it has not made a commit decision.
 *
 * @author Ryan.Shoemaker@Sun.COM
 * @author Joe.Fialli@Sun.COM
 * @version $Revision: 1.18.2.1 $
 * @since 1.0
 */
public class ATCoordinator extends Coordinator implements Synchronization, XAResource {

    public static final URI localCoordinationProtocolServiceURI =
            AddressManager.getPreferredAddress(CoordinatorPortType.class);

    // TODO: short term solution so waitFor* do not hang.  Remove when implement transaction timeout.
    static private final int MAX_WAIT_ITERATION = 300;
    static private final long WAIT_SLEEP = 2000;

    static private TxLogger logger = TxLogger.getATLogger(ATCoordinator.class);
    static final protected TransactionManagerImpl tm = TransactionManagerImpl.getInstance();

    enum ACTION { PREPARE, COMMIT, ROLLBACK };
    enum KIND { VOLATILE, DURABLE };
    
    /* map <Registrant.getId(), Registrant> of volatile 2pc participants */
    private final Map<String, ATParticipant> volatileParticipants = new LinkedHashMap<String, ATParticipant>(4);
    private AT_2PC_State volatileParticipantsState = ACTIVE;

    /* map <Registrant.getId(), Registrant> of durable 2pc participants */
    private final Map<String, ATParticipant> durableParticipants = new LinkedHashMap<String, ATParticipant>(4);
    private AT_2PC_State durableParticipantsState = ACTIVE;

    /* the completion registrant  - only allowed on root ATCoordinator
     */
    private ATCompletion completionRegistrant;
    
    private boolean guardTimeout = false;
    private boolean forgotten = false;


    /**
     * From WSAT 2004 S3.1.1,  new participants are not allowed once coordinator has responses from all volatile 2PC participants.
     */
    private boolean allowNewParticipants = true;

    // associated JTA transaction
    // since JTA provides no portable way to look up a transaction by id, cache Java Transaction with
    // ATCoordinator.
    protected Transaction transaction = null;

    /**
     * Construct a new Coordinator object from the specified context and soap request.
     * <p/>
     * This method is an entry point for the Activation service's createCoordinationContext
     * operation.  This entry point probably won't be used much, or not at all if we choose
     * not to publish the operation (which is optional in the WS-Coordination spec).
     *
     * @param context The coordination context
     * @param request The soap request
     */
    public ATCoordinator(CoordinationContextInterface context, CreateCoordinationContextType request) {
        super(context, request);
    }

    /**
     * Construct a new Coordinator object from the specified context.
     * <p/>
     * This constructor will be the main entry point for activity within the
     * AppServer.
     *
     * @param context The coordination context
     */
    public ATCoordinator(CoordinationContextInterface context) {
        super(context);
    }

    /**
     * Set once field.
     */
    public void setTransaction(final Transaction txn) {
        transaction = txn;
        if (txn == null) {
            return;
        }
        
        try {
            if (! this.isSubordinateCoordinator()) {
                // see #beforeCompletion and #afterCompletion for what this does.
                // NEVER to be used for subordinate coordiator.
                registerSynchronization();
            }
            
            // MUST register synchronization BEFORE next line that
            // causes local transaction to upgrade to JTS txn in glassfish.
            // (Otherwise registerSynchronization with local txn even though JTS transaction exists.
            //  Bug appears as beforeCompletion and afterCompletion never get called due to 
            //  mis-registration.)
            registerWithDurableParent();
        } catch (SystemException ex) {
            logger.severe("setTransaction", LocalizationMessages.XA_REGISTER_0004(ex.getLocalizedMessage()));
            // TODO: link and rethrow
        } catch (IllegalStateException ex) {
                logger.severe("setTransaction", LocalizationMessages.XA_REGISTER_0004(ex.getLocalizedMessage()));
                // TODO: link and rethrow
        } catch (RollbackException ex) {
                logger.severe("setTransaction", LocalizationMessages.XA_REGISTER_0004(ex.getLocalizedMessage()));
                // TODO: link and rethrow
        }
    }
    

    public Transaction getTransaction() {
        return transaction;
    }

    void setAborting() {
        volatileParticipantsState = ABORTING;
        durableParticipantsState = ABORTING;
    }

    boolean isAborting() {
        return volatileParticipantsState == ABORTING || durableParticipantsState == ABORTING;
    }

    /**
     * Get the list of {@link com.sun.xml.ws.tx.coordinator.Registrant}s for this coordinated activity.
     * <p/>
     * The returned list is unmodifiable (read-only).  Add new Registrants
     * with the {@link #addRegistrant(com.sun.xml.ws.tx.coordinator.Registrant, javax.xml.ws.WebServiceContext)} api instead.
     * <p/>
     *
     * @return the list of Registrant objects
     */
    public List<Registrant> getRegistrants() {
        final ArrayList<Registrant> list;
        if (completionRegistrant != null) {
            list = new ArrayList<Registrant>(volatileParticipants.size() + durableParticipants.size() + 1);
        } else {
            list = new ArrayList<Registrant>(volatileParticipants.size() + durableParticipants.size());
        }
        list.addAll(volatileParticipants.values());
        list.addAll(durableParticipants.values());
        if (completionRegistrant != null) {
            list.add(completionRegistrant);
        }
        return Collections.unmodifiableList(list);
    }

    protected void registerWithVolatileParent() {
        registerSynchronization();
    }

    /**
     * Enlist with parent of ATCoordinator which is JTA transaction manager.
     */
    protected boolean registerWithDurableParent() throws RollbackException, SystemException {
        boolean result = false;
        if (getTransaction() != null) {
            result = getTransaction().enlistResource(this);
        }
        return result;
    }

    /**
     * Add the specified Registrant to the list of registrants for this
     * coordinated activity.
     *
     * @param registrant The {@link Registrant}
     */
    @Override
    public void addRegistrant(final Registrant registrant, final WebServiceContext wsContext) {
        if (!allowNewParticipants) {
            // send fault S4.1 ws:coor Invalid State
            if(wsContext != null) {
                WsaHelper.sendFault(
                        wsContext,
                        SOAPVersion.SOAP_11,
                        TxFault.InvalidState,
                        "Invalid to register a new participant after the first durable participant is prepared.  Registrant id: " +  // no I18N - spec requires xml:lang="en"
                                registrant.getIdValue());
            }
            throw new IllegalStateException(LocalizationMessages.LATE_PARTICIPANT_REGISTRATION_0002());
        }
        // TODO: check for duplicate registration and send fault S4.6 ws:coor Already Registered
        switch (registrant.getProtocol()) {
            case COMPLETION:
                // Unimplemented OPTIONAL functionality.
                // TODO: do we need to see if this field is already set?
                // TODO: disallow if subordinate coordinator
                completionRegistrant = (ATCompletion) registrant;
                break;

            case DURABLE:
                logger.fine("ATCoordinator.addRegistrant", getCoordIdPartId(registrant));
                durableParticipants.put(registrant.getIdValue(), (ATParticipant) registrant);
                break;

            case VOLATILE:
                volatileParticipants.put(registrant.getIdValue(), (ATParticipant) registrant);
                break;

            default:
                throw new UnsupportedOperationException(
                        LocalizationMessages.UNKNOWN_PROTOCOL_0003((registrant.getProtocol().getUri())));
        }
    }

    /**
     * Get the registrant with the specified id or null if it does not exist.
     *
     * @param id the registrant id
     * @return the Registrant object or null if the id does not exist
     */
    public Registrant getRegistrant(final String id) {
        Registrant r = volatileParticipants.get(id);

        if (r == null) {
            r = durableParticipants.get(id);
        }

        if ((r == null) && (completionRegistrant != null) && 
            (completionRegistrant.getId().getValue().equals(id))) {
            r = completionRegistrant;
        }

        return r;
    }
    
    public void removeRegistrant(final String id) {
        forget(id);
    }

    /**
     * Return a Collection of volatile 2pc participants.
     * <p/>
     * This Collection is modifiable.
     *
     * @return A modifiable Collection of the volatile 2pc participants
     */
    public Collection<ATParticipant> getVolatileParticipants() {
        return volatileParticipants.values();
    }

    public Collection<ATParticipant> getVolatileParticipantsSnapshot() {
        return new ArrayList<ATParticipant>(volatileParticipants.values());
    }

    /**
     * Return a Collection of durable 2pc participants.
     * <p/>
     * This Collection is modifiable.
     *
     * @return A modifiable Collection of the durable 2pc participants
     */
    public Collection<ATParticipant> getDurableParticipants() {
        return durableParticipants.values();
    }

    public Collection<ATParticipant> getDurableParticipantsSnapshot() {
        return new ArrayList<ATParticipant>(durableParticipants.values());
    }


    /**
     * Get the completion registrant.
     *
     * @return The completion registrant
     */
    public ATCompletion getCompletionRegistrant() {
        return completionRegistrant;
    }

    /**
     * Send 2PC prepare to all volatile participants
     * <p/>
     * Volatile 2PC prepare constraint from 2004 WS-AT, section 3.3.1
     * the root coordinator begins the prepare phase of all participants registered for the Volatile 2PC protocol.
     * All participants registered for this protocol must respond before a Prepare is issued to a
     * participant registered for Durable 2PC. Further participants may register with the coordinator until the
     * coordinator issues a Prepare to any durable participant.
     */
    public void initiateVolatilePrepare() {
        // send prepare to all volatile participants before durable participants.
        // Section 3.3.1
        if (isAborting()) {
            initiateRollback();
            return;
        }
        volatileParticipantsState = PREPARING;
        actionForAllParticipants(getVolatileParticipantsSnapshot(), ACTION.PREPARE);
    }
    
    private void actionForAllParticipants(final Collection<ATParticipant> particpants, final ACTION action) {
        for (ATParticipant participant : particpants) {
            switch (action) {
                case PREPARE:
                    try {
                        participant.prepare();
                    } catch (TXException ex) {
                        setAborting();
                        return;
                    }
                    break;
                    
                case COMMIT:
                     try {
                        participant.commit();
                    } catch (TXException ex) {
                        setAborting();
                        return;
                    }
                    break;
                    
                case ROLLBACK:
                    participant.abort();
                    break;
                    
                default:
                    break;
            }
        }
    }

    /**
     * Wait for all volatile participants to respond to prepare.
     * <p/>
     * <p/>
     * Volatile participant state is set before this method returns.
     */
    protected void waitForVolatilePrepareResponse() {
        final String METHOD_NAME = "waitForVolatilePrepareResponse";
        final int numParticipants = getVolatileParticipants().size();
        if (volatileParticipantsState == PREPARED_SUCCESS || numParticipants == 0) {
            if (logger.isLogging(Level.FINER)) {
                logger.exiting(METHOD_NAME, "prepared coordId=" + getIdValue() + " state=" +
                        volatileParticipantsState + " numParticipants=" + numParticipants);
            }
            return;
        }

        boolean communicationTimeout = false;  // TODO: resend prepare due to communication timeout. Assume msg lost.
        boolean allPrepared;
        for (int i = 0; i < MAX_WAIT_ITERATION; i++) {
            allPrepared = true;  // assume all prepared until encounter participant is not
            final Iterator<ATParticipant> iter = getVolatileParticipantsSnapshot().iterator();
            while (iter.hasNext()) {
                final ATParticipant participant = iter.next();
                if (isAborting()) {
                    return;
                }
                switch (participant.getState()) {
                    case ACTIVE:
                        // accomodate late registration: volatile 2PC prepare can register new volatile or durable participant.
                        allPrepared = false;
                        try {
                            participant.prepare();
                        } catch (TXException ex) {
                                logger.warning(METHOD_NAME, LocalizationMessages.CAUGHT_TX_EX_DURING_PREPARE_0005(ex.getLocalizedMessage()));
                                setAborting();
                        }
                        break;

                    case PREPARING:
                        allPrepared = false;
                        if (communicationTimeout) {
                            try {
                                participant.prepare();
                            } catch (TXException ex) {
                                logger.warning(METHOD_NAME, LocalizationMessages.CAUGHT_TX_EX_DURING_PREPARE_0005(ex.getLocalizedMessage()));
                                setAborting();
                            }
                        }
                        break;

                    case ABORTING:
                        setAborting();
                        return;

                        // these states indicate a response to prepare request
                    case PREPARED:
                    case PREPARED_SUCCESS:
                    case COMMITTING:
                        break;

                    case NONE:
                    case COMMITTED:
                    case ABORTED:
                    case READONLY:
                        forget(participant);
                        break;
                }
            }
            if (isAborting()) {
                return;
            } else if (allPrepared) {
                volatileParticipantsState = PREPARED_SUCCESS;
                if (logger.isLogging(Level.FINER)) {
                    logger.exiting(METHOD_NAME, "prepared coordId=" + getIdValue() + " state=" +
                            volatileParticipantsState);
                }
                return;
            } else { //wait some before checking again
                try {
                    if (logger.isLogging(Level.FINEST)) {
                        logger.finest(METHOD_NAME, "checking...");
                    }
                    Thread.sleep(WAIT_SLEEP);
                } catch (InterruptedException ex) {
                    logger.warning(METHOD_NAME, ex.getLocalizedMessage());
                }
            }
        }
        if (logger.isLogging(Level.FINE)) {
            dumpParticipantsState(getVolatileParticipantsSnapshot(), KIND.VOLATILE); 
        }
        setAborting();
        if (logger.isLogging(Level.FINER)) {
            logger.warning(METHOD_NAME, LocalizationMessages.TIMEOUT_0006(getIdValue(), volatileParticipantsState));
        }
    }


    /**
     * TODO: Each PREPARED/READONLY Volatile ATParticipant should check if it is time to start
     * the durable 2PC phase by calling this method.
     */
    public void initiateDurablePrepare() {
        final Collection<ATParticipant> ps = getDurableParticipants();
        final int numParticipants = ps == null ? 0 : ps.size();
        if (logger.isLogging(Level.FINEST)) {
            logger.finest("initializeDurablePrepare", " coordId=" + getIdValue() +
                    " numDurableParticipants=" + numParticipants + " volatile participant state=" + volatileParticipantsState +
                    " numVolatileParticipants" + getVolatileParticipants().size());
        }

        // PRE-CONDITION: volatileParticipantState is PREPARED
        if (isAborting()) {
            initiateRollback();
            return;
        }

        assert volatileParticipantsState == PREPARED_SUCCESS || getVolatileParticipants().size() == 0;

        // No outstanding volatile participants at this point.

        // No new participants allowed as soon as durable 2PC begins.
        allowNewParticipants = false;
        durableParticipantsState = PREPARING;
        actionForAllParticipants(getDurableParticipantsSnapshot(), ACTION.PREPARE);
    }

    /**
     * Wait for all Durable participants to respond to prepare.
     * <p/>
     * <p/>
     * Durable participant state is set before this method returns.
     */
    protected void waitForDurablePrepareResponse() {
        // TODO: implement logic to resend prepare due to communication timeout.
        // Assumes prepare request was lost on way to participant OR the participant's response was lost/delayed.
        // boolean communicationTimeout = false;

        boolean allPrepared = false;
        for (int i = 0; i < MAX_WAIT_ITERATION; i++) {
            if (isAborting()) {
                break;
            }
            allPrepared = true;  // assume true until find at least one participant that is not prepared yet.
            for (ATParticipant participant : getDurableParticipantsSnapshot()) {
                switch (participant.getState()) {
                    case PREPARING:
                        allPrepared = false;
                        if (logger.isLogging(Level.FINEST)) {
                            logger.finest("intitatedurableParticipant", "not prepared, readonly or aborted " +
                                getCoordIdPartId(participant) + " state=" + participant.getState());
                        }
                        break;

                    case ACTIVE:
                    case NONE:
                        logger.warning("waitForDurablePrepareResponse",
                                LocalizationMessages.INITIATE_ROLLBACK_0007(this.getCoordIdPartId(participant), participant.getState()));
                        allPrepared = false;
                        setAborting();

                        // TODO: throw illegal state exception
                        assert false;

                    case ABORTING:
                        setAborting();
                        return;

                    // these states indicate a response to prepare request
                    case PREPARED:
                    case PREPARED_SUCCESS:
                    case COMMITTING:
                        break;

                    case ABORTED:
                        setAborting();
                        participant.forget();
                        break;
                        
                    case COMMITTED:
                    case READONLY:
                        participant.forget();
                        break;
                }
            }
            if (isAborting()) {
                return;
            } else if (allPrepared) {
                durableParticipantsState = PREPARED_SUCCESS;
                if (logger.isLogging(Level.FINER)) {
                    logger.exiting("waitForDurablePrepare", "coordId=" + getIdValue() +
                            "state:" + durableParticipantsState);
                }
                return;
            } else { //wait some before checking again
                try {
                    if (logger.isLogging(Level.FINEST)) {
                        logger.finest("waitForDurablePrepare", "checking...");
                    }
                    Thread.sleep(WAIT_SLEEP);
                } catch (InterruptedException ex) {}
            }
        }
        if (logger.isLogging(Level.FINE)) {
            dumpParticipantsState(getDurableParticipantsSnapshot(), KIND.DURABLE); 
        }
        // some participants not prepared still, timing out
        setAborting();
        logger.warning("waitForDurablePrepare", LocalizationMessages.TIMEOUT_0006(getIdValue(), durableParticipantsState));
    }
    
    private void dumpParticipantsState(final Collection<ATParticipant> lst, final KIND kind) {
        final StringBuffer str = new StringBuffer(100);
        str.append(" " + kind.toString() + " ");
        for (ATParticipant p: lst ) {
            str.append("Part: " + p.getIdValue() + " state:" + p.getState());
        }
        if (logger.isLogging(Level.FINE)) {
            logger.fine("dumpParticipantState", "coordId=" + getIdValue() + str);
        }
    }

    public void initiateCommit() {
        initiateVolatileCommit();
        initiateDurableCommit();
    }

    public void initiateDurableCommit() {
        // assert all participants must be in PREPARED, PREPARED_SUCCESS, COMMITTING, READONLY

        // PRE-CONDITION: durableParticipantState is PREPARED
        if (isAborting()) {
            initiateRollback();
            return;
        }
        if (durableParticipantsState != PREPARED_SUCCESS) {
            logger.warning("durableVolatileCommit", LocalizationMessages.UNEXPECTED_STATE_0008(durableParticipantsState));
        }
        durableParticipantsState = COMMITTING;
        guardTimeout = true;
        actionForAllParticipants(getDurableParticipantsSnapshot(), ACTION.COMMIT);
    }

    public void initiateVolatileCommit() {
        // assert all participants must be in PREPARED, PREPARED_SUCCESS, COMMITTING, READONLY

        // PRE-CONDITION: durableParticipantState is PREPARED
        if (isAborting()) {
            initiateRollback();
            return;
        }
        if (volatileParticipantsState != PREPARED_SUCCESS && getVolatileParticipants().size() != 0) {
                logger.warning("initateVolatileCommit", LocalizationMessages.UNEXPECTED_STATE_0008(volatileParticipantsState));
        }

        // No new participants allowed as soon as durable 2PC begins.
        volatileParticipantsState = COMMITTING;
        actionForAllParticipants(getVolatileParticipantsSnapshot(), ACTION.COMMIT);
    }

    public void initiateRollback() {
        initiateVolatileRollback();
        initiateDurableRollback();
    }

    public void initiateDurableRollback() {
        durableParticipantsState = ABORTING;
        for (ATParticipant durableP : getDurableParticipantsSnapshot()) {
            durableP.abort();
        }
    }

    public void initiateVolatileRollback() {
        volatileParticipantsState = ABORTING;
        for (ATParticipant volatileP : getVolatileParticipantsSnapshot()) {
            volatileP.abort();
        }
    }

    /**
     * Register this with TransactionSynchronizationRegistery.  This should get called by JTS
     * transaction system before 2PC Participants and XAResources are prepared.
     */
    public void beforeCompletion() {
        logger.finest("beforeCompletion", "beforeCompletion called for coordId=" + getIdValue());
        if (volatileParticipants.size() != 0) {
            initiateVolatilePrepare();
            waitForVolatilePrepareResponse();
        }
    }

    public void afterCompletion(final int i) {
        logger.finest("afterCompletion", "afterCompletion called for coordId=" + getIdValue());
        forget();
    }

    protected void waitForCommitOrRollbackResponse(final Protocol protocol) {
        // all participants have been committed or rolled back.
        // wait for all outstanding participants to send final notification of wsat COMMITTED or ABORTED.
        // boolean communicationTimeout = false;  // TODO: resend prepare due to communication timeout. Assume msg lost.
        boolean allProcessed;
        
        
        for (int i = 0; i < MAX_WAIT_ITERATION; i++) {
            allProcessed = true;  // assume all committed/aborted until encounter participant is not.
            if (protocol == Protocol.DURABLE) {
                
                for (ATParticipant participant : getDurableParticipantsSnapshot()) {
                    if (participant.getState() == COMMITTED ||
                            participant.getState() == ABORTED ||
                            participant.getState() == READONLY) {
                        participant.forget();
                    } else {
                        allProcessed = false;
                        if (logger.isLogging(Level.FINEST)) {
                            logger.finest("waitForCommitRollback", getCoordIdPartId(participant) + " state:" + participant.getState());
                        }
                        /*  Don't retry aggressively. Have to put communication timeout to retry.
                       if (isAborting()){
                           participant.abort();
                       } else {
                           try {
                               participant.commit();
                           } catch (TXException ex) {
                                logger.warning("waitForCommitOrRollbackResponse", ex.getLocalizedMessage());
                           }
                       }
                         **/
                    }
                }
            } else if (protocol == Protocol.VOLATILE) {
                // best effort to receive committed/aborted from volatile participants. But do not wait for them to send committed.
                allProcessed = true;  // assume all committed/aborted until encounter participant is not.
                for (ATParticipant participant : getVolatileParticipantsSnapshot()) {
                    if (participant.getState() != COMMITTED &&
                            participant.getState() != ABORTED ||
                            participant.getState() != READONLY) {
                        logger.warning("waitForCommitOrRollbackResponse",
                                    LocalizationMessages.FORGETTING_0009(participant.getState(), getCoordIdPartId(participant)));
                        participant.forget();
                    }
                }
            }
            if (allProcessed) {
                guardTimeout = false;
                if (logger.isLogging(Level.FINER)) {
                    logger.exiting("waitForCommitRollback", "coordId=" + getIdValue());
                }
                return;
            } else { //wait some before checking again
                try {
                    if (logger.isLogging(Level.FINEST)) {
                        logger.finest("waitForCommitRollback", "checking...");
                    }
                    Thread.sleep(WAIT_SLEEP);
                } catch (InterruptedException ex) { }
            }
        }
    }

    /**
     * Synchronous prepare request invoked by JTS coordinator as part of its 2PC protocol.
     * <p/>
     * <p>Prepare this coordinator and return result of preparation.
     */
    public int prepare(final Xid xid) throws XAException {
        if (logger.isLogging(Level.FINER)) {
            logger.entering("XAResource_prepare(xid=" + xid + ")");
        }
        int result = 0;
         
        synchronized(this) {
            initiateDurablePrepare();

            // Map asynchonous WS-AT 2PC protocol to XAResource synchronous protocol.
            // Wait for all possible pending responses to prepare message.
            waitForDurablePrepareResponse();  // result in durableParticipantsState: PREPARED, COMMITTED, ABORTING
        }
        
        // check if volatile or durable WS-AT participants aborted
        if (isAborting()) {
            // TODO:  be more specific on XAException error code for why rollback occurred. Using generic code now.
            throw new XAException(XAException.XA_RBROLLBACK);
        } else if (getDurableParticipants().size() == 0 && this.getVolatileParticipants().size() == 0) {
            result = XAResource.XA_RDONLY;
        } else {
            result = XAResource.XA_OK;
        }

        if (logger.isLogging(Level.FINER)) {
            logger.exiting("XAResource_prepare", result);
        }
        return result;
    }

    public void commit(final Xid xid, final boolean onePhase) throws XAException {
        if (logger.isLogging(Level.FINER)) {
            logger.entering("XAResource_commit(xid=" + xid + " ,onePhase=" + onePhase + ")");
        }
   
        int result = 0;
        if (onePhase) {
            // if one phase commit, need to do prepare here
            try {
                result = prepare(xid);
            } catch (XAException e) {
                logger.warning("commit(1PC)", LocalizationMessages.PREPARE_FAILED_0010(e.toString()));
                initiateRollback();
                waitForCommitOrRollbackResponse(Protocol.DURABLE);
                if (logger.isLogging(Level.FINER)) {
                    logger.exiting("XAResource_commit", e);
                }
                throw e;
            }   
        }
        
        // Commit volatile and durable 2PC participants.  No ordering required.
        if (result != XAResource.XA_RDONLY) {
            initiateCommit();
            waitForCommitOrRollbackResponse(Protocol.DURABLE);
            waitForCommitOrRollbackResponse(Protocol.VOLATILE);
        }
       
        if (logger.isLogging(Level.FINER)) {
            logger.exiting("XAResource_commit");
        }   
    }
    
    public void rollback(final Xid xid) throws XAException {
        if (logger.isLogging(Level.FINER)) {
            logger.entering("XAResource_rollback(xid=" + xid + ")");
        }
        // Commit volatile and durable 2PC participants.  No ordering required.
        initiateRollback();
        waitForCommitOrRollbackResponse(Protocol.DURABLE);
        waitForCommitOrRollbackResponse(Protocol.VOLATILE);
        guardTimeout = false;
        if (logger.isLogging(Level.FINER)) {
            logger.exiting("XAResource_rollback");
        }
    }

    public Xid[] recover(final int i) throws XAException {
        throw new UnsupportedOperationException("Not yet implemented");
    }


    public boolean setTransactionTimeout(final int i) throws XAException {
        setExpires(i * 1000L);
        return true;
    }

    public void start(final Xid xid, final int flags) throws XAException {
        // Start transaction hook
    }

    /**
     */
    public void end(final Xid xid, final int flags) throws XAException {
        switch (flags) {
            case TMSUCCESS:
                //TODO
                break;
            case TMSUSPEND:
                // TODO
                break;
            case TMFAIL:
                setAborting();
                break;
        }
    }

    /**
     * forget everything about this transaction.
     * <p/>
     * <p/>
     * Recovers resources held by a transaction.  After a transaction is committed or aborted, it is forgotten.
     */
    public void forget(final Xid xid) throws XAException {
        logger.finest("forget", "XAResource.forget(XID) called for coordId=" + getIdValue());
        forget();
    }

    public int getTransactionTimeout() throws XAException {
        return (int) (getExpires() / 1000L);
    }

    public boolean isSameRM(final XAResource xAResource) throws XAException {
        return false;
    }

    public void prepared(final String participantId) {
        prepared(participantId, null);
    }

    public void prepared(final String participantId, final EndpointReference unknownParticipantReplyEPR) {
        if (logger.isLogging(Level.FINER)) {
            logger.entering("prepared", getCoordIdPartId(participantId));
        }
        final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
        if (participant == null) {
            if (unknownParticipantReplyEPR != null) {
                logger.warning("prepared", LocalizationMessages.UNKNOWN_CORD_OR_PART_0011(getCoordIdPartId(participantId), unknownParticipantReplyEPR));
                
                ATParticipant.getATParticipantWS(unknownParticipantReplyEPR, null, false).rollbackOperation(null);
            }
        } else {
            participant.prepared();
        }
        if (logger.isLogging(Level.FINER)) {
            logger.exiting("prepared", getCoordIdPartId(participantId));
        }
    }

    public void committed(final String participantId) {
        if (logger.isLogging(Level.FINER)) {
            logger.entering("committed", getCoordIdPartId(participantId));
        }
        final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
        if (participant != null) {
            participant.committed();
            participant.forget();
        } else {
            logger.warning("committed", LocalizationMessages.UNKNOWN_PART_0012(getCoordIdPartId(participantId)));
        }
        if (logger.isLogging(Level.FINER)) {
            logger.exiting("committed", getCoordIdPartId(participantId));
        }
    }

    public void readonly(final String participantId) {
        if (logger.isLogging(Level.FINER)) {
            logger.entering("readonly", getCoordIdPartId(participantId));
        }
        final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
        if (participant == null) {
            logger.warning("readonly", LocalizationMessages.UNKNOWN_PART_0012(getCoordIdPartId(participantId)));
        } else {
            participant.readonly();
            participant.forget();
            if (logger.isLogging(Level.FINER)) {
                logger.exiting("readonly", getCoordIdPartId(participantId));
            }
        }
    }

    public void aborted(final String participantId) {
        if (logger.isLogging(Level.FINER)) {
            logger.entering("aborted", getCoordIdPartId(participantId));
        }
        setAborting();
        final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
        if (participant == null) {
            if (logger.isLogging(Level.WARNING)) {
                logger.warning("aborted", LocalizationMessages.UNKNOWN_PART_0012(getCoordIdPartId(participantId)));
            }
        } else {
            participant.aborted();
            participant.forget();
            if (logger.isLogging(Level.FINER)) {
                logger.exiting("aborted", getCoordIdPartId(participantId));
            }
        }
    }

    /**
     * Implement inbound event <i>replay</i> for Atomic Transaction 2PC Protocol(Coordinator View).
     */
    public void replay(final String participantId) {
        final String METHOD_NAME="replay";
        if (logger.isLogging(Level.FINER)) {
            logger.entering(METHOD_NAME, getCoordIdPartId(participantId));
        }

        final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
        final ATParticipant.STATE state = participant.getState();
        switch (state) {
            case NONE:
                if (participant.isDurable()) {
                    participant.abort();
                } else {  // participant.isVolatile()
                    // TODO: Invalid State. Send back an invalid state fault.
                    logger.severe(METHOD_NAME, LocalizationMessages.INVALID_STATE_0013(getCoordIdPartId(participantId)));
                }
                break;
                
            case ACTIVE:
            case PREPARING:
            case ABORTING:
                participant.abort();
                break;
                
            case COMMITTING:
                try {
                    participant.commit();
                } catch (TXException ex) {
                    logger.warning(METHOD_NAME, ex.getLocalizedMessage());
                }
                break;
            case PREPARED:
            case PREPARED_SUCCESS:
                // nothing to do for all other cases.
        }
        if (logger.isLogging(Level.FINER)) {
            logger.exiting(METHOD_NAME, getCoordIdPartId(participantId));
        }
    }

    volatile private boolean registeredSynchronization = false;

    /**
     * Register interposed synchronization for this instance.
     * <p/>
     * Initial volatile participant registration triggers this registration.
     */
    private void registerSynchronization() {
        if (!registeredSynchronization) {
            registeredSynchronization = true;
            TransactionManagerImpl.getInstance().registerSynchronization(this);
            if (logger.isLogging(Level.FINEST)) {
                logger.finest("registerSynchronization", "Synchronization registered for WS-AT coordinated activity " + this.getIdValue());
            }
        }
    }

    public boolean isSubordinateCoordinator() {
        return false;
    }

    public EndpointReference getParentCoordinatorRegistrationEPR() {
        if (getContext() == null) {
            return null;
        } else {
            return getContext().getRootRegistrationService();
        }
    }

    static private WSATCoordinator wsatCoordinatorService = new WSATCoordinator();

    public static WSATCoordinator getWSATCoordinatorService() {
        return wsatCoordinatorService;
    }

    protected String getCoordIdPartId(final Registrant registrant) {
        return getCoordIdPartId(registrant.getIdValue());
    }

    protected String getCoordIdPartId(final String participantId) {
        return " coordId=" + getIdValue() + " partId:" + participantId + " ";
    }

    public void forget(final ATParticipant part) {
        forget(part.getIdValue());
    }

    public void forget(final String partId) {
        ATParticipant removed = volatileParticipants.remove(partId);
        if (removed != null) {
            if (logger.isLogging(Level.FINE)) {
                logger.fine("forget", "forgot volatile participant " + getCoordIdPartId(partId));
            }
            if (!hasOutstandingParticipants()) {
                forget();
            }
            return;
        }
        removed = durableParticipants.remove(partId);
        if (removed != null) {
            if (logger.isLogging(Level.FINE)) {
                logger.fine("forget", "forgot durable participant " + getCoordIdPartId(partId));
            } 
            if (!hasOutstandingParticipants()) {
                forget();
            }
            return;
        }
        /*
         * TODO: implement if optional completion is ever supported.
        if ((completionRegistrant != null) && (completionRegistrant.getId().equals(id))) {
            r = completionRegistrant;
        }
        */
    }

    static public final EndpointReference localCoordinatorProtocolService;
    
    static {
         MemberSubmissionEndpointReference epr = new MemberSubmissionEndpointReference();
         epr.addr = new MemberSubmissionEndpointReference.Address();
         epr.addr.uri = localCoordinationProtocolServiceURI.toString();
         localCoordinatorProtocolService = epr;
    }

    public EndpointReference getCoordinatorProtocolServiceForRegistrant(final Registrant r) {
        return localCoordinatorProtocolService;
    }

    /**
     * Return false if it is okay to rollback the transaction.
     * Do not allow transaction expiration after Phase 2 begins.
     */
    public boolean expirationGuard() {
        synchronized (this) {
            return guardTimeout;
        }
    }
    
    @Override 
    public void expire() {
        if (!expirationGuard()) {
            setAborting();
        }
        super.expire();
    }

    @Override
    public void forget() {
        synchronized(this) {
            if (forgotten) {
                return;
            } else {
                forgotten = true;
            }
            for (ATParticipant participant : getDurableParticipantsSnapshot()) {
                participant.forget();
            }
            for (ATParticipant participant : getVolatileParticipantsSnapshot()) {
                participant.forget();
            }
            super.forget();
        }
    }
    
     public void resumeTransaction() throws WebServiceException {
        if (transaction != null) {
            try {
                tm.resume(transaction);
                logger.finest("resumeTransaction", "successfully resumed txn " + transaction);
            } catch (Exception ex) {
                String handlerMsg = LocalizationMessages.TXN_MGR_RESUME_FAILED_0032(transaction.toString());
                logger.warning("resumeTransaction", handlerMsg, ex);
                throw new WebServiceException(handlerMsg, ex);
            }
        }
    }
    
    public Transaction suspendTransaction() {
        Transaction tx = null;
        try {
            tx = tm.suspend();
            logger.finest("suspendTransation", tx == null ? "no txn to suspend" : "suspended txn " + tx.toString());
            return tx;
        } catch (SystemException ex) {
            String handlerMsg = LocalizationMessages.TXN_MGR_OPERATION_FAILED_0031("suspend");
            logger.warning("suspendTransaction", handlerMsg, ex);
            return tx;
        }
    }
   
    public boolean hasOutstandingParticipants() {
        return getDurableParticipants().size() != 0 || getVolatileParticipants().size() != 0;
    }    
}