001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.kahadb.journal;
018
019import java.io.File;
020import java.io.FilenameFilter;
021import java.io.IOException;
022import java.io.UnsupportedEncodingException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.LinkedHashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.Timer;
032import java.util.TimerTask;
033import java.util.TreeMap;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.atomic.AtomicLong;
036import java.util.concurrent.atomic.AtomicReference;
037import java.util.zip.Adler32;
038import java.util.zip.Checksum;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
042import org.apache.kahadb.journal.DataFileAppender.WriteKey;
043import org.apache.kahadb.util.ByteSequence;
044import org.apache.kahadb.util.DataByteArrayInputStream;
045import org.apache.kahadb.util.DataByteArrayOutputStream;
046import org.apache.kahadb.util.LinkedNodeList;
047import org.apache.kahadb.util.SchedulerTimerTask;
048import org.apache.kahadb.util.Sequence;
049
050/**
051 * Manages DataFiles
052 * 
053 * 
054 */
055public class Journal {
056
057    private static final int MAX_BATCH_SIZE = 32*1024*1024;
058
059        // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
060    public static final int RECORD_HEAD_SPACE = 4 + 1;
061    
062    public static final byte USER_RECORD_TYPE = 1;
063    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
064    // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch. 
065    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
066    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
067    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
068
069    private static byte[] createBatchControlRecordHeader() {
070        try {
071            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
072            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
073            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
074            os.write(BATCH_CONTROL_RECORD_MAGIC);
075            ByteSequence sequence = os.toByteSequence();
076            sequence.compact();
077            return sequence.getData();
078        } catch (IOException e) {
079            throw new RuntimeException("Could not create batch control record header.");
080        }
081    }
082
083    public static final String DEFAULT_DIRECTORY = ".";
084    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
085    public static final String DEFAULT_FILE_PREFIX = "db-";
086    public static final String DEFAULT_FILE_SUFFIX = ".log";
087    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
088    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
089    public static final int PREFERED_DIFF = 1024 * 512;
090    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
091    
092    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
093
094    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
095
096    protected File directory = new File(DEFAULT_DIRECTORY);
097    protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
098    protected String filePrefix = DEFAULT_FILE_PREFIX;
099    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
100    protected boolean started;
101    
102    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
103    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
104    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
105    
106    protected DataFileAppender appender;
107    protected DataFileAccessorPool accessorPool;
108
109    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
110    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
111    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
112
113    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
114    protected Runnable cleanupTask;
115    protected AtomicLong totalLength = new AtomicLong();
116    protected boolean archiveDataLogs;
117        private ReplicationTarget replicationTarget;
118    protected boolean checksum;
119    protected boolean checkForCorruptionOnStartup;
120    private Timer timer;
121   
122
123    public synchronized void start() throws IOException {
124        if (started) {
125            return;
126        }
127        
128        long start = System.currentTimeMillis();
129        accessorPool = new DataFileAccessorPool(this);
130        started = true;
131        preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
132
133        appender = new DataFileAppender(this);
134
135        File[] files = directory.listFiles(new FilenameFilter() {
136            public boolean accept(File dir, String n) {
137                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
138            }
139        });
140
141        if (files != null) {
142            for (int i = 0; i < files.length; i++) {
143                try {
144                    File file = files[i];
145                    String n = file.getName();
146                    String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
147                    int num = Integer.parseInt(numStr);
148                    DataFile dataFile = new DataFile(file, num, preferedFileLength);
149                    fileMap.put(dataFile.getDataFileId(), dataFile);
150                    totalLength.addAndGet(dataFile.getLength());
151                } catch (NumberFormatException e) {
152                    // Ignore file that do not match the pattern.
153                }
154            }
155
156            // Sort the list so that we can link the DataFiles together in the
157            // right order.
158            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
159            Collections.sort(l);
160            for (DataFile df : l) {
161                if (df.getLength() == 0) {
162                    // possibly the result of a previous failed write
163                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
164                    continue;
165                }
166                dataFiles.addLast(df);
167                fileByFileMap.put(df.getFile(), df);
168
169                if( isCheckForCorruptionOnStartup() ) {
170                    lastAppendLocation.set(recoveryCheck(df));
171                }
172            }
173        }
174
175        getCurrentWriteFile();
176
177        if( lastAppendLocation.get()==null ) {
178            DataFile df = dataFiles.getTail();
179            lastAppendLocation.set(recoveryCheck(df));
180        }
181
182        cleanupTask = new Runnable() {
183            public void run() {
184                cleanup();
185            }
186        };
187        this.timer = new Timer("KahaDB Scheduler", true);
188        TimerTask task = new SchedulerTimerTask(cleanupTask);
189        this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL);
190        long end = System.currentTimeMillis();
191        LOG.trace("Startup took: "+(end-start)+" ms");
192    }
193
194    private static byte[] bytes(String string) {
195        try {
196                        return string.getBytes("UTF-8");
197                } catch (UnsupportedEncodingException e) {
198                        throw new RuntimeException(e);
199                }
200        }
201
202        protected Location recoveryCheck(DataFile dataFile) throws IOException {
203        Location location = new Location();
204        location.setDataFileId(dataFile.getDataFileId());
205        location.setOffset(0);
206
207        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
208        try {
209            while( true ) {
210                int size = checkBatchRecord(reader, location.getOffset());
211                if ( size>=0 ) {
212                    location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
213                } else {
214
215                    // Perhaps it's just some corruption... scan through the file to find the next valid batch record.  We
216                    // may have subsequent valid batch records.
217                    int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
218                    if( nextOffset >=0 ) {
219                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
220                        LOG.info("Corrupt journal records found in '"+dataFile.getFile()+"' between offsets: "+sequence);
221                        dataFile.corruptedBlocks.add(sequence);
222                        location.setOffset(nextOffset);
223                    } else {
224                        break;
225                    }
226                }
227            }
228            
229        } catch (IOException e) {
230                } finally {
231            accessorPool.closeDataFileAccessor(reader);
232        }
233
234        int existingLen = dataFile.getLength();
235        dataFile.setLength(location.getOffset());
236        if (existingLen > dataFile.getLength()) {
237            totalLength.addAndGet(dataFile.getLength() - existingLen);
238        }
239
240        if( !dataFile.corruptedBlocks.isEmpty() ) {
241            // Is the end of the data file corrupted?
242            if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
243                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
244            }
245        }
246
247        return location;
248    }
249
250    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
251        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
252        byte data[] = new byte[1024*4];
253        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
254
255        int pos = 0;
256        while( true ) {
257            pos = bs.indexOf(header, pos);
258            if( pos >= 0 ) {
259                return offset+pos;
260            } else {
261                // need to load the next data chunck in..
262                if( bs.length != data.length ) {
263                    // If we had a short read then we were at EOF
264                    return -1;
265                }
266                offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
267                bs = new ByteSequence(data, 0, reader.read(offset, data));
268                pos=0;
269            }
270        }
271    }
272
273
274    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
275        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
276        DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
277
278        reader.readFully(offset, controlRecord);
279
280        // Assert that it's  a batch record.
281        for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
282            if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
283                return -1;
284            }
285        }
286
287        int size = controlIs.readInt();
288        if( size > MAX_BATCH_SIZE ) {
289            return -1;
290        }
291
292        if( isChecksum() ) {
293
294            long expectedChecksum = controlIs.readLong();
295            if( expectedChecksum == 0 ) {
296                // Checksuming was not enabled when the record was stored.
297                // we can't validate the record :(
298                return size;
299            }
300
301            byte data[] = new byte[size];
302            reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
303
304            Checksum checksum = new Adler32();
305            checksum.update(data, 0, data.length);
306
307            if( expectedChecksum!=checksum.getValue() ) {
308                return -1;
309            }
310
311        }
312        return size;
313    }
314
315
316        void addToTotalLength(int size) {
317                totalLength.addAndGet(size);
318        }
319    
320    
321    synchronized DataFile getCurrentWriteFile() throws IOException {
322        if (dataFiles.isEmpty()) {
323            rotateWriteFile();
324        }
325        return dataFiles.getTail();
326    }
327
328    synchronized DataFile rotateWriteFile() {
329                int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
330                File file = getFile(nextNum);
331                DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
332                // actually allocate the disk space
333                fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
334                fileByFileMap.put(file, nextWriteFile);
335                dataFiles.addLast(nextWriteFile);
336                return nextWriteFile;
337        }
338
339        public File getFile(int nextNum) {
340                String fileName = filePrefix + nextNum + fileSuffix;
341                File file = new File(directory, fileName);
342                return file;
343        }
344
345    synchronized DataFile getDataFile(Location item) throws IOException {
346        Integer key = Integer.valueOf(item.getDataFileId());
347        DataFile dataFile = fileMap.get(key);
348        if (dataFile == null) {
349            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
350            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
351        }
352        return dataFile;
353    }
354
355    synchronized File getFile(Location item) throws IOException {
356        Integer key = Integer.valueOf(item.getDataFileId());
357        DataFile dataFile = fileMap.get(key);
358        if (dataFile == null) {
359            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
360            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
361        }
362        return dataFile.getFile();
363    }
364
365    private DataFile getNextDataFile(DataFile dataFile) {
366        return dataFile.getNext();
367    }
368
369    public synchronized void close() throws IOException {
370        if (!started) {
371            return;
372        }
373        if (this.timer != null) {
374            this.timer.cancel();
375        }
376        accessorPool.close();
377        appender.close();
378        fileMap.clear();
379        fileByFileMap.clear();
380        dataFiles.clear();
381        lastAppendLocation.set(null);
382        started = false;
383    }
384
385    synchronized void cleanup() {
386        if (accessorPool != null) {
387            accessorPool.disposeUnused();
388        }
389    }
390
391    public synchronized boolean delete() throws IOException {
392
393        // Close all open file handles...
394        appender.close();
395        accessorPool.close();
396
397        boolean result = true;
398        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
399            DataFile dataFile = i.next();
400            totalLength.addAndGet(-dataFile.getLength());
401            result &= dataFile.delete();
402        }
403        fileMap.clear();
404        fileByFileMap.clear();
405        lastAppendLocation.set(null);
406        dataFiles = new LinkedNodeList<DataFile>();
407
408        // reopen open file handles...
409        accessorPool = new DataFileAccessorPool(this);
410        appender = new DataFileAppender(this);
411        return result;
412    }
413
414    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
415        for (Integer key : files) {
416            // Can't remove the data file (or subsequent files) that is currently being written to.
417                if( key >= lastAppendLocation.get().getDataFileId() ) {
418                        continue;
419                }
420            DataFile dataFile = fileMap.get(key);
421            if( dataFile!=null ) {
422                forceRemoveDataFile(dataFile);
423            }
424        }
425    }
426
427    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
428        accessorPool.disposeDataFileAccessors(dataFile);
429        fileByFileMap.remove(dataFile.getFile());
430        fileMap.remove(dataFile.getDataFileId());
431        totalLength.addAndGet(-dataFile.getLength());
432        dataFile.unlink();
433        if (archiveDataLogs) {
434            dataFile.move(getDirectoryArchive());
435            LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
436        } else {
437            if ( dataFile.delete() ) {
438                LOG.debug("Discarded data file " + dataFile);
439            } else {
440                LOG.warn("Failed to discard data file " + dataFile.getFile());
441            }
442        }
443    }
444
445    /**
446     * @return the maxFileLength
447     */
448    public int getMaxFileLength() {
449        return maxFileLength;
450    }
451
452    /**
453     * @param maxFileLength the maxFileLength to set
454     */
455    public void setMaxFileLength(int maxFileLength) {
456        this.maxFileLength = maxFileLength;
457    }
458
459    @Override
460    public String toString() {
461        return directory.toString();
462    }
463
464        public synchronized void appendedExternally(Location loc, int length) throws IOException {
465                DataFile dataFile = null;
466                if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
467                        // It's an update to the current log file..
468                        dataFile = dataFiles.getTail();
469                        dataFile.incrementLength(length);
470                } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
471                        // It's an update to the next log file.
472            int nextNum = loc.getDataFileId();
473            File file = getFile(nextNum);
474            dataFile = new DataFile(file, nextNum, preferedFileLength);
475            // actually allocate the disk space
476            fileMap.put(dataFile.getDataFileId(), dataFile);
477            fileByFileMap.put(file, dataFile);
478            dataFiles.addLast(dataFile);
479                } else {
480                        throw new IOException("Invalid external append.");
481                }
482        }
483
484    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
485
486        Location cur = null;
487        while (true) {
488            if (cur == null) {
489                if (location == null) {
490                    DataFile head = dataFiles.getHead();
491                    if( head == null ) {
492                        return null;
493                    }
494                    cur = new Location();
495                    cur.setDataFileId(head.getDataFileId());
496                    cur.setOffset(0);
497                } else {
498                    // Set to the next offset..
499                    if (location.getSize() == -1) {
500                        cur = new Location(location);
501                    } else {
502                        cur = new Location(location);
503                        cur.setOffset(location.getOffset() + location.getSize());
504                    }
505                }
506            } else {
507                cur.setOffset(cur.getOffset() + cur.getSize());
508            }
509
510            DataFile dataFile = getDataFile(cur);
511
512            // Did it go into the next file??
513            if (dataFile.getLength() <= cur.getOffset()) {
514                dataFile = getNextDataFile(dataFile);
515                if (dataFile == null) {
516                    return null;
517                } else {
518                    cur.setDataFileId(dataFile.getDataFileId().intValue());
519                    cur.setOffset(0);
520                }
521            }
522
523            // Load in location size and type.
524            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
525            try {
526                                reader.readLocationDetails(cur);
527            } finally {
528                accessorPool.closeDataFileAccessor(reader);
529            }
530
531            if (cur.getType() == 0) {
532                return null;
533            } else if (cur.getType() == USER_RECORD_TYPE) {
534                // Only return user records.
535                return cur;
536            }
537        }
538    }
539
540    public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
541        DataFile df = fileByFileMap.get(file);
542        return getNextLocation(df, lastLocation, thisFileOnly);
543    }
544
545    public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {
546
547        Location cur = null;
548        while (true) {
549            if (cur == null) {
550                if (lastLocation == null) {
551                    DataFile head = dataFile.getHeadNode();
552                    cur = new Location();
553                    cur.setDataFileId(head.getDataFileId());
554                    cur.setOffset(0);
555                } else {
556                    // Set to the next offset..
557                    cur = new Location(lastLocation);
558                    cur.setOffset(cur.getOffset() + cur.getSize());
559                }
560            } else {
561                cur.setOffset(cur.getOffset() + cur.getSize());
562            }
563
564            // Did it go into the next file??
565            if (dataFile.getLength() <= cur.getOffset()) {
566                if (thisFileOnly) {
567                    return null;
568                } else {
569                    dataFile = getNextDataFile(dataFile);
570                    if (dataFile == null) {
571                        return null;
572                    } else {
573                        cur.setDataFileId(dataFile.getDataFileId().intValue());
574                        cur.setOffset(0);
575                    }
576                }
577            }
578
579            // Load in location size and type.
580            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
581            try {
582                reader.readLocationDetails(cur);
583            } finally {
584                accessorPool.closeDataFileAccessor(reader);
585            }
586
587            if (cur.getType() == 0) {
588                return null;
589            } else if (cur.getType() > 0) {
590                // Only return user records.
591                return cur;
592            }
593        }
594    }
595
596    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
597        DataFile dataFile = getDataFile(location);
598        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
599        ByteSequence rc = null;
600        try {
601            rc = reader.readRecord(location);
602        } finally {
603            accessorPool.closeDataFileAccessor(reader);
604        }
605        return rc;
606    }
607
608    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
609        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
610        return loc;
611    }
612
613    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
614        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
615        return loc;
616    }
617
618    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
619        DataFile dataFile = getDataFile(location);
620        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
621        try {
622            updater.updateRecord(location, data, sync);
623        } finally {
624            accessorPool.closeDataFileAccessor(updater);
625        }
626    }
627
628    public File getDirectory() {
629        return directory;
630    }
631
632    public void setDirectory(File directory) {
633        this.directory = directory;
634    }
635
636    public String getFilePrefix() {
637        return filePrefix;
638    }
639
640    public void setFilePrefix(String filePrefix) {
641        this.filePrefix = filePrefix;
642    }
643
644    public Map<WriteKey, WriteCommand> getInflightWrites() {
645        return inflightWrites;
646    }
647
648    public Location getLastAppendLocation() {
649        return lastAppendLocation.get();
650    }
651
652    public void setLastAppendLocation(Location lastSyncedLocation) {
653        this.lastAppendLocation.set(lastSyncedLocation);
654    }
655
656    public File getDirectoryArchive() {
657        return directoryArchive;
658    }
659
660    public void setDirectoryArchive(File directoryArchive) {
661        this.directoryArchive = directoryArchive;
662    }
663
664    public boolean isArchiveDataLogs() {
665        return archiveDataLogs;
666    }
667
668    public void setArchiveDataLogs(boolean archiveDataLogs) {
669        this.archiveDataLogs = archiveDataLogs;
670    }
671
672    synchronized public Integer getCurrentDataFileId() {
673        if (dataFiles.isEmpty())
674            return null;
675        return dataFiles.getTail().getDataFileId();
676    }
677
678    /**
679     * Get a set of files - only valid after start()
680     * 
681     * @return files currently being used
682     */
683    public Set<File> getFiles() {
684        return fileByFileMap.keySet();
685    }
686
687    public synchronized Map<Integer, DataFile> getFileMap() {
688        return new TreeMap<Integer, DataFile>(fileMap);
689    }
690    
691    public long getDiskSize() {
692        long tailLength=0;
693        synchronized( this ) {
694            if( !dataFiles.isEmpty() ) {
695                tailLength = dataFiles.getTail().getLength();
696            }
697        }
698        
699        long rc = totalLength.get();
700        
701        // The last file is actually at a minimum preferedFileLength big.
702        if( tailLength < preferedFileLength ) {
703            rc -= tailLength;
704            rc += preferedFileLength;
705        }
706        return rc;
707    }
708
709        public void setReplicationTarget(ReplicationTarget replicationTarget) {
710                this.replicationTarget = replicationTarget;
711        }
712        public ReplicationTarget getReplicationTarget() {
713                return replicationTarget;
714        }
715
716    public String getFileSuffix() {
717        return fileSuffix;
718    }
719
720    public void setFileSuffix(String fileSuffix) {
721        this.fileSuffix = fileSuffix;
722    }
723
724        public boolean isChecksum() {
725                return checksum;
726        }
727
728        public void setChecksum(boolean checksumWrites) {
729                this.checksum = checksumWrites;
730        }
731
732    public boolean isCheckForCorruptionOnStartup() {
733        return checkForCorruptionOnStartup;
734    }
735
736    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
737        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
738    }
739
740    public void setWriteBatchSize(int writeBatchSize) {
741        this.writeBatchSize = writeBatchSize;
742    }
743    
744    public int getWriteBatchSize() {
745        return writeBatchSize;
746    }
747
748    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
749       this.totalLength = storeSizeAccumulator;
750    }
751}