001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.plist; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.IOException; 022import java.util.concurrent.atomic.AtomicBoolean; 023import java.util.concurrent.atomic.AtomicReference; 024import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller; 025import org.apache.kahadb.journal.Location; 026import org.apache.kahadb.page.Page; 027import org.apache.kahadb.page.Transaction; 028import org.apache.kahadb.util.ByteSequence; 029 030public class PList { 031 final PListStore store; 032 private String name; 033 private long rootId = EntryLocation.NOT_SET; 034 private long lastId = EntryLocation.NOT_SET; 035 private final AtomicBoolean loaded = new AtomicBoolean(); 036 private int size = 0; 037 Object indexLock; 038 039 PList(PListStore store) { 040 this.store = store; 041 this.indexLock = store.getIndexLock(); 042 } 043 044 public void setName(String name) { 045 this.name = name; 046 } 047 048 /* 049 * (non-Javadoc) 050 * @see org.apache.activemq.beanstalk.JobScheduler#getName() 051 */ 052 public String getName() { 053 return this.name; 054 } 055 056 public synchronized int size() { 057 return this.size; 058 } 059 060 public synchronized boolean isEmpty() { 061 return size == 0; 062 } 063 064 /** 065 * @return the rootId 066 */ 067 public long getRootId() { 068 return this.rootId; 069 } 070 071 /** 072 * @param rootId 073 * the rootId to set 074 */ 075 public void setRootId(long rootId) { 076 this.rootId = rootId; 077 } 078 079 /** 080 * @return the lastId 081 */ 082 public long getLastId() { 083 return this.lastId; 084 } 085 086 /** 087 * @param lastId 088 * the lastId to set 089 */ 090 public void setLastId(long lastId) { 091 this.lastId = lastId; 092 } 093 094 /** 095 * @return the loaded 096 */ 097 public boolean isLoaded() { 098 return this.loaded.get(); 099 } 100 101 void read(DataInput in) throws IOException { 102 this.rootId = in.readLong(); 103 this.name = in.readUTF(); 104 } 105 106 public void write(DataOutput out) throws IOException { 107 out.writeLong(this.rootId); 108 out.writeUTF(name); 109 } 110 111 public synchronized void destroy() throws IOException { 112 synchronized (indexLock) { 113 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 114 public void execute(Transaction tx) throws IOException { 115 destroy(tx); 116 } 117 }); 118 } 119 } 120 121 void destroy(Transaction tx) throws IOException { 122 // start from the first 123 EntryLocation entry = getFirst(tx); 124 while (entry != null) { 125 EntryLocation toRemove = entry.copy(); 126 entry = getNext(tx, entry.getNext()); 127 doRemove(tx, toRemove); 128 } 129 } 130 131 synchronized void load(Transaction tx) throws IOException { 132 if (loaded.compareAndSet(false, true)) { 133 final Page<EntryLocation> p = tx.load(this.rootId, null); 134 if (p.getType() == Page.PAGE_FREE_TYPE) { 135 // Need to initialize it.. 136 EntryLocation root = createEntry(p, "root", EntryLocation.NOT_SET, EntryLocation.NOT_SET); 137 138 storeEntry(tx, root); 139 this.lastId = root.getPage().getPageId(); 140 } else { 141 // find last id 142 long nextId = this.rootId; 143 while (nextId != EntryLocation.NOT_SET) { 144 EntryLocation next = getNext(tx, nextId); 145 if (next != null) { 146 this.lastId = next.getPage().getPageId(); 147 nextId = next.getNext(); 148 this.size++; 149 } 150 } 151 } 152 } 153 } 154 155 synchronized public void unload() { 156 if (loaded.compareAndSet(true, false)) { 157 this.rootId = EntryLocation.NOT_SET; 158 this.lastId = EntryLocation.NOT_SET; 159 this.size=0; 160 } 161 } 162 163 synchronized public void addLast(final String id, final ByteSequence bs) throws IOException { 164 final Location location = this.store.write(bs, false); 165 synchronized (indexLock) { 166 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 167 public void execute(Transaction tx) throws IOException { 168 addLast(tx, id, bs, location); 169 } 170 }); 171 } 172 } 173 174 private void addLast(Transaction tx, String id, ByteSequence bs, Location location) throws IOException { 175 EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET); 176 entry.setLocation(location); 177 storeEntry(tx, entry); 178 this.store.incrementJournalCount(tx, location); 179 180 EntryLocation last = loadEntry(tx, this.lastId); 181 last.setNext(entry.getPage().getPageId()); 182 storeEntry(tx, last); 183 this.lastId = entry.getPage().getPageId(); 184 this.size++; 185 } 186 187 synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException { 188 final Location location = this.store.write(bs, false); 189 synchronized (indexLock) { 190 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 191 public void execute(Transaction tx) throws IOException { 192 addFirst(tx, id, bs, location); 193 } 194 }); 195 } 196 } 197 198 private void addFirst(Transaction tx, String id, ByteSequence bs, Location location) throws IOException { 199 EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET); 200 entry.setLocation(location); 201 EntryLocation oldFirst = getFirst(tx); 202 if (oldFirst != null) { 203 oldFirst.setPrev(entry.getPage().getPageId()); 204 storeEntry(tx, oldFirst); 205 entry.setNext(oldFirst.getPage().getPageId()); 206 207 } 208 EntryLocation root = getRoot(tx); 209 root.setNext(entry.getPage().getPageId()); 210 storeEntry(tx, root); 211 storeEntry(tx, entry); 212 213 this.store.incrementJournalCount(tx, location); 214 this.size++; 215 } 216 217 synchronized public boolean remove(final String id) throws IOException { 218 final AtomicBoolean result = new AtomicBoolean(); 219 synchronized (indexLock) { 220 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 221 public void execute(Transaction tx) throws IOException { 222 result.set(remove(tx, id)); 223 } 224 }); 225 } 226 return result.get(); 227 } 228 229 synchronized public boolean remove(final int position) throws IOException { 230 final AtomicBoolean result = new AtomicBoolean(); 231 synchronized (indexLock) { 232 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 233 public void execute(Transaction tx) throws IOException { 234 result.set(remove(tx, position)); 235 } 236 }); 237 } 238 return result.get(); 239 } 240 241 synchronized public boolean remove(final PListEntry entry) throws IOException { 242 final AtomicBoolean result = new AtomicBoolean(); 243 synchronized (indexLock) { 244 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 245 public void execute(Transaction tx) throws IOException { 246 result.set(doRemove(tx, entry.getEntry())); 247 } 248 }); 249 } 250 return result.get(); 251 } 252 253 synchronized public PListEntry get(final int position) throws IOException { 254 PListEntry result = null; 255 final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>(); 256 synchronized (indexLock) { 257 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 258 public void execute(Transaction tx) throws IOException { 259 ref.set(get(tx, position)); 260 } 261 }); 262 } 263 if (ref.get() != null) { 264 ByteSequence bs = this.store.getPayload(ref.get().getLocation()); 265 result = new PListEntry(ref.get(), bs); 266 } 267 return result; 268 } 269 270 synchronized public PListEntry getFirst() throws IOException { 271 PListEntry result = null; 272 final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>(); 273 synchronized (indexLock) { 274 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 275 public void execute(Transaction tx) throws IOException { 276 ref.set(getFirst(tx)); 277 } 278 }); 279 if (ref.get() != null) { 280 ByteSequence bs = this.store.getPayload(ref.get().getLocation()); 281 result = new PListEntry(ref.get(), bs); 282 } 283 } 284 return result; 285 } 286 287 synchronized public PListEntry getLast() throws IOException { 288 PListEntry result = null; 289 final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>(); 290 synchronized (indexLock) { 291 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 292 public void execute(Transaction tx) throws IOException { 293 ref.set(getLast(tx)); 294 } 295 }); 296 if (ref.get() != null) { 297 ByteSequence bs = this.store.getPayload(ref.get().getLocation()); 298 result = new PListEntry(ref.get(), bs); 299 } 300 } 301 return result; 302 } 303 304 synchronized public PListEntry getNext(PListEntry entry) throws IOException { 305 PListEntry result = null; 306 final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId; 307 if (nextId != EntryLocation.NOT_SET) { 308 final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>(); 309 synchronized (indexLock) { 310 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 311 public void execute(Transaction tx) throws IOException { 312 ref.set(getNext(tx, nextId)); 313 } 314 }); 315 if (ref.get() != null) { 316 ByteSequence bs = this.store.getPayload(ref.get().getLocation()); 317 result = new PListEntry(ref.get(), bs); 318 } 319 } 320 } 321 return result; 322 } 323 324 synchronized public PListEntry refresh(final PListEntry entry) throws IOException { 325 PListEntry result = null; 326 final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>(); 327 synchronized (indexLock) { 328 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 329 public void execute(Transaction tx) throws IOException { 330 ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId())); 331 } 332 }); 333 if (ref.get() != null) { 334 result = new PListEntry(ref.get(), entry.getByteSequence()); 335 } 336 } 337 return result; 338 } 339 340 boolean remove(Transaction tx, String id) throws IOException { 341 boolean result = false; 342 long nextId = this.rootId; 343 while (nextId != EntryLocation.NOT_SET) { 344 EntryLocation entry = getNext(tx, nextId); 345 if (entry != null) { 346 if (entry.getId().equals(id)) { 347 result = doRemove(tx, entry); 348 break; 349 } 350 nextId = entry.getNext(); 351 } else { 352 // not found 353 break; 354 } 355 } 356 return result; 357 } 358 359 boolean remove(Transaction tx, int position) throws IOException { 360 boolean result = false; 361 long nextId = this.rootId; 362 int count = 0; 363 while (nextId != EntryLocation.NOT_SET) { 364 EntryLocation entry = getNext(tx, nextId); 365 if (entry != null) { 366 if (count == position) { 367 result = doRemove(tx, entry); 368 break; 369 } 370 nextId = entry.getNext(); 371 } else { 372 // not found 373 break; 374 } 375 count++; 376 } 377 return result; 378 } 379 380 EntryLocation get(Transaction tx, int position) throws IOException { 381 EntryLocation result = null; 382 long nextId = this.rootId; 383 int count = -1; 384 while (nextId != EntryLocation.NOT_SET) { 385 EntryLocation entry = getNext(tx, nextId); 386 if (entry != null) { 387 if (count == position) { 388 result = entry; 389 break; 390 } 391 nextId = entry.getNext(); 392 } else { 393 break; 394 } 395 count++; 396 } 397 return result; 398 } 399 400 EntryLocation getFirst(Transaction tx) throws IOException { 401 long offset = getRoot(tx).getNext(); 402 if (offset != EntryLocation.NOT_SET) { 403 return loadEntry(tx, offset); 404 } 405 return null; 406 } 407 408 EntryLocation getLast(Transaction tx) throws IOException { 409 if (this.lastId != EntryLocation.NOT_SET) { 410 return loadEntry(tx, this.lastId); 411 } 412 return null; 413 } 414 415 private boolean doRemove(Transaction tx, EntryLocation entry) throws IOException { 416 boolean result = false; 417 if (entry != null) { 418 419 EntryLocation prev = getPrevious(tx, entry.getPrev()); 420 EntryLocation next = getNext(tx, entry.getNext()); 421 long prevId = prev != null ? prev.getPage().getPageId() : this.rootId; 422 long nextId = next != null ? next.getPage().getPageId() : EntryLocation.NOT_SET; 423 424 if (next != null) { 425 next.setPrev(prevId); 426 storeEntry(tx, next); 427 } else { 428 // we are deleting the last one in the list 429 this.lastId = prevId; 430 } 431 if (prev != null) { 432 prev.setNext(nextId); 433 storeEntry(tx, prev); 434 } 435 436 this.store.decrementJournalCount(tx, entry.getLocation()); 437 entry.reset(); 438 storeEntry(tx, entry); 439 tx.free(entry.getPage().getPageId()); 440 result = true; 441 this.size--; 442 } 443 return result; 444 } 445 446 private EntryLocation createEntry(Transaction tx, String id, long previous, long next) throws IOException { 447 Page<EntryLocation> p = tx.allocate(); 448 EntryLocation result = new EntryLocation(); 449 result.setPage(p); 450 p.set(result); 451 result.setId(id); 452 result.setPrev(previous); 453 result.setNext(next); 454 return result; 455 } 456 457 private EntryLocation createEntry(Page<EntryLocation> p, String id, long previous, long next) throws IOException { 458 EntryLocation result = new EntryLocation(); 459 result.setPage(p); 460 p.set(result); 461 result.setId(id); 462 result.setPrev(previous); 463 result.setNext(next); 464 return result; 465 } 466 467 EntryLocation loadEntry(Transaction tx, long pageId) throws IOException { 468 Page<EntryLocation> page = tx.load(pageId, EntryLocationMarshaller.INSTANCE); 469 EntryLocation entry = page.get(); 470 if (entry != null) { 471 entry.setPage(page); 472 } 473 return entry; 474 } 475 476 private void storeEntry(Transaction tx, EntryLocation entry) throws IOException { 477 tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true); 478 } 479 480 EntryLocation getNext(Transaction tx, long next) throws IOException { 481 EntryLocation result = null; 482 if (next != EntryLocation.NOT_SET) { 483 result = loadEntry(tx, next); 484 } 485 return result; 486 } 487 488 private EntryLocation getPrevious(Transaction tx, long previous) throws IOException { 489 EntryLocation result = null; 490 if (previous != EntryLocation.NOT_SET) { 491 result = loadEntry(tx, previous); 492 } 493 return result; 494 } 495 496 private EntryLocation getRoot(Transaction tx) throws IOException { 497 EntryLocation result = loadEntry(tx, this.rootId); 498 return result; 499 } 500 501 ByteSequence getPayload(EntryLocation entry) throws IOException { 502 return this.store.getPayload(entry.getLocation()); 503 } 504}