001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.Collections;
020import java.util.LinkedList;
021import java.util.List;
022import java.util.Set;
023import org.apache.activemq.ActiveMQMessageAudit;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.BaseDestination;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.usage.SystemUsage;
032
033/**
034 * Abstract method holder for pending message (messages awaiting disptach to a
035 * consumer) cursor
036 * 
037 * 
038 */
039public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
040    protected int memoryUsageHighWaterMark = 70;
041    protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
042    protected SystemUsage systemUsage;
043    protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
044    protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
045    protected boolean enableAudit=true;
046    protected ActiveMQMessageAudit audit;
047    protected boolean useCache=true;
048    private boolean cacheEnabled=true;
049    private boolean started=false;
050    protected MessageReference last = null;
051    protected final boolean prioritizedMessages;
052    
053    public AbstractPendingMessageCursor(boolean prioritizedMessages) {
054        this.prioritizedMessages=prioritizedMessages;
055    }
056  
057
058    public synchronized void start() throws Exception  {
059        if (!started && enableAudit && audit==null) {
060            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
061        }
062        started=true;
063    }
064
065    public synchronized void stop() throws Exception  {
066        started=false;
067        audit=null;
068        gc();
069    }
070
071    public void add(ConnectionContext context, Destination destination) throws Exception {
072    }
073
074    @SuppressWarnings("unchecked")
075    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
076        return Collections.EMPTY_LIST;
077    }
078
079    public boolean isRecoveryRequired() {
080        return true;
081    }
082
083    public void addMessageFirst(MessageReference node) throws Exception {
084    }
085
086    public void addMessageLast(MessageReference node) throws Exception {
087    }
088    
089    public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
090        addMessageLast(node);
091        return true;
092    }
093
094    public void addRecoveredMessage(MessageReference node) throws Exception {
095        addMessageLast(node);
096    }
097
098    public void clear() {
099    }
100
101    public boolean hasNext() {
102        return false;
103    }
104
105    public boolean isEmpty() {
106        return false;
107    }
108
109    public boolean isEmpty(Destination destination) {
110        return isEmpty();
111    }
112
113    public MessageReference next() {
114        return null;
115    }
116
117    public void remove() {
118    }
119
120    public void reset() {
121    }
122
123    public int size() {
124        return 0;
125    }
126
127    public int getMaxBatchSize() {
128        return maxBatchSize;
129    }
130
131    public void setMaxBatchSize(int maxBatchSize) {
132        this.maxBatchSize = maxBatchSize;
133    }
134
135    protected void fillBatch() throws Exception {
136    }
137
138    public void resetForGC() {
139        reset();
140    }
141
142    public void remove(MessageReference node) {
143    }
144
145    public void gc() {
146    }
147
148    public void setSystemUsage(SystemUsage usageManager) {
149        this.systemUsage = usageManager;
150    }
151
152    public boolean hasSpace() {
153        return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
154    }
155
156    public boolean isFull() {
157        return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
158    }
159
160    public void release() {
161    }
162
163    public boolean hasMessagesBufferedToDeliver() {
164        return false;
165    }
166
167    /**
168     * @return the memoryUsageHighWaterMark
169     */
170    public int getMemoryUsageHighWaterMark() {
171        return memoryUsageHighWaterMark;
172    }
173
174    /**
175     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
176     */
177    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
178        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
179    }
180
181    /**
182     * @return the usageManager
183     */
184    public SystemUsage getSystemUsage() {
185        return this.systemUsage;
186    }
187
188    /**
189     * destroy the cursor
190     * 
191     * @throws Exception
192     */
193    public void destroy() throws Exception {
194        stop();
195    }
196
197    /**
198     * Page in a restricted number of messages
199     * 
200     * @param maxItems maximum number of messages to return
201     * @return a list of paged in messages
202     */
203    public LinkedList<MessageReference> pageInList(int maxItems) {
204        throw new RuntimeException("Not supported");
205    }
206
207    /**
208     * @return the maxProducersToAudit
209     */
210    public int getMaxProducersToAudit() {
211        return maxProducersToAudit;
212    }
213
214    /**
215     * @param maxProducersToAudit the maxProducersToAudit to set
216     */
217    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
218        this.maxProducersToAudit = maxProducersToAudit;
219        if (audit != null) {
220            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
221        }
222    }
223
224    /**
225     * @return the maxAuditDepth
226     */
227    public int getMaxAuditDepth() {
228        return maxAuditDepth;
229    }
230    
231
232    /**
233     * @param maxAuditDepth the maxAuditDepth to set
234     */
235    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
236        this.maxAuditDepth = maxAuditDepth;
237        if (audit != null) {
238            audit.setAuditDepth(maxAuditDepth);
239        }
240    }
241    
242    
243    /**
244     * @return the enableAudit
245     */
246    public boolean isEnableAudit() {
247        return enableAudit;
248    }
249
250    /**
251     * @param enableAudit the enableAudit to set
252     */
253    public synchronized void setEnableAudit(boolean enableAudit) {
254        this.enableAudit = enableAudit;
255        if (enableAudit && started && audit==null) {
256            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
257        }
258    }
259    
260    public boolean isTransient() {
261        return false;
262    }
263    
264       
265    /**
266     * set the audit
267     * @param audit new audit component
268     */
269    public void setMessageAudit(ActiveMQMessageAudit audit) {
270        this.audit=audit;
271    }
272    
273    
274    /**
275     * @return the audit
276     */
277    public ActiveMQMessageAudit getMessageAudit() {
278        return audit;
279    }
280    
281    public boolean isUseCache() {
282        return useCache;
283    }
284
285    public void setUseCache(boolean useCache) {
286        this.useCache = useCache;
287    }
288
289    public synchronized boolean isDuplicate(MessageId messageId) {
290        boolean unique = recordUniqueId(messageId);
291        rollback(messageId);
292        return !unique;
293    }
294    
295    /**
296     * records a message id and checks if it is a duplicate
297     * @param messageId
298     * @return true if id is unique, false otherwise.
299     */
300    public synchronized boolean recordUniqueId(MessageId messageId) {
301        if (!enableAudit || audit==null) {
302            return true;
303        }
304        return !audit.isDuplicate(messageId);
305    }
306    
307    public synchronized void rollback(MessageId id) {
308        if (audit != null) {
309            audit.rollback(id);
310        }
311    }
312    
313    protected synchronized boolean isStarted() {
314        return started;
315    }
316    
317    public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
318        boolean result = false;
319        Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
320        if (destinations != null) {
321            for (Destination dest:destinations) {
322                if (dest.isPrioritizedMessages()) {
323                    result = true;
324                    break;
325                }
326            }
327        }
328        return result;
329
330    }
331
332    public synchronized boolean isCacheEnabled() {
333        return cacheEnabled;
334    }
335
336    public synchronized void setCacheEnabled(boolean val) {
337        cacheEnabled = val;
338    }
339}