001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.amq;
018
019import java.io.File;
020import java.io.IOException;
021import java.io.RandomAccessFile;
022import java.nio.channels.FileLock;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.CountDownLatch;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034import org.apache.activeio.journal.Journal;
035import org.apache.activemq.broker.BrokerService;
036import org.apache.activemq.broker.BrokerServiceAware;
037import org.apache.activemq.broker.ConnectionContext;
038import org.apache.activemq.command.ActiveMQDestination;
039import org.apache.activemq.command.ActiveMQQueue;
040import org.apache.activemq.command.ActiveMQTopic;
041import org.apache.activemq.command.DataStructure;
042import org.apache.activemq.command.JournalQueueAck;
043import org.apache.activemq.command.JournalTopicAck;
044import org.apache.activemq.command.JournalTrace;
045import org.apache.activemq.command.JournalTransaction;
046import org.apache.activemq.command.Message;
047import org.apache.activemq.command.ProducerId;
048import org.apache.activemq.command.SubscriptionInfo;
049import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
050import org.apache.activemq.kaha.impl.async.AsyncDataManager;
051import org.apache.activemq.kaha.impl.async.Location;
052import org.apache.activemq.kaha.impl.index.hash.HashIndex;
053import org.apache.activemq.openwire.OpenWireFormat;
054import org.apache.activemq.store.MessageStore;
055import org.apache.activemq.store.PersistenceAdapter;
056import org.apache.activemq.store.ReferenceStore;
057import org.apache.activemq.store.ReferenceStoreAdapter;
058import org.apache.activemq.store.TopicMessageStore;
059import org.apache.activemq.store.TopicReferenceStore;
060import org.apache.activemq.store.TransactionStore;
061import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
062import org.apache.activemq.thread.Scheduler;
063import org.apache.activemq.thread.Task;
064import org.apache.activemq.thread.TaskRunner;
065import org.apache.activemq.thread.TaskRunnerFactory;
066import org.apache.activemq.usage.SystemUsage;
067import org.apache.activemq.usage.Usage;
068import org.apache.activemq.usage.UsageListener;
069import org.apache.activemq.util.ByteSequence;
070import org.apache.activemq.util.IOExceptionSupport;
071import org.apache.activemq.util.IOHelper;
072import org.apache.activemq.wireformat.WireFormat;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075
076
077/**
078 * An implementation of {@link PersistenceAdapter} designed for use with a
079 * {@link Journal} and then check pointing asynchronously on a timeout with some
080 * other long term persistent storage.
081 * 
082 * @org.apache.xbean.XBean element="amqPersistenceAdapter"
083 * 
084 */
085public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
086
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
    // Runs the periodic checkpoint and cleanup tasks; assigned in start().
    private Scheduler scheduler;
    // Message stores handed out by createQueueMessageStore / createTopicMessageStore, keyed by destination.
    private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
    private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
    // NOTE(review): presumably the prefix of the system properties that configure this adapter;
    // its use is outside this section - confirm.
    private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
    // Blank static finals: they must be assigned in a static initializer, which is not visible in this section.
    private static final boolean BROKEN_FILE_LOCK;
    private static final boolean DISABLE_LOCKING;
    // Pause, in milliseconds, between attempts to lock the journal in start().
    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
    // The journal: every command is written to and read back from this manager.
    private AsyncDataManager asyncDataManager;
    // Long term reference store that sits behind the journal.
    private ReferenceStoreAdapter referenceStoreAdapter;
    // Creates the checkpoint worker; taken from the broker or created locally in start().
    private TaskRunnerFactory taskRunnerFactory;
    // Marshals and unmarshals the records stored in the journal.
    private WireFormat wireFormat = new OpenWireFormat();
    // Optional; when set, start() registers this adapter as a memory usage listener.
    private SystemUsage usageManager;
    // NOTE(review): presumably milliseconds; start() passes getCheckpointInterval() to the scheduler - confirm the getter returns this field.
    private long checkpointInterval = 1000 * 20;
    // NOTE(review): not used in this section; meaning to be confirmed against its accessors.
    private int maxCheckpointMessageAddSize = 1024 * 4;
    // The single transaction store returned by createTransactionStore() and getTransactionStore().
    private final AMQTransactionStore transactionStore = new AMQTransactionStore(this);
    // Worker that runs doCheckpoint() when woken up by checkpoint(boolean).
    private TaskRunner checkpointTask;
    // Latch for the next checkpoint run: doCheckpoint() swaps in a fresh latch when it begins
    // and counts the old one down when it finishes.
    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
    // Guards start()/stop() against being executed twice.
    private final AtomicBoolean started = new AtomicBoolean(false);
    // The runnables handed to the scheduler in start() and cancelled in stop().
    private Runnable periodicCheckpointTask;
    private Runnable periodicCleanupTask;
    // When true, the next start() wipes the journal and the reference store, then resets the flag.
    private boolean deleteAllMessages;
    // Consulted by writeCommand: a sync hint is only honoured when this is true.
    private boolean syncOnWrite;
    // NOTE(review): not used in this section - confirm against the transaction store.
    private boolean syncOnTransaction=true;
    private String brokerName = "";
    // Root directory of the store; defaulted in start() when left unset.
    private File directory;
    // Archive directory; defaults to "archive" under the root directory in start().
    private File directoryArchive;
    // Optional owning broker; start() falls back to private task runner/scheduler instances when it is null.
    private BrokerService brokerService;
    // Shared size counter passed to both the journal and the reference store when they are created.
    private final AtomicLong storeSize = new AtomicLong();
    // Settings forwarded to the journal / reference store when they are created.
    private boolean persistentIndex=true;
    private boolean useNio = true;
    private boolean archiveDataLogs=false;
    private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
    private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
    private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
    private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
    private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
    // Per message store, the journal data file ids that cleanup() must leave alone.
    // NOTE(review): the AtomicInteger values look like per-file usage counters - confirm where they are updated.
    private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
    // Lock file opened in start() ("lock" inside the store directory) and closed in stop().
    private RandomAccessFile lockFile;
    private FileLock lock;
    private boolean disableLocking = DISABLE_LOCKING;
    // When true, start() makes a single attempt to lock the journal instead of retrying.
        private boolean failIfJournalIsLocked;
    // start() announces the acquired lock (and notifies the broker) only when both flags are set;
    // they are maintained outside this section.
    private boolean lockLogged;
    private boolean lockAquired;
    // Control whether start() replays the journal into the reference store.
    private boolean recoverReferenceStore=true;
    private boolean forceRecoverReferenceStore=false;
    private boolean useDedicatedTaskRunner=false;
    private int journalThreadPriority = Thread.MAX_PRIORITY;
138
139    public String getBrokerName() {
140        return this.brokerName;
141    }
142
143    public void setBrokerName(String brokerName) {
144        this.brokerName = brokerName;
145        if (this.referenceStoreAdapter != null) {
146            this.referenceStoreAdapter.setBrokerName(brokerName);
147        }
148    }
149
150    public BrokerService getBrokerService() {
151        return brokerService;
152    }
153
154    public void setBrokerService(BrokerService brokerService) {
155        this.brokerService = brokerService;
156    }
157
158    public synchronized void start() throws Exception {
159        if (!started.compareAndSet(false, true)) {
160            return;
161        }
162        if (this.directory == null) {
163            if (brokerService != null) {
164                this.directory = brokerService.getBrokerDataDirectory();
165               
166            } else {
167                this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
168                this.directory = new File(directory, "amqstore");
169                directory.getAbsolutePath();
170            }
171        }
172        if (this.directoryArchive == null) {
173            this.directoryArchive = new File(this.directory,"archive");
174        }
175        if (this.brokerService != null) {
176            this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory();
177            this.scheduler = this.brokerService.getScheduler();
178        } else {
179            this.taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(),
180                true, 1000, isUseDedicatedTaskRunner());
181            this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler");
182        }
183
184        IOHelper.mkdirs(this.directory);
185        lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
186        lock();
187        LOG.info("AMQStore starting using directory: " + directory); 
188        if (archiveDataLogs) {
189            IOHelper.mkdirs(this.directoryArchive);
190        }
191
192        if (this.usageManager != null) {
193            this.usageManager.getMemoryUsage().addUsageListener(this);
194        }
195        if (asyncDataManager == null) {
196            asyncDataManager = createAsyncDataManager();
197        }
198        if (referenceStoreAdapter == null) {
199            referenceStoreAdapter = createReferenceStoreAdapter();
200        }
201        referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
202        referenceStoreAdapter.setBrokerName(getBrokerName());
203        referenceStoreAdapter.setUsageManager(usageManager);
204        referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
205        
206        if (failIfJournalIsLocked) {
207            asyncDataManager.lock();
208        } else {
209            while (true) {
210                try {
211                    asyncDataManager.lock();
212                    break;
213                } catch (IOException e) {
214                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
215                    try {
216                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
217                    } catch (InterruptedException e1) {
218                    }
219                }
220            }
221        }
222        
223        asyncDataManager.start();
224        if (deleteAllMessages) {
225            asyncDataManager.delete();
226            try {
227                JournalTrace trace = new JournalTrace();
228                trace.setMessage("DELETED " + new Date());
229                Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
230                asyncDataManager.setMark(location, true);
231                LOG.info("Journal deleted: ");
232                deleteAllMessages = false;
233            } catch (IOException e) {
234                throw e;
235            } catch (Throwable e) {
236                throw IOExceptionSupport.create(e);
237            }
238            referenceStoreAdapter.deleteAllMessages();
239        }
240        referenceStoreAdapter.start();
241        Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
242        LOG.info("Active data files: " + files);
243        checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
244
245            public boolean iterate() {
246                doCheckpoint();
247                return false;
248            }
249        }, "ActiveMQ Journal Checkpoint Worker");
250        createTransactionStore();
251
252        //
253        // The following was attempting to reduce startup times by avoiding the
254        // log
255        // file scanning that recovery performs. The problem with it is that XA
256        // transactions
257        // only live in transaction log and are not stored in the reference
258        // store, but they still
259        // need to be recovered when the broker starts up.
260
261        if (isForceRecoverReferenceStore()
262                || (isRecoverReferenceStore() && !referenceStoreAdapter
263                        .isStoreValid())) {
264            LOG.warn("The ReferenceStore is not valid - recovering ...");
265            recover();
266            LOG.info("Finished recovering the ReferenceStore");
267        } else {
268            Location location = writeTraceMessage("RECOVERED " + new Date(),
269                    true);
270            asyncDataManager.setMark(location, true);
271            // recover transactions
272            getTransactionStore().setPreparedTransactions(
273                    referenceStoreAdapter.retrievePreparedState());
274        }
275
276        // Do a checkpoint periodically.
277        periodicCheckpointTask = new Runnable() {
278
279            public void run() {
280                checkpoint(false);
281            }
282        };
283        scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
284        periodicCleanupTask = new Runnable() {
285
286            public void run() {
287                cleanup();
288            }
289        };
290        scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
291        
292        if (lockAquired && lockLogged) {
293            LOG.info("Aquired lock for AMQ Store" + getDirectory());
294            if (brokerService != null) {
295                brokerService.getBroker().nowMasterBroker();
296            }
297        }
298
299    }
300
301    public void stop() throws Exception {
302
303        if (!started.compareAndSet(true, false)) {
304            return;
305        }
306        unlock();
307        if (lockFile != null) {
308            lockFile.close();
309            lockFile = null;
310        }
311        this.usageManager.getMemoryUsage().removeUsageListener(this);
312        synchronized (this) {
313            scheduler.cancel(periodicCheckpointTask);
314            scheduler.cancel(periodicCleanupTask);
315        }
316        Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
317        while (queueIterator.hasNext()) {
318            AMQMessageStore ms = queueIterator.next();
319            ms.stop();
320        }
321        Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
322        while (topicIterator.hasNext()) {
323            final AMQTopicMessageStore ms = topicIterator.next();
324            ms.stop();
325        }
326        // Take one final checkpoint and stop checkpoint processing.
327        checkpoint(true);
328        synchronized (this) {
329            checkpointTask.shutdown();
330        }
331        referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
332        queues.clear();
333        topics.clear();
334        IOException firstException = null;
335        referenceStoreAdapter.stop();
336        referenceStoreAdapter = null;
337
338        if (this.brokerService == null) {
339            this.taskRunnerFactory.shutdown();
340            this.scheduler.stop();
341        }
342        try {
343            LOG.debug("Journal close");
344            asyncDataManager.close();
345        } catch (Exception e) {
346            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
347        }
348        if (firstException != null) {
349            throw firstException;
350        }
351    }
352
353    /**
354     * When we checkpoint we move all the journalled data to long term storage.
355     * 
356     * @param sync
357     */
358    public void checkpoint(boolean sync) {
359        try {
360            if (asyncDataManager == null) {
361                throw new IllegalStateException("Journal is closed.");
362            }
363            CountDownLatch latch = null;
364            synchronized (this) {
365                latch = nextCheckpointCountDownLatch;
366                checkpointTask.wakeup();
367            }
368            if (sync) {
369                if (LOG.isDebugEnabled()) {
370                    LOG.debug("Waitng for checkpoint to complete.");
371                }
372                latch.await();
373            }
374            referenceStoreAdapter.checkpoint(sync);
375        } catch (InterruptedException e) {
376            Thread.currentThread().interrupt();
377            LOG.warn("Request to start checkpoint failed: " + e, e);
378        } catch (IOException e) {
379            LOG.error("checkpoint failed: " + e, e);
380        }
381    }
382
383    /**
384     * This does the actual checkpoint.
385     * 
386     * @return true if successful
387     */
388    public boolean doCheckpoint() {
389        CountDownLatch latch = null;
390        synchronized (this) {
391            latch = nextCheckpointCountDownLatch;
392            nextCheckpointCountDownLatch = new CountDownLatch(1);
393        }
394        try {
395            if (LOG.isDebugEnabled()) {
396                LOG.debug("Checkpoint started.");
397            }
398
399            Location currentMark = asyncDataManager.getMark();
400            Location newMark = currentMark;
401            Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
402            while (queueIterator.hasNext()) {
403                final AMQMessageStore ms = queueIterator.next();
404                Location mark = ms.getMark();
405                if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
406                    newMark = mark;
407                }
408            }
409            Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
410            while (topicIterator.hasNext()) {
411                final AMQTopicMessageStore ms = topicIterator.next();
412                Location mark = ms.getMark();
413                if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
414                    newMark = mark;
415                }
416            }
417            try {
418                if (newMark != currentMark) {
419                    if (LOG.isDebugEnabled()) {
420                        LOG.debug("Marking journal at: " + newMark);
421                    }
422                    asyncDataManager.setMark(newMark, false);
423                    writeTraceMessage("CHECKPOINT " + new Date(), true);
424                }
425            } catch (Exception e) {
426                LOG.error("Failed to mark the Journal: " + e, e);
427            }
428            if (LOG.isDebugEnabled()) {
429                LOG.debug("Checkpoint done.");
430            }
431        } finally {
432            latch.countDown();
433        }
434        return true;
435    }
436
437    /**
438     * Cleans up the data files
439     * @throws IOException
440     */
441    public void cleanup() {
442        try {
443            Set<Integer>inProgress = new HashSet<Integer>();
444            if (LOG.isDebugEnabled()) {
445                LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values());
446            }      
447            for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
448                inProgress.addAll(set.keySet());
449            }
450            Integer lastDataFile = asyncDataManager.getCurrentDataFileId();   
451            inProgress.add(lastDataFile);
452            lastDataFile = asyncDataManager.getMark().getDataFileId();
453            inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
454            Location lastActiveTx = transactionStore.checkpoint();
455            if (lastActiveTx != null) {
456                lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
457            }
458            LOG.debug("lastDataFile: " + lastDataFile);
459            asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
460        } catch (IOException e) {
461            LOG.error("Could not cleanup data files: " + e, e);
462        }
463    }
464
465    public Set<ActiveMQDestination> getDestinations() {
466        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
467        destinations.addAll(queues.keySet());
468        destinations.addAll(topics.keySet());
469        return destinations;
470    }
471
472    MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
473        if (destination.isQueue()) {
474            return createQueueMessageStore((ActiveMQQueue)destination);
475        } else {
476            return createTopicMessageStore((ActiveMQTopic)destination);
477        }
478    }
479
480    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
481        AMQMessageStore store = queues.get(destination);
482        if (store == null) {
483            ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
484            store = new AMQMessageStore(this, checkpointStore, destination);
485            try {
486                store.start();
487            } catch (Exception e) {
488                throw IOExceptionSupport.create(e);
489            }
490            queues.put(destination, store);
491        }
492        return store;
493    }
494
495    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
496        AMQTopicMessageStore store = topics.get(destinationName);
497        if (store == null) {
498            TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
499            store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
500            try {
501                store.start();
502            } catch (Exception e) {
503                throw IOExceptionSupport.create(e);
504            }
505            topics.put(destinationName, store);
506        }
507        return store;
508    }
509
510    /**
511     * Cleanup method to remove any state associated with the given destination
512     *
513     * @param destination
514     */
515    public void removeQueueMessageStore(ActiveMQQueue destination) {
516        AMQMessageStore store= queues.remove(destination);
517        referenceStoreAdapter.removeQueueMessageStore(destination);
518    }
519
520    /**
521     * Cleanup method to remove any state associated with the given destination
522     *
523     * @param destination
524     */
525    public void removeTopicMessageStore(ActiveMQTopic destination) {
526        topics.remove(destination);
527    }
528
529    public TransactionStore createTransactionStore() throws IOException {
530        return transactionStore;
531    }
532
533    public long getLastMessageBrokerSequenceId() throws IOException {
534        return referenceStoreAdapter.getLastMessageBrokerSequenceId();
535    }
536
537    public void beginTransaction(ConnectionContext context) throws IOException {
538        referenceStoreAdapter.beginTransaction(context);
539    }
540
541    public void commitTransaction(ConnectionContext context) throws IOException {
542        referenceStoreAdapter.commitTransaction(context);
543    }
544
545    public void rollbackTransaction(ConnectionContext context) throws IOException {
546        referenceStoreAdapter.rollbackTransaction(context);
547    }
548    
549    public boolean isPersistentIndex() {
550                return persistentIndex;
551        }
552
553        public void setPersistentIndex(boolean persistentIndex) {
554                this.persistentIndex = persistentIndex;
555        }
556
557    /**
558     * @param location
559     * @return
560     * @throws IOException
561     */
562    public DataStructure readCommand(Location location) throws IOException {
563        try {
564            ByteSequence packet = asyncDataManager.read(location);
565            return (DataStructure)wireFormat.unmarshal(packet);
566        } catch (IOException e) {
567            throw createReadException(location, e);
568        }
569    }
570
    /**
     * Move all the messages that were in the journal into long term storage. We
     * just replay and do a checkpoint.
     * <p>
     * The reference store is cleared first, then every journal record is read
     * in order and dispatched on its type: messages and acknowledgements are
     * either replayed against their message store or, when they belong to a
     * transaction, handed to the transaction store; transaction records drive
     * prepare/commit/rollback replay. Once the journal is exhausted a
     * "RECOVERED" trace record is written and the journal mark is set to it.
     * 
     * @throws IOException if a record cannot be read or replayed (failures
     *         while replaying a transaction record are logged instead)
     * @throws IllegalStateException
     */
    private void recover() throws IllegalStateException, IOException {
        referenceStoreAdapter.clearMessages();
        Location pos = null;
        // Number of replayed operations for which the replay call returned true.
        int redoCounter = 0;
        LOG.info("Journal Recovery Started from: " + asyncDataManager);
        long start = System.currentTimeMillis();
        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
        // While we have records in the journal.
        while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
            ByteSequence data = asyncDataManager.read(pos);
            DataStructure c = (DataStructure)wireFormat.unmarshal(data);
            if (c instanceof Message) {
                Message message = (Message)c;
                AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination());
                if (message.isInTransaction()) {
                    // Transactional sends are parked until their commit record is replayed.
                    transactionStore.addMessage(store, message, pos);
                } else {
                    if (store.replayAddMessage(context, message, pos)) {
                        redoCounter++;
                    }
                }
            } else {
                switch (c.getDataStructureType()) {
                case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
                    referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
                }
                    break;
                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
                    JournalQueueAck command = (JournalQueueAck)c;
                    AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
                    if (command.getMessageAck().isInTransaction()) {
                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
                    } else {
                        if (store.replayRemoveMessage(context, command.getMessageAck())) {
                            redoCounter++;
                        }
                    }
                }
                    break;
                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
                    JournalTopicAck command = (JournalTopicAck)c;
                    AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination());
                    if (command.getTransactionId() != null) {
                        transactionStore.acknowledge(store, command, pos);
                    } else {
                        if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) {
                            redoCounter++;
                        }
                    }
                }
                    break;
                case JournalTransaction.DATA_STRUCTURE_TYPE: {
                    JournalTransaction command = (JournalTransaction)c;
                    try {
                        // Try to replay the packet.
                        switch (command.getType()) {
                        case JournalTransaction.XA_PREPARE:
                            transactionStore.replayPrepare(command.getTransactionId());
                            break;
                        case JournalTransaction.XA_COMMIT:
                        case JournalTransaction.LOCAL_COMMIT:
                            AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
                            if (tx == null) {
                                // We may be trying to replay a commit that
                                // was already committed.
                                break;
                            }
                            // Replay the committed operations.
                            // NOTE(review): the bare getOperations() call below discards its
                            // result and looks redundant - confirm it has no side effects before removing.
                            tx.getOperations();
                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
                                AMQTxOperation op = (AMQTxOperation)iter.next();
                                if (op.replay(this, context)) {
                                    redoCounter++;
                                }
                            }
                            break;
                        case JournalTransaction.LOCAL_ROLLBACK:
                        case JournalTransaction.XA_ROLLBACK:
                            transactionStore.replayRollback(command.getTransactionId());
                            break;
                        default:
                            throw new IOException("Invalid journal command type: " + command.getType());
                        }
                    } catch (IOException e) {
                        // A transaction record that cannot be replayed is logged and skipped;
                        // recovery carries on with the next record.
                        LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
                    }
                }
                    break;
                case JournalTrace.DATA_STRUCTURE_TYPE:
                    JournalTrace trace = (JournalTrace)c;
                    LOG.debug("TRACE Entry: " + trace.getMessage());
                    break;
                default:
                    LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
                }
            }
        }
        // Record the end of recovery in the journal and move the mark to that record.
        Location location = writeTraceMessage("RECOVERED " + new Date(), true);
        asyncDataManager.setMark(location, true);
        long end = System.currentTimeMillis();
        LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
    }
681
682    private IOException createReadException(Location location, Exception e) {
683        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
684    }
685
686    protected IOException createWriteException(DataStructure packet, Exception e) {
687        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
688    }
689
690    protected IOException createWriteException(String command, Exception e) {
691        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
692    }
693
694    protected IOException createRecoveryFailedException(Exception e) {
695        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
696    }
697
698    /**
699     * @param command
700     * @param syncHint
701     * @return
702     * @throws IOException
703     */
704    public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
705        return writeCommand(command, syncHint,false);
706    }
707    
708    public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
709        try {
710                return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
711        } catch (IOException ioe) {
712                LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
713                brokerService.handleIOException(ioe);
714                throw ioe;
715        }
716    }
717
718    private Location writeTraceMessage(String message, boolean sync) throws IOException {
719        JournalTrace trace = new JournalTrace();
720        trace.setMessage(message);
721        return writeCommand(trace, sync);
722    }
723
724    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
725        newPercentUsage = (newPercentUsage / 10) * 10;
726        oldPercentUsage = (oldPercentUsage / 10) * 10;
727        if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
728            checkpoint(false);
729        }
730    }
731
732    public AMQTransactionStore getTransactionStore() {
733        return transactionStore;
734    }
735
736    public synchronized void deleteAllMessages() throws IOException {
737        deleteAllMessages = true;
738    }
739
740    @Override
741    public String toString() {
742        return "AMQPersistenceAdapter(" + directory + ")";
743    }
744
745    // /////////////////////////////////////////////////////////////////
746    // Subclass overridables
747    // /////////////////////////////////////////////////////////////////
748    protected AsyncDataManager createAsyncDataManager() {
749        AsyncDataManager manager = new AsyncDataManager(storeSize);
750        manager.setDirectory(new File(directory, "journal"));
751        manager.setDirectoryArchive(getDirectoryArchive());
752        manager.setArchiveDataLogs(isArchiveDataLogs());
753        manager.setMaxFileLength(maxFileLength);
754        manager.setUseNio(useNio);    
755        return manager;
756    }
757
758    protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
759        KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
760        adaptor.setPersistentIndex(isPersistentIndex());
761        adaptor.setIndexBinSize(getIndexBinSize());
762        adaptor.setIndexKeySize(getIndexKeySize());
763        adaptor.setIndexPageSize(getIndexPageSize());
764        adaptor.setIndexMaxBinSize(getIndexMaxBinSize());
765        adaptor.setIndexLoadFactor(getIndexLoadFactor());
766        return adaptor;
767    }
768
769    // /////////////////////////////////////////////////////////////////
770    // Property Accessors
771    // /////////////////////////////////////////////////////////////////
772    public AsyncDataManager getAsyncDataManager() {
773        return asyncDataManager;
774    }
775
776    public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
777        this.asyncDataManager = asyncDataManager;
778    }
779
780    public ReferenceStoreAdapter getReferenceStoreAdapter() {
781        return referenceStoreAdapter;
782    }
783
784    public TaskRunnerFactory getTaskRunnerFactory() {
785        return taskRunnerFactory;
786    }
787
788    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
789        this.taskRunnerFactory = taskRunnerFactory;
790    }
791
792    /**
793     * @return Returns the wireFormat.
794     */
795    public WireFormat getWireFormat() {
796        return wireFormat;
797    }
798
799    public void setWireFormat(WireFormat wireFormat) {
800        this.wireFormat = wireFormat;
801    }
802
803    public SystemUsage getUsageManager() {
804        return usageManager;
805    }
806
807    public void setUsageManager(SystemUsage usageManager) {
808        this.usageManager = usageManager;
809    }
810
811    public int getMaxCheckpointMessageAddSize() {
812        return maxCheckpointMessageAddSize;
813    }
814
815    /** 
816     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
817     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
818     */
819    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
820        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
821    }
822
823   
824    public synchronized File getDirectory() {
825        return directory;
826    }
827
828    public synchronized void setDirectory(File directory) {
829        this.directory = directory;
830    }
831
832    public boolean isSyncOnWrite() {
833        return this.syncOnWrite;
834    }
835
836    public void setSyncOnWrite(boolean syncOnWrite) {
837        this.syncOnWrite = syncOnWrite;
838    }
839    
840    public boolean isSyncOnTransaction() {
841        return syncOnTransaction;
842    }
843
844    public void setSyncOnTransaction(boolean syncOnTransaction) {
845        this.syncOnTransaction = syncOnTransaction;
846    }
847
848    /**
849     * @param referenceStoreAdapter the referenceStoreAdapter to set
850     */
851    public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
852        this.referenceStoreAdapter = referenceStoreAdapter;
853    }
854    
855    public long size(){
856        return storeSize.get();
857    }
858
859        public boolean isUseNio() {
860                return useNio;
861        }
862
863        public void setUseNio(boolean useNio) {
864                this.useNio = useNio;
865        }
866
867        public int getMaxFileLength() {
868                return maxFileLength;
869        }
870
871         /**
872      * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
873      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
874      */
875        public void setMaxFileLength(int maxFileLength) {
876                this.maxFileLength = maxFileLength;
877        }
878        
879        public long getCleanupInterval() {
880        return cleanupInterval;
881    }
882
883    public void setCleanupInterval(long cleanupInterval) {
884        this.cleanupInterval = cleanupInterval;
885    }
886
887    public long getCheckpointInterval() {
888        return checkpointInterval;
889    }
890
891    public void setCheckpointInterval(long checkpointInterval) {
892        this.checkpointInterval = checkpointInterval;
893    }
894    
895    public int getIndexBinSize() {
896        return indexBinSize;
897    }
898
899    public void setIndexBinSize(int indexBinSize) {
900        this.indexBinSize = indexBinSize;
901    }
902
903    public int getIndexKeySize() {
904        return indexKeySize;
905    }
906
907    public void setIndexKeySize(int indexKeySize) {
908        this.indexKeySize = indexKeySize;
909    }
910
911    public int getIndexPageSize() {
912        return indexPageSize;
913    }
914    
915    public int getIndexMaxBinSize() {
916        return indexMaxBinSize;
917    }
918
919    public void setIndexMaxBinSize(int maxBinSize) {
920        this.indexMaxBinSize = maxBinSize;
921    }
922
923    /**
924     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
925     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
926     */
927    public void setIndexPageSize(int indexPageSize) {
928        this.indexPageSize = indexPageSize;
929    }
930    
931    public void setIndexLoadFactor(int factor){
932        this.indexLoadFactor=factor;    
933    }
934    
935    public int getIndexLoadFactor(){
936        return this.indexLoadFactor;
937    }
938    
939    public int getMaxReferenceFileLength() {
940        return maxReferenceFileLength;
941    }
942
943    /**
944     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
945     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
946     */
947    public void setMaxReferenceFileLength(int maxReferenceFileLength) {
948        this.maxReferenceFileLength = maxReferenceFileLength;
949    }
950    
951    public File getDirectoryArchive() {
952        return directoryArchive;
953    }
954
955    public void setDirectoryArchive(File directoryArchive) {
956        this.directoryArchive = directoryArchive;
957    }
958
959    public boolean isArchiveDataLogs() {
960        return archiveDataLogs;
961    }
962
963    public void setArchiveDataLogs(boolean archiveDataLogs) {
964        this.archiveDataLogs = archiveDataLogs;
965    }  
966    
967    public boolean isDisableLocking() {
968        return disableLocking;
969    }
970
971    public void setDisableLocking(boolean disableLocking) {
972        this.disableLocking = disableLocking;
973    }
974    
975    /**
976     * @return the recoverReferenceStore
977     */
978    public boolean isRecoverReferenceStore() {
979        return recoverReferenceStore;
980    }
981
982    /**
983     * @param recoverReferenceStore the recoverReferenceStore to set
984     */
985    public void setRecoverReferenceStore(boolean recoverReferenceStore) {
986        this.recoverReferenceStore = recoverReferenceStore;
987    }
988
989    /**
990     * @return the forceRecoverReferenceStore
991     */
992    public boolean isForceRecoverReferenceStore() {
993        return forceRecoverReferenceStore;
994    }
995
996    /**
997     * @param forceRecoverReferenceStore the forceRecoverReferenceStore to set
998     */
999    public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
1000        this.forceRecoverReferenceStore = forceRecoverReferenceStore;
1001    }
1002    
1003    public boolean isUseDedicatedTaskRunner() {
1004        return useDedicatedTaskRunner;
1005    }
1006    
1007    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1008        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1009    }
1010    
1011    /**
1012     * @return the journalThreadPriority
1013     */
1014    public int getJournalThreadPriority() {
1015        return this.journalThreadPriority;
1016    }
1017
1018    /**
1019     * @param journalThreadPriority the journalThreadPriority to set
1020     */
1021    public void setJournalThreadPriority(int journalThreadPriority) {
1022        this.journalThreadPriority = journalThreadPriority;
1023    }
1024
1025        
1026        protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
1027            Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1028            if (map == null) {
1029                map = new ConcurrentHashMap<Integer, AtomicInteger>();
1030                dataFilesInProgress.put(store, map);
1031            }
1032            AtomicInteger count = map.get(dataFileId);
1033            if (count == null) {
1034                count = new AtomicInteger(0);
1035                map.put(dataFileId, count);
1036            }
1037            count.incrementAndGet();
1038        }
1039        
1040        protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
1041        Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1042        if (map != null) {
1043            AtomicInteger count = map.get(dataFileId);
1044            if (count != null) {
1045                int newCount = count.decrementAndGet(); 
1046                if (newCount <=0) {
1047                    map.remove(dataFileId);
1048                }
1049            }
1050            if (map.isEmpty()) {
1051                dataFilesInProgress.remove(store);
1052            }
1053        }
1054    }
1055        
1056        
1057        protected void lock() throws Exception {
1058        lockLogged = false;
1059        lockAquired = false;
1060        do {
1061            if (doLock()) {
1062                lockAquired = true;
1063            } else {
1064                if (!lockLogged) {
1065                    LOG.warn("Waiting to Lock the Store " + getDirectory());
1066                    lockLogged = true;
1067                }
1068                Thread.sleep(1000);
1069            }
1070
1071        } while (!lockAquired && !disableLocking);
1072    }
1073        
1074        private synchronized void unlock() throws IOException {
1075        if (!disableLocking && (null != lock)) {
1076            //clear property doesn't work on some platforms
1077            System.getProperties().remove(getPropertyKey());
1078            System.clearProperty(getPropertyKey());
1079            assert(System.getProperty(getPropertyKey())==null);
1080            if (lock.isValid()) {
1081                lock.release();
1082                lock.channel().close();
1083                
1084            }
1085            lock = null;
1086        }
1087    }
1088
1089        
1090        protected boolean doLock() throws IOException {
1091            boolean result = true;
1092            if (!disableLocking && directory != null && lock == null) {
1093            String key = getPropertyKey();
1094            String property = System.getProperty(key);
1095            if (null == property) {
1096                if (!BROKEN_FILE_LOCK) {
1097                    lock = lockFile.getChannel().tryLock(0, lockFile.getChannel().size(), false);
1098                    if (lock == null) {
1099                        result = false;
1100                    } else {
1101                        System.setProperty(key, new Date().toString());
1102                    }
1103                }
1104            } else { // already locked
1105                result = false;
1106            }
1107        }
1108            return result;
1109        }
1110        
1111        private String getPropertyKey() throws IOException {
1112        return getClass().getName() + ".lock." + directory.getCanonicalPath();
1113    }
1114        
1115        static {
1116            BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
1117                    + ".FileLockBroken",
1118                    "false"));
1119            DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
1120                   + ".DisableLocking",
1121                   "false"));
1122        }
1123
1124        
1125    public long getLastProducerSequenceId(ProducerId id) {
1126        // reference store send has adequate duplicate suppression
1127        return -1;
1128    }
1129}