001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.journal;
018
019import java.io.File;
020import java.io.IOException;
021
022import org.apache.activeio.journal.Journal;
023import org.apache.activeio.journal.active.JournalImpl;
024import org.apache.activeio.journal.active.JournalLockedException;
025import org.apache.activemq.store.PersistenceAdapter;
026import org.apache.activemq.store.PersistenceAdapterFactory;
027import org.apache.activemq.store.jdbc.DataSourceSupport;
028import org.apache.activemq.store.jdbc.JDBCAdapter;
029import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
030import org.apache.activemq.store.jdbc.Statements;
031import org.apache.activemq.thread.TaskRunnerFactory;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Factory class that can create PersistenceAdapter objects.
037 * 
038 * @org.apache.xbean.XBean
039 * 
040 */
041public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
042
043    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
044
045    private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class);
046
047    private int journalLogFileSize = 1024 * 1024 * 20;
048    private int journalLogFiles = 2;
049    private TaskRunnerFactory taskRunnerFactory;
050    private Journal journal;
051    private boolean useJournal = true;
052    private boolean useQuickJournal;
053    private File journalArchiveDirectory;
054    private boolean failIfJournalIsLocked;
055    private int journalThreadPriority = Thread.MAX_PRIORITY;
056    private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
057    private boolean useDedicatedTaskRunner;
058
059    public PersistenceAdapter createPersistenceAdapter() throws IOException {
060        jdbcPersistenceAdapter.setDataSource(getDataSource());
061
062        if (!useJournal) {
063            return jdbcPersistenceAdapter;
064        }
065        return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
066
067    }
068
069    public int getJournalLogFiles() {
070        return journalLogFiles;
071    }
072
073    /**
074     * Sets the number of journal log files to use
075     */
076    public void setJournalLogFiles(int journalLogFiles) {
077        this.journalLogFiles = journalLogFiles;
078    }
079
080    public int getJournalLogFileSize() {
081        return journalLogFileSize;
082    }
083
084    /**
085     * Sets the size of the journal log files
086     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
087     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
088     */
089    public void setJournalLogFileSize(int journalLogFileSize) {
090        this.journalLogFileSize = journalLogFileSize;
091    }
092
093    public JDBCPersistenceAdapter getJdbcAdapter() {
094        return jdbcPersistenceAdapter;
095    }
096
097    public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
098        this.jdbcPersistenceAdapter = jdbcAdapter;
099    }
100
101    public boolean isUseJournal() {
102        return useJournal;
103    }
104
105    /**
106     * Enables or disables the use of the journal. The default is to use the
107     * journal
108     * 
109     * @param useJournal
110     */
111    public void setUseJournal(boolean useJournal) {
112        this.useJournal = useJournal;
113    }
114
115    public boolean isUseDedicatedTaskRunner() {
116        return useDedicatedTaskRunner;
117    }
118    
119    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
120        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
121    }
122    
123    public TaskRunnerFactory getTaskRunnerFactory() {
124        if (taskRunnerFactory == null) {
125            taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
126                                                      true, 1000, isUseDedicatedTaskRunner());
127        }
128        return taskRunnerFactory;
129    }
130
131    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
132        this.taskRunnerFactory = taskRunnerFactory;
133    }
134
135    public Journal getJournal() throws IOException {
136        if (journal == null) {
137            createJournal();
138        }
139        return journal;
140    }
141
142    public void setJournal(Journal journal) {
143        this.journal = journal;
144    }
145
146    public File getJournalArchiveDirectory() {
147        if (journalArchiveDirectory == null && useQuickJournal) {
148            journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
149        }
150        return journalArchiveDirectory;
151    }
152
153    public void setJournalArchiveDirectory(File journalArchiveDirectory) {
154        this.journalArchiveDirectory = journalArchiveDirectory;
155    }
156
157    public boolean isUseQuickJournal() {
158        return useQuickJournal;
159    }
160
161    /**
162     * Enables or disables the use of quick journal, which keeps messages in the
163     * journal and just stores a reference to the messages in JDBC. Defaults to
164     * false so that messages actually reside long term in the JDBC database.
165     */
166    public void setUseQuickJournal(boolean useQuickJournal) {
167        this.useQuickJournal = useQuickJournal;
168    }
169
170    public JDBCAdapter getAdapter() throws IOException {
171        return jdbcPersistenceAdapter.getAdapter();
172    }
173
174    public void setAdapter(JDBCAdapter adapter) {
175        jdbcPersistenceAdapter.setAdapter(adapter);
176    }
177
178    public Statements getStatements() {
179        return jdbcPersistenceAdapter.getStatements();
180    }
181
182    public void setStatements(Statements statements) {
183        jdbcPersistenceAdapter.setStatements(statements);
184    }
185
186    public boolean isUseDatabaseLock() {
187        return jdbcPersistenceAdapter.isUseDatabaseLock();
188    }
189
190    /**
191     * Sets whether or not an exclusive database lock should be used to enable
192     * JDBC Master/Slave. Enabled by default.
193     */
194    public void setUseDatabaseLock(boolean useDatabaseLock) {
195        jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
196    }
197
198    public boolean isCreateTablesOnStartup() {
199        return jdbcPersistenceAdapter.isCreateTablesOnStartup();
200    }
201
202    /**
203     * Sets whether or not tables are created on startup
204     */
205    public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
206        jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
207    }
208
209    public int getJournalThreadPriority() {
210        return journalThreadPriority;
211    }
212
213    /**
214     * Sets the thread priority of the journal thread
215     */
216    public void setJournalThreadPriority(int journalThreadPriority) {
217        this.journalThreadPriority = journalThreadPriority;
218    }
219
220    /**
221     * @throws IOException
222     */
223    protected void createJournal() throws IOException {
224        File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
225        if (failIfJournalIsLocked) {
226            journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
227                                      getJournalArchiveDirectory());
228        } else {
229            while (true) {
230                try {
231                    journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
232                                              getJournalArchiveDirectory());
233                    break;
234                } catch (JournalLockedException e) {
235                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
236                             + " seconds for the journal to be unlocked.");
237                    try {
238                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
239                    } catch (InterruptedException e1) {
240                    }
241                }
242            }
243        }
244    }
245
246}