001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.Collections;
020import java.util.HashMap;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.CopyOnWriteArrayList;
024import org.apache.activemq.advisory.AdvisorySupport;
025import org.apache.activemq.broker.Broker;
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.DurableTopicSubscription;
029import org.apache.activemq.broker.region.MessageReference;
030import org.apache.activemq.broker.region.Topic;
031import org.apache.activemq.command.Message;
032import org.apache.activemq.usage.SystemUsage;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
 * Persistent pending message cursor for a durable topic subscriber. Pending
 * messages are messages awaiting dispatch to a consumer.
039 * 
040 * 
041 */
042public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
043
044    private static final Logger LOG = LoggerFactory.getLogger(StoreDurableSubscriberCursor.class);
045    private static final int UNKNOWN = -1;
046    private final String clientId;
047    private final String subscriberName;
048    private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
049    private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
050    private final PendingMessageCursor nonPersistent;
051    private PendingMessageCursor currentCursor;
052    private final DurableTopicSubscription subscription;
053    private int cacheCurrentLowestPriority = UNKNOWN;
054    private boolean immediatePriorityDispatch = true;
055    /**
056     * @param broker Broker for this cursor
057     * @param clientId clientId for this cursor
058     * @param subscriberName subscriber name for this cursor
059     * @param maxBatchSize currently ignored
060     * @param subscription  subscription for this cursor
061     */
062    public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, DurableTopicSubscription subscription) {
063        super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription));
064        this.subscription=subscription;
065        this.clientId = clientId;
066        this.subscriberName = subscriberName;
067        if (broker.getBrokerService().isPersistent()) {
068            this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages);
069        } else {
070            this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
071        }
072        
073        this.nonPersistent.setMaxBatchSize(maxBatchSize);
074        this.nonPersistent.setSystemUsage(systemUsage);
075        this.storePrefetches.add(this.nonPersistent);
076
077        if (prioritizedMessages) {
078            setMaxAuditDepth(10*getMaxAuditDepth());
079        }
080    }
081
082    @Override
083    public synchronized void start() throws Exception {
084        if (!isStarted()) {
085            super.start();
086            for (PendingMessageCursor tsp : storePrefetches) {
087                tsp.setMessageAudit(getMessageAudit());
088                tsp.start();
089            }
090        }
091    }
092
093    @Override
094    public synchronized void stop() throws Exception {
095        if (isStarted()) {
096            if (subscription.isKeepDurableSubsActive()) {
097                super.gc();
098                super.getMessageAudit().clear();
099                for (PendingMessageCursor tsp : storePrefetches) {
100                    tsp.gc();
101                    tsp.getMessageAudit().clear();
102                }
103            } else {
104                super.stop();
105                for (PendingMessageCursor tsp : storePrefetches) {
106                    tsp.stop();
107                }
108            }
109        }
110    }
111
112    /**
113     * Add a destination
114     * 
115     * @param context
116     * @param destination
117     * @throws Exception
118     */
119    @Override
120    public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
121        if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
122            TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
123            tsp.setMaxBatchSize(destination.getMaxPageSize());
124            tsp.setSystemUsage(systemUsage);
125            tsp.setMessageAudit(getMessageAudit());
126            tsp.setEnableAudit(isEnableAudit());
127            tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
128            topics.put(destination, tsp);
129            storePrefetches.add(tsp);
130            if (isStarted()) {
131                tsp.start();
132            }
133        }
134    }
135
136    /**
137     * remove a destination
138     * 
139     * @param context
140     * @param destination
141     * @throws Exception
142     */
143    @Override
144    public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
145        PendingMessageCursor tsp = topics.remove(destination);
146        if (tsp != null) {
147            storePrefetches.remove(tsp);
148        }
149        return Collections.EMPTY_LIST;
150    }
151
152    /**
153     * @return true if there are no pending messages
154     */
155    @Override
156    public synchronized boolean isEmpty() {
157        for (PendingMessageCursor tsp : storePrefetches) {
158            if( !tsp.isEmpty() )
159                return false;
160        }
161        return true;
162    }
163
164    @Override
165    public synchronized boolean isEmpty(Destination destination) {
166        boolean result = true;
167        TopicStorePrefetch tsp = topics.get(destination);
168        if (tsp != null) {
169            result = tsp.isEmpty();
170        }
171        return result;
172    }
173
174    /**
175     * Informs the Broker if the subscription needs to intervention to recover
176     * it's state e.g. DurableTopicSubscriber may do
177     * 
178     * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
179     * @return true if recovery required
180     */
181    @Override
182    public boolean isRecoveryRequired() {
183        return false;
184    }
185
186    @Override
187    public synchronized void addMessageLast(MessageReference node) throws Exception {
188        if (node != null) {
189            Message msg = node.getMessage();
190            if (isStarted()) {
191                if (!msg.isPersistent()) {
192                    nonPersistent.addMessageLast(node);
193                }
194            }
195            if (msg.isPersistent()) {
196                Destination dest = msg.getRegionDestination();
197                TopicStorePrefetch tsp = topics.get(dest);
198                if (tsp != null) {
199                    // cache can become high priority cache for immediate dispatch
200                    final int priority = msg.getPriority();
201                    if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.isCacheEnabled()) {
202                        if (priority > tsp.getCurrentLowestPriority()) {
203                            if (LOG.isTraceEnabled()) {
204                                LOG.trace("enabling cache for cursor on high priority message " + priority
205                                        + ", current lowest: " + tsp.getCurrentLowestPriority());
206                            }
207                            tsp.setCacheEnabled(true);
208                            cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
209                        }
210                    } else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) {
211                        // go to the store to get next priority message as lower priority messages may be recovered
212                        // already and need to acked sequence order
213                        if (LOG.isTraceEnabled()) {
214                            LOG.trace("disabling/clearing cache for cursor on lower priority message "
215                                    + priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
216                                    + " cache lowest: " + cacheCurrentLowestPriority);
217                        }
218                        tsp.setCacheEnabled(false);
219                        cacheCurrentLowestPriority = UNKNOWN;
220                    }
221                    tsp.addMessageLast(node);
222                }
223            }
224
225        }
226    }
227
228    @Override
229    public boolean isTransient() {
230        return subscription.isKeepDurableSubsActive();
231    }
232
233    @Override
234    public void addMessageFirst(MessageReference node) throws Exception {
235        // for keep durable subs active, need to deal with redispatch
236        if (node != null) {
237            Message msg = node.getMessage();
238            if (!msg.isPersistent()) {
239                nonPersistent.addMessageFirst(node);
240            } else {
241                Destination dest = msg.getRegionDestination();
242                TopicStorePrefetch tsp = topics.get(dest);
243                if (tsp != null) {
244                    tsp.addMessageFirst(node);
245                }
246            }
247        }
248    }
249
250    @Override
251    public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
252        nonPersistent.addMessageLast(node);
253    }
254
255    @Override
256    public synchronized void clear() {
257        for (PendingMessageCursor tsp : storePrefetches) {
258            tsp.clear();
259        }
260    }
261
262    @Override
263    public synchronized boolean hasNext() {
264        boolean result = true;
265        if (result) {
266            try {
267                currentCursor = getNextCursor();
268            } catch (Exception e) {
269                LOG.error("Failed to get current cursor ", e);
270                throw new RuntimeException(e);
271            }
272            result = currentCursor != null ? currentCursor.hasNext() : false;
273        }
274        return result;
275    }
276
277    @Override
278    public synchronized MessageReference next() {
279        MessageReference result = currentCursor != null ? currentCursor.next() : null;
280        return result;
281    }
282
283    @Override
284    public synchronized void remove() {
285        if (currentCursor != null) {
286            currentCursor.remove();
287        }
288    }
289
290    @Override
291    public synchronized void remove(MessageReference node) {
292        if (currentCursor != null) {
293            currentCursor.remove(node);
294        }
295    }
296
297    @Override
298    public synchronized void reset() {
299        for (PendingMessageCursor storePrefetch : storePrefetches) {
300            storePrefetch.reset();
301        }
302    }
303
304    @Override
305    public synchronized void release() {
306        for (PendingMessageCursor storePrefetch : storePrefetches) {
307            storePrefetch.release();
308        }
309    }
310
311    @Override
312    public synchronized int size() {
313        int pendingCount=0;
314        for (PendingMessageCursor tsp : storePrefetches) {
315            pendingCount += tsp.size();
316        }
317        return pendingCount;
318    }
319
320    @Override
321    public void setMaxBatchSize(int newMaxBatchSize) {
322        for (PendingMessageCursor storePrefetch : storePrefetches) {
323            storePrefetch.setMaxBatchSize(newMaxBatchSize);
324        }
325        super.setMaxBatchSize(newMaxBatchSize);
326    }
327
328    @Override
329    public synchronized void gc() {
330        for (PendingMessageCursor tsp : storePrefetches) {
331            tsp.gc();
332        }
333        cacheCurrentLowestPriority = UNKNOWN;
334    }
335
336    @Override
337    public void setSystemUsage(SystemUsage usageManager) {
338        super.setSystemUsage(usageManager);
339        for (PendingMessageCursor tsp : storePrefetches) {
340            tsp.setSystemUsage(usageManager);
341        }
342    }
343    
344    @Override
345    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
346        super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
347        for (PendingMessageCursor cursor : storePrefetches) {
348            cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
349        }
350    }
351    
352    @Override
353    public void setMaxProducersToAudit(int maxProducersToAudit) {
354        super.setMaxProducersToAudit(maxProducersToAudit);
355        for (PendingMessageCursor cursor : storePrefetches) {
356            cursor.setMaxAuditDepth(maxAuditDepth);
357        }
358    }
359
360    @Override
361    public void setMaxAuditDepth(int maxAuditDepth) {
362        super.setMaxAuditDepth(maxAuditDepth);
363        for (PendingMessageCursor cursor : storePrefetches) {
364            cursor.setMaxAuditDepth(maxAuditDepth);
365        }
366    }
367    
368    @Override
369    public void setEnableAudit(boolean enableAudit) {
370        super.setEnableAudit(enableAudit);
371        for (PendingMessageCursor cursor : storePrefetches) {
372            cursor.setEnableAudit(enableAudit);
373        }
374    }
375    
376    @Override
377    public  void setUseCache(boolean useCache) {
378        super.setUseCache(useCache);
379        for (PendingMessageCursor cursor : storePrefetches) {
380            cursor.setUseCache(useCache);
381        }
382    }
383    
384    protected synchronized PendingMessageCursor getNextCursor() throws Exception {
385        if (currentCursor == null || currentCursor.isEmpty()) {
386            currentCursor = null;
387            for (PendingMessageCursor tsp : storePrefetches) {
388                if (tsp.hasNext()) {
389                    currentCursor = tsp;
390                    break;
391                }
392            }
393            // round-robin
394            if (storePrefetches.size()>1) {
395                PendingMessageCursor first = storePrefetches.remove(0);
396                storePrefetches.add(first);
397            }
398        }
399        return currentCursor;
400    }
401    
402    @Override
403    public String toString() {
404        return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
405    }
406
407    public boolean isImmediatePriorityDispatch() {
408        return immediatePriorityDispatch;
409    }
410
411    public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
412        this.immediatePriorityDispatch = immediatePriorityDispatch;
413    }
414
415}