001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transaction; 018 019import java.io.IOException; 020import javax.transaction.xa.XAException; 021import javax.transaction.xa.XAResource; 022import org.apache.activemq.broker.TransactionBroker; 023import org.apache.activemq.command.ConnectionId; 024import org.apache.activemq.command.TransactionId; 025import org.apache.activemq.command.XATransactionId; 026import org.apache.activemq.store.TransactionStore; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * 032 */ 033public class XATransaction extends Transaction { 034 035 private static final Logger LOG = LoggerFactory.getLogger(XATransaction.class); 036 037 private final TransactionStore transactionStore; 038 private final XATransactionId xid; 039 private final TransactionBroker broker; 040 private final ConnectionId connectionId; 041 042 public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker, ConnectionId connectionId) { 043 this.transactionStore = transactionStore; 044 this.xid = xid; 045 this.broker = broker; 046 this.connectionId = connectionId; 047 if (LOG.isDebugEnabled()) { 048 LOG.debug("XA Transaction new/begin : " + xid); 049 } 050 } 051 052 @Override 053 public void commit(boolean onePhase) throws XAException, IOException { 054 if (LOG.isDebugEnabled()) { 055 LOG.debug("XA Transaction commit: " + xid); 056 } 057 058 switch (getState()) { 059 case START_STATE: 060 // 1 phase commit, no work done. 061 checkForPreparedState(onePhase); 062 setStateFinished(); 063 break; 064 case IN_USE_STATE: 065 // 1 phase commit, work done. 066 checkForPreparedState(onePhase); 067 doPrePrepare(); 068 setStateFinished(); 069 storeCommit(getTransactionId(), false, preCommitTask, postCommitTask); 070 break; 071 case PREPARED_STATE: 072 // 2 phase commit, work done. 073 // We would record commit here. 074 setStateFinished(); 075 storeCommit(getTransactionId(), true, preCommitTask, postCommitTask); 076 break; 077 default: 078 illegalStateTransition("commit"); 079 } 080 } 081 082 private void storeCommit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) 083 throws XAException, IOException { 084 try { 085 transactionStore.commit(getTransactionId(), wasPrepared, preCommitTask, postCommitTask); 086 waitPostCommitDone(postCommitTask); 087 } catch (XAException xae) { 088 throw xae; 089 } catch (Throwable t) { 090 LOG.warn("Store COMMIT FAILED: ", t); 091 rollback(); 092 XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back."); 093 xae.errorCode = XAException.XA_RBOTHER; 094 xae.initCause(t); 095 throw xae; 096 } 097 } 098 099 private void illegalStateTransition(String callName) throws XAException { 100 XAException xae = new XAException("Cannot call " + callName + " now."); 101 xae.errorCode = XAException.XAER_PROTO; 102 throw xae; 103 } 104 105 private void checkForPreparedState(boolean onePhase) throws XAException { 106 if (!onePhase) { 107 XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared."); 108 xae.errorCode = XAException.XAER_PROTO; 109 throw xae; 110 } 111 } 112 113 private void doPrePrepare() throws XAException, IOException { 114 try { 115 prePrepare(); 116 } catch (XAException e) { 117 throw e; 118 } catch (Throwable e) { 119 LOG.warn("PRE-PREPARE FAILED: ", e); 120 rollback(); 121 XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back."); 122 xae.errorCode = XAException.XA_RBOTHER; 123 xae.initCause(e); 124 throw xae; 125 } 126 } 127 128 @Override 129 public void rollback() throws XAException, IOException { 130 131 if (LOG.isDebugEnabled()) { 132 LOG.debug("XA Transaction rollback: " + xid); 133 } 134 135 switch (getState()) { 136 case START_STATE: 137 // 1 phase rollback no work done. 138 setStateFinished(); 139 break; 140 case IN_USE_STATE: 141 // 1 phase rollback work done. 142 setStateFinished(); 143 transactionStore.rollback(getTransactionId()); 144 doPostRollback(); 145 break; 146 case PREPARED_STATE: 147 // 2 phase rollback work done. 148 setStateFinished(); 149 transactionStore.rollback(getTransactionId()); 150 doPostRollback(); 151 break; 152 case FINISHED_STATE: 153 // failure to commit 154 transactionStore.rollback(getTransactionId()); 155 doPostRollback(); 156 break; 157 default: 158 throw new XAException("Invalid state"); 159 } 160 161 } 162 163 private void doPostRollback() throws XAException { 164 try { 165 fireAfterRollback(); 166 } catch (Throwable e) { 167 // I guess this could happen. Post commit task failed 168 // to execute properly. 169 LOG.warn("POST ROLLBACK FAILED: ", e); 170 XAException xae = new XAException("POST ROLLBACK FAILED"); 171 xae.errorCode = XAException.XAER_RMERR; 172 xae.initCause(e); 173 throw xae; 174 } 175 } 176 177 @Override 178 public int prepare() throws XAException, IOException { 179 if (LOG.isDebugEnabled()) { 180 LOG.debug("XA Transaction prepare: " + xid); 181 } 182 183 switch (getState()) { 184 case START_STATE: 185 // No work done.. no commit/rollback needed. 186 setStateFinished(); 187 return XAResource.XA_RDONLY; 188 case IN_USE_STATE: 189 // We would record prepare here. 190 doPrePrepare(); 191 setState(Transaction.PREPARED_STATE); 192 transactionStore.prepare(getTransactionId()); 193 return XAResource.XA_OK; 194 default: 195 illegalStateTransition("prepare"); 196 return XAResource.XA_RDONLY; 197 } 198 } 199 200 private void setStateFinished() { 201 setState(Transaction.FINISHED_STATE); 202 broker.removeTransaction(xid); 203 } 204 205 public ConnectionId getConnectionId() { 206 return connectionId; 207 } 208 209 @Override 210 public TransactionId getTransactionId() { 211 return xid; 212 } 213 214 @Override 215 public Logger getLog() { 216 return LOG; 217 } 218}