001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.kaha.impl;
018
019import java.io.File;
020import java.io.IOException;
021import java.io.RandomAccessFile;
022import java.nio.channels.FileLock;
023import java.util.Date;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.atomic.AtomicLong;
030
031import org.apache.activemq.kaha.ContainerId;
032import org.apache.activemq.kaha.ListContainer;
033import org.apache.activemq.kaha.MapContainer;
034import org.apache.activemq.kaha.Store;
035import org.apache.activemq.kaha.StoreLocation;
036import org.apache.activemq.kaha.impl.async.AsyncDataManager;
037import org.apache.activemq.kaha.impl.async.DataManagerFacade;
038import org.apache.activemq.kaha.impl.container.ListContainerImpl;
039import org.apache.activemq.kaha.impl.container.MapContainerImpl;
040import org.apache.activemq.kaha.impl.data.DataManagerImpl;
041import org.apache.activemq.kaha.impl.data.Item;
042import org.apache.activemq.kaha.impl.data.RedoListener;
043import org.apache.activemq.kaha.impl.index.IndexItem;
044import org.apache.activemq.kaha.impl.index.IndexManager;
045import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
046import org.apache.activemq.util.IOHelper;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * Store Implementation
052 * 
053 * 
054 */
055public class KahaStore implements Store {
056
057    private static final String PROPERTY_PREFIX = "org.apache.activemq.kaha.Store";
058    private static final boolean BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
059                                                                                     + ".FileLockBroken",
060                                                                                     "false"));
061    private static final boolean DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
062                                                                                    + ".DisableLocking",
063                                                                                    "false"));
064    //according to the String javadoc, all constant strings are interned so this will be the same object throughout the vm
065    //and we can use it as a monitor for the lockset.
066    private final static String LOCKSET_MONITOR = PROPERTY_PREFIX + ".Lock.Monitor";
067    private static final Logger LOG = LoggerFactory.getLogger(KahaStore.class);
068
069    private final File directory;
070    private final String mode;
071    private IndexRootContainer mapsContainer;
072    private IndexRootContainer listsContainer;
073    private final Map<ContainerId, ListContainerImpl> lists = new ConcurrentHashMap<ContainerId, ListContainerImpl>();
074    private final Map<ContainerId, MapContainerImpl> maps = new ConcurrentHashMap<ContainerId, MapContainerImpl>();
075    private final Map<String, DataManager> dataManagers = new ConcurrentHashMap<String, DataManager>();
076    private final Map<String, IndexManager> indexManagers = new ConcurrentHashMap<String, IndexManager>();
077    private boolean closed;
078    private boolean initialized;
079    private boolean logIndexChanges;
080    private boolean useAsyncDataManager;
081    private long maxDataFileLength = 1024 * 1024 * 32;
082    private FileLock lock;
083    private boolean persistentIndex = true;
084    private RandomAccessFile lockFile;
085    private final AtomicLong storeSize;
086    private String defaultContainerName = DEFAULT_CONTAINER_NAME;
087
088    
089    public KahaStore(String name, String mode) throws IOException {
090        this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, new AtomicLong());
091    }
092
093    public KahaStore(File directory, String mode) throws IOException {
094        this(directory, mode, new AtomicLong());
095    }
096
097    public KahaStore(String name, String mode,AtomicLong storeSize) throws IOException {
098        this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, storeSize);
099    }
100    
101    public KahaStore(File directory, String mode, AtomicLong storeSize) throws IOException {
102        this.mode = mode;
103        this.storeSize = storeSize;
104        this.directory = directory;
105        IOHelper.mkdirs(this.directory);
106    }
107
108    public synchronized void close() throws IOException {
109        if (!closed) {
110            closed = true;
111            if (initialized) {
112                unlock();
113                for (ListContainerImpl container : lists.values()) {
114                    container.close();
115                }
116                lists.clear();
117                for (MapContainerImpl container : maps.values()) {
118                    container.close();
119                }
120                maps.clear();
121                for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
122                    IndexManager im = iter.next();
123                    im.close();
124                    iter.remove();
125                }
126                for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
127                    DataManager dm = iter.next();
128                    dm.close();
129                    iter.remove();
130                }
131            }
132            if (lockFile!=null) {
133                lockFile.close();
134                lockFile=null;
135            }
136        }
137    }
138
139    public synchronized void force() throws IOException {
140        if (initialized) {
141            for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
142                IndexManager im = iter.next();
143                im.force();
144            }
145            for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
146                DataManager dm = iter.next();
147                dm.force();
148            }
149        }
150    }
151
152    public synchronized void clear() throws IOException {
153        initialize();
154        for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
155            ContainerId id = (ContainerId)i.next();
156            MapContainer container = getMapContainer(id.getKey(), id.getDataContainerName());
157            container.clear();
158        }
159        for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
160            ContainerId id = (ContainerId)i.next();
161            ListContainer container = getListContainer(id.getKey(), id.getDataContainerName());
162            container.clear();
163        }
164
165    }
166
167    public synchronized boolean delete() throws IOException {
168        boolean result = true;
169        if (initialized) {
170            clear();
171            for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
172                IndexManager im = iter.next();
173                result &= im.delete();
174                iter.remove();
175            }
176            for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
177                DataManager dm = iter.next();
178                result &= dm.delete();
179                iter.remove();
180            }
181        }
182        if (directory != null && directory.isDirectory()) {
183            result =IOHelper.deleteChildren(directory);
184            String str = result ? "successfully deleted" : "failed to delete";
185            LOG.info("Kaha Store " + str + " data directory " + directory);
186        }
187        return result;
188    }
189
190    public synchronized boolean isInitialized() {
191        return initialized;
192    }
193
194    public boolean doesMapContainerExist(Object id) throws IOException {
195        return doesMapContainerExist(id, defaultContainerName);
196    }
197
198    public synchronized boolean doesMapContainerExist(Object id, String containerName) throws IOException {
199        initialize();
200        ContainerId containerId = new ContainerId(id, containerName);
201        return maps.containsKey(containerId) || mapsContainer.doesRootExist(containerId);
202    }
203
204    public MapContainer getMapContainer(Object id) throws IOException {
205        return getMapContainer(id, defaultContainerName);
206    }
207
208    public MapContainer getMapContainer(Object id, String containerName) throws IOException {
209        return getMapContainer(id, containerName, persistentIndex);
210    }
211
212    public synchronized MapContainer getMapContainer(Object id, String containerName, boolean persistentIndex)
213        throws IOException {
214        initialize();
215        ContainerId containerId = new ContainerId(id, containerName);
216        MapContainerImpl result = maps.get(containerId);
217        if (result == null) {
218            DataManager dm = getDataManager(containerName);
219            IndexManager im = getIndexManager(dm, containerName);
220
221            IndexItem root = mapsContainer.getRoot(im, containerId);
222            if (root == null) {
223                root = mapsContainer.addRoot(im, containerId);
224            }
225            result = new MapContainerImpl(directory, containerId, root, im, dm, persistentIndex);
226            maps.put(containerId, result);
227        }
228        return result;
229    }
230
231    public void deleteMapContainer(Object id) throws IOException {
232        deleteMapContainer(id, defaultContainerName);
233    }
234
235    public void deleteMapContainer(Object id, String containerName) throws IOException {
236        ContainerId containerId = new ContainerId(id, containerName);
237        deleteMapContainer(containerId);
238    }
239
240    public synchronized void deleteMapContainer(ContainerId containerId) throws IOException {
241        initialize();
242        MapContainerImpl container = maps.remove(containerId);
243        if (container != null) {
244            container.clear();
245            mapsContainer.removeRoot(container.getIndexManager(), containerId);
246            container.close();
247        }
248    }
249
250    public synchronized Set<ContainerId> getMapContainerIds() throws IOException {
251        initialize();
252        Set<ContainerId> set = new HashSet<ContainerId>();
253        for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
254            ContainerId id = (ContainerId)i.next();
255            set.add(id);
256        }
257        return set;
258    }
259
260    public boolean doesListContainerExist(Object id) throws IOException {
261        return doesListContainerExist(id, defaultContainerName);
262    }
263
264    public synchronized boolean doesListContainerExist(Object id, String containerName) throws IOException {
265        initialize();
266        ContainerId containerId = new ContainerId(id, containerName);
267        return lists.containsKey(containerId) || listsContainer.doesRootExist(containerId);
268    }
269
270    public ListContainer getListContainer(Object id) throws IOException {
271        return getListContainer(id, defaultContainerName);
272    }
273
274    public ListContainer getListContainer(Object id, String containerName) throws IOException {
275        return getListContainer(id, containerName, persistentIndex);
276    }
277
278    public synchronized ListContainer getListContainer(Object id, String containerName,
279                                                       boolean persistentIndex) throws IOException {
280        initialize();
281        ContainerId containerId = new ContainerId(id, containerName);
282        ListContainerImpl result = lists.get(containerId);
283        if (result == null) {
284            DataManager dm = getDataManager(containerName);
285            IndexManager im = getIndexManager(dm, containerName);
286
287            IndexItem root = listsContainer.getRoot(im, containerId);
288            if (root == null) {
289                root = listsContainer.addRoot(im, containerId);
290            }
291            result = new ListContainerImpl(containerId, root, im, dm, persistentIndex);
292            lists.put(containerId, result);
293        }
294        return result;
295    }
296
297    public void deleteListContainer(Object id) throws IOException {
298        deleteListContainer(id, defaultContainerName);
299    }
300
301    public synchronized void deleteListContainer(Object id, String containerName) throws IOException {
302        ContainerId containerId = new ContainerId(id, containerName);
303        deleteListContainer(containerId);
304    }
305
306    public synchronized void deleteListContainer(ContainerId containerId) throws IOException {
307        initialize();
308        ListContainerImpl container = lists.remove(containerId);
309        if (container != null) {
310            listsContainer.removeRoot(container.getIndexManager(), containerId);
311            container.clear();
312            container.close();
313        }
314    }
315
316    public synchronized Set<ContainerId> getListContainerIds() throws IOException {
317        initialize();
318        Set<ContainerId> set = new HashSet<ContainerId>();
319        for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
320            ContainerId id = (ContainerId)i.next();
321            set.add(id);
322        }
323        return set;
324    }
325
326    /**
327     * @return the listsContainer
328     */
329    public IndexRootContainer getListsContainer() {
330        return this.listsContainer;
331    }
332
333    /**
334     * @return the mapsContainer
335     */
336    public IndexRootContainer getMapsContainer() {
337        return this.mapsContainer;
338    }
339
340    public synchronized DataManager getDataManager(String name) throws IOException {
341        DataManager dm = dataManagers.get(name);
342        if (dm == null) {
343            if (isUseAsyncDataManager()) {
344                AsyncDataManager t = new AsyncDataManager(storeSize);
345                t.setDirectory(directory);
346                t.setFilePrefix("async-data-" + name + "-");
347                t.setMaxFileLength((int)maxDataFileLength);
348                t.start();
349                dm = new DataManagerFacade(t, name);
350            } else {
351                DataManagerImpl t = new DataManagerImpl(directory, name,storeSize);
352                t.setMaxFileLength(maxDataFileLength);
353                dm = t;
354            }
355            if (logIndexChanges) {
356                recover(dm);
357            }
358            dataManagers.put(name, dm);
359        }
360        return dm;
361    }
362
363    public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException {
364        IndexManager im = indexManagers.get(name);
365        if (im == null) {
366            im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null,storeSize);
367            indexManagers.put(name, im);
368        }
369        return im;
370    }
371
372    private void recover(final DataManager dm) throws IOException {
373        dm.recoverRedoItems(new RedoListener() {
374            public void onRedoItem(StoreLocation item, Object o) throws Exception {
375                RedoStoreIndexItem redo = (RedoStoreIndexItem)o;
376                // IndexManager im = getIndexManager(dm, redo.getIndexName());
377                IndexManager im = getIndexManager(dm, dm.getName());
378                im.redo(redo);
379            }
380        });
381    }
382
383    public synchronized boolean isLogIndexChanges() {
384        return logIndexChanges;
385    }
386
387    public synchronized void setLogIndexChanges(boolean logIndexChanges) {
388        this.logIndexChanges = logIndexChanges;
389    }
390
391    /**
392     * @return the maxDataFileLength
393     */
394    public synchronized long getMaxDataFileLength() {
395        return maxDataFileLength;
396    }
397
398    /**
399     * @param maxDataFileLength the maxDataFileLength to set
400     */
401    public synchronized void setMaxDataFileLength(long maxDataFileLength) {
402        this.maxDataFileLength = maxDataFileLength;
403    }
404
405    /**
406     * @return the default index type
407     */
408    public synchronized String getIndexTypeAsString() {
409        return persistentIndex ? "PERSISTENT" : "VM";
410    }
411
412    /**
413     * Set the default index type
414     * 
415     * @param type "PERSISTENT" or "VM"
416     */
417    public synchronized void setIndexTypeAsString(String type) {
418        if (type.equalsIgnoreCase("VM")) {
419            persistentIndex = false;
420        } else {
421            persistentIndex = true;
422        }
423    }
424    
425    public boolean isPersistentIndex() {
426                return persistentIndex;
427        }
428
429        public void setPersistentIndex(boolean persistentIndex) {
430                this.persistentIndex = persistentIndex;
431        }
432        
433
434    public synchronized boolean isUseAsyncDataManager() {
435        return useAsyncDataManager;
436    }
437
438    public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
439        this.useAsyncDataManager = useAsyncWriter;
440    }
441
442    /**
443     * @return size of store
444     * @see org.apache.activemq.kaha.Store#size()
445     */
446    public long size(){
447        return storeSize.get();
448    }
449
450    public String getDefaultContainerName() {
451        return defaultContainerName;
452    }
453
454    public void setDefaultContainerName(String defaultContainerName) {
455        this.defaultContainerName = defaultContainerName;
456    }
457
458    public synchronized void initialize() throws IOException {
459        if (closed) {
460            throw new IOException("Store has been closed.");
461        }
462        if (!initialized) {       
463            LOG.info("Kaha Store using data directory " + directory);
464            lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
465            lock();
466            DataManager defaultDM = getDataManager(defaultContainerName);
467            IndexManager rootIndexManager = getIndexManager(defaultDM, defaultContainerName);
468            IndexItem mapRoot = new IndexItem();
469            IndexItem listRoot = new IndexItem();
470            if (rootIndexManager.isEmpty()) {
471                mapRoot.setOffset(0);
472                rootIndexManager.storeIndex(mapRoot);
473                listRoot.setOffset(IndexItem.INDEX_SIZE);
474                rootIndexManager.storeIndex(listRoot);
475                rootIndexManager.setLength(IndexItem.INDEX_SIZE * 2);
476            } else {
477                mapRoot = rootIndexManager.getIndex(0);
478                listRoot = rootIndexManager.getIndex(IndexItem.INDEX_SIZE);
479            }
480            initialized = true;
481            mapsContainer = new IndexRootContainer(mapRoot, rootIndexManager, defaultDM);
482            listsContainer = new IndexRootContainer(listRoot, rootIndexManager, defaultDM);
483            /**
484             * Add interest in data files - then consolidate them
485             */
486            generateInterestInMapDataFiles();
487            generateInterestInListDataFiles();
488            for (Iterator<DataManager> i = dataManagers.values().iterator(); i.hasNext();) {
489                DataManager dm = i.next();
490                dm.consolidateDataFiles();
491            }
492        }
493    }
494
495    private void lock() throws IOException {
496        synchronized (LOCKSET_MONITOR) {
497            if (!DISABLE_LOCKING && directory != null && lock == null) {
498                String key = getPropertyKey();
499                String property = System.getProperty(key);
500                if (null == property) {
501                    if (!BROKEN_FILE_LOCK) {
502                        lock = lockFile.getChannel().tryLock(0, lockFile.getChannel().size(), false);
503                        if (lock == null) {
504                            throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + "  is already opened by another application");
505                        } else
506                            System.setProperty(key, new Date().toString());
507                    }
508                } else { //already locked
509                    throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by this application.");
510                }
511            }
512        }
513    }
514
515    private void unlock() throws IOException {
516        synchronized (LOCKSET_MONITOR) {
517            if (!DISABLE_LOCKING && (null != directory) && (null != lock)) {
518                System.getProperties().remove(getPropertyKey());
519                if (lock.isValid()) {
520                    lock.release();
521                }
522                lock = null;
523            }
524        }
525    }
526
527
528    private String getPropertyKey() throws IOException {
529        return getClass().getName() + ".lock." + directory.getCanonicalPath();
530    }
531
532    /**
533     * scans the directory and builds up the IndexManager and DataManager
534     * 
535     * @throws IOException if there is a problem accessing an index or data file
536     */
537    private void generateInterestInListDataFiles() throws IOException {
538        for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
539            ContainerId id = (ContainerId)i.next();
540            DataManager dm = getDataManager(id.getDataContainerName());
541            IndexManager im = getIndexManager(dm, id.getDataContainerName());
542            IndexItem theRoot = listsContainer.getRoot(im, id);
543            long nextItem = theRoot.getNextItem();
544            while (nextItem != Item.POSITION_NOT_SET) {
545                IndexItem item = im.getIndex(nextItem);
546                item.setOffset(nextItem);
547                dm.addInterestInFile(item.getKeyFile());
548                dm.addInterestInFile(item.getValueFile());
549                nextItem = item.getNextItem();
550            }
551        }
552    }
553
554    /**
555     * scans the directory and builds up the IndexManager and DataManager
556     * 
557     * @throws IOException if there is a problem accessing an index or data file
558     */
559    private void generateInterestInMapDataFiles() throws IOException {
560        for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
561            ContainerId id = (ContainerId)i.next();
562            DataManager dm = getDataManager(id.getDataContainerName());
563            IndexManager im = getIndexManager(dm, id.getDataContainerName());
564            IndexItem theRoot = mapsContainer.getRoot(im, id);
565            long nextItem = theRoot.getNextItem();
566            while (nextItem != Item.POSITION_NOT_SET) {
567                IndexItem item = im.getIndex(nextItem);
568                item.setOffset(nextItem);
569                dm.addInterestInFile(item.getKeyFile());
570                dm.addInterestInFile(item.getValueFile());
571                nextItem = item.getNextItem();
572            }
573
574        }
575    }
576}