001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.kahadb.page; 018 019import java.io.DataInputStream; 020import java.io.EOFException; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.NoSuchElementException; 027 028import org.apache.kahadb.page.PageFile.PageWrite; 029import org.apache.kahadb.util.ByteSequence; 030import org.apache.kahadb.util.DataByteArrayInputStream; 031import org.apache.kahadb.util.DataByteArrayOutputStream; 032import org.apache.kahadb.util.Marshaller; 033import org.apache.kahadb.util.Sequence; 034import org.apache.kahadb.util.SequenceSet; 035 036/** 037 * The class used to read/update a PageFile object. Using a transaction allows you to 038 * do multiple update operations in a single unit of work. 039 */ 040public class Transaction implements Iterable<Page> { 041 042 /** 043 * The PageOverflowIOException occurs when a page write is requested 044 * and it's data is larger than what would fit into a single page. 045 */ 046 public class PageOverflowIOException extends IOException { 047 public PageOverflowIOException(String message) { 048 super(message); 049 } 050 } 051 052 /** 053 * The InvalidPageIOException is thrown if try to load/store a a page 054 * with an invalid page id. 055 */ 056 public class InvalidPageIOException extends IOException { 057 private final long page; 058 059 public InvalidPageIOException(String message, long page) { 060 super(message); 061 this.page = page; 062 } 063 064 public long getPage() { 065 return page; 066 } 067 } 068 069 /** 070 * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method. 071 * 072 * @param <T> The type of exceptions that operation will throw. 073 */ 074 public interface Closure <T extends Throwable> { 075 public void execute(Transaction tx) throws T; 076 } 077 078 /** 079 * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method. 080 * 081 * @param <R> The type of result that the closure produces. 082 * @param <T> The type of exceptions that operation will throw. 083 */ 084 public interface CallableClosure<R, T extends Throwable> { 085 public R execute(Transaction tx) throws T; 086 } 087 088 089 // The page file that this Transaction operates against. 090 private final PageFile pageFile; 091 // If this transaction is updating stuff.. this is the tx of 092 private long writeTransactionId=-1; 093 // List of pages that this transaction has modified. 094 private HashMap<Long, PageWrite> writes=new HashMap<Long, PageWrite>(); 095 // List of pages allocated in this transaction 096 private final SequenceSet allocateList = new SequenceSet(); 097 // List of pages freed in this transaction 098 private final SequenceSet freeList = new SequenceSet(); 099 100 Transaction(PageFile pageFile) { 101 this.pageFile = pageFile; 102 } 103 104 /** 105 * @return the page file that created this Transaction 106 */ 107 public PageFile getPageFile() { 108 return this.pageFile; 109 } 110 111 /** 112 * Allocates a free page that you can write data to. 113 * 114 * @return a newly allocated page. 115 * @throws IOException 116 * If an disk error occurred. 117 * @throws IllegalStateException 118 * if the PageFile is not loaded 119 */ 120 public <T> Page<T> allocate() throws IOException { 121 return allocate(1); 122 } 123 124 /** 125 * Allocates a block of free pages that you can write data to. 126 * 127 * @param count the number of sequential pages to allocate 128 * @return the first page of the sequential set. 129 * @throws IOException 130 * If an disk error occurred. 131 * @throws IllegalStateException 132 * if the PageFile is not loaded 133 */ 134 public <T> Page<T> allocate(int count) throws IOException { 135 // TODO: we need to track allocated pages so that they can be returned if the 136 // transaction gets rolled back. 137 Page<T> rc = pageFile.allocate(count); 138 allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1)); 139 return rc; 140 } 141 142 /** 143 * Frees up a previously allocated page so that it can be re-allocated again. 144 * 145 * @param page the page to free up 146 * @throws IOException 147 * If an disk error occurred. 148 * @throws IllegalStateException 149 * if the PageFile is not loaded 150 */ 151 public void free(long pageId) throws IOException { 152 free(load(pageId, null)); 153 } 154 155 /** 156 * Frees up a previously allocated sequence of pages so that it can be re-allocated again. 157 * 158 * @param page the initial page of the sequence that will be getting freed 159 * @param count the number of pages in the sequence 160 * 161 * @throws IOException 162 * If an disk error occurred. 163 * @throws IllegalStateException 164 * if the PageFile is not loaded 165 */ 166 public void free(long pageId, int count) throws IOException { 167 free(load(pageId, null), count); 168 } 169 170 /** 171 * Frees up a previously allocated sequence of pages so that it can be re-allocated again. 172 * 173 * @param page the initial page of the sequence that will be getting freed 174 * @param count the number of pages in the sequence 175 * 176 * @throws IOException 177 * If an disk error occurred. 178 * @throws IllegalStateException 179 * if the PageFile is not loaded 180 */ 181 public <T> void free(Page<T> page, int count) throws IOException { 182 pageFile.assertLoaded(); 183 long offsetPage = page.getPageId(); 184 for (int i = 0; i < count; i++) { 185 if (page == null) { 186 page = load(offsetPage + i, null); 187 } 188 free(page); 189 page = null; 190 } 191 } 192 193 /** 194 * Frees up a previously allocated page so that it can be re-allocated again. 195 * 196 * @param page the page to free up 197 * @throws IOException 198 * If an disk error occurred. 199 * @throws IllegalStateException 200 * if the PageFile is not loaded 201 */ 202 public <T> void free(Page<T> page) throws IOException { 203 pageFile.assertLoaded(); 204 205 // We may need loop to free up a page chain. 206 while (page != null) { 207 208 // Is it already free?? 209 if (page.getType() == Page.PAGE_FREE_TYPE) { 210 return; 211 } 212 213 Page<T> next = null; 214 if (page.getType() == Page.PAGE_PART_TYPE) { 215 next = load(page.getNext(), null); 216 } 217 218 page.makeFree(getWriteTransactionId()); 219 220 DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize()); 221 page.write(out); 222 write(page, out.getData()); 223 224 freeList.add(page.getPageId()); 225 page = next; 226 } 227 } 228 229 /** 230 * 231 * @param page 232 * the page to write. The Page object must be fully populated with a valid pageId, type, and data. 233 * @param marshaller 234 * the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data. 235 * @param overflow 236 * If true, then if the page data marshalls to a bigger size than can fit in one page, then additional 237 * overflow pages are automatically allocated and chained to this page to store all the data. If false, 238 * and the overflow condition would occur, then the PageOverflowIOException is thrown. 239 * @throws IOException 240 * If an disk error occurred. 241 * @throws PageOverflowIOException 242 * If the page data marshalls to size larger than maximum page size and overflow was false. 243 * @throws IllegalStateException 244 * if the PageFile is not loaded 245 */ 246 public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException { 247 DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow); 248 if (marshaller != null) { 249 marshaller.writePayload(page.get(), out); 250 } 251 out.close(); 252 } 253 254 /** 255 * @throws IOException 256 */ 257 public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException { 258 pageFile.assertLoaded(); 259 260 // Copy to protect against the end user changing 261 // the page instance while we are doing a write. 262 final Page copy = page.copy(); 263 pageFile.addToCache(copy); 264 265 // 266 // To support writing VERY large data, we override the output stream so 267 // that we 268 // we do the page writes incrementally while the data is being 269 // marshalled. 270 DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) { 271 Page current = copy; 272 273 @SuppressWarnings("unchecked") 274 @Override 275 protected void onWrite() throws IOException { 276 277 // Are we at an overflow condition? 278 final int pageSize = pageFile.getPageSize(); 279 if (pos >= pageSize) { 280 // If overflow is allowed 281 if (overflow) { 282 283 Page next; 284 if (current.getType() == Page.PAGE_PART_TYPE) { 285 next = load(current.getNext(), null); 286 } else { 287 next = allocate(); 288 } 289 290 next.txId = current.txId; 291 292 // Write the page header 293 int oldPos = pos; 294 pos = 0; 295 296 current.makePagePart(next.getPageId(), getWriteTransactionId()); 297 current.write(this); 298 299 // Do the page write.. 300 byte[] data = new byte[pageSize]; 301 System.arraycopy(buf, 0, data, 0, pageSize); 302 Transaction.this.write(current, data); 303 304 // Reset for the next page chunk 305 pos = 0; 306 // The page header marshalled after the data is written. 307 skip(Page.PAGE_HEADER_SIZE); 308 // Move the overflow data after the header. 309 System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize); 310 pos += oldPos - pageSize; 311 current = next; 312 313 } else { 314 throw new PageOverflowIOException("Page overflow."); 315 } 316 } 317 318 } 319 320 @SuppressWarnings("unchecked") 321 @Override 322 public void close() throws IOException { 323 super.close(); 324 325 // We need to free up the rest of the page chain.. 326 if (current.getType() == Page.PAGE_PART_TYPE) { 327 free(current.getNext()); 328 } 329 330 current.makePageEnd(pos, getWriteTransactionId()); 331 332 // Write the header.. 333 pos = 0; 334 current.write(this); 335 336 Transaction.this.write(current, buf); 337 } 338 }; 339 340 // The page header marshaled after the data is written. 341 out.skip(Page.PAGE_HEADER_SIZE); 342 return out; 343 } 344 345 /** 346 * Loads a page from disk. 347 * 348 * @param pageId 349 * the id of the page to load 350 * @param marshaller 351 * the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data. 352 * @return The page with the given id 353 * @throws IOException 354 * If an disk error occurred. 355 * @throws IllegalStateException 356 * if the PageFile is not loaded 357 */ 358 public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException { 359 pageFile.assertLoaded(); 360 Page<T> page = new Page<T>(pageId); 361 load(page, marshaller); 362 return page; 363 } 364 365 /** 366 * Loads a page from disk. 367 * 368 * @param page - The pageId field must be properly set 369 * @param marshaller 370 * the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data. 371 * @throws IOException 372 * If an disk error occurred. 373 * @throws InvalidPageIOException 374 * If the page is is not valid. 375 * @throws IllegalStateException 376 * if the PageFile is not loaded 377 */ 378 @SuppressWarnings("unchecked") 379 public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException { 380 pageFile.assertLoaded(); 381 382 // Can't load invalid offsets... 383 long pageId = page.getPageId(); 384 if (pageId < 0) { 385 throw new InvalidPageIOException("Page id is not valid", pageId); 386 } 387 388 // It might be a page this transaction has modified... 389 PageWrite update = writes.get(pageId); 390 if (update != null) { 391 page.copy(update.getPage()); 392 return; 393 } 394 395 // We may be able to get it from the cache... 396 Page<T> t = pageFile.getFromCache(pageId); 397 if (t != null) { 398 page.copy(t); 399 return; 400 } 401 402 if (marshaller != null) { 403 // Full page read.. 404 InputStream is = openInputStream(page); 405 DataInputStream dataIn = new DataInputStream(is); 406 page.set(marshaller.readPayload(dataIn)); 407 is.close(); 408 } else { 409 // Page header read. 410 DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]); 411 pageFile.readPage(pageId, in.getRawData()); 412 page.read(in); 413 page.set(null); 414 } 415 416 // Cache it. 417 if (marshaller != null) { 418 pageFile.addToCache(page); 419 } 420 } 421 422 /** 423 * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page, 424 * org.apache.kahadb.util.Marshaller) 425 */ 426 public InputStream openInputStream(final Page p) throws IOException { 427 428 return new InputStream() { 429 430 private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]); 431 private Page page = readPage(p); 432 private int pageCount = 1; 433 434 private Page markPage; 435 private ByteSequence markChunk; 436 437 private Page readPage(Page page) throws IOException { 438 // Read the page data 439 440 pageFile.readPage(page.getPageId(), chunk.getData()); 441 442 chunk.setOffset(0); 443 chunk.setLength(pageFile.getPageSize()); 444 445 DataByteArrayInputStream in = new DataByteArrayInputStream(chunk); 446 page.read(in); 447 448 chunk.setOffset(Page.PAGE_HEADER_SIZE); 449 if (page.getType() == Page.PAGE_END_TYPE) { 450 chunk.setLength((int)(page.getNext())); 451 } 452 453 if (page.getType() == Page.PAGE_FREE_TYPE) { 454 throw new EOFException("Chunk stream does not exist at page: " + page.getPageId()); 455 } 456 457 return page; 458 } 459 460 public int read() throws IOException { 461 if (!atEOF()) { 462 return chunk.data[chunk.offset++] & 0xff; 463 } else { 464 return -1; 465 } 466 } 467 468 private boolean atEOF() throws IOException { 469 if (chunk.offset < chunk.length) { 470 return false; 471 } 472 if (page.getType() == Page.PAGE_END_TYPE) { 473 return true; 474 } 475 fill(); 476 return chunk.offset >= chunk.length; 477 } 478 479 private void fill() throws IOException { 480 page = readPage(new Page(page.getNext())); 481 pageCount++; 482 } 483 484 public int read(byte[] b) throws IOException { 485 return read(b, 0, b.length); 486 } 487 488 public int read(byte b[], int off, int len) throws IOException { 489 if (!atEOF()) { 490 int rc = 0; 491 while (!atEOF() && rc < len) { 492 len = Math.min(len, chunk.length - chunk.offset); 493 if (len > 0) { 494 System.arraycopy(chunk.data, chunk.offset, b, off, len); 495 chunk.offset += len; 496 } 497 rc += len; 498 } 499 return rc; 500 } else { 501 return -1; 502 } 503 } 504 505 public long skip(long len) throws IOException { 506 if (atEOF()) { 507 int rc = 0; 508 while (!atEOF() && rc < len) { 509 len = Math.min(len, chunk.length - chunk.offset); 510 if (len > 0) { 511 chunk.offset += len; 512 } 513 rc += len; 514 } 515 return rc; 516 } else { 517 return -1; 518 } 519 } 520 521 public int available() { 522 return chunk.length - chunk.offset; 523 } 524 525 public boolean markSupported() { 526 return true; 527 } 528 529 public void mark(int markpos) { 530 markPage = page; 531 byte data[] = new byte[pageFile.getPageSize()]; 532 System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize()); 533 markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength()); 534 } 535 536 public void reset() { 537 page = markPage; 538 chunk = markChunk; 539 } 540 541 }; 542 } 543 544 /** 545 * Allows you to iterate through all active Pages in this object. Pages with type Page.FREE_TYPE are 546 * not included in this iteration. 547 * 548 * Pages removed with Iterator.remove() will not actually get removed until the transaction commits. 549 * 550 * @throws IllegalStateException 551 * if the PageFile is not loaded 552 */ 553 @SuppressWarnings("unchecked") 554 public Iterator<Page> iterator() { 555 return (Iterator<Page>)iterator(false); 556 } 557 558 /** 559 * Allows you to iterate through all active Pages in this object. You can optionally include free pages in the pages 560 * iterated. 561 * 562 * @param includeFreePages - if true, free pages are included in the iteration 563 * @param tx - if not null, then the remove() opeation on the Iterator will operate in scope of that transaction. 564 * @throws IllegalStateException 565 * if the PageFile is not loaded 566 */ 567 public Iterator<Page> iterator(final boolean includeFreePages) { 568 569 pageFile.assertLoaded(); 570 571 return new Iterator<Page>() { 572 long nextId; 573 Page nextPage; 574 Page lastPage; 575 576 private void findNextPage() { 577 if (!pageFile.isLoaded()) { 578 throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded"); 579 } 580 581 if (nextPage != null) { 582 return; 583 } 584 585 try { 586 while (nextId < pageFile.getPageCount()) { 587 588 Page page = load(nextId, null); 589 590 if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) { 591 nextPage = page; 592 return; 593 } else { 594 nextId++; 595 } 596 } 597 } catch (IOException e) { 598 } 599 } 600 601 public boolean hasNext() { 602 findNextPage(); 603 return nextPage != null; 604 } 605 606 public Page next() { 607 findNextPage(); 608 if (nextPage != null) { 609 lastPage = nextPage; 610 nextPage = null; 611 nextId++; 612 return lastPage; 613 } else { 614 throw new NoSuchElementException(); 615 } 616 } 617 618 @SuppressWarnings("unchecked") 619 public void remove() { 620 if (lastPage == null) { 621 throw new IllegalStateException(); 622 } 623 try { 624 free(lastPage); 625 lastPage = null; 626 } catch (IOException e) { 627 new RuntimeException(e); 628 } 629 } 630 }; 631 } 632 633 /////////////////////////////////////////////////////////////////// 634 // Commit / Rollback related methods.. 635 /////////////////////////////////////////////////////////////////// 636 637 /** 638 * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated 639 * with the transaction are written to disk or none will. 640 */ 641 public void commit() throws IOException { 642 if( writeTransactionId!=-1 ) { 643 // Actually do the page writes... 644 pageFile.write(writes.entrySet()); 645 // Release the pages that were freed up in the transaction.. 646 freePages(freeList); 647 648 freeList.clear(); 649 allocateList.clear(); 650 writes.clear(); 651 writeTransactionId = -1; 652 } 653 } 654 655 /** 656 * Rolls back the transaction. 657 */ 658 public void rollback() throws IOException { 659 if( writeTransactionId!=-1 ) { 660 // Release the pages that were allocated in the transaction... 661 freePages(allocateList); 662 663 freeList.clear(); 664 allocateList.clear(); 665 writes.clear(); 666 writeTransactionId = -1; 667 } 668 } 669 670 private long getWriteTransactionId() { 671 if( writeTransactionId==-1 ) { 672 writeTransactionId = pageFile.getNextWriteTransactionId(); 673 } 674 return writeTransactionId; 675 } 676 677 /** 678 * Queues up a page write that should get done when commit() gets called. 679 */ 680 @SuppressWarnings("unchecked") 681 private void write(final Page page, byte[] data) throws IOException { 682 Long key = page.getPageId(); 683 // TODO: if a large update transaction is in progress, we may want to move 684 // all the current updates to a temp file so that we don't keep using 685 // up memory. 686 writes.put(key, new PageWrite(page, data)); 687 } 688 689 /** 690 * @param list 691 * @throws RuntimeException 692 */ 693 private void freePages(SequenceSet list) throws RuntimeException { 694 Sequence seq = list.getHead(); 695 while( seq!=null ) { 696 seq.each(new Sequence.Closure<RuntimeException>(){ 697 public void execute(long value) { 698 pageFile.freePage(value); 699 } 700 }); 701 seq = seq.getNext(); 702 } 703 } 704 705 /** 706 * @return true if there are no uncommitted page file updates associated with this transaction. 707 */ 708 public boolean isReadOnly() { 709 return writeTransactionId==-1; 710 } 711 712 /////////////////////////////////////////////////////////////////// 713 // Transaction closure helpers... 714 /////////////////////////////////////////////////////////////////// 715 716 /** 717 * Executes a closure and if it does not throw any exceptions, then it commits the transaction. 718 * If the closure throws an Exception, then the transaction is rolled back. 719 * 720 * @param <T> 721 * @param closure - the work to get exectued. 722 * @throws T if the closure throws it 723 * @throws IOException If the commit fails. 724 */ 725 public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException { 726 boolean success = false; 727 try { 728 closure.execute(this); 729 success = true; 730 } finally { 731 if (success) { 732 commit(); 733 } else { 734 rollback(); 735 } 736 } 737 } 738 739 /** 740 * Executes a closure and if it does not throw any exceptions, then it commits the transaction. 741 * If the closure throws an Exception, then the transaction is rolled back. 742 * 743 * @param <T> 744 * @param closure - the work to get exectued. 745 * @throws T if the closure throws it 746 * @throws IOException If the commit fails. 747 */ 748 public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException { 749 boolean success = false; 750 try { 751 R rc = closure.execute(this); 752 success = true; 753 return rc; 754 } finally { 755 if (success) { 756 commit(); 757 } else { 758 rollback(); 759 } 760 } 761 } 762 763}