001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.Set;
022import org.apache.activeio.journal.Journal;
023import org.apache.activemq.broker.BrokerService;
024import org.apache.activemq.broker.BrokerServiceAware;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.ActiveMQQueue;
028import org.apache.activemq.command.ActiveMQTopic;
029import org.apache.activemq.command.ProducerId;
030import org.apache.activemq.store.MessageStore;
031import org.apache.activemq.store.PersistenceAdapter;
032import org.apache.activemq.store.TopicMessageStore;
033import org.apache.activemq.store.TransactionStore;
034import org.apache.activemq.usage.SystemUsage;
035
036/**
037 * An implementation of {@link PersistenceAdapter} designed for use with a
038 * {@link Journal} and then check pointing asynchronously on a timeout with some
039 * other long term persistent storage.
040 * 
041 * @org.apache.xbean.XBean element="kahaDB"
042 * 
043 */
044public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
045    private final KahaDBStore letter = new KahaDBStore();
046
047    /**
048     * @param context
049     * @throws IOException
050     * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
051     */
052    public void beginTransaction(ConnectionContext context) throws IOException {
053        this.letter.beginTransaction(context);
054    }
055
056    /**
057     * @param sync
058     * @throws IOException
059     * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
060     */
061    public void checkpoint(boolean sync) throws IOException {
062        this.letter.checkpoint(sync);
063    }
064
065    /**
066     * @param context
067     * @throws IOException
068     * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
069     */
070    public void commitTransaction(ConnectionContext context) throws IOException {
071        this.letter.commitTransaction(context);
072    }
073
074    /**
075     * @param destination
076     * @return MessageStore
077     * @throws IOException
078     * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
079     */
080    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
081        return this.letter.createQueueMessageStore(destination);
082    }
083
084    /**
085     * @param destination
086     * @return TopicMessageStore
087     * @throws IOException
088     * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
089     */
090    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
091        return this.letter.createTopicMessageStore(destination);
092    }
093
094    /**
095     * @return TrandactionStore
096     * @throws IOException
097     * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
098     */
099    public TransactionStore createTransactionStore() throws IOException {
100        return this.letter.createTransactionStore();
101    }
102
103    /**
104     * @throws IOException
105     * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
106     */
107    public void deleteAllMessages() throws IOException {
108        this.letter.deleteAllMessages();
109    }
110
111    /**
112     * @return destinations
113     * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
114     */
115    public Set<ActiveMQDestination> getDestinations() {
116        return this.letter.getDestinations();
117    }
118
119    /**
120     * @return lastMessageBrokerSequenceId
121     * @throws IOException
122     * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
123     */
124    public long getLastMessageBrokerSequenceId() throws IOException {
125        return this.letter.getLastMessageBrokerSequenceId();
126    }
127
128    public long getLastProducerSequenceId(ProducerId id) throws IOException {
129        return this.letter.getLastProducerSequenceId(id);
130    }
131
132    /**
133     * @param destination
134     * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
135     */
136    public void removeQueueMessageStore(ActiveMQQueue destination) {
137        this.letter.removeQueueMessageStore(destination);
138    }
139
140    /**
141     * @param destination
142     * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
143     */
144    public void removeTopicMessageStore(ActiveMQTopic destination) {
145        this.letter.removeTopicMessageStore(destination);
146    }
147
148    /**
149     * @param context
150     * @throws IOException
151     * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
152     */
153    public void rollbackTransaction(ConnectionContext context) throws IOException {
154        this.letter.rollbackTransaction(context);
155    }
156
157    /**
158     * @param brokerName
159     * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
160     */
161    public void setBrokerName(String brokerName) {
162        this.letter.setBrokerName(brokerName);
163    }
164
165    /**
166     * @param usageManager
167     * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
168     */
169    public void setUsageManager(SystemUsage usageManager) {
170        this.letter.setUsageManager(usageManager);
171    }
172
173    /**
174     * @return the size of the store
175     * @see org.apache.activemq.store.PersistenceAdapter#size()
176     */
177    public long size() {
178        return this.letter.size();
179    }
180
181    /**
182     * @throws Exception
183     * @see org.apache.activemq.Service#start()
184     */
185    public void start() throws Exception {
186        this.letter.start();
187    }
188
189    /**
190     * @throws Exception
191     * @see org.apache.activemq.Service#stop()
192     */
193    public void stop() throws Exception {
194        this.letter.stop();
195    }
196
197    /**
198     * Get the journalMaxFileLength
199     * 
200     * @return the journalMaxFileLength
201     */
202    public int getJournalMaxFileLength() {
203        return this.letter.getJournalMaxFileLength();
204    }
205
206    /**
207     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
208     * be used
209     * 
210     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
211     */
212    public void setJournalMaxFileLength(int journalMaxFileLength) {
213        this.letter.setJournalMaxFileLength(journalMaxFileLength);
214    }
215
216    /**
217     * Set the max number of producers (LRU cache) to track for duplicate sends
218     */
219    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
220        this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
221    }
222    
223    public int getMaxFailoverProducersToTrack() {
224        return this.letter.getMaxFailoverProducersToTrack();
225    }
226
227    /**
228     * set the audit window depth for duplicate suppression (should exceed the max transaction
229     * batch)
230     */
231    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
232        this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
233    }
234    
235    public int getFailoverProducersAuditDepth() {
236        return this.getFailoverProducersAuditDepth();
237    }
238    
239    /**
240     * Get the checkpointInterval
241     * 
242     * @return the checkpointInterval
243     */
244    public long getCheckpointInterval() {
245        return this.letter.getCheckpointInterval();
246    }
247
248    /**
249     * Set the checkpointInterval
250     * 
251     * @param checkpointInterval
252     *            the checkpointInterval to set
253     */
254    public void setCheckpointInterval(long checkpointInterval) {
255        this.letter.setCheckpointInterval(checkpointInterval);
256    }
257
258    /**
259     * Get the cleanupInterval
260     * 
261     * @return the cleanupInterval
262     */
263    public long getCleanupInterval() {
264        return this.letter.getCleanupInterval();
265    }
266
267    /**
268     * Set the cleanupInterval
269     * 
270     * @param cleanupInterval
271     *            the cleanupInterval to set
272     */
273    public void setCleanupInterval(long cleanupInterval) {
274        this.letter.setCleanupInterval(cleanupInterval);
275    }
276
277    /**
278     * Get the indexWriteBatchSize
279     * 
280     * @return the indexWriteBatchSize
281     */
282    public int getIndexWriteBatchSize() {
283        return this.letter.getIndexWriteBatchSize();
284    }
285
286    /**
287     * Set the indexWriteBatchSize
288     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
289     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
290     * @param indexWriteBatchSize
291     *            the indexWriteBatchSize to set
292     */
293    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
294        this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
295    }
296
297    /**
298     * Get the journalMaxWriteBatchSize
299     * 
300     * @return the journalMaxWriteBatchSize
301     */
302    public int getJournalMaxWriteBatchSize() {
303        return this.letter.getJournalMaxWriteBatchSize();
304    }
305
306    /**
307     * Set the journalMaxWriteBatchSize
308     *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
309     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
310     * @param journalMaxWriteBatchSize
311     *            the journalMaxWriteBatchSize to set
312     */
313    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
314        this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
315    }
316
317    /**
318     * Get the enableIndexWriteAsync
319     * 
320     * @return the enableIndexWriteAsync
321     */
322    public boolean isEnableIndexWriteAsync() {
323        return this.letter.isEnableIndexWriteAsync();
324    }
325
326    /**
327     * Set the enableIndexWriteAsync
328     * 
329     * @param enableIndexWriteAsync
330     *            the enableIndexWriteAsync to set
331     */
332    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
333        this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
334    }
335
336    /**
337     * Get the directory
338     * 
339     * @return the directory
340     */
341    public File getDirectory() {
342        return this.letter.getDirectory();
343    }
344
345    /**
346     * @param dir
347     * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
348     */
349    public void setDirectory(File dir) {
350        this.letter.setDirectory(dir);
351    }
352
353    /**
354     * Get the enableJournalDiskSyncs
355     * 
356     * @return the enableJournalDiskSyncs
357     */
358    public boolean isEnableJournalDiskSyncs() {
359        return this.letter.isEnableJournalDiskSyncs();
360    }
361
362    /**
363     * Set the enableJournalDiskSyncs
364     * 
365     * @param enableJournalDiskSyncs
366     *            the enableJournalDiskSyncs to set
367     */
368    public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
369        this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
370    }
371
372    /**
373     * Get the indexCacheSize
374     * 
375     * @return the indexCacheSize
376     */
377    public int getIndexCacheSize() {
378        return this.letter.getIndexCacheSize();
379    }
380
381    /**
382     * Set the indexCacheSize
383     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
384     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
385     * @param indexCacheSize
386     *            the indexCacheSize to set
387     */
388    public void setIndexCacheSize(int indexCacheSize) {
389        this.letter.setIndexCacheSize(indexCacheSize);
390    }
391
392    /**
393     * Get the ignoreMissingJournalfiles
394     * 
395     * @return the ignoreMissingJournalfiles
396     */
397    public boolean isIgnoreMissingJournalfiles() {
398        return this.letter.isIgnoreMissingJournalfiles();
399    }
400
401    /**
402     * Set the ignoreMissingJournalfiles
403     * 
404     * @param ignoreMissingJournalfiles
405     *            the ignoreMissingJournalfiles to set
406     */
407    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
408        this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
409    }
410
411    public boolean isChecksumJournalFiles() {
412        return letter.isChecksumJournalFiles();
413    }
414
415    public boolean isCheckForCorruptJournalFiles() {
416        return letter.isCheckForCorruptJournalFiles();
417    }
418
419    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
420        letter.setChecksumJournalFiles(checksumJournalFiles);
421    }
422
423    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
424        letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
425    }
426
427    public void setBrokerService(BrokerService brokerService) {
428        letter.setBrokerService(brokerService);
429    }
430
431    public boolean isArchiveDataLogs() {
432        return letter.isArchiveDataLogs();
433    }
434
435    public void setArchiveDataLogs(boolean archiveDataLogs) {
436        letter.setArchiveDataLogs(archiveDataLogs);
437    }
438
439    public File getDirectoryArchive() {
440        return letter.getDirectoryArchive();
441    }
442
443    public void setDirectoryArchive(File directoryArchive) {
444        letter.setDirectoryArchive(directoryArchive);
445    }
446
447    public boolean isConcurrentStoreAndDispatchQueues() {
448        return letter.isConcurrentStoreAndDispatchQueues();
449    }
450
451    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
452        letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
453    }
454
455    public boolean isConcurrentStoreAndDispatchTopics() {
456        return letter.isConcurrentStoreAndDispatchTopics();
457    }
458
459    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
460        letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
461    }
462
463    public int getMaxAsyncJobs() {
464        return letter.getMaxAsyncJobs();
465    }
466    /**
467     * @param maxAsyncJobs
468     *            the maxAsyncJobs to set
469     */
470    public void setMaxAsyncJobs(int maxAsyncJobs) {
471        letter.setMaxAsyncJobs(maxAsyncJobs);
472    }
473    
474    /**
475     * @return the databaseLockedWaitDelay
476     */
477    public int getDatabaseLockedWaitDelay() {
478        return letter.getDatabaseLockedWaitDelay();
479    }
480
481    /**
482     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
483     */
484    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
485       letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
486    }
487
488    public boolean getForceRecoverIndex() {
489        return letter.getForceRecoverIndex();
490    }
491
492    public void setForceRecoverIndex(boolean forceRecoverIndex) {
493        letter.setForceRecoverIndex(forceRecoverIndex);
494    }
495
496    //  for testing
497    public KahaDBStore getStore() {
498        return letter;
499    }
500
501    @Override
502    public String toString() {
503        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
504        return "KahaDBPersistenceAdapter[" + path + "]";
505    }
506
507}