001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.journal;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Set;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.CountDownLatch;
028import java.util.concurrent.FutureTask;
029import java.util.concurrent.LinkedBlockingQueue;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import org.apache.activeio.journal.InvalidRecordLocationException;
035import org.apache.activeio.journal.Journal;
036import org.apache.activeio.journal.JournalEventListener;
037import org.apache.activeio.journal.RecordLocation;
038import org.apache.activeio.packet.ByteArrayPacket;
039import org.apache.activeio.packet.Packet;
040import org.apache.activemq.broker.BrokerService;
041import org.apache.activemq.broker.BrokerServiceAware;
042import org.apache.activemq.broker.ConnectionContext;
043import org.apache.activemq.command.ActiveMQDestination;
044import org.apache.activemq.command.ActiveMQQueue;
045import org.apache.activemq.command.ActiveMQTopic;
046import org.apache.activemq.command.DataStructure;
047import org.apache.activemq.command.JournalQueueAck;
048import org.apache.activemq.command.JournalTopicAck;
049import org.apache.activemq.command.JournalTrace;
050import org.apache.activemq.command.JournalTransaction;
051import org.apache.activemq.command.Message;
052import org.apache.activemq.command.MessageAck;
053import org.apache.activemq.command.ProducerId;
054import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
055import org.apache.activemq.openwire.OpenWireFormat;
056import org.apache.activemq.store.MessageStore;
057import org.apache.activemq.store.PersistenceAdapter;
058import org.apache.activemq.store.TopicMessageStore;
059import org.apache.activemq.store.TransactionStore;
060import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
061import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
062import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
063import org.apache.activemq.thread.Scheduler;
064import org.apache.activemq.thread.Task;
065import org.apache.activemq.thread.TaskRunner;
066import org.apache.activemq.thread.TaskRunnerFactory;
067import org.apache.activemq.usage.SystemUsage;
068import org.apache.activemq.usage.Usage;
069import org.apache.activemq.usage.UsageListener;
070import org.apache.activemq.util.ByteSequence;
071import org.apache.activemq.util.IOExceptionSupport;
072import org.apache.activemq.wireformat.WireFormat;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075
076/**
077 * An implementation of {@link PersistenceAdapter} designed for use with a
078 * {@link Journal} and then check pointing asynchronously on a timeout with some
079 * other long term persistent storage.
080 * 
081 * @org.apache.xbean.XBean
082 * 
083 */
084public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
085
086    private BrokerService brokerService;
087        
088    protected Scheduler scheduler;
089    private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
090
091    private Journal journal;
092    private PersistenceAdapter longTermPersistence;
093
094    private final WireFormat wireFormat = new OpenWireFormat();
095
096    private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
097    private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
098
099    private SystemUsage usageManager;
100    private final long checkpointInterval = 1000 * 60 * 5;
101    private long lastCheckpointRequest = System.currentTimeMillis();
102    private long lastCleanup = System.currentTimeMillis();
103    private int maxCheckpointWorkers = 10;
104    private int maxCheckpointMessageAddSize = 1024 * 1024;
105
106    private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
107    private ThreadPoolExecutor checkpointExecutor;
108
109    private TaskRunner checkpointTask;
110    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
111    private boolean fullCheckPoint;
112
113    private final AtomicBoolean started = new AtomicBoolean(false);
114
115    private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
116
117    private TaskRunnerFactory taskRunnerFactory;
118
119    public JournalPersistenceAdapter() {        
120    }
121    
122    public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
123        setJournal(journal);
124        setTaskRunnerFactory(taskRunnerFactory);
125        setPersistenceAdapter(longTermPersistence);
126    }
127
128    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
129        this.taskRunnerFactory = taskRunnerFactory;
130    }
131
132    public void setJournal(Journal journal) {
133        this.journal = journal;
134        journal.setJournalEventListener(this);
135    }
136    
137    public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
138        this.longTermPersistence = longTermPersistence;
139    }
140    
141    final Runnable createPeriodicCheckpointTask() {
142        return new Runnable() {
143            public void run() {
144                long lastTime = 0;
145                synchronized (this) {
146                    lastTime = lastCheckpointRequest;
147                }
148                if (System.currentTimeMillis() > lastTime + checkpointInterval) {
149                    checkpoint(false, true);
150                }
151            }
152        };
153    }
154
155    /**
156     * @param usageManager The UsageManager that is controlling the
157     *                destination's memory usage.
158     */
159    public void setUsageManager(SystemUsage usageManager) {
160        this.usageManager = usageManager;
161        longTermPersistence.setUsageManager(usageManager);
162    }
163
164    public Set<ActiveMQDestination> getDestinations() {
165        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
166        destinations.addAll(queues.keySet());
167        destinations.addAll(topics.keySet());
168        return destinations;
169    }
170
171    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
172        if (destination.isQueue()) {
173            return createQueueMessageStore((ActiveMQQueue)destination);
174        } else {
175            return createTopicMessageStore((ActiveMQTopic)destination);
176        }
177    }
178
179    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
180        JournalMessageStore store = queues.get(destination);
181        if (store == null) {
182            MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
183            store = new JournalMessageStore(this, checkpointStore, destination);
184            queues.put(destination, store);
185        }
186        return store;
187    }
188
189    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
190        JournalTopicMessageStore store = topics.get(destinationName);
191        if (store == null) {
192            TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
193            store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
194            topics.put(destinationName, store);
195        }
196        return store;
197    }
198
199    /**
200     * Cleanup method to remove any state associated with the given destination
201     *
202     * @param destination Destination to forget
203     */
204    public void removeQueueMessageStore(ActiveMQQueue destination) {
205        queues.remove(destination);
206    }
207
208    /**
209     * Cleanup method to remove any state associated with the given destination
210     *
211     * @param destination Destination to forget
212     */
213    public void removeTopicMessageStore(ActiveMQTopic destination) {
214        topics.remove(destination);
215    }
216
217    public TransactionStore createTransactionStore() throws IOException {
218        return transactionStore;
219    }
220
221    public long getLastMessageBrokerSequenceId() throws IOException {
222        return longTermPersistence.getLastMessageBrokerSequenceId();
223    }
224
225    public void beginTransaction(ConnectionContext context) throws IOException {
226        longTermPersistence.beginTransaction(context);
227    }
228
229    public void commitTransaction(ConnectionContext context) throws IOException {
230        longTermPersistence.commitTransaction(context);
231    }
232
233    public void rollbackTransaction(ConnectionContext context) throws IOException {
234        longTermPersistence.rollbackTransaction(context);
235    }
236
237    public synchronized void start() throws Exception {
238        if (!started.compareAndSet(false, true)) {
239            return;
240        }
241
242        checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
243            public boolean iterate() {
244                return doCheckpoint();
245            }
246        }, "ActiveMQ Journal Checkpoint Worker");
247
248        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
249            public Thread newThread(Runnable runable) {
250                Thread t = new Thread(runable, "Journal checkpoint worker");
251                t.setPriority(7);
252                return t;
253            }
254        });
255        // checkpointExecutor.allowCoreThreadTimeOut(true);
256
257        this.usageManager.getMemoryUsage().addUsageListener(this);
258
259        if (longTermPersistence instanceof JDBCPersistenceAdapter) {
260            // Disabled periodic clean up as it deadlocks with the checkpoint
261            // operations.
262            ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
263        }
264
265        longTermPersistence.start();
266        createTransactionStore();
267        recover();
268
269        // Do a checkpoint periodically.
270        this.scheduler = new Scheduler("Journal Scheduler");
271        this.scheduler.start();
272        this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
273
274    }
275
276    public void stop() throws Exception {
277
278        this.usageManager.getMemoryUsage().removeUsageListener(this);
279        if (!started.compareAndSet(true, false)) {
280            return;
281        }
282
283        this.scheduler.cancel(periodicCheckpointTask);
284        this.scheduler.stop();
285
286        // Take one final checkpoint and stop checkpoint processing.
287        checkpoint(true, true);
288        checkpointTask.shutdown();
289        checkpointExecutor.shutdown();
290
291        queues.clear();
292        topics.clear();
293
294        IOException firstException = null;
295        try {
296            journal.close();
297        } catch (Exception e) {
298            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
299        }
300        longTermPersistence.stop();
301
302        if (firstException != null) {
303            throw firstException;
304        }
305    }
306
307    // Properties
308    // -------------------------------------------------------------------------
309    public PersistenceAdapter getLongTermPersistence() {
310        return longTermPersistence;
311    }
312
313    /**
314     * @return Returns the wireFormat.
315     */
316    public WireFormat getWireFormat() {
317        return wireFormat;
318    }
319
320    // Implementation methods
321    // -------------------------------------------------------------------------
322
323    /**
324     * The Journal give us a call back so that we can move old data out of the
325     * journal. Taking a checkpoint does this for us.
326     * 
327     * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
328     */
329    public void overflowNotification(RecordLocation safeLocation) {
330        checkpoint(false, true);
331    }
332
333    /**
334     * When we checkpoint we move all the journalled data to long term storage.
335     * 
336     */
337    public void checkpoint(boolean sync, boolean fullCheckpoint) {
338        try {
339            if (journal == null) {
340                throw new IllegalStateException("Journal is closed.");
341            }
342
343            long now = System.currentTimeMillis();
344            CountDownLatch latch = null;
345            synchronized (this) {
346                latch = nextCheckpointCountDownLatch;
347                lastCheckpointRequest = now;
348                if (fullCheckpoint) {
349                    this.fullCheckPoint = true;
350                }
351            }
352
353            checkpointTask.wakeup();
354
355            if (sync) {
356                LOG.debug("Waking for checkpoint to complete.");
357                latch.await();
358            }
359        } catch (InterruptedException e) {
360            Thread.currentThread().interrupt();
361            LOG.warn("Request to start checkpoint failed: " + e, e);
362        }
363    }
364
365    public void checkpoint(boolean sync) {
366        checkpoint(sync, sync);
367    }
368
369    /**
370     * This does the actual checkpoint.
371     * 
372     * @return
373     */
374    public boolean doCheckpoint() {
375        CountDownLatch latch = null;
376        boolean fullCheckpoint;
377        synchronized (this) {
378            latch = nextCheckpointCountDownLatch;
379            nextCheckpointCountDownLatch = new CountDownLatch(1);
380            fullCheckpoint = this.fullCheckPoint;
381            this.fullCheckPoint = false;
382        }
383        try {
384
385            LOG.debug("Checkpoint started.");
386            RecordLocation newMark = null;
387
388            ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
389
390            //
391            // We do many partial checkpoints (fullCheckpoint==false) to move
392            // topic messages
393            // to long term store as soon as possible.
394            // 
395            // We want to avoid doing that for queue messages since removes the
396            // come in the same
397            // checkpoint cycle will nullify the previous message add.
398            // Therefore, we only
399            // checkpoint queues on the fullCheckpoint cycles.
400            //
401            if (fullCheckpoint) {
402                Iterator<JournalMessageStore> iterator = queues.values().iterator();
403                while (iterator.hasNext()) {
404                    try {
405                        final JournalMessageStore ms = iterator.next();
406                        FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
407                            public RecordLocation call() throws Exception {
408                                return ms.checkpoint();
409                            }
410                        });
411                        futureTasks.add(task);
412                        checkpointExecutor.execute(task);
413                    } catch (Exception e) {
414                        LOG.error("Failed to checkpoint a message store: " + e, e);
415                    }
416                }
417            }
418
419            Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
420            while (iterator.hasNext()) {
421                try {
422                    final JournalTopicMessageStore ms = iterator.next();
423                    FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
424                        public RecordLocation call() throws Exception {
425                            return ms.checkpoint();
426                        }
427                    });
428                    futureTasks.add(task);
429                    checkpointExecutor.execute(task);
430                } catch (Exception e) {
431                    LOG.error("Failed to checkpoint a message store: " + e, e);
432                }
433            }
434
435            try {
436                for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
437                    FutureTask<RecordLocation> ft = iter.next();
438                    RecordLocation mark = ft.get();
439                    // We only set a newMark on full checkpoints.
440                    if (fullCheckpoint) {
441                        if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
442                            newMark = mark;
443                        }
444                    }
445                }
446            } catch (Throwable e) {
447                LOG.error("Failed to checkpoint a message store: " + e, e);
448            }
449
450            if (fullCheckpoint) {
451                try {
452                    if (newMark != null) {
453                        LOG.debug("Marking journal at: " + newMark);
454                        journal.setMark(newMark, true);
455                    }
456                } catch (Exception e) {
457                    LOG.error("Failed to mark the Journal: " + e, e);
458                }
459
460                if (longTermPersistence instanceof JDBCPersistenceAdapter) {
461                    // We may be check pointing more often than the
462                    // checkpointInterval if under high use
463                    // But we don't want to clean up the db that often.
464                    long now = System.currentTimeMillis();
465                    if (now > lastCleanup + checkpointInterval) {
466                        lastCleanup = now;
467                        ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
468                    }
469                }
470            }
471
472            LOG.debug("Checkpoint done.");
473        } finally {
474            latch.countDown();
475        }
476        synchronized (this) {
477            return this.fullCheckPoint;
478        }
479
480    }
481
482    /**
483     * @param location
484     * @return
485     * @throws IOException
486     */
487    public DataStructure readCommand(RecordLocation location) throws IOException {
488        try {
489            Packet packet = journal.read(location);
490            return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
491        } catch (InvalidRecordLocationException e) {
492            throw createReadException(location, e);
493        } catch (IOException e) {
494            throw createReadException(location, e);
495        }
496    }
497
498    /**
499     * Move all the messages that were in the journal into long term storage. We
500     * just replay and do a checkpoint.
501     * 
502     * @throws IOException
503     * @throws IOException
504     * @throws InvalidRecordLocationException
505     * @throws IllegalStateException
506     */
507    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
508
509        RecordLocation pos = null;
510        int transactionCounter = 0;
511
512        LOG.info("Journal Recovery Started from: " + journal);
513        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
514
515        // While we have records in the journal.
516        while ((pos = journal.getNextRecordLocation(pos)) != null) {
517            Packet data = journal.read(pos);
518            DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
519
520            if (c instanceof Message) {
521                Message message = (Message)c;
522                JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
523                if (message.isInTransaction()) {
524                    transactionStore.addMessage(store, message, pos);
525                } else {
526                    store.replayAddMessage(context, message);
527                    transactionCounter++;
528                }
529            } else {
530                switch (c.getDataStructureType()) {
531                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
532                    JournalQueueAck command = (JournalQueueAck)c;
533                    JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
534                    if (command.getMessageAck().isInTransaction()) {
535                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
536                    } else {
537                        store.replayRemoveMessage(context, command.getMessageAck());
538                        transactionCounter++;
539                    }
540                }
541                    break;
542                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
543                    JournalTopicAck command = (JournalTopicAck)c;
544                    JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
545                    if (command.getTransactionId() != null) {
546                        transactionStore.acknowledge(store, command, pos);
547                    } else {
548                        store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
549                        transactionCounter++;
550                    }
551                }
552                    break;
553                case JournalTransaction.DATA_STRUCTURE_TYPE: {
554                    JournalTransaction command = (JournalTransaction)c;
555                    try {
556                        // Try to replay the packet.
557                        switch (command.getType()) {
558                        case JournalTransaction.XA_PREPARE:
559                            transactionStore.replayPrepare(command.getTransactionId());
560                            break;
561                        case JournalTransaction.XA_COMMIT:
562                        case JournalTransaction.LOCAL_COMMIT:
563                            Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
564                            if (tx == null) {
565                                break; // We may be trying to replay a commit
566                            }
567                            // that
568                            // was already committed.
569
570                            // Replay the committed operations.
571                            tx.getOperations();
572                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
573                                TxOperation op = (TxOperation)iter.next();
574                                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
575                                    op.store.replayAddMessage(context, (Message)op.data);
576                                }
577                                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
578                                    op.store.replayRemoveMessage(context, (MessageAck)op.data);
579                                }
580                                if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
581                                    JournalTopicAck ack = (JournalTopicAck)op.data;
582                                    ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
583                                }
584                            }
585                            transactionCounter++;
586                            break;
587                        case JournalTransaction.LOCAL_ROLLBACK:
588                        case JournalTransaction.XA_ROLLBACK:
589                            transactionStore.replayRollback(command.getTransactionId());
590                            break;
591                        default:
592                            throw new IOException("Invalid journal command type: " + command.getType());
593                        }
594                    } catch (IOException e) {
595                        LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
596                    }
597                }
598                    break;
599                case JournalTrace.DATA_STRUCTURE_TYPE:
600                    JournalTrace trace = (JournalTrace)c;
601                    LOG.debug("TRACE Entry: " + trace.getMessage());
602                    break;
603                default:
604                    LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
605                }
606            }
607        }
608
609        RecordLocation location = writeTraceMessage("RECOVERED", true);
610        journal.setMark(location, true);
611
612        LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
613    }
614
615    private IOException createReadException(RecordLocation location, Exception e) {
616        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
617    }
618
619    protected IOException createWriteException(DataStructure packet, Exception e) {
620        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
621    }
622
623    protected IOException createWriteException(String command, Exception e) {
624        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
625    }
626
627    protected IOException createRecoveryFailedException(Exception e) {
628        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
629    }
630
631    /**
632     * @param command
633     * @param sync
634     * @return
635     * @throws IOException
636     */
637    public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
638        if (started.get()) {
639            try {
640                    return journal.write(toPacket(wireFormat.marshal(command)), sync);
641            } catch (IOException ioe) {
642                    LOG.error("Cannot write to the journal", ioe);
643                    brokerService.handleIOException(ioe);
644                    throw ioe;
645            }
646        }
647        throw new IOException("closed");
648    }
649
650    private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
651        JournalTrace trace = new JournalTrace();
652        trace.setMessage(message);
653        return writeCommand(trace, sync);
654    }
655
656    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
657        newPercentUsage = (newPercentUsage / 10) * 10;
658        oldPercentUsage = (oldPercentUsage / 10) * 10;
659        if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
660            boolean sync = newPercentUsage >= 90;
661            checkpoint(sync, true);
662        }
663    }
664
665    public JournalTransactionStore getTransactionStore() {
666        return transactionStore;
667    }
668
669    public void deleteAllMessages() throws IOException {
670        try {
671            JournalTrace trace = new JournalTrace();
672            trace.setMessage("DELETED");
673            RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
674            journal.setMark(location, true);
675            LOG.info("Journal deleted: ");
676        } catch (IOException e) {
677            throw e;
678        } catch (Throwable e) {
679            throw IOExceptionSupport.create(e);
680        }
681        longTermPersistence.deleteAllMessages();
682    }
683
684    public SystemUsage getUsageManager() {
685        return usageManager;
686    }
687
688    public int getMaxCheckpointMessageAddSize() {
689        return maxCheckpointMessageAddSize;
690    }
691
692    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
693        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
694    }
695
696    public int getMaxCheckpointWorkers() {
697        return maxCheckpointWorkers;
698    }
699
700    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
701        this.maxCheckpointWorkers = maxCheckpointWorkers;
702    }
703
704    public boolean isUseExternalMessageReferences() {
705        return false;
706    }
707
708    public void setUseExternalMessageReferences(boolean enable) {
709        if (enable) {
710            throw new IllegalArgumentException("The journal does not support message references.");
711        }
712    }
713
714    public Packet toPacket(ByteSequence sequence) {
715        return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
716    }
717
718    public ByteSequence toByteSequence(Packet packet) {
719        org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
720        return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
721    }
722
723    public void setBrokerName(String brokerName) {
724        longTermPersistence.setBrokerName(brokerName);
725    }
726
727    @Override
728    public String toString() {
729        return "JournalPersistenceAdapator(" + longTermPersistence + ")";
730    }
731
732    public void setDirectory(File dir) {
733    }
734    
735    public long size(){
736        return 0;
737    }
738
739    public void setBrokerService(BrokerService brokerService) {
740        this.brokerService = brokerService;
741        PersistenceAdapter pa = getLongTermPersistence();
742        if( pa instanceof BrokerServiceAware ) {
743            ((BrokerServiceAware)pa).setBrokerService(brokerService);
744        }
745    }
746
747    public long getLastProducerSequenceId(ProducerId id) {
748        return -1;
749    }
750
751}