001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.InterruptedIOException;
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.List;
023import java.util.concurrent.ConcurrentHashMap;
024
025import javax.jms.JMSException;
026import javax.jms.TransactionInProgressException;
027import javax.jms.TransactionRolledBackException;
028import javax.transaction.xa.XAException;
029import javax.transaction.xa.XAResource;
030import javax.transaction.xa.Xid;
031
032import org.apache.activemq.command.Command;
033import org.apache.activemq.command.ConnectionId;
034import org.apache.activemq.command.DataArrayResponse;
035import org.apache.activemq.command.DataStructure;
036import org.apache.activemq.command.IntegerResponse;
037import org.apache.activemq.command.LocalTransactionId;
038import org.apache.activemq.command.Response;
039import org.apache.activemq.command.TransactionId;
040import org.apache.activemq.command.TransactionInfo;
041import org.apache.activemq.command.XATransactionId;
042import org.apache.activemq.transaction.Synchronization;
043import org.apache.activemq.util.JMSExceptionSupport;
044import org.apache.activemq.util.LongSequenceGenerator;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * A TransactionContext provides the means to control a JMS transaction. It
050 * provides a local transaction interface and also an XAResource interface. <p/>
051 * An application server controls the transactional assignment of an XASession
052 * by obtaining its XAResource. It uses the XAResource to assign the session to
053 * a transaction, prepare and commit work on the transaction, and so on. <p/> An
054 * XAResource provides some fairly sophisticated facilities for interleaving
055 * work on multiple transactions, recovering a list of transactions in progress,
056 * and so on. A JTA aware JMS provider must fully implement this functionality.
057 * This could be done by using the services of a database that supports XA, or a
058 * JMS provider may choose to implement this functionality from scratch. <p/>
059 * 
060 * 
061 * @see javax.jms.Session
062 * @see javax.jms.QueueSession
063 * @see javax.jms.TopicSession
064 * @see javax.jms.XASession
065 */
066public class TransactionContext implements XAResource {
067
068    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
069
070    // XATransactionId -> ArrayList of TransactionContext objects
071    private final static ConcurrentHashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = new ConcurrentHashMap<TransactionId, List<TransactionContext>>();
072
073    private final ActiveMQConnection connection;
074    private final LongSequenceGenerator localTransactionIdGenerator;
075    private final ConnectionId connectionId;
076    private List<Synchronization> synchronizations;
077
078    // To track XA transactions.
079    private Xid associatedXid;
080    private TransactionId transactionId;
081    private LocalTransactionEventListener localTransactionEventListener;
082    private int beforeEndIndex;
083
084    public TransactionContext(ActiveMQConnection connection) {
085        this.connection = connection;
086        this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
087        this.connectionId = connection.getConnectionInfo().getConnectionId();
088    }
089
090    public boolean isInXATransaction() {
091        return (transactionId != null && transactionId.isXATransaction()) || !ENDED_XA_TRANSACTION_CONTEXTS.isEmpty();
092    }
093
094    public boolean isInLocalTransaction() {
095        return transactionId != null && transactionId.isLocalTransaction();
096    }
097
098    public boolean isInTransaction() {
099        return transactionId != null;
100    }
101    
102    /**
103     * @return Returns the localTransactionEventListener.
104     */
105    public LocalTransactionEventListener getLocalTransactionEventListener() {
106        return localTransactionEventListener;
107    }
108
109    /**
110     * Used by the resource adapter to listen to transaction events.
111     * 
112     * @param localTransactionEventListener The localTransactionEventListener to
113     *                set.
114     */
115    public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
116        this.localTransactionEventListener = localTransactionEventListener;
117    }
118
119    // ///////////////////////////////////////////////////////////
120    //
121    // Methods that work with the Synchronization objects registered with
122    // the transaction.
123    //
124    // ///////////////////////////////////////////////////////////
125
126    public void addSynchronization(Synchronization s) {
127        if (synchronizations == null) {
128            synchronizations = new ArrayList<Synchronization>(10);
129        }
130        synchronizations.add(s);
131    }
132
133    private void afterRollback() throws JMSException {
134        if (synchronizations == null) {
135            return;
136        }
137
138        int size = synchronizations.size();
139        try {
140            for (int i = 0; i < size; i++) {
141                synchronizations.get(i).afterRollback();
142            }
143        } catch (JMSException e) {
144            throw e;
145        } catch (Throwable e) {
146            throw JMSExceptionSupport.create(e);
147        } finally {
148            synchronizations = null;
149        }
150    }
151
152    private void afterCommit() throws JMSException {
153        if (synchronizations == null) {
154            return;
155        }
156
157        int size = synchronizations.size();
158        try {
159            for (int i = 0; i < size; i++) {
160                synchronizations.get(i).afterCommit();
161            }
162        } catch (JMSException e) {
163            throw e;
164        } catch (Throwable e) {
165            throw JMSExceptionSupport.create(e);
166        } finally {
167            synchronizations = null;
168        }
169    }
170
171    private void beforeEnd() throws JMSException {
172        if (synchronizations == null) {
173            return;
174        }
175
176        int size = synchronizations.size();
177        try {
178            for (;beforeEndIndex < size;) {
179                synchronizations.get(beforeEndIndex++).beforeEnd();
180            }
181        } catch (JMSException e) {
182            throw e;
183        } catch (Throwable e) {
184            throw JMSExceptionSupport.create(e);
185        }
186    }
187
188    public TransactionId getTransactionId() {
189        return transactionId;
190    }
191
192    // ///////////////////////////////////////////////////////////
193    //
194    // Local transaction interface.
195    //
196    // ///////////////////////////////////////////////////////////
197
198    /**
199     * Start a local transaction.
200     * @throws javax.jms.JMSException on internal error
201     */
202    public void begin() throws JMSException {
203
204        if (isInXATransaction()) {
205            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
206        }
207        
208        if (transactionId == null) {
209            synchronizations = null;
210            beforeEndIndex = 0;
211            this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId());
212            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
213            this.connection.ensureConnectionInfoSent();
214            this.connection.asyncSendPacket(info);
215
216            // Notify the listener that the tx was started.
217            if (localTransactionEventListener != null) {
218                localTransactionEventListener.beginEvent();
219            }
220            if (LOG.isDebugEnabled()) {
221                LOG.debug("Begin:" + transactionId);
222            }
223        }
224        
225    }
226
227    /**
228     * Rolls back any work done in this transaction and releases any locks
229     * currently held.
230     * 
231     * @throws JMSException if the JMS provider fails to roll back the
232     *                 transaction due to some internal error.
233     * @throws javax.jms.IllegalStateException if the method is not called by a
234     *                 transacted session.
235     */
236    public void rollback() throws JMSException {
237        if (isInXATransaction()) {
238            throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
239        }
240        
241        try {
242            beforeEnd();
243        } catch (TransactionRolledBackException canOcurrOnFailover) {
244            LOG.warn("rollback processing error", canOcurrOnFailover);
245        }
246        if (transactionId != null) {
247            if (LOG.isDebugEnabled()) {
248                LOG.debug("Rollback: "  + transactionId
249                + " syncCount: " 
250                + (synchronizations != null ? synchronizations.size() : 0));
251            }
252
253            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
254            this.transactionId = null;
255            //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
256            this.connection.syncSendPacket(info);
257            // Notify the listener that the tx was rolled back
258            if (localTransactionEventListener != null) {
259                localTransactionEventListener.rollbackEvent();
260            }
261        }
262
263        afterRollback();
264    }
265
266    /**
267     * Commits all work done in this transaction and releases any locks
268     * currently held.
269     * 
270     * @throws JMSException if the JMS provider fails to commit the transaction
271     *                 due to some internal error.
272     * @throws javax.jms.IllegalStateException if the method is not called by a
273     *                 transacted session.
274     */
275    public void commit() throws JMSException {
276        if (isInXATransaction()) {
277            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
278        }
279        
280        try {
281            beforeEnd();
282        } catch (JMSException e) {
283            rollback();
284            throw e;
285        }
286
287        // Only send commit if the transaction was started.
288        if (transactionId != null) {
289            if (LOG.isDebugEnabled()) {
290                LOG.debug("Commit: "  + transactionId
291                        + " syncCount: " 
292                        + (synchronizations != null ? synchronizations.size() : 0));
293            }
294
295            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
296            this.transactionId = null;
297            // Notify the listener that the tx was committed back
298            try {
299                syncSendPacketWithInterruptionHandling(info);
300                if (localTransactionEventListener != null) {
301                    localTransactionEventListener.commitEvent();
302                }
303                afterCommit();
304            } catch (JMSException cause) {
305                LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
306                if (localTransactionEventListener != null) {
307                    localTransactionEventListener.rollbackEvent();
308                }
309                afterRollback();
310                throw cause;
311            }
312            
313        }
314    }
315
316    // ///////////////////////////////////////////////////////////
317    //
318    // XAResource Implementation
319    //
320    // ///////////////////////////////////////////////////////////
321    /**
322     * Associates a transaction with the resource.
323     */
324    public void start(Xid xid, int flags) throws XAException {
325
326        if (LOG.isDebugEnabled()) {
327            LOG.debug("Start: " + xid);
328        }
329        if (isInLocalTransaction()) {
330            throw new XAException(XAException.XAER_PROTO);
331        }
332        // Are we already associated?
333        if (associatedXid != null) {
334            throw new XAException(XAException.XAER_PROTO);
335        }
336
337        // if ((flags & TMJOIN) == TMJOIN) {
338        // TODO: verify that the server has seen the xid
339        // // }
340        // if ((flags & TMJOIN) == TMRESUME) {
341        // // TODO: verify that the xid was suspended.
342        // }
343
344        // associate
345        synchronizations = null;
346        beforeEndIndex = 0;
347        setXid(xid);
348    }
349
350    /**
351     * @return connectionId for connection
352     */
353    private ConnectionId getConnectionId() {
354        return connection.getConnectionInfo().getConnectionId();
355    }
356
357    public void end(Xid xid, int flags) throws XAException {
358
359        if (LOG.isDebugEnabled()) {
360            LOG.debug("End: " + xid);
361        }
362        
363        if (isInLocalTransaction()) {
364            throw new XAException(XAException.XAER_PROTO);
365        }
366        
367        if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
368            // You can only suspend the associated xid.
369            if (!equals(associatedXid, xid)) {
370                throw new XAException(XAException.XAER_PROTO);
371            }
372
373            // TODO: we may want to put the xid in a suspended list.
374            try {
375                beforeEnd();
376            } catch (JMSException e) {
377                throw toXAException(e);
378            }
379            setXid(null);
380        } else if ((flags & TMSUCCESS) == TMSUCCESS) {
381            // set to null if this is the current xid.
382            // otherwise this could be an asynchronous success call
383            if (equals(associatedXid, xid)) {
384                try {
385                    beforeEnd();
386                } catch (JMSException e) {
387                    throw toXAException(e);
388                }
389                setXid(null);
390            }
391        } else {
392            throw new XAException(XAException.XAER_INVAL);
393        }
394    }
395
396    private boolean equals(Xid xid1, Xid xid2) {
397        if (xid1 == xid2) {
398            return true;
399        }
400        if (xid1 == null ^ xid2 == null) {
401            return false;
402        }
403        return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
404               && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
405    }
406
407    public int prepare(Xid xid) throws XAException {
408        if (LOG.isDebugEnabled()) {
409            LOG.debug("Prepare: " + xid);
410        }
411        
412        // We allow interleaving multiple transactions, so
413        // we don't limit prepare to the associated xid.
414        XATransactionId x;
415        // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
416        // called first
417        if (xid == null || (equals(associatedXid, xid))) {
418            throw new XAException(XAException.XAER_PROTO);
419        } else {
420            // TODO: cache the known xids so we don't keep recreating this one??
421            x = new XATransactionId(xid);
422        }
423
424        try {
425            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
426
427            // Find out if the server wants to commit or rollback.
428            IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
429            if (XAResource.XA_RDONLY == response.getResult()) {
430                // transaction stops now, may be syncs that need a callback
431                List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
432                if (l != null && !l.isEmpty()) {
433                    if (LOG.isDebugEnabled()) {
434                        LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid);
435                    }
436                    for (TransactionContext ctx : l) {
437                        ctx.afterCommit();
438                    }
439                }
440            }
441            return response.getResult();
442
443        } catch (JMSException e) {
444            LOG.warn("prepare of: " + x + " failed with: " + e, e);
445            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
446            if (l != null && !l.isEmpty()) {
447                for (TransactionContext ctx : l) {
448                    try {
449                        ctx.afterRollback();
450                    } catch (Throwable ignored) {
451                        if (LOG.isDebugEnabled()) {
452                            LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " + x + ", context: " + ctx, ignored);
453                        }
454                    }
455                }
456            }
457            throw toXAException(e);
458        }
459    }
460
461    public void rollback(Xid xid) throws XAException {
462
463        if (LOG.isDebugEnabled()) {
464            LOG.debug("Rollback: " + xid);
465        }
466        
467        // We allow interleaving multiple transactions, so
468        // we don't limit rollback to the associated xid.
469        XATransactionId x;
470        if (xid == null) {
471            throw new XAException(XAException.XAER_PROTO);
472        }
473        if (equals(associatedXid, xid)) {
474            // I think this can happen even without an end(xid) call. Need to
475            // check spec.
476            x = (XATransactionId)transactionId;
477        } else {
478            x = new XATransactionId(xid);
479        }
480
481        try {
482            this.connection.checkClosedOrFailed();
483            this.connection.ensureConnectionInfoSent();
484
485            // Let the server know that the tx is rollback.
486            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
487            syncSendPacketWithInterruptionHandling(info);
488
489            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
490            if (l != null && !l.isEmpty()) {
491                for (TransactionContext ctx : l) {
492                    ctx.afterRollback();
493                }
494            }
495
496        } catch (JMSException e) {
497            throw toXAException(e);
498        }
499    }
500
501    // XAResource interface
502    public void commit(Xid xid, boolean onePhase) throws XAException {
503
504        if (LOG.isDebugEnabled()) {
505            LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
506        }
507        
508        // We allow interleaving multiple transactions, so
509        // we don't limit commit to the associated xid.
510        XATransactionId x;
511        if (xid == null || (equals(associatedXid, xid))) {
512            // should never happen, end(xid,TMSUCCESS) must have been previously
513            // called
514            throw new XAException(XAException.XAER_PROTO);
515        } else {
516            x = new XATransactionId(xid);
517        }
518
519        try {
520            this.connection.checkClosedOrFailed();
521            this.connection.ensureConnectionInfoSent();
522
523            // Notify the server that the tx was committed back
524            TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
525
526            syncSendPacketWithInterruptionHandling(info);
527
528            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
529            if (l != null && !l.isEmpty()) {
530                for (TransactionContext ctx : l) {
531                    ctx.afterCommit();
532                }
533            }
534
535        } catch (JMSException e) {
536            LOG.warn("commit of: " + x + " failed with: " + e, e);
537            if (onePhase) {
538                List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
539                if (l != null && !l.isEmpty()) {
540                    for (TransactionContext ctx : l) {
541                        try {
542                            ctx.afterRollback();
543                        } catch (Throwable ignored) {
544                            if (LOG.isDebugEnabled()) {
545                                LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
546                            }
547                        }
548                    }
549                }
550            }
551            throw toXAException(e);
552        }
553
554    }
555
556    public void forget(Xid xid) throws XAException {
557        if (LOG.isDebugEnabled()) {
558            LOG.debug("Forget: " + xid);
559        }
560        
561        // We allow interleaving multiple transactions, so
562        // we don't limit forget to the associated xid.
563        XATransactionId x;
564        if (xid == null) {
565            throw new XAException(XAException.XAER_PROTO);
566        }
567        if (equals(associatedXid, xid)) {
568            // TODO determine if this can happen... I think not.
569            x = (XATransactionId)transactionId;
570        } else {
571            x = new XATransactionId(xid);
572        }
573
574        TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
575
576        try {
577            // Tell the server to forget the transaction.
578            syncSendPacketWithInterruptionHandling(info);
579        } catch (JMSException e) {
580            throw toXAException(e);
581        }
582        ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
583    }
584
585    public boolean isSameRM(XAResource xaResource) throws XAException {
586        if (xaResource == null) {
587            return false;
588        }
589        if (!(xaResource instanceof TransactionContext)) {
590            return false;
591        }
592        TransactionContext xar = (TransactionContext)xaResource;
593        try {
594            return getResourceManagerId().equals(xar.getResourceManagerId());
595        } catch (Throwable e) {
596            throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
597        }
598    }
599
600    public Xid[] recover(int flag) throws XAException {
601        if (LOG.isDebugEnabled()) {
602            LOG.debug("Recover: " + flag);
603        }
604        
605        TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
606        try {
607            this.connection.checkClosedOrFailed();
608            this.connection.ensureConnectionInfoSent();
609
610            DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
611            DataStructure[] data = receipt.getData();
612            XATransactionId[] answer;
613            if (data instanceof XATransactionId[]) {
614                answer = (XATransactionId[])data;
615            } else {
616                answer = new XATransactionId[data.length];
617                System.arraycopy(data, 0, answer, 0, data.length);
618            }
619            return answer;
620        } catch (JMSException e) {
621            throw toXAException(e);
622        }
623    }
624
625    public int getTransactionTimeout() throws XAException {
626        return 0;
627    }
628
629    public boolean setTransactionTimeout(int seconds) throws XAException {
630        return false;
631    }
632
633    // ///////////////////////////////////////////////////////////
634    //
635    // Helper methods.
636    //
637    // ///////////////////////////////////////////////////////////
638    private String getResourceManagerId() throws JMSException {
639        return this.connection.getResourceManagerId();
640    }
641
642    private void setXid(Xid xid) throws XAException {
643
644        try {
645            this.connection.checkClosedOrFailed();
646            this.connection.ensureConnectionInfoSent();
647        } catch (JMSException e) {
648            throw toXAException(e);
649        }
650
651        if (xid != null) {
652            // associate
653            associatedXid = xid;
654            transactionId = new XATransactionId(xid);
655
656            TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN);
657            try {
658                this.connection.asyncSendPacket(info);
659                if (LOG.isDebugEnabled()) {
660                    LOG.debug("Started XA transaction: " + transactionId);
661                }
662            } catch (JMSException e) {
663                throw toXAException(e);
664            }
665
666        } else {
667
668            if (transactionId != null) {
669                TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
670                try {
671                    syncSendPacketWithInterruptionHandling(info);
672                    if (LOG.isDebugEnabled()) {
673                        LOG.debug("Ended XA transaction: " + transactionId);
674                    }
675                } catch (JMSException e) {
676                    throw toXAException(e);
677                }
678
679                // Add our self to the list of contexts that are interested in
680                // post commit/rollback events.
681                List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
682                if (l == null) {
683                    l = new ArrayList<TransactionContext>(3);
684                    ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
685                    l.add(this);
686                } else if (!l.contains(this)) {
687                    l.add(this);
688                }
689            }
690
691            // dis-associate
692            associatedXid = null;
693            transactionId = null;
694        }
695    }
696
697    /**
698     * Sends the given command. Also sends the command in case of interruption,
699     * so that important commands like rollback and commit are never interrupted.
700     * If interruption occurred, set the interruption state of the current 
701     * after performing the action again. 
702     * 
703     * @return the response
704     */
705    private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
706        try {
707            return this.connection.syncSendPacket(command);
708        } catch (JMSException e) {
709            if (e.getLinkedException() instanceof InterruptedIOException) {
710                try {
711                    Thread.interrupted();
712                    return this.connection.syncSendPacket(command);
713                } finally {
714                    Thread.currentThread().interrupt();
715                }               
716            }
717            
718            throw e;
719        }
720    }
721
722    /**
723     * Converts a JMSException from the server to an XAException. if the
724     * JMSException contained a linked XAException that is returned instead.
725     * 
726     * @param e JMSException to convert
727     * @return XAException wrapping original exception or its message
728     */
729    private XAException toXAException(JMSException e) {
730        if (e.getCause() != null && e.getCause() instanceof XAException) {
731            XAException original = (XAException)e.getCause();
732            XAException xae = new XAException(original.getMessage());
733            xae.errorCode = original.errorCode;
734            xae.initCause(original);
735            return xae;
736        }
737
738        XAException xae = new XAException(e.getMessage());
739        xae.errorCode = XAException.XAER_RMFAIL;
740        xae.initCause(e);
741        return xae;
742    }
743
744    public ActiveMQConnection getConnection() {
745        return connection;
746    }
747
748    public void cleanup() {
749        associatedXid = null;
750        transactionId = null;
751    }
752
753    @Override
754    public String toString() {
755        return "TransactionContext{" +
756                "transactionId=" + transactionId +
757                '}';
758    }
759}