001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.kahadb.util;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.NoSuchElementException;
025
026/**
027 * Keeps track of a added long values. Collapses ranges of numbers using a
028 * Sequence representation. Use to keep track of received message ids to find
029 * out if a message is duplicate or if there are any missing messages.
030 * 
031 * @author chirino
032 */
033public class SequenceSet extends LinkedNodeList<Sequence> {
034
035    public static class Marshaller implements org.apache.kahadb.util.Marshaller<SequenceSet> {
036
037        public static final Marshaller INSTANCE = new Marshaller();
038        
039        public SequenceSet readPayload(DataInput in) throws IOException {
040            SequenceSet value = new SequenceSet();
041            int count = in.readInt();
042            for (int i = 0; i < count; i++) {
043                if( in.readBoolean() ) {
044                    Sequence sequence = new Sequence(in.readLong(), in.readLong());
045                    value.addLast(sequence);
046                } else {
047                    Sequence sequence = new Sequence(in.readLong());
048                    value.addLast(sequence);
049                }
050            }
051            return value;
052        }
053
054        public void writePayload(SequenceSet value, DataOutput out) throws IOException {
055            out.writeInt(value.size());
056            Sequence sequence = value.getHead();
057            while (sequence != null ) {
058                if( sequence.range() > 1 ) {
059                    out.writeBoolean(true);
060                    out.writeLong(sequence.first);
061                    out.writeLong(sequence.last);
062                } else {
063                    out.writeBoolean(false);
064                    out.writeLong(sequence.first);
065                }
066                sequence = sequence.getNext();
067            }
068        }
069
070        public int getFixedSize() {
071            return -1;
072        }
073
074        public SequenceSet deepCopy(SequenceSet value) {
075            SequenceSet rc = new SequenceSet();
076            Sequence sequence = value.getHead();
077            while (sequence != null ) {
078                rc.add(new Sequence(sequence.first, sequence.last));
079                sequence = sequence.getNext();
080            }
081            return rc;
082        }
083
084        public boolean isDeepCopySupported() {
085            return true;
086        }
087    }
088    
089    public void add(Sequence value) {
090        // TODO we can probably optimize this a bit
091        for(long i=value.first; i<value.last+1; i++) {
092            add(i);
093        }
094    }
095    
096    
097    /**
098     * 
099     * @param value
100     *            the value to add to the list
101     * @return false if the value was a duplicate.
102     */
103    public boolean add(long value) {
104
105        if (isEmpty()) {
106            addFirst(new Sequence(value));
107            return true;
108        }
109
110        Sequence sequence = getHead();
111        while (sequence != null) {
112
113            if (sequence.isAdjacentToLast(value)) {
114                // grow the sequence...
115                sequence.last = value;
116                // it might connect us to the next sequence..
117                if (sequence.getNext() != null) {
118                    Sequence next = sequence.getNext();
119                    if (next.isAdjacentToFirst(value)) {
120                        // Yep the sequence connected.. so join them.
121                        sequence.last = next.last;
122                        next.unlink();
123                    }
124                }
125                return true;
126            }
127
128            if (sequence.isAdjacentToFirst(value)) {
129                // grow the sequence...
130                sequence.first = value;
131
132                // it might connect us to the previous
133                if (sequence.getPrevious() != null) {
134                    Sequence prev = sequence.getPrevious();
135                    if (prev.isAdjacentToLast(value)) {
136                        // Yep the sequence connected.. so join them.
137                        sequence.first = prev.first;
138                        prev.unlink();
139                    }
140                }
141                return true;
142            }
143
144            // Did that value land before this sequence?
145            if (value < sequence.first) {
146                // Then insert a new entry before this sequence item.
147                sequence.linkBefore(new Sequence(value));
148                return true;
149            }
150
151            // Did that value land within the sequence? The it's a duplicate.
152            if (sequence.contains(value)) {
153                return false;
154            }
155
156            sequence = sequence.getNext();
157        }
158
159        // Then the value is getting appended to the tail of the sequence.
160        addLast(new Sequence(value));
161        return true;
162    }
163    
164    /**
165     * Removes and returns the first element from this list.
166     *
167     * @return the first element from this list.
168     * @throws NoSuchElementException if this list is empty.
169     */
170    public long removeFirst() {
171        if (isEmpty()) {
172            throw new NoSuchElementException();
173        }
174        
175        Sequence rc = removeFirstSequence(1);
176        return rc.first;
177    }
178
179
180    public Sequence removeLastSequence() {
181        if (isEmpty()) {
182            return null;
183        }
184
185        Sequence rc = getTail();
186        rc.unlink();
187        return rc;
188    }
189
190    /**
191     * Removes and returns the first sequence that is count range large.
192     *
193     * @return a sequence that is count range large, or null if no sequence is that large in the list.
194     */
195    public Sequence removeFirstSequence(long count) {
196        if (isEmpty()) {
197            return null;
198        }
199        
200        Sequence sequence = getHead();
201        while (sequence != null ) {
202            if (sequence.range() == count ) {
203                sequence.unlink();
204                return sequence;
205            }
206            if (sequence.range() > count ) {
207                Sequence rc = new Sequence(sequence.first, sequence.first+count);
208                sequence.first+=count;
209                return rc;
210            }
211            sequence = sequence.getNext();
212        }
213        return null;
214    }
215
216
217    /**
218     * @return all the id Sequences that are missing from this set that are not
219     *         in between the range provided.
220     */
221    public List<Sequence> getMissing(long first, long last) {
222        ArrayList<Sequence> rc = new ArrayList<Sequence>();
223        if (first > last) {
224            throw new IllegalArgumentException("First cannot be more than last");
225        }
226        if (isEmpty()) {
227            // We are missing all the messages.
228            rc.add(new Sequence(first, last));
229            return rc;
230        }
231
232        Sequence sequence = getHead();
233        while (sequence != null && first <= last) {
234            if (sequence.contains(first)) {
235                first = sequence.last + 1;
236            } else {
237                if (first < sequence.first) {
238                    if (last < sequence.first) {
239                        rc.add(new Sequence(first, last));
240                        return rc;
241                    } else {
242                        rc.add(new Sequence(first, sequence.first - 1));
243                        first = sequence.last + 1;
244                    }
245                }
246            }
247            sequence = sequence.getNext();
248        }
249
250        if (first <= last) {
251            rc.add(new Sequence(first, last));
252        }
253        return rc;
254    }
255
256    /**
257     * @return all the Sequence that are in this list
258     */
259    public List<Sequence> getReceived() {
260        ArrayList<Sequence> rc = new ArrayList<Sequence>(size());
261        Sequence sequence = getHead();
262        while (sequence != null) {
263            rc.add(new Sequence(sequence.first, sequence.last));
264            sequence = sequence.getNext();
265        }
266        return rc;
267    }
268
269    public boolean contains(int first, int last) {
270        if (isEmpty()) {
271            return false;
272        }
273        Sequence sequence = getHead();
274        while (sequence != null) {
275            if (sequence.first <= first && first <= sequence.last ) {
276                return last <= sequence.last ;
277            }
278            sequence = sequence.getNext();
279        }
280        return false;
281    }
282
283}