001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.concurrent.ConcurrentHashMap;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024import javax.jms.InvalidSelectorException;
025import javax.jms.JMSException;
026
027import org.apache.activemq.broker.Broker;
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
030import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.command.MessageDispatch;
036import org.apache.activemq.command.MessageId;
037import org.apache.activemq.filter.MessageEvaluationContext;
038import org.apache.activemq.store.TopicMessageStore;
039import org.apache.activemq.usage.SystemUsage;
040import org.apache.activemq.usage.Usage;
041import org.apache.activemq.usage.UsageListener;
042import org.apache.activemq.util.SubscriptionKey;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
047
048    private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
049    private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
050    private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
051    private final SubscriptionKey subscriptionKey;
052    private final boolean keepDurableSubsActive;
053    private AtomicBoolean active = new AtomicBoolean();
054
055    public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
056        throws JMSException {
057        super(broker,usageManager, context, info);
058        this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
059        this.pending.setSystemUsage(usageManager);
060        this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
061        this.keepDurableSubsActive = keepDurableSubsActive;
062        subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
063        
064    }
065
066    public final boolean isActive() {
067        return active.get();
068    }
069
070    public boolean isFull() {
071        return !active.get() || super.isFull();
072    }
073
074    public void gc() {
075    }
076
077    /**
078     * store will have a pending ack for all durables, irrespective of the selector
079     * so we need to ack if node is un-matched
080     */
081    public void unmatched(MessageReference node) throws IOException {
082        MessageAck ack = new MessageAck();
083        ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
084        ack.setMessageID(node.getMessageId());
085        node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
086    }
087
088    @Override
089    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
090        // statically configured via maxPageSize
091    }
092
093    public void add(ConnectionContext context, Destination destination) throws Exception {
094        super.add(context, destination);
095        // do it just once per destination
096        if (destinations.containsKey(destination.getActiveMQDestination())) {
097            return;
098        }
099        destinations.put(destination.getActiveMQDestination(), destination);
100
101        if (active.get() || keepDurableSubsActive) {
102            Topic topic = (Topic)destination;
103            topic.activate(context, this);
104            if (pending.isEmpty(topic)) {
105                topic.recoverRetroactiveMessages(context, this);
106            }
107            this.enqueueCounter+=pending.size();
108        } else if (destination.getMessageStore() != null) {
109            TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
110            try {
111                this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
112            } catch (IOException e) {
113                JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
114                jmsEx.setLinkedException(e);
115                throw jmsEx;
116            }
117        }
118        dispatchPending();
119    }
120
121    public void activate(SystemUsage memoryManager, ConnectionContext context,
122            ConsumerInfo info) throws Exception {
123        if (!active.get()) {
124            this.context = context;
125            this.info = info;
126            LOG.debug("Activating " + this);
127            if (!keepDurableSubsActive) {
128                for (Iterator<Destination> iter = destinations.values()
129                        .iterator(); iter.hasNext();) {
130                    Topic topic = (Topic) iter.next();
131                    add(context, topic);
132                    topic.activate(context, this);
133                }
134            }
135            synchronized (pendingLock) {
136                pending.setSystemUsage(memoryManager);
137                pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
138                pending.setMaxAuditDepth(getMaxAuditDepth());
139                pending.setMaxProducersToAudit(getMaxProducersToAudit());
140                pending.start();
141                // If nothing was in the persistent store, then try to use the
142                // recovery policy.
143                if (pending.isEmpty()) {
144                    for (Iterator<Destination> iter = destinations.values()
145                            .iterator(); iter.hasNext();) {
146                        Topic topic = (Topic) iter.next();
147                        topic.recoverRetroactiveMessages(context, this);
148                    }
149                }
150            }
151            this.active.set(true);
152            dispatchPending();
153            this.usageManager.getMemoryUsage().addUsageListener(this);
154        }
155    }
156
157    public void deactivate(boolean keepDurableSubsActive) throws Exception {
158        LOG.debug("Deactivating " + this);
159        active.set(false);
160        this.usageManager.getMemoryUsage().removeUsageListener(this);
161        synchronized (pending) {
162            pending.stop();
163        }
164        if (!keepDurableSubsActive) {
165            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
166                Topic topic = (Topic)iter.next();
167                topic.deactivate(context, this);
168            }
169        }
170        
171        for (final MessageReference node : dispatched) {
172            // Mark the dispatched messages as redelivered for next time.
173            Integer count = redeliveredMessages.get(node.getMessageId());
174            if (count != null) {
175                redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
176            } else {
177                redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
178            }
179            if (keepDurableSubsActive&& pending.isTransient()) {
180                synchronized (pending) {
181                    pending.addMessageFirst(node);
182                }
183            } else {
184                node.decrementReferenceCount();
185            }
186        }
187        synchronized(dispatched) {
188            dispatched.clear();
189        }
190        if (!keepDurableSubsActive && pending.isTransient()) {
191            synchronized (pending) {
192                try {
193                    pending.reset();
194                    while (pending.hasNext()) {
195                        MessageReference node = pending.next();
196                        node.decrementReferenceCount();
197                        pending.remove();
198                    }
199                } finally {
200                    pending.release();
201                }
202            }
203        }
204        prefetchExtension = 0;
205    }
206
207    
208    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
209        MessageDispatch md = super.createMessageDispatch(node, message);
210        Integer count = redeliveredMessages.get(node.getMessageId());
211        if (count != null) {
212            md.setRedeliveryCounter(count.intValue());
213        }
214        return md;
215    }
216
217    public void add(MessageReference node) throws Exception {
218        if (!active.get() && !keepDurableSubsActive) {
219            return;
220        }
221        super.add(node);
222    }
223
224    protected void dispatchPending() throws IOException {
225        if (isActive()) {
226            super.dispatchPending();
227        }
228    }
229    
230    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
231        synchronized(pending) {
232            pending.addRecoveredMessage(message);
233        }
234    }
235
236    public int getPendingQueueSize() {
237        if (active.get() || keepDurableSubsActive) {
238            return super.getPendingQueueSize();
239        }
240        // TODO: need to get from store
241        return 0;
242    }
243
244    public void setSelector(String selector) throws InvalidSelectorException {
245        throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
246    }
247
248    protected boolean canDispatch(MessageReference node) {
249        return isActive();
250    }
251
252    protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
253        node.getRegionDestination().acknowledge(context, this, ack, node);
254        redeliveredMessages.remove(node.getMessageId());
255        node.decrementReferenceCount();
256    }
257
258    
259    public synchronized String toString() {
260        return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
261               + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
262    }
263
264    public SubscriptionKey getSubscriptionKey() {
265        return subscriptionKey;
266    }
267
268    /**
269     * Release any references that we are holding.
270     */
271    public void destroy() {
272        synchronized (pending) {
273            try {
274
275                pending.reset();
276                while (pending.hasNext()) {
277                    MessageReference node = pending.next();
278                    node.decrementReferenceCount();
279                }
280
281            } finally {
282                pending.release();
283                pending.clear();
284            }
285        }
286        synchronized(dispatched) {
287            for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
288                MessageReference node = (MessageReference) iter.next();
289                node.decrementReferenceCount();
290            }
291            dispatched.clear();
292        }
293        setSlowConsumer(false);
294    }
295
296    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
297        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
298            try {
299                dispatchPending();
300            } catch (IOException e) {
301                LOG.warn("problem calling dispatchMatched", e);
302            }
303        }
304    }
305    
306    protected boolean isDropped(MessageReference node) {
307       return false;
308    }
309
310    public boolean isKeepDurableSubsActive() {
311        return keepDurableSubsActive;
312    }
313}