001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.Future;
024import javax.transaction.xa.XAException;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.command.Message;
027import org.apache.activemq.command.MessageAck;
028import org.apache.activemq.command.MessageId;
029import org.apache.activemq.command.TransactionId;
030import org.apache.activemq.command.XATransactionId;
031import org.apache.activemq.store.AbstractMessageStore;
032import org.apache.activemq.store.MessageStore;
033import org.apache.activemq.store.PersistenceAdapter;
034import org.apache.activemq.store.ProxyMessageStore;
035import org.apache.activemq.store.ProxyTopicMessageStore;
036import org.apache.activemq.store.TopicMessageStore;
037import org.apache.activemq.store.TransactionRecoveryListener;
038import org.apache.activemq.store.TransactionStore;
039
040/**
041 * Provides a TransactionStore implementation that can create transaction aware
042 * MessageStore objects from non transaction aware MessageStore objects.
043 * 
044 * 
045 */
046public class MemoryTransactionStore implements TransactionStore {
047
048    ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
049    ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
050    final PersistenceAdapter persistenceAdapter;
051
052    private boolean doingRecover;
053
054    public class Tx {
055        private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
056
057        private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
058
059        public void add(AddMessageCommand msg) {
060            messages.add(msg);
061        }
062
063        public void add(RemoveMessageCommand ack) {
064            acks.add(ack);
065        }
066
067        public Message[] getMessages() {
068            Message rc[] = new Message[messages.size()];
069            int count = 0;
070            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
071                AddMessageCommand cmd = iter.next();
072                rc[count++] = cmd.getMessage();
073            }
074            return rc;
075        }
076
077        public MessageAck[] getAcks() {
078            MessageAck rc[] = new MessageAck[acks.size()];
079            int count = 0;
080            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
081                RemoveMessageCommand cmd = iter.next();
082                rc[count++] = cmd.getMessageAck();
083            }
084            return rc;
085        }
086
087        /**
088         * @throws IOException
089         */
090        public void commit() throws IOException {
091            ConnectionContext ctx = new ConnectionContext();
092            persistenceAdapter.beginTransaction(ctx);
093            try {
094                
095                // Do all the message adds.
096                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
097                    AddMessageCommand cmd = iter.next();
098                    cmd.run(ctx);
099                }
100                // And removes..
101                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
102                    RemoveMessageCommand cmd = iter.next();
103                    cmd.run(ctx);
104                }
105                
106            } catch ( IOException e ) {
107                persistenceAdapter.rollbackTransaction(ctx);
108                throw e;
109            }
110            persistenceAdapter.commitTransaction(ctx);
111        }
112    }
113    
114    public interface AddMessageCommand {
115        Message getMessage();
116
117        void run(ConnectionContext context) throws IOException;
118    }
119
120    public interface RemoveMessageCommand {
121        MessageAck getMessageAck();
122
123        void run(ConnectionContext context) throws IOException;
124    }
125    
126    public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
127        this.persistenceAdapter=persistenceAdapter;
128    }
129
130    public MessageStore proxy(MessageStore messageStore) {
131        return new ProxyMessageStore(messageStore) {
132            @Override
133            public void addMessage(ConnectionContext context, final Message send) throws IOException {
134                MemoryTransactionStore.this.addMessage(getDelegate(), send);
135            }
136
137            @Override
138            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
139                MemoryTransactionStore.this.addMessage(getDelegate(), message);
140                return AbstractMessageStore.FUTURE;
141             }
142             
143            @Override
144            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
145                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
146            }
147             
148            @Override
149            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
150                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);       
151            }
152        };
153    }
154
155    public TopicMessageStore proxy(TopicMessageStore messageStore) {
156        return new ProxyTopicMessageStore(messageStore) {
157            @Override
158            public void addMessage(ConnectionContext context, final Message send) throws IOException {
159                MemoryTransactionStore.this.addMessage(getDelegate(), send);
160            }
161
162            @Override
163            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
164                MemoryTransactionStore.this.addMessage(getDelegate(), message);
165                return AbstractMessageStore.FUTURE;
166             }
167
168            @Override
169            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
170                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
171            }
172            
173            @Override
174            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
175                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);       
176            }
177
178            @Override
179            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
180                            MessageId messageId, MessageAck ack) throws IOException {
181                MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
182                        subscriptionName, messageId, ack);
183            }
184        };
185    }
186
187    /**
188     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
189     */
190    public void prepare(TransactionId txid) {
191        Tx tx = inflightTransactions.remove(txid);
192        if (tx == null) {
193            return;
194        }
195        preparedTransactions.put(txid, tx);
196    }
197
198    public Tx getTx(Object txid) {
199        Tx tx = inflightTransactions.get(txid);
200        if (tx == null) {
201            tx = new Tx();
202            inflightTransactions.put(txid, tx);
203        }
204        return tx;
205    }
206
207    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
208        if (preCommit != null) {
209            preCommit.run();
210        }
211        Tx tx;
212        if (wasPrepared) {
213            tx = preparedTransactions.remove(txid);
214        } else {
215            tx = inflightTransactions.remove(txid);
216        }
217
218        if (tx == null) {
219            if (postCommit != null) {
220                postCommit.run();
221            }
222            return;
223        }
224        // ensure message order w.r.t to cursor and store for setBatch()
225        synchronized (this) {
226            tx.commit();
227            if (postCommit != null) {
228                postCommit.run();
229            }
230        }
231    }
232
233    /**
234     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
235     */
236    public void rollback(TransactionId txid) {
237        preparedTransactions.remove(txid);
238        inflightTransactions.remove(txid);
239    }
240
241    public void start() throws Exception {
242    }
243
244    public void stop() throws Exception {
245    }
246
247    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
248        // All the inflight transactions get rolled back..
249        inflightTransactions.clear();
250        this.doingRecover = true;
251        try {
252            for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
253                Object txid = iter.next();
254                Tx tx = preparedTransactions.get(txid);
255                listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
256            }
257        } finally {
258            this.doingRecover = false;
259        }
260    }
261
262    /**
263     * @param message
264     * @throws IOException
265     */
266    void addMessage(final MessageStore destination, final Message message) throws IOException {
267
268        if (doingRecover) {
269            return;
270        }
271
272        if (message.getTransactionId() != null) {
273            Tx tx = getTx(message.getTransactionId());
274            tx.add(new AddMessageCommand() {
275                public Message getMessage() {
276                    return message;
277                }
278
279                public void run(ConnectionContext ctx) throws IOException {
280                    destination.addMessage(ctx, message);
281                }
282
283            });
284        } else {
285            destination.addMessage(null, message);
286        }
287    }
288    
289    /**
290     * @param ack
291     * @throws IOException
292     */
293    final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
294        if (doingRecover) {
295            return;
296        }
297
298        if (ack.isInTransaction()) {
299            Tx tx = getTx(ack.getTransactionId());
300            tx.add(new RemoveMessageCommand() {
301                public MessageAck getMessageAck() {
302                    return ack;
303                }
304
305                public void run(ConnectionContext ctx) throws IOException {
306                    destination.removeMessage(ctx, ack);
307                }
308            });
309        } else {
310            destination.removeMessage(null, ack);
311        }
312    }
313
314    final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
315                           final MessageId messageId, final MessageAck ack) throws IOException {
316        if (doingRecover) {
317            return;
318        }
319
320        if (ack.isInTransaction()) {
321            Tx tx = getTx(ack.getTransactionId());
322            tx.add(new RemoveMessageCommand() {
323                public MessageAck getMessageAck() {
324                    return ack;
325                }
326
327                public void run(ConnectionContext ctx) throws IOException {
328                    destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
329                }
330            });
331        } else {
332            destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
333        }
334    }
335
336
337    public void delete() {
338        inflightTransactions.clear();
339        preparedTransactions.clear();
340        doingRecover = false;
341    }
342
343}