001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019
020import java.util.*;
021import java.util.concurrent.ConcurrentHashMap;
022
023import javax.jms.JMSException;
024import javax.transaction.xa.XAException;
025
026import org.apache.activemq.ActiveMQMessageAudit;
027import org.apache.activemq.command.ConnectionInfo;
028import org.apache.activemq.command.LocalTransactionId;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.command.MessageAck;
031import org.apache.activemq.command.ProducerInfo;
032import org.apache.activemq.command.TransactionId;
033import org.apache.activemq.command.XATransactionId;
034import org.apache.activemq.state.ProducerState;
035import org.apache.activemq.store.TransactionRecoveryListener;
036import org.apache.activemq.store.TransactionStore;
037import org.apache.activemq.transaction.LocalTransaction;
038import org.apache.activemq.transaction.Synchronization;
039import org.apache.activemq.transaction.Transaction;
040import org.apache.activemq.transaction.XATransaction;
041import org.apache.activemq.util.IOExceptionSupport;
042import org.apache.activemq.util.WrappedException;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * This broker filter handles the transaction related operations in the Broker
048 * interface.
049 * 
050 * 
051 */
052public class TransactionBroker extends BrokerFilter {
053
054    private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);
055
056    // The prepared XA transactions.
057    private TransactionStore transactionStore;
058    private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
059    private ActiveMQMessageAudit audit;
060
061    public TransactionBroker(Broker next, TransactionStore transactionStore) {
062        super(next);
063        this.transactionStore = transactionStore;
064    }
065
066    // ////////////////////////////////////////////////////////////////////////////
067    //
068    // Life cycle Methods
069    //
070    // ////////////////////////////////////////////////////////////////////////////
071
072    /**
073     * Recovers any prepared transactions.
074     */
075    public void start() throws Exception {
076        transactionStore.start();
077        try {
078            final ConnectionContext context = new ConnectionContext();
079            context.setBroker(this);
080            context.setInRecoveryMode(true);
081            context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
082            context.setProducerFlowControl(false);
083            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
084            producerExchange.setMutable(true);
085            producerExchange.setConnectionContext(context);
086            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
087            final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
088            consumerExchange.setConnectionContext(context);
089            transactionStore.recover(new TransactionRecoveryListener() {
090                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
091                    try {
092                        beginTransaction(context, xid);
093                        for (int i = 0; i < addedMessages.length; i++) {
094                            send(producerExchange, addedMessages[i]);
095                        }
096                        for (int i = 0; i < aks.length; i++) {
097                            acknowledge(consumerExchange, aks[i]);
098                        }
099                        prepareTransaction(context, xid);
100                    } catch (Throwable e) {
101                        throw new WrappedException(e);
102                    }
103                }
104            });
105        } catch (WrappedException e) {
106            Throwable cause = e.getCause();
107            throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
108        }
109        next.start();
110    }
111
112    public void stop() throws Exception {
113        transactionStore.stop();
114        next.stop();
115    }
116
117    // ////////////////////////////////////////////////////////////////////////////
118    //
119    // BrokerFilter overrides
120    //
121    // ////////////////////////////////////////////////////////////////////////////
122    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
123        List<TransactionId> txs = new ArrayList<TransactionId>();
124        synchronized (xaTransactions) {
125            for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
126                Transaction tx = iter.next();
127                if (tx.isPrepared()) {
128                    if (LOG.isDebugEnabled()) {
129                        LOG.debug("prepared transaction: " + tx.getTransactionId());
130                    }
131                    txs.add(tx.getTransactionId());
132                }
133            }
134        }
135        XATransactionId rc[] = new XATransactionId[txs.size()];
136        txs.toArray(rc);
137        if (LOG.isDebugEnabled()) {
138            LOG.debug("prepared transacton list size: " + rc.length);
139        }
140        return rc;
141    }
142
143    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
144        // the transaction may have already been started.
145        if (xid.isXATransaction()) {
146            XATransaction transaction = null;
147            synchronized (xaTransactions) {
148                transaction = xaTransactions.get(xid);
149                if (transaction != null) {
150                    return;
151                }
152                transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId());
153                xaTransactions.put(xid, transaction);
154            }
155        } else {
156            Map<TransactionId, Transaction> transactionMap = context.getTransactions();
157            Transaction transaction = transactionMap.get(xid);
158            if (transaction != null) {
159                throw new JMSException("Transaction '" + xid + "' has already been started.");
160            }
161            transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
162            transactionMap.put(xid, transaction);
163        }
164    }
165
166    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
167        Transaction transaction = getTransaction(context, xid, false);
168        return transaction.prepare();
169    }
170
171    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
172        Transaction transaction = getTransaction(context, xid, true);
173        transaction.commit(onePhase);
174    }
175
176    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
177        Transaction transaction = getTransaction(context, xid, true);
178        transaction.rollback();
179    }
180
181    public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
182        Transaction transaction = getTransaction(context, xid, true);
183        transaction.rollback();
184    }
185
186    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
187        // This method may be invoked recursively.
188        // Track original tx so that it can be restored.
189        final ConnectionContext context = consumerExchange.getConnectionContext();
190        Transaction originalTx = context.getTransaction();
191        Transaction transaction = null;
192        if (ack.isInTransaction()) {
193            transaction = getTransaction(context, ack.getTransactionId(), false);
194        }
195        context.setTransaction(transaction);
196        try {
197            next.acknowledge(consumerExchange, ack);
198        } finally {
199            context.setTransaction(originalTx);
200        }
201    }
202
203    public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
204        // This method may be invoked recursively.
205        // Track original tx so that it can be restored.
206        final ConnectionContext context = producerExchange.getConnectionContext();
207        Transaction originalTx = context.getTransaction();
208        Transaction transaction = null;
209        Synchronization sync = null;
210        if (message.getTransactionId() != null) {
211            transaction = getTransaction(context, message.getTransactionId(), false);
212            if (transaction != null) {
213                sync = new Synchronization() {
214
215                    public void afterRollback() {
216                        if (audit != null) {
217                            audit.rollback(message);
218                        }
219                    }
220                };
221                transaction.addSynchronization(sync);
222            }
223        }
224        if (audit == null || !audit.isDuplicate(message)) {
225            context.setTransaction(transaction);
226            try {
227                next.send(producerExchange, message);
228            } finally {
229                context.setTransaction(originalTx);
230            }
231        } else {
232            if (sync != null && transaction != null) {
233                transaction.removeSynchronization(sync);
234            }
235            if (LOG.isDebugEnabled()) {
236                LOG.debug("IGNORING duplicate message " + message);
237            }
238        }
239    }
240
241    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
242        for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
243            try {
244                Transaction transaction = iter.next();
245                transaction.rollback();
246            } catch (Exception e) {
247                LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
248            }
249            iter.remove();
250        }
251
252        synchronized (xaTransactions) {
253            // first find all txs that belongs to the connection
254            ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
255            for (XATransaction tx : xaTransactions.values()) {
256                if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
257                    txs.add(tx);
258                }
259            }
260
261            // then remove them
262            // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
263            for (XATransaction tx : txs) {
264                try {
265                    tx.rollback();
266                } catch (Exception e) {
267                    LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
268                }
269            }
270
271        }
272        next.removeConnection(context, info, error);
273    }
274
275    // ////////////////////////////////////////////////////////////////////////////
276    //
277    // Implementation help methods.
278    //
279    // ////////////////////////////////////////////////////////////////////////////
280    public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
281        Map transactionMap = null;
282        synchronized (xaTransactions) {
283            transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();
284        }
285        Transaction transaction = (Transaction)transactionMap.get(xid);
286        if (transaction != null) {
287            return transaction;
288        }
289        if (xid.isXATransaction()) {
290            XAException e = new XAException("Transaction '" + xid + "' has not been started.");
291            e.errorCode = XAException.XAER_NOTA;
292            throw e;
293        } else {
294            throw new JMSException("Transaction '" + xid + "' has not been started.");
295        }
296    }
297
298    public void removeTransaction(XATransactionId xid) {
299        synchronized (xaTransactions) {
300            xaTransactions.remove(xid);
301        }
302    }
303
304    public synchronized void brokerServiceStarted() {
305        super.brokerServiceStarted();
306        if (getBrokerService().isSupportFailOver() && audit == null) {
307            audit = new ActiveMQMessageAudit();
308        }
309    }
310
311}