001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.kahadb.journal;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025
026/**
027 * Used to pool DataFileAccessors.
028 * 
029 * @author chirino
030 */
031public class DataFileAccessorPool {
032
033    private final Journal journal;
034    private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
035    private boolean closed;
036    private int maxOpenReadersPerFile = 5;
037
038    class Pool {
039
040        private final DataFile file;
041        private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
042        private boolean used;
043        private int openCounter;
044        private boolean disposed;
045
046        public Pool(DataFile file) {
047            this.file = file;
048        }
049
050        public DataFileAccessor openDataFileReader() throws IOException {
051            DataFileAccessor rc = null;
052            if (pool.isEmpty()) {
053                rc = new DataFileAccessor(journal, file);
054            } else {
055                rc = pool.remove(pool.size() - 1);
056            }
057            used = true;
058            openCounter++;
059            return rc;
060        }
061
062        public synchronized void closeDataFileReader(DataFileAccessor reader) {
063            openCounter--;
064            if (pool.size() >= maxOpenReadersPerFile || disposed) {
065                reader.dispose();
066            } else {
067                pool.add(reader);
068            }
069        }
070
071        public synchronized void clearUsedMark() {
072            used = false;
073        }
074
075        public synchronized boolean isUsed() {
076            return used;
077        }
078
079        public synchronized void dispose() {
080            for (DataFileAccessor reader : pool) {
081                reader.dispose();
082            }
083            pool.clear();
084            disposed = true;
085        }
086
087        public synchronized int getOpenCounter() {
088            return openCounter;
089        }
090
091    }
092
093    public DataFileAccessorPool(Journal dataManager) {
094        this.journal = dataManager;
095    }
096
097    synchronized void clearUsedMark() {
098        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
099            Pool pool = iter.next();
100            pool.clearUsedMark();
101        }
102    }
103
104    synchronized void disposeUnused() {
105        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
106            Pool pool = iter.next();
107            if (!pool.isUsed()) {
108                pool.dispose();
109                iter.remove();
110            }
111        }
112    }
113
114    synchronized void disposeDataFileAccessors(DataFile dataFile) {
115        if (closed) {
116            throw new IllegalStateException("Closed.");
117        }
118        Pool pool = pools.get(dataFile.getDataFileId());
119        if (pool != null) {
120            if (pool.getOpenCounter() == 0) {
121                pool.dispose();
122                pools.remove(dataFile.getDataFileId());
123            } else {
124                throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
125            }
126        }
127    }
128
129    synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
130        if (closed) {
131            throw new IOException("Closed.");
132        }
133
134        Pool pool = pools.get(dataFile.getDataFileId());
135        if (pool == null) {
136            pool = new Pool(dataFile);
137            pools.put(dataFile.getDataFileId(), pool);
138        }
139        return pool.openDataFileReader();
140    }
141
142    synchronized void closeDataFileAccessor(DataFileAccessor reader) {
143        Pool pool = pools.get(reader.getDataFile().getDataFileId());
144        if (pool == null || closed) {
145            reader.dispose();
146        } else {
147            pool.closeDataFileReader(reader);
148        }
149    }
150
151    public synchronized void close() {
152        if (closed) {
153            return;
154        }
155        closed = true;
156        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
157            Pool pool = iter.next();
158            pool.dispose();
159        }
160        pools.clear();
161    }
162
163}