001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.Map.Entry; 031import java.util.concurrent.ExecutorService; 032import java.util.concurrent.Future; 033import java.util.concurrent.FutureTask; 034import java.util.concurrent.LinkedBlockingQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicBoolean; 040import java.util.concurrent.atomic.AtomicInteger; 041import org.apache.activemq.broker.ConnectionContext; 042import org.apache.activemq.command.ActiveMQDestination; 043import org.apache.activemq.command.ActiveMQQueue; 044import org.apache.activemq.command.ActiveMQTempQueue; 045import org.apache.activemq.command.ActiveMQTempTopic; 046import org.apache.activemq.command.ActiveMQTopic; 047import org.apache.activemq.command.LocalTransactionId; 048import org.apache.activemq.command.Message; 049import org.apache.activemq.command.MessageAck; 050import org.apache.activemq.command.MessageId; 051import org.apache.activemq.command.ProducerId; 052import org.apache.activemq.command.SubscriptionInfo; 053import org.apache.activemq.command.TransactionId; 054import org.apache.activemq.command.XATransactionId; 055import org.apache.activemq.filter.BooleanExpression; 056import org.apache.activemq.filter.MessageEvaluationContext; 057import org.apache.activemq.openwire.OpenWireFormat; 058import org.apache.activemq.protobuf.Buffer; 059import org.apache.activemq.selector.SelectorParser; 060import org.apache.activemq.store.AbstractMessageStore; 061import org.apache.activemq.store.MessageRecoveryListener; 062import org.apache.activemq.store.MessageStore; 063import org.apache.activemq.store.PersistenceAdapter; 064import org.apache.activemq.store.TopicMessageStore; 065import org.apache.activemq.store.TransactionStore; 066import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 067import org.apache.activemq.store.kahadb.data.KahaDestination; 068import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; 069import org.apache.activemq.store.kahadb.data.KahaLocation; 070import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 071import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 072import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 073import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 074import org.apache.activemq.store.kahadb.data.KahaXATransactionId; 075import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 076import org.apache.activemq.usage.MemoryUsage; 077import org.apache.activemq.usage.SystemUsage; 078import org.apache.activemq.util.IOExceptionSupport; 079import org.apache.activemq.util.ServiceStopper; 080import org.apache.activemq.wireformat.WireFormat; 081import org.slf4j.Logger; 082import org.slf4j.LoggerFactory; 083import org.apache.kahadb.journal.Location; 084import org.apache.kahadb.page.Transaction; 085 086public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { 087 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 088 private static final int MAX_ASYNC_JOBS = 10000; 089 090 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 091 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 092 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 093 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 094 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 095 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 096 097 protected ExecutorService queueExecutor; 098 protected ExecutorService topicExecutor; 099 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 100 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 101 final WireFormat wireFormat = new OpenWireFormat(); 102 private SystemUsage usageManager; 103 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 104 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 105 Semaphore globalQueueSemaphore; 106 Semaphore globalTopicSemaphore; 107 private boolean concurrentStoreAndDispatchQueues = true; 108 // when true, message order may be compromised when cache is exhausted if store is out 109 // or order w.r.t cache 110 private boolean concurrentStoreAndDispatchTopics = false; 111 private boolean concurrentStoreAndDispatchTransactions = false; 112 private int maxAsyncJobs = MAX_ASYNC_JOBS; 113 private final KahaDBTransactionStore transactionStore; 114 115 public KahaDBStore() { 116 this.transactionStore = new KahaDBTransactionStore(this); 117 } 118 119 public void setBrokerName(String brokerName) { 120 } 121 122 public void setUsageManager(SystemUsage usageManager) { 123 this.usageManager = usageManager; 124 } 125 126 public SystemUsage getUsageManager() { 127 return this.usageManager; 128 } 129 130 /** 131 * @return the concurrentStoreAndDispatch 132 */ 133 public boolean isConcurrentStoreAndDispatchQueues() { 134 return this.concurrentStoreAndDispatchQueues; 135 } 136 137 /** 138 * @param concurrentStoreAndDispatch 139 * the concurrentStoreAndDispatch to set 140 */ 141 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 142 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 143 } 144 145 /** 146 * @return the concurrentStoreAndDispatch 147 */ 148 public boolean isConcurrentStoreAndDispatchTopics() { 149 return this.concurrentStoreAndDispatchTopics; 150 } 151 152 /** 153 * @param concurrentStoreAndDispatch 154 * the concurrentStoreAndDispatch to set 155 */ 156 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 157 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 158 } 159 160 public boolean isConcurrentStoreAndDispatchTransactions() { 161 return this.concurrentStoreAndDispatchTransactions; 162 } 163 164 /** 165 * @return the maxAsyncJobs 166 */ 167 public int getMaxAsyncJobs() { 168 return this.maxAsyncJobs; 169 } 170 /** 171 * @param maxAsyncJobs 172 * the maxAsyncJobs to set 173 */ 174 public void setMaxAsyncJobs(int maxAsyncJobs) { 175 this.maxAsyncJobs = maxAsyncJobs; 176 } 177 178 @Override 179 public void doStart() throws Exception { 180 super.doStart(); 181 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 182 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 183 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 184 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 185 this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 186 asyncQueueJobQueue, new ThreadFactory() { 187 public Thread newThread(Runnable runnable) { 188 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 189 thread.setDaemon(true); 190 return thread; 191 } 192 }); 193 this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 194 asyncTopicJobQueue, new ThreadFactory() { 195 public Thread newThread(Runnable runnable) { 196 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 197 thread.setDaemon(true); 198 return thread; 199 } 200 }); 201 } 202 203 @Override 204 public void doStop(ServiceStopper stopper) throws Exception { 205 // drain down async jobs 206 LOG.info("Stopping async queue tasks"); 207 if (this.globalQueueSemaphore != null) { 208 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 209 } 210 synchronized (this.asyncQueueMaps) { 211 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 212 synchronized (m) { 213 for (StoreTask task : m.values()) { 214 task.cancel(); 215 } 216 } 217 } 218 this.asyncQueueMaps.clear(); 219 } 220 LOG.info("Stopping async topic tasks"); 221 if (this.globalTopicSemaphore != null) { 222 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 223 } 224 synchronized (this.asyncTopicMaps) { 225 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 226 synchronized (m) { 227 for (StoreTask task : m.values()) { 228 task.cancel(); 229 } 230 } 231 } 232 this.asyncTopicMaps.clear(); 233 } 234 if (this.globalQueueSemaphore != null) { 235 this.globalQueueSemaphore.drainPermits(); 236 } 237 if (this.globalTopicSemaphore != null) { 238 this.globalTopicSemaphore.drainPermits(); 239 } 240 if (this.queueExecutor != null) { 241 this.queueExecutor.shutdownNow(); 242 } 243 if (this.topicExecutor != null) { 244 this.topicExecutor.shutdownNow(); 245 } 246 LOG.info("Stopped KahaDB"); 247 super.doStop(stopper); 248 } 249 250 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 251 StoreQueueTask task = null; 252 synchronized (store.asyncTaskMap) { 253 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 254 } 255 return task; 256 } 257 258 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 259 synchronized (store.asyncTaskMap) { 260 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 261 } 262 this.queueExecutor.execute(task); 263 } 264 265 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 266 StoreTopicTask task = null; 267 synchronized (store.asyncTaskMap) { 268 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 269 } 270 return task; 271 } 272 273 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 274 synchronized (store.asyncTaskMap) { 275 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 276 } 277 this.topicExecutor.execute(task); 278 } 279 280 public TransactionStore createTransactionStore() throws IOException { 281 return this.transactionStore; 282 } 283 284 public boolean getForceRecoverIndex() { 285 return this.forceRecoverIndex; 286 } 287 288 public void setForceRecoverIndex(boolean forceRecoverIndex) { 289 this.forceRecoverIndex = forceRecoverIndex; 290 } 291 292 public class KahaDBMessageStore extends AbstractMessageStore { 293 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 294 protected KahaDestination dest; 295 private final int maxAsyncJobs; 296 private final Semaphore localDestinationSemaphore; 297 298 double doneTasks, canceledTasks = 0; 299 300 public KahaDBMessageStore(ActiveMQDestination destination) { 301 super(destination); 302 this.dest = convert(destination); 303 this.maxAsyncJobs = getMaxAsyncJobs(); 304 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 305 } 306 307 @Override 308 public ActiveMQDestination getDestination() { 309 return destination; 310 } 311 312 @Override 313 public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 314 throws IOException { 315 if (isConcurrentStoreAndDispatchQueues()) { 316 StoreQueueTask result = new StoreQueueTask(this, context, message); 317 result.aquireLocks(); 318 addQueueTask(this, result); 319 return result.getFuture(); 320 } else { 321 return super.asyncAddQueueMessage(context, message); 322 } 323 } 324 325 @Override 326 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 327 if (isConcurrentStoreAndDispatchQueues()) { 328 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 329 StoreQueueTask task = null; 330 synchronized (asyncTaskMap) { 331 task = (StoreQueueTask) asyncTaskMap.get(key); 332 } 333 if (task != null) { 334 if (!task.cancel()) { 335 try { 336 337 task.future.get(); 338 } catch (InterruptedException e) { 339 throw new InterruptedIOException(e.toString()); 340 } catch (Exception ignored) { 341 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 342 } 343 removeMessage(context, ack); 344 } else { 345 synchronized (asyncTaskMap) { 346 asyncTaskMap.remove(key); 347 } 348 } 349 } else { 350 removeMessage(context, ack); 351 } 352 } else { 353 removeMessage(context, ack); 354 } 355 } 356 357 public void addMessage(ConnectionContext context, Message message) throws IOException { 358 KahaAddMessageCommand command = new KahaAddMessageCommand(); 359 command.setDestination(dest); 360 command.setMessageId(message.getMessageId().toString()); 361 command.setTransactionInfo(createTransactionInfo(message.getTransactionId())); 362 command.setPriority(message.getPriority()); 363 command.setPrioritySupported(isPrioritizedMessages()); 364 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 365 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 366 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null); 367 368 } 369 370 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 371 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 372 command.setDestination(dest); 373 command.setMessageId(ack.getLastMessageId().toString()); 374 command.setTransactionInfo(createTransactionInfo(ack.getTransactionId())); 375 376 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 377 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 378 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 379 } 380 381 public void removeAllMessages(ConnectionContext context) throws IOException { 382 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 383 command.setDestination(dest); 384 store(command, true, null, null); 385 } 386 387 public Message getMessage(MessageId identity) throws IOException { 388 final String key = identity.toString(); 389 390 // Hopefully one day the page file supports concurrent read 391 // operations... but for now we must 392 // externally synchronize... 393 Location location; 394 indexLock.readLock().lock(); 395 try { 396 location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 397 public Location execute(Transaction tx) throws IOException { 398 StoredDestination sd = getStoredDestination(dest, tx); 399 Long sequence = sd.messageIdIndex.get(tx, key); 400 if (sequence == null) { 401 return null; 402 } 403 return sd.orderIndex.get(tx, sequence).location; 404 } 405 }); 406 }finally { 407 indexLock.readLock().unlock(); 408 } 409 if (location == null) { 410 return null; 411 } 412 413 return loadMessage(location); 414 } 415 416 public int getMessageCount() throws IOException { 417 try { 418 lockAsyncJobQueue(); 419 indexLock.readLock().lock(); 420 try { 421 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 422 public Integer execute(Transaction tx) throws IOException { 423 // Iterate through all index entries to get a count 424 // of 425 // messages in the destination. 426 StoredDestination sd = getStoredDestination(dest, tx); 427 int rc = 0; 428 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator 429 .hasNext();) { 430 iterator.next(); 431 rc++; 432 } 433 return rc; 434 } 435 }); 436 }finally { 437 indexLock.readLock().unlock(); 438 } 439 } finally { 440 unlockAsyncJobQueue(); 441 } 442 } 443 444 @Override 445 public boolean isEmpty() throws IOException { 446 indexLock.readLock().lock(); 447 try { 448 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 449 public Boolean execute(Transaction tx) throws IOException { 450 // Iterate through all index entries to get a count of 451 // messages in the destination. 452 StoredDestination sd = getStoredDestination(dest, tx); 453 return sd.locationIndex.isEmpty(tx); 454 } 455 }); 456 }finally { 457 indexLock.readLock().unlock(); 458 } 459 } 460 461 public void recover(final MessageRecoveryListener listener) throws Exception { 462 indexLock.readLock().lock(); 463 try { 464 pageFile.tx().execute(new Transaction.Closure<Exception>() { 465 public void execute(Transaction tx) throws Exception { 466 StoredDestination sd = getStoredDestination(dest, tx); 467 sd.orderIndex.resetCursorPosition(); 468 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 469 .hasNext();) { 470 Entry<Long, MessageKeys> entry = iterator.next(); 471 Message msg = loadMessage(entry.getValue().location); 472 listener.recoverMessage(msg); 473 } 474 } 475 }); 476 }finally { 477 indexLock.readLock().unlock(); 478 } 479 } 480 481 482 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 483 indexLock.readLock().lock(); 484 try { 485 pageFile.tx().execute(new Transaction.Closure<Exception>() { 486 public void execute(Transaction tx) throws Exception { 487 StoredDestination sd = getStoredDestination(dest, tx); 488 Entry<Long, MessageKeys> entry = null; 489 int counter = 0; 490 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); 491 listener.hasSpace() && iterator.hasNext(); ) { 492 entry = iterator.next(); 493 Message msg = loadMessage(entry.getValue().location); 494 listener.recoverMessage(msg); 495 counter++; 496 if (counter >= maxReturned) { 497 break; 498 } 499 } 500 sd.orderIndex.stoppedIterating(); 501 } 502 }); 503 }finally { 504 indexLock.readLock().unlock(); 505 } 506 } 507 508 public void resetBatching() { 509 try { 510 pageFile.tx().execute(new Transaction.Closure<Exception>() { 511 public void execute(Transaction tx) throws Exception { 512 StoredDestination sd = getExistingStoredDestination(dest, tx); 513 if (sd != null) { 514 sd.orderIndex.resetCursorPosition();} 515 } 516 }); 517 } catch (Exception e) { 518 LOG.error("Failed to reset batching",e); 519 } 520 } 521 522 @Override 523 public void setBatch(MessageId identity) throws IOException { 524 try { 525 final String key = identity.toString(); 526 lockAsyncJobQueue(); 527 528 // Hopefully one day the page file supports concurrent read 529 // operations... but for now we must 530 // externally synchronize... 531 532 indexLock.writeLock().lock(); 533 try { 534 pageFile.tx().execute(new Transaction.Closure<IOException>() { 535 public void execute(Transaction tx) throws IOException { 536 StoredDestination sd = getStoredDestination(dest, tx); 537 Long location = sd.messageIdIndex.get(tx, key); 538 if (location != null) { 539 sd.orderIndex.setBatch(tx, location); 540 } 541 } 542 }); 543 }finally { 544 indexLock.writeLock().unlock(); 545 } 546 547 } finally { 548 unlockAsyncJobQueue(); 549 } 550 551 } 552 553 @Override 554 public void setMemoryUsage(MemoryUsage memoeyUSage) { 555 } 556 @Override 557 public void start() throws Exception { 558 super.start(); 559 } 560 @Override 561 public void stop() throws Exception { 562 super.stop(); 563 } 564 565 protected void lockAsyncJobQueue() { 566 try { 567 this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 568 } catch (Exception e) { 569 LOG.error("Failed to lock async jobs for " + this.destination, e); 570 } 571 } 572 573 protected void unlockAsyncJobQueue() { 574 this.localDestinationSemaphore.release(this.maxAsyncJobs); 575 } 576 577 protected void acquireLocalAsyncLock() { 578 try { 579 this.localDestinationSemaphore.acquire(); 580 } catch (InterruptedException e) { 581 LOG.error("Failed to aquire async lock for " + this.destination, e); 582 } 583 } 584 585 protected void releaseLocalAsyncLock() { 586 this.localDestinationSemaphore.release(); 587 } 588 589 } 590 591 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 592 private final AtomicInteger subscriptionCount = new AtomicInteger(); 593 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 594 super(destination); 595 this.subscriptionCount.set(getAllSubscriptions().length); 596 asyncTopicMaps.add(asyncTaskMap); 597 } 598 599 @Override 600 public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 601 throws IOException { 602 if (isConcurrentStoreAndDispatchTopics()) { 603 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 604 result.aquireLocks(); 605 addTopicTask(this, result); 606 return result.getFuture(); 607 } else { 608 return super.asyncAddTopicMessage(context, message); 609 } 610 } 611 612 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 613 MessageId messageId, MessageAck ack) 614 throws IOException { 615 String subscriptionKey = subscriptionKey(clientId, subscriptionName); 616 if (isConcurrentStoreAndDispatchTopics()) { 617 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 618 StoreTopicTask task = null; 619 synchronized (asyncTaskMap) { 620 task = (StoreTopicTask) asyncTaskMap.get(key); 621 } 622 if (task != null) { 623 if (task.addSubscriptionKey(subscriptionKey)) { 624 removeTopicTask(this, messageId); 625 if (task.cancel()) { 626 synchronized (asyncTaskMap) { 627 asyncTaskMap.remove(key); 628 } 629 } 630 } 631 } else { 632 doAcknowledge(context, subscriptionKey, messageId, ack); 633 } 634 } else { 635 doAcknowledge(context, subscriptionKey, messageId, ack); 636 } 637 } 638 639 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 640 throws IOException { 641 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 642 command.setDestination(dest); 643 command.setSubscriptionKey(subscriptionKey); 644 command.setMessageId(messageId.toString()); 645 command.setTransactionInfo(createTransactionInfo(ack.getTransactionId())); 646 if (ack != null && ack.isUnmatchedAck()) { 647 command.setAck(UNMATCHED); 648 } 649 store(command, false, null, null); 650 } 651 652 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 653 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 654 .getSubscriptionName()); 655 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 656 command.setDestination(dest); 657 command.setSubscriptionKey(subscriptionKey); 658 command.setRetroactive(retroactive); 659 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 660 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 661 store(command, isEnableJournalDiskSyncs() && true, null, null); 662 this.subscriptionCount.incrementAndGet(); 663 } 664 665 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 666 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 667 command.setDestination(dest); 668 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); 669 store(command, isEnableJournalDiskSyncs() && true, null, null); 670 this.subscriptionCount.decrementAndGet(); 671 } 672 673 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 674 675 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 676 indexLock.readLock().lock(); 677 try { 678 pageFile.tx().execute(new Transaction.Closure<IOException>() { 679 public void execute(Transaction tx) throws IOException { 680 StoredDestination sd = getStoredDestination(dest, tx); 681 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 682 .hasNext();) { 683 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 684 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 685 .getValue().getSubscriptionInfo().newInput())); 686 subscriptions.add(info); 687 688 } 689 } 690 }); 691 }finally { 692 indexLock.readLock().unlock(); 693 } 694 695 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 696 subscriptions.toArray(rc); 697 return rc; 698 } 699 700 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 701 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 702 indexLock.readLock().lock(); 703 try { 704 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 705 public SubscriptionInfo execute(Transaction tx) throws IOException { 706 StoredDestination sd = getStoredDestination(dest, tx); 707 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 708 if (command == null) { 709 return null; 710 } 711 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 712 .getSubscriptionInfo().newInput())); 713 } 714 }); 715 }finally { 716 indexLock.readLock().unlock(); 717 } 718 } 719 720 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 721 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 722 indexLock.writeLock().lock(); 723 try { 724 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 725 public Integer execute(Transaction tx) throws IOException { 726 StoredDestination sd = getStoredDestination(dest, tx); 727 LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 728 if (cursorPos == null) { 729 // The subscription might not exist. 730 return 0; 731 } 732 733 int counter = 0; 734 for (Iterator<Entry<Long, HashSet<String>>> iterator = 735 sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) { 736 Entry<Long, HashSet<String>> entry = iterator.next(); 737 if (entry.getValue().contains(subscriptionKey)) { 738 counter++; 739 } 740 } 741 return counter; 742 } 743 }); 744 }finally { 745 indexLock.writeLock().unlock(); 746 } 747 } 748 749 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 750 throws Exception { 751 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 752 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 753 indexLock.writeLock().lock(); 754 try { 755 pageFile.tx().execute(new Transaction.Closure<Exception>() { 756 public void execute(Transaction tx) throws Exception { 757 StoredDestination sd = getStoredDestination(dest, tx); 758 LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 759 sd.orderIndex.setBatch(tx, cursorPos); 760 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 761 .hasNext();) { 762 Entry<Long, MessageKeys> entry = iterator.next(); 763 listener.recoverMessage(loadMessage(entry.getValue().location)); 764 } 765 sd.orderIndex.resetCursorPosition(); 766 } 767 }); 768 }finally { 769 indexLock.writeLock().unlock(); 770 } 771 } 772 773 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 774 final MessageRecoveryListener listener) throws Exception { 775 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 776 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 777 indexLock.writeLock().lock(); 778 try { 779 pageFile.tx().execute(new Transaction.Closure<Exception>() { 780 public void execute(Transaction tx) throws Exception { 781 StoredDestination sd = getStoredDestination(dest, tx); 782 sd.orderIndex.resetCursorPosition(); 783 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 784 if (moc == null) { 785 LastAck pos = sd.subscriptionAcks.get(tx, subscriptionKey); 786 if (pos == null) { 787 // sub deleted 788 return; 789 } 790 sd.orderIndex.setBatch(tx, pos); 791 moc = sd.orderIndex.cursor; 792 } else { 793 sd.orderIndex.cursor.sync(moc); 794 } 795 796 Entry<Long, MessageKeys> entry = null; 797 int counter = 0; 798 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator 799 .hasNext();) { 800 entry = iterator.next(); 801 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 802 counter++; 803 } 804 if (counter >= maxReturned || listener.hasSpace() == false) { 805 break; 806 } 807 } 808 sd.orderIndex.stoppedIterating(); 809 if (entry != null) { 810 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 811 sd.subscriptionCursors.put(subscriptionKey, copy); 812 } 813 } 814 }); 815 }finally { 816 indexLock.writeLock().unlock(); 817 } 818 } 819 820 public void resetBatching(String clientId, String subscriptionName) { 821 try { 822 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 823 indexLock.writeLock().lock(); 824 try { 825 pageFile.tx().execute(new Transaction.Closure<IOException>() { 826 public void execute(Transaction tx) throws IOException { 827 StoredDestination sd = getStoredDestination(dest, tx); 828 sd.subscriptionCursors.remove(subscriptionKey); 829 } 830 }); 831 }finally { 832 indexLock.writeLock().unlock(); 833 } 834 } catch (IOException e) { 835 throw new RuntimeException(e); 836 } 837 } 838 } 839 840 String subscriptionKey(String clientId, String subscriptionName) { 841 return clientId + ":" + subscriptionName; 842 } 843 844 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 845 return this.transactionStore.proxy(new KahaDBMessageStore(destination)); 846 } 847 848 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 849 return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 850 } 851 852 /** 853 * Cleanup method to remove any state associated with the given destination. 854 * This method does not stop the message store (it might not be cached). 855 * 856 * @param destination 857 * Destination to forget 858 */ 859 public void removeQueueMessageStore(ActiveMQQueue destination) { 860 } 861 862 /** 863 * Cleanup method to remove any state associated with the given destination 864 * This method does not stop the message store (it might not be cached). 865 * 866 * @param destination 867 * Destination to forget 868 */ 869 public void removeTopicMessageStore(ActiveMQTopic destination) { 870 } 871 872 public void deleteAllMessages() throws IOException { 873 deleteAllMessages = true; 874 } 875 876 public Set<ActiveMQDestination> getDestinations() { 877 try { 878 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 879 indexLock.readLock().lock(); 880 try { 881 pageFile.tx().execute(new Transaction.Closure<IOException>() { 882 public void execute(Transaction tx) throws IOException { 883 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 884 .hasNext();) { 885 Entry<String, StoredDestination> entry = iterator.next(); 886 if (!isEmptyTopic(entry, tx)) { 887 rc.add(convert(entry.getKey())); 888 } 889 } 890 } 891 892 private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx) 893 throws IOException { 894 boolean isEmptyTopic = false; 895 ActiveMQDestination dest = convert(entry.getKey()); 896 if (dest.isTopic()) { 897 StoredDestination loadedStore = getStoredDestination(convert(dest), tx); 898 if (loadedStore.subscriptionAcks.isEmpty(tx)) { 899 isEmptyTopic = true; 900 } 901 } 902 return isEmptyTopic; 903 } 904 }); 905 }finally { 906 indexLock.readLock().unlock(); 907 } 908 return rc; 909 } catch (IOException e) { 910 throw new RuntimeException(e); 911 } 912 } 913 914 public long getLastMessageBrokerSequenceId() throws IOException { 915 return 0; 916 } 917 918 public long getLastProducerSequenceId(ProducerId id) { 919 indexLock.readLock().lock(); 920 try { 921 return metadata.producerSequenceIdTracker.getLastSeqId(id); 922 } finally { 923 indexLock.readLock().unlock(); 924 } 925 } 926 927 public long size() { 928 return storeSize.get(); 929 } 930 931 public void beginTransaction(ConnectionContext context) throws IOException { 932 throw new IOException("Not yet implemented."); 933 } 934 public void commitTransaction(ConnectionContext context) throws IOException { 935 throw new IOException("Not yet implemented."); 936 } 937 public void rollbackTransaction(ConnectionContext context) throws IOException { 938 throw new IOException("Not yet implemented."); 939 } 940 941 public void checkpoint(boolean sync) throws IOException { 942 super.checkpointCleanup(false); 943 } 944 945 // ///////////////////////////////////////////////////////////////// 946 // Internal helper methods. 947 // ///////////////////////////////////////////////////////////////// 948 949 /** 950 * @param location 951 * @return 952 * @throws IOException 953 */ 954 Message loadMessage(Location location) throws IOException { 955 KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location); 956 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 957 return msg; 958 } 959 960 // ///////////////////////////////////////////////////////////////// 961 // Internal conversion methods. 962 // ///////////////////////////////////////////////////////////////// 963 964 KahaLocation convert(Location location) { 965 KahaLocation rc = new KahaLocation(); 966 rc.setLogId(location.getDataFileId()); 967 rc.setOffset(location.getOffset()); 968 return rc; 969 } 970 971 KahaDestination convert(ActiveMQDestination dest) { 972 KahaDestination rc = new KahaDestination(); 973 rc.setName(dest.getPhysicalName()); 974 switch (dest.getDestinationType()) { 975 case ActiveMQDestination.QUEUE_TYPE: 976 rc.setType(DestinationType.QUEUE); 977 return rc; 978 case ActiveMQDestination.TOPIC_TYPE: 979 rc.setType(DestinationType.TOPIC); 980 return rc; 981 case ActiveMQDestination.TEMP_QUEUE_TYPE: 982 rc.setType(DestinationType.TEMP_QUEUE); 983 return rc; 984 case ActiveMQDestination.TEMP_TOPIC_TYPE: 985 rc.setType(DestinationType.TEMP_TOPIC); 986 return rc; 987 default: 988 return null; 989 } 990 } 991 992 ActiveMQDestination convert(String dest) { 993 int p = dest.indexOf(":"); 994 if (p < 0) { 995 throw new IllegalArgumentException("Not in the valid destination format"); 996 } 997 int type = Integer.parseInt(dest.substring(0, p)); 998 String name = dest.substring(p + 1); 999 1000 switch (KahaDestination.DestinationType.valueOf(type)) { 1001 case QUEUE: 1002 return new ActiveMQQueue(name); 1003 case TOPIC: 1004 return new ActiveMQTopic(name); 1005 case TEMP_QUEUE: 1006 return new ActiveMQTempQueue(name); 1007 case TEMP_TOPIC: 1008 return new ActiveMQTempTopic(name); 1009 default: 1010 throw new IllegalArgumentException("Not in the valid destination format"); 1011 } 1012 } 1013 1014 static class AsyncJobKey { 1015 MessageId id; 1016 ActiveMQDestination destination; 1017 1018 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1019 this.id = id; 1020 this.destination = destination; 1021 } 1022 1023 @Override 1024 public boolean equals(Object obj) { 1025 if (obj == this) { 1026 return true; 1027 } 1028 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1029 && destination.equals(((AsyncJobKey) obj).destination); 1030 } 1031 1032 @Override 1033 public int hashCode() { 1034 return id.hashCode() + destination.hashCode(); 1035 } 1036 1037 @Override 1038 public String toString() { 1039 return destination.getPhysicalName() + "-" + id; 1040 } 1041 } 1042 1043 interface StoreTask { 1044 public boolean cancel(); 1045 } 1046 1047 class StoreQueueTask implements Runnable, StoreTask { 1048 protected final Message message; 1049 protected final ConnectionContext context; 1050 protected final KahaDBMessageStore store; 1051 protected final InnerFutureTask future; 1052 protected final AtomicBoolean done = new AtomicBoolean(); 1053 protected final AtomicBoolean locked = new AtomicBoolean(); 1054 1055 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1056 this.store = store; 1057 this.context = context; 1058 this.message = message; 1059 this.future = new InnerFutureTask(this); 1060 } 1061 1062 public Future<Object> getFuture() { 1063 return this.future; 1064 } 1065 1066 public boolean cancel() { 1067 releaseLocks(); 1068 if (this.done.compareAndSet(false, true)) { 1069 return this.future.cancel(false); 1070 } 1071 return false; 1072 } 1073 1074 void aquireLocks() { 1075 if (this.locked.compareAndSet(false, true)) { 1076 try { 1077 globalQueueSemaphore.acquire(); 1078 store.acquireLocalAsyncLock(); 1079 message.incrementReferenceCount(); 1080 } catch (InterruptedException e) { 1081 LOG.warn("Failed to aquire lock", e); 1082 } 1083 } 1084 1085 } 1086 1087 void releaseLocks() { 1088 if (this.locked.compareAndSet(true, false)) { 1089 store.releaseLocalAsyncLock(); 1090 globalQueueSemaphore.release(); 1091 message.decrementReferenceCount(); 1092 } 1093 } 1094 1095 public void run() { 1096 this.store.doneTasks++; 1097 try { 1098 if (this.done.compareAndSet(false, true)) { 1099 this.store.addMessage(context, message); 1100 removeQueueTask(this.store, this.message.getMessageId()); 1101 this.future.complete(); 1102 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1103 System.err.println(this.store.dest.getName() + " cancelled: " 1104 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1105 this.store.canceledTasks = this.store.doneTasks = 0; 1106 } 1107 } catch (Exception e) { 1108 this.future.setException(e); 1109 } finally { 1110 releaseLocks(); 1111 } 1112 } 1113 1114 protected Message getMessage() { 1115 return this.message; 1116 } 1117 1118 private class InnerFutureTask extends FutureTask<Object> { 1119 1120 public InnerFutureTask(Runnable runnable) { 1121 super(runnable, null); 1122 1123 } 1124 1125 public void setException(final Exception e) { 1126 super.setException(e); 1127 } 1128 1129 public void complete() { 1130 super.set(null); 1131 } 1132 } 1133 } 1134 1135 class StoreTopicTask extends StoreQueueTask { 1136 private final int subscriptionCount; 1137 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1138 private final KahaDBTopicMessageStore topicStore; 1139 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1140 int subscriptionCount) { 1141 super(store, context, message); 1142 this.topicStore = store; 1143 this.subscriptionCount = subscriptionCount; 1144 1145 } 1146 1147 @Override 1148 void aquireLocks() { 1149 if (this.locked.compareAndSet(false, true)) { 1150 try { 1151 globalTopicSemaphore.acquire(); 1152 store.acquireLocalAsyncLock(); 1153 message.incrementReferenceCount(); 1154 } catch (InterruptedException e) { 1155 LOG.warn("Failed to aquire lock", e); 1156 } 1157 } 1158 1159 } 1160 1161 @Override 1162 void releaseLocks() { 1163 if (this.locked.compareAndSet(true, false)) { 1164 message.decrementReferenceCount(); 1165 store.releaseLocalAsyncLock(); 1166 globalTopicSemaphore.release(); 1167 } 1168 } 1169 1170 /** 1171 * add a key 1172 * 1173 * @param key 1174 * @return true if all acknowledgements received 1175 */ 1176 public boolean addSubscriptionKey(String key) { 1177 synchronized (this.subscriptionKeys) { 1178 this.subscriptionKeys.add(key); 1179 } 1180 return this.subscriptionKeys.size() >= this.subscriptionCount; 1181 } 1182 1183 @Override 1184 public void run() { 1185 this.store.doneTasks++; 1186 try { 1187 if (this.done.compareAndSet(false, true)) { 1188 this.topicStore.addMessage(context, message); 1189 // apply any acks we have 1190 synchronized (this.subscriptionKeys) { 1191 for (String key : this.subscriptionKeys) { 1192 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1193 1194 } 1195 } 1196 removeTopicTask(this.topicStore, this.message.getMessageId()); 1197 this.future.complete(); 1198 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1199 System.err.println(this.store.dest.getName() + " cancelled: " 1200 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1201 this.store.canceledTasks = this.store.doneTasks = 0; 1202 } 1203 } catch (Exception e) { 1204 this.future.setException(e); 1205 } finally { 1206 releaseLocks(); 1207 } 1208 } 1209 } 1210}