001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.plist;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.File;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.Map.Entry;
031import org.apache.activemq.util.IOHelper;
032import org.apache.activemq.util.ServiceStopper;
033import org.apache.activemq.util.ServiceSupport;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.kahadb.index.BTreeIndex;
037import org.apache.kahadb.journal.Journal;
038import org.apache.kahadb.journal.Location;
039import org.apache.kahadb.page.Page;
040import org.apache.kahadb.page.PageFile;
041import org.apache.kahadb.page.Transaction;
042import org.apache.kahadb.util.ByteSequence;
043import org.apache.kahadb.util.IntegerMarshaller;
044import org.apache.kahadb.util.LockFile;
045import org.apache.kahadb.util.StringMarshaller;
046import org.apache.kahadb.util.VariableMarshaller;
047
048/**
049 * @org.apache.xbean.XBean
050 */
051public class PListStore extends ServiceSupport {
052    static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
053    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
054
055    static final int CLOSED_STATE = 1;
056    static final int OPEN_STATE = 2;
057
058    private File directory;
059    PageFile pageFile;
060    private Journal journal;
061    private LockFile lockFile;
062    private boolean failIfDatabaseIsLocked;
063    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
064    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
065    private boolean enableIndexWriteAsync = false;
066    private boolean initialized = false;
067    // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
068    MetaData metaData = new MetaData(this);
069    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
070    Map<String, PList> persistentLists = new HashMap<String, PList>();
071    final Object indexLock = new Object();
072
073    public Object getIndexLock() {
074        return indexLock;
075    }
076
077    protected class MetaData {
078        protected MetaData(PListStore store) {
079            this.store = store;
080        }
081
082        private final PListStore store;
083        Page<MetaData> page;
084        BTreeIndex<Integer, Integer> journalRC;
085        BTreeIndex<String, PList> storedSchedulers;
086
087        void createIndexes(Transaction tx) throws IOException {
088            this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
089            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
090        }
091
092        void load(Transaction tx) throws IOException {
093            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
094            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
095            this.storedSchedulers.load(tx);
096            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
097            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
098            this.journalRC.load(tx);
099        }
100
101        void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException {
102            for (Iterator<Entry<String, PList>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
103                Entry<String, PList> entry = i.next();
104                entry.getValue().load(tx);
105                schedulers.put(entry.getKey(), entry.getValue());
106            }
107        }
108
109        public void read(DataInput is) throws IOException {
110            this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong());
111            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
112            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
113            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
114            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
115            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
116        }
117
118        public void write(DataOutput os) throws IOException {
119            os.writeLong(this.storedSchedulers.getPageId());
120            os.writeLong(this.journalRC.getPageId());
121
122        }
123    }
124
125    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
126        private final PListStore store;
127
128        MetaDataMarshaller(PListStore store) {
129            this.store = store;
130        }
131        public MetaData readPayload(DataInput dataIn) throws IOException {
132            MetaData rc = new MetaData(this.store);
133            rc.read(dataIn);
134            return rc;
135        }
136
137        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
138            object.write(dataOut);
139        }
140    }
141
142    class ValueMarshaller extends VariableMarshaller<List<EntryLocation>> {
143        public List<EntryLocation> readPayload(DataInput dataIn) throws IOException {
144            List<EntryLocation> result = new ArrayList<EntryLocation>();
145            int size = dataIn.readInt();
146            for (int i = 0; i < size; i++) {
147                EntryLocation jobLocation = new EntryLocation();
148                jobLocation.readExternal(dataIn);
149                result.add(jobLocation);
150            }
151            return result;
152        }
153
154        public void writePayload(List<EntryLocation> value, DataOutput dataOut) throws IOException {
155            dataOut.writeInt(value.size());
156            for (EntryLocation jobLocation : value) {
157                jobLocation.writeExternal(dataOut);
158            }
159        }
160    }
161
162    class JobSchedulerMarshaller extends VariableMarshaller<PList> {
163        private final PListStore store;
164        JobSchedulerMarshaller(PListStore store) {
165            this.store = store;
166        }
167        public PList readPayload(DataInput dataIn) throws IOException {
168            PList result = new PList(this.store);
169            result.read(dataIn);
170            return result;
171        }
172
173        public void writePayload(PList js, DataOutput dataOut) throws IOException {
174            js.write(dataOut);
175        }
176    }
177
178    public File getDirectory() {
179        return directory;
180    }
181
182    public void setDirectory(File directory) {
183        this.directory = directory;
184    }
185
186    public long size() {
187        synchronized (this) {
188            if (!initialized) {
189                return 0;
190            }
191        }
192        try {
193            return journal.getDiskSize() + pageFile.getDiskSize();
194        } catch (IOException e) {
195            throw new RuntimeException(e);
196        }
197    }
198
199    synchronized public PList getPList(final String name) throws Exception {
200        if (!isStarted()) {
201            throw new IllegalStateException("Not started");
202        }
203        intialize();
204        PList result = this.persistentLists.get(name);
205        if (result == null) {
206            final PList pl = new PList(this);
207            pl.setName(name);
208            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
209                public void execute(Transaction tx) throws IOException {
210                    pl.setRootId(tx.allocate().getPageId());
211                    pl.load(tx);
212                    metaData.storedSchedulers.put(tx, name, pl);
213                }
214            });
215            result = pl;
216            this.persistentLists.put(name, pl);
217        }
218        final PList load = result;
219        getPageFile().tx().execute(new Transaction.Closure<IOException>() {
220            public void execute(Transaction tx) throws IOException {
221                load.load(tx);
222            }
223        });
224
225        return result;
226    }
227
228    synchronized public boolean removePList(final String name) throws Exception {
229        boolean result = false;
230        final PList pl = this.persistentLists.remove(name);
231        result = pl != null;
232        if (result) {
233            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
234                public void execute(Transaction tx) throws IOException {
235                    metaData.storedSchedulers.remove(tx, name);
236                    pl.destroy(tx);
237                }
238            });
239        }
240        return result;
241    }
242
243    protected synchronized void intialize() throws Exception {
244        if (isStarted()) {
245            if (this.initialized == false) {
246                this.initialized = true;
247                if (this.directory == null) {
248                    this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
249                }
250                IOHelper.mkdirs(this.directory);
251                lock();
252                this.journal = new Journal();
253                this.journal.setDirectory(directory);
254                this.journal.setMaxFileLength(getJournalMaxFileLength());
255                this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
256                this.journal.start();
257                this.pageFile = new PageFile(directory, "tmpDB");
258                this.pageFile.load();
259
260                this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
261                    public void execute(Transaction tx) throws IOException {
262                        if (pageFile.getPageCount() == 0) {
263                            Page<MetaData> page = tx.allocate();
264                            assert page.getPageId() == 0;
265                            page.set(metaData);
266                            metaData.page = page;
267                            metaData.createIndexes(tx);
268                            tx.store(metaData.page, metaDataMarshaller, true);
269
270                        } else {
271                            Page<MetaData> page = tx.load(0, metaDataMarshaller);
272                            metaData = page.get();
273                            metaData.page = page;
274                        }
275                        metaData.load(tx);
276                        metaData.loadLists(tx, persistentLists);
277                    }
278                });
279
280                this.pageFile.flush();
281                LOG.info(this + " initialized");
282            }
283        }
284    }
285
286    @Override
287    protected synchronized void doStart() throws Exception {
288        LOG.info(this + " started");
289    }
290
291    @Override
292    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
293        for (PList pl : this.persistentLists.values()) {
294            pl.unload();
295        }
296        if (this.pageFile != null) {
297            this.pageFile.unload();
298        }
299        if (this.journal != null) {
300            journal.close();
301        }
302        if (this.lockFile != null) {
303            this.lockFile.unlock();
304        }
305        this.lockFile = null;
306        this.initialized = false;
307        LOG.info(this + " stopped");
308
309    }
310
311    synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
312        int logId = location.getDataFileId();
313        Integer val = this.metaData.journalRC.get(tx, logId);
314        int refCount = val != null ? val.intValue() + 1 : 1;
315        this.metaData.journalRC.put(tx, logId, refCount);
316
317    }
318
319    synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
320        int logId = location.getDataFileId();
321        if (logId != Location.NOT_SET) {
322            int refCount = this.metaData.journalRC.get(tx, logId);
323            refCount--;
324            if (refCount <= 0) {
325                this.metaData.journalRC.remove(tx, logId);
326                Set<Integer> set = new HashSet<Integer>();
327                set.add(logId);
328                this.journal.removeDataFiles(set);
329            } else {
330                this.metaData.journalRC.put(tx, logId, refCount);
331            }
332        }
333    }
334
335    synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
336        ByteSequence result = null;
337        result = this.journal.read(location);
338        return result;
339    }
340
341    synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
342        return this.journal.write(payload, sync);
343    }
344
345    private void lock() throws IOException {
346        if (lockFile == null) {
347            File lockFileName = new File(directory, "lock");
348            lockFile = new LockFile(lockFileName, true);
349            if (failIfDatabaseIsLocked) {
350                lockFile.lock();
351            } else {
352                while (true) {
353                    try {
354                        lockFile.lock();
355                        break;
356                    } catch (IOException e) {
357                        LOG.info("Database " + lockFileName + " is locked... waiting "
358                                + (DATABASE_LOCKED_WAIT_DELAY / 1000)
359                                + " seconds for the database to be unlocked. Reason: " + e);
360                        try {
361                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
362                        } catch (InterruptedException e1) {
363                        }
364                    }
365                }
366            }
367        }
368    }
369
370    PageFile getPageFile() {
371        this.pageFile.isLoaded();
372        return this.pageFile;
373    }
374
375    public boolean isFailIfDatabaseIsLocked() {
376        return failIfDatabaseIsLocked;
377    }
378
379    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
380        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
381    }
382
383    public int getJournalMaxFileLength() {
384        return journalMaxFileLength;
385    }
386
387    public void setJournalMaxFileLength(int journalMaxFileLength) {
388        this.journalMaxFileLength = journalMaxFileLength;
389    }
390
391    public int getJournalMaxWriteBatchSize() {
392        return journalMaxWriteBatchSize;
393    }
394
395    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
396        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
397    }
398
399    public boolean isEnableIndexWriteAsync() {
400        return enableIndexWriteAsync;
401    }
402
403    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
404        this.enableIndexWriteAsync = enableIndexWriteAsync;
405    }
406
407    @Override
408    public String toString() {
409        return "PListStore:" + this.directory;
410    }
411
412}