001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.kahadb.page;
018
019import java.io.DataInputStream;
020import java.io.EOFException;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.NoSuchElementException;
027
028import org.apache.kahadb.page.PageFile.PageWrite;
029import org.apache.kahadb.util.ByteSequence;
030import org.apache.kahadb.util.DataByteArrayInputStream;
031import org.apache.kahadb.util.DataByteArrayOutputStream;
032import org.apache.kahadb.util.Marshaller;
033import org.apache.kahadb.util.Sequence;
034import org.apache.kahadb.util.SequenceSet;
035
036/**
037 * The class used to read/update a PageFile object.  Using a transaction allows you to
038 * do multiple update operations in a single unit of work.
039 */
040public class Transaction implements Iterable<Page> {
041    
042    /**
043     * The PageOverflowIOException occurs when a page write is requested
044     * and it's data is larger than what would fit into a single page.
045     */
046    public class PageOverflowIOException extends IOException {
047        public PageOverflowIOException(String message) {
048            super(message);
049        }
050    }
051    
052    /**
053     * The InvalidPageIOException is thrown if try to load/store a a page
054     * with an invalid page id.
055     */
056    public class InvalidPageIOException extends IOException {
057        private final long page;
058
059        public InvalidPageIOException(String message, long page) {
060            super(message);
061            this.page = page;
062        }
063
064        public long getPage() {
065            return page;
066        }
067    }    
068    
069    /**
070     * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
071     * 
072     * @param <T> The type of exceptions that operation will throw.
073     */
074    public interface Closure <T extends Throwable> {
075        public void execute(Transaction tx) throws T;
076    }
077
078    /**
079     * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
080     * 
081     * @param <R> The type of result that the closure produces.
082     * @param <T> The type of exceptions that operation will throw.
083     */
084    public interface CallableClosure<R, T extends Throwable> {
085        public R execute(Transaction tx) throws T;
086    }
087    
088
089    // The page file that this Transaction operates against.
090    private final PageFile pageFile;
091    // If this transaction is updating stuff.. this is the tx of 
092    private long writeTransactionId=-1;
093    // List of pages that this transaction has modified.
094    private HashMap<Long, PageWrite> writes=new HashMap<Long, PageWrite>();
095    // List of pages allocated in this transaction
096    private final SequenceSet allocateList = new SequenceSet();
097    // List of pages freed in this transaction
098    private final SequenceSet freeList = new SequenceSet();
099
100    Transaction(PageFile pageFile) {
101        this.pageFile = pageFile;
102    }
103
104    /**
105     * @return the page file that created this Transaction
106     */
107    public PageFile getPageFile() {
108        return this.pageFile;
109    }
110
111    /** 
112     * Allocates a free page that you can write data to.
113     * 
114     * @return a newly allocated page.  
115     * @throws IOException
116     *         If an disk error occurred.
117     * @throws IllegalStateException
118     *         if the PageFile is not loaded
119     */
120    public <T> Page<T> allocate() throws IOException {
121        return allocate(1);
122    }
123
124    /** 
125     * Allocates a block of free pages that you can write data to.
126     * 
127     * @param count the number of sequential pages to allocate
128     * @return the first page of the sequential set. 
129     * @throws IOException
130     *         If an disk error occurred.
131     * @throws IllegalStateException
132     *         if the PageFile is not loaded
133     */
134    public <T> Page<T> allocate(int count) throws IOException {
135        // TODO: we need to track allocated pages so that they can be returned if the 
136        // transaction gets rolled back.
137        Page<T> rc = pageFile.allocate(count);
138        allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
139        return rc;
140    }
141
142    /**
143     * Frees up a previously allocated page so that it can be re-allocated again.
144     * 
145     * @param page the page to free up
146     * @throws IOException
147     *         If an disk error occurred.
148     * @throws IllegalStateException
149     *         if the PageFile is not loaded
150     */
151    public void free(long pageId) throws IOException {
152        free(load(pageId, null));
153    }
154
155    /**
156     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
157     * 
158     * @param page the initial page of the sequence that will be getting freed
159     * @param count the number of pages in the sequence
160     * 
161     * @throws IOException
162     *         If an disk error occurred.
163     * @throws IllegalStateException
164     *         if the PageFile is not loaded
165     */
166    public void free(long pageId, int count) throws IOException {
167        free(load(pageId, null), count);
168    }
169
170    /**
171     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
172     * 
173     * @param page the initial page of the sequence that will be getting freed
174     * @param count the number of pages in the sequence
175     * 
176     * @throws IOException
177     *         If an disk error occurred.
178     * @throws IllegalStateException
179     *         if the PageFile is not loaded
180     */
181    public <T> void free(Page<T> page, int count) throws IOException {
182        pageFile.assertLoaded();
183        long offsetPage = page.getPageId();
184        for (int i = 0; i < count; i++) {
185            if (page == null) {
186                page = load(offsetPage + i, null);
187            }
188            free(page);
189            page = null;
190        }
191    }
192    
193    /**
194     * Frees up a previously allocated page so that it can be re-allocated again.
195     * 
196     * @param page the page to free up
197     * @throws IOException
198     *         If an disk error occurred.
199     * @throws IllegalStateException
200     *         if the PageFile is not loaded
201     */
202    public <T> void free(Page<T> page) throws IOException {
203        pageFile.assertLoaded();
204
205        // We may need loop to free up a page chain.
206        while (page != null) {
207
208            // Is it already free??
209            if (page.getType() == Page.PAGE_FREE_TYPE) {
210                return;
211            }
212
213            Page<T> next = null;
214            if (page.getType() == Page.PAGE_PART_TYPE) {
215                next = load(page.getNext(), null);
216            }
217
218            page.makeFree(getWriteTransactionId());
219
220            DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
221            page.write(out);
222            write(page, out.getData());
223
224            freeList.add(page.getPageId());
225            page = next;
226        }
227    }
228
229    /**
230     * 
231     * @param page
232     *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
233     * @param marshaller
234     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
235     * @param overflow
236     *        If true, then if the page data marshalls to a bigger size than can fit in one page, then additional 
237     *        overflow pages are automatically allocated and chained to this page to store all the data.  If false,
238     *        and the overflow condition would occur, then the PageOverflowIOException is thrown. 
239     * @throws IOException
240     *         If an disk error occurred.
241     * @throws PageOverflowIOException
242     *         If the page data marshalls to size larger than maximum page size and overflow was false.
243     * @throws IllegalStateException
244     *         if the PageFile is not loaded
245     */
246    public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
247        DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
248        if (marshaller != null) {
249            marshaller.writePayload(page.get(), out);
250        }
251        out.close();
252    }
253
254    /**
255     * @throws IOException
256     */
257    public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
258        pageFile.assertLoaded();
259
260        // Copy to protect against the end user changing
261        // the page instance while we are doing a write.
262        final Page copy = page.copy();
263        pageFile.addToCache(copy);
264
265        //
266        // To support writing VERY large data, we override the output stream so
267        // that we
268        // we do the page writes incrementally while the data is being
269        // marshalled.
270        DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
271            Page current = copy;
272
273            @SuppressWarnings("unchecked")
274            @Override
275            protected void onWrite() throws IOException {
276
277                // Are we at an overflow condition?
278                final int pageSize = pageFile.getPageSize();
279                if (pos >= pageSize) {
280                    // If overflow is allowed
281                    if (overflow) {
282
283                        Page next;
284                        if (current.getType() == Page.PAGE_PART_TYPE) {
285                            next = load(current.getNext(), null);
286                        } else {
287                            next = allocate();
288                        }
289
290                        next.txId = current.txId;
291
292                        // Write the page header
293                        int oldPos = pos;
294                        pos = 0;
295
296                        current.makePagePart(next.getPageId(), getWriteTransactionId());
297                        current.write(this);
298
299                        // Do the page write..
300                        byte[] data = new byte[pageSize];
301                        System.arraycopy(buf, 0, data, 0, pageSize);
302                        Transaction.this.write(current, data);
303
304                        // Reset for the next page chunk
305                        pos = 0;
306                        // The page header marshalled after the data is written.
307                        skip(Page.PAGE_HEADER_SIZE);
308                        // Move the overflow data after the header.
309                        System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
310                        pos += oldPos - pageSize;
311                        current = next;
312
313                    } else {
314                        throw new PageOverflowIOException("Page overflow.");
315                    }
316                }
317
318            }
319
320            @SuppressWarnings("unchecked")
321            @Override
322            public void close() throws IOException {
323                super.close();
324
325                // We need to free up the rest of the page chain..
326                if (current.getType() == Page.PAGE_PART_TYPE) {
327                    free(current.getNext());
328                }
329
330                current.makePageEnd(pos, getWriteTransactionId());
331
332                // Write the header..
333                pos = 0;
334                current.write(this);
335
336                Transaction.this.write(current, buf);
337            }
338        };
339
340        // The page header marshaled after the data is written.
341        out.skip(Page.PAGE_HEADER_SIZE);
342        return out;
343    }
344
345    /**
346     * Loads a page from disk.
347     * 
348     * @param pageId 
349     *        the id of the page to load
350     * @param marshaller
351     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
352     * @return The page with the given id
353     * @throws IOException
354     *         If an disk error occurred.
355     * @throws IllegalStateException
356     *         if the PageFile is not loaded
357     */
358    public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
359        pageFile.assertLoaded();
360        Page<T> page = new Page<T>(pageId);
361        load(page, marshaller);
362        return page;
363    }
364
365    /**
366     * Loads a page from disk.
367     * 
368     * @param page - The pageId field must be properly set 
369     * @param marshaller
370     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
371     * @throws IOException
372     *         If an disk error occurred.
373     * @throws InvalidPageIOException
374     *         If the page is is not valid.      
375     * @throws IllegalStateException
376     *         if the PageFile is not loaded
377     */
378    @SuppressWarnings("unchecked")
379    public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
380        pageFile.assertLoaded();
381
382        // Can't load invalid offsets...
383        long pageId = page.getPageId();
384        if (pageId < 0) {
385            throw new InvalidPageIOException("Page id is not valid", pageId);
386        }
387
388        // It might be a page this transaction has modified...
389        PageWrite update = writes.get(pageId);
390        if (update != null) {
391            page.copy(update.getPage());
392            return;
393        }
394
395        // We may be able to get it from the cache...
396        Page<T> t = pageFile.getFromCache(pageId);
397        if (t != null) {
398            page.copy(t);
399            return;
400        }
401
402        if (marshaller != null) {
403            // Full page read..
404            InputStream is = openInputStream(page);
405            DataInputStream dataIn = new DataInputStream(is);
406            page.set(marshaller.readPayload(dataIn));
407            is.close();
408        } else {
409            // Page header read.
410            DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
411            pageFile.readPage(pageId, in.getRawData());
412            page.read(in);
413            page.set(null);
414        }
415
416        // Cache it.
417        if (marshaller != null) {
418            pageFile.addToCache(page);
419        }
420    }
421
422    /**
423     * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page,
424     *      org.apache.kahadb.util.Marshaller)
425     */
426    public InputStream openInputStream(final Page p) throws IOException {
427
428        return new InputStream() {
429
430            private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
431            private Page page = readPage(p);
432            private int pageCount = 1;
433
434            private Page markPage;
435            private ByteSequence markChunk;
436
437            private Page readPage(Page page) throws IOException {
438                // Read the page data
439                
440                pageFile.readPage(page.getPageId(), chunk.getData());
441                
442                chunk.setOffset(0);
443                chunk.setLength(pageFile.getPageSize());
444
445                DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
446                page.read(in);
447
448                chunk.setOffset(Page.PAGE_HEADER_SIZE);
449                if (page.getType() == Page.PAGE_END_TYPE) {
450                    chunk.setLength((int)(page.getNext()));
451                }
452
453                if (page.getType() == Page.PAGE_FREE_TYPE) {
454                    throw new EOFException("Chunk stream does not exist at page: " + page.getPageId());
455                }
456
457                return page;
458            }
459
460            public int read() throws IOException {
461                if (!atEOF()) {
462                    return chunk.data[chunk.offset++] & 0xff;
463                } else {
464                    return -1;
465                }
466            }
467
468            private boolean atEOF() throws IOException {
469                if (chunk.offset < chunk.length) {
470                    return false;
471                }
472                if (page.getType() == Page.PAGE_END_TYPE) {
473                    return true;
474                }
475                fill();
476                return chunk.offset >= chunk.length;
477            }
478
479            private void fill() throws IOException {
480                page = readPage(new Page(page.getNext()));
481                pageCount++;
482            }
483
484            public int read(byte[] b) throws IOException {
485                return read(b, 0, b.length);
486            }
487
488            public int read(byte b[], int off, int len) throws IOException {
489                if (!atEOF()) {
490                    int rc = 0;
491                    while (!atEOF() && rc < len) {
492                        len = Math.min(len, chunk.length - chunk.offset);
493                        if (len > 0) {
494                            System.arraycopy(chunk.data, chunk.offset, b, off, len);
495                            chunk.offset += len;
496                        }
497                        rc += len;
498                    }
499                    return rc;
500                } else {
501                    return -1;
502                }
503            }
504
505            public long skip(long len) throws IOException {
506                if (atEOF()) {
507                    int rc = 0;
508                    while (!atEOF() && rc < len) {
509                        len = Math.min(len, chunk.length - chunk.offset);
510                        if (len > 0) {
511                            chunk.offset += len;
512                        }
513                        rc += len;
514                    }
515                    return rc;
516                } else {
517                    return -1;
518                }
519            }
520
521            public int available() {
522                return chunk.length - chunk.offset;
523            }
524
525            public boolean markSupported() {
526                return true;
527            }
528
529            public void mark(int markpos) {
530                markPage = page;
531                byte data[] = new byte[pageFile.getPageSize()];
532                System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize());
533                markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
534            }
535
536            public void reset() {
537                page = markPage;
538                chunk = markChunk;
539            }
540
541        };
542    }
543
544    /**
545     * Allows you to iterate through all active Pages in this object.  Pages with type Page.FREE_TYPE are 
546     * not included in this iteration. 
547     * 
548     * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
549     * 
550     * @throws IllegalStateException
551     *         if the PageFile is not loaded
552     */
553    @SuppressWarnings("unchecked")
554    public Iterator<Page> iterator() {
555        return (Iterator<Page>)iterator(false);
556    }
557
558    /**
559     * Allows you to iterate through all active Pages in this object.  You can optionally include free pages in the pages
560     * iterated.
561     * 
562     * @param includeFreePages - if true, free pages are included in the iteration
563     * @param tx - if not null, then the remove() opeation on the Iterator will operate in scope of that transaction.
564     * @throws IllegalStateException
565     *         if the PageFile is not loaded
566     */
567    public Iterator<Page> iterator(final boolean includeFreePages) {
568
569        pageFile.assertLoaded();
570
571        return new Iterator<Page>() {
572            long nextId;
573            Page nextPage;
574            Page lastPage;
575
576            private void findNextPage() {
577                if (!pageFile.isLoaded()) {
578                    throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
579                }
580
581                if (nextPage != null) {
582                    return;
583                }
584
585                try {
586                    while (nextId < pageFile.getPageCount()) {
587
588                        Page page = load(nextId, null);
589
590                        if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
591                            nextPage = page;
592                            return;
593                        } else {
594                            nextId++;
595                        }
596                    }
597                } catch (IOException e) {
598                }
599            }
600
601            public boolean hasNext() {
602                findNextPage();
603                return nextPage != null;
604            }
605
606            public Page next() {
607                findNextPage();
608                if (nextPage != null) {
609                    lastPage = nextPage;
610                    nextPage = null;
611                    nextId++;
612                    return lastPage;
613                } else {
614                    throw new NoSuchElementException();
615                }
616            }
617
618            @SuppressWarnings("unchecked")
619            public void remove() {
620                if (lastPage == null) {
621                    throw new IllegalStateException();
622                }
623                try {
624                    free(lastPage);
625                    lastPage = null;
626                } catch (IOException e) {
627                    new RuntimeException(e);
628                }
629            }
630        };
631    }
632
633    ///////////////////////////////////////////////////////////////////
634    // Commit / Rollback related methods..
635    ///////////////////////////////////////////////////////////////////
636    
637    /**
638     * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
639     * with the transaction are written to disk or none will.
640     */
641    public void commit() throws IOException {
642        if( writeTransactionId!=-1 ) {
643            // Actually do the page writes...
644            pageFile.write(writes.entrySet());
645            // Release the pages that were freed up in the transaction..
646            freePages(freeList);
647            
648            freeList.clear();
649            allocateList.clear();
650            writes.clear();
651            writeTransactionId = -1;
652        }
653    }
654
655    /**
656     * Rolls back the transaction.
657     */
658    public void rollback() throws IOException {
659        if( writeTransactionId!=-1 ) {
660            // Release the pages that were allocated in the transaction...
661            freePages(allocateList);
662
663            freeList.clear();
664            allocateList.clear();
665            writes.clear();
666            writeTransactionId = -1;
667        }
668    }
669
670    private long getWriteTransactionId() {
671        if( writeTransactionId==-1 ) {
672            writeTransactionId = pageFile.getNextWriteTransactionId();
673        }
674        return writeTransactionId;
675    }
676
677    /**
678     * Queues up a page write that should get done when commit() gets called.
679     */
680    @SuppressWarnings("unchecked")
681    private void write(final Page page, byte[] data) throws IOException {
682        Long key = page.getPageId();
683        // TODO: if a large update transaction is in progress, we may want to move
684        // all the current updates to a temp file so that we don't keep using 
685        // up memory.
686        writes.put(key, new PageWrite(page, data));        
687    }   
688
689    /**
690     * @param list
691     * @throws RuntimeException
692     */
693    private void freePages(SequenceSet list) throws RuntimeException {
694        Sequence seq = list.getHead();
695        while( seq!=null ) {
696            seq.each(new Sequence.Closure<RuntimeException>(){
697                public void execute(long value) {
698                    pageFile.freePage(value);
699                }
700            });
701            seq = seq.getNext();
702        }
703    }
704    
705    /**
706     * @return true if there are no uncommitted page file updates associated with this transaction.
707     */
708    public boolean isReadOnly() {
709        return writeTransactionId==-1;
710    }
711    
712    ///////////////////////////////////////////////////////////////////
713    // Transaction closure helpers...
714    ///////////////////////////////////////////////////////////////////
715    
716    /**
717     * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
718     * If the closure throws an Exception, then the transaction is rolled back.
719     * 
720     * @param <T>
721     * @param closure - the work to get exectued.
722     * @throws T if the closure throws it
723     * @throws IOException If the commit fails.
724     */
725    public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
726        boolean success = false;
727        try {
728            closure.execute(this);
729            success = true;
730        } finally {
731            if (success) {
732                commit();
733            } else {
734                rollback();
735            }
736        }
737    }
738
739    /**
740     * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
741     * If the closure throws an Exception, then the transaction is rolled back.
742     * 
743     * @param <T>
744     * @param closure - the work to get exectued.
745     * @throws T if the closure throws it
746     * @throws IOException If the commit fails.
747     */
748    public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
749        boolean success = false;
750        try {
751            R rc = closure.execute(this);
752            success = true;
753            return rc;
754        } finally {
755            if (success) {
756                commit();
757            } else {
758                rollback();
759            }
760        }
761    }
762
763}