Methods Summary |
---|
public void | aborted(java.lang.String participantId)
// Handles an inbound "aborted" notification from the participant with the given id.
// The coordinator is flagged as aborting first; a registered participant is then
// told it aborted and is forgotten, while an unknown id only produces a warning.
if (logger.isLogging(Level.FINER)) {
    logger.entering("aborted", getCoordIdPartId(participantId));
}
// Flag the abort before the lookup so it takes effect even for an unknown sender.
setAborting();
final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
if (participant == null) {
    if (logger.isLogging(Level.WARNING)) {
        logger.warning("aborted", LocalizationMessages.UNKNOWN_PART_0012(getCoordIdPartId(participantId)));
    }
} else {
    participant.aborted();
    participant.forget();
}
// Trace the exit on both paths so every "entering" record has a matching "exiting".
if (logger.isLogging(Level.FINER)) {
    logger.exiting("aborted", getCoordIdPartId(participantId));
}
|
private void | actionForAllParticipants(java.util.Collection particpants, com.sun.xml.ws.tx.at.ATCoordinator$ACTION action)
// Applies the requested 2PC action to each participant in iteration order.
// A TXException from prepare/commit flags the coordinator as aborting and stops
// the loop; the remaining participants are not contacted.
for (ATParticipant current : particpants) {
    switch (action) {
        case PREPARE:
        case COMMIT:
            // Both actions share the same failure handling.
            try {
                if (action == ACTION.PREPARE) {
                    current.prepare();
                } else {
                    current.commit();
                }
            } catch (TXException ex) {
                setAborting();
                return;
            }
            break;
        case ROLLBACK:
            current.abort();
            break;
        default:
            break;
    }
}
|
public void | addRegistrant(com.sun.xml.ws.tx.coordinator.Registrant registrant, javax.xml.ws.WebServiceContext wsContext)Add the specified Registrant to the list of registrants for this
coordinated activity.
if (!allowNewParticipants) {
// send fault S4.1 ws:coor Invalid State
if(wsContext != null) {
WsaHelper.sendFault(
wsContext,
SOAPVersion.SOAP_11,
TxFault.InvalidState,
"Invalid to register a new participant after the first durable participant is prepared. Registrant id: " + // no I18N - spec requires xml:lang="en"
registrant.getIdValue());
}
throw new IllegalStateException(LocalizationMessages.LATE_PARTICIPANT_REGISTRATION_0002());
}
// TODO: check for duplicate registration and send fault S4.6 ws:coor Already Registered
switch (registrant.getProtocol()) {
case COMPLETION:
// Unimplemented OPTIONAL functionality.
// TODO: do we need to see if this field is already set?
// TODO: disallow if subordinate coordinator
completionRegistrant = (ATCompletion) registrant;
break;
case DURABLE:
logger.fine("ATCoordinator.addRegistrant", getCoordIdPartId(registrant));
durableParticipants.put(registrant.getIdValue(), (ATParticipant) registrant);
break;
case VOLATILE:
volatileParticipants.put(registrant.getIdValue(), (ATParticipant) registrant);
break;
default:
throw new UnsupportedOperationException(
LocalizationMessages.UNKNOWN_PROTOCOL_0003((registrant.getProtocol().getUri())));
}
|
public void | afterCompletion(int i)
logger.finest("afterCompletion", "afterCompletion called for coordId=" + getIdValue());
forget();
|
public void | beforeCompletion()Register this with TransactionSynchronizationRegistery. This should get called by JTS
transaction system before 2PC Participants and XAResources are prepared.
logger.finest("beforeCompletion", "beforeCompletion called for coordId=" + getIdValue());
if (volatileParticipants.size() != 0) {
initiateVolatilePrepare();
waitForVolatilePrepareResponse();
}
|
public void | commit(javax.transaction.xa.Xid xid, boolean onePhase)
if (logger.isLogging(Level.FINER)) {
logger.entering("XAResource_commit(xid=" + xid + " ,onePhase=" + onePhase + ")");
}
int result = 0;
if (onePhase) {
// if one phase commit, need to do prepare here
try {
result = prepare(xid);
} catch (XAException e) {
logger.warning("commit(1PC)", LocalizationMessages.PREPARE_FAILED_0010(e.toString()));
initiateRollback();
waitForCommitOrRollbackResponse(Protocol.DURABLE);
if (logger.isLogging(Level.FINER)) {
logger.exiting("XAResource_commit", e);
}
throw e;
}
}
// Commit volatile and durable 2PC participants. No ordering required.
if (result != XAResource.XA_RDONLY) {
initiateCommit();
waitForCommitOrRollbackResponse(Protocol.DURABLE);
waitForCommitOrRollbackResponse(Protocol.VOLATILE);
}
if (logger.isLogging(Level.FINER)) {
logger.exiting("XAResource_commit");
}
|
public void | committed(java.lang.String participantId)
// Handles an inbound "committed" notification: a registered participant is told it
// committed and is forgotten; an unknown id only produces a warning.
final String ids = getCoordIdPartId(participantId);
if (logger.isLogging(Level.FINER)) {
    logger.entering("committed", ids);
}

final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
if (participant == null) {
    logger.warning("committed", LocalizationMessages.UNKNOWN_PART_0012(getCoordIdPartId(participantId)));
} else {
    participant.committed();
    participant.forget();
}

if (logger.isLogging(Level.FINER)) {
    logger.exiting("committed", getCoordIdPartId(participantId));
}
|
private void | dumpParticipantsState(java.util.Collection lst, com.sun.xml.ws.tx.at.ATCoordinator$KIND kind)
// Logs, at FINE level, the id and state of every participant in lst, prefixed with
// the participant kind. Does nothing when FINE logging is disabled.
if (!logger.isLogging(Level.FINE)) {
    // Skip building the dump string when it would be discarded.
    return;
}
// The buffer is local to this call, so the unsynchronized StringBuilder is enough.
final StringBuilder dump = new StringBuilder(100);
dump.append(" ").append(kind.toString()).append(" ");
for (ATParticipant p : lst) {
    dump.append("Part: ").append(p.getIdValue()).append(" state:").append(p.getState());
}
logger.fine("dumpParticipantState", "coordId=" + getIdValue() + dump);
|
public void | end(javax.transaction.xa.Xid xid, int flags)
// Only TMFAIL has an effect: it flags the coordinator as aborting.
// TODO: TMSUCCESS handling is not implemented.
// TODO: TMSUSPEND handling is not implemented.
if (flags == TMFAIL) {
    setAborting();
}
|
public boolean | expirationGuard()Return false if it is okay to rollback the transaction.
Do not allow transaction expiration after Phase 2 begins.
synchronized (this) {
return guardTimeout;
}
|
public void | expire()
if (!expirationGuard()) {
setAborting();
}
super.expire();
|
public void | forget(javax.transaction.xa.Xid xid)forget everything about this transaction.
Recovers resources held by a transaction. After a transaction is committed or aborted, it is forgotten.
logger.finest("forget", "XAResource.forget(XID) called for coordId=" + getIdValue());
forget();
|
public void | forget(ATParticipant part)
// Convenience overload: forgets the participant identified by part.getIdValue().
forget(part.getIdValue());
|
public void | forget(java.lang.String partId)
ATParticipant removed = volatileParticipants.remove(partId);
if (removed != null) {
if (logger.isLogging(Level.FINE)) {
logger.fine("forget", "forgot volatile participant " + getCoordIdPartId(partId));
}
if (!hasOutstandingParticipants()) {
forget();
}
return;
}
removed = durableParticipants.remove(partId);
if (removed != null) {
if (logger.isLogging(Level.FINE)) {
logger.fine("forget", "forgot durable participant " + getCoordIdPartId(partId));
}
if (!hasOutstandingParticipants()) {
forget();
}
return;
}
/*
* TODO: implement if optional completion is ever supported.
if ((completionRegistrant != null) && (completionRegistrant.getId().equals(id))) {
r = completionRegistrant;
}
*/
|
public void | forget()
// Forgets every durable and volatile participant (iterating over snapshots) and then
// delegates to super.forget(). The forgotten flag makes repeat calls return at once;
// the whole operation runs while holding this object's monitor.
synchronized (this) {
    if (forgotten) {
        return;
    }
    forgotten = true;

    for (ATParticipant durable : getDurableParticipantsSnapshot()) {
        durable.forget();
    }
    for (ATParticipant vol : getVolatileParticipantsSnapshot()) {
        vol.forget();
    }
    super.forget();
}
|
public ATCompletion | getCompletionRegistrant()Get the completion registrant.
// Returns the completionRegistrant field as is; it may be null (other methods in
// this class null-check it before use).
return completionRegistrant;
|
protected java.lang.String | getCoordIdPartId(com.sun.xml.ws.tx.coordinator.Registrant registrant)
return getCoordIdPartId(registrant.getIdValue());
|
protected java.lang.String | getCoordIdPartId(java.lang.String participantId)
return " coordId=" + getIdValue() + " partId:" + participantId + " ";
|
public javax.xml.ws.EndpointReference | getCoordinatorProtocolServiceForRegistrant(com.sun.xml.ws.tx.coordinator.Registrant r)
MemberSubmissionEndpointReference epr = new MemberSubmissionEndpointReference();
epr.addr = new MemberSubmissionEndpointReference.Address();
epr.addr.uri = localCoordinationProtocolServiceURI.toString();
localCoordinatorProtocolService = epr;
return localCoordinatorProtocolService;
|
public java.util.Collection | getDurableParticipants()Return a Collection of durable 2pc participants.
This Collection is modifiable.
// Hands back durableParticipants.values() directly rather than a copy; use
// getDurableParticipantsSnapshot() when a copied list is needed.
return durableParticipants.values();
|
public java.util.Collection | getDurableParticipantsSnapshot()
return new ArrayList<ATParticipant>(durableParticipants.values());
|
public javax.xml.ws.EndpointReference | getParentCoordinatorRegistrationEPR()
if (getContext() == null) {
return null;
} else {
return getContext().getRootRegistrationService();
}
|
public com.sun.xml.ws.tx.coordinator.Registrant | getRegistrant(java.lang.String id)Get the registrant with the specified id or null if it does not exist.
Registrant r = volatileParticipants.get(id);
if (r == null) {
r = durableParticipants.get(id);
}
if ((r == null) && (completionRegistrant != null) &&
(completionRegistrant.getId().getValue().equals(id))) {
r = completionRegistrant;
}
return r;
|
public java.util.List | getRegistrants()Get the list of {@link com.sun.xml.ws.tx.coordinator.Registrant}s for this coordinated activity.
The returned list is unmodifiable (read-only). Add new Registrants
with the {@link #addRegistrant(com.sun.xml.ws.tx.coordinator.Registrant, javax.xml.ws.WebServiceContext)} api instead.
// Order of the result: volatile participants, durable participants, then the
// completion registrant when one is set.
final boolean hasCompletion = (completionRegistrant != null);
final int capacity = volatileParticipants.size() + durableParticipants.size() + (hasCompletion ? 1 : 0);
final ArrayList<Registrant> all = new ArrayList<Registrant>(capacity);
all.addAll(volatileParticipants.values());
all.addAll(durableParticipants.values());
if (completionRegistrant != null) {
    all.add(completionRegistrant);
}
return Collections.unmodifiableList(all);
|
public javax.transaction.Transaction | getTransaction()
// Returns the transaction field; it is null when setTransaction was given null
// (and presumably before setTransaction is called -- confirm against the field declaration).
return transaction;
|
public int | getTransactionTimeout()
return (int) (getExpires() / 1000L);
|
public java.util.Collection | getVolatileParticipants()Return a Collection of volatile 2pc participants.
This Collection is modifiable.
// Hands back volatileParticipants.values() directly rather than a copy; use
// getVolatileParticipantsSnapshot() when a copied list is needed.
return volatileParticipants.values();
|
public java.util.Collection | getVolatileParticipantsSnapshot()
return new ArrayList<ATParticipant>(volatileParticipants.values());
|
public static com.sun.xml.ws.tx.webservice.member.at.WSATCoordinator | getWSATCoordinatorService()
// Static accessor for the class-level wsatCoordinatorService field.
return wsatCoordinatorService;
|
public boolean | hasOutstandingParticipants()
// True while at least one durable or volatile participant is still registered.
if (!getDurableParticipants().isEmpty()) {
    return true;
}
return !getVolatileParticipants().isEmpty();
|
public void | initiateCommit()
// Starts the commit phase for both groups: volatile participants first, then durable.
initiateVolatileCommit();
initiateDurableCommit();
|
public void | initiateDurableCommit()
// Sends commit to every durable participant, or starts rollback instead when the
// coordinator is already aborting. Also sets guardTimeout.
// assert all participants must be in PREPARED, PREPARED_SUCCESS, COMMITTING, READONLY
// PRE-CONDITION: durableParticipantsState is PREPARED_SUCCESS
if (isAborting()) {
    initiateRollback();
    return;
}
if (durableParticipantsState != PREPARED_SUCCESS) {
    // Log under this method's own name; the old tag "durableVolatileCommit"
    // did not match any method.
    logger.warning("initiateDurableCommit", LocalizationMessages.UNEXPECTED_STATE_0008(durableParticipantsState));
}
durableParticipantsState = COMMITTING;
// Set before the commit messages go out; expire() consults this flag via expirationGuard().
guardTimeout = true;
actionForAllParticipants(getDurableParticipantsSnapshot(), ACTION.COMMIT);
|
public void | initiateDurablePrepare()TODO: Each PREPARED/READONLY Volatile ATParticipant should check if it is time to start
the durable 2PC phase by calling this method.
// Sends prepare to every durable participant, or starts rollback instead when the
// coordinator is already aborting. Closes registration before preparing.
final Collection<ATParticipant> durables = getDurableParticipants();
final int numParticipants = (durables != null) ? durables.size() : 0;
if (logger.isLogging(Level.FINEST)) {
    final String trace = " coordId=" + getIdValue() +
        " numDurableParticipants=" + numParticipants + " volatile participant state=" + volatileParticipantsState +
        " numVolatileParticipants" + getVolatileParticipants().size();
    logger.finest("initializeDurablePrepare", trace);
}

if (isAborting()) {
    initiateRollback();
} else {
    // PRE-CONDITION: volatile phase finished successfully, or there were no volatile participants.
    assert volatileParticipantsState == PREPARED_SUCCESS || getVolatileParticipants().size() == 0;

    // No outstanding volatile participants at this point.
    // No new participants allowed as soon as durable 2PC begins.
    allowNewParticipants = false;
    durableParticipantsState = PREPARING;
    actionForAllParticipants(getDurableParticipantsSnapshot(), ACTION.PREPARE);
}
|
public void | initiateDurableRollback()
// Marks the durable group as ABORTING and asks every durable participant to abort.
durableParticipantsState = ABORTING;
// The ROLLBACK action calls abort() on each participant in the snapshot.
actionForAllParticipants(getDurableParticipantsSnapshot(), ACTION.ROLLBACK);
|
public void | initiateRollback()
// Starts rollback for both groups: volatile participants first, then durable.
initiateVolatileRollback();
initiateDurableRollback();
|
public void | initiateVolatileCommit()
// Sends commit to every volatile participant, or starts rollback instead when the
// coordinator is already aborting.
// assert all participants must be in PREPARED, PREPARED_SUCCESS, COMMITTING, READONLY
// PRE-CONDITION: volatileParticipantsState is PREPARED_SUCCESS, or there are no volatile participants
if (isAborting()) {
    initiateRollback();
    return;
}
if (volatileParticipantsState != PREPARED_SUCCESS && getVolatileParticipants().size() != 0) {
    // Log tag corrected from the misspelled "initateVolatileCommit".
    logger.warning("initiateVolatileCommit", LocalizationMessages.UNEXPECTED_STATE_0008(volatileParticipantsState));
}
volatileParticipantsState = COMMITTING;
actionForAllParticipants(getVolatileParticipantsSnapshot(), ACTION.COMMIT);
|
public void | initiateVolatilePrepare()Send 2PC prepare to all volatile participants
Volatile 2PC prepare constraint from 2004 WS-AT, section 3.3.1
the root coordinator begins the prepare phase of all participants registered for the Volatile 2PC protocol.
All participants registered for this protocol must respond before a Prepare is issued to a
participant registered for Durable 2PC. Further participants may register with the coordinator until the
coordinator issues a Prepare to any durable participant.
// Volatile participants are prepared before durable ones (section 3.3.1).
// If the coordinator is already aborting, rollback is started instead.
if (isAborting()) {
    initiateRollback();
} else {
    volatileParticipantsState = PREPARING;
    actionForAllParticipants(getVolatileParticipantsSnapshot(), ACTION.PREPARE);
}
|
public void | initiateVolatileRollback()
// Marks the volatile group as ABORTING and asks every volatile participant to abort.
volatileParticipantsState = ABORTING;
// The ROLLBACK action calls abort() on each participant in the snapshot.
actionForAllParticipants(getVolatileParticipantsSnapshot(), ACTION.ROLLBACK);
|
boolean | isAborting()
// True when either participant group has been moved to the ABORTING state.
if (volatileParticipantsState == ABORTING) {
    return true;
}
return durableParticipantsState == ABORTING;
|
public boolean | isSameRM(javax.transaction.xa.XAResource xAResource)
// Always reports false; the argument is not examined.
return false;
|
public boolean | isSubordinateCoordinator()
// Always false in this class; setTransaction() uses this to decide whether to
// register a synchronization.
return false;
|
public int | prepare(javax.transaction.xa.Xid xid)Synchronous prepare request invoked by JTS coordinator as part of its 2PC protocol.
Prepare this coordinator and return result of preparation.
if (logger.isLogging(Level.FINER)) {
logger.entering("XAResource_prepare(xid=" + xid + ")");
}
int result = 0;
synchronized(this) {
initiateDurablePrepare();
// Map asynchonous WS-AT 2PC protocol to XAResource synchronous protocol.
// Wait for all possible pending responses to prepare message.
waitForDurablePrepareResponse(); // result in durableParticipantsState: PREPARED, COMMITTED, ABORTING
}
// check if volatile or durable WS-AT participants aborted
if (isAborting()) {
// TODO: be more specific on XAException error code for why rollback occurred. Using generic code now.
throw new XAException(XAException.XA_RBROLLBACK);
} else if (getDurableParticipants().size() == 0 && this.getVolatileParticipants().size() == 0) {
result = XAResource.XA_RDONLY;
} else {
result = XAResource.XA_OK;
}
if (logger.isLogging(Level.FINER)) {
logger.exiting("XAResource_prepare", result);
}
return result;
|
public void | prepared(java.lang.String participantId)
// Convenience overload with no reply endpoint: an unknown participant id is then
// ignored by the two-argument variant.
prepared(participantId, null);
|
public void | prepared(java.lang.String participantId, javax.xml.ws.EndpointReference unknownParticipantReplyEPR)
// Handles an inbound "prepared" notification. A registered participant is told it is
// prepared. For an unknown id, a rollback is sent to the reply endpoint when one is
// supplied; with no reply endpoint nothing is done.
if (logger.isLogging(Level.FINER)) {
    logger.entering("prepared", getCoordIdPartId(participantId));
}

final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
if (participant != null) {
    participant.prepared();
} else if (unknownParticipantReplyEPR != null) {
    logger.warning("prepared", LocalizationMessages.UNKNOWN_CORD_OR_PART_0011(getCoordIdPartId(participantId), unknownParticipantReplyEPR));
    ATParticipant.getATParticipantWS(unknownParticipantReplyEPR, null, false).rollbackOperation(null);
}

if (logger.isLogging(Level.FINER)) {
    logger.exiting("prepared", getCoordIdPartId(participantId));
}
|
public void | readonly(java.lang.String participantId)
// Handles an inbound "readonly" notification: a registered participant is told it
// is read-only and is forgotten; an unknown id only produces a warning.
if (logger.isLogging(Level.FINER)) {
    logger.entering("readonly", getCoordIdPartId(participantId));
}
final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
if (participant == null) {
    logger.warning("readonly", LocalizationMessages.UNKNOWN_PART_0012(getCoordIdPartId(participantId)));
} else {
    participant.readonly();
    participant.forget();
}
// Trace the exit on both paths so every "entering" record has a matching "exiting".
if (logger.isLogging(Level.FINER)) {
    logger.exiting("readonly", getCoordIdPartId(participantId));
}
|
public javax.transaction.xa.Xid[] | recover(int i)
// Recovery is not implemented: every call throws UnsupportedOperationException.
throw new UnsupportedOperationException("Not yet implemented");
|
private void | registerSynchronization()Register interposed synchronization for this instance.
Initial volatile participant registration triggers this registration.
if (!registeredSynchronization) {
registeredSynchronization = true;
TransactionManagerImpl.getInstance().registerSynchronization(this);
if (logger.isLogging(Level.FINEST)) {
logger.finest("registerSynchronization", "Synchronization registered for WS-AT coordinated activity " + this.getIdValue());
}
}
|
protected boolean | registerWithDurableParent()Enlist with parent of ATCoordinator which is JTA transaction manager.
// Enlists this object as a resource of the current transaction and returns the
// enlistment result; returns false when there is no transaction.
if (getTransaction() == null) {
    return false;
}
return getTransaction().enlistResource(this);
|
protected void | registerWithVolatileParent()
// Volatile registration is done by registering a synchronization.
registerSynchronization();
|
public void | removeRegistrant(java.lang.String id)
// Same as forget(id): removes the participant and, when none remain, forgets the coordinator.
forget(id);
|
public void | replay(java.lang.String participantId)Implement inbound event replay for Atomic Transaction 2PC Protocol(Coordinator View).
// Re-drives the participant according to its current state: abort for
// ACTIVE/PREPARING/ABORTING (and for a durable participant in NONE), commit for
// COMMITTING, nothing otherwise. An unknown participant id is logged and ignored.
final String METHOD_NAME="replay";
if (logger.isLogging(Level.FINER)) {
    logger.entering(METHOD_NAME, getCoordIdPartId(participantId));
}
final ATParticipant participant = (ATParticipant) getRegistrant(participantId);
if (participant == null) {
    // getRegistrant returns null for an unknown id; warn instead of failing with a
    // NullPointerException, as the other inbound handlers in this class do.
    logger.warning(METHOD_NAME, LocalizationMessages.UNKNOWN_PART_0012(getCoordIdPartId(participantId)));
} else {
    final ATParticipant.STATE state = participant.getState();
    switch (state) {
        case NONE:
            if (participant.isDurable()) {
                participant.abort();
            } else { // participant.isVolatile()
                // TODO: Invalid State. Send back an invalid state fault.
                logger.severe(METHOD_NAME, LocalizationMessages.INVALID_STATE_0013(getCoordIdPartId(participantId)));
            }
            break;
        case ACTIVE:
        case PREPARING:
        case ABORTING:
            participant.abort();
            break;
        case COMMITTING:
            try {
                participant.commit();
            } catch (TXException ex) {
                logger.warning(METHOD_NAME, ex.getLocalizedMessage());
            }
            break;
        case PREPARED:
        case PREPARED_SUCCESS:
            // nothing to do for all other cases.
    }
}
if (logger.isLogging(Level.FINER)) {
    logger.exiting(METHOD_NAME, getCoordIdPartId(participantId));
}
|
public void | resumeTransaction()
if (transaction != null) {
try {
tm.resume(transaction);
logger.finest("resumeTransaction", "successfully resumed txn " + transaction);
} catch (Exception ex) {
String handlerMsg = LocalizationMessages.TXN_MGR_RESUME_FAILED_0032(transaction.toString());
logger.warning("resumeTransaction", handlerMsg, ex);
throw new WebServiceException(handlerMsg, ex);
}
}
|
public void | rollback(javax.transaction.xa.Xid xid)
if (logger.isLogging(Level.FINER)) {
logger.entering("XAResource_rollback(xid=" + xid + ")");
}
// Commit volatile and durable 2PC participants. No ordering required.
initiateRollback();
waitForCommitOrRollbackResponse(Protocol.DURABLE);
waitForCommitOrRollbackResponse(Protocol.VOLATILE);
guardTimeout = false;
if (logger.isLogging(Level.FINER)) {
logger.exiting("XAResource_rollback");
}
|
void | setAborting()
// Moves both participant groups to ABORTING; isAborting() reports true afterwards.
durableParticipantsState = ABORTING;
volatileParticipantsState = ABORTING;
|
public void | setTransaction(javax.transaction.Transaction txn)Set once field.
transaction = txn;
if (txn == null) {
return;
}
try {
if (! this.isSubordinateCoordinator()) {
// see #beforeCompletion and #afterCompletion for what this does.
// NEVER to be used for subordinate coordiator.
registerSynchronization();
}
// MUST register synchronization BEFORE next line that
// causes local transaction to upgrade to JTS txn in glassfish.
// (Otherwise registerSynchronization with local txn even though JTS transaction exists.
// Bug appears as beforeCompletion and afterCompletion never get called due to
// mis-registration.)
registerWithDurableParent();
} catch (SystemException ex) {
logger.severe("setTransaction", LocalizationMessages.XA_REGISTER_0004(ex.getLocalizedMessage()));
// TODO: link and rethrow
} catch (IllegalStateException ex) {
logger.severe("setTransaction", LocalizationMessages.XA_REGISTER_0004(ex.getLocalizedMessage()));
// TODO: link and rethrow
} catch (RollbackException ex) {
logger.severe("setTransaction", LocalizationMessages.XA_REGISTER_0004(ex.getLocalizedMessage()));
// TODO: link and rethrow
}
|
public boolean | setTransactionTimeout(int i)
setExpires(i * 1000L);
return true;
|
public void | start(javax.transaction.xa.Xid xid, int flags)
// Intentionally empty: neither argument is used and no state is changed.
// Start transaction hook
|
public javax.transaction.Transaction | suspendTransaction()
Transaction tx = null;
try {
tx = tm.suspend();
logger.finest("suspendTransation", tx == null ? "no txn to suspend" : "suspended txn " + tx.toString());
return tx;
} catch (SystemException ex) {
String handlerMsg = LocalizationMessages.TXN_MGR_OPERATION_FAILED_0031("suspend");
logger.warning("suspendTransaction", handlerMsg, ex);
return tx;
}
|
protected void | waitForCommitOrRollbackResponse(com.sun.xml.ws.api.tx.Protocol protocol)
// all participants have been committed or rolled back.
// wait for all outstanding participants to send final notification of wsat COMMITTED or ABORTED.
// boolean communicationTimeout = false; // TODO: resend prepare due to communication timeout. Assume msg lost.
boolean allProcessed;
for (int i = 0; i < MAX_WAIT_ITERATION; i++) {
allProcessed = true; // assume all committed/aborted until encounter participant is not.
if (protocol == Protocol.DURABLE) {
for (ATParticipant participant : getDurableParticipantsSnapshot()) {
if (participant.getState() == COMMITTED ||
participant.getState() == ABORTED ||
participant.getState() == READONLY) {
participant.forget();
} else {
allProcessed = false;
if (logger.isLogging(Level.FINEST)) {
logger.finest("waitForCommitRollback", getCoordIdPartId(participant) + " state:" + participant.getState());
}
/* Don't retry aggressively. Have to put communication timeout to retry.
if (isAborting()){
participant.abort();
} else {
try {
participant.commit();
} catch (TXException ex) {
logger.warning("waitForCommitOrRollbackResponse", ex.getLocalizedMessage());
}
}
**/
}
}
} else if (protocol == Protocol.VOLATILE) {
// best effort to receive committed/aborted from volatile participants. But do not wait for them to send committed.
allProcessed = true; // assume all committed/aborted until encounter participant is not.
for (ATParticipant participant : getVolatileParticipantsSnapshot()) {
if (participant.getState() != COMMITTED &&
participant.getState() != ABORTED ||
participant.getState() != READONLY) {
logger.warning("waitForCommitOrRollbackResponse",
LocalizationMessages.FORGETTING_0009(participant.getState(), getCoordIdPartId(participant)));
participant.forget();
}
}
}
if (allProcessed) {
guardTimeout = false;
if (logger.isLogging(Level.FINER)) {
logger.exiting("waitForCommitRollback", "coordId=" + getIdValue());
}
return;
} else { //wait some before checking again
try {
if (logger.isLogging(Level.FINEST)) {
logger.finest("waitForCommitRollback", "checking...");
}
Thread.sleep(WAIT_SLEEP);
} catch (InterruptedException ex) { }
}
}
|
protected void | waitForDurablePrepareResponse()Wait for all Durable participants to respond to prepare.
Durable participant state is set before this method returns.
// Polls the durable participants up to MAX_WAIT_ITERATION times, sleeping WAIT_SLEEP
// between polls. Outcomes:
//   - all participants answered without aborting -> durableParticipantsState = PREPARED_SUCCESS
//   - any abort detected -> both groups flagged ABORTING via setAborting()
//   - iteration limit reached -> setAborting() and a timeout warning
// TODO: implement logic to resend prepare due to communication timeout.
// Assumes prepare request was lost on way to participant OR the participant's response was lost/delayed.
// boolean communicationTimeout = false;
boolean allPrepared = false;
for (int i = 0; i < MAX_WAIT_ITERATION; i++) {
// An abort flagged elsewhere ends the wait; control then reaches the
// timeout handling after the loop.
if (isAborting()) {
break;
}
allPrepared = true; // assume true until find at least one participant that is not prepared yet.
for (ATParticipant participant : getDurableParticipantsSnapshot()) {
switch (participant.getState()) {
case PREPARING:
// Still waiting for this participant's answer.
allPrepared = false;
if (logger.isLogging(Level.FINEST)) {
logger.finest("intitatedurableParticipant", "not prepared, readonly or aborted " +
getCoordIdPartId(participant) + " state=" + participant.getState());
}
break;
case ACTIVE:
case NONE:
// Unexpected state during prepare: warn and abort. With assertions enabled the
// assert below throws; otherwise control FALLS THROUGH into the ABORTING case
// (there is no break), which returns immediately.
logger.warning("waitForDurablePrepareResponse",
LocalizationMessages.INITIATE_ROLLBACK_0007(this.getCoordIdPartId(participant), participant.getState()));
allPrepared = false;
setAborting();
// TODO: throw illegal state exception
assert false;
case ABORTING:
setAborting();
return;
// these states indicate a response to prepare request
case PREPARED:
case PREPARED_SUCCESS:
case COMMITTING:
break;
case ABORTED:
// The abort is recorded here; the isAborting() check after this loop returns.
setAborting();
participant.forget();
break;
case COMMITTED:
case READONLY:
// Nothing more is expected from this participant.
participant.forget();
break;
}
}
if (isAborting()) {
return;
} else if (allPrepared) {
durableParticipantsState = PREPARED_SUCCESS;
if (logger.isLogging(Level.FINER)) {
logger.exiting("waitForDurablePrepare", "coordId=" + getIdValue() +
"state:" + durableParticipantsState);
}
return;
} else { //wait some before checking again
try {
if (logger.isLogging(Level.FINEST)) {
logger.finest("waitForDurablePrepare", "checking...");
}
Thread.sleep(WAIT_SLEEP);
// NOTE(review): the interrupt is swallowed and the interrupt status is not restored.
} catch (InterruptedException ex) {}
}
}
if (logger.isLogging(Level.FINE)) {
dumpParticipantsState(getDurableParticipantsSnapshot(), KIND.DURABLE);
}
// some participants not prepared still, timing out
// NOTE(review): setAborting() runs before the warning, so the state passed to
// TIMEOUT_0006 is always ABORTING rather than the state at the time of the timeout.
setAborting();
logger.warning("waitForDurablePrepare", LocalizationMessages.TIMEOUT_0006(getIdValue(), durableParticipantsState));
|
protected void | waitForVolatilePrepareResponse()Wait for all volatile participants to respond to prepare.
Volatile participant state is set before this method returns.
// Polls the volatile participants up to MAX_WAIT_ITERATION times, sleeping WAIT_SLEEP
// between polls. Outcomes:
//   - already PREPARED_SUCCESS, or no participants -> returns immediately
//   - all participants answered without aborting -> volatileParticipantsState = PREPARED_SUCCESS
//   - any abort detected -> returns with the coordinator flagged as aborting
//   - iteration limit reached -> setAborting() and a timeout warning
final String METHOD_NAME = "waitForVolatilePrepareResponse";
final int numParticipants = getVolatileParticipants().size();
if (volatileParticipantsState == PREPARED_SUCCESS || numParticipants == 0) {
    if (logger.isLogging(Level.FINER)) {
        logger.exiting(METHOD_NAME, "prepared coordId=" + getIdValue() + " state=" +
                volatileParticipantsState + " numParticipants=" + numParticipants);
    }
    return;
}

// Never set to true yet, so the resend branch under PREPARING is currently inactive.
boolean communicationTimeout = false; // TODO: resend prepare due to communication timeout. Assume msg lost.
boolean allPrepared;
for (int i = 0; i < MAX_WAIT_ITERATION; i++) {
    allPrepared = true; // assume all prepared until encounter participant is not
    for (ATParticipant participant : getVolatileParticipantsSnapshot()) {
        if (isAborting()) {
            return;
        }
        switch (participant.getState()) {
            case ACTIVE:
                // accomodate late registration: volatile 2PC prepare can register new volatile or durable participant.
                allPrepared = false;
                try {
                    participant.prepare();
                } catch (TXException ex) {
                    logger.warning(METHOD_NAME, LocalizationMessages.CAUGHT_TX_EX_DURING_PREPARE_0005(ex.getLocalizedMessage()));
                    setAborting();
                }
                break;

            case PREPARING:
                allPrepared = false;
                if (communicationTimeout) {
                    try {
                        participant.prepare();
                    } catch (TXException ex) {
                        logger.warning(METHOD_NAME, LocalizationMessages.CAUGHT_TX_EX_DURING_PREPARE_0005(ex.getLocalizedMessage()));
                        setAborting();
                    }
                }
                break;

            case ABORTING:
                setAborting();
                return;

            // these states indicate a response to prepare request
            case PREPARED:
            case PREPARED_SUCCESS:
            case COMMITTING:
                break;

            case NONE:
            case COMMITTED:
            case ABORTED:
            case READONLY:
                // Nothing more is expected from this participant.
                forget(participant);
                break;
        }
    }

    if (isAborting()) {
        return;
    } else if (allPrepared) {
        volatileParticipantsState = PREPARED_SUCCESS;
        if (logger.isLogging(Level.FINER)) {
            logger.exiting(METHOD_NAME, "prepared coordId=" + getIdValue() + " state=" +
                    volatileParticipantsState);
        }
        return;
    } else { //wait some before checking again
        try {
            if (logger.isLogging(Level.FINEST)) {
                logger.finest(METHOD_NAME, "checking...");
            }
            Thread.sleep(WAIT_SLEEP);
        } catch (InterruptedException ex) {
            logger.warning(METHOD_NAME, ex.getLocalizedMessage());
        }
    }
}

// Timed out: some participants never answered the prepare request.
if (logger.isLogging(Level.FINE)) {
    dumpParticipantsState(getVolatileParticipantsSnapshot(), KIND.VOLATILE);
}
setAborting();
// Log the timeout unconditionally, as waitForDurablePrepareResponse does. It used to be
// gated on FINER being enabled, which hid the warning at normal log levels.
logger.warning(METHOD_NAME, LocalizationMessages.TIMEOUT_0006(getIdValue(), volatileParticipantsState));
|