/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.kahadb.plist.PList;
import org.apache.activemq.store.kahadb.plist.PListEntry;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
041import org.apache.kahadb.util.ByteSequence; 042 043/** 044 * persist pending messages pending message (messages awaiting dispatch to a 045 * consumer) cursor 046 * 047 * 048 */ 049public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { 050 static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class); 051 private static final AtomicLong NAME_COUNT = new AtomicLong(); 052 protected Broker broker; 053 private final PListStore store; 054 private final String name; 055 private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>(); 056 private PList diskList; 057 private Iterator<MessageReference> iter; 058 private Destination regionDestination; 059 private boolean iterating; 060 private boolean flushRequired; 061 private final AtomicBoolean started = new AtomicBoolean(); 062 private final WireFormat wireFormat = new OpenWireFormat(); 063 /** 064 * @param broker 065 * @param name 066 * @param prioritizedMessages 067 */ 068 public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) { 069 super(prioritizedMessages); 070 this.broker = broker; 071 // the store can be null if the BrokerService has persistence 072 // turned off 073 this.store = broker.getTempDataStore(); 074 this.name = NAME_COUNT.incrementAndGet() + "_" + name; 075 } 076 077 @Override 078 public void start() throws Exception { 079 if (started.compareAndSet(false, true)) { 080 super.start(); 081 if (systemUsage != null) { 082 systemUsage.getMemoryUsage().addUsageListener(this); 083 } 084 } 085 } 086 087 @Override 088 public void stop() throws Exception { 089 if (started.compareAndSet(true, false)) { 090 super.stop(); 091 if (systemUsage != null) { 092 systemUsage.getMemoryUsage().removeUsageListener(this); 093 } 094 } 095 } 096 097 /** 098 * @return true if there are no pending messages 099 */ 100 @Override 101 public synchronized boolean isEmpty() { 102 if (memoryList.isEmpty() && 
isDiskListEmpty()) { 103 return true; 104 } 105 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 106 MessageReference node = iterator.next(); 107 if (node == QueueMessageReference.NULL_MESSAGE) { 108 continue; 109 } 110 if (!node.isDropped()) { 111 return false; 112 } 113 // We can remove dropped references. 114 iterator.remove(); 115 } 116 return isDiskListEmpty(); 117 } 118 119 /** 120 * reset the cursor 121 */ 122 @Override 123 public synchronized void reset() { 124 iterating = true; 125 last = null; 126 if (isDiskListEmpty()) { 127 this.iter = this.memoryList.iterator(); 128 } else { 129 this.iter = new DiskIterator(); 130 } 131 } 132 133 @Override 134 public synchronized void release() { 135 iterating = false; 136 if (flushRequired) { 137 flushRequired = false; 138 flushToDisk(); 139 } 140 } 141 142 @Override 143 public synchronized void destroy() throws Exception { 144 stop(); 145 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) { 146 Message node = (Message) i.next(); 147 node.decrementReferenceCount(); 148 } 149 memoryList.clear(); 150 destroyDiskList(); 151 } 152 153 private void destroyDiskList() throws Exception { 154 if (!isDiskListEmpty()) { 155 store.removePList(name); 156 } 157 } 158 159 @Override 160 public synchronized LinkedList<MessageReference> pageInList(int maxItems) { 161 LinkedList<MessageReference> result = new LinkedList<MessageReference>(); 162 int count = 0; 163 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) { 164 MessageReference ref = i.next(); 165 ref.incrementReferenceCount(); 166 result.add(ref); 167 count++; 168 } 169 if (count < maxItems && !isDiskListEmpty()) { 170 for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) { 171 Message message = (Message) i.next(); 172 message.setRegionDestination(regionDestination); 173 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 174 
message.incrementReferenceCount(); 175 result.add(message); 176 count++; 177 } 178 } 179 return result; 180 } 181 182 /** 183 * add message to await dispatch 184 * 185 * @param node 186 * @throws Exception 187 */ 188 @Override 189 public synchronized void addMessageLast(MessageReference node) throws Exception { 190 tryAddMessageLast(node, 0); 191 } 192 193 @Override 194 public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { 195 if (!node.isExpired()) { 196 try { 197 regionDestination = node.getMessage().getRegionDestination(); 198 if (isDiskListEmpty()) { 199 if (hasSpace() || this.store == null) { 200 memoryList.add(node); 201 node.incrementReferenceCount(); 202 setCacheEnabled(true); 203 return true; 204 } 205 } 206 if (!hasSpace()) { 207 if (isDiskListEmpty()) { 208 expireOldMessages(); 209 if (hasSpace()) { 210 memoryList.add(node); 211 node.incrementReferenceCount(); 212 return true; 213 } else { 214 flushToDisk(); 215 } 216 } 217 } 218 if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) { 219 ByteSequence bs = getByteSequence(node.getMessage()); 220 getDiskList().addLast(node.getMessageId().toString(), bs); 221 return true; 222 } 223 return false; 224 225 } catch (Exception e) { 226 LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e); 227 throw new RuntimeException(e); 228 } 229 } else { 230 discard(node); 231 } 232 //message expired 233 return true; 234 } 235 236 /** 237 * add message to await dispatch 238 * 239 * @param node 240 */ 241 @Override 242 public synchronized void addMessageFirst(MessageReference node) { 243 if (!node.isExpired()) { 244 try { 245 regionDestination = node.getMessage().getRegionDestination(); 246 if (isDiskListEmpty()) { 247 if (hasSpace()) { 248 memoryList.addFirst(node); 249 node.incrementReferenceCount(); 250 setCacheEnabled(true); 251 return; 252 } 253 } 254 if (!hasSpace()) { 255 if (isDiskListEmpty()) { 256 
expireOldMessages(); 257 if (hasSpace()) { 258 memoryList.addFirst(node); 259 node.incrementReferenceCount(); 260 return; 261 } else { 262 flushToDisk(); 263 } 264 } 265 } 266 systemUsage.getTempUsage().waitForSpace(); 267 node.decrementReferenceCount(); 268 ByteSequence bs = getByteSequence(node.getMessage()); 269 getDiskList().addFirst(node.getMessageId().toString(), bs); 270 271 } catch (Exception e) { 272 LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e); 273 throw new RuntimeException(e); 274 } 275 } else { 276 discard(node); 277 } 278 } 279 280 /** 281 * @return true if there pending messages to dispatch 282 */ 283 @Override 284 public synchronized boolean hasNext() { 285 return iter.hasNext(); 286 } 287 288 /** 289 * @return the next pending message 290 */ 291 @Override 292 public synchronized MessageReference next() { 293 Message message = (Message) iter.next(); 294 last = message; 295 if (!isDiskListEmpty()) { 296 // got from disk 297 message.setRegionDestination(regionDestination); 298 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 299 } 300 message.incrementReferenceCount(); 301 return message; 302 } 303 304 /** 305 * remove the message at the cursor position 306 */ 307 @Override 308 public synchronized void remove() { 309 iter.remove(); 310 if (last != null) { 311 last.decrementReferenceCount(); 312 } 313 } 314 315 /** 316 * @param node 317 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) 318 */ 319 @Override 320 public synchronized void remove(MessageReference node) { 321 if (memoryList.remove(node)) { 322 node.decrementReferenceCount(); 323 } 324 if (!isDiskListEmpty()) { 325 try { 326 getDiskList().remove(node.getMessageId().toString()); 327 } catch (IOException e) { 328 throw new RuntimeException(e); 329 } 330 } 331 } 332 333 /** 334 * @return the number of pending messages 335 */ 336 @Override 
337 public synchronized int size() { 338 return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size()); 339 } 340 341 /** 342 * clear all pending messages 343 */ 344 @Override 345 public synchronized void clear() { 346 memoryList.clear(); 347 if (!isDiskListEmpty()) { 348 try { 349 getDiskList().destroy(); 350 } catch (IOException e) { 351 throw new RuntimeException(e); 352 } 353 } 354 last = null; 355 } 356 357 @Override 358 public synchronized boolean isFull() { 359 360 return super.isFull() || (systemUsage != null && systemUsage.getTempUsage().isFull()); 361 362 } 363 364 @Override 365 public boolean hasMessagesBufferedToDeliver() { 366 return !isEmpty(); 367 } 368 369 @Override 370 public void setSystemUsage(SystemUsage usageManager) { 371 super.setSystemUsage(usageManager); 372 } 373 374 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 375 if (newPercentUsage >= getMemoryUsageHighWaterMark()) { 376 synchronized (this) { 377 flushRequired = true; 378 if (!iterating) { 379 expireOldMessages(); 380 if (!hasSpace()) { 381 flushToDisk(); 382 flushRequired = false; 383 } 384 } 385 } 386 } 387 } 388 389 @Override 390 public boolean isTransient() { 391 return true; 392 } 393 394 protected boolean isSpaceInMemoryList() { 395 return hasSpace() && isDiskListEmpty(); 396 } 397 398 protected synchronized void expireOldMessages() { 399 if (!memoryList.isEmpty()) { 400 LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList); 401 this.memoryList = new LinkedList<MessageReference>(); 402 while (!tmpList.isEmpty()) { 403 MessageReference node = tmpList.removeFirst(); 404 if (node.isExpired()) { 405 discard(node); 406 } else { 407 memoryList.add(node); 408 } 409 } 410 } 411 412 } 413 414 protected synchronized void flushToDisk() { 415 416 if (!memoryList.isEmpty()) { 417 while (!memoryList.isEmpty()) { 418 MessageReference node = memoryList.removeFirst(); 419 node.decrementReferenceCount(); 420 
ByteSequence bs; 421 try { 422 bs = getByteSequence(node.getMessage()); 423 getDiskList().addLast(node.getMessageId().toString(), bs); 424 } catch (IOException e) { 425 LOG.error("Failed to write to disk list", e); 426 throw new RuntimeException(e); 427 } 428 429 } 430 memoryList.clear(); 431 setCacheEnabled(false); 432 } 433 } 434 435 protected boolean isDiskListEmpty() { 436 return diskList == null || diskList.isEmpty(); 437 } 438 439 protected PList getDiskList() { 440 if (diskList == null) { 441 try { 442 diskList = store.getPList(name); 443 } catch (Exception e) { 444 LOG.error("Caught an IO Exception getting the DiskList " + name, e); 445 throw new RuntimeException(e); 446 } 447 } 448 return diskList; 449 } 450 451 protected void discard(MessageReference message) { 452 message.decrementReferenceCount(); 453 if (LOG.isDebugEnabled()) { 454 LOG.debug("Discarding message " + message); 455 } 456 ConnectionContext ctx = new ConnectionContext(new NonCachedMessageEvaluationContext()); 457 ctx.setBroker(broker); 458 broker.getRoot().sendToDeadLetterQueue(ctx, message, null); 459 } 460 461 protected ByteSequence getByteSequence(Message message) throws IOException { 462 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 463 return new ByteSequence(packet.data, packet.offset, packet.length); 464 } 465 466 protected Message getMessage(ByteSequence bs) throws IOException { 467 org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs 468 .getOffset(), bs.getLength()); 469 return (Message) this.wireFormat.unmarshal(packet); 470 471 } 472 473 final class DiskIterator implements Iterator<MessageReference> { 474 private PListEntry next = null; 475 private PListEntry current = null; 476 PList list; 477 478 DiskIterator() { 479 try { 480 this.list = getDiskList(); 481 synchronized (this.list) { 482 this.current = this.list.getFirst(); 483 this.next = this.current; 484 } 485 } catch (Exception e) { 486 
throw new RuntimeException(e); 487 } 488 } 489 490 public boolean hasNext() { 491 return this.next != null; 492 } 493 494 public MessageReference next() { 495 this.current = next; 496 try { 497 ByteSequence bs = this.current.getByteSequence(); 498 synchronized (this.list) { 499 this.current = this.list.refresh(this.current); 500 this.next = this.list.getNext(this.current); 501 } 502 return getMessage(bs); 503 } catch (IOException e) { 504 LOG.error("I/O error", e); 505 throw new RuntimeException(e); 506 } 507 } 508 509 public void remove() { 510 try { 511 synchronized (this.list) { 512 this.current = this.list.refresh(this.current); 513 this.list.remove(this.current); 514 } 515 516 } catch (IOException e) { 517 LOG.error("I/O error", e); 518 throw new RuntimeException(e); 519 } 520 521 } 522 523 } 524}