/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;

/**
 * A PageFile provides random access to fixed-size disk pages. This object is not thread safe and therefore access to it
 * should be externally synchronized.
 *
 * The file has 3 parts:
 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: The pages in the page file.
 *
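 * A minimal usage sketch (the Transaction calls are illustrative; see
 * {@link Transaction} for the exact page read/write API):
 *
 * <pre>
 * PageFile pf = new PageFile(new File("target/data"), "example");
 * pf.load();                  // allocates OS resources, creates the files on first use
 * Transaction tx = pf.tx();   // all page level reads/writes go through a transaction
 * // ... allocate, load and store pages via the Transaction API ...
 * pf.flush();                 // force buffered page writes to disk
 * pf.unload();                // releases file handles, records a clean shutdown
 * </pre>
 *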
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";

    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4));
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000));
    private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
    private static final int PAGE_FILE_HEADER_SIZE=1024*4;

    // Each record in the recovery buffer is headed by a (long pageId).
    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

    // A PageFile will use a couple of files in this directory.
    private File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RandomAccessFile readFile;
    // File handle used for writing pages..
    private RandomAccessFile writeFile;
    // File handle used for writing to the recovery buffer..
    private RandomAccessFile recoveryFile;

    // The size of pages
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to. Writes may exceed the max,
    // but the file gets resized back down to this max as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer
    private int recoveryPageCount;

    private AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is the read page cache enabled?
    private boolean enablePageCaching=true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = 100;

    // Should we first log page writes to the recovery buffer? Avoids partial
    // page write failures..
    private boolean enableRecoveryFile=true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs=true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread=false;

    // These are used if enabledWriteThread==true
    private AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    /**
     * Used to keep track of updated pages which have not yet been committed.
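     *
     * A PageWrite double buffers the page data: {@code current} holds the
     * latest bytes handed to {@link PageFile#write}, and {@code begin()} moves
     * them to {@code diskBound} while they are being written, so the page can
     * be updated again without waiting for the disk write to complete.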
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;

        public PageWrite(Page page, byte[] data) {
            this.page=page;
            current=data;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page=page;
            current=data;
        }

        @Override
        public String toString() {
            return "[PageWrite:"+page.getPageId()+"]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

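        // Moves the current bytes to the disk-bound slot so a concurrent
        // update can replace 'current' while this snapshot is written out.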
        void begin() {
           diskBound = current;
           current = null;
        }

        /**
         * @return true if there are no more pending writes to do.
         */
        boolean done() {
            diskBound=null;
            return current == null;
        }

        boolean isDone() {
            return diskBound == null && current == null;
        }

    }

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId=-1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }
        public void setFileType(String fileType) {
            this.fileType = fileType;
        }
        public String getFileTypeVersion() {
            return fileTypeVersion;
        }
        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }
        public long getMetaDataTxId() {
            return metaDataTxId;
        }
        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }
        public int getPageSize() {
            return pageSize;
        }
        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }
        public boolean isCleanShutdown() {
            return cleanShutdown;
        }
        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }
        public long getLastTxId() {
            return lastTxId;
        }
        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }
        public long getFreePages() {
            return freePages;
        }
        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

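    /**
     * Starts a new transaction against this page file.  All page level reads
     * and writes are performed through the returned {@link Transaction}.
     *
     * @throws IllegalStateException if the page file is not loaded.
     */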
    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named by the given name.
     *
     * @param directory the directory that will hold the page file's data files
     * @param name the base name used for the data files in that directory
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
     *
     * @throws IOException
     *         if the files cannot be deleted.
     * @throws IllegalStateException
     *         if this PageFile is loaded
     */
    public void delete() throws IOException {
        if( loaded.get() ) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    /**
     * @param file the file to delete, if it exists
     * @throws IOException if the file exists but cannot be deleted
     */
    private void delete(File file) throws IOException {
        if( file.exists() ) {
            if( !file.delete() ) {
                throw new IOException("Could not delete: "+file.getPath());
            }
        }
    }

    /**
     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the
     * first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException
     *         If the page file cannot be loaded. This could be because the existing page file is corrupt, is of an
     *         incompatible version, or because there was a disk error.
     * @throws IllegalStateException
     *         If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if( enablePageCaching ) {
                pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RandomAccessFile(file, "rw");
            readFile = new RandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting since that can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting since that can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if( enableRecoveryFile ) {
                recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
            }

            if( metaData.isCleanShutdown() ) {
                nextTxid.set(metaData.getLastTxId()+1);
                if( metaData.getFreePages()>0 ) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());

                // Scan all pages to rebuild the free list.
                freeList = new SequenceSet();
                for (Iterator i = tx().iterator(true); i.hasNext();) {
                    Page page = (Page)i.next();
                    if( page.getType() == Page.PAGE_FREE_TYPE ) {
                        freeList.add(page.getPageId());
                    }
                }

            }

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();

            if( writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length()-PAGE_FILE_HEADER_SIZE)/pageSize);
            startWriter();

        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }

    /**
     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException
     *         if a disk error occurred while closing down the page file.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if( freeList.isEmpty() ) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId( nextTxid.get()-1 );
            metaData.setCleanShutdown(true);
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile=null;
                if( enableRecoveryFile ) {
                    recoveryFile.close();
                    recoveryFile=null;
                }
                freeList.clear();
                if( pageCache!=null ) {
                    pageCache=null;
                }
                synchronized(writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException
     *         If a disk error occurred.
     */
    public void flush() throws IOException {

        if( enabledWriteThread && stopWriter.get() ) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        // Setup a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized( writes ) {
            if( writes.isEmpty()) {
                return;
            }
            if( enabledWriteThread ) {
                if( this.checkpointLatch == null ) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }
    }


    @Override
    public String toString() {
        return "Page File: "+getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX);
    }

    private long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE+(pageId*pageSize);
    }

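    // The 4k metadata header holds two copies of the metadata, one in each
    // 2k half.  storeMetaData writes both copies in turn, so a crash mid-write
    // leaves at least one intact copy for loadMetaData to fall back on.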
    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
            readFile.seek(PAGE_FILE_HEADER_SIZE/2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if( v1==null && v2==null ) {
            throw new IOException("Could not load page file meta data");
        }

        if( v1 == null || v1.metaDataTxId<0 ) {
            metaData = v2;
        } else if( v2==null || v2.metaDataTxId<0 ) {
            metaData = v1;
        } else if( v1.metaDataTxId==v2.metaDataTxId ) {
            metaData = v1; // use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // use the second since the first is probably a partial.
        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if( os.size() > PAGE_FILE_HEADER_SIZE/2) {
            throw new IOException("Configuration is larger than: "+PAGE_FILE_HEADER_SIZE/2);
        }
        // Fill the rest with spaces...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE/2)-os.size()];
        Arrays.fill(filler, (byte)' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it, write it twice...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.getFD().sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE/2);
        writeFile.write(d);
        writeFile.getFD().sync();
    }

    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes?  Enabled by default.
     *
     * @return true if the recovery buffer is enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     * @param syncWrites true to sync page writes to disk
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize-Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException
     *         once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching true to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        assertNotLoaded();
        this.writeBatchSize = writeBatchSize;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if( !loaded.get() ) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    /**
     * @throws IllegalStateException if the page file is loaded.
     */
    void assertNotLoaded() throws IllegalStateException {
        if( loaded.get() ) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;
            while (c > 0) {
                Page<T> page = new Page<T>(nextFreePageId.getAndIncrement());
                page.makeFree(getNextWriteTransactionId());

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
                c--;
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

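    /**
     * @return the next monotonically increasing transaction id, used to stamp
     *         page updates.
     */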
    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

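    /**
     * Reads the raw contents of the given page from the main page file into
     * {@code data}; the buffer is expected to be pageSize bytes long.
     */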
    void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

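    /**
     * Returns the given page to the free list so it can be re-allocated, and
     * evicts it from the read page cache.
     */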
    public void freePage(long pageId) {
        freeList.add(pageId);
        if( enablePageCaching ) {
            pageCache.remove(pageId);
        }
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>(){
            public Long getKey() {
                return write.getPage().getPageId();
            }
            public PageWrite getValue() {
                return write;
            }
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

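    /**
     * Queues a batch of page updates in the write cache.  Updates to a page
     * that is already queued are coalesced into its existing PageWrite.  When
     * the async write thread is enabled this blocks while the cache is at the
     * writeBatchSize limit; otherwise the batch may be written out inline.
     */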
    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized( writes ) {
            if( enabledWriteThread ) {
                while( writes.size() >= writeBatchSize && !stopWriter.get() ) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if( write==null ) {
                    writes.put(key, value);
                } else {
                    write.setCurrent(value.page, value.current);
                }
            }

            // Once we start approaching capacity, notify the writer to start writing
            if( canStartWriteBatch() ) {
                if( enabledWriteThread ) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }

    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100)/writeBatchSize);
        if( enabledWriteThread ) {
            // The constant 10 here controls how soon write batches start going to disk.
            // It would be nice to auto tune that value: make it too small and we reduce
            // throughput because we lock the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch!=null;
        } else {
            return capacityUsed >= 80 || checkpointLatch!=null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized(writes) {
            PageWrite pageWrite = writes.get(pageId);
            if( pageWrite != null ) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(Page page) {
        if (enablePageCaching) {
            pageCache.remove(page.getPageId());
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal Double write implementation follows...
    ///////////////////////////////////////////////////////////////////
    /**
     * The async writer thread's main loop: waits until there are buffered
     * writes (or a checkpoint is requested) and flushes them to disk in
     * batches until the page file is stopped.
     */
    private void pollWrites() {
        try {
            while( !stopWriter.get() ) {
                // Wait for a notification...
                synchronized( writes ) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while( writes.isEmpty() && checkpointLatch==null && !stopWriter.get() ) {
                        writes.wait(100);
                    }

                    if( writes.isEmpty() ) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            LOG.error("Page file writer thread failed: ", e);
        } finally {
            releaseCheckpointWaiter();
        }
    }

    /**
     * Writes the currently buffered page updates out as one batch.  When the
     * recovery file is enabled the batch is first journaled to the recovery
     * buffer so a partial page write can be redone on restart.
     *
     * @throws IOException if a disk error occurred.
     */
    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized( writes ) {

            batch = new ArrayList<PageWrite>(writes.size());
            // build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write, this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch=null;
        }

        try {
            if (enableRecoveryFile) {

                // Using Adler-32 instead of CRC-32 because it's much faster, and its
                // weakness for short messages of a few hundred bytes is not a factor
                // here since our write batches are going to be much larger.
                Checksum checksum = new Adler32();
                for (PageWrite w : batch) {
                    try {
                        checksum.update(w.diskBound, 0, pageSize);
                    } catch (Throwable t) {
                        throw IOExceptionSupport.create(
                                "Cannot create recovery file. Reason: " + t, t);
                    }
                }

                // Can we shrink the recovery buffer??
                if (recoveryPageCount > recoveryFileMaxPageCount) {
                    int t = Math.max(recoveryFileMinPageCount, batch.size());
                    recoveryFile.setLength(recoveryFileSizeForPages(t));
                }

                // Record the page writes in the recovery buffer.
                recoveryFile.seek(0);
                // Store the next tx id...
                recoveryFile.writeLong(nextTxid.get());
                // Store the checksum for the write batch so that on recovery we
                // know if we have a consistent write batch on disk.
                recoveryFile.writeLong(checksum.getValue());
                // Write the # of pages that will follow
                recoveryFile.writeInt(batch.size());

                // Write the pages.
                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

                for (PageWrite w : batch) {
                    recoveryFile.writeLong(w.page.getPageId());
                    recoveryFile.write(w.diskBound, 0, pageSize);
                }

                if (enableDiskSyncs) {
                    // Sync to make sure recovery buffer writes land on disk..
                    recoveryFile.getFD().sync();
                }

                recoveryPageCount = batch.size();
            }

            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.diskBound, 0, pageSize);
                w.done();
            }

            // Sync again
            if (enableDiskSyncs) {
                writeFile.getFD().sync();
            }

        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                    }
                }
            }

            if( checkpointLatch!=null ) {
                checkpointLatch.countDown();
            }
        }
    }

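    // Recovery file layout: a 4k header holding (long nextTxId, long checksum,
    // int pageCount), followed by pageCount records of (long pageId, page data).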
    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount);
    }

    private void releaseCheckpointWaiter() {
        if( checkpointLatch!=null ) {
            checkpointLatch.countDown();
            checkpointLatch=null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
     * @throws IOException
     */
    private long redoRecoveryUpdates() throws IOException {
        if( !enableRecoveryFile ) {
            return 0;
        }
        recoveryPageCount=0;

        // Are we initializing the recovery file?
        if( recoveryFile.length() == 0 ) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long pageId = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if( recoveryFile.read(data, 0, pageSize) != pageSize ) {
                    // Invalid recovery record: could not fully read the data, probably
                    // due to a partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(pageId, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written
            // out correctly, so don't redo it, as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was partially written to disk.
        if( checksum.getValue() != expectedChecksum ) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk
        writeFile.getFD().sync();
        return nextTxId;
    }

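    // Starts the background writer thread when the async write thread is
    // enabled; otherwise writes are flushed inline and this is a no-op.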
    private void startWriter() {
        synchronized( writes ) {
            if( enabledWriteThread ) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if( enabledWriteThread ) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

}