001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.kaha.impl.index; 018 019import java.io.File; 020import java.io.IOException; 021import java.io.RandomAccessFile; 022import java.nio.channels.FileLock; 023import java.util.concurrent.atomic.AtomicLong; 024 025import org.apache.activemq.kaha.impl.DataManager; 026import org.apache.activemq.util.IOHelper; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * Optimized Store reader 032 * 033 * 034 */ 035public final class IndexManager { 036 037 public static final String NAME_PREFIX = "index-"; 038 private static final Logger LOG = LoggerFactory.getLogger(IndexManager.class); 039 private final String name; 040 private File directory; 041 private File file; 042 private RandomAccessFile indexFile; 043 private StoreIndexReader reader; 044 private StoreIndexWriter writer; 045 private DataManager redoLog; 046 private String mode; 047 private long length; 048 private IndexItem firstFree; 049 private IndexItem lastFree; 050 private boolean dirty; 051 private final AtomicLong storeSize; 052 private int freeSize = 0; 053 054 public IndexManager(File directory, String name, String mode, DataManager redoLog, AtomicLong storeSize) throws IOException { 055 this.directory = directory; 056 this.name = name; 057 this.mode = mode; 058 this.redoLog = redoLog; 059 this.storeSize=storeSize; 060 initialize(); 061 } 062 063 public synchronized boolean isEmpty() { 064 return lastFree == null && length == 0; 065 } 066 067 public synchronized IndexItem getIndex(long offset) throws IOException { 068 IndexItem result = null; 069 if (offset >= 0) { 070 result = reader.readItem(offset); 071 } 072 return result; 073 } 074 075 public synchronized IndexItem refreshIndex(IndexItem item) throws IOException { 076 reader.updateIndexes(item); 077 return item; 078 } 079 080 public synchronized void freeIndex(IndexItem item) throws IOException { 081 item.reset(); 082 item.setActive(false); 083 if (lastFree == null) { 084 firstFree = item; 085 lastFree = item; 086 } else { 087 lastFree.setNextItem(item.getOffset()); 088 if (lastFree.equals(firstFree)) { 089 firstFree=new IndexItem(); 090 firstFree.copyIndex(lastFree); 091 writer.updateIndexes(firstFree); 092 } 093 writer.updateIndexes(lastFree); 094 lastFree=item; 095 } 096 writer.updateIndexes(item); 097 freeSize++; 098 dirty = true; 099 } 100 101 public synchronized void storeIndex(IndexItem index) throws IOException { 102 writer.storeItem(index); 103 dirty = true; 104 } 105 106 public synchronized void updateIndexes(IndexItem index) throws IOException { 107 try { 108 writer.updateIndexes(index); 109 } catch (Throwable e) { 110 LOG.error(name + " error updating indexes ", e); 111 } 112 dirty = true; 113 } 114 115 public synchronized void redo(final RedoStoreIndexItem redo) throws IOException { 116 writer.redoStoreItem(redo); 117 dirty = true; 118 } 119 120 public synchronized IndexItem createNewIndex() throws IOException { 121 IndexItem result = getNextFreeIndex(); 122 if (result == null) { 123 // allocate one 124 result = new IndexItem(); 125 result.setOffset(length); 126 length += IndexItem.INDEX_SIZE; 127 storeSize.addAndGet(IndexItem.INDEX_SIZE); 128 } 129 return result; 130 } 131 132 public synchronized void close() throws IOException { 133 if (indexFile != null) { 134 indexFile.close(); 135 indexFile = null; 136 } 137 } 138 139 public synchronized void force() throws IOException { 140 if (indexFile != null && dirty) { 141 indexFile.getFD().sync(); 142 dirty = false; 143 } 144 } 145 146 public synchronized boolean delete() throws IOException { 147 firstFree = null; 148 lastFree = null; 149 if (indexFile != null) { 150 indexFile.close(); 151 indexFile = null; 152 } 153 return file.delete(); 154 } 155 156 private synchronized IndexItem getNextFreeIndex() throws IOException { 157 IndexItem result = null; 158 if (firstFree != null) { 159 if (firstFree.equals(lastFree)) { 160 result = firstFree; 161 firstFree = null; 162 lastFree = null; 163 } else { 164 result = firstFree; 165 firstFree = getIndex(firstFree.getNextItem()); 166 if (firstFree == null) { 167 lastFree = null; 168 } 169 } 170 result.reset(); 171 writer.updateIndexes(result); 172 freeSize--; 173 } 174 return result; 175 } 176 177 synchronized long getLength() { 178 return length; 179 } 180 181 public final long size() { 182 return length; 183 } 184 185 public synchronized void setLength(long value) { 186 this.length = value; 187 storeSize.addAndGet(length); 188 } 189 190 public synchronized FileLock getLock() throws IOException { 191 return indexFile.getChannel().tryLock(0, indexFile.getChannel().size(), false); 192 } 193 194 195 public String toString() { 196 return "IndexManager:(" + NAME_PREFIX + name + ")"; 197 } 198 199 protected void initialize() throws IOException { 200 file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name) ); 201 IOHelper.mkdirs(file.getParentFile()); 202 indexFile = new RandomAccessFile(file, mode); 203 reader = new StoreIndexReader(indexFile); 204 writer = new StoreIndexWriter(indexFile, name, redoLog); 205 long offset = 0; 206 while ((offset + IndexItem.INDEX_SIZE) <= indexFile.length()) { 207 IndexItem index = reader.readItem(offset); 208 if (!index.isActive()) { 209 index.reset(); 210 if (lastFree != null) { 211 lastFree.setNextItem(index.getOffset()); 212 updateIndexes(lastFree); 213 lastFree = index; 214 } else { 215 lastFree = index; 216 firstFree = index; 217 } 218 freeSize++; 219 } 220 offset += IndexItem.INDEX_SIZE; 221 } 222 length = offset; 223 storeSize.addAndGet(length); 224 } 225}