001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.scheduler;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.File;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.Map.Entry;
031import org.apache.activemq.util.IOHelper;
032import org.apache.activemq.util.ServiceStopper;
033import org.apache.activemq.util.ServiceSupport;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.kahadb.index.BTreeIndex;
037import org.apache.kahadb.journal.Journal;
038import org.apache.kahadb.journal.Location;
039import org.apache.kahadb.page.Page;
040import org.apache.kahadb.page.PageFile;
041import org.apache.kahadb.page.Transaction;
042import org.apache.kahadb.util.ByteSequence;
043import org.apache.kahadb.util.IntegerMarshaller;
044import org.apache.kahadb.util.LockFile;
045import org.apache.kahadb.util.StringMarshaller;
046import org.apache.kahadb.util.VariableMarshaller;
047
048public class JobSchedulerStore extends ServiceSupport {
    // Class-wide logger; also used from the start-up transaction closure in doStart().
    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
    // Pause, in milliseconds, between attempts to take the lock file in lock().
    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;

    // NOTE(review): the two state constants below are not referenced anywhere in this class;
    // presumably kept for external callers - confirm before removing.
    public static final int CLOSED_STATE = 1;
    public static final int OPEN_STATE = 2;

    // Directory holding the journal, the index file and the lock file; doStart() picks a default when null.
    private File directory;
    // Index page file ("scheduleDB"), created and loaded in doStart().
    PageFile pageFile;
    // Journal that payloads are written to and read from; created in doStart().
    private Journal journal;
    // Lock file guarding the directory; set by lock() and cleared in doStop().
    private LockFile lockFile;
    // When true, lock() makes a single attempt and lets the failure propagate instead of retrying.
    private boolean failIfDatabaseIsLocked;
    // Journal tuning values handed to the Journal in doStart().
    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    // NOTE(review): only exposed through its getter/setter; nothing in this class applies it to the page file.
    private boolean enableIndexWriteAsync = false;
    // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
    // Root index record; replaced in doStart() by the copy read from page 0 when the index file already has pages.
    MetaData metaData = new MetaData(this);
    // Marshaller used to store and load the root record on page 0.
    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
    // Schedulers by name: filled from the index in doStart() and extended on demand by getJobScheduler().
    Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
067
068    protected class MetaData {
069        protected MetaData(JobSchedulerStore store) {
070            this.store = store;
071        }
072        private final JobSchedulerStore store;
073        Page<MetaData> page;
074        BTreeIndex<Integer, Integer> journalRC;
075        BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
076
077        void createIndexes(Transaction tx) throws IOException {
078            this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
079            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
080        }
081
082        void load(Transaction tx) throws IOException {
083            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
084            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
085            this.storedSchedulers.load(tx);
086            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
087            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
088            this.journalRC.load(tx);
089        }
090
091        void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
092            for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
093                Entry<String, JobSchedulerImpl> entry = i.next();
094                entry.getValue().load(tx);
095                schedulers.put(entry.getKey(), entry.getValue());
096            }
097        }
098
099        public void read(DataInput is) throws IOException {
100            this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
101            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
102            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
103            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
104            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
105            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
106        }
107
108        public void write(DataOutput os) throws IOException {
109            os.writeLong(this.storedSchedulers.getPageId());
110            os.writeLong(this.journalRC.getPageId());
111
112        }
113    }
114
115    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
116        private final JobSchedulerStore store;
117
118        MetaDataMarshaller(JobSchedulerStore store) {
119            this.store = store;
120        }
121        public MetaData readPayload(DataInput dataIn) throws IOException {
122            MetaData rc = new MetaData(this.store);
123            rc.read(dataIn);
124            return rc;
125        }
126
127        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
128            object.write(dataOut);
129        }
130    }
131
132    class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
133        public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
134            List<JobLocation> result = new ArrayList<JobLocation>();
135            int size = dataIn.readInt();
136            for (int i = 0; i < size; i++) {
137                JobLocation jobLocation = new JobLocation();
138                jobLocation.readExternal(dataIn);
139                result.add(jobLocation);
140            }
141            return result;
142        }
143
144        public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
145            dataOut.writeInt(value.size());
146            for (JobLocation jobLocation : value) {
147                jobLocation.writeExternal(dataOut);
148            }
149        }
150    }
151
152    class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
153        private final JobSchedulerStore store;
154        JobSchedulerMarshaller(JobSchedulerStore store) {
155            this.store = store;
156        }
157        public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
158            JobSchedulerImpl result = new JobSchedulerImpl(this.store);
159            result.read(dataIn);
160            return result;
161        }
162
163        public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
164            js.write(dataOut);
165        }
166    }
167
168    public File getDirectory() {
169        return directory;
170    }
171
172    public void setDirectory(File directory) {
173        this.directory = directory;
174    }
175    
176    public long size() {
177        if ( !isStarted() ) {
178            return 0;
179        }
180        try {
181            return journal.getDiskSize() + pageFile.getDiskSize();
182        } catch (IOException e) {
183            throw new RuntimeException(e);
184        }
185    }
186
187    public JobScheduler getJobScheduler(final String name) throws Exception {
188        JobSchedulerImpl result = this.schedulers.get(name);
189        if (result == null) {
190            final JobSchedulerImpl js = new JobSchedulerImpl(this);
191            js.setName(name);
192            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
193                public void execute(Transaction tx) throws IOException {
194                    js.createIndexes(tx);
195                    js.load(tx);
196                    metaData.storedSchedulers.put(tx, name, js);
197                }
198            });
199            result = js;
200            this.schedulers.put(name, js);
201            if (isStarted()) {
202                result.start();
203            }
204            this.pageFile.flush();
205        }
206        return result;
207    }
208
209    synchronized public boolean removeJobScheduler(final String name) throws Exception {
210        boolean result = false;
211        final JobSchedulerImpl js = this.schedulers.remove(name);
212        result = js != null;
213        if (result) {
214            js.stop();
215            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
216                public void execute(Transaction tx) throws IOException {
217                    metaData.storedSchedulers.remove(tx, name);
218                    js.destroy(tx);
219                }
220            });
221        }
222        return result;
223    }
224
225    @Override
226    protected synchronized void doStart() throws Exception {
227        if (this.directory == null) {
228            this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
229        }
230        IOHelper.mkdirs(this.directory);
231        lock();
232        this.journal = new Journal();
233        this.journal.setDirectory(directory);
234        this.journal.setMaxFileLength(getJournalMaxFileLength());
235        this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
236        this.journal.start();
237        this.pageFile = new PageFile(directory, "scheduleDB");
238        this.pageFile.load();
239
240        this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
241            public void execute(Transaction tx) throws IOException {
242                if (pageFile.getPageCount() == 0) {
243                    Page<MetaData> page = tx.allocate();
244                    assert page.getPageId() == 0;
245                    page.set(metaData);
246                    metaData.page = page;
247                    metaData.createIndexes(tx);
248                    tx.store(metaData.page, metaDataMarshaller, true);
249
250                } else {
251                    Page<MetaData> page = tx.load(0, metaDataMarshaller);
252                    metaData = page.get();
253                    metaData.page = page;
254                }
255                metaData.load(tx);
256                metaData.loadScheduler(tx, schedulers);
257                for (JobSchedulerImpl js :schedulers.values()) {
258                    try {
259                        js.start();
260                    } catch (Exception e) {
261                        JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
262                    }
263               }
264            }
265        });
266
267        this.pageFile.flush();
268        LOG.info(this + " started");
269    }
270    
271    @Override
272    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
273        for (JobSchedulerImpl js : this.schedulers.values()) {
274            js.stop();
275        }
276        if (this.pageFile != null) {
277            this.pageFile.unload();
278        }
279        if (this.journal != null) {
280            journal.close();
281        }
282        if (this.lockFile != null) {
283            this.lockFile.unlock();
284        }
285        this.lockFile = null;
286        LOG.info(this + " stopped");
287
288    }
289
290    synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
291        int logId = location.getDataFileId();
292        Integer val = this.metaData.journalRC.get(tx, logId);
293        int refCount = val != null ? val.intValue() + 1 : 1;
294        this.metaData.journalRC.put(tx, logId, refCount);
295
296    }
297
298    synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
299        int logId = location.getDataFileId();
300        int refCount = this.metaData.journalRC.get(tx, logId);
301        refCount--;
302        if (refCount <= 0) {
303            this.metaData.journalRC.remove(tx, logId);
304            Set<Integer> set = new HashSet<Integer>();
305            set.add(logId);
306            this.journal.removeDataFiles(set);
307        } else {
308            this.metaData.journalRC.put(tx, logId, refCount);
309        }
310
311    }
312
313    synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
314        ByteSequence result = null;
315        result = this.journal.read(location);
316        return result;
317    }
318
319    synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
320        return this.journal.write(payload, sync);
321    }
322
323    private void lock() throws IOException {
324        if (lockFile == null) {
325            File lockFileName = new File(directory, "lock");
326            lockFile = new LockFile(lockFileName, true);
327            if (failIfDatabaseIsLocked) {
328                lockFile.lock();
329            } else {
330                while (true) {
331                    try {
332                        lockFile.lock();
333                        break;
334                    } catch (IOException e) {
335                        LOG.info("Database " + lockFileName + " is locked... waiting "
336                                + (DATABASE_LOCKED_WAIT_DELAY / 1000)
337                                + " seconds for the database to be unlocked. Reason: " + e);
338                        try {
339                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
340                        } catch (InterruptedException e1) {
341                        }
342                    }
343                }
344            }
345        }
346    }
347
348    PageFile getPageFile() {
349        this.pageFile.isLoaded();
350        return this.pageFile;
351    }
352
353    public boolean isFailIfDatabaseIsLocked() {
354        return failIfDatabaseIsLocked;
355    }
356
357    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
358        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
359    }
360
361    public int getJournalMaxFileLength() {
362        return journalMaxFileLength;
363    }
364
365    public void setJournalMaxFileLength(int journalMaxFileLength) {
366        this.journalMaxFileLength = journalMaxFileLength;
367    }
368
369    public int getJournalMaxWriteBatchSize() {
370        return journalMaxWriteBatchSize;
371    }
372
373    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
374        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
375    }
376
377    public boolean isEnableIndexWriteAsync() {
378        return enableIndexWriteAsync;
379    }
380
381    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
382        this.enableIndexWriteAsync = enableIndexWriteAsync;
383    }
384
385    @Override
386    public String toString() {
387        return "JobSchedulerStore:" + this.directory;
388    }
389
390}