001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadaptor; 018 019import java.io.IOException; 020import java.util.Iterator; 021import java.util.Map; 022import java.util.Map.Entry; 023import java.util.concurrent.ConcurrentHashMap; 024import javax.transaction.xa.XAException; 025import org.apache.activemq.broker.BrokerService; 026import org.apache.activemq.broker.BrokerServiceAware; 027import org.apache.activemq.broker.ConnectionContext; 028import org.apache.activemq.command.Message; 029import org.apache.activemq.command.MessageAck; 030import org.apache.activemq.command.MessageId; 031import org.apache.activemq.command.TransactionId; 032import org.apache.activemq.command.XATransactionId; 033import org.apache.activemq.kaha.RuntimeStoreException; 034import org.apache.activemq.store.MessageStore; 035import org.apache.activemq.store.ProxyMessageStore; 036import org.apache.activemq.store.ProxyTopicMessageStore; 037import org.apache.activemq.store.TopicMessageStore; 038import org.apache.activemq.store.TransactionRecoveryListener; 039import org.apache.activemq.store.TransactionStore; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * Provides a TransactionStore implementation that can create transaction aware 045 * MessageStore objects from non transaction aware MessageStore objects. 046 * 047 * 048 */ 049public class KahaTransactionStore implements TransactionStore, BrokerServiceAware { 050 private static final Logger LOG = LoggerFactory.getLogger(KahaTransactionStore.class); 051 052 private final Map transactions = new ConcurrentHashMap(); 053 private final Map prepared; 054 private final KahaPersistenceAdapter adaptor; 055 056 private BrokerService brokerService; 057 058 KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) { 059 this.adaptor = adaptor; 060 this.prepared = preparedMap; 061 } 062 063 public MessageStore proxy(MessageStore messageStore) { 064 return new ProxyMessageStore(messageStore) { 065 @Override 066 public void addMessage(ConnectionContext context, final Message send) throws IOException { 067 KahaTransactionStore.this.addMessage(getDelegate(), send); 068 } 069 070 @Override 071 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 072 KahaTransactionStore.this.removeMessage(getDelegate(), ack); 073 } 074 }; 075 } 076 077 public TopicMessageStore proxy(TopicMessageStore messageStore) { 078 return new ProxyTopicMessageStore(messageStore) { 079 @Override 080 public void addMessage(ConnectionContext context, final Message send) throws IOException { 081 KahaTransactionStore.this.addMessage(getDelegate(), send); 082 } 083 084 @Override 085 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 086 KahaTransactionStore.this.removeMessage(getDelegate(), ack); 087 } 088 089 @Override 090 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 091 MessageId messageId, MessageAck ack) throws IOException { 092 KahaTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, subscriptionName, messageId, ack); 093 } 094 }; 095 } 096 097 /** 098 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 099 */ 100 public void prepare(TransactionId txid) { 101 KahaTransaction tx = getTx(txid); 102 if (tx != null) { 103 tx.prepare(); 104 prepared.put(txid, tx); 105 } 106 } 107 108 public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException { 109 if(before != null) { 110 before.run(); 111 } 112 KahaTransaction tx = getTx(txid); 113 if (tx != null) { 114 tx.commit(this); 115 removeTx(txid); 116 } 117 if (after != null) { 118 after.run(); 119 } 120 } 121 122 /** 123 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 124 */ 125 public void rollback(TransactionId txid) { 126 KahaTransaction tx = getTx(txid); 127 if (tx != null) { 128 tx.rollback(); 129 removeTx(txid); 130 } 131 } 132 133 public void start() throws Exception { 134 } 135 136 public void stop() throws Exception { 137 } 138 139 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 140 for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) { 141 Map.Entry entry = (Entry)i.next(); 142 XATransactionId xid = (XATransactionId)entry.getKey(); 143 KahaTransaction kt = (KahaTransaction)entry.getValue(); 144 listener.recover(xid, kt.getMessages(), kt.getAcks()); 145 } 146 } 147 148 /** 149 * @param message 150 * @throws IOException 151 */ 152 void addMessage(final MessageStore destination, final Message message) throws IOException { 153 try { 154 if (message.isInTransaction()) { 155 KahaTransaction tx = getOrCreateTx(message.getTransactionId()); 156 tx.add((KahaMessageStore)destination, message); 157 } else { 158 destination.addMessage(null, message); 159 } 160 } catch (RuntimeStoreException rse) { 161 if (rse.getCause() instanceof IOException) { 162 brokerService.handleIOException((IOException)rse.getCause()); 163 } 164 throw rse; 165 } 166 } 167 168 /** 169 * @param ack 170 * @throws IOException 171 */ 172 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException { 173 try { 174 if (ack.isInTransaction()) { 175 KahaTransaction tx = getOrCreateTx(ack.getTransactionId()); 176 tx.add((KahaMessageStore)destination, ack); 177 } else { 178 destination.removeMessage(null, ack); 179 } 180 } catch (RuntimeStoreException rse) { 181 if (rse.getCause() instanceof IOException) { 182 brokerService.handleIOException((IOException)rse.getCause()); 183 } 184 throw rse; 185 } 186 } 187 188 final void acknowledge(final TopicMessageStore destination, String clientId, 189 String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { 190 try { 191 if (ack.isInTransaction()) { 192 KahaTransaction tx = getOrCreateTx(ack.getTransactionId()); 193 tx.add((KahaMessageStore)destination, clientId, subscriptionName, messageId, ack); 194 } else { 195 destination.acknowledge(null, clientId, subscriptionName, messageId, ack); 196 } 197 } catch (RuntimeStoreException rse) { 198 if (rse.getCause() instanceof IOException) { 199 brokerService.handleIOException((IOException)rse.getCause()); 200 } 201 throw rse; 202 } 203 } 204 205 protected synchronized KahaTransaction getTx(TransactionId key) { 206 KahaTransaction result = (KahaTransaction)transactions.get(key); 207 if (result == null) { 208 result = (KahaTransaction)prepared.get(key); 209 } 210 return result; 211 } 212 213 protected synchronized KahaTransaction getOrCreateTx(TransactionId key) { 214 KahaTransaction result = (KahaTransaction)transactions.get(key); 215 if (result == null) { 216 result = new KahaTransaction(); 217 transactions.put(key, result); 218 } 219 return result; 220 } 221 222 protected synchronized void removeTx(TransactionId key) { 223 transactions.remove(key); 224 prepared.remove(key); 225 } 226 227 public void delete() { 228 transactions.clear(); 229 prepared.clear(); 230 } 231 232 protected MessageStore getStoreById(Object id) { 233 return adaptor.retrieveMessageStore(id); 234 } 235 236 public void setBrokerService(BrokerService brokerService) { 237 this.brokerService = brokerService; 238 } 239}