001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.CancellationException;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.Future;
029import javax.transaction.xa.XAException;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.command.Message;
032import org.apache.activemq.command.MessageAck;
033import org.apache.activemq.command.MessageId;
034import org.apache.activemq.command.TransactionId;
035import org.apache.activemq.command.XATransactionId;
036import org.apache.activemq.openwire.OpenWireFormat;
037import org.apache.activemq.protobuf.Buffer;
038import org.apache.activemq.store.AbstractMessageStore;
039import org.apache.activemq.store.MessageStore;
040import org.apache.activemq.store.ProxyMessageStore;
041import org.apache.activemq.store.ProxyTopicMessageStore;
042import org.apache.activemq.store.TopicMessageStore;
043import org.apache.activemq.store.TransactionRecoveryListener;
044import org.apache.activemq.store.TransactionStore;
045import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation;
046import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
047import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation;
048import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
049import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
050import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
051import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
052import org.apache.activemq.wireformat.WireFormat;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * Provides a TransactionStore implementation that can create transaction aware
058 * MessageStore objects from non transaction aware MessageStore objects.
059 * 
060 * 
061 */
062public class KahaDBTransactionStore implements TransactionStore {
063    static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
064    ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
065    private final WireFormat wireFormat = new OpenWireFormat();
066    private final KahaDBStore theStore;
067
068    public KahaDBTransactionStore(KahaDBStore theStore) {
069        this.theStore = theStore;
070    }
071
072    public class Tx {
073        private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
074
075        private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
076
077        public void add(AddMessageCommand msg) {
078            messages.add(msg);
079        }
080
081        public void add(RemoveMessageCommand ack) {
082            acks.add(ack);
083        }
084
085        public Message[] getMessages() {
086            Message rc[] = new Message[messages.size()];
087            int count = 0;
088            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
089                AddMessageCommand cmd = iter.next();
090                rc[count++] = cmd.getMessage();
091            }
092            return rc;
093        }
094
095        public MessageAck[] getAcks() {
096            MessageAck rc[] = new MessageAck[acks.size()];
097            int count = 0;
098            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
099                RemoveMessageCommand cmd = iter.next();
100                rc[count++] = cmd.getMessageAck();
101            }
102            return rc;
103        }
104
105        /**
106         * @return true if something to commit
107         * @throws IOException
108         */
109        public List<Future<Object>> commit() throws IOException {
110            List<Future<Object>> results = new ArrayList<Future<Object>>();
111            // Do all the message adds.
112            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
113                AddMessageCommand cmd = iter.next();
114                results.add(cmd.run());
115
116            }
117            // And removes..
118            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
119                RemoveMessageCommand cmd = iter.next();
120                cmd.run();
121                results.add(cmd.run());
122            }
123            
124            return results;
125        }
126    }
127
128    public abstract class AddMessageCommand {
129        private final ConnectionContext ctx;
130        AddMessageCommand(ConnectionContext ctx) {
131            this.ctx = ctx;
132        }
133        abstract Message getMessage();
134        Future<Object> run() throws IOException {
135            return run(this.ctx);
136        }
137        abstract Future<Object> run(ConnectionContext ctx) throws IOException;
138    }
139
140    public abstract class RemoveMessageCommand {
141
142        private final ConnectionContext ctx;
143        RemoveMessageCommand(ConnectionContext ctx) {
144            this.ctx = ctx;
145        }
146        abstract MessageAck getMessageAck();
147        Future<Object> run() throws IOException {
148            return run(this.ctx);
149        }
150        abstract Future<Object> run(ConnectionContext context) throws IOException;
151    }
152
153    public MessageStore proxy(MessageStore messageStore) {
154        return new ProxyMessageStore(messageStore) {
155            @Override
156            public void addMessage(ConnectionContext context, final Message send) throws IOException {
157                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
158            }
159
160            @Override
161            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
162                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
163            }
164
165            @Override
166            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
167                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
168            }
169
170            @Override
171            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
172                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
173            }
174        };
175    }
176
177    public TopicMessageStore proxy(TopicMessageStore messageStore) {
178        return new ProxyTopicMessageStore(messageStore) {
179            @Override
180            public void addMessage(ConnectionContext context, final Message send) throws IOException {
181                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
182            }
183
184            @Override
185            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
186                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
187            }
188
189            @Override
190            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
191                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
192            }
193
194            @Override
195            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
196                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
197            }
198
199            @Override
200            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
201                            MessageId messageId, MessageAck ack) throws IOException {
202                KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
203                        subscriptionName, messageId, ack);
204            }
205
206        };
207    }
208
209    /**
210     * @throws IOException
211     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
212     */
213    public void prepare(TransactionId txid) throws IOException {
214        inflightTransactions.remove(txid);
215        KahaTransactionInfo info = getTransactionInfo(txid);
216        theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
217    }
218
219    public Tx getTx(Object txid) {
220        Tx tx = inflightTransactions.get(txid);
221        if (tx == null) {
222            tx = new Tx();
223            inflightTransactions.put(txid, tx);
224        }
225        return tx;
226    }
227
228    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
229            throws IOException {
230        if (txid != null) {
231            if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
232                if (preCommit != null) {
233                    preCommit.run();
234                }
235                Tx tx = inflightTransactions.remove(txid);
236                if (tx != null) {
237                    List<Future<Object>> results = tx.commit();
238                    boolean doneSomething = false;
239                    for (Future<Object> result : results) {
240                        try {
241                            result.get();
242                        } catch (InterruptedException e) {
243                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
244                        } catch (ExecutionException e) {
245                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
246                        }catch(CancellationException e) {
247                        }
248                        if (!result.isCancelled()) {
249                            doneSomething = true;
250                        }
251                    }
252                    if (postCommit != null) {
253                        postCommit.run();
254                    }
255                    if (doneSomething) {
256                        KahaTransactionInfo info = getTransactionInfo(txid);
257                        theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
258                    }
259                }else {
260                    //The Tx will be null for failed over clients - lets run their post commits
261                    if (postCommit != null) {
262                        postCommit.run();
263                    }
264                }
265
266            } else {
267                KahaTransactionInfo info = getTransactionInfo(txid);
268                // ensure message order w.r.t to cursor and store for setBatch()
269                synchronized (this) {
270                    theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
271                }
272            }
273        }else {
274           LOG.error("Null transaction passed on commit");
275        }
276    }
277
278    /**
279     * @throws IOException
280     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
281     */
282    public void rollback(TransactionId txid) throws IOException {
283        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
284            KahaTransactionInfo info = getTransactionInfo(txid);
285            theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
286        } else {
287            inflightTransactions.remove(txid);
288        }
289    }
290
291    public void start() throws Exception {
292    }
293
294    public void stop() throws Exception {
295    }
296
297    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
298        // All the inflight transactions get rolled back..
299        // inflightTransactions.clear();
300        for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
301            XATransactionId xid = (XATransactionId) entry.getKey();
302            ArrayList<Message> messageList = new ArrayList<Message>();
303            ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
304
305            for (Operation op : entry.getValue()) {
306                if (op.getClass() == AddOpperation.class) {
307                    AddOpperation addOp = (AddOpperation) op;
308                    Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage()
309                            .newInput()));
310                    messageList.add(msg);
311                } else {
312                    RemoveOpperation rmOp = (RemoveOpperation) op;
313                    Buffer ackb = rmOp.getCommand().getAck();
314                    MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
315                    ackList.add(ack);
316                }
317            }
318
319            Message[] addedMessages = new Message[messageList.size()];
320            MessageAck[] acks = new MessageAck[ackList.size()];
321            messageList.toArray(addedMessages);
322            ackList.toArray(acks);
323            listener.recover(xid, addedMessages, acks);
324        }
325    }
326
327    /**
328     * @param message
329     * @throws IOException
330     */
331    void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
332            throws IOException {
333
334        if (message.getTransactionId() != null) {
335            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
336                destination.addMessage(context, message);
337            } else {
338                Tx tx = getTx(message.getTransactionId());
339                tx.add(new AddMessageCommand(context) {
340                    @Override
341                    public Message getMessage() {
342                        return message;
343                    }
344                    @Override
345                    public Future<Object> run(ConnectionContext ctx) throws IOException {
346                        destination.addMessage(ctx, message);
347                        return AbstractMessageStore.FUTURE;
348                    }
349
350                });
351            }
352        } else {
353            destination.addMessage(context, message);
354        }
355    }
356
357    Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
358            throws IOException {
359
360        if (message.getTransactionId() != null) {
361            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
362                destination.addMessage(context, message);
363                return AbstractMessageStore.FUTURE;
364            } else {
365                Tx tx = getTx(message.getTransactionId());
366                tx.add(new AddMessageCommand(context) {
367                    @Override
368                    public Message getMessage() {
369                        return message;
370                    }
371                    @Override
372                    public Future<Object> run(ConnectionContext ctx) throws IOException {
373                        return destination.asyncAddQueueMessage(ctx, message);
374                    }
375
376                });
377                return AbstractMessageStore.FUTURE;
378            }
379        } else {
380            return destination.asyncAddQueueMessage(context, message);
381        }
382    }
383
384    Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
385            throws IOException {
386
387        if (message.getTransactionId() != null) {
388            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
389                destination.addMessage(context, message);
390                return AbstractMessageStore.FUTURE;
391            } else {
392                Tx tx = getTx(message.getTransactionId());
393                tx.add(new AddMessageCommand(context) {
394                    @Override
395                    public Message getMessage() {
396                        return message;
397                    }
398                    @Override
399                    public Future run(ConnectionContext ctx) throws IOException {
400                        return destination.asyncAddTopicMessage(ctx, message);
401                    }
402
403                });
404                return AbstractMessageStore.FUTURE;
405            }
406        } else {
407            return destination.asyncAddTopicMessage(context, message);
408        }
409    }
410
411    /**
412     * @param ack
413     * @throws IOException
414     */
415    final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
416            throws IOException {
417
418        if (ack.isInTransaction()) {
419            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
420                destination.removeMessage(context, ack);
421            } else {
422                Tx tx = getTx(ack.getTransactionId());
423                tx.add(new RemoveMessageCommand(context) {
424                    @Override
425                    public MessageAck getMessageAck() {
426                        return ack;
427                    }
428
429                    @Override
430                    public Future<Object> run(ConnectionContext ctx) throws IOException {
431                        destination.removeMessage(ctx, ack);
432                        return AbstractMessageStore.FUTURE;
433                    }
434                });
435            }
436        } else {
437            destination.removeMessage(context, ack);
438        }
439    }
440
441    final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
442            throws IOException {
443
444        if (ack.isInTransaction()) {
445            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
446                destination.removeAsyncMessage(context, ack);
447            } else {
448                Tx tx = getTx(ack.getTransactionId());
449                tx.add(new RemoveMessageCommand(context) {
450                    @Override
451                    public MessageAck getMessageAck() {
452                        return ack;
453                    }
454
455                    @Override
456                    public Future<Object> run(ConnectionContext ctx) throws IOException {
457                        destination.removeMessage(ctx, ack);
458                        return AbstractMessageStore.FUTURE;
459                    }
460                });
461            }
462        } else {
463            destination.removeAsyncMessage(context, ack);
464        }
465    }
466
467    final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
468                           final MessageId messageId, final MessageAck ack) throws IOException {
469
470        if (ack.isInTransaction()) {
471            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
472                destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
473            } else {
474                Tx tx = getTx(ack.getTransactionId());
475                tx.add(new RemoveMessageCommand(context) {
476                    public MessageAck getMessageAck() {
477                        return ack;
478                    }
479
480                    public Future<Object> run(ConnectionContext ctx) throws IOException {
481                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
482                        return AbstractMessageStore.FUTURE;
483                    }
484                });
485            }
486        } else {
487            destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
488        }
489    }
490
491
492    private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
493        return theStore.createTransactionInfo(txid);
494    }
495
496}