001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.plist;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicReference;
024import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller;
025import org.apache.kahadb.journal.Location;
026import org.apache.kahadb.page.Page;
027import org.apache.kahadb.page.Transaction;
028import org.apache.kahadb.util.ByteSequence;
029
030public class PList {
031    final PListStore store;
032    private String name;
033    private long rootId = EntryLocation.NOT_SET;
034    private long lastId = EntryLocation.NOT_SET;
035    private final AtomicBoolean loaded = new AtomicBoolean();
036    private int size = 0;
037    Object indexLock;
038
039    PList(PListStore store) {
040        this.store = store;
041        this.indexLock = store.getIndexLock();
042    }
043
044    public void setName(String name) {
045        this.name = name;
046    }
047
048    /*
049     * (non-Javadoc)
050     * @see org.apache.activemq.beanstalk.JobScheduler#getName()
051     */
052    public String getName() {
053        return this.name;
054    }
055
056    public synchronized int size() {
057        return this.size;
058    }
059
060    public synchronized boolean isEmpty() {
061        return size == 0;
062    }
063
064    /**
065     * @return the rootId
066     */
067    public long getRootId() {
068        return this.rootId;
069    }
070
071    /**
072     * @param rootId
073     *            the rootId to set
074     */
075    public void setRootId(long rootId) {
076        this.rootId = rootId;
077    }
078
079    /**
080     * @return the lastId
081     */
082    public long getLastId() {
083        return this.lastId;
084    }
085
086    /**
087     * @param lastId
088     *            the lastId to set
089     */
090    public void setLastId(long lastId) {
091        this.lastId = lastId;
092    }
093
094    /**
095     * @return the loaded
096     */
097    public boolean isLoaded() {
098        return this.loaded.get();
099    }
100
101    void read(DataInput in) throws IOException {
102        this.rootId = in.readLong();
103        this.name = in.readUTF();
104    }
105
106    public void write(DataOutput out) throws IOException {
107        out.writeLong(this.rootId);
108        out.writeUTF(name);
109    }
110
111    public synchronized void destroy() throws IOException {
112        synchronized (indexLock) {
113            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
114                public void execute(Transaction tx) throws IOException {
115                    destroy(tx);
116                }
117            });
118        }
119    }
120
121    void destroy(Transaction tx) throws IOException {
122        // start from the first
123        EntryLocation entry = getFirst(tx);
124        while (entry != null) {
125            EntryLocation toRemove = entry.copy();
126            entry = getNext(tx, entry.getNext());
127            doRemove(tx, toRemove);
128        }
129    }
130
131    synchronized void load(Transaction tx) throws IOException {
132        if (loaded.compareAndSet(false, true)) {
133            final Page<EntryLocation> p = tx.load(this.rootId, null);
134            if (p.getType() == Page.PAGE_FREE_TYPE) {
135                // Need to initialize it..
136                EntryLocation root = createEntry(p, "root", EntryLocation.NOT_SET, EntryLocation.NOT_SET);
137
138                storeEntry(tx, root);
139                this.lastId = root.getPage().getPageId();
140            } else {
141                // find last id
142                long nextId = this.rootId;
143                while (nextId != EntryLocation.NOT_SET) {
144                    EntryLocation next = getNext(tx, nextId);
145                    if (next != null) {
146                        this.lastId = next.getPage().getPageId();
147                        nextId = next.getNext();
148                        this.size++;
149                    }
150                }
151            }
152        }
153    }
154
155    synchronized public void unload() {
156        if (loaded.compareAndSet(true, false)) {
157            this.rootId = EntryLocation.NOT_SET;
158            this.lastId = EntryLocation.NOT_SET;
159            this.size=0;
160        }
161    }
162
163    synchronized public void addLast(final String id, final ByteSequence bs) throws IOException {
164        final Location location = this.store.write(bs, false);
165        synchronized (indexLock) {
166            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
167                public void execute(Transaction tx) throws IOException {
168                    addLast(tx, id, bs, location);
169                }
170            });
171        }
172    }
173
174    private void addLast(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
175        EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
176        entry.setLocation(location);
177        storeEntry(tx, entry);
178        this.store.incrementJournalCount(tx, location);
179
180        EntryLocation last = loadEntry(tx, this.lastId);
181        last.setNext(entry.getPage().getPageId());
182        storeEntry(tx, last);
183        this.lastId = entry.getPage().getPageId();
184        this.size++;
185    }
186
187    synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException {
188        final Location location = this.store.write(bs, false);
189        synchronized (indexLock) {
190            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
191                public void execute(Transaction tx) throws IOException {
192                    addFirst(tx, id, bs, location);
193                }
194            });
195        }
196    }
197
198    private void addFirst(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
199        EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET);
200        entry.setLocation(location);
201        EntryLocation oldFirst = getFirst(tx);
202        if (oldFirst != null) {
203            oldFirst.setPrev(entry.getPage().getPageId());
204            storeEntry(tx, oldFirst);
205            entry.setNext(oldFirst.getPage().getPageId());
206
207        }
208        EntryLocation root = getRoot(tx);
209        root.setNext(entry.getPage().getPageId());
210        storeEntry(tx, root);
211        storeEntry(tx, entry);
212
213        this.store.incrementJournalCount(tx, location);
214        this.size++;
215    }
216
217    synchronized public boolean remove(final String id) throws IOException {
218        final AtomicBoolean result = new AtomicBoolean();
219        synchronized (indexLock) {
220            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
221                public void execute(Transaction tx) throws IOException {
222                    result.set(remove(tx, id));
223                }
224            });
225        }
226        return result.get();
227    }
228
229    synchronized public boolean remove(final int position) throws IOException {
230        final AtomicBoolean result = new AtomicBoolean();
231        synchronized (indexLock) {
232            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
233                public void execute(Transaction tx) throws IOException {
234                    result.set(remove(tx, position));
235                }
236            });
237        }
238        return result.get();
239    }
240
241    synchronized public boolean remove(final PListEntry entry) throws IOException {
242        final AtomicBoolean result = new AtomicBoolean();
243        synchronized (indexLock) {
244            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
245                public void execute(Transaction tx) throws IOException {
246                    result.set(doRemove(tx, entry.getEntry()));
247                }
248            });
249        }
250        return result.get();
251    }
252
253    synchronized public PListEntry get(final int position) throws IOException {
254        PListEntry result = null;
255        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
256        synchronized (indexLock) {
257            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
258                public void execute(Transaction tx) throws IOException {
259                    ref.set(get(tx, position));
260                }
261            });
262        }
263        if (ref.get() != null) {
264            ByteSequence bs = this.store.getPayload(ref.get().getLocation());
265            result = new PListEntry(ref.get(), bs);
266        }
267        return result;
268    }
269
270    synchronized public PListEntry getFirst() throws IOException {
271        PListEntry result = null;
272        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
273        synchronized (indexLock) {
274            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
275                public void execute(Transaction tx) throws IOException {
276                    ref.set(getFirst(tx));
277                }
278            });
279            if (ref.get() != null) {
280                ByteSequence bs = this.store.getPayload(ref.get().getLocation());
281                result = new PListEntry(ref.get(), bs);
282            }
283        }
284        return result;
285    }
286
287    synchronized public PListEntry getLast() throws IOException {
288        PListEntry result = null;
289        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
290        synchronized (indexLock) {
291            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
292                public void execute(Transaction tx) throws IOException {
293                    ref.set(getLast(tx));
294                }
295            });
296            if (ref.get() != null) {
297                ByteSequence bs = this.store.getPayload(ref.get().getLocation());
298                result = new PListEntry(ref.get(), bs);
299            }
300        }
301        return result;
302    }
303
304    synchronized public PListEntry getNext(PListEntry entry) throws IOException {
305        PListEntry result = null;
306        final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
307        if (nextId != EntryLocation.NOT_SET) {
308            final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
309            synchronized (indexLock) {
310                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
311                    public void execute(Transaction tx) throws IOException {
312                        ref.set(getNext(tx, nextId));
313                    }
314                });
315                if (ref.get() != null) {
316                    ByteSequence bs = this.store.getPayload(ref.get().getLocation());
317                    result = new PListEntry(ref.get(), bs);
318                }
319            }
320        }
321        return result;
322    }
323
324    synchronized public PListEntry refresh(final PListEntry entry) throws IOException {
325        PListEntry result = null;
326        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
327        synchronized (indexLock) {
328            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
329                public void execute(Transaction tx) throws IOException {
330                    ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
331                }
332            });
333            if (ref.get() != null) {
334                result = new PListEntry(ref.get(), entry.getByteSequence());
335            }
336        }
337        return result;
338    }
339
340    boolean remove(Transaction tx, String id) throws IOException {
341        boolean result = false;
342        long nextId = this.rootId;
343        while (nextId != EntryLocation.NOT_SET) {
344            EntryLocation entry = getNext(tx, nextId);
345            if (entry != null) {
346                if (entry.getId().equals(id)) {
347                    result = doRemove(tx, entry);
348                    break;
349                }
350                nextId = entry.getNext();
351            } else {
352                // not found
353                break;
354            }
355        }
356        return result;
357    }
358
359    boolean remove(Transaction tx, int position) throws IOException {
360        boolean result = false;
361        long nextId = this.rootId;
362        int count = 0;
363        while (nextId != EntryLocation.NOT_SET) {
364            EntryLocation entry = getNext(tx, nextId);
365            if (entry != null) {
366                if (count == position) {
367                    result = doRemove(tx, entry);
368                    break;
369                }
370                nextId = entry.getNext();
371            } else {
372                // not found
373                break;
374            }
375            count++;
376        }
377        return result;
378    }
379
380    EntryLocation get(Transaction tx, int position) throws IOException {
381        EntryLocation result = null;
382        long nextId = this.rootId;
383        int count = -1;
384        while (nextId != EntryLocation.NOT_SET) {
385            EntryLocation entry = getNext(tx, nextId);
386            if (entry != null) {
387                if (count == position) {
388                    result = entry;
389                    break;
390                }
391                nextId = entry.getNext();
392            } else {
393                break;
394            }
395            count++;
396        }
397        return result;
398    }
399
400    EntryLocation getFirst(Transaction tx) throws IOException {
401        long offset = getRoot(tx).getNext();
402        if (offset != EntryLocation.NOT_SET) {
403            return loadEntry(tx, offset);
404        }
405        return null;
406    }
407
408    EntryLocation getLast(Transaction tx) throws IOException {
409        if (this.lastId != EntryLocation.NOT_SET) {
410            return loadEntry(tx, this.lastId);
411        }
412        return null;
413    }
414
415    private boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
416        boolean result = false;
417        if (entry != null) {
418
419            EntryLocation prev = getPrevious(tx, entry.getPrev());
420            EntryLocation next = getNext(tx, entry.getNext());
421            long prevId = prev != null ? prev.getPage().getPageId() : this.rootId;
422            long nextId = next != null ? next.getPage().getPageId() : EntryLocation.NOT_SET;
423
424            if (next != null) {
425                next.setPrev(prevId);
426                storeEntry(tx, next);
427            } else {
428                // we are deleting the last one in the list
429                this.lastId = prevId;
430            }
431            if (prev != null) {
432                prev.setNext(nextId);
433                storeEntry(tx, prev);
434            }
435
436            this.store.decrementJournalCount(tx, entry.getLocation());
437            entry.reset();
438            storeEntry(tx, entry);
439            tx.free(entry.getPage().getPageId());
440            result = true;
441            this.size--;
442        }
443        return result;
444    }
445
446    private EntryLocation createEntry(Transaction tx, String id, long previous, long next) throws IOException {
447        Page<EntryLocation> p = tx.allocate();
448        EntryLocation result = new EntryLocation();
449        result.setPage(p);
450        p.set(result);
451        result.setId(id);
452        result.setPrev(previous);
453        result.setNext(next);
454        return result;
455    }
456
457    private EntryLocation createEntry(Page<EntryLocation> p, String id, long previous, long next) throws IOException {
458        EntryLocation result = new EntryLocation();
459        result.setPage(p);
460        p.set(result);
461        result.setId(id);
462        result.setPrev(previous);
463        result.setNext(next);
464        return result;
465    }
466
467    EntryLocation loadEntry(Transaction tx, long pageId) throws IOException {
468        Page<EntryLocation> page = tx.load(pageId, EntryLocationMarshaller.INSTANCE);
469        EntryLocation entry = page.get();
470        if (entry != null) {
471            entry.setPage(page);
472        }
473        return entry;
474    }
475    
476    private void storeEntry(Transaction tx, EntryLocation entry) throws IOException {
477        tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true);
478    }
479
480    EntryLocation getNext(Transaction tx, long next) throws IOException {
481        EntryLocation result = null;
482        if (next != EntryLocation.NOT_SET) {
483            result = loadEntry(tx, next);
484        }
485        return result;
486    }
487
488    private EntryLocation getPrevious(Transaction tx, long previous) throws IOException {
489        EntryLocation result = null;
490        if (previous != EntryLocation.NOT_SET) {
491            result = loadEntry(tx, previous);
492        }
493        return result;
494    }
495
496    private EntryLocation getRoot(Transaction tx) throws IOException {
497        EntryLocation result = loadEntry(tx, this.rootId);
498        return result;
499    }
500
501    ByteSequence getPayload(EntryLocation entry) throws IOException {
502        return this.store.getPayload(entry.getLocation());
503    }
504}