001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.Map.Entry;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Future;
033import java.util.concurrent.FutureTask;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.concurrent.atomic.AtomicInteger;
041import org.apache.activemq.broker.ConnectionContext;
042import org.apache.activemq.command.ActiveMQDestination;
043import org.apache.activemq.command.ActiveMQQueue;
044import org.apache.activemq.command.ActiveMQTempQueue;
045import org.apache.activemq.command.ActiveMQTempTopic;
046import org.apache.activemq.command.ActiveMQTopic;
047import org.apache.activemq.command.LocalTransactionId;
048import org.apache.activemq.command.Message;
049import org.apache.activemq.command.MessageAck;
050import org.apache.activemq.command.MessageId;
051import org.apache.activemq.command.ProducerId;
052import org.apache.activemq.command.SubscriptionInfo;
053import org.apache.activemq.command.TransactionId;
054import org.apache.activemq.command.XATransactionId;
055import org.apache.activemq.filter.BooleanExpression;
056import org.apache.activemq.filter.MessageEvaluationContext;
057import org.apache.activemq.openwire.OpenWireFormat;
058import org.apache.activemq.protobuf.Buffer;
059import org.apache.activemq.selector.SelectorParser;
060import org.apache.activemq.store.AbstractMessageStore;
061import org.apache.activemq.store.MessageRecoveryListener;
062import org.apache.activemq.store.MessageStore;
063import org.apache.activemq.store.PersistenceAdapter;
064import org.apache.activemq.store.TopicMessageStore;
065import org.apache.activemq.store.TransactionStore;
066import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
067import org.apache.activemq.store.kahadb.data.KahaDestination;
068import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
069import org.apache.activemq.store.kahadb.data.KahaLocation;
070import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
071import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
072import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
073import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
074import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
075import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
076import org.apache.activemq.usage.MemoryUsage;
077import org.apache.activemq.usage.SystemUsage;
078import org.apache.activemq.util.IOExceptionSupport;
079import org.apache.activemq.util.ServiceStopper;
080import org.apache.activemq.wireformat.WireFormat;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083import org.apache.kahadb.journal.Location;
084import org.apache.kahadb.page.Transaction;
085
086public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
    // Default value of maxAsyncJobs.
    private static final int MAX_ASYNC_JOBS = 10000;

    // Name of the system property whose integer value (default "0") is parsed into cancelledTaskModMetric.
    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
    // Name of the system property whose integer value (default "1") is used as the
    // maximum pool size of the queue and topic executors built in doStart().
    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;

    // Executors that run the asynchronous store tasks; created in doStart(), shut down in doStop().
    protected ExecutorService queueExecutor;
    protected ExecutorService topicExecutor;
    // Per-destination maps of pending async tasks; doStop() cancels every task found
    // in them while holding the list's monitor, then clears the list.
    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    // Wire format used to marshal messages, acks and subscription infos into store commands.
    final WireFormat wireFormat = new OpenWireFormat();
    private SystemUsage usageManager;
    // Bounded work queues backing queueExecutor and topicExecutor; capacity is getMaxAsyncJobs().
    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
    // Store-wide permits for async jobs, created with getMaxAsyncJobs() permits in doStart().
    Semaphore globalQueueSemaphore;
    Semaphore globalTopicSemaphore;
    private boolean concurrentStoreAndDispatchQueues = true;
    // when true, message order may be compromised when cache is exhausted if store is out
    // of order w.r.t cache
    private boolean concurrentStoreAndDispatchTopics = false;
    private boolean concurrentStoreAndDispatchTransactions = false;
    private int maxAsyncJobs = MAX_ASYNC_JOBS;
    private final KahaDBTransactionStore transactionStore;
114
    /**
     * Creates the store and the {@link KahaDBTransactionStore} bound to it,
     * which {@link #createTransactionStore()} later returns.
     */
    public KahaDBStore() {
        this.transactionStore = new KahaDBTransactionStore(this);
    }
118
    /**
     * No-op: the broker name is ignored by this store.
     *
     * @param brokerName
     *            the broker name (unused)
     */
    public void setBrokerName(String brokerName) {
    }
121
    /**
     * Records the usage manager that {@link #getUsageManager()} returns.
     *
     * @param usageManager
     *            the system usage manager to remember
     */
    public void setUsageManager(SystemUsage usageManager) {
        this.usageManager = usageManager;
    }
125
    /**
     * @return the usage manager last passed to
     *         {@link #setUsageManager(SystemUsage)}, or {@code null} if none
     *         was set
     */
    public SystemUsage getUsageManager() {
        return this.usageManager;
    }
129
    /**
     * Tells whether queue messages are stored asynchronously. When
     * {@code true} (the default), queue stores hand adds to the async queue
     * executor instead of delegating to the synchronous base implementation.
     *
     * @return the concurrentStoreAndDispatchQueues flag
     */
    public boolean isConcurrentStoreAndDispatchQueues() {
        return this.concurrentStoreAndDispatchQueues;
    }
136
    /**
     * Enables or disables asynchronous storing of queue messages.
     *
     * @param concurrentStoreAndDispatch
     *            the new value of the concurrentStoreAndDispatchQueues flag
     */
    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
    }
144
    /**
     * Tells whether topic messages are stored asynchronously. Defaults to
     * {@code false}.
     *
     * @return the concurrentStoreAndDispatchTopics flag
     */
    public boolean isConcurrentStoreAndDispatchTopics() {
        return this.concurrentStoreAndDispatchTopics;
    }
151
    /**
     * Enables or disables asynchronous storing of topic messages.
     *
     * @param concurrentStoreAndDispatch
     *            the new value of the concurrentStoreAndDispatchTopics flag
     */
    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
    }
159
    /**
     * @return the concurrentStoreAndDispatchTransactions flag, which is
     *         initialised to {@code false}
     */
    public boolean isConcurrentStoreAndDispatchTransactions() {
        return this.concurrentStoreAndDispatchTransactions;
    }
163    
    /**
     * Returns the limit on outstanding asynchronous store jobs. The value is
     * read by {@link #doStart()} to size the global semaphores and job queues.
     *
     * @return the maxAsyncJobs
     */
    public int getMaxAsyncJobs() {
        return this.maxAsyncJobs;
    }
    /**
     * Sets the limit on outstanding asynchronous store jobs. The value is
     * read when {@link #doStart()} creates the semaphores and job queues.
     *
     * @param maxAsyncJobs
     *            the maxAsyncJobs to set
     */
    public void setMaxAsyncJobs(int maxAsyncJobs) {
        this.maxAsyncJobs = maxAsyncJobs;
    }
177
178    @Override
179    public void doStart() throws Exception {
180        super.doStart();
181        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
182        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
183        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
184        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
185        this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
186                asyncQueueJobQueue, new ThreadFactory() {
187                    public Thread newThread(Runnable runnable) {
188                        Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
189                        thread.setDaemon(true);
190                        return thread;
191                    }
192                });
193        this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
194                asyncTopicJobQueue, new ThreadFactory() {
195                    public Thread newThread(Runnable runnable) {
196                        Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
197                        thread.setDaemon(true);
198                        return thread;
199                    }
200                });
201    }
202
203    @Override
204    public void doStop(ServiceStopper stopper) throws Exception {
205        // drain down async jobs
206        LOG.info("Stopping async queue tasks");
207        if (this.globalQueueSemaphore != null) {
208            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
209        }
210        synchronized (this.asyncQueueMaps) {
211            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
212                synchronized (m) {
213                    for (StoreTask task : m.values()) {
214                        task.cancel();
215                    }
216                }
217            }
218            this.asyncQueueMaps.clear();
219        }
220        LOG.info("Stopping async topic tasks");
221        if (this.globalTopicSemaphore != null) {
222            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
223        }
224        synchronized (this.asyncTopicMaps) {
225            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
226                synchronized (m) {
227                    for (StoreTask task : m.values()) {
228                        task.cancel();
229                    }
230                }
231            }
232            this.asyncTopicMaps.clear();
233        }
234        if (this.globalQueueSemaphore != null) {
235            this.globalQueueSemaphore.drainPermits();
236        }
237        if (this.globalTopicSemaphore != null) {
238            this.globalTopicSemaphore.drainPermits();
239        }
240        if (this.queueExecutor != null) {
241            this.queueExecutor.shutdownNow();
242        }
243        if (this.topicExecutor != null) {
244            this.topicExecutor.shutdownNow();
245        }
246        LOG.info("Stopped KahaDB");
247        super.doStop(stopper);
248    }
249
250    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
251        StoreQueueTask task = null;
252        synchronized (store.asyncTaskMap) {
253            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
254        }
255        return task;
256    }
257
258    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
259        synchronized (store.asyncTaskMap) {
260            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
261        }
262        this.queueExecutor.execute(task);
263    }
264
265    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
266        StoreTopicTask task = null;
267        synchronized (store.asyncTaskMap) {
268            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
269        }
270        return task;
271    }
272
273    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
274        synchronized (store.asyncTaskMap) {
275            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
276        }
277        this.topicExecutor.execute(task);
278    }
279
    /**
     * Returns the transaction store created in the constructor; despite the
     * name, every call yields the same instance.
     *
     * @return the shared transaction store
     * @throws IOException
     *             never thrown by this implementation
     */
    public TransactionStore createTransactionStore() throws IOException {
        return this.transactionStore;
    }
283
    /**
     * @return the inherited {@code forceRecoverIndex} flag
     */
    public boolean getForceRecoverIndex() {
        return this.forceRecoverIndex;
    }
287
    /**
     * Sets the inherited {@code forceRecoverIndex} flag.
     *
     * @param forceRecoverIndex
     *            the new flag value
     */
    public void setForceRecoverIndex(boolean forceRecoverIndex) {
        this.forceRecoverIndex = forceRecoverIndex;
    }
291
292    public class KahaDBMessageStore extends AbstractMessageStore {
293        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
294        protected KahaDestination dest;
295        private final int maxAsyncJobs;
296        private final Semaphore localDestinationSemaphore;
297
298        double doneTasks, canceledTasks = 0;
299
300        public KahaDBMessageStore(ActiveMQDestination destination) {
301            super(destination);
302            this.dest = convert(destination);
303            this.maxAsyncJobs = getMaxAsyncJobs();
304            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
305        }
306
307        @Override
308        public ActiveMQDestination getDestination() {
309            return destination;
310        }
311
312        @Override
313        public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
314                throws IOException {
315            if (isConcurrentStoreAndDispatchQueues()) {
316                StoreQueueTask result = new StoreQueueTask(this, context, message);
317                result.aquireLocks();
318                addQueueTask(this, result);
319                return result.getFuture();
320            } else {
321                return super.asyncAddQueueMessage(context, message);
322            }
323        }
324
325        @Override
326        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
327            if (isConcurrentStoreAndDispatchQueues()) {
328                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
329                StoreQueueTask task = null;
330                synchronized (asyncTaskMap) {
331                    task = (StoreQueueTask) asyncTaskMap.get(key);
332                }
333                if (task != null) {
334                    if (!task.cancel()) {
335                        try {
336
337                            task.future.get();
338                        } catch (InterruptedException e) {
339                            throw new InterruptedIOException(e.toString());
340                        } catch (Exception ignored) {
341                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
342                        }
343                        removeMessage(context, ack);
344                    } else {
345                        synchronized (asyncTaskMap) {
346                            asyncTaskMap.remove(key);
347                        }
348                    }
349                } else {
350                    removeMessage(context, ack);
351                }
352            } else {
353                removeMessage(context, ack);
354            }
355        }
356
357        public void addMessage(ConnectionContext context, Message message) throws IOException {
358            KahaAddMessageCommand command = new KahaAddMessageCommand();
359            command.setDestination(dest);
360            command.setMessageId(message.getMessageId().toString());
361            command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));
362            command.setPriority(message.getPriority());
363            command.setPrioritySupported(isPrioritizedMessages());
364            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
365            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
366            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
367            
368        }
369
370        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
371            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
372            command.setDestination(dest);
373            command.setMessageId(ack.getLastMessageId().toString());
374            command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
375
376            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
377            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
378            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
379        }
380
381        public void removeAllMessages(ConnectionContext context) throws IOException {
382            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
383            command.setDestination(dest);
384            store(command, true, null, null);
385        }
386
387        public Message getMessage(MessageId identity) throws IOException {
388            final String key = identity.toString();
389
390            // Hopefully one day the page file supports concurrent read
391            // operations... but for now we must
392            // externally synchronize...
393            Location location;
394            indexLock.readLock().lock();
395            try {
396                location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
397                    public Location execute(Transaction tx) throws IOException {
398                        StoredDestination sd = getStoredDestination(dest, tx);
399                        Long sequence = sd.messageIdIndex.get(tx, key);
400                        if (sequence == null) {
401                            return null;
402                        }
403                        return sd.orderIndex.get(tx, sequence).location;
404                    }
405                });
406            }finally {
407                indexLock.readLock().unlock();
408            }
409            if (location == null) {
410                return null;
411            }
412
413            return loadMessage(location);
414        }
415
416        public int getMessageCount() throws IOException {
417            try {
418                lockAsyncJobQueue();
419                indexLock.readLock().lock();
420                try {
421                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
422                        public Integer execute(Transaction tx) throws IOException {
423                            // Iterate through all index entries to get a count
424                            // of
425                            // messages in the destination.
426                            StoredDestination sd = getStoredDestination(dest, tx);
427                            int rc = 0;
428                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
429                                    .hasNext();) {
430                                iterator.next();
431                                rc++;
432                            }
433                            return rc;
434                        }
435                    });
436                }finally {
437                    indexLock.readLock().unlock();
438                }
439            } finally {
440                unlockAsyncJobQueue();
441            }
442        }
443
444        @Override
445        public boolean isEmpty() throws IOException {
446            indexLock.readLock().lock();
447            try {
448                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
449                    public Boolean execute(Transaction tx) throws IOException {
450                        // Iterate through all index entries to get a count of
451                        // messages in the destination.
452                        StoredDestination sd = getStoredDestination(dest, tx);
453                        return sd.locationIndex.isEmpty(tx);
454                    }
455                });
456            }finally {
457                indexLock.readLock().unlock();
458            }
459        }
460
461        public void recover(final MessageRecoveryListener listener) throws Exception {
462            indexLock.readLock().lock();
463            try {
464                pageFile.tx().execute(new Transaction.Closure<Exception>() {
465                    public void execute(Transaction tx) throws Exception {
466                        StoredDestination sd = getStoredDestination(dest, tx);
467                        sd.orderIndex.resetCursorPosition();
468                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
469                                .hasNext();) {
470                            Entry<Long, MessageKeys> entry = iterator.next();
471                            Message msg = loadMessage(entry.getValue().location);
472                            listener.recoverMessage(msg);
473                        }
474                    }
475                });
476            }finally {
477                indexLock.readLock().unlock();
478            }
479        }
480
481        
482        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
483            indexLock.readLock().lock();
484            try {
485                pageFile.tx().execute(new Transaction.Closure<Exception>() {
486                    public void execute(Transaction tx) throws Exception {
487                        StoredDestination sd = getStoredDestination(dest, tx);
488                        Entry<Long, MessageKeys> entry = null;
489                        int counter = 0;
490                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
491                             listener.hasSpace() && iterator.hasNext(); ) {
492                            entry = iterator.next();
493                            Message msg = loadMessage(entry.getValue().location);
494                            listener.recoverMessage(msg);
495                            counter++;
496                            if (counter >= maxReturned) {
497                                break;
498                            }
499                        }
500                        sd.orderIndex.stoppedIterating();
501                    }
502                });
503            }finally {
504                indexLock.readLock().unlock();
505            }
506        }
507
508        public void resetBatching() {
509            try {
510                pageFile.tx().execute(new Transaction.Closure<Exception>() {
511                    public void execute(Transaction tx) throws Exception {
512                        StoredDestination sd = getExistingStoredDestination(dest, tx);
513                        if (sd != null) {
514                            sd.orderIndex.resetCursorPosition();}
515                        }
516                    });
517            } catch (Exception e) {
518                LOG.error("Failed to reset batching",e);
519            }
520        }
521
522        @Override
523        public void setBatch(MessageId identity) throws IOException {
524            try {
525                final String key = identity.toString();
526                lockAsyncJobQueue();
527
528                // Hopefully one day the page file supports concurrent read
529                // operations... but for now we must
530                // externally synchronize...
531               
532                indexLock.writeLock().lock();
533                try {
534                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
535                        public void execute(Transaction tx) throws IOException {
536                            StoredDestination sd = getStoredDestination(dest, tx);
537                            Long location = sd.messageIdIndex.get(tx, key);
538                            if (location != null) {
539                                sd.orderIndex.setBatch(tx, location);
540                            }
541                        }
542                    });
543                }finally {
544                    indexLock.writeLock().unlock();
545                }
546                
547            } finally {
548                unlockAsyncJobQueue();
549            }
550
551        }
552
553        @Override
554        public void setMemoryUsage(MemoryUsage memoeyUSage) {
555        }
556        @Override
557        public void start() throws Exception {
558            super.start();
559        }
560        @Override
561        public void stop() throws Exception {
562            super.stop();
563        }
564
565        protected void lockAsyncJobQueue() {
566            try {
567                this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
568            } catch (Exception e) {
569                LOG.error("Failed to lock async jobs for " + this.destination, e);
570            }
571        }
572
573        protected void unlockAsyncJobQueue() {
574            this.localDestinationSemaphore.release(this.maxAsyncJobs);
575        }
576
577        protected void acquireLocalAsyncLock() {
578            try {
579                this.localDestinationSemaphore.acquire();
580            } catch (InterruptedException e) {
581                LOG.error("Failed to aquire async lock for " + this.destination, e);
582            }
583        }
584
585        protected void releaseLocalAsyncLock() {
586            this.localDestinationSemaphore.release();
587        }
588
589    }
590
591    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
592        private final AtomicInteger subscriptionCount = new AtomicInteger();
593        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
594            super(destination);
595            this.subscriptionCount.set(getAllSubscriptions().length);
596            asyncTopicMaps.add(asyncTaskMap);
597        }
598
599        @Override
600        public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
601                throws IOException {
602            if (isConcurrentStoreAndDispatchTopics()) {
603                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
604                result.aquireLocks();
605                addTopicTask(this, result);
606                return result.getFuture();
607            } else {
608                return super.asyncAddTopicMessage(context, message);
609            }
610        }
611
612        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
613                                MessageId messageId, MessageAck ack)
614                throws IOException {
615            String subscriptionKey = subscriptionKey(clientId, subscriptionName);
616            if (isConcurrentStoreAndDispatchTopics()) {
617                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
618                StoreTopicTask task = null;
619                synchronized (asyncTaskMap) {
620                    task = (StoreTopicTask) asyncTaskMap.get(key);
621                }
622                if (task != null) {
623                    if (task.addSubscriptionKey(subscriptionKey)) {
624                        removeTopicTask(this, messageId);
625                        if (task.cancel()) {
626                            synchronized (asyncTaskMap) {
627                                asyncTaskMap.remove(key);
628                            }
629                        }
630                    }
631                } else {
632                    doAcknowledge(context, subscriptionKey, messageId, ack);
633                }
634            } else {
635                doAcknowledge(context, subscriptionKey, messageId, ack);
636            }
637        }
638
639        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
640                throws IOException {
641            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
642            command.setDestination(dest);
643            command.setSubscriptionKey(subscriptionKey);
644            command.setMessageId(messageId.toString());
645            command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
646            if (ack != null && ack.isUnmatchedAck()) {
647                command.setAck(UNMATCHED);
648            }
649            store(command, false, null, null);
650        }
651
652        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
653            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
654                    .getSubscriptionName());
655            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
656            command.setDestination(dest);
657            command.setSubscriptionKey(subscriptionKey);
658            command.setRetroactive(retroactive);
659            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
660            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
661            store(command, isEnableJournalDiskSyncs() && true, null, null);
662            this.subscriptionCount.incrementAndGet();
663        }
664
665        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
666            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
667            command.setDestination(dest);
668            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
669            store(command, isEnableJournalDiskSyncs() && true, null, null);
670            this.subscriptionCount.decrementAndGet();
671        }
672
673        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
674
675            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
676            indexLock.readLock().lock();
677            try {
678                pageFile.tx().execute(new Transaction.Closure<IOException>() {
679                    public void execute(Transaction tx) throws IOException {
680                        StoredDestination sd = getStoredDestination(dest, tx);
681                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
682                                .hasNext();) {
683                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
684                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
685                                    .getValue().getSubscriptionInfo().newInput()));
686                            subscriptions.add(info);
687
688                        }
689                    }
690                });
691            }finally {
692                indexLock.readLock().unlock();
693            }
694
695            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
696            subscriptions.toArray(rc);
697            return rc;
698        }
699
700        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
701            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
702            indexLock.readLock().lock();
703            try {
704                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
705                    public SubscriptionInfo execute(Transaction tx) throws IOException {
706                        StoredDestination sd = getStoredDestination(dest, tx);
707                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
708                        if (command == null) {
709                            return null;
710                        }
711                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
712                                .getSubscriptionInfo().newInput()));
713                    }
714                });
715            }finally {
716                indexLock.readLock().unlock();
717            }
718        }
719
720        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
721            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
722            indexLock.writeLock().lock();
723            try {
724                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
725                    public Integer execute(Transaction tx) throws IOException {
726                        StoredDestination sd = getStoredDestination(dest, tx);
727                        LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
728                        if (cursorPos == null) {
729                            // The subscription might not exist.
730                            return 0;
731                        }
732
733                        int counter = 0;
734                        for (Iterator<Entry<Long, HashSet<String>>> iterator =
735                                sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) {
736                            Entry<Long, HashSet<String>> entry = iterator.next();
737                            if (entry.getValue().contains(subscriptionKey)) {
738                                counter++;
739                            }
740                        }
741                        return counter;
742                    }
743                });
744            }finally {
745                indexLock.writeLock().unlock();
746            }
747        }
748
749        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
750                throws Exception {
751            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
752            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
753            indexLock.writeLock().lock();
754            try {
755                pageFile.tx().execute(new Transaction.Closure<Exception>() {
756                    public void execute(Transaction tx) throws Exception {
757                        StoredDestination sd = getStoredDestination(dest, tx);
758                        LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
759                        sd.orderIndex.setBatch(tx, cursorPos);
760                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
761                                .hasNext();) {
762                            Entry<Long, MessageKeys> entry = iterator.next();
763                            listener.recoverMessage(loadMessage(entry.getValue().location));
764                        }
765                        sd.orderIndex.resetCursorPosition();
766                    }
767                });
768            }finally {
769                indexLock.writeLock().unlock();
770            }
771        }
772
773        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
774                final MessageRecoveryListener listener) throws Exception {
775            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
776            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
777            indexLock.writeLock().lock();
778            try {
779                pageFile.tx().execute(new Transaction.Closure<Exception>() {
780                    public void execute(Transaction tx) throws Exception {
781                        StoredDestination sd = getStoredDestination(dest, tx);
782                        sd.orderIndex.resetCursorPosition();
783                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
784                        if (moc == null) {
785                            LastAck pos = sd.subscriptionAcks.get(tx, subscriptionKey);
786                            if (pos == null) {
787                                // sub deleted
788                                return;
789                            }
790                            sd.orderIndex.setBatch(tx, pos);
791                            moc = sd.orderIndex.cursor;
792                        } else {
793                            sd.orderIndex.cursor.sync(moc);
794                        }
795
796                        Entry<Long, MessageKeys> entry = null;
797                        int counter = 0;
798                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
799                                .hasNext();) {
800                            entry = iterator.next();
801                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
802                                counter++;
803                            }
804                            if (counter >= maxReturned || listener.hasSpace() == false) {
805                                break;
806                            }
807                        }
808                        sd.orderIndex.stoppedIterating();
809                        if (entry != null) {
810                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
811                            sd.subscriptionCursors.put(subscriptionKey, copy);
812                        }
813                    }
814                });
815            }finally {
816                indexLock.writeLock().unlock();
817            }
818        }
819
820        public void resetBatching(String clientId, String subscriptionName) {
821            try {
822                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
823                indexLock.writeLock().lock();
824                try {
825                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
826                        public void execute(Transaction tx) throws IOException {
827                            StoredDestination sd = getStoredDestination(dest, tx);
828                            sd.subscriptionCursors.remove(subscriptionKey);
829                        }
830                    });
831                }finally {
832                    indexLock.writeLock().unlock();
833                }
834            } catch (IOException e) {
835                throw new RuntimeException(e);
836            }
837        }
838    }
839
840    String subscriptionKey(String clientId, String subscriptionName) {
841        return clientId + ":" + subscriptionName;
842    }
843
844    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
845        return this.transactionStore.proxy(new KahaDBMessageStore(destination));
846    }
847
848    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
849        return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
850    }
851
852    /**
853     * Cleanup method to remove any state associated with the given destination.
854     * This method does not stop the message store (it might not be cached).
855     * 
856     * @param destination
857     *            Destination to forget
858     */
859    public void removeQueueMessageStore(ActiveMQQueue destination) {
860    }
861
862    /**
863     * Cleanup method to remove any state associated with the given destination
864     * This method does not stop the message store (it might not be cached).
865     * 
866     * @param destination
867     *            Destination to forget
868     */
869    public void removeTopicMessageStore(ActiveMQTopic destination) {
870    }
871
872    public void deleteAllMessages() throws IOException {
873        deleteAllMessages = true;
874    }
875
876    public Set<ActiveMQDestination> getDestinations() {
877        try {
878            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
879            indexLock.readLock().lock();
880            try {
881                pageFile.tx().execute(new Transaction.Closure<IOException>() {
882                    public void execute(Transaction tx) throws IOException {
883                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
884                                .hasNext();) {
885                            Entry<String, StoredDestination> entry = iterator.next();
886                            if (!isEmptyTopic(entry, tx)) {
887                                rc.add(convert(entry.getKey()));
888                            }
889                        }
890                    }
891
892                    private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
893                            throws IOException {
894                        boolean isEmptyTopic = false;
895                        ActiveMQDestination dest = convert(entry.getKey());
896                        if (dest.isTopic()) {
897                            StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
898                            if (loadedStore.subscriptionAcks.isEmpty(tx)) {
899                                isEmptyTopic = true;
900                            }
901                        }
902                        return isEmptyTopic;
903                    }
904                });
905            }finally {
906                indexLock.readLock().unlock();
907            }
908            return rc;
909        } catch (IOException e) {
910            throw new RuntimeException(e);
911        }
912    }
913
914    public long getLastMessageBrokerSequenceId() throws IOException {
915        return 0;
916    }
917    
918    public long getLastProducerSequenceId(ProducerId id) {
919        indexLock.readLock().lock();
920        try {
921            return metadata.producerSequenceIdTracker.getLastSeqId(id);
922        } finally {
923            indexLock.readLock().unlock();
924        }
925    }
926
927    public long size() {
928        return storeSize.get();
929    }
930
931    public void beginTransaction(ConnectionContext context) throws IOException {
932        throw new IOException("Not yet implemented.");
933    }
934    public void commitTransaction(ConnectionContext context) throws IOException {
935        throw new IOException("Not yet implemented.");
936    }
937    public void rollbackTransaction(ConnectionContext context) throws IOException {
938        throw new IOException("Not yet implemented.");
939    }
940
941    public void checkpoint(boolean sync) throws IOException {
942        super.checkpointCleanup(false);
943    }
944
945    // /////////////////////////////////////////////////////////////////
946    // Internal helper methods.
947    // /////////////////////////////////////////////////////////////////
948
949    /**
950     * @param location
951     * @return
952     * @throws IOException
953     */
954    Message loadMessage(Location location) throws IOException {
955        KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
956        Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
957        return msg;
958    }
959
960    // /////////////////////////////////////////////////////////////////
961    // Internal conversion methods.
962    // /////////////////////////////////////////////////////////////////
963
964    KahaLocation convert(Location location) {
965        KahaLocation rc = new KahaLocation();
966        rc.setLogId(location.getDataFileId());
967        rc.setOffset(location.getOffset());
968        return rc;
969    }
970
971    KahaDestination convert(ActiveMQDestination dest) {
972        KahaDestination rc = new KahaDestination();
973        rc.setName(dest.getPhysicalName());
974        switch (dest.getDestinationType()) {
975        case ActiveMQDestination.QUEUE_TYPE:
976            rc.setType(DestinationType.QUEUE);
977            return rc;
978        case ActiveMQDestination.TOPIC_TYPE:
979            rc.setType(DestinationType.TOPIC);
980            return rc;
981        case ActiveMQDestination.TEMP_QUEUE_TYPE:
982            rc.setType(DestinationType.TEMP_QUEUE);
983            return rc;
984        case ActiveMQDestination.TEMP_TOPIC_TYPE:
985            rc.setType(DestinationType.TEMP_TOPIC);
986            return rc;
987        default:
988            return null;
989        }
990    }
991
992    ActiveMQDestination convert(String dest) {
993        int p = dest.indexOf(":");
994        if (p < 0) {
995            throw new IllegalArgumentException("Not in the valid destination format");
996        }
997        int type = Integer.parseInt(dest.substring(0, p));
998        String name = dest.substring(p + 1);
999
1000        switch (KahaDestination.DestinationType.valueOf(type)) {
1001        case QUEUE:
1002            return new ActiveMQQueue(name);
1003        case TOPIC:
1004            return new ActiveMQTopic(name);
1005        case TEMP_QUEUE:
1006            return new ActiveMQTempQueue(name);
1007        case TEMP_TOPIC:
1008            return new ActiveMQTempTopic(name);
1009        default:
1010            throw new IllegalArgumentException("Not in the valid destination format");
1011        }
1012    }
1013
1014    static class AsyncJobKey {
1015        MessageId id;
1016        ActiveMQDestination destination;
1017
1018        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1019            this.id = id;
1020            this.destination = destination;
1021        }
1022
1023        @Override
1024        public boolean equals(Object obj) {
1025            if (obj == this) {
1026                return true;
1027            }
1028            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1029                    && destination.equals(((AsyncJobKey) obj).destination);
1030        }
1031
1032        @Override
1033        public int hashCode() {
1034            return id.hashCode() + destination.hashCode();
1035        }
1036
1037        @Override
1038        public String toString() {
1039            return destination.getPhysicalName() + "-" + id;
1040        }
1041    }
1042
1043    interface StoreTask {
1044        public boolean cancel();
1045    }
1046
1047    class StoreQueueTask implements Runnable, StoreTask {
1048        protected final Message message;
1049        protected final ConnectionContext context;
1050        protected final KahaDBMessageStore store;
1051        protected final InnerFutureTask future;
1052        protected final AtomicBoolean done = new AtomicBoolean();
1053        protected final AtomicBoolean locked = new AtomicBoolean();
1054
1055        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1056            this.store = store;
1057            this.context = context;
1058            this.message = message;
1059            this.future = new InnerFutureTask(this);
1060        }
1061
1062        public Future<Object> getFuture() {
1063            return this.future;
1064        }
1065
1066        public boolean cancel() {
1067            releaseLocks();
1068            if (this.done.compareAndSet(false, true)) {
1069                return this.future.cancel(false);
1070            }
1071            return false;
1072        }
1073
1074        void aquireLocks() {
1075            if (this.locked.compareAndSet(false, true)) {
1076                try {
1077                    globalQueueSemaphore.acquire();
1078                    store.acquireLocalAsyncLock();
1079                    message.incrementReferenceCount();
1080                } catch (InterruptedException e) {
1081                    LOG.warn("Failed to aquire lock", e);
1082                }
1083            }
1084
1085        }
1086
1087        void releaseLocks() {
1088            if (this.locked.compareAndSet(true, false)) {
1089                store.releaseLocalAsyncLock();
1090                globalQueueSemaphore.release();
1091                message.decrementReferenceCount();
1092            }
1093        }
1094
1095        public void run() {
1096            this.store.doneTasks++;
1097            try {
1098                if (this.done.compareAndSet(false, true)) {
1099                    this.store.addMessage(context, message);
1100                    removeQueueTask(this.store, this.message.getMessageId());
1101                    this.future.complete();
1102                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1103                    System.err.println(this.store.dest.getName() + " cancelled: "
1104                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1105                    this.store.canceledTasks = this.store.doneTasks = 0;
1106                }
1107            } catch (Exception e) {
1108                this.future.setException(e);
1109            } finally {
1110                releaseLocks();
1111            }
1112        }
1113
1114        protected Message getMessage() {
1115            return this.message;
1116        }
1117
1118        private class InnerFutureTask extends FutureTask<Object> {
1119
1120            public InnerFutureTask(Runnable runnable) {
1121                super(runnable, null);
1122
1123            }
1124
1125            public void setException(final Exception e) {
1126                super.setException(e);
1127            }
1128
1129            public void complete() {
1130                super.set(null);
1131            }
1132        }
1133    }
1134
1135    class StoreTopicTask extends StoreQueueTask {
1136        private final int subscriptionCount;
1137        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1138        private final KahaDBTopicMessageStore topicStore;
1139        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1140                int subscriptionCount) {
1141            super(store, context, message);
1142            this.topicStore = store;
1143            this.subscriptionCount = subscriptionCount;
1144
1145        }
1146
1147        @Override
1148        void aquireLocks() {
1149            if (this.locked.compareAndSet(false, true)) {
1150                try {
1151                    globalTopicSemaphore.acquire();
1152                    store.acquireLocalAsyncLock();
1153                    message.incrementReferenceCount();
1154                } catch (InterruptedException e) {
1155                    LOG.warn("Failed to aquire lock", e);
1156                }
1157            }
1158
1159        }
1160
1161        @Override
1162        void releaseLocks() {
1163            if (this.locked.compareAndSet(true, false)) {
1164                message.decrementReferenceCount();
1165                store.releaseLocalAsyncLock();
1166                globalTopicSemaphore.release();
1167            }
1168        }
1169
1170        /**
1171         * add a key
1172         * 
1173         * @param key
1174         * @return true if all acknowledgements received
1175         */
1176        public boolean addSubscriptionKey(String key) {
1177            synchronized (this.subscriptionKeys) {
1178                this.subscriptionKeys.add(key);
1179            }
1180            return this.subscriptionKeys.size() >= this.subscriptionCount;
1181        }
1182
1183        @Override
1184        public void run() {
1185            this.store.doneTasks++;
1186            try {
1187                if (this.done.compareAndSet(false, true)) {
1188                    this.topicStore.addMessage(context, message);
1189                    // apply any acks we have
1190                    synchronized (this.subscriptionKeys) {
1191                        for (String key : this.subscriptionKeys) {
1192                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1193
1194                        }
1195                    }
1196                    removeTopicTask(this.topicStore, this.message.getMessageId());
1197                    this.future.complete();
1198                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1199                    System.err.println(this.store.dest.getName() + " cancelled: "
1200                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1201                    this.store.canceledTasks = this.store.doneTasks = 0;
1202                }
1203            } catch (Exception e) {
1204                this.future.setException(e);
1205            } finally {
1206                releaseLocks();
1207            }
1208        }
1209    }
1210}