001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.journal; 018 019import java.io.File; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.Set; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.CountDownLatch; 028import java.util.concurrent.FutureTask; 029import java.util.concurrent.LinkedBlockingQueue; 030import java.util.concurrent.ThreadFactory; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicBoolean; 034import org.apache.activeio.journal.InvalidRecordLocationException; 035import org.apache.activeio.journal.Journal; 036import org.apache.activeio.journal.JournalEventListener; 037import org.apache.activeio.journal.RecordLocation; 038import org.apache.activeio.packet.ByteArrayPacket; 039import org.apache.activeio.packet.Packet; 040import org.apache.activemq.broker.BrokerService; 041import org.apache.activemq.broker.BrokerServiceAware; 042import org.apache.activemq.broker.ConnectionContext; 043import org.apache.activemq.command.ActiveMQDestination; 044import org.apache.activemq.command.ActiveMQQueue; 045import org.apache.activemq.command.ActiveMQTopic; 046import org.apache.activemq.command.DataStructure; 047import org.apache.activemq.command.JournalQueueAck; 048import org.apache.activemq.command.JournalTopicAck; 049import org.apache.activemq.command.JournalTrace; 050import org.apache.activemq.command.JournalTransaction; 051import org.apache.activemq.command.Message; 052import org.apache.activemq.command.MessageAck; 053import org.apache.activemq.command.ProducerId; 054import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 055import org.apache.activemq.openwire.OpenWireFormat; 056import org.apache.activemq.store.MessageStore; 057import org.apache.activemq.store.PersistenceAdapter; 058import org.apache.activemq.store.TopicMessageStore; 059import org.apache.activemq.store.TransactionStore; 060import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 061import org.apache.activemq.store.journal.JournalTransactionStore.Tx; 062import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation; 063import org.apache.activemq.thread.Scheduler; 064import org.apache.activemq.thread.Task; 065import org.apache.activemq.thread.TaskRunner; 066import org.apache.activemq.thread.TaskRunnerFactory; 067import org.apache.activemq.usage.SystemUsage; 068import org.apache.activemq.usage.Usage; 069import org.apache.activemq.usage.UsageListener; 070import org.apache.activemq.util.ByteSequence; 071import org.apache.activemq.util.IOExceptionSupport; 072import org.apache.activemq.wireformat.WireFormat; 073import org.slf4j.Logger; 074import org.slf4j.LoggerFactory; 075 076/** 077 * An implementation of {@link PersistenceAdapter} designed for use with a 078 * {@link Journal} and then check pointing asynchronously on a timeout with some 079 * other long term persistent storage. 080 * 081 * @org.apache.xbean.XBean 082 * 083 */ 084public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware { 085 086 private BrokerService brokerService; 087 088 protected Scheduler scheduler; 089 private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class); 090 091 private Journal journal; 092 private PersistenceAdapter longTermPersistence; 093 094 private final WireFormat wireFormat = new OpenWireFormat(); 095 096 private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>(); 097 private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>(); 098 099 private SystemUsage usageManager; 100 private final long checkpointInterval = 1000 * 60 * 5; 101 private long lastCheckpointRequest = System.currentTimeMillis(); 102 private long lastCleanup = System.currentTimeMillis(); 103 private int maxCheckpointWorkers = 10; 104 private int maxCheckpointMessageAddSize = 1024 * 1024; 105 106 private final JournalTransactionStore transactionStore = new JournalTransactionStore(this); 107 private ThreadPoolExecutor checkpointExecutor; 108 109 private TaskRunner checkpointTask; 110 private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); 111 private boolean fullCheckPoint; 112 113 private final AtomicBoolean started = new AtomicBoolean(false); 114 115 private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); 116 117 private TaskRunnerFactory taskRunnerFactory; 118 119 public JournalPersistenceAdapter() { 120 } 121 122 public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException { 123 setJournal(journal); 124 setTaskRunnerFactory(taskRunnerFactory); 125 setPersistenceAdapter(longTermPersistence); 126 } 127 128 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 129 this.taskRunnerFactory = taskRunnerFactory; 130 } 131 132 public void setJournal(Journal journal) { 133 this.journal = journal; 134 journal.setJournalEventListener(this); 135 } 136 137 public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) { 138 this.longTermPersistence = longTermPersistence; 139 } 140 141 final Runnable createPeriodicCheckpointTask() { 142 return new Runnable() { 143 public void run() { 144 long lastTime = 0; 145 synchronized (this) { 146 lastTime = lastCheckpointRequest; 147 } 148 if (System.currentTimeMillis() > lastTime + checkpointInterval) { 149 checkpoint(false, true); 150 } 151 } 152 }; 153 } 154 155 /** 156 * @param usageManager The UsageManager that is controlling the 157 * destination's memory usage. 158 */ 159 public void setUsageManager(SystemUsage usageManager) { 160 this.usageManager = usageManager; 161 longTermPersistence.setUsageManager(usageManager); 162 } 163 164 public Set<ActiveMQDestination> getDestinations() { 165 Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations()); 166 destinations.addAll(queues.keySet()); 167 destinations.addAll(topics.keySet()); 168 return destinations; 169 } 170 171 private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { 172 if (destination.isQueue()) { 173 return createQueueMessageStore((ActiveMQQueue)destination); 174 } else { 175 return createTopicMessageStore((ActiveMQTopic)destination); 176 } 177 } 178 179 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 180 JournalMessageStore store = queues.get(destination); 181 if (store == null) { 182 MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination); 183 store = new JournalMessageStore(this, checkpointStore, destination); 184 queues.put(destination, store); 185 } 186 return store; 187 } 188 189 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { 190 JournalTopicMessageStore store = topics.get(destinationName); 191 if (store == null) { 192 TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName); 193 store = new JournalTopicMessageStore(this, checkpointStore, destinationName); 194 topics.put(destinationName, store); 195 } 196 return store; 197 } 198 199 /** 200 * Cleanup method to remove any state associated with the given destination 201 * 202 * @param destination Destination to forget 203 */ 204 public void removeQueueMessageStore(ActiveMQQueue destination) { 205 queues.remove(destination); 206 } 207 208 /** 209 * Cleanup method to remove any state associated with the given destination 210 * 211 * @param destination Destination to forget 212 */ 213 public void removeTopicMessageStore(ActiveMQTopic destination) { 214 topics.remove(destination); 215 } 216 217 public TransactionStore createTransactionStore() throws IOException { 218 return transactionStore; 219 } 220 221 public long getLastMessageBrokerSequenceId() throws IOException { 222 return longTermPersistence.getLastMessageBrokerSequenceId(); 223 } 224 225 public void beginTransaction(ConnectionContext context) throws IOException { 226 longTermPersistence.beginTransaction(context); 227 } 228 229 public void commitTransaction(ConnectionContext context) throws IOException { 230 longTermPersistence.commitTransaction(context); 231 } 232 233 public void rollbackTransaction(ConnectionContext context) throws IOException { 234 longTermPersistence.rollbackTransaction(context); 235 } 236 237 public synchronized void start() throws Exception { 238 if (!started.compareAndSet(false, true)) { 239 return; 240 } 241 242 checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { 243 public boolean iterate() { 244 return doCheckpoint(); 245 } 246 }, "ActiveMQ Journal Checkpoint Worker"); 247 248 checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 249 public Thread newThread(Runnable runable) { 250 Thread t = new Thread(runable, "Journal checkpoint worker"); 251 t.setPriority(7); 252 return t; 253 } 254 }); 255 // checkpointExecutor.allowCoreThreadTimeOut(true); 256 257 this.usageManager.getMemoryUsage().addUsageListener(this); 258 259 if (longTermPersistence instanceof JDBCPersistenceAdapter) { 260 // Disabled periodic clean up as it deadlocks with the checkpoint 261 // operations. 262 ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0); 263 } 264 265 longTermPersistence.start(); 266 createTransactionStore(); 267 recover(); 268 269 // Do a checkpoint periodically. 270 this.scheduler = new Scheduler("Journal Scheduler"); 271 this.scheduler.start(); 272 this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10); 273 274 } 275 276 public void stop() throws Exception { 277 278 this.usageManager.getMemoryUsage().removeUsageListener(this); 279 if (!started.compareAndSet(true, false)) { 280 return; 281 } 282 283 this.scheduler.cancel(periodicCheckpointTask); 284 this.scheduler.stop(); 285 286 // Take one final checkpoint and stop checkpoint processing. 287 checkpoint(true, true); 288 checkpointTask.shutdown(); 289 checkpointExecutor.shutdown(); 290 291 queues.clear(); 292 topics.clear(); 293 294 IOException firstException = null; 295 try { 296 journal.close(); 297 } catch (Exception e) { 298 firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); 299 } 300 longTermPersistence.stop(); 301 302 if (firstException != null) { 303 throw firstException; 304 } 305 } 306 307 // Properties 308 // ------------------------------------------------------------------------- 309 public PersistenceAdapter getLongTermPersistence() { 310 return longTermPersistence; 311 } 312 313 /** 314 * @return Returns the wireFormat. 315 */ 316 public WireFormat getWireFormat() { 317 return wireFormat; 318 } 319 320 // Implementation methods 321 // ------------------------------------------------------------------------- 322 323 /** 324 * The Journal give us a call back so that we can move old data out of the 325 * journal. Taking a checkpoint does this for us. 326 * 327 * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation) 328 */ 329 public void overflowNotification(RecordLocation safeLocation) { 330 checkpoint(false, true); 331 } 332 333 /** 334 * When we checkpoint we move all the journalled data to long term storage. 335 * 336 */ 337 public void checkpoint(boolean sync, boolean fullCheckpoint) { 338 try { 339 if (journal == null) { 340 throw new IllegalStateException("Journal is closed."); 341 } 342 343 long now = System.currentTimeMillis(); 344 CountDownLatch latch = null; 345 synchronized (this) { 346 latch = nextCheckpointCountDownLatch; 347 lastCheckpointRequest = now; 348 if (fullCheckpoint) { 349 this.fullCheckPoint = true; 350 } 351 } 352 353 checkpointTask.wakeup(); 354 355 if (sync) { 356 LOG.debug("Waking for checkpoint to complete."); 357 latch.await(); 358 } 359 } catch (InterruptedException e) { 360 Thread.currentThread().interrupt(); 361 LOG.warn("Request to start checkpoint failed: " + e, e); 362 } 363 } 364 365 public void checkpoint(boolean sync) { 366 checkpoint(sync, sync); 367 } 368 369 /** 370 * This does the actual checkpoint. 371 * 372 * @return 373 */ 374 public boolean doCheckpoint() { 375 CountDownLatch latch = null; 376 boolean fullCheckpoint; 377 synchronized (this) { 378 latch = nextCheckpointCountDownLatch; 379 nextCheckpointCountDownLatch = new CountDownLatch(1); 380 fullCheckpoint = this.fullCheckPoint; 381 this.fullCheckPoint = false; 382 } 383 try { 384 385 LOG.debug("Checkpoint started."); 386 RecordLocation newMark = null; 387 388 ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size()); 389 390 // 391 // We do many partial checkpoints (fullCheckpoint==false) to move 392 // topic messages 393 // to long term store as soon as possible. 394 // 395 // We want to avoid doing that for queue messages since removes the 396 // come in the same 397 // checkpoint cycle will nullify the previous message add. 398 // Therefore, we only 399 // checkpoint queues on the fullCheckpoint cycles. 400 // 401 if (fullCheckpoint) { 402 Iterator<JournalMessageStore> iterator = queues.values().iterator(); 403 while (iterator.hasNext()) { 404 try { 405 final JournalMessageStore ms = iterator.next(); 406 FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() { 407 public RecordLocation call() throws Exception { 408 return ms.checkpoint(); 409 } 410 }); 411 futureTasks.add(task); 412 checkpointExecutor.execute(task); 413 } catch (Exception e) { 414 LOG.error("Failed to checkpoint a message store: " + e, e); 415 } 416 } 417 } 418 419 Iterator<JournalTopicMessageStore> iterator = topics.values().iterator(); 420 while (iterator.hasNext()) { 421 try { 422 final JournalTopicMessageStore ms = iterator.next(); 423 FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() { 424 public RecordLocation call() throws Exception { 425 return ms.checkpoint(); 426 } 427 }); 428 futureTasks.add(task); 429 checkpointExecutor.execute(task); 430 } catch (Exception e) { 431 LOG.error("Failed to checkpoint a message store: " + e, e); 432 } 433 } 434 435 try { 436 for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) { 437 FutureTask<RecordLocation> ft = iter.next(); 438 RecordLocation mark = ft.get(); 439 // We only set a newMark on full checkpoints. 440 if (fullCheckpoint) { 441 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { 442 newMark = mark; 443 } 444 } 445 } 446 } catch (Throwable e) { 447 LOG.error("Failed to checkpoint a message store: " + e, e); 448 } 449 450 if (fullCheckpoint) { 451 try { 452 if (newMark != null) { 453 LOG.debug("Marking journal at: " + newMark); 454 journal.setMark(newMark, true); 455 } 456 } catch (Exception e) { 457 LOG.error("Failed to mark the Journal: " + e, e); 458 } 459 460 if (longTermPersistence instanceof JDBCPersistenceAdapter) { 461 // We may be check pointing more often than the 462 // checkpointInterval if under high use 463 // But we don't want to clean up the db that often. 464 long now = System.currentTimeMillis(); 465 if (now > lastCleanup + checkpointInterval) { 466 lastCleanup = now; 467 ((JDBCPersistenceAdapter)longTermPersistence).cleanup(); 468 } 469 } 470 } 471 472 LOG.debug("Checkpoint done."); 473 } finally { 474 latch.countDown(); 475 } 476 synchronized (this) { 477 return this.fullCheckPoint; 478 } 479 480 } 481 482 /** 483 * @param location 484 * @return 485 * @throws IOException 486 */ 487 public DataStructure readCommand(RecordLocation location) throws IOException { 488 try { 489 Packet packet = journal.read(location); 490 return (DataStructure)wireFormat.unmarshal(toByteSequence(packet)); 491 } catch (InvalidRecordLocationException e) { 492 throw createReadException(location, e); 493 } catch (IOException e) { 494 throw createReadException(location, e); 495 } 496 } 497 498 /** 499 * Move all the messages that were in the journal into long term storage. We 500 * just replay and do a checkpoint. 501 * 502 * @throws IOException 503 * @throws IOException 504 * @throws InvalidRecordLocationException 505 * @throws IllegalStateException 506 */ 507 private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException { 508 509 RecordLocation pos = null; 510 int transactionCounter = 0; 511 512 LOG.info("Journal Recovery Started from: " + journal); 513 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); 514 515 // While we have records in the journal. 516 while ((pos = journal.getNextRecordLocation(pos)) != null) { 517 Packet data = journal.read(pos); 518 DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data)); 519 520 if (c instanceof Message) { 521 Message message = (Message)c; 522 JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination()); 523 if (message.isInTransaction()) { 524 transactionStore.addMessage(store, message, pos); 525 } else { 526 store.replayAddMessage(context, message); 527 transactionCounter++; 528 } 529 } else { 530 switch (c.getDataStructureType()) { 531 case JournalQueueAck.DATA_STRUCTURE_TYPE: { 532 JournalQueueAck command = (JournalQueueAck)c; 533 JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination()); 534 if (command.getMessageAck().isInTransaction()) { 535 transactionStore.removeMessage(store, command.getMessageAck(), pos); 536 } else { 537 store.replayRemoveMessage(context, command.getMessageAck()); 538 transactionCounter++; 539 } 540 } 541 break; 542 case JournalTopicAck.DATA_STRUCTURE_TYPE: { 543 JournalTopicAck command = (JournalTopicAck)c; 544 JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination()); 545 if (command.getTransactionId() != null) { 546 transactionStore.acknowledge(store, command, pos); 547 } else { 548 store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()); 549 transactionCounter++; 550 } 551 } 552 break; 553 case JournalTransaction.DATA_STRUCTURE_TYPE: { 554 JournalTransaction command = (JournalTransaction)c; 555 try { 556 // Try to replay the packet. 557 switch (command.getType()) { 558 case JournalTransaction.XA_PREPARE: 559 transactionStore.replayPrepare(command.getTransactionId()); 560 break; 561 case JournalTransaction.XA_COMMIT: 562 case JournalTransaction.LOCAL_COMMIT: 563 Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); 564 if (tx == null) { 565 break; // We may be trying to replay a commit 566 } 567 // that 568 // was already committed. 569 570 // Replay the committed operations. 571 tx.getOperations(); 572 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { 573 TxOperation op = (TxOperation)iter.next(); 574 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { 575 op.store.replayAddMessage(context, (Message)op.data); 576 } 577 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { 578 op.store.replayRemoveMessage(context, (MessageAck)op.data); 579 } 580 if (op.operationType == TxOperation.ACK_OPERATION_TYPE) { 581 JournalTopicAck ack = (JournalTopicAck)op.data; 582 ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()); 583 } 584 } 585 transactionCounter++; 586 break; 587 case JournalTransaction.LOCAL_ROLLBACK: 588 case JournalTransaction.XA_ROLLBACK: 589 transactionStore.replayRollback(command.getTransactionId()); 590 break; 591 default: 592 throw new IOException("Invalid journal command type: " + command.getType()); 593 } 594 } catch (IOException e) { 595 LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); 596 } 597 } 598 break; 599 case JournalTrace.DATA_STRUCTURE_TYPE: 600 JournalTrace trace = (JournalTrace)c; 601 LOG.debug("TRACE Entry: " + trace.getMessage()); 602 break; 603 default: 604 LOG.error("Unknown type of record in transaction log which will be discarded: " + c); 605 } 606 } 607 } 608 609 RecordLocation location = writeTraceMessage("RECOVERED", true); 610 journal.setMark(location, true); 611 612 LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered."); 613 } 614 615 private IOException createReadException(RecordLocation location, Exception e) { 616 return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); 617 } 618 619 protected IOException createWriteException(DataStructure packet, Exception e) { 620 return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); 621 } 622 623 protected IOException createWriteException(String command, Exception e) { 624 return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); 625 } 626 627 protected IOException createRecoveryFailedException(Exception e) { 628 return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); 629 } 630 631 /** 632 * @param command 633 * @param sync 634 * @return 635 * @throws IOException 636 */ 637 public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException { 638 if (started.get()) { 639 try { 640 return journal.write(toPacket(wireFormat.marshal(command)), sync); 641 } catch (IOException ioe) { 642 LOG.error("Cannot write to the journal", ioe); 643 brokerService.handleIOException(ioe); 644 throw ioe; 645 } 646 } 647 throw new IOException("closed"); 648 } 649 650 private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException { 651 JournalTrace trace = new JournalTrace(); 652 trace.setMessage(message); 653 return writeCommand(trace, sync); 654 } 655 656 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 657 newPercentUsage = (newPercentUsage / 10) * 10; 658 oldPercentUsage = (oldPercentUsage / 10) * 10; 659 if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { 660 boolean sync = newPercentUsage >= 90; 661 checkpoint(sync, true); 662 } 663 } 664 665 public JournalTransactionStore getTransactionStore() { 666 return transactionStore; 667 } 668 669 public void deleteAllMessages() throws IOException { 670 try { 671 JournalTrace trace = new JournalTrace(); 672 trace.setMessage("DELETED"); 673 RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false); 674 journal.setMark(location, true); 675 LOG.info("Journal deleted: "); 676 } catch (IOException e) { 677 throw e; 678 } catch (Throwable e) { 679 throw IOExceptionSupport.create(e); 680 } 681 longTermPersistence.deleteAllMessages(); 682 } 683 684 public SystemUsage getUsageManager() { 685 return usageManager; 686 } 687 688 public int getMaxCheckpointMessageAddSize() { 689 return maxCheckpointMessageAddSize; 690 } 691 692 public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) { 693 this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize; 694 } 695 696 public int getMaxCheckpointWorkers() { 697 return maxCheckpointWorkers; 698 } 699 700 public void setMaxCheckpointWorkers(int maxCheckpointWorkers) { 701 this.maxCheckpointWorkers = maxCheckpointWorkers; 702 } 703 704 public boolean isUseExternalMessageReferences() { 705 return false; 706 } 707 708 public void setUseExternalMessageReferences(boolean enable) { 709 if (enable) { 710 throw new IllegalArgumentException("The journal does not support message references."); 711 } 712 } 713 714 public Packet toPacket(ByteSequence sequence) { 715 return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length)); 716 } 717 718 public ByteSequence toByteSequence(Packet packet) { 719 org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence(); 720 return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength()); 721 } 722 723 public void setBrokerName(String brokerName) { 724 longTermPersistence.setBrokerName(brokerName); 725 } 726 727 @Override 728 public String toString() { 729 return "JournalPersistenceAdapator(" + longTermPersistence + ")"; 730 } 731 732 public void setDirectory(File dir) { 733 } 734 735 public long size(){ 736 return 0; 737 } 738 739 public void setBrokerService(BrokerService brokerService) { 740 this.brokerService = brokerService; 741 PersistenceAdapter pa = getLongTermPersistence(); 742 if( pa instanceof BrokerServiceAware ) { 743 ((BrokerServiceAware)pa).setBrokerService(brokerService); 744 } 745 } 746 747 public long getLastProducerSequenceId(ProducerId id) { 748 return -1; 749 } 750 751}