001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.CancellationException;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.CopyOnWriteArraySet;
028import java.util.concurrent.Future;
029import org.apache.activemq.broker.BrokerService;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.ProducerBrokerExchange;
032import org.apache.activemq.broker.region.policy.DispatchPolicy;
033import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
034import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
035import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
036import org.apache.activemq.command.ActiveMQDestination;
037import org.apache.activemq.command.ExceptionResponse;
038import org.apache.activemq.command.Message;
039import org.apache.activemq.command.MessageAck;
040import org.apache.activemq.command.MessageId;
041import org.apache.activemq.command.ProducerAck;
042import org.apache.activemq.command.ProducerInfo;
043import org.apache.activemq.command.Response;
044import org.apache.activemq.command.SubscriptionInfo;
045import org.apache.activemq.filter.MessageEvaluationContext;
046import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
047import org.apache.activemq.store.MessageRecoveryListener;
048import org.apache.activemq.store.TopicMessageStore;
049import org.apache.activemq.thread.Task;
050import org.apache.activemq.thread.TaskRunner;
051import org.apache.activemq.thread.TaskRunnerFactory;
052import org.apache.activemq.thread.Valve;
053import org.apache.activemq.transaction.Synchronization;
054import org.apache.activemq.util.SubscriptionKey;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058/**
059 * The Topic is a destination that sends a copy of a message to every active
060 * Subscription registered.
061 * 
062 * 
063 */
064public class Topic extends BaseDestination implements Task {
065    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
066    private final TopicMessageStore topicStore;
067    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
068    protected final Valve dispatchValve = new Valve(true);
069    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
070    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
071    private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
072    private final TaskRunner taskRunner;
073    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
074    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
075        public void run() {
076            try {
077                Topic.this.taskRunner.wakeup();
078            } catch (InterruptedException e) {
079            }
080        };
081    };
082
083    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
084            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
085        super(brokerService, store, destination, parentStats);
086        this.topicStore = store;
087        // set default subscription recovery policy
088        subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
089        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
090    }
091
092    @Override
093    public void initialize() throws Exception {
094        super.initialize();
095        if (store != null) {
096            // AMQ-2586: Better to leave this stat at zero than to give the user
097            // misleading metrics.
098            // int messageCount = store.getMessageCount();
099            // destinationStatistics.getMessages().setCount(messageCount);
100        }
101    }
102
103    public List<Subscription> getConsumers() {
104        synchronized (consumers) {
105            return new ArrayList<Subscription>(consumers);
106        }
107    }
108
109    public boolean lock(MessageReference node, LockOwner sub) {
110        return true;
111    }
112
113    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
114
115       super.addSubscription(context, sub);
116
117        if (!sub.getConsumerInfo().isDurable()) {
118
119            // Do a retroactive recovery if needed.
120            if (sub.getConsumerInfo().isRetroactive()) {
121
122                // synchronize with dispatch method so that no new messages are
123                // sent
124                // while we are recovering a subscription to avoid out of order
125                // messages.
126                dispatchValve.turnOff();
127                try {
128
129                    synchronized (consumers) {
130                        sub.add(context, this);
131                        consumers.add(sub);
132                    }
133                    subscriptionRecoveryPolicy.recover(context, this, sub);
134
135                } finally {
136                    dispatchValve.turnOn();
137                }
138
139            } else {
140                synchronized (consumers) {
141                    sub.add(context, this);
142                    consumers.add(sub);
143                }
144            }
145        } else {
146            sub.add(context, this);
147            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
148            durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
149        }
150    }
151
152    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
153            throws Exception {
154        if (!sub.getConsumerInfo().isDurable()) {
155            super.removeSubscription(context, sub, lastDeliveredSequenceId);
156            synchronized (consumers) {
157                consumers.remove(sub);
158            }
159        }
160        sub.remove(context, this);
161    }
162
163    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
164        if (topicStore != null) {
165            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
166            DurableTopicSubscription removed = durableSubcribers.remove(key);
167            if (removed != null) {
168                destinationStatistics.getConsumers().decrement();
169                // deactivate and remove
170                removed.deactivate(false);
171                consumers.remove(removed);
172            }
173        }
174    }
175
176    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
177        // synchronize with dispatch method so that no new messages are sent
178        // while
179        // we are recovering a subscription to avoid out of order messages.
180        dispatchValve.turnOff();
181        try {
182
183            if (topicStore == null) {
184                return;
185            }
186
187            // Recover the durable subscription.
188            String clientId = subscription.getSubscriptionKey().getClientId();
189            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
190            String selector = subscription.getConsumerInfo().getSelector();
191            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
192            if (info != null) {
193                // Check to see if selector changed.
194                String s1 = info.getSelector();
195                if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
196                    // Need to delete the subscription
197                    topicStore.deleteSubscription(clientId, subscriptionName);
198                    info = null;
199                } else {
200                    synchronized (consumers) {
201                        consumers.add(subscription);
202                    }
203                }
204            }
205            // Do we need to create the subscription?
206            if (info == null) {
207                info = new SubscriptionInfo();
208                info.setClientId(clientId);
209                info.setSelector(selector);
210                info.setSubscriptionName(subscriptionName);
211                info.setDestination(getActiveMQDestination());
212                // This destination is an actual destination id.
213                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
214                // This destination might be a pattern
215                synchronized (consumers) {
216                    consumers.add(subscription);
217                    topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
218                }
219            }
220
221            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
222            msgContext.setDestination(destination);
223            if (subscription.isRecoveryRequired()) {
224                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
225                    public boolean recoverMessage(Message message) throws Exception {
226                        message.setRegionDestination(Topic.this);
227                        try {
228                            msgContext.setMessageReference(message);
229                            if (subscription.matches(message, msgContext)) {
230                                subscription.add(message);
231                            }
232                        } catch (IOException e) {
233                            LOG.error("Failed to recover this message " + message);
234                        }
235                        return true;
236                    }
237
238                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
239                        throw new RuntimeException("Should not be called.");
240                    }
241
242                    public boolean hasSpace() {
243                        return true;
244                    }
245
246                    public boolean isDuplicate(MessageId id) {
247                        return false;
248                    }
249                });
250            }
251        } finally {
252            dispatchValve.turnOn();
253        }
254    }
255
256    public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
257        synchronized (consumers) {
258            consumers.remove(sub);
259        }
260        sub.remove(context, this);
261    }
262
263    protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
264        if (subscription.getConsumerInfo().isRetroactive()) {
265            subscriptionRecoveryPolicy.recover(context, this, subscription);
266        }
267    }
268
269    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
270        final ConnectionContext context = producerExchange.getConnectionContext();
271
272        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
273        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
274                && !context.isInRecoveryMode();
275
276        // There is delay between the client sending it and it arriving at the
277        // destination.. it may have expired.
278        if (message.isExpired()) {
279            broker.messageExpired(context, message, null);
280            getDestinationStatistics().getExpired().increment();
281            if (sendProducerAck) {
282                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
283                context.getConnection().dispatchAsync(ack);
284            }
285            return;
286        }
287
288        if (memoryUsage.isFull()) {
289            isFull(context, memoryUsage);
290            fastProducer(context, producerInfo);
291
292            if (isProducerFlowControl() && context.isProducerFlowControl()) {
293
294                if (warnOnProducerFlowControl) {
295                    warnOnProducerFlowControl = false;
296                    LOG
297                            .info("Usage Manager memory limit ("
298                                    + memoryUsage.getLimit()
299                                    + ") reached for "
300                                    + getActiveMQDestination().getQualifiedName()
301                                    + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
302                                    + " See http://activemq.apache.org/producer-flow-control.html for more info");
303                }
304
305                if (systemUsage.isSendFailIfNoSpace()) {
306                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
307                            + memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId()
308                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
309                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
310                }
311
312                // We can avoid blocking due to low usage if the producer is
313                // sending
314                // a sync message or
315                // if it is using a producer window
316                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
317                    synchronized (messagesWaitingForSpace) {
318                        messagesWaitingForSpace.add(new Runnable() {
319                            public void run() {
320                                try {
321
322                                    // While waiting for space to free up... the
323                                    // message may have expired.
324                                    if (message.isExpired()) {
325                                        broker.messageExpired(context, message, null);
326                                        getDestinationStatistics().getExpired().increment();
327                                    } else {
328                                        doMessageSend(producerExchange, message);
329                                    }
330
331                                    if (sendProducerAck) {
332                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
333                                                .getSize());
334                                        context.getConnection().dispatchAsync(ack);
335                                    } else {
336                                        Response response = new Response();
337                                        response.setCorrelationId(message.getCommandId());
338                                        context.getConnection().dispatchAsync(response);
339                                    }
340
341                                } catch (Exception e) {
342                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
343                                        ExceptionResponse response = new ExceptionResponse(e);
344                                        response.setCorrelationId(message.getCommandId());
345                                        context.getConnection().dispatchAsync(response);
346                                    }
347                                }
348
349                            }
350                        });
351
352                        registerCallbackForNotFullNotification();
353                        context.setDontSendReponse(true);
354                        return;
355                    }
356
357                } else {
358                    // Producer flow control cannot be used, so we have do the
359                    // flow
360                    // control at the broker
361                    // by blocking this thread until there is space available.
362
363                    if (memoryUsage.isFull()) {
364                        if (context.isInTransaction()) {
365
366                            int count = 0;
367                            while (!memoryUsage.waitForSpace(1000)) {
368                                if (context.getStopping().get()) {
369                                    throw new IOException("Connection closed, send aborted.");
370                                }
371                                if (count > 2 && context.isInTransaction()) {
372                                    count = 0;
373                                    int size = context.getTransaction().size();
374                                    LOG.warn("Waiting for space to send  transacted message - transaction elements = "
375                                            + size + " need more space to commit. Message = " + message);
376                                }
377                            }
378                        } else {
379                            waitForSpace(
380                                    context,
381                                    memoryUsage,
382                                    "Usage Manager memory limit reached. Stopping producer ("
383                                            + message.getProducerId()
384                                            + ") to prevent flooding "
385                                            + getActiveMQDestination().getQualifiedName()
386                                            + "."
387                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
388                        }
389                    }
390
391                    // The usage manager could have delayed us by the time
392                    // we unblock the message could have expired..
393                    if (message.isExpired()) {
394                        getDestinationStatistics().getExpired().increment();
395                        if (LOG.isDebugEnabled()) {
396                            LOG.debug("Expired message: " + message);
397                        }
398                        return;
399                    }
400                }
401            }
402        }
403
404        doMessageSend(producerExchange, message);
405        messageDelivered(context, message);
406        if (sendProducerAck) {
407            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
408            context.getConnection().dispatchAsync(ack);
409        }
410    }
411
412    /**
413     * do send the message - this needs to be synchronized to ensure messages
414     * are stored AND dispatched in the right order
415     * 
416     * @param producerExchange
417     * @param message
418     * @throws IOException
419     * @throws Exception
420     */
421    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
422            throws IOException, Exception {
423        final ConnectionContext context = producerExchange.getConnectionContext();
424        message.setRegionDestination(this);
425        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
426        Future<Object> result = null;
427
428        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
429            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
430                final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
431                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
432                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
433                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
434                if (systemUsage.isSendFailIfNoSpace()) {
435                    throw new javax.jms.ResourceAllocationException(logMessage);
436                }
437
438                waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
439            }
440            result = topicStore.asyncAddTopicMessage(context, message);
441        }
442
443        message.incrementReferenceCount();
444
445        if (context.isInTransaction()) {
446            context.getTransaction().addSynchronization(new Synchronization() {
447                @Override
448                public void afterCommit() throws Exception {
449                    // It could take while before we receive the commit
450                    // operration.. by that time the message could have
451                    // expired..
452                    if (broker.isExpired(message)) {
453                        getDestinationStatistics().getExpired().increment();
454                        broker.messageExpired(context, message, null);
455                        message.decrementReferenceCount();
456                        return;
457                    }
458                    try {
459                        dispatch(context, message);
460                    } finally {
461                        message.decrementReferenceCount();
462                    }
463                }
464            });
465
466        } else {
467            try {
468                dispatch(context, message);
469            } finally {
470                message.decrementReferenceCount();
471            }
472        }
473        if (result != null && !result.isCancelled()) {
474            try {
475                result.get();
476            } catch (CancellationException e) {
477                // ignore - the task has been cancelled if the message
478                // has already been deleted
479            }
480        }
481
482    }
483
484    private boolean canOptimizeOutPersistence() {
485        return durableSubcribers.size() == 0;
486    }
487
488    @Override
489    public String toString() {
490        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
491    }
492
493    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
494            final MessageReference node) throws IOException {
495        if (topicStore != null && node.isPersistent()) {
496            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
497            SubscriptionKey key = dsub.getSubscriptionKey();
498            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack);
499        }
500        messageConsumed(context, node);
501    }
502
503    public void gc() {
504    }
505
506    public Message loadMessage(MessageId messageId) throws IOException {
507        return topicStore != null ? topicStore.getMessage(messageId) : null;
508    }
509
510    public void start() throws Exception {
511        this.subscriptionRecoveryPolicy.start();
512        if (memoryUsage != null) {
513            memoryUsage.start();
514        }
515
516    }
517
518    public void stop() throws Exception {
519        if (taskRunner != null) {
520            taskRunner.shutdown();
521        }
522        this.subscriptionRecoveryPolicy.stop();
523        if (memoryUsage != null) {
524            memoryUsage.stop();
525        }
526        if (this.topicStore != null) {
527            this.topicStore.stop();
528        }
529    }
530
531    public Message[] browse() {
532        final Set<Message> result = new CopyOnWriteArraySet<Message>();
533        try {
534            if (topicStore != null) {
535                topicStore.recover(new MessageRecoveryListener() {
536                    public boolean recoverMessage(Message message) throws Exception {
537                        result.add(message);
538                        return true;
539                    }
540
541                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
542                        return true;
543                    }
544
545                    public boolean hasSpace() {
546                        return true;
547                    }
548
549                    public boolean isDuplicate(MessageId id) {
550                        return false;
551                    }
552                });
553                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
554                if (msgs != null) {
555                    for (int i = 0; i < msgs.length; i++) {
556                        result.add(msgs[i]);
557                    }
558                }
559            }
560        } catch (Throwable e) {
561            LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
562        }
563        return result.toArray(new Message[result.size()]);
564    }
565
566    public boolean iterate() {
567        synchronized (messagesWaitingForSpace) {
568            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
569                Runnable op = messagesWaitingForSpace.removeFirst();
570                op.run();
571            }
572
573            if (!messagesWaitingForSpace.isEmpty()) {
574                registerCallbackForNotFullNotification();
575            }
576        }
577        return false;
578    }
579
580    private void registerCallbackForNotFullNotification() {
581        // If the usage manager is not full, then the task will not
582        // get called..
583        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
584            // so call it directly here.
585            sendMessagesWaitingForSpaceTask.run();
586        }
587    }
588
589    // Properties
590    // -------------------------------------------------------------------------
591
592    public DispatchPolicy getDispatchPolicy() {
593        return dispatchPolicy;
594    }
595
596    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
597        this.dispatchPolicy = dispatchPolicy;
598    }
599
600    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
601        return subscriptionRecoveryPolicy;
602    }
603
604    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
605        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
606    }
607
608    // Implementation methods
609    // -------------------------------------------------------------------------
610
611    public final void wakeup() {
612    }
613
614    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
615        // AMQ-2586: Better to leave this stat at zero than to give the user
616        // misleading metrics.
617        // destinationStatistics.getMessages().increment();
618        destinationStatistics.getEnqueues().increment();
619        dispatchValve.increment();
620        MessageEvaluationContext msgContext = null;
621        try {
622            if (!subscriptionRecoveryPolicy.add(context, message)) {
623                return;
624            }
625            synchronized (consumers) {
626                if (consumers.isEmpty()) {
627                    onMessageWithNoConsumers(context, message);
628                    return;
629                }
630            }
631            msgContext = context.getMessageEvaluationContext();
632            msgContext.setDestination(destination);
633            msgContext.setMessageReference(message);
634            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
635                onMessageWithNoConsumers(context, message);
636            }
637
638        } finally {
639            dispatchValve.decrement();
640            if (msgContext != null) {
641                msgContext.clear();
642            }
643        }
644    }
645
646    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
647        broker.messageExpired(context, reference, subs);
648        // AMQ-2586: Better to leave this stat at zero than to give the user
649        // misleading metrics.
650        // destinationStatistics.getMessages().decrement();
651        destinationStatistics.getEnqueues().decrement();
652        destinationStatistics.getExpired().increment();
653        MessageAck ack = new MessageAck();
654        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
655        ack.setDestination(destination);
656        ack.setMessageID(reference.getMessageId());
657        try {
658            acknowledge(context, subs, ack, reference);
659        } catch (IOException e) {
660            LOG.error("Failed to remove expired Message from the store ", e);
661        }
662    }
663
664    @Override
665    protected Logger getLog() {
666        return LOG;
667    }
668
669}