001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transaction;
018
019import java.io.IOException;
020import javax.transaction.xa.XAException;
021import javax.transaction.xa.XAResource;
022import org.apache.activemq.broker.TransactionBroker;
023import org.apache.activemq.command.ConnectionId;
024import org.apache.activemq.command.TransactionId;
025import org.apache.activemq.command.XATransactionId;
026import org.apache.activemq.store.TransactionStore;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * 
032 */
033public class XATransaction extends Transaction {
034
035    private static final Logger LOG = LoggerFactory.getLogger(XATransaction.class);
036
037    private final TransactionStore transactionStore;
038    private final XATransactionId xid;
039    private final TransactionBroker broker;
040    private final ConnectionId connectionId;
041
042    public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker, ConnectionId connectionId) {
043        this.transactionStore = transactionStore;
044        this.xid = xid;
045        this.broker = broker;
046        this.connectionId = connectionId;
047        if (LOG.isDebugEnabled()) {
048            LOG.debug("XA Transaction new/begin : " + xid);
049        }
050    }
051
052    @Override
053    public void commit(boolean onePhase) throws XAException, IOException {
054        if (LOG.isDebugEnabled()) {
055            LOG.debug("XA Transaction commit: " + xid);
056        }
057
058        switch (getState()) {
059        case START_STATE:
060            // 1 phase commit, no work done.
061            checkForPreparedState(onePhase);
062            setStateFinished();
063            break;
064        case IN_USE_STATE:
065            // 1 phase commit, work done.
066            checkForPreparedState(onePhase);
067            doPrePrepare();
068            setStateFinished();
069            storeCommit(getTransactionId(), false, preCommitTask, postCommitTask);
070            break;
071        case PREPARED_STATE:
072            // 2 phase commit, work done.
073            // We would record commit here.
074            setStateFinished();
075            storeCommit(getTransactionId(), true, preCommitTask, postCommitTask);
076            break;
077        default:
078            illegalStateTransition("commit");
079        }
080    }
081
082    private void storeCommit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit)
083            throws XAException, IOException {
084        try {
085            transactionStore.commit(getTransactionId(), wasPrepared, preCommitTask, postCommitTask);
086            waitPostCommitDone(postCommitTask);
087        } catch (XAException xae) {
088            throw xae;
089        } catch (Throwable t) {
090            LOG.warn("Store COMMIT FAILED: ", t);
091            rollback();
092            XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back.");
093            xae.errorCode = XAException.XA_RBOTHER;
094            xae.initCause(t);
095            throw xae;
096        }
097    }
098
099    private void illegalStateTransition(String callName) throws XAException {
100        XAException xae = new XAException("Cannot call " + callName + " now.");
101        xae.errorCode = XAException.XAER_PROTO;
102        throw xae;
103    }
104
105    private void checkForPreparedState(boolean onePhase) throws XAException {
106        if (!onePhase) {
107            XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared.");
108            xae.errorCode = XAException.XAER_PROTO;
109            throw xae;
110        }
111    }
112
113    private void doPrePrepare() throws XAException, IOException {
114        try {
115            prePrepare();
116        } catch (XAException e) {
117            throw e;
118        } catch (Throwable e) {
119            LOG.warn("PRE-PREPARE FAILED: ", e);
120            rollback();
121            XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back.");
122            xae.errorCode = XAException.XA_RBOTHER;
123            xae.initCause(e);
124            throw xae;
125        }
126    }
127
128    @Override
129    public void rollback() throws XAException, IOException {
130
131        if (LOG.isDebugEnabled()) {
132            LOG.debug("XA Transaction rollback: " + xid);
133        }
134
135        switch (getState()) {
136        case START_STATE:
137            // 1 phase rollback no work done.
138            setStateFinished();
139            break;
140        case IN_USE_STATE:
141            // 1 phase rollback work done.
142            setStateFinished();
143            transactionStore.rollback(getTransactionId());
144            doPostRollback();
145            break;
146        case PREPARED_STATE:
147            // 2 phase rollback work done.
148            setStateFinished();
149            transactionStore.rollback(getTransactionId());
150            doPostRollback();
151            break;
152        case FINISHED_STATE:
153            // failure to commit
154            transactionStore.rollback(getTransactionId());
155            doPostRollback();
156            break;
157        default:
158            throw new XAException("Invalid state");
159        }
160
161    }
162
163    private void doPostRollback() throws XAException {
164        try {
165            fireAfterRollback();
166        } catch (Throwable e) {
167            // I guess this could happen. Post commit task failed
168            // to execute properly.
169            LOG.warn("POST ROLLBACK FAILED: ", e);
170            XAException xae = new XAException("POST ROLLBACK FAILED");
171            xae.errorCode = XAException.XAER_RMERR;
172            xae.initCause(e);
173            throw xae;
174        }
175    }
176
177    @Override
178    public int prepare() throws XAException, IOException {
179        if (LOG.isDebugEnabled()) {
180            LOG.debug("XA Transaction prepare: " + xid);
181        }
182
183        switch (getState()) {
184        case START_STATE:
185            // No work done.. no commit/rollback needed.
186            setStateFinished();
187            return XAResource.XA_RDONLY;
188        case IN_USE_STATE:
189            // We would record prepare here.
190            doPrePrepare();
191            setState(Transaction.PREPARED_STATE);
192            transactionStore.prepare(getTransactionId());
193            return XAResource.XA_OK;
194        default:
195            illegalStateTransition("prepare");
196            return XAResource.XA_RDONLY;
197        }
198    }
199
200    private void setStateFinished() {
201        setState(Transaction.FINISHED_STATE);
202        broker.removeTransaction(xid);
203    }
204
205    public ConnectionId getConnectionId() {
206        return connectionId;
207    }
208
209    @Override
210    public TransactionId getTransactionId() {
211        return xid;
212    }
213    
214    @Override
215    public Logger getLog() {
216        return LOG;
217    }
218}