001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.memory; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.concurrent.ConcurrentHashMap; 023import java.util.concurrent.Future; 024import javax.transaction.xa.XAException; 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.command.Message; 027import org.apache.activemq.command.MessageAck; 028import org.apache.activemq.command.MessageId; 029import org.apache.activemq.command.TransactionId; 030import org.apache.activemq.command.XATransactionId; 031import org.apache.activemq.store.AbstractMessageStore; 032import org.apache.activemq.store.MessageStore; 033import org.apache.activemq.store.PersistenceAdapter; 034import org.apache.activemq.store.ProxyMessageStore; 035import org.apache.activemq.store.ProxyTopicMessageStore; 036import org.apache.activemq.store.TopicMessageStore; 037import org.apache.activemq.store.TransactionRecoveryListener; 038import org.apache.activemq.store.TransactionStore; 039 040/** 041 * Provides a TransactionStore implementation that can create transaction aware 042 * MessageStore objects from non transaction aware MessageStore objects. 043 * 044 * 045 */ 046public class MemoryTransactionStore implements TransactionStore { 047 048 ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); 049 ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>(); 050 final PersistenceAdapter persistenceAdapter; 051 052 private boolean doingRecover; 053 054 public class Tx { 055 private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>(); 056 057 private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>(); 058 059 public void add(AddMessageCommand msg) { 060 messages.add(msg); 061 } 062 063 public void add(RemoveMessageCommand ack) { 064 acks.add(ack); 065 } 066 067 public Message[] getMessages() { 068 Message rc[] = new Message[messages.size()]; 069 int count = 0; 070 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 071 AddMessageCommand cmd = iter.next(); 072 rc[count++] = cmd.getMessage(); 073 } 074 return rc; 075 } 076 077 public MessageAck[] getAcks() { 078 MessageAck rc[] = new MessageAck[acks.size()]; 079 int count = 0; 080 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 081 RemoveMessageCommand cmd = iter.next(); 082 rc[count++] = cmd.getMessageAck(); 083 } 084 return rc; 085 } 086 087 /** 088 * @throws IOException 089 */ 090 public void commit() throws IOException { 091 ConnectionContext ctx = new ConnectionContext(); 092 persistenceAdapter.beginTransaction(ctx); 093 try { 094 095 // Do all the message adds. 096 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 097 AddMessageCommand cmd = iter.next(); 098 cmd.run(ctx); 099 } 100 // And removes.. 101 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 102 RemoveMessageCommand cmd = iter.next(); 103 cmd.run(ctx); 104 } 105 106 } catch ( IOException e ) { 107 persistenceAdapter.rollbackTransaction(ctx); 108 throw e; 109 } 110 persistenceAdapter.commitTransaction(ctx); 111 } 112 } 113 114 public interface AddMessageCommand { 115 Message getMessage(); 116 117 void run(ConnectionContext context) throws IOException; 118 } 119 120 public interface RemoveMessageCommand { 121 MessageAck getMessageAck(); 122 123 void run(ConnectionContext context) throws IOException; 124 } 125 126 public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) { 127 this.persistenceAdapter=persistenceAdapter; 128 } 129 130 public MessageStore proxy(MessageStore messageStore) { 131 return new ProxyMessageStore(messageStore) { 132 @Override 133 public void addMessage(ConnectionContext context, final Message send) throws IOException { 134 MemoryTransactionStore.this.addMessage(getDelegate(), send); 135 } 136 137 @Override 138 public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 139 MemoryTransactionStore.this.addMessage(getDelegate(), message); 140 return AbstractMessageStore.FUTURE; 141 } 142 143 @Override 144 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 145 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 146 } 147 148 @Override 149 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 150 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 151 } 152 }; 153 } 154 155 public TopicMessageStore proxy(TopicMessageStore messageStore) { 156 return new ProxyTopicMessageStore(messageStore) { 157 @Override 158 public void addMessage(ConnectionContext context, final Message send) throws IOException { 159 MemoryTransactionStore.this.addMessage(getDelegate(), send); 160 } 161 162 @Override 163 public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 164 MemoryTransactionStore.this.addMessage(getDelegate(), message); 165 return AbstractMessageStore.FUTURE; 166 } 167 168 @Override 169 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 170 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 171 } 172 173 @Override 174 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 175 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 176 } 177 178 @Override 179 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 180 MessageId messageId, MessageAck ack) throws IOException { 181 MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, 182 subscriptionName, messageId, ack); 183 } 184 }; 185 } 186 187 /** 188 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 189 */ 190 public void prepare(TransactionId txid) { 191 Tx tx = inflightTransactions.remove(txid); 192 if (tx == null) { 193 return; 194 } 195 preparedTransactions.put(txid, tx); 196 } 197 198 public Tx getTx(Object txid) { 199 Tx tx = inflightTransactions.get(txid); 200 if (tx == null) { 201 tx = new Tx(); 202 inflightTransactions.put(txid, tx); 203 } 204 return tx; 205 } 206 207 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { 208 if (preCommit != null) { 209 preCommit.run(); 210 } 211 Tx tx; 212 if (wasPrepared) { 213 tx = preparedTransactions.remove(txid); 214 } else { 215 tx = inflightTransactions.remove(txid); 216 } 217 218 if (tx == null) { 219 if (postCommit != null) { 220 postCommit.run(); 221 } 222 return; 223 } 224 // ensure message order w.r.t to cursor and store for setBatch() 225 synchronized (this) { 226 tx.commit(); 227 if (postCommit != null) { 228 postCommit.run(); 229 } 230 } 231 } 232 233 /** 234 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 235 */ 236 public void rollback(TransactionId txid) { 237 preparedTransactions.remove(txid); 238 inflightTransactions.remove(txid); 239 } 240 241 public void start() throws Exception { 242 } 243 244 public void stop() throws Exception { 245 } 246 247 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 248 // All the inflight transactions get rolled back.. 249 inflightTransactions.clear(); 250 this.doingRecover = true; 251 try { 252 for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { 253 Object txid = iter.next(); 254 Tx tx = preparedTransactions.get(txid); 255 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); 256 } 257 } finally { 258 this.doingRecover = false; 259 } 260 } 261 262 /** 263 * @param message 264 * @throws IOException 265 */ 266 void addMessage(final MessageStore destination, final Message message) throws IOException { 267 268 if (doingRecover) { 269 return; 270 } 271 272 if (message.getTransactionId() != null) { 273 Tx tx = getTx(message.getTransactionId()); 274 tx.add(new AddMessageCommand() { 275 public Message getMessage() { 276 return message; 277 } 278 279 public void run(ConnectionContext ctx) throws IOException { 280 destination.addMessage(ctx, message); 281 } 282 283 }); 284 } else { 285 destination.addMessage(null, message); 286 } 287 } 288 289 /** 290 * @param ack 291 * @throws IOException 292 */ 293 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException { 294 if (doingRecover) { 295 return; 296 } 297 298 if (ack.isInTransaction()) { 299 Tx tx = getTx(ack.getTransactionId()); 300 tx.add(new RemoveMessageCommand() { 301 public MessageAck getMessageAck() { 302 return ack; 303 } 304 305 public void run(ConnectionContext ctx) throws IOException { 306 destination.removeMessage(ctx, ack); 307 } 308 }); 309 } else { 310 destination.removeMessage(null, ack); 311 } 312 } 313 314 final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName, 315 final MessageId messageId, final MessageAck ack) throws IOException { 316 if (doingRecover) { 317 return; 318 } 319 320 if (ack.isInTransaction()) { 321 Tx tx = getTx(ack.getTransactionId()); 322 tx.add(new RemoveMessageCommand() { 323 public MessageAck getMessageAck() { 324 return ack; 325 } 326 327 public void run(ConnectionContext ctx) throws IOException { 328 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack); 329 } 330 }); 331 } else { 332 destination.acknowledge(null, clientId, subscriptionName, messageId, ack); 333 } 334 } 335 336 337 public void delete() { 338 inflightTransactions.clear(); 339 preparedTransactions.clear(); 340 doingRecover = false; 341 } 342 343}