001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadaptor; 018 019import java.io.File; 020import java.io.IOException; 021import java.util.HashSet; 022import java.util.Iterator; 023import java.util.Set; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.atomic.AtomicLong; 026 027import org.apache.activemq.broker.BrokerService; 028import org.apache.activemq.broker.BrokerServiceAware; 029import org.apache.activemq.broker.ConnectionContext; 030import org.apache.activemq.command.ActiveMQDestination; 031import org.apache.activemq.command.ActiveMQQueue; 032import org.apache.activemq.command.ActiveMQTopic; 033import org.apache.activemq.command.Message; 034import org.apache.activemq.command.MessageId; 035import org.apache.activemq.command.ProducerId; 036import org.apache.activemq.kaha.CommandMarshaller; 037import org.apache.activemq.kaha.ContainerId; 038import org.apache.activemq.kaha.ListContainer; 039import org.apache.activemq.kaha.MapContainer; 040import org.apache.activemq.kaha.Marshaller; 041import org.apache.activemq.kaha.MessageIdMarshaller; 042import org.apache.activemq.kaha.MessageMarshaller; 043import org.apache.activemq.kaha.Store; 044import org.apache.activemq.kaha.StoreFactory; 045import org.apache.activemq.kaha.impl.StoreLockedExcpetion; 046import org.apache.activemq.openwire.OpenWireFormat; 047import org.apache.activemq.store.MessageStore; 048import org.apache.activemq.store.PersistenceAdapter; 049import org.apache.activemq.store.TopicMessageStore; 050import org.apache.activemq.store.TransactionStore; 051import org.apache.activemq.usage.SystemUsage; 052import org.apache.activemq.util.IOHelper; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * @org.apache.xbean.XBean 058 * 059 */ 060public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware { 061 062 private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000; 063 private static final Logger LOG = LoggerFactory.getLogger(KahaPersistenceAdapter.class); 064 private static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions"; 065 066 protected OpenWireFormat wireFormat = new OpenWireFormat(); 067 protected KahaTransactionStore transactionStore; 068 protected ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>(); 069 protected ConcurrentHashMap<ActiveMQQueue, MessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, MessageStore>(); 070 protected ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores = new ConcurrentHashMap<ActiveMQDestination, MessageStore>(); 071 072 private long maxDataFileLength = 32 * 1024 * 1024; 073 private File directory; 074 private String brokerName; 075 private Store theStore; 076 private boolean initialized; 077 private final AtomicLong storeSize; 078 private boolean persistentIndex = true; 079 private BrokerService brokerService; 080 081 082 public KahaPersistenceAdapter(AtomicLong size) { 083 this.storeSize=size; 084 } 085 086 public KahaPersistenceAdapter() { 087 this(new AtomicLong()); 088 } 089 090 public Set<ActiveMQDestination> getDestinations() { 091 Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 092 try { 093 Store store = getStore(); 094 for (Iterator i = store.getMapContainerIds().iterator(); i.hasNext();) { 095 ContainerId id = (ContainerId)i.next(); 096 Object obj = id.getKey(); 097 if (obj instanceof ActiveMQDestination) { 098 rc.add((ActiveMQDestination)obj); 099 } 100 } 101 } catch (IOException e) { 102 LOG.error("Failed to get destinations ", e); 103 } 104 return rc; 105 } 106 107 public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 108 MessageStore rc = queues.get(destination); 109 if (rc == null) { 110 rc = new KahaMessageStore(getMapContainer(destination, "queue-data"), destination); 111 messageStores.put(destination, rc); 112 if (transactionStore != null) { 113 rc = transactionStore.proxy(rc); 114 } 115 queues.put(destination, rc); 116 } 117 return rc; 118 } 119 120 public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) 121 throws IOException { 122 TopicMessageStore rc = topics.get(destination); 123 if (rc == null) { 124 Store store = getStore(); 125 MapContainer messageContainer = getMapContainer(destination, "topic-data"); 126 MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions", 127 "topic-subs"); 128 ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.toString(), 129 "topic-acks"); 130 ackContainer.setMarshaller(new TopicSubAckMarshaller()); 131 rc = new KahaTopicMessageStore(store, messageContainer, ackContainer, subsContainer, destination); 132 messageStores.put(destination, rc); 133 if (transactionStore != null) { 134 rc = transactionStore.proxy(rc); 135 } 136 topics.put(destination, rc); 137 } 138 return rc; 139 } 140 141 /** 142 * Cleanup method to remove any state associated with the given destination 143 * 144 * @param destination Destination to forget 145 */ 146 public void removeQueueMessageStore(ActiveMQQueue destination) { 147 queues.remove(destination); 148 try{ 149 if(theStore!=null){ 150 theStore.deleteMapContainer(destination,"queue-data"); 151 } 152 }catch(IOException e ){ 153 LOG.error("Failed to remove store map container for queue:"+destination, e); 154 } 155 } 156 157 /** 158 * Cleanup method to remove any state associated with the given destination 159 * 160 * @param destination Destination to forget 161 */ 162 public void removeTopicMessageStore(ActiveMQTopic destination) { 163 topics.remove(destination); 164 } 165 166 protected MessageStore retrieveMessageStore(Object id) { 167 MessageStore result = messageStores.get(id); 168 return result; 169 } 170 171 public TransactionStore createTransactionStore() throws IOException { 172 if (transactionStore == null) { 173 while (true) { 174 try { 175 Store store = getStore(); 176 MapContainer container = store 177 .getMapContainer(PREPARED_TRANSACTIONS_NAME, "transactions"); 178 container.setKeyMarshaller(new CommandMarshaller(wireFormat)); 179 container.setValueMarshaller(new TransactionMarshaller(wireFormat)); 180 container.load(); 181 transactionStore = new KahaTransactionStore(this, container); 182 transactionStore.setBrokerService(brokerService); 183 break; 184 } catch (StoreLockedExcpetion e) { 185 LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000) 186 + " seconds for the Store to be unlocked."); 187 try { 188 Thread.sleep(STORE_LOCKED_WAIT_DELAY); 189 } catch (InterruptedException e1) { 190 } 191 } 192 } 193 } 194 return transactionStore; 195 } 196 197 public void beginTransaction(ConnectionContext context) { 198 } 199 200 public void commitTransaction(ConnectionContext context) throws IOException { 201 if (theStore != null) { 202 theStore.force(); 203 } 204 } 205 206 public void rollbackTransaction(ConnectionContext context) { 207 } 208 209 public void start() throws Exception { 210 initialize(); 211 } 212 213 public void stop() throws Exception { 214 if (theStore != null) { 215 theStore.close(); 216 } 217 } 218 219 public long getLastMessageBrokerSequenceId() throws IOException { 220 return 0; 221 } 222 223 public void deleteAllMessages() throws IOException { 224 if (theStore != null) { 225 if (theStore.isInitialized()) { 226 theStore.clear(); 227 } else { 228 theStore.delete(); 229 } 230 } else { 231 StoreFactory.delete(getStoreDirectory()); 232 } 233 } 234 235 protected MapContainer<MessageId, Message> getMapContainer(Object id, String containerName) 236 throws IOException { 237 Store store = getStore(); 238 MapContainer<MessageId, Message> container = store.getMapContainer(id, containerName); 239 container.setKeyMarshaller(new MessageIdMarshaller()); 240 container.setValueMarshaller(new MessageMarshaller(wireFormat)); 241 container.load(); 242 return container; 243 } 244 245 protected MapContainer getSubsMapContainer(Object id, String containerName) 246 throws IOException { 247 Store store = getStore(); 248 MapContainer container = store.getMapContainer(id, containerName); 249 container.setKeyMarshaller(Store.STRING_MARSHALLER); 250 container.setValueMarshaller(createMessageMarshaller()); 251 container.load(); 252 return container; 253 } 254 255 protected Marshaller<Object> createMessageMarshaller() { 256 return new CommandMarshaller(wireFormat); 257 } 258 259 protected ListContainer<TopicSubAck> getListContainer(Object id, String containerName) throws IOException { 260 Store store = getStore(); 261 ListContainer<TopicSubAck> container = store.getListContainer(id, containerName); 262 container.setMarshaller(createMessageMarshaller()); 263 container.load(); 264 return container; 265 } 266 267 /** 268 * @param usageManager The UsageManager that is controlling the broker's 269 * memory usage. 270 */ 271 public void setUsageManager(SystemUsage usageManager) { 272 } 273 274 /** 275 * @return the maxDataFileLength 276 */ 277 public long getMaxDataFileLength() { 278 return maxDataFileLength; 279 } 280 281 public boolean isPersistentIndex() { 282 return persistentIndex; 283 } 284 285 public void setPersistentIndex(boolean persistentIndex) { 286 this.persistentIndex = persistentIndex; 287 } 288 289 /** 290 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 291 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 292 */ 293 public void setMaxDataFileLength(long maxDataFileLength) { 294 this.maxDataFileLength = maxDataFileLength; 295 } 296 297 protected final synchronized Store getStore() throws IOException { 298 if (theStore == null) { 299 theStore = createStore(); 300 } 301 return theStore; 302 } 303 304 protected final Store createStore() throws IOException { 305 Store result = StoreFactory.open(getStoreDirectory(), "rw",storeSize); 306 result.setMaxDataFileLength(maxDataFileLength); 307 result.setPersistentIndex(isPersistentIndex()); 308 result.setDefaultContainerName("container-roots"); 309 return result; 310 } 311 312 private String getStoreName() { 313 initialize(); 314 return directory.getAbsolutePath(); 315 } 316 317 private File getStoreDirectory() { 318 initialize(); 319 return directory; 320 } 321 322 public String toString() { 323 return "KahaPersistenceAdapter(" + getStoreName() + ")"; 324 } 325 326 public void setBrokerName(String brokerName) { 327 this.brokerName = brokerName; 328 } 329 330 public String getBrokerName() { 331 return brokerName; 332 } 333 334 public File getDirectory() { 335 return this.directory; 336 } 337 338 public void setDirectory(File directory) { 339 this.directory = directory; 340 } 341 342 public void checkpoint(boolean sync) throws IOException { 343 if (sync) { 344 getStore().force(); 345 } 346 } 347 348 public long size(){ 349 return storeSize.get(); 350 } 351 352 private void initialize() { 353 if (!initialized) { 354 initialized = true; 355 if (this.directory == null) { 356 File file = new File(IOHelper.getDefaultDataDirectory()); 357 file = new File(file, IOHelper.toFileSystemSafeName(brokerName) + "-kahastore"); 358 setDirectory(file); 359 } 360 try { 361 IOHelper.mkdirs(this.directory); 362 } catch (IOException e) { 363 throw new RuntimeException(e); 364 } 365 wireFormat.setCacheEnabled(false); 366 wireFormat.setTightEncodingEnabled(true); 367 } 368 } 369 370 public void setBrokerService(BrokerService brokerService) { 371 this.brokerService = brokerService; 372 } 373 374 public long getLastProducerSequenceId(ProducerId id) { 375 // reference store send has adequate duplicate suppression 376 return -1; 377 } 378 379 380}