001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.LinkedList; 021import java.util.concurrent.atomic.AtomicLong; 022import javax.jms.JMSException; 023import org.apache.activemq.ActiveMQMessageAudit; 024import org.apache.activemq.broker.Broker; 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; 027import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 028import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 029import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; 030import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; 031import org.apache.activemq.command.ConsumerControl; 032import org.apache.activemq.command.ConsumerInfo; 033import org.apache.activemq.command.Message; 034import org.apache.activemq.command.MessageAck; 035import org.apache.activemq.command.MessageDispatch; 036import org.apache.activemq.command.MessageDispatchNotification; 037import org.apache.activemq.command.MessagePull; 038import org.apache.activemq.command.Response; 039import org.apache.activemq.transaction.Synchronization; 040import org.apache.activemq.usage.SystemUsage; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044public class TopicSubscription extends AbstractSubscription { 045 046 private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class); 047 private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0); 048 049 protected PendingMessageCursor matched; 050 protected final SystemUsage usageManager; 051 protected AtomicLong dispatchedCounter = new AtomicLong(); 052 053 boolean singleDestination = true; 054 Destination destination; 055 056 private int maximumPendingMessages = -1; 057 private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); 058 private int discarded; 059 private final Object matchedListMutex = new Object(); 060 private final AtomicLong enqueueCounter = new AtomicLong(0); 061 private final AtomicLong dequeueCounter = new AtomicLong(0); 062 private int memoryUsageHighWaterMark = 95; 063 // allow duplicate suppression in a ring network of brokers 064 protected int maxProducersToAudit = 1024; 065 protected int maxAuditDepth = 1000; 066 protected boolean enableAudit = false; 067 protected ActiveMQMessageAudit audit; 068 protected boolean active = false; 069 070 public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { 071 super(broker, context, info); 072 this.usageManager = usageManager; 073 String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; 074 if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) { 075 this.matched = new VMPendingMessageCursor(false); 076 } else { 077 this.matched = new FilePendingMessageCursor(broker,matchedName,false); 078 } 079 } 080 081 public void init() throws Exception { 082 this.matched.setSystemUsage(usageManager); 083 this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 084 this.matched.start(); 085 if (enableAudit) { 086 audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit); 087 } 088 this.active=true; 089 } 090 091 public void add(MessageReference node) throws Exception { 092 if (isDuplicate(node)) { 093 return; 094 } 095 enqueueCounter.incrementAndGet(); 096 if (!isFull() && matched.isEmpty() && !isSlave()) { 097 // if maximumPendingMessages is set we will only discard messages which 098 // have not been dispatched (i.e. we allow the prefetch buffer to be filled) 099 dispatch(node); 100 setSlowConsumer(false); 101 } else { 102 //we are slow 103 if(!isSlowConsumer()) { 104 setSlowConsumer(true); 105 for (Destination dest: destinations) { 106 dest.slowConsumer(getContext(), this); 107 } 108 } 109 if (maximumPendingMessages != 0) { 110 boolean warnedAboutWait = false; 111 while (active) { 112 synchronized (matchedListMutex) { 113 while (matched.isFull()) { 114 if (getContext().getStopping().get()) { 115 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: " 116 + node.getMessageId()); 117 enqueueCounter.decrementAndGet(); 118 return; 119 } 120 if (!warnedAboutWait) { 121 LOG.info(toString() + ": Pending message cursor [" + matched 122 + "] is full, temp usage (" 123 + +matched.getSystemUsage().getTempUsage().getPercentUsage() 124 + "%) or memory usage (" 125 + matched.getSystemUsage().getMemoryUsage().getPercentUsage() 126 + "%) limit reached, blocking message add() pending the release of resources."); 127 warnedAboutWait = true; 128 } 129 matchedListMutex.wait(20); 130 } 131 //Temporary storage could be full - so just try to add the message 132 //see https://issues.apache.org/activemq/browse/AMQ-2475 133 if (matched.tryAddMessageLast(node, 10)) { 134 break; 135 } 136 } 137 } 138 synchronized (matchedListMutex) { 139 140 // NOTE - be careful about the slaveBroker! 141 if (maximumPendingMessages > 0) { 142 // calculate the high water mark from which point we 143 // will eagerly evict expired messages 144 int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark(); 145 if (maximumPendingMessages > 0 && maximumPendingMessages < max) { 146 max = maximumPendingMessages; 147 } 148 if (!matched.isEmpty() && matched.size() > max) { 149 removeExpiredMessages(); 150 } 151 // lets discard old messages as we are a slow consumer 152 while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { 153 int pageInSize = matched.size() - maximumPendingMessages; 154 // only page in a 1000 at a time - else we could 155 // blow da memory 156 pageInSize = Math.max(1000, pageInSize); 157 LinkedList<MessageReference> list = null; 158 MessageReference[] oldMessages=null; 159 synchronized(matched){ 160 list = matched.pageInList(pageInSize); 161 oldMessages = messageEvictionStrategy.evictMessages(list); 162 for (MessageReference ref : list) { 163 ref.decrementReferenceCount(); 164 } 165 } 166 int messagesToEvict = 0; 167 if (oldMessages != null){ 168 messagesToEvict = oldMessages.length; 169 for (int i = 0; i < messagesToEvict; i++) { 170 MessageReference oldMessage = oldMessages[i]; 171 discard(oldMessage); 172 } 173 } 174 // lets avoid an infinite loop if we are given a bad 175 // eviction strategy 176 // for a bad strategy lets just not evict 177 if (messagesToEvict == 0) { 178 LOG.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy); 179 break; 180 } 181 } 182 } 183 } 184 dispatchMatched(); 185 } 186 } 187 } 188 189 private boolean isDuplicate(MessageReference node) { 190 boolean duplicate = false; 191 if (enableAudit && audit != null) { 192 duplicate = audit.isDuplicate(node); 193 if (LOG.isDebugEnabled()) { 194 if (duplicate) { 195 LOG.debug("ignoring duplicate add: " + node.getMessageId()); 196 } 197 } 198 } 199 return duplicate; 200 } 201 202 /** 203 * Discard any expired messages from the matched list. Called from a 204 * synchronized block. 205 * 206 * @throws IOException 207 */ 208 protected void removeExpiredMessages() throws IOException { 209 try { 210 matched.reset(); 211 while (matched.hasNext()) { 212 MessageReference node = matched.next(); 213 node.decrementReferenceCount(); 214 if (broker.isExpired(node)) { 215 matched.remove(); 216 dispatchedCounter.incrementAndGet(); 217 node.decrementReferenceCount(); 218 node.getRegionDestination().getDestinationStatistics().getExpired().increment(); 219 broker.messageExpired(getContext(), node, this); 220 break; 221 } 222 } 223 } finally { 224 matched.release(); 225 } 226 } 227 228 public void processMessageDispatchNotification(MessageDispatchNotification mdn) { 229 synchronized (matchedListMutex) { 230 try { 231 matched.reset(); 232 while (matched.hasNext()) { 233 MessageReference node = matched.next(); 234 node.decrementReferenceCount(); 235 if (node.getMessageId().equals(mdn.getMessageId())) { 236 matched.remove(); 237 dispatchedCounter.incrementAndGet(); 238 node.decrementReferenceCount(); 239 break; 240 } 241 } 242 } finally { 243 matched.release(); 244 } 245 } 246 } 247 248 public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { 249 // Handle the standard acknowledgment case. 250 if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { 251 if (context.isInTransaction()) { 252 context.getTransaction().addSynchronization(new Synchronization() { 253 254 @Override 255 public void afterCommit() throws Exception { 256 synchronized (TopicSubscription.this) { 257 if (singleDestination && destination != null) { 258 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 259 } 260 } 261 dequeueCounter.addAndGet(ack.getMessageCount()); 262 dispatchMatched(); 263 } 264 }); 265 } else { 266 if (singleDestination && destination != null) { 267 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 268 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); 269 } 270 dequeueCounter.addAndGet(ack.getMessageCount()); 271 } 272 dispatchMatched(); 273 return; 274 } else if (ack.isDeliveredAck()) { 275 // Message was delivered but not acknowledged: update pre-fetch 276 // counters. 277 // also. get these for a consumer expired message. 278 if (destination != null && !ack.isInTransaction()) { 279 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 280 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); 281 } 282 dequeueCounter.addAndGet(ack.getMessageCount()); 283 dispatchMatched(); 284 return; 285 } else if (ack.isRedeliveredAck()) { 286 // nothing to do atm 287 return; 288 } 289 throw new JMSException("Invalid acknowledgment: " + ack); 290 } 291 292 public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { 293 // not supported for topics 294 return null; 295 } 296 297 public int getPendingQueueSize() { 298 return matched(); 299 } 300 301 public int getDispatchedQueueSize() { 302 return (int)(dispatchedCounter.get() - dequeueCounter.get()); 303 } 304 305 public int getMaximumPendingMessages() { 306 return maximumPendingMessages; 307 } 308 309 public long getDispatchedCounter() { 310 return dispatchedCounter.get(); 311 } 312 313 public long getEnqueueCounter() { 314 return enqueueCounter.get(); 315 } 316 317 public long getDequeueCounter() { 318 return dequeueCounter.get(); 319 } 320 321 /** 322 * @return the number of messages discarded due to being a slow consumer 323 */ 324 public int discarded() { 325 synchronized (matchedListMutex) { 326 return discarded; 327 } 328 } 329 330 /** 331 * @return the number of matched messages (messages targeted for the 332 * subscription but not yet able to be dispatched due to the 333 * prefetch buffer being full). 334 */ 335 public int matched() { 336 synchronized (matchedListMutex) { 337 return matched.size(); 338 } 339 } 340 341 /** 342 * Sets the maximum number of pending messages that can be matched against 343 * this consumer before old messages are discarded. 344 */ 345 public void setMaximumPendingMessages(int maximumPendingMessages) { 346 this.maximumPendingMessages = maximumPendingMessages; 347 } 348 349 public MessageEvictionStrategy getMessageEvictionStrategy() { 350 return messageEvictionStrategy; 351 } 352 353 /** 354 * Sets the eviction strategy used to decide which message to evict when the 355 * slow consumer needs to discard messages 356 */ 357 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { 358 this.messageEvictionStrategy = messageEvictionStrategy; 359 } 360 361 public int getMaxProducersToAudit() { 362 return maxProducersToAudit; 363 } 364 365 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { 366 this.maxProducersToAudit = maxProducersToAudit; 367 if (audit != null) { 368 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); 369 } 370 } 371 372 public int getMaxAuditDepth() { 373 return maxAuditDepth; 374 } 375 376 public synchronized void setMaxAuditDepth(int maxAuditDepth) { 377 this.maxAuditDepth = maxAuditDepth; 378 if (audit != null) { 379 audit.setAuditDepth(maxAuditDepth); 380 } 381 } 382 383 public boolean isEnableAudit() { 384 return enableAudit; 385 } 386 387 public synchronized void setEnableAudit(boolean enableAudit) { 388 this.enableAudit = enableAudit; 389 if (enableAudit && audit==null) { 390 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 391 } 392 } 393 394 // Implementation methods 395 // ------------------------------------------------------------------------- 396 public boolean isFull() { 397 return getDispatchedQueueSize() >= info.getPrefetchSize(); 398 } 399 400 public int getInFlightSize() { 401 return getDispatchedQueueSize(); 402 } 403 404 405 /** 406 * @return true when 60% or more room is left for dispatching messages 407 */ 408 public boolean isLowWaterMark() { 409 return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4); 410 } 411 412 /** 413 * @return true when 10% or less room is left for dispatching messages 414 */ 415 public boolean isHighWaterMark() { 416 return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9); 417 } 418 419 /** 420 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 421 */ 422 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 423 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; 424 } 425 426 /** 427 * @return the memoryUsageHighWaterMark 428 */ 429 public int getMemoryUsageHighWaterMark() { 430 return this.memoryUsageHighWaterMark; 431 } 432 433 /** 434 * @return the usageManager 435 */ 436 public SystemUsage getUsageManager() { 437 return this.usageManager; 438 } 439 440 /** 441 * @return the matched 442 */ 443 public PendingMessageCursor getMatched() { 444 return this.matched; 445 } 446 447 /** 448 * @param matched the matched to set 449 */ 450 public void setMatched(PendingMessageCursor matched) { 451 this.matched = matched; 452 } 453 454 /** 455 * inform the MessageConsumer on the client to change it's prefetch 456 * 457 * @param newPrefetch 458 */ 459 public void updateConsumerPrefetch(int newPrefetch) { 460 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 461 ConsumerControl cc = new ConsumerControl(); 462 cc.setConsumerId(info.getConsumerId()); 463 cc.setPrefetch(newPrefetch); 464 context.getConnection().dispatchAsync(cc); 465 } 466 } 467 468 private void dispatchMatched() throws IOException { 469 synchronized (matchedListMutex) { 470 if (!matched.isEmpty() && !isFull()) { 471 try { 472 matched.reset(); 473 474 while (matched.hasNext() && !isFull()) { 475 MessageReference message = matched.next(); 476 message.decrementReferenceCount(); 477 matched.remove(); 478 // Message may have been sitting in the matched list a 479 // while 480 // waiting for the consumer to ak the message. 481 if (message.isExpired()) { 482 discard(message); 483 continue; // just drop it. 484 } 485 dispatch(message); 486 } 487 } finally { 488 matched.release(); 489 } 490 } 491 } 492 } 493 494 private void dispatch(final MessageReference node) throws IOException { 495 Message message = (Message)node; 496 node.incrementReferenceCount(); 497 // Make sure we can dispatch a message. 498 MessageDispatch md = new MessageDispatch(); 499 md.setMessage(message); 500 md.setConsumerId(info.getConsumerId()); 501 md.setDestination(node.getRegionDestination().getActiveMQDestination()); 502 dispatchedCounter.incrementAndGet(); 503 // Keep track if this subscription is receiving messages from a single 504 // destination. 505 if (singleDestination) { 506 if (destination == null) { 507 destination = node.getRegionDestination(); 508 } else { 509 if (destination != node.getRegionDestination()) { 510 singleDestination = false; 511 } 512 } 513 } 514 if (info.isDispatchAsync()) { 515 md.setTransmitCallback(new Runnable() { 516 517 public void run() { 518 node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); 519 node.getRegionDestination().getDestinationStatistics().getInflight().increment(); 520 node.decrementReferenceCount(); 521 } 522 }); 523 context.getConnection().dispatchAsync(md); 524 } else { 525 context.getConnection().dispatchSync(md); 526 node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); 527 node.getRegionDestination().getDestinationStatistics().getInflight().increment(); 528 node.decrementReferenceCount(); 529 } 530 } 531 532 private void discard(MessageReference message) { 533 message.decrementReferenceCount(); 534 matched.remove(message); 535 discarded++; 536 if(destination != null) { 537 destination.getDestinationStatistics().getDequeues().increment(); 538 } 539 if (LOG.isDebugEnabled()) { 540 LOG.debug("Discarding message " + message); 541 } 542 Destination dest = message.getRegionDestination(); 543 if (dest != null) { 544 dest.messageDiscarded(getContext(), this, message); 545 } 546 broker.getRoot().sendToDeadLetterQueue(getContext(), message, this); 547 } 548 549 @Override 550 public String toString() { 551 return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" 552 + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded(); 553 } 554 555 public void destroy() { 556 this.active=false; 557 synchronized (matchedListMutex) { 558 try { 559 matched.destroy(); 560 } catch (Exception e) { 561 LOG.warn("Failed to destroy cursor", e); 562 } 563 } 564 setSlowConsumer(false); 565 } 566 567 @Override 568 public int getPrefetchSize() { 569 return info.getPrefetchSize(); 570 } 571 572}