001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.broker.region.MessageReference;
021import org.apache.activemq.broker.region.Queue;
022import org.apache.activemq.command.Message;
023import org.apache.activemq.usage.SystemUsage;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027/**
028 * Store based Cursor for Queues
029 * 
030 * 
031 */
032public class StoreQueueCursor extends AbstractPendingMessageCursor {
033
034    private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class);
035    private final Broker broker;
036    private int pendingCount;
037    private final Queue queue;
038    private PendingMessageCursor nonPersistent;
039    private final QueueStorePrefetch persistent;
040    private boolean started;
041    private PendingMessageCursor currentCursor;
042
043    /**
044     * Construct
045     * @param broker 
046     * @param queue
047     */
048    public StoreQueueCursor(Broker broker,Queue queue) {
049        super((queue != null ? queue.isPrioritizedMessages():false));
050        this.broker=broker;
051        this.queue = queue;
052        this.persistent = new QueueStorePrefetch(queue);
053        currentCursor = persistent;
054    }
055
056    public synchronized void start() throws Exception {
057        started = true;
058        super.start();
059        if (nonPersistent == null) {
060            if (broker.getBrokerService().isPersistent()) {
061                nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages);
062            }else {
063                nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
064            }
065            nonPersistent.setMaxBatchSize(getMaxBatchSize());
066            nonPersistent.setSystemUsage(systemUsage);
067            nonPersistent.setEnableAudit(isEnableAudit());
068            nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
069            nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
070        }
071        nonPersistent.setMessageAudit(getMessageAudit());
072        nonPersistent.start();
073        persistent.setMessageAudit(getMessageAudit());
074        persistent.start();
075        pendingCount = persistent.size() + nonPersistent.size();
076    }
077
078    public synchronized void stop() throws Exception {
079        started = false;
080        if (nonPersistent != null) {
081            nonPersistent.stop();
082            nonPersistent.gc();
083        }
084        persistent.stop();
085        persistent.gc();
086        super.stop();
087        pendingCount = 0;
088    }
089
090    public synchronized void addMessageLast(MessageReference node) throws Exception {
091        if (node != null) {
092            Message msg = node.getMessage();
093            if (started) {
094                pendingCount++;
095                if (!msg.isPersistent()) {
096                    nonPersistent.addMessageLast(node);
097                }
098            }
099            if (msg.isPersistent()) {
100                persistent.addMessageLast(node);
101            }
102        }
103    }
104    
105    public synchronized void addMessageFirst(MessageReference node) throws Exception {
106        if (node != null) {
107            Message msg = node.getMessage();
108            if (started) {
109                pendingCount++;
110                if (!msg.isPersistent()) {
111                    nonPersistent.addMessageFirst(node);
112                }
113            }
114            if (msg.isPersistent()) {
115                persistent.addMessageFirst(node);
116            }
117        }
118    }
119
120    public synchronized void clear() {
121        pendingCount = 0;
122    }
123
124    public synchronized boolean hasNext() {
125        try {
126            getNextCursor();
127        } catch (Exception e) {
128            LOG.error("Failed to get current cursor ", e);
129            throw new RuntimeException(e);
130       }
131       return currentCursor != null ? currentCursor.hasNext() : false;
132    }
133
134    public synchronized MessageReference next() {
135        MessageReference result = currentCursor != null ? currentCursor.next() : null;
136        return result;
137    }
138
139    public synchronized void remove() {
140        if (currentCursor != null) {
141            currentCursor.remove();
142        }
143        pendingCount--;
144    }
145
146    public synchronized void remove(MessageReference node) {
147        if (!node.isPersistent()) {
148            nonPersistent.remove(node);
149        } else {
150            persistent.remove(node);
151        }
152        pendingCount--;
153    }
154
155    public synchronized void reset() {
156        nonPersistent.reset();
157        persistent.reset();
158        pendingCount = persistent.size() + nonPersistent.size();        
159    }
160    
161    public void release() {
162        nonPersistent.release();
163        persistent.release();
164    }
165
166
167    public synchronized int size() {
168        if (pendingCount < 0) {
169            pendingCount = persistent.size() + nonPersistent.size();
170        }
171        return pendingCount;
172    }
173
174    public synchronized boolean isEmpty() {
175        // if negative, more messages arrived in store since last reset so non empty
176        return pendingCount == 0;
177    }
178
179    /**
180     * Informs the Broker if the subscription needs to intervention to recover
181     * it's state e.g. DurableTopicSubscriber may do
182     * 
183     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
184     * @return true if recovery required
185     */
186    public boolean isRecoveryRequired() {
187        return false;
188    }
189
190    /**
191     * @return the nonPersistent Cursor
192     */
193    public PendingMessageCursor getNonPersistent() {
194        return this.nonPersistent;
195    }
196
197    /**
198     * @param nonPersistent cursor to set
199     */
200    public void setNonPersistent(PendingMessageCursor nonPersistent) {
201        this.nonPersistent = nonPersistent;
202    }
203
204    public void setMaxBatchSize(int maxBatchSize) {
205        persistent.setMaxBatchSize(maxBatchSize);
206        if (nonPersistent != null) {
207            nonPersistent.setMaxBatchSize(maxBatchSize);
208        }
209        super.setMaxBatchSize(maxBatchSize);
210    }
211    
212    
213    public void setMaxProducersToAudit(int maxProducersToAudit) {
214        super.setMaxProducersToAudit(maxProducersToAudit);
215        if (persistent != null) {
216            persistent.setMaxProducersToAudit(maxProducersToAudit);
217        }
218        if (nonPersistent != null) {
219            nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
220        }
221    }
222
223    public void setMaxAuditDepth(int maxAuditDepth) {
224        super.setMaxAuditDepth(maxAuditDepth);
225        if (persistent != null) {
226            persistent.setMaxAuditDepth(maxAuditDepth);
227        }
228        if (nonPersistent != null) {
229            nonPersistent.setMaxAuditDepth(maxAuditDepth);
230        }
231    }
232    
233    public void setEnableAudit(boolean enableAudit) {
234        super.setEnableAudit(enableAudit);
235        if (persistent != null) {
236            persistent.setEnableAudit(enableAudit);
237        }
238        if (nonPersistent != null) {
239            nonPersistent.setEnableAudit(enableAudit);
240        }
241    }
242    
243    @Override
244    public void setUseCache(boolean useCache) {
245        super.setUseCache(useCache);
246        if (persistent != null) {
247            persistent.setUseCache(useCache);
248        }
249        if (nonPersistent != null) {
250            nonPersistent.setUseCache(useCache);
251        }
252    }
253    
254    @Override
255    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
256        super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
257        if (persistent != null) {
258            persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
259        }
260        if (nonPersistent != null) {
261            nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
262        }
263    }
264
265
266
267    public synchronized void gc() {
268        if (persistent != null) {
269            persistent.gc();
270        }
271        if (nonPersistent != null) {
272            nonPersistent.gc();
273        }
274        pendingCount = persistent.size() + nonPersistent.size();
275    }
276
277    public void setSystemUsage(SystemUsage usageManager) {
278        super.setSystemUsage(usageManager);
279        if (persistent != null) {
280            persistent.setSystemUsage(usageManager);
281        }
282        if (nonPersistent != null) {
283            nonPersistent.setSystemUsage(usageManager);
284        }
285    }
286
287    protected synchronized PendingMessageCursor getNextCursor() throws Exception {
288        if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
289            currentCursor = currentCursor == persistent ? nonPersistent : persistent;
290            // sanity check
291            if (currentCursor.isEmpty()) {
292                currentCursor = currentCursor == persistent ? nonPersistent : persistent;
293            }
294        }
295        return currentCursor;
296    }
297
298    @Override
299    public boolean isCacheEnabled() {
300        boolean cacheEnabled = isUseCache();
301        if (cacheEnabled) {
302            if (persistent != null) {
303                cacheEnabled &= persistent.isCacheEnabled();
304            }
305            if (nonPersistent != null) {
306                cacheEnabled &= nonPersistent.isCacheEnabled();
307            }
308            setCacheEnabled(cacheEnabled);
309        }
310        return cacheEnabled;
311    }
312}