001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019 020import java.util.*; 021import java.util.concurrent.ConcurrentHashMap; 022 023import javax.jms.JMSException; 024import javax.transaction.xa.XAException; 025 026import org.apache.activemq.ActiveMQMessageAudit; 027import org.apache.activemq.command.ConnectionInfo; 028import org.apache.activemq.command.LocalTransactionId; 029import org.apache.activemq.command.Message; 030import org.apache.activemq.command.MessageAck; 031import org.apache.activemq.command.ProducerInfo; 032import org.apache.activemq.command.TransactionId; 033import org.apache.activemq.command.XATransactionId; 034import org.apache.activemq.state.ProducerState; 035import org.apache.activemq.store.TransactionRecoveryListener; 036import org.apache.activemq.store.TransactionStore; 037import org.apache.activemq.transaction.LocalTransaction; 038import org.apache.activemq.transaction.Synchronization; 039import org.apache.activemq.transaction.Transaction; 040import org.apache.activemq.transaction.XATransaction; 041import org.apache.activemq.util.IOExceptionSupport; 042import org.apache.activemq.util.WrappedException; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * This broker filter handles the transaction related operations in the Broker 048 * interface. 049 * 050 * 051 */ 052public class TransactionBroker extends BrokerFilter { 053 054 private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class); 055 056 // The prepared XA transactions. 057 private TransactionStore transactionStore; 058 private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>(); 059 private ActiveMQMessageAudit audit; 060 061 public TransactionBroker(Broker next, TransactionStore transactionStore) { 062 super(next); 063 this.transactionStore = transactionStore; 064 } 065 066 // //////////////////////////////////////////////////////////////////////////// 067 // 068 // Life cycle Methods 069 // 070 // //////////////////////////////////////////////////////////////////////////// 071 072 /** 073 * Recovers any prepared transactions. 074 */ 075 public void start() throws Exception { 076 transactionStore.start(); 077 try { 078 final ConnectionContext context = new ConnectionContext(); 079 context.setBroker(this); 080 context.setInRecoveryMode(true); 081 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 082 context.setProducerFlowControl(false); 083 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 084 producerExchange.setMutable(true); 085 producerExchange.setConnectionContext(context); 086 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 087 final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange(); 088 consumerExchange.setConnectionContext(context); 089 transactionStore.recover(new TransactionRecoveryListener() { 090 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) { 091 try { 092 beginTransaction(context, xid); 093 for (int i = 0; i < addedMessages.length; i++) { 094 send(producerExchange, addedMessages[i]); 095 } 096 for (int i = 0; i < aks.length; i++) { 097 acknowledge(consumerExchange, aks[i]); 098 } 099 prepareTransaction(context, xid); 100 } catch (Throwable e) { 101 throw new WrappedException(e); 102 } 103 } 104 }); 105 } catch (WrappedException e) { 106 Throwable cause = e.getCause(); 107 throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause); 108 } 109 next.start(); 110 } 111 112 public void stop() throws Exception { 113 transactionStore.stop(); 114 next.stop(); 115 } 116 117 // //////////////////////////////////////////////////////////////////////////// 118 // 119 // BrokerFilter overrides 120 // 121 // //////////////////////////////////////////////////////////////////////////// 122 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 123 List<TransactionId> txs = new ArrayList<TransactionId>(); 124 synchronized (xaTransactions) { 125 for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) { 126 Transaction tx = iter.next(); 127 if (tx.isPrepared()) { 128 if (LOG.isDebugEnabled()) { 129 LOG.debug("prepared transaction: " + tx.getTransactionId()); 130 } 131 txs.add(tx.getTransactionId()); 132 } 133 } 134 } 135 XATransactionId rc[] = new XATransactionId[txs.size()]; 136 txs.toArray(rc); 137 if (LOG.isDebugEnabled()) { 138 LOG.debug("prepared transacton list size: " + rc.length); 139 } 140 return rc; 141 } 142 143 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 144 // the transaction may have already been started. 145 if (xid.isXATransaction()) { 146 XATransaction transaction = null; 147 synchronized (xaTransactions) { 148 transaction = xaTransactions.get(xid); 149 if (transaction != null) { 150 return; 151 } 152 transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId()); 153 xaTransactions.put(xid, transaction); 154 } 155 } else { 156 Map<TransactionId, Transaction> transactionMap = context.getTransactions(); 157 Transaction transaction = transactionMap.get(xid); 158 if (transaction != null) { 159 throw new JMSException("Transaction '" + xid + "' has already been started."); 160 } 161 transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context); 162 transactionMap.put(xid, transaction); 163 } 164 } 165 166 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 167 Transaction transaction = getTransaction(context, xid, false); 168 return transaction.prepare(); 169 } 170 171 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 172 Transaction transaction = getTransaction(context, xid, true); 173 transaction.commit(onePhase); 174 } 175 176 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 177 Transaction transaction = getTransaction(context, xid, true); 178 transaction.rollback(); 179 } 180 181 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception { 182 Transaction transaction = getTransaction(context, xid, true); 183 transaction.rollback(); 184 } 185 186 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 187 // This method may be invoked recursively. 188 // Track original tx so that it can be restored. 189 final ConnectionContext context = consumerExchange.getConnectionContext(); 190 Transaction originalTx = context.getTransaction(); 191 Transaction transaction = null; 192 if (ack.isInTransaction()) { 193 transaction = getTransaction(context, ack.getTransactionId(), false); 194 } 195 context.setTransaction(transaction); 196 try { 197 next.acknowledge(consumerExchange, ack); 198 } finally { 199 context.setTransaction(originalTx); 200 } 201 } 202 203 public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception { 204 // This method may be invoked recursively. 205 // Track original tx so that it can be restored. 206 final ConnectionContext context = producerExchange.getConnectionContext(); 207 Transaction originalTx = context.getTransaction(); 208 Transaction transaction = null; 209 Synchronization sync = null; 210 if (message.getTransactionId() != null) { 211 transaction = getTransaction(context, message.getTransactionId(), false); 212 if (transaction != null) { 213 sync = new Synchronization() { 214 215 public void afterRollback() { 216 if (audit != null) { 217 audit.rollback(message); 218 } 219 } 220 }; 221 transaction.addSynchronization(sync); 222 } 223 } 224 if (audit == null || !audit.isDuplicate(message)) { 225 context.setTransaction(transaction); 226 try { 227 next.send(producerExchange, message); 228 } finally { 229 context.setTransaction(originalTx); 230 } 231 } else { 232 if (sync != null && transaction != null) { 233 transaction.removeSynchronization(sync); 234 } 235 if (LOG.isDebugEnabled()) { 236 LOG.debug("IGNORING duplicate message " + message); 237 } 238 } 239 } 240 241 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 242 for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) { 243 try { 244 Transaction transaction = iter.next(); 245 transaction.rollback(); 246 } catch (Exception e) { 247 LOG.warn("ERROR Rolling back disconnected client's transactions: ", e); 248 } 249 iter.remove(); 250 } 251 252 synchronized (xaTransactions) { 253 // first find all txs that belongs to the connection 254 ArrayList<XATransaction> txs = new ArrayList<XATransaction>(); 255 for (XATransaction tx : xaTransactions.values()) { 256 if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) { 257 txs.add(tx); 258 } 259 } 260 261 // then remove them 262 // two steps needed to avoid ConcurrentModificationException, from removeTransaction() 263 for (XATransaction tx : txs) { 264 try { 265 tx.rollback(); 266 } catch (Exception e) { 267 LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e); 268 } 269 } 270 271 } 272 next.removeConnection(context, info, error); 273 } 274 275 // //////////////////////////////////////////////////////////////////////////// 276 // 277 // Implementation help methods. 278 // 279 // //////////////////////////////////////////////////////////////////////////// 280 public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException { 281 Map transactionMap = null; 282 synchronized (xaTransactions) { 283 transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions(); 284 } 285 Transaction transaction = (Transaction)transactionMap.get(xid); 286 if (transaction != null) { 287 return transaction; 288 } 289 if (xid.isXATransaction()) { 290 XAException e = new XAException("Transaction '" + xid + "' has not been started."); 291 e.errorCode = XAException.XAER_NOTA; 292 throw e; 293 } else { 294 throw new JMSException("Transaction '" + xid + "' has not been started."); 295 } 296 } 297 298 public void removeTransaction(XATransactionId xid) { 299 synchronized (xaTransactions) { 300 xaTransactions.remove(xid); 301 } 302 } 303 304 public synchronized void brokerServiceStarted() { 305 super.brokerServiceStarted(); 306 if (getBrokerService().isSupportFailOver() && audit == null) { 307 audit = new ActiveMQMessageAudit(); 308 } 309 } 310 311}