001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.LinkedList;
021import java.util.concurrent.atomic.AtomicLong;
022import javax.jms.JMSException;
023import org.apache.activemq.ActiveMQMessageAudit;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
027import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
028import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
029import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
030import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
031import org.apache.activemq.command.ConsumerControl;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.command.MessageDispatch;
036import org.apache.activemq.command.MessageDispatchNotification;
037import org.apache.activemq.command.MessagePull;
038import org.apache.activemq.command.Response;
039import org.apache.activemq.transaction.Synchronization;
040import org.apache.activemq.usage.SystemUsage;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044public class TopicSubscription extends AbstractSubscription {
045
    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
    // Source of the sequence number embedded in each pending-cursor name built by the constructor.
    private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
    
    // Cursor holding messages that matched this subscription but have not been dispatched yet.
    protected PendingMessageCursor matched;
    // Handed to the pending cursor in init().
    protected final SystemUsage usageManager;
    // Incremented by dispatch(), and also when a pending message is removed by
    // removeExpiredMessages() or processMessageDispatchNotification().
    protected AtomicLong dispatchedCounter = new AtomicLong();
       
    // dispatch() flips this to false once it sees messages from a second region destination.
    boolean singleDestination = true;
    // Region destination of the first message passed to dispatch(); null until then.
    Destination destination;

    // Consulted by add(): 0 means undeliverable messages are not queued, a positive
    // value triggers eviction above that size, a negative value disables eviction.
    private int maximumPendingMessages = -1;
    private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
    // Number of messages passed to discard(); read under matchedListMutex by discarded().
    private int discarded;
    // Lock used around accesses to the matched cursor in add(), matched(), dispatchMatched(), etc.
    private final Object matchedListMutex = new Object();
    // Messages accepted by add() (after the duplicate check).
    private final AtomicLong enqueueCounter = new AtomicLong(0);
    // Sum of the message counts of the acks processed by acknowledge().
    private final AtomicLong dequeueCounter = new AtomicLong(0);
    // NOTE(review): only exposed through its getter/setter in this file; init() configures
    // the cursor from getCursorMemoryHighWaterMark() instead - confirm this field is still used.
    private int memoryUsageHighWaterMark = 95;
    // allow duplicate suppression in a ring network of brokers
    protected int maxProducersToAudit = 1024;
    protected int maxAuditDepth = 1000;
    protected boolean enableAudit = false;
    // Created by init() or setEnableAudit() when auditing is enabled; otherwise null.
    protected ActiveMQMessageAudit audit;
    // Set to true by init() and to false by destroy(); add() only queues while this is true.
    protected boolean active = false;
069
070    public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
071        super(broker, context, info);
072        this.usageManager = usageManager;
073        String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
074        if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
075            this.matched = new VMPendingMessageCursor(false);
076        } else {
077            this.matched = new FilePendingMessageCursor(broker,matchedName,false);
078        }
079    }
080
081    public void init() throws Exception {
082        this.matched.setSystemUsage(usageManager);
083        this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
084        this.matched.start();
085        if (enableAudit) {
086            audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
087        }
088        this.active=true;
089    }
090
091    public void add(MessageReference node) throws Exception {
092        if (isDuplicate(node)) {
093            return;
094        }
095        enqueueCounter.incrementAndGet();
096        if (!isFull() && matched.isEmpty()  && !isSlave()) {
097            // if maximumPendingMessages is set we will only discard messages which
098            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
099            dispatch(node);
100            setSlowConsumer(false);
101        } else {
102            //we are slow
103            if(!isSlowConsumer()) {
104                setSlowConsumer(true);
105                for (Destination dest: destinations) {
106                    dest.slowConsumer(getContext(), this);
107                }
108            }
109            if (maximumPendingMessages != 0) {
110                boolean warnedAboutWait = false;
111                while (active) {
112                    synchronized (matchedListMutex) {
113                        while (matched.isFull()) {
114                            if (getContext().getStopping().get()) {
115                                LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
116                                        + node.getMessageId());
117                                enqueueCounter.decrementAndGet();
118                                return;
119                            }
120                            if (!warnedAboutWait) {
121                                LOG.info(toString() + ": Pending message cursor [" + matched
122                                        + "] is full, temp usage ("
123                                        + +matched.getSystemUsage().getTempUsage().getPercentUsage()
124                                        + "%) or memory usage ("
125                                        + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
126                                        + "%) limit reached, blocking message add() pending the release of resources.");
127                                warnedAboutWait = true;
128                            }
129                            matchedListMutex.wait(20);
130                        }
131                        //Temporary storage could be full - so just try to add the message
132                        //see https://issues.apache.org/activemq/browse/AMQ-2475
133                        if (matched.tryAddMessageLast(node, 10)) {
134                            break;
135                        }
136                    }
137                }
138                synchronized (matchedListMutex) {
139                    
140                    // NOTE - be careful about the slaveBroker!
141                    if (maximumPendingMessages > 0) {
142                        // calculate the high water mark from which point we
143                        // will eagerly evict expired messages
144                        int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
145                        if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
146                            max = maximumPendingMessages;
147                        }
148                        if (!matched.isEmpty() && matched.size() > max) {
149                            removeExpiredMessages();
150                        }
151                        // lets discard old messages as we are a slow consumer
152                        while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
153                            int pageInSize = matched.size() - maximumPendingMessages;
154                            // only page in a 1000 at a time - else we could
155                            // blow da memory
156                            pageInSize = Math.max(1000, pageInSize);
157                            LinkedList<MessageReference> list = null;
158                            MessageReference[] oldMessages=null;
159                            synchronized(matched){
160                                list = matched.pageInList(pageInSize);
161                                oldMessages = messageEvictionStrategy.evictMessages(list);
162                                for (MessageReference ref : list) {
163                                    ref.decrementReferenceCount();
164                                }
165                            }
166                            int messagesToEvict = 0;
167                            if (oldMessages != null){
168                                    messagesToEvict = oldMessages.length;
169                                    for (int i = 0; i < messagesToEvict; i++) {
170                                        MessageReference oldMessage = oldMessages[i];
171                                        discard(oldMessage);
172                                    }
173                            }
174                            // lets avoid an infinite loop if we are given a bad
175                            // eviction strategy
176                            // for a bad strategy lets just not evict
177                            if (messagesToEvict == 0) {
178                                LOG.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy);
179                                break;
180                            }
181                        }
182                    }
183                }
184                dispatchMatched();
185            }
186        }
187    }
188
189    private boolean isDuplicate(MessageReference node) {
190        boolean duplicate = false;
191        if (enableAudit && audit != null) {
192            duplicate = audit.isDuplicate(node);
193            if (LOG.isDebugEnabled()) {
194                if (duplicate) {
195                    LOG.debug("ignoring duplicate add: " + node.getMessageId());
196                }
197            }
198        }
199        return duplicate;
200    }
201
202    /**
203     * Discard any expired messages from the matched list. Called from a
204     * synchronized block.
205     * 
206     * @throws IOException
207     */
208    protected void removeExpiredMessages() throws IOException {
209        try {
210            matched.reset();
211            while (matched.hasNext()) {
212                MessageReference node = matched.next();
213                node.decrementReferenceCount();
214                if (broker.isExpired(node)) {
215                    matched.remove();
216                    dispatchedCounter.incrementAndGet();
217                    node.decrementReferenceCount();
218                    node.getRegionDestination().getDestinationStatistics().getExpired().increment();
219                    broker.messageExpired(getContext(), node, this);
220                    break;
221                }
222            }
223        } finally {
224            matched.release();
225        }
226    }
227
228    public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
229        synchronized (matchedListMutex) {
230            try {
231                matched.reset();
232                while (matched.hasNext()) {
233                    MessageReference node = matched.next();
234                    node.decrementReferenceCount();
235                    if (node.getMessageId().equals(mdn.getMessageId())) {
236                        matched.remove();
237                        dispatchedCounter.incrementAndGet();
238                        node.decrementReferenceCount();
239                        break;
240                    }
241                }
242            } finally {
243                matched.release();
244            }
245        }
246    }
247
248    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
249        // Handle the standard acknowledgment case.
250        if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
251            if (context.isInTransaction()) {
252                context.getTransaction().addSynchronization(new Synchronization() {
253
254                    @Override
255                    public void afterCommit() throws Exception {
256                       synchronized (TopicSubscription.this) {
257                            if (singleDestination && destination != null) {
258                                destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
259                            }
260                        }
261                        dequeueCounter.addAndGet(ack.getMessageCount());
262                        dispatchMatched();
263                    }
264                });
265            } else {
266                if (singleDestination && destination != null) {
267                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
268                    destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
269                }
270                dequeueCounter.addAndGet(ack.getMessageCount());
271            }
272            dispatchMatched();
273            return;
274        } else if (ack.isDeliveredAck()) {
275            // Message was delivered but not acknowledged: update pre-fetch
276            // counters.
277            // also. get these for a consumer expired message.
278            if (destination != null && !ack.isInTransaction()) {
279                destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
280                destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());   
281            }
282            dequeueCounter.addAndGet(ack.getMessageCount());
283            dispatchMatched();
284            return;
285        } else if (ack.isRedeliveredAck()) {
286            // nothing to do atm
287            return;
288        }
289        throw new JMSException("Invalid acknowledgment: " + ack);
290    }
291
    /**
     * Message pull is not supported for topic subscriptions.
     *
     * @param context ignored
     * @param pull ignored
     * @return always null
     */
    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
        // not supported for topics
        return null;
    }
296
    /**
     * @return the number of messages waiting in the matched cursor; delegates to
     *         {@link #matched()}
     */
    public int getPendingQueueSize() {
        return matched();
    }
300
301    public int getDispatchedQueueSize() {
302        return (int)(dispatchedCounter.get() - dequeueCounter.get());
303    }
304
    /**
     * @return the configured pending-message limit (-1 unless changed through the setter)
     */
    public int getMaximumPendingMessages() {
        return maximumPendingMessages;
    }
308
    /**
     * @return the current value of the dispatched counter
     */
    public long getDispatchedCounter() {
        return dispatchedCounter.get();
    }
312
    /**
     * @return the current value of the enqueue counter maintained by add()
     */
    public long getEnqueueCounter() {
        return enqueueCounter.get();
    }
316
    /**
     * @return the current value of the dequeue counter maintained by acknowledge()
     */
    public long getDequeueCounter() {
        return dequeueCounter.get();
    }
320
    /**
     * Reads the discard count while holding the matched-list lock.
     *
     * @return the number of messages discarded due to being a slow consumer
     */
    public int discarded() {
        synchronized (matchedListMutex) {
            return discarded;
        }
    }
329
    /**
     * Reads the size of the matched cursor while holding the matched-list lock.
     *
     * @return the number of matched messages (messages targeted for the
     *         subscription but not yet able to be dispatched due to the
     *         prefetch buffer being full).
     */
    public int matched() {
        synchronized (matchedListMutex) {
            return matched.size();
        }
    }
340
    /**
     * Sets the maximum number of pending messages that can be matched against
     * this consumer before old messages are discarded.
     * <p>
     * As used by add(): a value of 0 means a message that cannot be dispatched
     * immediately is not queued at all; a positive value enables eviction once the
     * pending size exceeds it; a negative value queues without count-based eviction.
     *
     * @param maximumPendingMessages the new limit
     */
    public void setMaximumPendingMessages(int maximumPendingMessages) {
        this.maximumPendingMessages = maximumPendingMessages;
    }
348
    /**
     * @return the strategy add() consults when pending messages must be evicted
     */
    public MessageEvictionStrategy getMessageEvictionStrategy() {
        return messageEvictionStrategy;
    }
352
    /**
     * Sets the eviction strategy used to decide which message to evict when the
     * slow consumer needs to discard messages
     *
     * @param messageEvictionStrategy the strategy to use from now on
     */
    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
        this.messageEvictionStrategy = messageEvictionStrategy;
    }
360
    /**
     * @return the producer limit passed to the duplicate audit
     */
    public int getMaxProducersToAudit() {
        return maxProducersToAudit;
    }
364
    /**
     * Stores the producer limit and, if an audit already exists, applies it to
     * that audit straight away.
     *
     * @param maxProducersToAudit the new producer limit
     */
    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
        this.maxProducersToAudit = maxProducersToAudit;
        if (audit != null) {
            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
        }
    }
371
    /**
     * @return the audit depth passed to the duplicate audit
     */
    public int getMaxAuditDepth() {
        return maxAuditDepth;
    }
375    
    /**
     * Stores the audit depth and, if an audit already exists, applies it to that
     * audit straight away.
     *
     * @param maxAuditDepth the new audit depth
     */
    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
        this.maxAuditDepth = maxAuditDepth;
        if (audit != null) {
            audit.setAuditDepth(maxAuditDepth);
        }
    }
382    
    /**
     * @return true when duplicate auditing is switched on
     */
    public boolean isEnableAudit() {
        return enableAudit;
    }
386
    /**
     * Switches duplicate auditing on or off. Enabling it creates the audit if none
     * exists yet; disabling it leaves any existing audit object in place.
     *
     * @param enableAudit true to enable duplicate detection in add()
     */
    public synchronized void setEnableAudit(boolean enableAudit) {
        this.enableAudit = enableAudit;
        if (enableAudit && audit==null) {
            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
        }
    }
393    
394    // Implementation methods
395    // -------------------------------------------------------------------------
396    public boolean isFull() {
397        return getDispatchedQueueSize()  >= info.getPrefetchSize();
398    }
399    
    /**
     * @return the same value as {@link #getDispatchedQueueSize()}
     */
    public int getInFlightSize() {
        return getDispatchedQueueSize();
    }
403    
404    
405    /**
406     * @return true when 60% or more room is left for dispatching messages
407     */
408    public boolean isLowWaterMark() {
409        return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
410    }
411
412    /**
413     * @return true when 10% or less room is left for dispatching messages
414     */
415    public boolean isHighWaterMark() {
416        return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
417    }
418
    /**
     * Stores the memory usage high water mark.
     * NOTE(review): within this file the value is only stored; init() configures the
     * cursor from getCursorMemoryHighWaterMark() - confirm where this one is applied.
     *
     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
     */
    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
    }
425
    /**
     * @return the memoryUsageHighWaterMark (95 unless changed through the setter)
     */
    public int getMemoryUsageHighWaterMark() {
        return this.memoryUsageHighWaterMark;
    }
432
    /**
     * @return the usageManager supplied to the constructor
     */
    public SystemUsage getUsageManager() {
        return this.usageManager;
    }
439
    /**
     * Returns the cursor itself, not a copy; no lock is taken here.
     *
     * @return the cursor holding the matched (pending) messages
     */
    public PendingMessageCursor getMatched() {
        return this.matched;
    }
446
    /**
     * Replaces the pending-message cursor. Only the reference is swapped: this
     * method takes no lock and does not configure, start or destroy either cursor.
     *
     * @param matched the cursor to use from now on
     */
    public void setMatched(PendingMessageCursor matched) {
        this.matched = matched;
    }
453
454    /**
455     * inform the MessageConsumer on the client to change it's prefetch
456     * 
457     * @param newPrefetch
458     */
459    public void updateConsumerPrefetch(int newPrefetch) {
460        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
461            ConsumerControl cc = new ConsumerControl();
462            cc.setConsumerId(info.getConsumerId());
463            cc.setPrefetch(newPrefetch);
464            context.getConnection().dispatchAsync(cc);
465        }
466    }
467
468    private void dispatchMatched() throws IOException {       
469        synchronized (matchedListMutex) {
470            if (!matched.isEmpty() && !isFull()) {
471                try {
472                    matched.reset();
473                   
474                    while (matched.hasNext() && !isFull()) {
475                        MessageReference message = matched.next();
476                        message.decrementReferenceCount();
477                        matched.remove();
478                        // Message may have been sitting in the matched list a
479                        // while
480                        // waiting for the consumer to ak the message.
481                        if (message.isExpired()) {
482                            discard(message);
483                            continue; // just drop it.
484                        }
485                        dispatch(message);
486                    }
487                } finally {
488                    matched.release();
489                }
490            }
491        }
492    }
493
494    private void dispatch(final MessageReference node) throws IOException {
495        Message message = (Message)node;
496        node.incrementReferenceCount();
497        // Make sure we can dispatch a message.
498        MessageDispatch md = new MessageDispatch();
499        md.setMessage(message);
500        md.setConsumerId(info.getConsumerId());
501        md.setDestination(node.getRegionDestination().getActiveMQDestination());
502        dispatchedCounter.incrementAndGet();
503        // Keep track if this subscription is receiving messages from a single
504        // destination.
505        if (singleDestination) {
506            if (destination == null) {
507                destination = node.getRegionDestination();
508            } else {
509                if (destination != node.getRegionDestination()) {
510                    singleDestination = false;
511                }
512            }
513        }
514        if (info.isDispatchAsync()) {
515            md.setTransmitCallback(new Runnable() {
516
517                public void run() {
518                    node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
519                    node.getRegionDestination().getDestinationStatistics().getInflight().increment();
520                    node.decrementReferenceCount();
521                }
522            });
523            context.getConnection().dispatchAsync(md);
524        } else {
525            context.getConnection().dispatchSync(md);
526            node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
527            node.getRegionDestination().getDestinationStatistics().getInflight().increment();
528            node.decrementReferenceCount();
529        }
530    }
531
532    private void discard(MessageReference message) {
533        message.decrementReferenceCount();
534        matched.remove(message);
535        discarded++;
536        if(destination != null) {
537            destination.getDestinationStatistics().getDequeues().increment();
538        }
539        if (LOG.isDebugEnabled()) {
540            LOG.debug("Discarding message " + message);
541        }
542        Destination dest = message.getRegionDestination();
543        if (dest != null) {
544            dest.messageDiscarded(getContext(), this, message);
545        }
546        broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
547    }
548
549    @Override
550    public String toString() {
551        return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
552               + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
553    }
554
555    public void destroy() {
556        this.active=false;
557        synchronized (matchedListMutex) {
558            try {
559                matched.destroy();
560            } catch (Exception e) {
561                LOG.warn("Failed to destroy cursor", e);
562            }
563        }
564        setSlowConsumer(false);
565    }
566
    /**
     * @return the prefetch size taken from the consumer info
     */
    @Override
    public int getPrefetchSize() {
        return info.getPrefetchSize();
    }
571
572}