001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadaptor;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.Map;
022import java.util.Map.Entry;
023import java.util.concurrent.ConcurrentHashMap;
024import javax.transaction.xa.XAException;
025import org.apache.activemq.broker.BrokerService;
026import org.apache.activemq.broker.BrokerServiceAware;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.command.Message;
029import org.apache.activemq.command.MessageAck;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.command.TransactionId;
032import org.apache.activemq.command.XATransactionId;
033import org.apache.activemq.kaha.RuntimeStoreException;
034import org.apache.activemq.store.MessageStore;
035import org.apache.activemq.store.ProxyMessageStore;
036import org.apache.activemq.store.ProxyTopicMessageStore;
037import org.apache.activemq.store.TopicMessageStore;
038import org.apache.activemq.store.TransactionRecoveryListener;
039import org.apache.activemq.store.TransactionStore;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Provides a TransactionStore implementation that can create transaction aware
045 * MessageStore objects from non transaction aware MessageStore objects.
046 * 
047 * 
048 */
049public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {     
050    private static final Logger LOG = LoggerFactory.getLogger(KahaTransactionStore.class);
051        
052    private final Map transactions = new ConcurrentHashMap();
053    private final Map prepared;
054    private final KahaPersistenceAdapter adaptor;
055    
056    private BrokerService brokerService;
057
058    KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
059        this.adaptor = adaptor;
060        this.prepared = preparedMap;
061    }
062
063    public MessageStore proxy(MessageStore messageStore) {
064        return new ProxyMessageStore(messageStore) {
065            @Override
066            public void addMessage(ConnectionContext context, final Message send) throws IOException {
067                KahaTransactionStore.this.addMessage(getDelegate(), send);
068            }
069
070            @Override
071            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
072                KahaTransactionStore.this.removeMessage(getDelegate(), ack);
073            }
074        };
075    }
076
077    public TopicMessageStore proxy(TopicMessageStore messageStore) {
078        return new ProxyTopicMessageStore(messageStore) {
079            @Override
080            public void addMessage(ConnectionContext context, final Message send) throws IOException {
081                KahaTransactionStore.this.addMessage(getDelegate(), send);
082            }
083
084            @Override
085            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
086                KahaTransactionStore.this.removeMessage(getDelegate(), ack);
087            }
088
089            @Override
090            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
091                            MessageId messageId, MessageAck ack) throws IOException {
092                KahaTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, subscriptionName, messageId, ack);
093            }
094        };
095    }
096
097    /**
098     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
099     */
100    public void prepare(TransactionId txid) {
101        KahaTransaction tx = getTx(txid);
102        if (tx != null) {
103            tx.prepare();
104            prepared.put(txid, tx);
105        }
106    }
107
108    public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException {
109        if(before != null) {
110            before.run();
111        }
112        KahaTransaction tx = getTx(txid);
113        if (tx != null) {
114            tx.commit(this);
115            removeTx(txid);
116        }
117        if (after != null) {
118            after.run();
119        }
120    }
121
122    /**
123     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
124     */
125    public void rollback(TransactionId txid) {
126        KahaTransaction tx = getTx(txid);
127        if (tx != null) {
128            tx.rollback();
129            removeTx(txid);
130        }
131    }
132
133    public void start() throws Exception {
134    }
135
136    public void stop() throws Exception {
137    }
138
139    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
140        for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) {
141            Map.Entry entry = (Entry)i.next();
142            XATransactionId xid = (XATransactionId)entry.getKey();
143            KahaTransaction kt = (KahaTransaction)entry.getValue();
144            listener.recover(xid, kt.getMessages(), kt.getAcks());
145        }
146    }
147
148    /**
149     * @param message
150     * @throws IOException
151     */
152    void addMessage(final MessageStore destination, final Message message) throws IOException {
153        try {
154                if (message.isInTransaction()) {
155                        KahaTransaction tx = getOrCreateTx(message.getTransactionId());
156                        tx.add((KahaMessageStore)destination, message);
157                } else {
158                        destination.addMessage(null, message);
159                }
160        } catch (RuntimeStoreException rse) {
161            if (rse.getCause() instanceof IOException) {
162                brokerService.handleIOException((IOException)rse.getCause());
163            }
164            throw rse;
165        }
166    }
167
168    /**
169     * @param ack
170     * @throws IOException
171     */
172    final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
173        try {
174                if (ack.isInTransaction()) {
175                        KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
176                        tx.add((KahaMessageStore)destination, ack);
177                } else {
178                        destination.removeMessage(null, ack);
179                }
180        } catch (RuntimeStoreException rse) {
181            if (rse.getCause() instanceof IOException) {
182                brokerService.handleIOException((IOException)rse.getCause());
183            }
184            throw rse;
185        }
186    }
187
188    final void acknowledge(final TopicMessageStore destination, String clientId,
189                           String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
190        try {
191            if (ack.isInTransaction()) {
192                KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
193                tx.add((KahaMessageStore)destination, clientId, subscriptionName, messageId, ack);
194            } else {
195                destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
196            }
197        } catch (RuntimeStoreException rse) {
198            if (rse.getCause() instanceof IOException) {
199                brokerService.handleIOException((IOException)rse.getCause());
200            }
201            throw rse;
202        }
203    }
204
205    protected synchronized KahaTransaction getTx(TransactionId key) {
206        KahaTransaction result = (KahaTransaction)transactions.get(key);
207        if (result == null) {
208            result = (KahaTransaction)prepared.get(key);
209        }
210        return result;
211    }
212
213    protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
214        KahaTransaction result = (KahaTransaction)transactions.get(key);
215        if (result == null) {
216            result = new KahaTransaction();
217            transactions.put(key, result);
218        }
219        return result;
220    }
221
222    protected synchronized void removeTx(TransactionId key) {
223        transactions.remove(key);
224        prepared.remove(key);
225    }
226
227    public void delete() {
228        transactions.clear();
229        prepared.clear();
230    }
231
232    protected MessageStore getStoreById(Object id) {
233        return adaptor.retrieveMessageStore(id);
234    }
235
236        public void setBrokerService(BrokerService brokerService) {
237                this.brokerService = brokerService;
238        }
239}