001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.EOFException;
024import java.io.File;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.ObjectInputStream;
028import java.io.ObjectOutputStream;
029import java.io.OutputStream;
030import java.util.*;
031import java.util.Map.Entry;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicLong;
034import java.util.concurrent.locks.ReentrantReadWriteLock;
035
036import org.apache.activemq.ActiveMQMessageAuditNoSync;
037import org.apache.activemq.broker.BrokerService;
038import org.apache.activemq.broker.BrokerServiceAware;
039import org.apache.activemq.command.ConnectionId;
040import org.apache.activemq.command.LocalTransactionId;
041import org.apache.activemq.command.MessageId;
042import org.apache.activemq.command.SubscriptionInfo;
043import org.apache.activemq.command.TransactionId;
044import org.apache.activemq.command.XATransactionId;
045import org.apache.activemq.protobuf.Buffer;
046import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
047import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
048import org.apache.activemq.store.kahadb.data.KahaDestination;
049import org.apache.activemq.store.kahadb.data.KahaEntryType;
050import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
051import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
052import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
053import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
054import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
055import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
056import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
057import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
058import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
059import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
060import org.apache.activemq.util.Callback;
061import org.apache.activemq.util.IOHelper;
062import org.apache.activemq.util.ServiceStopper;
063import org.apache.activemq.util.ServiceSupport;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066import org.apache.kahadb.index.BTreeIndex;
067import org.apache.kahadb.index.BTreeVisitor;
068import org.apache.kahadb.journal.DataFile;
069import org.apache.kahadb.journal.Journal;
070import org.apache.kahadb.journal.Location;
071import org.apache.kahadb.page.Page;
072import org.apache.kahadb.page.PageFile;
073import org.apache.kahadb.page.Transaction;
074import org.apache.kahadb.util.ByteSequence;
075import org.apache.kahadb.util.DataByteArrayInputStream;
076import org.apache.kahadb.util.DataByteArrayOutputStream;
077import org.apache.kahadb.util.LockFile;
078import org.apache.kahadb.util.LongMarshaller;
079import org.apache.kahadb.util.Marshaller;
080import org.apache.kahadb.util.Sequence;
081import org.apache.kahadb.util.SequenceSet;
082import org.apache.kahadb.util.StringMarshaller;
083import org.apache.kahadb.util.VariableMarshaller;
084
085public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
086        
087        protected BrokerService brokerService;
088
089    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
090    public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
091
092    protected static final Buffer UNMATCHED;
093    static {
094        UNMATCHED = new Buffer(new byte[]{});
095    }
096    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
097    private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
098
099    static final int CLOSED_STATE = 1;
100    static final int OPEN_STATE = 2;
101    static final long NOT_ACKED = -1;
102    static final long UNMATCHED_SEQ = -2;
103
104    static final int VERSION = 3;
105
106
107    protected class Metadata {
108        protected Page<Metadata> page;
109        protected int state;
110        protected BTreeIndex<String, StoredDestination> destinations;
111        protected Location lastUpdate;
112        protected Location firstInProgressTransactionLocation;
113        protected Location producerSequenceIdTrackerLocation = null;
114        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
115        protected int version = VERSION;
116        public void read(DataInput is) throws IOException {
117            state = is.readInt();
118            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
119            if (is.readBoolean()) {
120                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
121            } else {
122                lastUpdate = null;
123            }
124            if (is.readBoolean()) {
125                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
126            } else {
127                firstInProgressTransactionLocation = null;
128            }
129            try {
130                if (is.readBoolean()) {
131                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
132                } else {
133                    producerSequenceIdTrackerLocation = null;
134                }
135            } catch (EOFException expectedOnUpgrade) {
136            }
137            try {
138               version = is.readInt();
139            }catch (EOFException expectedOnUpgrade) {
140                version=1;
141            }
142            LOG.info("KahaDB is version " + version);
143        }
144
145        public void write(DataOutput os) throws IOException {
146            os.writeInt(state);
147            os.writeLong(destinations.getPageId());
148
149            if (lastUpdate != null) {
150                os.writeBoolean(true);
151                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
152            } else {
153                os.writeBoolean(false);
154            }
155
156            if (firstInProgressTransactionLocation != null) {
157                os.writeBoolean(true);
158                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
159            } else {
160                os.writeBoolean(false);
161            }
162            
163            if (producerSequenceIdTrackerLocation != null) {
164                os.writeBoolean(true);
165                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
166            } else {
167                os.writeBoolean(false);
168            }
169            os.writeInt(VERSION);
170        }
171    }
172
173    class MetadataMarshaller extends VariableMarshaller<Metadata> {
174        public Metadata readPayload(DataInput dataIn) throws IOException {
175            Metadata rc = new Metadata();
176            rc.read(dataIn);
177            return rc;
178        }
179
180        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
181            object.write(dataOut);
182        }
183    }
184
185    protected PageFile pageFile;
186        protected Journal journal;
187        protected Metadata metadata = new Metadata();
188
189    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
190
191    protected boolean failIfDatabaseIsLocked;
192
193    protected boolean deleteAllMessages;
194    protected File directory = new File("KahaDB");
195    protected Thread checkpointThread;
196    protected boolean enableJournalDiskSyncs=true;
197    protected boolean archiveDataLogs;
198    protected File directoryArchive;
199    protected AtomicLong storeSize = new AtomicLong(0);
200    long checkpointInterval = 5*1000;
201    long cleanupInterval = 30*1000;
202    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
203    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
204    boolean enableIndexWriteAsync = false;
205    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
206    
207    
208    protected AtomicBoolean opened = new AtomicBoolean();
209    private LockFile lockFile;
210    private boolean ignoreMissingJournalfiles = false;
211    private int indexCacheSize = 10000;
212    private boolean checkForCorruptJournalFiles = false;
213    private boolean checksumJournalFiles = false;
214    private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
215    protected boolean forceRecoverIndex = false;
216    private final Object checkpointThreadLock = new Object();
217
218    public MessageDatabase() {
219    }
220
221    @Override
222    public void doStart() throws Exception {
223        load();
224    }
225
226    @Override
227    public void doStop(ServiceStopper stopper) throws Exception {
228        unload();
229    }
230
231        private void loadPageFile() throws IOException {
232            this.indexLock.writeLock().lock();
233            try {
234                    final PageFile pageFile = getPageFile();
235            pageFile.load();
236            pageFile.tx().execute(new Transaction.Closure<IOException>() {
237                public void execute(Transaction tx) throws IOException {
238                    if (pageFile.getPageCount() == 0) {
239                        // First time this is created.. Initialize the metadata
240                        Page<Metadata> page = tx.allocate();
241                        assert page.getPageId() == 0;
242                        page.set(metadata);
243                        metadata.page = page;
244                        metadata.state = CLOSED_STATE;
245                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
246
247                        tx.store(metadata.page, metadataMarshaller, true);
248                    } else {
249                        Page<Metadata> page = tx.load(0, metadataMarshaller);
250                        metadata = page.get();
251                        metadata.page = page;
252                    }
253                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
254                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
255                    metadata.destinations.load(tx);
256                }
257            });
258            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
259            // Perhaps we should just keep an index of file
260            storedDestinations.clear();
261            pageFile.tx().execute(new Transaction.Closure<IOException>() {
262                public void execute(Transaction tx) throws IOException {
263                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
264                        Entry<String, StoredDestination> entry = iterator.next();
265                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
266                        storedDestinations.put(entry.getKey(), sd);
267                    }
268                }
269            });
270            pageFile.flush();            
271        }finally {
272            this.indexLock.writeLock().unlock();
273        }
274        }
275        
276        private void startCheckpoint() {
277        synchronized (checkpointThreadLock) {
278            boolean start = false;
279            if (checkpointThread == null) {
280                start = true;
281            } else if (!checkpointThread.isAlive()) {
282                start = true;
283                LOG.info("KahaDB: Recovering checkpoint thread after death");
284            }
285            if (start) {
286                checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
287                    @Override
288                    public void run() {
289                        try {
290                            long lastCleanup = System.currentTimeMillis();
291                            long lastCheckpoint = System.currentTimeMillis();
292                            // Sleep for a short time so we can periodically check
293                            // to see if we need to exit this thread.
294                            long sleepTime = Math.min(checkpointInterval, 500);
295                            while (opened.get()) {
296                                Thread.sleep(sleepTime);
297                                long now = System.currentTimeMillis();
298                                if( now - lastCleanup >= cleanupInterval ) {
299                                    checkpointCleanup(true);
300                                    lastCleanup = now;
301                                    lastCheckpoint = now;
302                                } else if( now - lastCheckpoint >= checkpointInterval ) {
303                                    checkpointCleanup(false);
304                                    lastCheckpoint = now;
305                                }
306                            }
307                        } catch (InterruptedException e) {
308                            // Looks like someone really wants us to exit this thread...
309                        } catch (IOException ioe) {
310                            LOG.error("Checkpoint failed", ioe);
311                            brokerService.handleIOException(ioe);
312                        }
313                    }
314                };
315
316                checkpointThread.setDaemon(true);
317                checkpointThread.start();
318            }
319        }
320        }
321
322        public void open() throws IOException {
323                if( opened.compareAndSet(false, true) ) {
324            getJournal().start();
325                loadPageFile();        
326                startCheckpoint();
327            recover();
328                }
329        }
330
331    private void lock() throws IOException {
332        if( lockFile == null ) {
333            File lockFileName = new File(directory, "lock");
334            lockFile = new LockFile(lockFileName, true);
335            if (failIfDatabaseIsLocked) {
336                lockFile.lock();
337            } else {
338                while (true) {
339                    try {
340                        lockFile.lock();
341                        break;
342                    } catch (IOException e) {
343                        LOG.info("Database "+lockFileName+" is locked... waiting " + (getDatabaseLockedWaitDelay() / 1000) + " seconds for the database to be unlocked. Reason: " + e);
344                        try {
345                            Thread.sleep(getDatabaseLockedWaitDelay());
346                        } catch (InterruptedException e1) {
347                        }
348                    }
349                }
350            }
351        }
352    }
353
354    // for testing
355    public LockFile getLockFile() {
356        return lockFile;
357    }
358
359    public void load() throws IOException {
360        
361        this.indexLock.writeLock().lock();
362        try {
363            lock();
364            if (deleteAllMessages) {
365                getJournal().start();
366                getJournal().delete();
367                getJournal().close();
368                journal = null;
369                getPageFile().delete();
370                LOG.info("Persistence store purged.");
371                deleteAllMessages = false;
372            }
373
374                open();
375                store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
376        }finally {
377            this.indexLock.writeLock().unlock();
378        }
379
380    }
381
382    
383        public void close() throws IOException, InterruptedException {
384                if( opened.compareAndSet(true, false)) {
385                    this.indexLock.writeLock().lock();
386                try {
387                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
388                        public void execute(Transaction tx) throws IOException {
389                            checkpointUpdate(tx, true);
390                        }
391                    });
392                    pageFile.unload();
393                    metadata = new Metadata();
394                }finally {
395                    this.indexLock.writeLock().unlock();
396                }
397                journal.close();
398            synchronized (checkpointThreadLock) {
399                    checkpointThread.join();
400            }
401                lockFile.unlock();
402                lockFile=null;
403                }
404        }
405        
406    public void unload() throws IOException, InterruptedException {
407        this.indexLock.writeLock().lock();
408        try {
409            if( pageFile != null && pageFile.isLoaded() ) {
410                metadata.state = CLOSED_STATE;
411                metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
412    
413                pageFile.tx().execute(new Transaction.Closure<IOException>() {
414                    public void execute(Transaction tx) throws IOException {
415                        tx.store(metadata.page, metadataMarshaller, true);
416                    }
417                });
418            }
419        }finally {
420            this.indexLock.writeLock().unlock();
421        }
422        close();
423    }
424
425    // public for testing
426    public Location getFirstInProgressTxLocation() {
427        Location l = null;
428        synchronized (inflightTransactions) {
429            if (!inflightTransactions.isEmpty()) {
430                l = inflightTransactions.values().iterator().next().get(0).getLocation();
431            }
432            if (!preparedTransactions.isEmpty()) {
433                Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
434                if (l==null || t.compareTo(l) <= 0) {
435                    l = t;
436                }
437            }
438        }
439        return l;
440    }
441
442    /**
443     * Move all the messages that were in the journal into long term storage. We
444     * just replay and do a checkpoint.
445     * 
446     * @throws IOException
447     * @throws IOException
448     * @throws IllegalStateException
449     */
450    private void recover() throws IllegalStateException, IOException {
451        this.indexLock.writeLock().lock();
452        try {
453            
454                long start = System.currentTimeMillis();        
455                Location producerAuditPosition = recoverProducerAudit();
456                Location lastIndoubtPosition = getRecoveryPosition();
457                
458                Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
459                    
460                if (recoveryPosition != null) {  
461                    int redoCounter = 0;
462                    LOG.info("Recovering from the journal ...");
463                    while (recoveryPosition != null) {
464                        JournalCommand<?> message = load(recoveryPosition);
465                        metadata.lastUpdate = recoveryPosition;
466                        process(message, recoveryPosition, lastIndoubtPosition);
467                        redoCounter++;
468                        recoveryPosition = journal.getNextLocation(recoveryPosition);
469                    }
470                    long end = System.currentTimeMillis();
471                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
472                }
473                
474                // We may have to undo some index updates.
475            pageFile.tx().execute(new Transaction.Closure<IOException>() {
476                public void execute(Transaction tx) throws IOException {
477                    recoverIndex(tx);
478                }
479            });
480
481            // rollback any recovered inflight local transactions
482            Set<TransactionId> toRollback = new HashSet<TransactionId>();
483            synchronized (inflightTransactions) {
484                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
485                    TransactionId id = it.next();
486                    if (id.isLocalTransaction()) {
487                        toRollback.add(id);
488                    }
489                }
490                for (TransactionId tx: toRollback) {
491                    LOG.debug("rolling back recovered indoubt local transaction " + tx);
492                    store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(tx)), false, null, null);
493                }
494            }
495        }finally {
496            this.indexLock.writeLock().unlock();
497        }
498    }
499    
500        private Location minimum(Location producerAuditPosition,
501            Location lastIndoubtPosition) {
502            Location min = null;
503            if (producerAuditPosition != null) {
504                min = producerAuditPosition;
505                if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
506                    min = lastIndoubtPosition;
507                }
508            } else {
509                min = lastIndoubtPosition;
510            }
511            return min;
512    }
513        
514        private Location recoverProducerAudit() throws IOException {
515            if (metadata.producerSequenceIdTrackerLocation != null) {
516                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
517                try {
518                    ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
519                    metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
520                } catch (ClassNotFoundException cfe) {
521                    IOException ioe = new IOException("Failed to read producerAudit: " + cfe);
522                    ioe.initCause(cfe);
523                    throw ioe;
524                }
525                return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
526            } else {
527                // got no audit stored so got to recreate via replay from start of the journal
528                return journal.getNextLocation(null);
529            }
530    }
531
532    protected void recoverIndex(Transaction tx) throws IOException {
533        long start = System.currentTimeMillis();
534        // It is possible index updates got applied before the journal updates.. 
535        // in that case we need to removed references to messages that are not in the journal
536        final Location lastAppendLocation = journal.getLastAppendLocation();
537        long undoCounter=0;
538        
539        // Go through all the destinations to see if they have messages past the lastAppendLocation
540        for (StoredDestination sd : storedDestinations.values()) {
541                
542            final ArrayList<Long> matches = new ArrayList<Long>();
543            // Find all the Locations that are >= than the last Append Location.
544            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
545                                @Override
546                                protected void matched(Location key, Long value) {
547                                        matches.add(value);
548                                }
549            });
550            
551            
552            for (Long sequenceId : matches) {
553                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
554                sd.locationIndex.remove(tx, keys.location);
555                sd.messageIdIndex.remove(tx, keys.messageId);
556                metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId));
557                undoCounter++;
558                // TODO: do we need to modify the ack positions for the pub sub case?
559                        }
560        }
561
562        long end = System.currentTimeMillis();
563        if( undoCounter > 0 ) {
564                // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
565                // should do sync writes to the journal.
566                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
567        }
568
569        undoCounter = 0;
570        start = System.currentTimeMillis();
571
572        // Lets be extra paranoid here and verify that all the datafiles being referenced
573        // by the indexes still exists.
574
575        final SequenceSet ss = new SequenceSet();
576        for (StoredDestination sd : storedDestinations.values()) {
577            // Use a visitor to cut down the number of pages that we load
578            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
579                int last=-1;
580
581                public boolean isInterestedInKeysBetween(Location first, Location second) {
582                    if( first==null ) {
583                        return !ss.contains(0, second.getDataFileId());
584                    } else if( second==null ) {
585                        return true;
586                    } else {
587                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
588                    }
589                }
590
591                public void visit(List<Location> keys, List<Long> values) {
592                    for (Location l : keys) {
593                        int fileId = l.getDataFileId();
594                        if( last != fileId ) {
595                            ss.add(fileId);
596                            last = fileId;
597                        }
598                    }
599                }
600
601            });
602        }
603        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
604        while( !ss.isEmpty() ) {
605            missingJournalFiles.add( (int)ss.removeFirst() );
606        }
607        missingJournalFiles.removeAll( journal.getFileMap().keySet() );
608
609        if( !missingJournalFiles.isEmpty() ) {
610            LOG.info("Some journal files are missing: "+missingJournalFiles);
611        }
612
613        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
614        for (Integer missing : missingJournalFiles) {
615            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)));
616        }
617
618        if ( checkForCorruptJournalFiles ) {
619            Collection<DataFile> dataFiles = journal.getFileMap().values();
620            for (DataFile dataFile : dataFiles) {
621                int id = dataFile.getDataFileId();
622                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0)));
623                Sequence seq = dataFile.getCorruptedBlocks().getHead();
624                while( seq!=null ) {
625                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
626                    seq = seq.getNext();
627                }
628            }
629        }
630
631        if( !missingPredicates.isEmpty() ) {
632            for (StoredDestination sd : storedDestinations.values()) {
633
634                final ArrayList<Long> matches = new ArrayList<Long>();
635                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
636                    @Override
637                    protected void matched(Location key, Long value) {
638                        matches.add(value);
639                    }
640                });
641
642                // If somes message references are affected by the missing data files...
643                if( !matches.isEmpty() ) {
644
645                    // We either 'gracefully' recover dropping the missing messages or
646                    // we error out.
647                    if( ignoreMissingJournalfiles ) {
648                        // Update the index to remove the references to the missing data
649                        for (Long sequenceId : matches) {
650                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
651                            sd.locationIndex.remove(tx, keys.location);
652                            sd.messageIdIndex.remove(tx, keys.messageId);
653                            undoCounter++;
654                            // TODO: do we need to modify the ack positions for the pub sub case?
655                        }
656
657                    } else {
658                        throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
659                    }
660                }
661            }
662        }
663        
664        end = System.currentTimeMillis();
665        if( undoCounter > 0 ) {
666                // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
667                // should do sync writes to the journal.
668                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
669        }
670        }
671
672        private Location nextRecoveryPosition;
673        private Location lastRecoveryPosition;
674
675        public void incrementalRecover() throws IOException {
676            this.indexLock.writeLock().lock();
677        try {
678                if( nextRecoveryPosition == null ) {
679                        if( lastRecoveryPosition==null ) {
680                                nextRecoveryPosition = getRecoveryPosition();
681                        } else {
682                        nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
683                        }               
684                }
685                while (nextRecoveryPosition != null) {
686                        lastRecoveryPosition = nextRecoveryPosition;
687                    metadata.lastUpdate = lastRecoveryPosition;
688                    JournalCommand<?> message = load(lastRecoveryPosition);
689                    process(message, lastRecoveryPosition);            
690                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
691                }
692        }finally {
693            this.indexLock.writeLock().unlock();
694        }
695        }
696        
697    public Location getLastUpdatePosition() throws IOException {
698        return metadata.lastUpdate;
699    }
700    
701    private Location getRecoveryPosition() throws IOException {
702
703        if (!this.forceRecoverIndex) {
704
705            // If we need to recover the transactions..
706            if (metadata.firstInProgressTransactionLocation != null) {
707                return metadata.firstInProgressTransactionLocation;
708            }
709        
710            // Perhaps there were no transactions...
711            if( metadata.lastUpdate!=null) {
712                // Start replay at the record after the last one recorded in the index file.
713                return journal.getNextLocation(metadata.lastUpdate);
714            }
715        }
716        // This loads the first position.
717        return journal.getNextLocation(null);
718        }
719
720    protected void checkpointCleanup(final boolean cleanup) throws IOException {
721        long start;
722        this.indexLock.writeLock().lock();
723        try {
724            start = System.currentTimeMillis();
725                if( !opened.get() ) {
726                        return;
727                }
728            pageFile.tx().execute(new Transaction.Closure<IOException>() {
729                public void execute(Transaction tx) throws IOException {
730                    checkpointUpdate(tx, cleanup);
731                }
732            });
733        }finally {
734            this.indexLock.writeLock().unlock();
735        }
736        long end = System.currentTimeMillis();
737        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
738                LOG.info("Slow KahaDB access: cleanup took "+(end-start));
739        }
740    }
741
742    
743        public void checkpoint(Callback closure) throws Exception {
744            this.indexLock.writeLock().lock();
745        try {
746            pageFile.tx().execute(new Transaction.Closure<IOException>() {
747                public void execute(Transaction tx) throws IOException {
748                    checkpointUpdate(tx, false);
749                }
750            });
751            closure.execute();
752        }finally {
753            this.indexLock.writeLock().unlock();
754        }
755        }
756
757    // /////////////////////////////////////////////////////////////////
    // Methods called by the broker to update and query the store.
759    // /////////////////////////////////////////////////////////////////
760    public Location store(JournalCommand<?> data) throws IOException {
761        return store(data, false, null,null);
762    }
763
764    /**
765     * All updated are are funneled through this method. The updates are converted
766     * to a JournalMessage which is logged to the journal and then the data from
767     * the JournalMessage is used to update the index just like it would be done
768     * during a recovery process.
769     */
770    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
771        if (before != null) {
772            before.run();
773        }
774        try {
775            int size = data.serializedSizeFramed();
776            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
777            os.writeByte(data.type().getNumber());
778            data.writeFramed(os);
779    
780            long start = System.currentTimeMillis();
781            Location location = journal.write(os.toByteSequence(), sync);
782            long start2 = System.currentTimeMillis();
783            process(data, location);
784                long end = System.currentTimeMillis();
785                if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
786                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
787                }
788    
789                this.indexLock.writeLock().lock();
790            try {
791                metadata.lastUpdate = location;
792            }finally {
793                this.indexLock.writeLock().unlock();
794            }
795            if (!checkpointThread.isAlive()) {
796                startCheckpoint();
797            }
798            if (after != null) {
799                after.run();
800            }
801            return location;
802        } catch (IOException ioe) {
803            LOG.error("KahaDB failed to store to Journal", ioe);
804            brokerService.handleIOException(ioe);
805            throw ioe;
806        }
807    }
808
809    /**
810     * Loads a previously stored JournalMessage
811     * 
812     * @param location
813     * @return
814     * @throws IOException
815     */
816    public JournalCommand<?> load(Location location) throws IOException {
817        ByteSequence data = journal.read(location);
818        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
819        byte readByte = is.readByte();
820        KahaEntryType type = KahaEntryType.valueOf(readByte);
821        if( type == null ) {
822            throw new IOException("Could not load journal record. Invalid location: "+location);
823        }
824        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
825        message.mergeFramed(is);
826        return message;
827    }
828    
829    /**
830     * do minimal recovery till we reach the last inDoubtLocation
831     * @param data
832     * @param location
833     * @param inDoubtlocation
834     * @throws IOException
835     */
836    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
837        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
838            process(data, location);
839        } else {
840            // just recover producer audit
841            data.visit(new Visitor() {
842                public void visit(KahaAddMessageCommand command) throws IOException {
843                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
844                }
845            });
846        }
847    }
848
849    // /////////////////////////////////////////////////////////////////
850    // Journaled record processing methods. Once the record is journaled,
851    // these methods handle applying the index updates. These may be called
852    // from the recovery method too so they need to be idempotent
853    // /////////////////////////////////////////////////////////////////
854
855    void process(JournalCommand<?> data, final Location location) throws IOException {
856        data.visit(new Visitor() {
857            @Override
858            public void visit(KahaAddMessageCommand command) throws IOException {
859                process(command, location);
860            }
861
862            @Override
863            public void visit(KahaRemoveMessageCommand command) throws IOException {
864                process(command, location);
865            }
866
867            @Override
868            public void visit(KahaPrepareCommand command) throws IOException {
869                process(command, location);
870            }
871
872            @Override
873            public void visit(KahaCommitCommand command) throws IOException {
874                process(command, location);
875            }
876
877            @Override
878            public void visit(KahaRollbackCommand command) throws IOException {
879                process(command, location);
880            }
881
882            @Override
883            public void visit(KahaRemoveDestinationCommand command) throws IOException {
884                process(command, location);
885            }
886
887            @Override
888            public void visit(KahaSubscriptionCommand command) throws IOException {
889                process(command, location);
890            }
891        });
892    }
893
894    protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
895        if (command.hasTransactionInfo()) {
896            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
897            inflightTx.add(new AddOpperation(command, location));
898        } else {
899            this.indexLock.writeLock().lock();
900            try {
901                pageFile.tx().execute(new Transaction.Closure<IOException>() {
902                    public void execute(Transaction tx) throws IOException {
903                        upadateIndex(tx, command, location);
904                    }
905                });
906            }finally {
907                this.indexLock.writeLock().unlock();
908            }
909        }
910    }
911
912    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
913        if (command.hasTransactionInfo()) {
914           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
915           inflightTx.add(new RemoveOpperation(command, location));
916        } else {
917            this.indexLock.writeLock().lock();
918            try {
919                pageFile.tx().execute(new Transaction.Closure<IOException>() {
920                    public void execute(Transaction tx) throws IOException {
921                        updateIndex(tx, command, location);
922                    }
923                });
924            }finally {
925                this.indexLock.writeLock().unlock();
926            }
927        }
928
929    }
930
931    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
932        this.indexLock.writeLock().lock();
933        try {
934            pageFile.tx().execute(new Transaction.Closure<IOException>() {
935                public void execute(Transaction tx) throws IOException {
936                    updateIndex(tx, command, location);
937                }
938            });
939        }finally {
940            this.indexLock.writeLock().unlock();
941        }
942    }
943
944    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
945        this.indexLock.writeLock().lock();
946        try {
947            pageFile.tx().execute(new Transaction.Closure<IOException>() {
948                public void execute(Transaction tx) throws IOException {
949                    updateIndex(tx, command, location);
950                }
951            });
952        }finally {
953            this.indexLock.writeLock().unlock();
954        }
955    }
956
957    protected void process(KahaCommitCommand command, Location location) throws IOException {
958        TransactionId key = key(command.getTransactionInfo());
959        List<Operation> inflightTx;
960        synchronized (inflightTransactions) {
961            inflightTx = inflightTransactions.remove(key);
962            if (inflightTx == null) {
963                inflightTx = preparedTransactions.remove(key);
964            }
965        }
966        if (inflightTx == null) {
967            return;
968        }
969
970        final List<Operation> messagingTx = inflightTx;
971        this.indexLock.writeLock().lock();
972        try {
973            pageFile.tx().execute(new Transaction.Closure<IOException>() {
974                public void execute(Transaction tx) throws IOException {
975                    for (Operation op : messagingTx) {
976                        op.execute(tx);
977                    }
978                }
979            });
980        }finally {
981            this.indexLock.writeLock().unlock();
982        }
983    }
984
985    protected void process(KahaPrepareCommand command, Location location) {
986        TransactionId key = key(command.getTransactionInfo());
987        synchronized (inflightTransactions) {
988            List<Operation> tx = inflightTransactions.remove(key);
989            if (tx != null) {
990                preparedTransactions.put(key, tx);
991            }
992        }
993    }
994
995    protected void process(KahaRollbackCommand command, Location location) {
996        TransactionId key = key(command.getTransactionInfo());
997        synchronized (inflightTransactions) {
998            List<Operation> tx = inflightTransactions.remove(key);
999            if (tx == null) {
1000                preparedTransactions.remove(key);
1001            }
1002        }
1003    }
1004
1005    // /////////////////////////////////////////////////////////////////
1006    // These methods do the actual index updates.
1007    // /////////////////////////////////////////////////////////////////
1008
1009    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1010        private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1011
1012    void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1013        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1014
1015        // Skip adding the message to the index if this is a topic and there are
1016        // no subscriptions.
1017        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1018            return;
1019        }
1020
1021        // Add the message.
1022        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1023        long id = sd.orderIndex.getNextMessageId(priority);
1024        Long previous = sd.locationIndex.put(tx, location, id);
1025        if (previous == null) {
1026            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1027            if (previous == null) {
1028                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1029                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1030                    addAckLocationForNewMessage(tx, sd, id);
1031                }
1032            } else {
1033                // If the message ID as indexed, then the broker asked us to
1034                // store a DUP
1035                // message. Bad BOY! Don't do it, and log a warning.
1036                LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
1037                // TODO: consider just rolling back the tx.
1038                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1039                sd.locationIndex.remove(tx, location);
1040            }
1041        } else {
1042            // restore the previous value.. Looks like this was a redo of a
1043            // previously
1044            // added message. We don't want to assign it a new id as the other
1045            // indexes would
1046            // be wrong..
1047            //
1048            // TODO: consider just rolling back the tx.
1049            sd.locationIndex.put(tx, location, previous);
1050        }
1051        // record this id in any event, initial send or recovery
1052        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1053    }
1054
1055    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1056        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1057        if (!command.hasSubscriptionKey()) {
1058            
1059            // In the queue case we just remove the message from the index..
1060            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1061            if (sequenceId != null) {
1062                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1063                if (keys != null) {
1064                    sd.locationIndex.remove(tx, keys.location);
1065                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1066                }                
1067            }
1068        } else {
1069            // In the topic case we need remove the message once it's been acked
1070            // by all the subs
1071            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1072
1073            // Make sure it's a valid message id...
1074            if (sequence != null) {
1075                String subscriptionKey = command.getSubscriptionKey();
1076                if (command.getAck() != UNMATCHED) {
1077                    sd.orderIndex.get(tx, sequence);
1078                    byte priority = sd.orderIndex.lastGetPriority();
1079                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1080                }
1081                // The following method handles deleting un-referenced messages.
1082                removeAckLocation(tx, sd, subscriptionKey, sequence);
1083            }
1084
1085        }
1086    }
1087
1088    Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
1089    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1090        Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1091        if (referenceFileIds == null) {
1092            referenceFileIds = new HashSet<Integer>();
1093            referenceFileIds.add(messageLocation.getDataFileId());
1094            ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1095        } else {
1096            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1097            if (!referenceFileIds.contains(id)) {
1098                referenceFileIds.add(id);
1099            }
1100        }
1101    }
1102
1103    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1104        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1105        sd.orderIndex.remove(tx);
1106        
1107        sd.locationIndex.clear(tx);
1108        sd.locationIndex.unload(tx);
1109        tx.free(sd.locationIndex.getPageId());
1110
1111        sd.messageIdIndex.clear(tx);
1112        sd.messageIdIndex.unload(tx);
1113        tx.free(sd.messageIdIndex.getPageId());
1114
1115        if (sd.subscriptions != null) {
1116            sd.subscriptions.clear(tx);
1117            sd.subscriptions.unload(tx);
1118            tx.free(sd.subscriptions.getPageId());
1119
1120            sd.subscriptionAcks.clear(tx);
1121            sd.subscriptionAcks.unload(tx);
1122            tx.free(sd.subscriptionAcks.getPageId());
1123
1124            sd.ackPositions.clear(tx);
1125            sd.ackPositions.unload(tx);
1126            tx.free(sd.ackPositions.getPageId());
1127        }
1128
1129        String key = key(command.getDestination());
1130        storedDestinations.remove(key);
1131        metadata.destinations.remove(tx, key);
1132    }
1133
1134    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1135        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1136        final String subscriptionKey = command.getSubscriptionKey();
1137
1138        // If set then we are creating it.. otherwise we are destroying the sub
1139        if (command.hasSubscriptionInfo()) {
1140            sd.subscriptions.put(tx, subscriptionKey, command);
1141            long ackLocation=NOT_ACKED;
1142            if (!command.getRetroactive()) {
1143                ackLocation = sd.orderIndex.nextMessageId-1;
1144            } else {
1145                addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
1146            }
1147            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1148        } else {
1149            // delete the sub...
1150            sd.subscriptions.remove(tx, subscriptionKey);
1151            sd.subscriptionAcks.remove(tx, subscriptionKey);
1152            removeAckLocationsForSub(tx, sd, subscriptionKey);
1153        }
1154    }
1155    
1156    /**
1157     * @param tx
1158     * @throws IOException
1159     */
1160    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1161        LOG.debug("Checkpoint started.");
1162
1163        // reflect last update exclusive of current checkpoint
1164        Location firstTxLocation = metadata.lastUpdate;
1165
1166        metadata.state = OPEN_STATE;
1167        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1168        metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
1169        tx.store(metadata.page, metadataMarshaller, true);
1170        pageFile.flush();
1171
1172        if( cleanup ) {
1173
1174            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1175            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1176
1177            LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
1178
1179                // Don't GC files under replication
1180                if( journalFilesBeingReplicated!=null ) {
1181                        gcCandidateSet.removeAll(journalFilesBeingReplicated);
1182                }
1183
1184            // Don't GC files after the first in progress tx
1185            if( metadata.firstInProgressTransactionLocation!=null ) {
1186                if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
1187                   firstTxLocation = metadata.firstInProgressTransactionLocation;
1188                };
1189            }
1190            
1191            if( firstTxLocation!=null ) {
1192                while( !gcCandidateSet.isEmpty() ) {
1193                        Integer last = gcCandidateSet.last();
1194                        if( last >= firstTxLocation.getDataFileId() ) {
1195                                gcCandidateSet.remove(last);
1196                        } else {
1197                                break;
1198                        }
1199                }
1200                LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
1201            }
1202
1203            // Go through all the destinations to see if any of them can remove GC candidates.
1204            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1205                if( gcCandidateSet.isEmpty() ) {
1206                        break;
1207                }
1208
1209                // Use a visitor to cut down the number of pages that we load
1210                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1211                    int last=-1;
1212                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1213                        if( first==null ) {
1214                                SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1215                                if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1216                                        subset.remove(second.getDataFileId());
1217                                }
1218                                                        return !subset.isEmpty();
1219                        } else if( second==null ) {
1220                                SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1221                                if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1222                                        subset.remove(first.getDataFileId());
1223                                }
1224                                                        return !subset.isEmpty();
1225                        } else {
1226                                SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1227                                if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1228                                        subset.remove(first.getDataFileId());
1229                                }
1230                                if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1231                                        subset.remove(second.getDataFileId());
1232                                }
1233                                                        return !subset.isEmpty();
1234                        }
1235                    }
1236
1237                    public void visit(List<Location> keys, List<Long> values) {
1238                        for (Location l : keys) {
1239                            int fileId = l.getDataFileId();
1240                                                        if( last != fileId ) {
1241                                        gcCandidateSet.remove(fileId);
1242                                last = fileId;
1243                            }
1244                        }
1245                    }
1246                });
1247                LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1248            }
1249
1250            // check we are not deleting file with ack for in-use journal files
1251            LOG.trace("gc candidates: " + gcCandidateSet);
1252            final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
1253            Iterator<Integer> candidates = gcCandidateSet.iterator();
1254            while (candidates.hasNext()) {
1255                Integer candidate = candidates.next();
1256                Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
1257                if (referencedFileIds != null) {
1258                    for (Integer referencedFileId : referencedFileIds) {
1259                        if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
1260                            // active file that is not targeted for deletion is referenced so don't delete
1261                            candidates.remove();
1262                            break;
1263                        }
1264                    }
1265                    if (gcCandidateSet.contains(candidate)) {
1266                        ackMessageFileMap.remove(candidate);
1267                    } else {
1268                        LOG.trace("not removing data file: " + candidate
1269                                + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1270                    }
1271                }
1272            }
1273
1274            if( !gcCandidateSet.isEmpty() ) {
1275                    LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
1276                    journal.removeDataFiles(gcCandidateSet);
1277            }
1278        }
1279        
1280        LOG.debug("Checkpoint done.");
1281    }
1282    
1283    private Location checkpointProducerAudit() throws IOException {
1284        ByteArrayOutputStream baos = new ByteArrayOutputStream();
1285        ObjectOutputStream oout = new ObjectOutputStream(baos);
1286        oout.writeObject(metadata.producerSequenceIdTracker);
1287        oout.flush();
1288        oout.close();
1289        return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null);
1290    }
1291
1292    public HashSet<Integer> getJournalFilesBeingReplicated() {
1293                return journalFilesBeingReplicated;
1294        }
1295
1296    // /////////////////////////////////////////////////////////////////
1297    // StoredDestination related implementation methods.
1298    // /////////////////////////////////////////////////////////////////
1299
1300
        // In-memory map of StoredDestination instances by destination key;
        // iterated by the checkpoint cleanup and pruned when a destination is removed.
        private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1302
1303    class StoredSubscription {
1304        SubscriptionInfo subscriptionInfo;
1305        String lastAckId;
1306        Location lastAckLocation;
1307        Location cursor;
1308    }
1309    
1310    static class MessageKeys {
1311        final String messageId;
1312        final Location location;
1313        
1314        public MessageKeys(String messageId, Location location) {
1315            this.messageId=messageId;
1316            this.location=location;
1317        }
1318        
1319        @Override
1320        public String toString() {
1321            return "["+messageId+","+location+"]";
1322        }
1323    }
1324    
1325    static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1326        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1327        
1328        public MessageKeys readPayload(DataInput dataIn) throws IOException {
1329            return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1330        }
1331
1332        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1333            dataOut.writeUTF(object.messageId);
1334            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1335        }
1336    }
1337
1338    class LastAck {
1339        long lastAckedSequence;
1340        byte priority;
1341
1342        public LastAck(LastAck source) {
1343            this.lastAckedSequence = source.lastAckedSequence;
1344            this.priority = source.priority;
1345        }
1346
1347        public LastAck() {
1348            this.priority = MessageOrderIndex.HI;
1349        }
1350
1351        public LastAck(long ackLocation) {
1352            this.lastAckedSequence = ackLocation;
1353            this.priority = MessageOrderIndex.LO;
1354        }
1355
1356        public LastAck(long ackLocation, byte priority) {
1357            this.lastAckedSequence = ackLocation;
1358            this.priority = priority;
1359        }
1360
1361        public String toString() {
1362            return "[" + lastAckedSequence + ":" + priority + "]";
1363        }
1364    }
1365
1366    protected class LastAckMarshaller implements Marshaller<LastAck> {
1367        
1368        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1369            dataOut.writeLong(object.lastAckedSequence);
1370            dataOut.writeByte(object.priority);
1371        }
1372
1373        public LastAck readPayload(DataInput dataIn) throws IOException {
1374            LastAck lastAcked = new LastAck();
1375            lastAcked.lastAckedSequence = dataIn.readLong();
1376            if (metadata.version >= 3) {
1377                lastAcked.priority = dataIn.readByte();
1378            }
1379            return lastAcked;
1380        }
1381
1382        public int getFixedSize() {
1383            return 9;
1384        }
1385
1386        public LastAck deepCopy(LastAck source) {
1387            return new LastAck(source);
1388        }
1389
1390        public boolean isDeepCopySupported() {
1391            return true;
1392        }
1393    }
1394
    /**
     * Holder for the set of indexes kept for one destination. Instances are
     * created and populated by {@code StoredDestinationMarshaller} and
     * {@code loadStoredDestination}.
     */
    class StoredDestination {
        
        // Per-priority message order indexes (sequence -> MessageKeys).
        MessageOrderIndex orderIndex = new MessageOrderIndex();
        // Journal location -> message sequence.
        BTreeIndex<Location, Long> locationIndex;
        // Message id string -> message sequence.
        BTreeIndex<String, Long> messageIdIndex;

        // These bits are only set for Topics
        // Subscription key -> subscription command.
        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
        // Subscription key -> last acknowledged position.
        BTreeIndex<String, LastAck> subscriptionAcks;
        // In-memory only; created empty when a topic is loaded.
        HashMap<String, MessageOrderCursor> subscriptionCursors;
        // Message sequence -> keys of the subscriptions recorded against it.
        BTreeIndex<Long, HashSet<String>> ackPositions;
    }
1407
    /**
     * Marshals a {@link StoredDestination} as the list of root page ids of its
     * indexes.
     *
     * Layout written by {@link #writePayload}: default order index, location
     * index and message id index page ids (three longs), a boolean telling
     * whether the topic-only indexes follow, then - when true - the
     * subscriptions, subscriptionAcks and ackPositions page ids, and finally the
     * low and high priority order index page ids.
     *
     * {@link #readPayload} tolerates older layouts: ackPositions is only read
     * when {@code metadata.version >= 3} and the low/high priority indexes only
     * when {@code metadata.version >= 2}; otherwise the missing indexes are
     * allocated and loaded in a page file transaction.
     */
    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {

        public StoredDestination readPayload(DataInput dataIn) throws IOException {
            // final so the upgrade closures below can assign into it
            final StoredDestination value = new StoredDestination();
            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());

            // flag written as true when the destination has subscription indexes
            if (dataIn.readBoolean()) {
                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
                if (metadata.version >= 3) {
                    value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
                } else {
                    // upgrade
                    // no ackPositions page id in the stream: allocate a new, empty index
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        public void execute(Transaction tx) throws IOException {
                            value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate());
                            value.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
                            value.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
                            value.ackPositions.load(tx);
                        }
                    });
                }
            }
            if (metadata.version >= 2) {
                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
            } else {
                    // upgrade
                    // no priority index page ids in the stream: allocate new, empty indexes
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        public void execute(Transaction tx) throws IOException {
                            value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                            value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                            value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                            value.orderIndex.lowPriorityIndex.load(tx);

                            value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                            value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                            value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                            value.orderIndex.highPriorityIndex.load(tx);
                        }
                    });
            }

            return value;
        }

        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
            dataOut.writeLong(value.locationIndex.getPageId());
            dataOut.writeLong(value.messageIdIndex.getPageId());
            // a non-null subscriptions index marks the destination as having topic indexes
            if (value.subscriptions != null) {
                dataOut.writeBoolean(true);
                dataOut.writeLong(value.subscriptions.getPageId());
                dataOut.writeLong(value.subscriptionAcks.getPageId());
                dataOut.writeLong(value.ackPositions.getPageId());
            } else {
                dataOut.writeBoolean(false);
            }
            // always written, regardless of metadata.version
            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
        }
    }
1472
1473    static class LocationMarshaller implements Marshaller<Location> {
1474        final static LocationMarshaller INSTANCE = new LocationMarshaller();
1475
1476        public Location readPayload(DataInput dataIn) throws IOException {
1477            Location rc = new Location();
1478            rc.setDataFileId(dataIn.readInt());
1479            rc.setOffset(dataIn.readInt());
1480            return rc;
1481        }
1482
1483        public void writePayload(Location object, DataOutput dataOut) throws IOException {
1484            dataOut.writeInt(object.getDataFileId());
1485            dataOut.writeInt(object.getOffset());
1486        }
1487
1488        public int getFixedSize() {
1489            return 8;
1490        }
1491
1492        public Location deepCopy(Location source) {
1493            return new Location(source);
1494        }
1495
1496        public boolean isDeepCopySupported() {
1497            return true;
1498        }
1499    }
1500
1501    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
1502        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
1503
1504        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
1505            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
1506            rc.mergeFramed((InputStream)dataIn);
1507            return rc;
1508        }
1509
1510        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
1511            object.writeFramed((OutputStream)dataOut);
1512        }
1513    }
1514
1515    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1516        String key = key(destination);
1517        StoredDestination rc = storedDestinations.get(key);
1518        if (rc == null) {
1519            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
1520            rc = loadStoredDestination(tx, key, topic);
1521            // Cache it. We may want to remove/unload destinations from the
1522            // cache that are not used for a while
1523            // to reduce memory usage.
1524            storedDestinations.put(key, rc);
1525        }
1526        return rc;
1527    }
1528
1529
1530    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1531        String key = key(destination);
1532        StoredDestination rc = storedDestinations.get(key);
1533        if (rc == null && metadata.destinations.containsKey(tx, key)) {
1534            rc = getStoredDestination(destination, tx);
1535        }
1536        return rc;
1537    }
1538
1539    /**
1540     * @param tx
1541     * @param key
1542     * @param topic
1543     * @return
1544     * @throws IOException
1545     */
1546    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
1547        // Try to load the existing indexes..
1548        StoredDestination rc = metadata.destinations.get(tx, key);
1549        if (rc == null) {
1550            // Brand new destination.. allocate indexes for it.
1551            rc = new StoredDestination();
1552            rc.orderIndex.allocate(tx);
1553            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
1554            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1555
1556            if (topic) {
1557                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
1558                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
1559                rc.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate());
1560            }
1561            metadata.destinations.put(tx, key, rc);
1562        }
1563
1564        // Configure the marshalers and load.
1565        rc.orderIndex.load(tx);
1566
1567        // Figure out the next key using the last entry in the destination.
1568        rc.orderIndex.configureLast(tx);
1569
1570        rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
1571        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1572        rc.locationIndex.load(tx);
1573
1574        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
1575        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1576        rc.messageIdIndex.load(tx);
1577        
1578        // If it was a topic...
1579        if (topic) {
1580
1581            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
1582            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
1583            rc.subscriptions.load(tx);
1584
1585            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
1586            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
1587            rc.subscriptionAcks.load(tx);
1588
1589            rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
1590            rc.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
1591            rc.ackPositions.load(tx);
1592
1593            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
1594
1595            if (metadata.version < 3) {
1596
1597                // on upgrade need to fill ackLocation with available messages past last ack
1598                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1599                    Entry<String, LastAck> entry = iterator.next();
1600                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
1601                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
1602                        Long sequence = orderIterator.next().getKey();
1603                        addAckLocation(tx, rc, sequence, entry.getKey());
1604                    }
1605                    // modify so it is upgraded                   
1606                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
1607                }
1608            }
1609            
1610            if (rc.orderIndex.nextMessageId == 0) {
1611                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
1612                if (!rc.subscriptionAcks.isEmpty(tx)) {
1613                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1614                        Entry<String, LastAck> entry = iterator.next();
1615                        rc.orderIndex.nextMessageId =
1616                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
1617                    }
1618                }
1619            } else {
1620                // update based on ackPositions for unmatched, last entry is always the next
1621                if (!rc.ackPositions.isEmpty(tx)) {
1622                    Entry<Long,HashSet<String>> last = rc.ackPositions.getLast(tx);
1623                    rc.orderIndex.nextMessageId =
1624                        Math.max(rc.orderIndex.nextMessageId, last.getKey());
1625                }
1626            }
1627
1628        }
1629
1630        if (metadata.version < 3) {
1631            // store again after upgrade
1632            metadata.destinations.put(tx, key, rc);
1633        }        
1634        return rc;
1635    }
1636
1637    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1638        HashSet<String> hs = sd.ackPositions.get(tx, messageSequence);
1639        if (hs == null) {
1640            hs = new HashSet<String>();
1641        }
1642        hs.add(subscriptionKey);
1643        // every ack location addition needs to be a btree modification to get it stored
1644        sd.ackPositions.put(tx, messageSequence, hs);
1645    }
1646
1647    // new sub is interested in potentially all existing messages
1648    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1649        for (Iterator<Entry<Long, HashSet<String>>> iterator = sd.ackPositions.iterator(tx, messageSequence); iterator.hasNext(); ) {
1650            Entry<Long, HashSet<String>> entry = iterator.next();
1651            entry.getValue().add(subscriptionKey);
1652            sd.ackPositions.put(tx, entry.getKey(), entry.getValue());
1653        }
1654    }
1655
1656    final HashSet nextMessageIdMarker = new HashSet<String>();
1657    // on a new message add, all existing subs are interested in this message
1658    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
1659        HashSet hs = new HashSet<String>();
1660        for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1661            Entry<String, LastAck> entry = iterator.next();
1662            hs.add(entry.getKey());
1663        }
1664        sd.ackPositions.put(tx, messageSequence, hs);
1665        // add empty next to keep track of nextMessage
1666        sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker);
1667    }
1668
1669    private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1670        if (!sd.ackPositions.isEmpty(tx)) {
1671            Long end = sd.ackPositions.getLast(tx).getKey();
1672            for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) {
1673                removeAckLocation(tx, sd, subscriptionKey, sequence);
1674            }
1675        }
1676    }
1677
1678    /**
1679     * @param tx
1680     * @param sd
1681     * @param subscriptionKey
1682     * @param sequenceId
1683     * @throws IOException
1684     */
1685    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
1686        // Remove the sub from the previous location set..
1687        if (sequenceId != null) {
1688            HashSet<String> hs = sd.ackPositions.get(tx, sequenceId);
1689            if (hs != null) {
1690                hs.remove(subscriptionKey);
1691                if (hs.isEmpty()) {
1692                    HashSet<String> firstSet = sd.ackPositions.getFirst(tx).getValue();
1693                    sd.ackPositions.remove(tx, sequenceId);
1694
1695                    // Find all the entries that need to get deleted.
1696                    ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
1697                    sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
1698
1699                    // Do the actual deletes.
1700                    for (Entry<Long, MessageKeys> entry : deletes) {
1701                        sd.locationIndex.remove(tx, entry.getValue().location);
1702                        sd.messageIdIndex.remove(tx, entry.getValue().messageId);
1703                        sd.orderIndex.remove(tx, entry.getKey());
1704                    }
1705                } else {
1706                    // update
1707                    sd.ackPositions.put(tx, sequenceId, hs);
1708                }
1709            }
1710        }
1711    }
1712
1713    private String key(KahaDestination destination) {
1714        return destination.getType().getNumber() + ":" + destination.getName();
1715    }
1716
1717    // /////////////////////////////////////////////////////////////////
1718    // Transaction related implementation methods.
1719    // /////////////////////////////////////////////////////////////////
    // Operations recorded per transaction id. getInflightTx() creates the entries
    // on demand while synchronizing on this map.
    protected final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
    // Per-transaction operation lists, same shape as inflightTransactions.
    // NOTE(review): presumably holds transactions that were prepared but not yet
    // completed - confirm against the prepare/commit handling elsewhere in this class.
    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
1722 
1723    private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
1724        TransactionId key = key(info);
1725        List<Operation> tx;
1726        synchronized (inflightTransactions) {
1727            tx = inflightTransactions.get(key);
1728            if (tx == null) {
1729                tx = Collections.synchronizedList(new ArrayList<Operation>());
1730                inflightTransactions.put(key, tx);
1731            }
1732        }
1733        return tx;
1734    }
1735
1736    private TransactionId key(KahaTransactionInfo transactionInfo) {
1737        if (transactionInfo.hasLocalTransacitonId()) {
1738            KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId();
1739            LocalTransactionId rc = new LocalTransactionId();
1740            rc.setConnectionId(new ConnectionId(tx.getConnectionId()));
1741            rc.setValue(tx.getTransacitonId());
1742            return rc;
1743        } else {
1744            KahaXATransactionId tx = transactionInfo.getXaTransacitonId();
1745            XATransactionId rc = new XATransactionId();
1746            rc.setBranchQualifier(tx.getBranchQualifier().toByteArray());
1747            rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray());
1748            rc.setFormatId(tx.getFormatId());
1749            return rc;
1750        }
1751    }
1752
1753    abstract class Operation {
1754        final Location location;
1755
1756        public Operation(Location location) {
1757            this.location = location;
1758        }
1759
1760        public Location getLocation() {
1761            return location;
1762        }
1763
1764        abstract public void execute(Transaction tx) throws IOException;
1765    }
1766
1767    class AddOpperation extends Operation {
1768        final KahaAddMessageCommand command;
1769
1770        public AddOpperation(KahaAddMessageCommand command, Location location) {
1771            super(location);
1772            this.command = command;
1773        }
1774
1775        @Override
1776        public void execute(Transaction tx) throws IOException {
1777            upadateIndex(tx, command, location);
1778        }
1779
1780        public KahaAddMessageCommand getCommand() {
1781            return command;
1782        }
1783    }
1784
1785    class RemoveOpperation extends Operation {
1786        final KahaRemoveMessageCommand command;
1787
1788        public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
1789            super(location);
1790            this.command = command;
1791        }
1792
1793        @Override
1794        public void execute(Transaction tx) throws IOException {
1795            updateIndex(tx, command, location);
1796        }
1797
1798        public KahaRemoveMessageCommand getCommand() {
1799            return command;
1800        }
1801    }
1802
1803    // /////////////////////////////////////////////////////////////////
1804    // Initialization related implementation methods.
1805    // /////////////////////////////////////////////////////////////////
1806
1807    private PageFile createPageFile() {
1808        PageFile index = new PageFile(directory, "db");
1809        index.setEnableWriteThread(isEnableIndexWriteAsync());
1810        index.setWriteBatchSize(getIndexWriteBatchSize());
1811        index.setPageCacheSize(indexCacheSize);
1812        return index;
1813    }
1814
1815    private Journal createJournal() throws IOException {
1816        Journal manager = new Journal();
1817        manager.setDirectory(directory);
1818        manager.setMaxFileLength(getJournalMaxFileLength());
1819        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
1820        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
1821        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
1822        manager.setArchiveDataLogs(isArchiveDataLogs());
1823        manager.setSizeAccumulator(storeSize);
1824        if (getDirectoryArchive() != null) {
1825            IOHelper.mkdirs(getDirectoryArchive());
1826            manager.setDirectoryArchive(getDirectoryArchive());
1827        }
1828        return manager;
1829    }
1830
1831    public int getJournalMaxWriteBatchSize() {
1832        return journalMaxWriteBatchSize;
1833    }
1834    
1835    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
1836        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
1837    }
1838
1839    public File getDirectory() {
1840        return directory;
1841    }
1842
1843    public void setDirectory(File directory) {
1844        this.directory = directory;
1845    }
1846
1847    public boolean isDeleteAllMessages() {
1848        return deleteAllMessages;
1849    }
1850
1851    public void setDeleteAllMessages(boolean deleteAllMessages) {
1852        this.deleteAllMessages = deleteAllMessages;
1853    }
1854    
1855    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
1856        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
1857    }
1858
1859    public int getIndexWriteBatchSize() {
1860        return setIndexWriteBatchSize;
1861    }
1862    
1863    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
1864        this.enableIndexWriteAsync = enableIndexWriteAsync;
1865    }
1866    
1867    boolean isEnableIndexWriteAsync() {
1868        return enableIndexWriteAsync;
1869    }
1870    
1871    public boolean isEnableJournalDiskSyncs() {
1872        return enableJournalDiskSyncs;
1873    }
1874
1875    public void setEnableJournalDiskSyncs(boolean syncWrites) {
1876        this.enableJournalDiskSyncs = syncWrites;
1877    }
1878
1879    public long getCheckpointInterval() {
1880        return checkpointInterval;
1881    }
1882
1883    public void setCheckpointInterval(long checkpointInterval) {
1884        this.checkpointInterval = checkpointInterval;
1885    }
1886
1887    public long getCleanupInterval() {
1888        return cleanupInterval;
1889    }
1890
1891    public void setCleanupInterval(long cleanupInterval) {
1892        this.cleanupInterval = cleanupInterval;
1893    }
1894
1895    public void setJournalMaxFileLength(int journalMaxFileLength) {
1896        this.journalMaxFileLength = journalMaxFileLength;
1897    }
1898    
1899    public int getJournalMaxFileLength() {
1900        return journalMaxFileLength;
1901    }
1902    
1903    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
1904        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
1905    }
1906    
1907    public int getMaxFailoverProducersToTrack() {
1908        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
1909    }
1910    
1911    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
1912        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
1913    }
1914    
1915    public int getFailoverProducersAuditDepth() {
1916        return this.metadata.producerSequenceIdTracker.getAuditDepth();
1917    }
1918    
1919    public PageFile getPageFile() {
1920        if (pageFile == null) {
1921            pageFile = createPageFile();
1922        }
1923                return pageFile;
1924        }
1925
1926        public Journal getJournal() throws IOException {
1927        if (journal == null) {
1928            journal = createJournal();
1929        }
1930                return journal;
1931        }
1932
1933    public boolean isFailIfDatabaseIsLocked() {
1934        return failIfDatabaseIsLocked;
1935    }
1936
1937    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
1938        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
1939    }
1940
1941    public boolean isIgnoreMissingJournalfiles() {
1942        return ignoreMissingJournalfiles;
1943    }
1944    
1945    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
1946        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
1947    }
1948
1949    public int getIndexCacheSize() {
1950        return indexCacheSize;
1951    }
1952
1953    public void setIndexCacheSize(int indexCacheSize) {
1954        this.indexCacheSize = indexCacheSize;
1955    }
1956
1957    public boolean isCheckForCorruptJournalFiles() {
1958        return checkForCorruptJournalFiles;
1959    }
1960
1961    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
1962        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
1963    }
1964
1965    public boolean isChecksumJournalFiles() {
1966        return checksumJournalFiles;
1967    }
1968
1969    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
1970        this.checksumJournalFiles = checksumJournalFiles;
1971    }
1972
1973        public void setBrokerService(BrokerService brokerService) {
1974                this.brokerService = brokerService;
1975        }
1976
1977    /**
1978     * @return the archiveDataLogs
1979     */
1980    public boolean isArchiveDataLogs() {
1981        return this.archiveDataLogs;
1982    }
1983
1984    /**
1985     * @param archiveDataLogs the archiveDataLogs to set
1986     */
1987    public void setArchiveDataLogs(boolean archiveDataLogs) {
1988        this.archiveDataLogs = archiveDataLogs;
1989    }
1990
1991    /**
1992     * @return the directoryArchive
1993     */
1994    public File getDirectoryArchive() {
1995        return this.directoryArchive;
1996    }
1997
1998    /**
1999     * @param directoryArchive the directoryArchive to set
2000     */
2001    public void setDirectoryArchive(File directoryArchive) {
2002        this.directoryArchive = directoryArchive;
2003    }
2004
2005    /**
2006     * @return the databaseLockedWaitDelay
2007     */
2008    public int getDatabaseLockedWaitDelay() {
2009        return this.databaseLockedWaitDelay;
2010    }
2011
2012    /**
2013     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
2014     */
2015    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
2016        this.databaseLockedWaitDelay = databaseLockedWaitDelay;
2017    }
2018
2019    // /////////////////////////////////////////////////////////////////
2020    // Internal conversion methods.
2021    // /////////////////////////////////////////////////////////////////
2022
2023    KahaTransactionInfo createTransactionInfo(TransactionId txid) {
2024        if (txid == null) {
2025            return null;
2026        }
2027        KahaTransactionInfo rc = new KahaTransactionInfo();
2028
2029        if (txid.isLocalTransaction()) {
2030            LocalTransactionId t = (LocalTransactionId) txid;
2031            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
2032            kahaTxId.setConnectionId(t.getConnectionId().getValue());
2033            kahaTxId.setTransacitonId(t.getValue());
2034            rc.setLocalTransacitonId(kahaTxId);
2035        } else {
2036            XATransactionId t = (XATransactionId) txid;
2037            KahaXATransactionId kahaTxId = new KahaXATransactionId();
2038            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
2039            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
2040            kahaTxId.setFormatId(t.getFormatId());
2041            rc.setXaTransacitonId(kahaTxId);
2042        }
2043        return rc;
2044    }
2045
2046    class MessageOrderCursor{
2047        long defaultCursorPosition;
2048        long lowPriorityCursorPosition;
2049        long highPriorityCursorPosition;
2050        MessageOrderCursor(){
2051        }
2052        
2053        MessageOrderCursor(long position){
2054            this.defaultCursorPosition=position;
2055            this.lowPriorityCursorPosition=position;
2056            this.highPriorityCursorPosition=position;
2057        }
2058        
2059        MessageOrderCursor(MessageOrderCursor other){
2060            this.defaultCursorPosition=other.defaultCursorPosition;
2061            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2062            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2063        }
2064        
2065        MessageOrderCursor copy() {
2066            return new MessageOrderCursor(this);
2067        }
2068        
2069        void reset() {
2070            this.defaultCursorPosition=0;
2071            this.highPriorityCursorPosition=0;
2072            this.lowPriorityCursorPosition=0;
2073        }
2074        
2075        void increment() {
2076            if (defaultCursorPosition!=0) {
2077                defaultCursorPosition++;
2078            }
2079            if (highPriorityCursorPosition!=0) {
2080                highPriorityCursorPosition++;
2081            }
2082            if (lowPriorityCursorPosition!=0) {
2083                lowPriorityCursorPosition++;
2084            }
2085        }
2086
2087        public String toString() {
2088           return "MessageOrderCursor:[def:" + defaultCursorPosition
2089                   + ", low:" + lowPriorityCursorPosition
2090                   + ", high:" +  highPriorityCursorPosition + "]";
2091        }
2092
2093        public void sync(MessageOrderCursor other) {
2094            this.defaultCursorPosition=other.defaultCursorPosition;
2095            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2096            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2097        }
2098    }
2099    
2100    class MessageOrderIndex {
2101        static final byte HI = 9;
2102        static final byte LO = 0;
2103        static final byte DEF = 4;
2104
2105        long nextMessageId;
2106        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
2107        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
2108        BTreeIndex<Long, MessageKeys> highPriorityIndex;
2109        MessageOrderCursor cursor = new MessageOrderCursor();
2110        Long lastDefaultKey;
2111        Long lastHighKey;
2112        Long lastLowKey;
2113        byte lastGetPriority;
2114
2115        MessageKeys remove(Transaction tx, Long key) throws IOException {
2116            MessageKeys result = defaultPriorityIndex.remove(tx, key);
2117            if (result == null && highPriorityIndex!=null) {
2118                result = highPriorityIndex.remove(tx, key);
2119                if (result ==null && lowPriorityIndex!=null) {
2120                    result = lowPriorityIndex.remove(tx, key);
2121                }
2122            }
2123            return result;
2124        }
2125        
2126        void load(Transaction tx) throws IOException {
2127            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2128            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2129            defaultPriorityIndex.load(tx);
2130            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2131            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2132            lowPriorityIndex.load(tx);
2133            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2134            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2135            highPriorityIndex.load(tx);
2136        }
2137        
2138        void allocate(Transaction tx) throws IOException {
2139            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2140            if (metadata.version >= 2) {
2141                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2142                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2143            }
2144        }
2145        
2146        void configureLast(Transaction tx) throws IOException {
2147            // Figure out the next key using the last entry in the destination.
2148            if (highPriorityIndex != null) {
2149                Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
2150                if (lastEntry != null) {
2151                    nextMessageId = lastEntry.getKey() + 1;
2152                } else {
2153                    lastEntry = defaultPriorityIndex.getLast(tx);
2154                    if (lastEntry != null) {
2155                        nextMessageId = lastEntry.getKey() + 1;
2156                    } else {
2157                        lastEntry = lowPriorityIndex.getLast(tx);
2158                        if (lastEntry != null) {
2159                            nextMessageId = lastEntry.getKey() + 1;
2160                        }
2161                    }
2162                }
2163            } else {
2164                Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
2165                if (lastEntry != null) {
2166                    nextMessageId = lastEntry.getKey() + 1;
2167                }
2168            }
2169        }
2170        
2171               
2172        void remove(Transaction tx) throws IOException {
2173            defaultPriorityIndex.clear(tx);
2174            defaultPriorityIndex.unload(tx);
2175            tx.free(defaultPriorityIndex.getPageId());
2176            if (lowPriorityIndex != null) {
2177                lowPriorityIndex.clear(tx);
2178                lowPriorityIndex.unload(tx);
2179
2180                tx.free(lowPriorityIndex.getPageId());
2181            }
2182            if (highPriorityIndex != null) {
2183                highPriorityIndex.clear(tx);
2184                highPriorityIndex.unload(tx);
2185                tx.free(highPriorityIndex.getPageId());
2186            }
2187        }
2188        
2189        void resetCursorPosition() {
2190            this.cursor.reset();
2191            lastDefaultKey = null;
2192            lastHighKey = null;
2193            lastLowKey = null;
2194        }
2195        
2196        void setBatch(Transaction tx, Long sequence) throws IOException {
2197            if (sequence != null) {
2198                Long nextPosition = new Long(sequence.longValue() + 1);
2199                if (defaultPriorityIndex.containsKey(tx, sequence)) {
2200                    lastDefaultKey = sequence;
2201                    cursor.defaultCursorPosition = nextPosition.longValue();
2202                } else if (highPriorityIndex != null) {
2203                    if (highPriorityIndex.containsKey(tx, sequence)) {
2204                        lastHighKey = sequence;
2205                        cursor.highPriorityCursorPosition = nextPosition.longValue();
2206                    } else if (lowPriorityIndex.containsKey(tx, sequence)) {
2207                        lastLowKey = sequence;
2208                        cursor.lowPriorityCursorPosition = nextPosition.longValue();
2209                    }
2210                } else {
2211                    lastDefaultKey = sequence;
2212                    cursor.defaultCursorPosition = nextPosition.longValue();
2213                }
2214            }
2215        }
2216
2217        void setBatch(Transaction tx, LastAck last) throws IOException {
2218            setBatch(tx, last.lastAckedSequence);
2219            if (cursor.defaultCursorPosition == 0
2220                    && cursor.highPriorityCursorPosition == 0
2221                    && cursor.lowPriorityCursorPosition == 0) {
2222                long next = last.lastAckedSequence + 1;
2223                switch (last.priority) {
2224                    case DEF:
2225                        cursor.defaultCursorPosition = next;
2226                        cursor.highPriorityCursorPosition = next;
2227                        break;
2228                    case HI:
2229                        cursor.highPriorityCursorPosition = next;
2230                        break;
2231                    case LO:
2232                        cursor.lowPriorityCursorPosition = next;
2233                        cursor.defaultCursorPosition = next;
2234                        cursor.highPriorityCursorPosition = next;
2235                        break;
2236                }
2237            }
2238        }
2239        
2240        void stoppedIterating() {
2241            if (lastDefaultKey!=null) {
2242                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
2243            }
2244            if (lastHighKey!=null) {
2245                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
2246            }
2247            if (lastLowKey!=null) {
2248                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
2249            }
2250            lastDefaultKey = null;
2251            lastHighKey = null;
2252            lastLowKey = null;
2253        }
2254        
2255        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
2256                throws IOException {
2257            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
2258                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
2259            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
2260                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
2261            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
2262                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
2263            }
2264        }
2265        
2266        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
2267                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
2268
2269            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
2270            deletes.add(iterator.next());
2271        }
2272        
2273        long getNextMessageId(int priority) {
2274            return nextMessageId++;
2275        }
2276        
2277        MessageKeys get(Transaction tx, Long key) throws IOException {
2278            MessageKeys result = defaultPriorityIndex.get(tx, key);
2279            if (result == null) {
2280                result = highPriorityIndex.get(tx, key);
2281                if (result == null) {
2282                    result = lowPriorityIndex.get(tx, key);
2283                    lastGetPriority = LO;
2284                } else {
2285                    lastGetPriority = HI;
2286                }
2287            } else {
2288                lastGetPriority = DEF;
2289            }
2290            return result;
2291        }
2292        
2293        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
2294            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
2295                return defaultPriorityIndex.put(tx, key, value);
2296            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
2297                return highPriorityIndex.put(tx, key, value);
2298            } else {
2299                return lowPriorityIndex.put(tx, key, value);
2300            }
2301        }
2302        
2303        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
2304            return new MessageOrderIterator(tx,cursor);
2305        }
2306        
2307        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
2308            return new MessageOrderIterator(tx,m);
2309        }
2310
2311        public byte lastGetPriority() {
2312            return lastGetPriority;
2313        }
2314
2315        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
2316            Iterator<Entry<Long, MessageKeys>>currentIterator;
2317            final Iterator<Entry<Long, MessageKeys>>highIterator;
2318            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
2319            final Iterator<Entry<Long, MessageKeys>>lowIterator;
2320            
2321            
2322
2323            MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
2324                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
2325                if (highPriorityIndex != null) {
2326                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
2327                } else {
2328                    this.highIterator = null;
2329                }
2330                if (lowPriorityIndex != null) {
2331                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
2332                } else {
2333                    this.lowIterator = null;
2334                }
2335            }
2336            
2337            public boolean hasNext() {
2338                if (currentIterator == null) {
2339                    if (highIterator != null) {
2340                        if (highIterator.hasNext()) {
2341                            currentIterator = highIterator;
2342                            return currentIterator.hasNext();
2343                        }
2344                        if (defaultIterator.hasNext()) {
2345                            currentIterator = defaultIterator;
2346                            return currentIterator.hasNext();
2347                        }
2348                        if (lowIterator.hasNext()) {
2349                            currentIterator = lowIterator;
2350                            return currentIterator.hasNext();
2351                        }
2352                        return false;
2353                    } else {
2354                        currentIterator = defaultIterator;
2355                        return currentIterator.hasNext();
2356                    }
2357                }
2358                if (highIterator != null) {
2359                    if (currentIterator.hasNext()) {
2360                        return true;
2361                    }
2362                    if (currentIterator == highIterator) {
2363                        if (defaultIterator.hasNext()) {
2364                            currentIterator = defaultIterator;
2365                            return currentIterator.hasNext();
2366                        }
2367                        if (lowIterator.hasNext()) {
2368                            currentIterator = lowIterator;
2369                            return currentIterator.hasNext();
2370                        }
2371                        return false;
2372                    }
2373                    if (currentIterator == defaultIterator) {
2374                        if (lowIterator.hasNext()) {
2375                            currentIterator = lowIterator;
2376                            return currentIterator.hasNext();
2377                        }
2378                        return false;
2379                    }
2380                }
2381                return currentIterator.hasNext();
2382            }
2383
2384            public Entry<Long, MessageKeys> next() {
2385                Entry<Long, MessageKeys> result = currentIterator.next();
2386                if (result != null) {
2387                    Long key = result.getKey();
2388                    if (highIterator != null) {
2389                        if (currentIterator == defaultIterator) {
2390                            lastDefaultKey = key;
2391                        } else if (currentIterator == highIterator) {
2392                            lastHighKey = key;
2393                        } else {
2394                            lastLowKey = key;
2395                        }
2396                    } else {
2397                        lastDefaultKey = key;
2398                    }
2399                }
2400                return result;
2401            }
2402
2403            public void remove() {
2404                throw new UnsupportedOperationException();
2405            }
2406           
2407        }
2408    }
2409    
2410    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
2411        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
2412
2413        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
2414            ByteArrayOutputStream baos = new ByteArrayOutputStream();
2415            ObjectOutputStream oout = new ObjectOutputStream(baos);
2416            oout.writeObject(object);
2417            oout.flush();
2418            oout.close();
2419            byte[] data = baos.toByteArray();
2420            dataOut.writeInt(data.length);
2421            dataOut.write(data);
2422        }
2423
2424        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
2425            int dataLen = dataIn.readInt();
2426            byte[] data = new byte[dataLen];
2427            dataIn.readFully(data);
2428            ByteArrayInputStream bais = new ByteArrayInputStream(data);
2429            ObjectInputStream oin = new ObjectInputStream(bais);
2430            try {
2431                return (HashSet<String>) oin.readObject();
2432            } catch (ClassNotFoundException cfe) {
2433                    IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
2434                    ioe.initCause(cfe);
2435                    throw ioe;
2436                }
2437        }
2438    }
2439}