/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.tm; import org.jboss.logging.Logger; import org.jboss.util.timeout.Timeout; import org.jboss.util.timeout.TimeoutFactory; import org.jboss.util.timeout.TimeoutTarget; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.RollbackException; import javax.transaction.Status; import javax.transaction.Synchronization; import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.ArrayList; /** * Our Transaction implementation. * * @see TxManager * * @author Rickard Öberg * @author Marc Fleury * @author Ole Husgaard * @author Toby Allsopp * @author Jason Dillon * @author David Jencks * @author Bill Burke * @version $Revision: 1.5.2.16 $ */ class TransactionImpl implements Transaction, TimeoutTarget { // Constants ----------------------------------------------------- /** * Code meaning "no heuristics seen", * must not be XAException.XA_HEURxxx */ private static final int HEUR_NONE = XAException.XA_RETRY; // Resource states private final static int RS_NEW = 0; // not yet enlisted private final static int RS_ENLISTED = 1; // enlisted private final static int RS_SUSPENDED = 2; // suspended private final static int RS_ENDED = 3; // not associated private final static int RS_VOTE_READONLY = 4; // voted read-only private final static int RS_VOTE_OK = 5; // voted ok private final static int RS_FORGOT = 6; // RM has forgotten // Attributes ---------------------------------------------------- /** Class logger, we don't want a new logger with every transaction. 
*/
    private static Logger log = Logger.getLogger(TransactionImpl.class);

    /** True if trace messages should be logged. */
    private boolean trace = log.isTraceEnabled();

    /** The ID of this transaction. */
    private Xid xid;

    /**
     * Threads registered through associateCurrentThread(). They are
     * interrupted by interruptThreads() when the transaction times out.
     */
    private ArrayList threads = new ArrayList(1);

    /**
     * Map that is emptied when commit() or rollback() finishes.
     * NOTE(review): nothing in the visible code populates it; presumably it
     * holds transaction-local values - confirm against the callers.
     */
    private HashMap transactionLocalMap = new HashMap();

    /**
     * The last Throwable recorded while a resource or synchronization
     * failed; commit() passes it on as the cause of the
     * JBossRollbackException it throws.
     */
    private Throwable cause;

    /**
     * The global ID of this transaction.
     * This is used as a transaction propagation context, and in the
     * TxManager for mapping transaction IDs to transactions.
     */
    private GlobalId globalId;

    /**
     * The synchronizations to call back.
     */
    private Synchronization[] sync = new Synchronization[3];

    /**
     * Size of allocated synchronization array.
     */
    private int syncAllocSize = 3;

    /**
     * Count of synchronizations for this transaction.
     */
    private int syncCount = 0;

    /**
     * A list of the XAResources that have participated in this transaction.
     */
    private XAResource[] resources = new XAResource[3];

    /**
     * The state of the resources (one of the RS_xxx constants), indexed
     * like resources.
     */
    private int[] resourceState = new int[3];

    /**
     * Index of the first XAResource representing the same resource manager,
     * or -1 if this XAResource is the first XAResource in this
     * transaction that represents its resource manager.
     */
    private int[] resourceSameRM = new int[3];

    /**
     * The branch Xid used for each entry of resources; entries
     * that share a resource manager share the same Xid.
     */
    private Xid[] resourceXids = new Xid[3];

    /**
     * Size of allocated resource arrays.
     */
    private int resourceAllocSize = 3;

    /**
     * Count of resources that have participated in this transaction.
     * This contains a count of all XAResources, not a count of distinct
     * resource managers.
     * It is the length of resources and other such arrays.
     */
    private int resourceCount = 0;

    /**
     * Flags that it is too late to enlist new resources.
     */
    private boolean resourcesEnded = false;

    /**
     * Last branch id used.
     */
    private long lastBranchId = 0;

    /**
     * Status of this transaction.
     */
    private int status;

    /**
     * The heuristics status of this transaction.
*/ private int heuristicCode = HEUR_NONE; /** * The time when this transaction was started. */ private long start; /** * The timeout handle for this transaction. */ private Timeout timeout; /** * Timeout in millisecs */ private long timeoutPeriod; /** * Mutex for thread-safety. This should only be changed in the * lock() and unlock() methods. */ private boolean locked = false; /** * Flags that we are done with this transaction and that it can be reused. */ private boolean done = false; // Static -------------------------------------------------------- /** * Factory for Xid instances of specified class. * This is set from the TransactionManagerService * MBean. */ static XidFactoryMBean xidFactory; static TransactionManagerService txManagerService; /** * This static code is only present for testing purposes so a * tm can be usable without a lot of setup. * */ static void defaultXidFactory() { if (xidFactory == null) { xidFactory = new XidFactory(); } // end of if () } // Constructors -------------------------------------------------- TransactionImpl(long timeout) { xid = xidFactory.newXid(); globalId = new GlobalId(xid); status = Status.STATUS_ACTIVE; start = System.currentTimeMillis(); this.timeout = TimeoutFactory.createTimeout(start+timeout, this); this.timeoutPeriod = timeout; if (trace) log.trace("Created new instance for tx=" + toString()); } // Implements TimeoutTarget -------------------------------------- /** * Called when our timeout expires. */ public void timedOut(Timeout timeout) { try { lock(); log.warn("Transaction " + toString() + " timed out." + " status=" + getStringStatus(status)); if (this.timeout == null) return; // Don't race with timeout cancellation. this.timeout = null; switch (status) { case Status.STATUS_ROLLEDBACK: case Status.STATUS_COMMITTED: case Status.STATUS_NO_TRANSACTION: return; // Transaction done. case Status.STATUS_ROLLING_BACK: return; // Will be done shortly. 
case Status.STATUS_COMMITTING: // This is _very_ bad: // We are in the second commit phase, and have decided // to commit, but now we get a timeout and should rollback. // So we end up with a mixed decision. gotHeuristic(-1, XAException.XA_HEURMIX); status = Status.STATUS_MARKED_ROLLBACK; return; // commit will fail case Status.STATUS_PREPARED: // This is bad: // We are done with the first phase, and are persistifying // our decision. Fortunately this case is currently never // hit, as we do not release the lock between the two phases. case Status.STATUS_ACTIVE: status = Status.STATUS_MARKED_ROLLBACK; // fall through.. case Status.STATUS_MARKED_ROLLBACK: // don't rollback for now, this messes up with the TxInterceptor. interruptThreads(); return; case Status.STATUS_PREPARING: status = Status.STATUS_MARKED_ROLLBACK; return; // commit will fail default: log.warn("Unknown status at timeout, tx=" + toString()); return; } } finally { unlock(); } } // Implements Transaction ---------------------------------------- public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, java.lang.SecurityException, java.lang.IllegalStateException, SystemException { try { lock(); if (trace) { log.trace("Committing, tx=" + this + ", status=" + getStringStatus(status)); } switch (status) { case Status.STATUS_PREPARING: throw new IllegalStateException("Already started preparing."); case Status.STATUS_PREPARED: throw new IllegalStateException("Already prepared."); case Status.STATUS_ROLLING_BACK: throw new IllegalStateException("Already started rolling back."); case Status.STATUS_ROLLEDBACK: instanceDone(); checkHeuristics(); throw new IllegalStateException("Already rolled back."); case Status.STATUS_COMMITTING: throw new IllegalStateException("Already started committing."); case Status.STATUS_COMMITTED: instanceDone(); checkHeuristics(); throw new IllegalStateException("Already committed."); case Status.STATUS_NO_TRANSACTION: throw new 
IllegalStateException("No transaction."); case Status.STATUS_UNKNOWN: throw new IllegalStateException("Unknown state"); case Status.STATUS_MARKED_ROLLBACK: doBeforeCompletion(); endResources(); rollbackResources(); doAfterCompletion(); cancelTimeout(); instanceDone(); checkHeuristics(); throw new RollbackException("Already marked for rollback"); case Status.STATUS_ACTIVE: break; default: throw new IllegalStateException("Illegal status: " + status); } doBeforeCompletion(); if (trace) { log.trace("Before completion done, tx=" + this + ", status=" + getStringStatus(status)); } endResources(); if (status == Status.STATUS_ACTIVE) { if (resourceCount == 0) { // Zero phase commit is really fast ;-) if (trace) { log.trace("Zero phase commit: No resources."); } status = Status.STATUS_COMMITTED; } else if (isOneResource()) { // One phase commit if (trace) { log.trace("One phase commit: One resource."); } commitResources(true); } else { // Two phase commit if (trace) { log.trace("Two phase commit: Many resources."); } if (!prepareResources()) { boolean commitDecision = status == Status.STATUS_PREPARED && (heuristicCode == HEUR_NONE || heuristicCode == XAException.XA_HEURCOM); // TODO: Save decision to stable storage for recovery // after system crash. 
if (commitDecision) commitResources(false); } else status = Status.STATUS_COMMITTED; // all was read-only } } if (status != Status.STATUS_COMMITTED) { rollbackResources(); doAfterCompletion(); cancelTimeout(); // save off the cause throwable as Instance done resets it to null Throwable causedByThrowable = cause; instanceDone(); // throw jboss rollback exception with the saved off cause throw new JBossRollbackException("Unable to commit, tx=" + toString() + " status=" + getStringStatus(status), causedByThrowable); } cancelTimeout(); doAfterCompletion(); instanceDone(); checkHeuristics(); if (trace) { log.trace("Committed OK, tx=" + this); } } finally { transactionLocalMap.clear(); threads.clear(); unlock(); } } public void rollback() throws java.lang.IllegalStateException, java.lang.SecurityException, SystemException { try { lock(); if (trace) { log.trace("rollback(): Entered, tx=" + toString() + " status=" + getStringStatus(status)); } switch (status) { case Status.STATUS_ACTIVE: status = Status.STATUS_MARKED_ROLLBACK; // fall through.. case Status.STATUS_MARKED_ROLLBACK: doBeforeCompletion(); endResources(); rollbackResources(); cancelTimeout(); doAfterCompletion(); instanceDone(); // Cannot throw heuristic exception, so we just have to // clear the heuristics without reporting. heuristicCode = HEUR_NONE; return; case Status.STATUS_PREPARING: // Set status to avoid race with prepareResources(). status = Status.STATUS_MARKED_ROLLBACK; return; // commit() will do rollback. 
default: throw new IllegalStateException("Cannot rollback(), " + "tx=" + toString() + " status=" + getStringStatus(status)); } } finally { transactionLocalMap.clear(); threads.clear(); Thread.interrupted();// clear timeout that did an interrupt unlock(); } } public boolean delistResource(XAResource xaRes, int flag) throws java.lang.IllegalStateException, SystemException { if (xaRes == null) throw new IllegalArgumentException("null xaRes"); if (flag != XAResource.TMSUCCESS && flag != XAResource.TMSUSPEND && flag != XAResource.TMFAIL) throw new IllegalArgumentException("Bad flag: " + flag); try { lock(); if (trace) { log.trace("delistResource(): Entered, tx=" + toString() + " status=" + getStringStatus(status)); } int idx = findResource(xaRes); if (idx == -1) throw new IllegalArgumentException("xaRes not enlisted"); switch (status) { case Status.STATUS_ACTIVE: case Status.STATUS_MARKED_ROLLBACK: break; case Status.STATUS_PREPARING: throw new IllegalStateException("Already started preparing."); case Status.STATUS_ROLLING_BACK: throw new IllegalStateException("Already started rolling back."); case Status.STATUS_PREPARED: throw new IllegalStateException("Already prepared."); case Status.STATUS_COMMITTING: throw new IllegalStateException("Already started committing."); case Status.STATUS_COMMITTED: throw new IllegalStateException("Already committed."); case Status.STATUS_ROLLEDBACK: throw new IllegalStateException("Already rolled back."); case Status.STATUS_NO_TRANSACTION: throw new IllegalStateException("No transaction."); case Status.STATUS_UNKNOWN: throw new IllegalStateException("Unknown state"); default: throw new IllegalStateException("Illegal status: " + status); } try { if (resourceState[idx] == RS_ENDED && !resources[idx].isSameRM(xaRes)) { // This RM always returns false on isSameRM. Further, // the last resource has already been delisted. log.warn("Resource already delisted. 
tx=" + toString()); return false; } endResource(idx, flag); return true; } catch (XAException xae) { logXAException(xae); status = Status.STATUS_MARKED_ROLLBACK; cause = xae; return false; } } finally { unlock(); } } public boolean enlistResource(XAResource xaRes) throws RollbackException, java.lang.IllegalStateException, SystemException { if (xaRes == null) throw new IllegalArgumentException("null xaRes"); try { lock(); if (trace) { log.trace("enlistResource(): Entered, tx=" + toString() + " status=" + getStringStatus(status)); } switch (status) { case Status.STATUS_ACTIVE: case Status.STATUS_PREPARING: break; case Status.STATUS_PREPARED: throw new IllegalStateException("Already prepared."); case Status.STATUS_COMMITTING: throw new IllegalStateException("Already started committing."); case Status.STATUS_COMMITTED: throw new IllegalStateException("Already committed."); case Status.STATUS_MARKED_ROLLBACK: throw new RollbackException("Already marked for rollback"); case Status.STATUS_ROLLING_BACK: throw new RollbackException("Already started rolling back."); case Status.STATUS_ROLLEDBACK: throw new RollbackException("Already rolled back."); case Status.STATUS_NO_TRANSACTION: throw new IllegalStateException("No transaction."); case Status.STATUS_UNKNOWN: throw new IllegalStateException("Unknown state"); default: throw new IllegalStateException("Illegal status: " + status); } if (resourcesEnded) throw new IllegalStateException("Too late to enlist resources"); // Add resource try { int idx = findResource(xaRes); if (idx != -1) { if (resourceState[idx] == RS_ENLISTED) return false; // already enlisted if (resourceState[idx] == RS_ENDED && !resources[idx].isSameRM(xaRes)) { // this is a resource that returns false on all calls to // isSameRM. Further, the last resource enlisted has // already been delisted, so it is time to enlist it again. 
idx = -1; } else { startResource(idx); return true; } } for (int i = 0; i < resourceCount; ++i) { if (resourceSameRM[i] == -1 && xaRes.isSameRM(resources[i])) { // The xaRes is new. We register the xaRes with the Xid // that the RM has previously seen from this transaction, // and note that it has the same RM. startResource(addResource(xaRes, resourceXids[i], i)); return true; } } // New resource and new RM: Create a new transaction branch. startResource(addResource(xaRes, createXidBranch(), -1)); return true; } catch (XAException xae) { logXAException(xae); cause = xae; return false; } } finally { unlock(); } } public int getStatus() throws SystemException { if (done) return Status.STATUS_NO_TRANSACTION; return status; } public void registerSynchronization(Synchronization s) throws RollbackException, java.lang.IllegalStateException, SystemException { if (s == null) throw new IllegalArgumentException("Null synchronization"); try { lock(); if (trace) { log.trace("registerSynchronization(): Entered, " + "tx=" + toString() + " status=" + getStringStatus(status)); } switch (status) { case Status.STATUS_ACTIVE: case Status.STATUS_PREPARING: break; case Status.STATUS_PREPARED: throw new IllegalStateException("Already prepared."); case Status.STATUS_COMMITTING: throw new IllegalStateException("Already started committing."); case Status.STATUS_COMMITTED: throw new IllegalStateException("Already committed."); case Status.STATUS_MARKED_ROLLBACK: throw new RollbackException("Already marked for rollback"); case Status.STATUS_ROLLING_BACK: throw new RollbackException("Already started rolling back."); case Status.STATUS_ROLLEDBACK: throw new RollbackException("Already rolled back."); case Status.STATUS_NO_TRANSACTION: throw new IllegalStateException("No transaction."); case Status.STATUS_UNKNOWN: throw new IllegalStateException("Unknown state"); default: throw new IllegalStateException("Illegal status: " + status); } if (syncCount == syncAllocSize) { // expand table 
syncAllocSize = 2 * syncAllocSize; Synchronization[] sy = new Synchronization[syncAllocSize]; System.arraycopy(sync, 0, sy, 0, syncCount); sync = sy; } sync[syncCount++] = s; } finally { unlock(); } } public void setRollbackOnly() throws java.lang.IllegalStateException, SystemException { try { lock(); if (trace) log.trace("setRollbackOnly(): Entered, tx=" + toString() + " status=" + getStringStatus(status)); switch (status) { case Status.STATUS_ACTIVE: case Status.STATUS_PREPARING: case Status.STATUS_PREPARED: status = Status.STATUS_MARKED_ROLLBACK; // fall through.. case Status.STATUS_MARKED_ROLLBACK: case Status.STATUS_ROLLING_BACK: return; case Status.STATUS_COMMITTING: throw new IllegalStateException("Already started committing."); case Status.STATUS_COMMITTED: throw new IllegalStateException("Already committed."); case Status.STATUS_ROLLEDBACK: throw new IllegalStateException("Already rolled back."); case Status.STATUS_NO_TRANSACTION: throw new IllegalStateException("No transaction."); case Status.STATUS_UNKNOWN: throw new IllegalStateException("Unknown state"); default: throw new IllegalStateException("Illegal status: " + status); } } finally { unlock(); } } // Public -------------------------------------------------------- public void associateCurrentThread() { threads.add(Thread.currentThread()); } public void disassociateCurrentThread() { threads.remove(Thread.currentThread()); Thread.interrupted(); } public void clearThreads() { for (int i = 0; i < threads.size(); i++) { Thread t = (Thread)threads.get(i); } } public int hashCode() { return globalId.hashCode(); } public String toString() { return "TransactionImpl:" + xidFactory.toString(xid); } public boolean equals(Object obj) { if (obj != null && obj instanceof TransactionImpl) return globalId.equals(((TransactionImpl)obj).globalId); return false; } // Package protected --------------------------------------------- /** * Getter for property done. 
*/ boolean isDone() { return done; } /** * Return the global id of this transaction. */ GlobalId getGlobalId() { return globalId; } // Private ------------------------------------------------------- /** * Interrupt all threads involved with transaction * This is called on timeout */ private void interruptThreads() { Iterator it = threads.iterator(); while (it.hasNext()) { Thread thread = (Thread)it.next(); try { thread.interrupt(); } catch (Exception ignored) {} } threads.clear(); } /** * Return a string representation of the given status code. */ private String getStringStatus(int status) { switch (status) { case Status.STATUS_PREPARING: return "STATUS_PREPARING"; case Status.STATUS_PREPARED: return "STATUS_PREPARED"; case Status.STATUS_ROLLING_BACK: return "STATUS_ROLLING_BACK"; case Status.STATUS_ROLLEDBACK: return "STATUS_ROLLEDBACK"; case Status.STATUS_COMMITTING: return "STATUS_COMMITING"; case Status.STATUS_COMMITTED: return "STATUS_COMMITED"; case Status.STATUS_NO_TRANSACTION: return "STATUS_NO_TRANSACTION"; case Status.STATUS_UNKNOWN: return "STATUS_UNKNOWN"; case Status.STATUS_MARKED_ROLLBACK: return "STATUS_MARKED_ROLLBACK"; case Status.STATUS_ACTIVE: return "STATUS_ACTIVE"; default: return "STATUS_UNKNOWN(" + status + ")"; } } /** * Return a string representation of the given XA error code. 
*/ private String getStringXAErrorCode(int errorCode) { switch (errorCode) { case XAException.XA_HEURCOM: return "XA_HEURCOM"; case XAException.XA_HEURHAZ: return "XA_HEURHAZ"; case XAException.XA_HEURMIX: return "XA_HEURMIX"; case XAException.XA_HEURRB: return "XA_HEURRB"; case XAException.XA_NOMIGRATE: return "XA_NOMIGRATE"; case XAException.XA_RBCOMMFAIL: return "XA_RBCOMMFAIL"; case XAException.XA_RBDEADLOCK: return "XA_RBDEADLOCK"; case XAException.XA_RBINTEGRITY: return "XA_RBINTEGRITY"; case XAException.XA_RBOTHER: return "XA_RBOTHER"; case XAException.XA_RBPROTO: return "XA_RBPROTO"; case XAException.XA_RBROLLBACK: return "XA_RBROLLBACK"; case XAException.XA_RBTIMEOUT: return "XA_RBTIMEOUT"; case XAException.XA_RBTRANSIENT: return "XA_RBTRANSIENT"; case XAException.XA_RDONLY: return "XA_RDONLY"; case XAException.XA_RETRY: return "XA_RETRY"; case XAException.XAER_ASYNC: return "XAER_ASYNC"; case XAException.XAER_DUPID: return "XAER_DUPID"; case XAException.XAER_INVAL: return "XAER_INVAL"; case XAException.XAER_NOTA: return "XAER_NOTA"; case XAException.XAER_OUTSIDE: return "XAER_OUTSIDE"; case XAException.XAER_PROTO: return "XAER_PROTO"; case XAException.XAER_RMERR: return "XAER_RMERR"; case XAException.XAER_RMFAIL: return "XAER_RMFAIL"; default: return "XA_UNKNOWN(" + errorCode + ")"; } } private void logXAException(XAException xae) { log.warn("XAException: tx=" + toString() + " errorCode=" + getStringXAErrorCode(xae.errorCode), xae); if (txManagerService != null) { txManagerService.formatXAException(xae, log); } // end of if () } /** * Lock this instance. 
*/ private synchronized void lock() { if (done) throw new IllegalStateException("Transaction has terminated"); if (locked) { log.warn("Lock contention, tx=" + toString()); //DEBUG Thread.currentThread().dumpStack(); while (locked) { try { // Wakeup happens when: // - notify() is called from unlock() // - notifyAll is called from instanceDone() wait(); } catch (InterruptedException ex) { // ignore } if (done) throw new IllegalStateException("Transaction has now terminated"); } } locked = true; } /** * Unlock this instance. */ private synchronized void unlock() { if (!locked) { log.warn("Unlocking, but not locked, tx=" + toString(), new Throwable("[Stack trace]")); } locked = false; notify(); } /** * Mark this transaction as non-existing. */ private synchronized void instanceDone() { TxManager manager = TxManager.getInstance(); if (status == Status.STATUS_COMMITTED) manager.incCommitCount(); else manager.incRollbackCount(); // Garbage collection manager.releaseTransactionImpl(this); // Set the status status = Status.STATUS_NO_TRANSACTION; // Clear tables refering to external objects. // Even if a client holds on to this instance forever, the objects // that we have referenced may be garbage collected. sync = null; resources = null; // Notify all threads waiting for the lock. notifyAll(); // set the done flag done = true; } /** * Cancel the timeout. * This will release the lock while calling out. */ private void cancelTimeout() { if (timeout != null) { unlock(); try { timeout.cancel(); } catch (Exception e) { if (trace) log.trace("failed to cancel timeout", e); } finally { lock(); } timeout = null; } } /** * Return index of XAResource, or -1 if not found. */ private int findResource(XAResource xaRes) { // A linear search may seem slow, but please note that // the number of XA resources registered with a transaction // are usually low. // Note: This searches backwards intentionally! 
It ensures that // if this resource was enlisted multiple times, then the last one // will be returned. All others should be in the state RS_ENDED. // This allows ResourceManagers that always return false from isSameRM // to be enlisted and delisted multiple times. for (int idx = resourceCount - 1; idx >= 0; --idx) if (xaRes == resources[idx]) return idx; return -1; } /** * Add a resource, expanding tables if needed. * * @param xaRes The new XA resource to add. It is assumed that the * resource is not already in the table of XA resources. * @param branchXid The Xid for the transaction branch that is to * be used for associating with this resource. * @param idxSameRM The index in our XA resource tables of the first * XA resource having the same resource manager as * xaRes, or -1 if xaRes * is the first resource seen with this resource manager. * * @return The index of the new resource in our internal tables. */ private int addResource(XAResource xaRes, Xid branchXid, int idxSameRM) { if (resourceCount == resourceAllocSize) { // expand tables resourceAllocSize = 2 * resourceAllocSize; XAResource[] res = new XAResource[resourceAllocSize]; System.arraycopy(resources, 0, res, 0, resourceCount); resources = res; int[] stat = new int[resourceAllocSize]; System.arraycopy(resourceState, 0, stat, 0, resourceCount); resourceState = stat; Xid[] xids = new Xid[resourceAllocSize]; System.arraycopy(resourceXids, 0, xids, 0, resourceCount); resourceXids = xids; int[] sameRM = new int[resourceAllocSize]; System.arraycopy(resourceSameRM, 0, sameRM, 0, resourceCount); resourceSameRM = sameRM; } resources[resourceCount] = xaRes; resourceState[resourceCount] = RS_NEW; resourceXids[resourceCount] = branchXid; resourceSameRM[resourceCount] = idxSameRM; return resourceCount++; } /** * Call start() on a XAResource and update * internal state information. * This will release the lock while calling out. * * @param idx The index of the resource in our internal tables. 
*/ private void startResource(int idx) throws XAException { int flags = XAResource.TMJOIN; if (resourceSameRM[idx] == -1) { switch (resourceState[idx]) { case RS_NEW: flags = XAResource.TMNOFLAGS; break; case RS_SUSPENDED: flags = XAResource.TMRESUME; break; default: if (trace) { log.trace("Unhandled resource state: " + resourceState[idx] + " (not RS_NEW or RS_SUSPENDED, using TMJOIN flags)"); } } } if (trace) { log.trace("startResource(" + xidFactory.toString(resourceXids[idx]) + ") entered: " + resources[idx].toString() + " flags=" + flags); } unlock(); // OSH FIXME: resourceState could be incorrect during this callout. try { try { resources[idx].start(resourceXids[idx], flags); } catch(XAException e) { throw e; } catch (Throwable t) { if (trace) { log.trace("unhandled throwable error in startResource", t); } status = Status.STATUS_MARKED_ROLLBACK; return; } // Now the XA resource is associated with a transaction. resourceState[idx] = RS_ENLISTED; } finally { lock(); if (trace) { log.trace("startResource(" + xidFactory.toString(resourceXids[idx]) + ") leaving: " + resources[idx].toString() + " flags=" + flags); } } } /** * Call end() on the XAResource and update * internal state information. * This will release the lock while calling out. * * @param idx The index of the resource in our internal tables. * @param flag The flag argument for the end() call. */ private void endResource(int idx, int flag) throws XAException { if (trace) { log.trace("endResource(" + xidFactory.toString(resourceXids[idx]) + ") entered: " + resources[idx].toString() + " flag=" + flag); } unlock(); // OSH FIXME: resourceState could be incorrect during this callout. try { try { resources[idx].end(resourceXids[idx], flag); } catch(XAException e) { throw e; } catch (Throwable t) { if (trace) { log.trace("unhandled throwable error in endResource", t); } status = Status.STATUS_MARKED_ROLLBACK; // Resource may or may not be ended after illegal exception. // We just assume it ended. 
resourceState[idx] = RS_ENDED; return; } // Update our internal state information if (flag == XAResource.TMSUSPEND) resourceState[idx] = RS_SUSPENDED; else { if (flag == XAResource.TMFAIL) { status = Status.STATUS_MARKED_ROLLBACK; } resourceState[idx] = RS_ENDED; } } finally { lock(); if (trace) { log.trace("endResource(" + xidFactory.toString(resourceXids[idx]) + ") leaving: " + resources[idx].toString() + " flag=" + flag); } } } /** * End Tx association for all resources. */ private void endResources() { for (int idx = 0; idx < resourceCount; idx++) { try { /*We don't have minerva crap any more! If your adapter doesn't like this, use the matchConnectionWithTx flag to prevent your resources from getting suspended. if (resourceState[idx] == RS_SUSPENDED) { // This is mad, but JTA 1.0.1 spec says on page 41: // "If TMSUSPEND is specified in flags, the transaction // branch is temporarily suspended in incomplete state. // The transaction context is in suspened state and must // be resumed via start with TMRESUME specified." // Note the _must_ above: It does not say _may_. // The above citation also seem to contradict the XA resource // state table on pages 17-18 where it is legal to do both // end(TMSUCCESS) and end(TMFAIL) when the resource is in // a suspended state. // But the Minerva XA pool does not like that we call end() // two times in a row, so we resume before ending. startResource(idx); }*/ if (resourceState[idx] == RS_ENLISTED || resourceState[idx] == RS_SUSPENDED) { if (trace) log.trace("endresources(" + idx + "): state=" + resourceState[idx]); endResource(idx, XAResource.TMSUCCESS); } } catch(XAException xae) { logXAException(xae); status = Status.STATUS_MARKED_ROLLBACK; cause = xae; } } resourcesEnded = true; // Too late to enlist new resources. } /** * Call synchronization beforeCompletion(). * This will release the lock while calling out. 
*/ private void doBeforeCompletion() { unlock(); try { for (int i = 0; i < syncCount; i++) { try { if (trace) { log.trace("calling sync " + i + ", " + sync[i]); } // end of if () sync[i].beforeCompletion(); } catch (Throwable t) { if (trace) { log.trace("failed before completion", t); } status = Status.STATUS_MARKED_ROLLBACK; // save the cause off so the user can inspect it cause = t; break; } } } finally { lock(); } } /** * Call synchronization afterCompletion(). * This will release the lock while calling out. */ private void doAfterCompletion() { // Assert: Status indicates: Too late to add new synchronizations. unlock(); try { for (int i = 0; i < syncCount; i++) { try { sync[i].afterCompletion(status); } catch (Throwable t) { if (trace) { log.trace("failed after completion", t); } } } } finally { lock(); } } /** * We got another heuristic. * * Promote heuristicCode if needed and tell * the resource to forget the heuristic. * This will release the lock while calling out. * * @param resIdx The index of the XA resource that got a * heurictic in our internal tables, or -1 * if the heuristic came from here. * @param code The heuristic code, one of * XAException.XA_HEURxxx. 
*/
    private void gotHeuristic(int resIdx, int code) {
        switch (code) {
            case XAException.XA_HEURMIX:
                heuristicCode = XAException.XA_HEURMIX;
                break;
            case XAException.XA_HEURRB:
                if (heuristicCode == HEUR_NONE)
                    heuristicCode = XAException.XA_HEURRB;
                else if (heuristicCode == XAException.XA_HEURCOM ||
                         heuristicCode == XAException.XA_HEURHAZ)
                    heuristicCode = XAException.XA_HEURMIX;
                break;
            case XAException.XA_HEURCOM:
                if (heuristicCode == HEUR_NONE)
                    heuristicCode = XAException.XA_HEURCOM;
                else if (heuristicCode == XAException.XA_HEURRB ||
                         heuristicCode == XAException.XA_HEURHAZ)
                    heuristicCode = XAException.XA_HEURMIX;
                break;
            case XAException.XA_HEURHAZ:
                if (heuristicCode == HEUR_NONE)
                    heuristicCode = XAException.XA_HEURHAZ;
                else if (heuristicCode == XAException.XA_HEURCOM ||
                         heuristicCode == XAException.XA_HEURRB)
                    heuristicCode = XAException.XA_HEURMIX;
                break;
            default:
                throw new IllegalArgumentException();
        }

        if (resIdx != -1) {
            try {
                unlock();
                resources[resIdx].forget(resourceXids[resIdx]);
            } catch (XAException xae) {
                logXAException(xae);
                cause = xae;
            } finally {
                lock();
            }
            resourceState[resIdx] = RS_FORGOT;
        }
    }

    /**
     * Check for heuristics, clear and throw exception if any found.
     *
     * @throws HeuristicMixedException On a mixed or hazard heuristic.
     * @throws HeuristicRollbackException On a rollback heuristic.
     */
    private void checkHeuristics()
        throws HeuristicMixedException, HeuristicRollbackException {
        switch (heuristicCode) {
            case XAException.XA_HEURHAZ:
            case XAException.XA_HEURMIX:
                heuristicCode = HEUR_NONE;
                if (trace) {
                    log.trace("Throwing HeuristicMixedException, " +
                              "status=" + getStringStatus(status));
                }
                throw new HeuristicMixedException();
            case XAException.XA_HEURRB:
                heuristicCode = HEUR_NONE;
                if (trace) {
                    log.trace("Throwing HeuristicRollbackException, " +
                              "status=" + getStringStatus(status));
                }
                throw new HeuristicRollbackException();
            case XAException.XA_HEURCOM:
                heuristicCode = HEUR_NONE;
                // Why isn't HeuristicCommitException used in JTA ?
                // And why define something that is not used ?
                // For now we just have to ignore this failure, even if it happened
                // on rollback.
                if (trace) {
                    log.trace("NOT Throwing HeuristicCommitException, " +
                              "status=" + getStringStatus(status));
                }
                return;
        }
    }

    /**
     * Prepare all enlisted resources.
     * If the first phase of the commit process results in a decision
     * to commit the status will be
     * Status.STATUS_PREPARED on return.
     * Otherwise the status will be
     * Status.STATUS_MARKED_ROLLBACK on return.
     * This will release the lock while calling out.
     *
     * @return True iff all resources voted read-only and the transaction
     *         was not marked for rollback while preparing.
     */
    private boolean prepareResources() {
        boolean readOnly = true;

        status = Status.STATUS_PREPARING;

        for (int i = 0; i < resourceCount; i++) {
            // Abort prepare on state change.
            if (status != Status.STATUS_PREPARING)
                return false;

            if (resourceSameRM[i] != -1)
                continue; // This RM already prepared.

            try {
                int vote;

                unlock();
                try {
                    vote = resources[i].prepare(resourceXids[i]);
                } finally {
                    lock();
                }

                if (vote == XAResource.XA_OK) {
                    readOnly = false;
                    resourceState[i] = RS_VOTE_OK;
                } else if (vote == XAResource.XA_RDONLY)
                    resourceState[i] = RS_VOTE_READONLY;
                else {
                    // Illegal vote: rollback.
                    if (trace) {
                        log.trace("illegal vote in prepare resources", new Exception());
                    }
                    status = Status.STATUS_MARKED_ROLLBACK;
                    return false;
                }
            } catch (XAException e) {
                readOnly = false;

                logXAException(e);

                switch (e.errorCode) {
                    case XAException.XA_HEURCOM:
                        // Heuristic commit is not that bad when preparing.
                        // But it means trouble if we have to rollback.
                        gotHeuristic(i, e.errorCode);
                        break;
                    case XAException.XA_HEURRB:
                    case XAException.XA_HEURMIX:
                    case XAException.XA_HEURHAZ:
                        gotHeuristic(i, e.errorCode);
                        if (status == Status.STATUS_PREPARING)
                            status = Status.STATUS_MARKED_ROLLBACK;
                        break;
                    default:
                        cause = e;
                        if (status == Status.STATUS_PREPARING)
                            status = Status.STATUS_MARKED_ROLLBACK;
                        break;
                }
            } catch (Throwable t) {
                // A resource that blew up did not vote read-only. Without
                // this, commit() would take the "all read-only" path and
                // overwrite the rollback mark with STATUS_COMMITTED.
                readOnly = false;

                if (trace) {
                    log.trace("unhandled throwable in prepareResources", t);
                }
                if (status == Status.STATUS_PREPARING)
                    status = Status.STATUS_MARKED_ROLLBACK;
                cause = t;
            }
        }

        // The status may also have changed during the last callout (timeout,
        // rollback(), setRollbackOnly()); never report success in that case.
        if (status != Status.STATUS_PREPARING)
            return false;

        status = Status.STATUS_PREPARED;
        return readOnly;
    }

    /**
     * Commit all enlisted resources.
     * This will release the lock while calling out.
     */
    private void commitResources(boolean onePhase) {
        status = Status.STATUS_COMMITTING;

        for (int i = 0; i < resourceCount; i++) {
            if (trace) {
                log.trace("Committing resources, resourceStates["+i+"]=" + resourceState[i]);
            }
            if (!onePhase && resourceState[i] != RS_VOTE_OK)
                continue; // Voted read-only at prepare phase.

            if (resourceSameRM[i] != -1)
                continue; // This RM already committed.

            // Abort commit on state change.
            if (status != Status.STATUS_COMMITTING)
                return;

            try {
                unlock();
                try {
                    resources[i].commit(resourceXids[i], onePhase);
                } finally {
                    lock();
                }
            } catch (XAException e) {
                logXAException(e);
                switch (e.errorCode) {
                    case XAException.XA_HEURRB:
                    case XAException.XA_HEURCOM:
                    case XAException.XA_HEURMIX:
                    case XAException.XA_HEURHAZ:
                        //usually throws an exception, but not for a couple of cases.
                        gotHeuristic(i, e.errorCode);
                        //May not be correct for HEURCOM
                        //Two phase commit is committed after prepare is logged.
                        if (onePhase) {
                            status = Status.STATUS_MARKED_ROLLBACK;
                        } // end of if ()
                        break;
                    default:
                        cause = e;
                        if (onePhase) {
                            status = Status.STATUS_MARKED_ROLLBACK;
                            break;
                        } // end of if ()
                        //Not much we can do if there is an RMERR in the
                        //commit phase of 2pc. I guess we try the other rms.
} } catch (Throwable t) { if (trace) { log.trace("unhandled throwable in commitResources", t); } } } if (status == Status.STATUS_COMMITTING) status = Status.STATUS_COMMITTED; } /** * Rollback all enlisted resources. * This will release the lock while calling out. */ private void rollbackResources() { status = Status.STATUS_ROLLING_BACK; for (int i = 0; i < resourceCount; i++) { if (resourceState[i] == RS_VOTE_READONLY) { continue; } // Already forgotten if (resourceState[i] == RS_FORGOT) continue; if (resourceSameRM[i] != -1) { continue; // This RM already rolled back. } try { unlock(); try { resources[i].rollback(resourceXids[i]); } finally { lock(); } } catch (XAException e) { logXAException(e); switch (e.errorCode) { case XAException.XA_HEURRB: // Heuristic rollback is not that bad when rolling back. gotHeuristic(i, e.errorCode); continue; case XAException.XA_HEURCOM: case XAException.XA_HEURMIX: case XAException.XA_HEURHAZ: gotHeuristic(i, e.errorCode); continue; default: cause = e; break; } } catch (Throwable t) { if (trace) log.trace("unhandled throwable in rollbackResources", t); } } status = Status.STATUS_ROLLEDBACK; } /** * Create an Xid representing a new branch of this transaction. */ private Xid createXidBranch() { long branchId = ++lastBranchId; return xidFactory.newBranch(xid, branchId); } /** * Check if we can do one-phase optimization. * We can do that only if no more than a single resource manager * is involved in this transaction. */ private boolean isOneResource() { if (resourceCount == 1) return true; // first XAResource surely has -1, it's the first! for (int i = 1; i < resourceCount; i++) { if (resourceSameRM[i] == -1) { // this one is not the same rm as previous ones, // there must be at least 2 return false; } } // all rms are the same one, one phase commit is ok. 
return true; } public long getTimeLeftBeforeTimeout() { return (start + timeoutPeriod) - System.currentTimeMillis(); } Object getTransactionLocalValue(TransactionLocal tlocal) { return transactionLocalMap.get(tlocal); } void putTransactionLocalValue(TransactionLocal tlocal, Object value) { transactionLocalMap.put(tlocal, value); } boolean containsTransactionLocal(TransactionLocal tlocal) { return transactionLocalMap.containsKey(tlocal); } // Inner classes ------------------------------------------------- }