001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicLong;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.Destination;
027import org.apache.activemq.broker.region.MessageReference;
028import org.apache.activemq.broker.region.QueueMessageReference;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
031import org.apache.activemq.openwire.OpenWireFormat;
032import org.apache.activemq.store.kahadb.plist.PList;
033import org.apache.activemq.store.kahadb.plist.PListEntry;
034import org.apache.activemq.store.kahadb.plist.PListStore;
035import org.apache.activemq.usage.SystemUsage;
036import org.apache.activemq.usage.Usage;
037import org.apache.activemq.usage.UsageListener;
038import org.apache.activemq.wireformat.WireFormat;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041import org.apache.kahadb.util.ByteSequence;
042
/**
 * Pending message cursor (for messages awaiting dispatch to a consumer) that
 * holds messages in memory and persists them to a temporary disk list when
 * memory space runs out.
 */
049public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
    static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
    // Incremented once per constructed cursor; its value is prepended to the cursor name.
    private static final AtomicLong NAME_COUNT = new AtomicLong();
    // Supplies the temp data store and is the target for discarded (expired) messages.
    protected Broker broker;
    // Temp data store obtained from the broker; may be null (see constructor comment).
    private final PListStore store;
    // Key used to obtain/remove this cursor's list from the store.
    private final String name;
    // Pending references currently held in memory.
    private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
    // Lazily obtained from the store by getDiskList(); null until first needed.
    private PList diskList;
    // Current iteration source, assigned by reset().
    private Iterator<MessageReference> iter;
    // Region destination of the most recently added message; re-applied to messages read from disk.
    private Destination regionDestination;
    // True between reset() and release().
    private boolean iterating;
    // Set by onUsageChanged(); a pending flush is performed by release().
    private boolean flushRequired;
    // Guards start()/stop() so each runs at most once per transition.
    private final AtomicBoolean started = new AtomicBoolean();
    // Used to marshal messages to, and unmarshal them from, the disk list.
    private final WireFormat wireFormat = new OpenWireFormat();
063    /**
064     * @param broker
065     * @param name
066     * @param prioritizedMessages
067     */
068    public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
069        super(prioritizedMessages);
070        this.broker = broker;
071        // the store can be null if the BrokerService has persistence
072        // turned off
073        this.store = broker.getTempDataStore();
074        this.name = NAME_COUNT.incrementAndGet() + "_" + name;
075    }
076
077    @Override
078    public void start() throws Exception {
079        if (started.compareAndSet(false, true)) {
080            super.start();
081            if (systemUsage != null) {
082                systemUsage.getMemoryUsage().addUsageListener(this);
083            }
084        }
085    }
086
087    @Override
088    public void stop() throws Exception {
089        if (started.compareAndSet(true, false)) {
090            super.stop();
091            if (systemUsage != null) {
092                systemUsage.getMemoryUsage().removeUsageListener(this);
093            }
094        }
095    }
096
097    /**
098     * @return true if there are no pending messages
099     */
100    @Override
101    public synchronized boolean isEmpty() {
102        if (memoryList.isEmpty() && isDiskListEmpty()) {
103            return true;
104        }
105        for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
106            MessageReference node = iterator.next();
107            if (node == QueueMessageReference.NULL_MESSAGE) {
108                continue;
109            }
110            if (!node.isDropped()) {
111                return false;
112            }
113            // We can remove dropped references.
114            iterator.remove();
115        }
116        return isDiskListEmpty();
117    }
118
119    /**
120     * reset the cursor
121     */
122    @Override
123    public synchronized void reset() {
124        iterating = true;
125        last = null;
126        if (isDiskListEmpty()) {
127            this.iter = this.memoryList.iterator();
128        } else {
129            this.iter = new DiskIterator();
130        }
131    }
132
133    @Override
134    public synchronized void release() {
135        iterating = false;
136        if (flushRequired) {
137            flushRequired = false;
138            flushToDisk();
139        }
140    }
141
142    @Override
143    public synchronized void destroy() throws Exception {
144        stop();
145        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
146            Message node = (Message) i.next();
147            node.decrementReferenceCount();
148        }
149        memoryList.clear();
150        destroyDiskList();
151    }
152
153    private void destroyDiskList() throws Exception {
154        if (!isDiskListEmpty()) {
155            store.removePList(name);
156        }
157    }
158
159    @Override
160    public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
161        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
162        int count = 0;
163        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
164            MessageReference ref = i.next();
165            ref.incrementReferenceCount();
166            result.add(ref);
167            count++;
168        }
169        if (count < maxItems && !isDiskListEmpty()) {
170            for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
171                Message message = (Message) i.next();
172                message.setRegionDestination(regionDestination);
173                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
174                message.incrementReferenceCount();
175                result.add(message);
176                count++;
177            }
178        }
179        return result;
180    }
181
182    /**
183     * add message to await dispatch
184     * 
185     * @param node
186     * @throws Exception 
187     */
188    @Override
189    public synchronized void addMessageLast(MessageReference node) throws Exception {
190        tryAddMessageLast(node, 0);
191    }
192    
193    @Override
194    public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
195        if (!node.isExpired()) {
196            try {
197                regionDestination = node.getMessage().getRegionDestination();
198                if (isDiskListEmpty()) {
199                    if (hasSpace() || this.store == null) {
200                        memoryList.add(node);
201                        node.incrementReferenceCount();
202                        setCacheEnabled(true);
203                        return true;
204                    }
205                }
206                if (!hasSpace()) {
207                    if (isDiskListEmpty()) {
208                        expireOldMessages();
209                        if (hasSpace()) {
210                            memoryList.add(node);
211                            node.incrementReferenceCount();
212                            return true;
213                        } else {
214                            flushToDisk();
215                        }
216                    }
217                }
218                if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
219                    ByteSequence bs = getByteSequence(node.getMessage());
220                    getDiskList().addLast(node.getMessageId().toString(), bs);
221                    return true;
222                }
223                return false;
224
225            } catch (Exception e) {
226                LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
227                throw new RuntimeException(e);
228            }
229        } else {
230            discard(node);
231        }
232        //message expired
233        return true;
234    }
235
236    /**
237     * add message to await dispatch
238     * 
239     * @param node
240     */
241    @Override
242    public synchronized void addMessageFirst(MessageReference node) {
243        if (!node.isExpired()) {
244            try {
245                regionDestination = node.getMessage().getRegionDestination();
246                if (isDiskListEmpty()) {
247                    if (hasSpace()) {
248                        memoryList.addFirst(node);
249                        node.incrementReferenceCount();
250                        setCacheEnabled(true);
251                        return;
252                    }
253                }
254                if (!hasSpace()) {
255                    if (isDiskListEmpty()) {
256                        expireOldMessages();
257                        if (hasSpace()) {
258                            memoryList.addFirst(node);
259                            node.incrementReferenceCount();
260                            return;
261                        } else {
262                            flushToDisk();
263                        }
264                    }
265                }
266                systemUsage.getTempUsage().waitForSpace();
267                node.decrementReferenceCount();
268                ByteSequence bs = getByteSequence(node.getMessage());
269                getDiskList().addFirst(node.getMessageId().toString(), bs);
270
271            } catch (Exception e) {
272                LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
273                throw new RuntimeException(e);
274            }
275        } else {
276            discard(node);
277        }
278    }
279
280    /**
281     * @return true if there pending messages to dispatch
282     */
283    @Override
284    public synchronized boolean hasNext() {
285        return iter.hasNext();
286    }
287
288    /**
289     * @return the next pending message
290     */
291    @Override
292    public synchronized MessageReference next() {
293        Message message = (Message) iter.next();
294        last = message;
295        if (!isDiskListEmpty()) {
296            // got from disk
297            message.setRegionDestination(regionDestination);
298            message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
299        }
300        message.incrementReferenceCount();
301        return message;
302    }
303
304    /**
305     * remove the message at the cursor position
306     */
307    @Override
308    public synchronized void remove() {
309        iter.remove();
310        if (last != null) {
311            last.decrementReferenceCount();
312        }
313    }
314
315    /**
316     * @param node
317     * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
318     */
319    @Override
320    public synchronized void remove(MessageReference node) {
321        if (memoryList.remove(node)) {
322            node.decrementReferenceCount();
323        }
324        if (!isDiskListEmpty()) {
325            try {
326                getDiskList().remove(node.getMessageId().toString());
327            } catch (IOException e) {
328                throw new RuntimeException(e);
329            }
330        }
331    }
332
333    /**
334     * @return the number of pending messages
335     */
336    @Override
337    public synchronized int size() {
338        return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
339    }
340
341    /**
342     * clear all pending messages
343     */
344    @Override
345    public synchronized void clear() {
346        memoryList.clear();
347        if (!isDiskListEmpty()) {
348            try {
349                getDiskList().destroy();
350            } catch (IOException e) {
351                throw new RuntimeException(e);
352            }
353        }
354        last = null;
355    }
356
357    @Override
358    public synchronized boolean isFull() {
359
360        return super.isFull() || (systemUsage != null && systemUsage.getTempUsage().isFull());
361
362    }
363
364    @Override
365    public boolean hasMessagesBufferedToDeliver() {
366        return !isEmpty();
367    }
368
    /**
     * Sets the system usage by delegating to the superclass; no additional
     * handling is done here.
     *
     * @param usageManager the system usage to use
     */
    @Override
    public void setSystemUsage(SystemUsage usageManager) {
        super.setSystemUsage(usageManager);
    }
373
374    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
375        if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
376            synchronized (this) {
377                flushRequired = true;
378                if (!iterating) {
379                    expireOldMessages();
380                    if (!hasSpace()) {
381                        flushToDisk();
382                        flushRequired = false;
383                    }
384                }
385            }
386        }
387    }
388
    /**
     * @return always true for this cursor
     */
    @Override
    public boolean isTransient() {
        return true;
    }
393
394    protected boolean isSpaceInMemoryList() {
395        return hasSpace() && isDiskListEmpty();
396    }
397
398    protected synchronized void expireOldMessages() {
399        if (!memoryList.isEmpty()) {
400            LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
401            this.memoryList = new LinkedList<MessageReference>();
402            while (!tmpList.isEmpty()) {
403                MessageReference node = tmpList.removeFirst();
404                if (node.isExpired()) {
405                    discard(node);
406                } else {
407                    memoryList.add(node);
408                }
409            }
410        }
411
412    }
413
414    protected synchronized void flushToDisk() {
415
416        if (!memoryList.isEmpty()) {
417            while (!memoryList.isEmpty()) {
418                MessageReference node = memoryList.removeFirst();
419                node.decrementReferenceCount();
420                ByteSequence bs;
421                try {
422                    bs = getByteSequence(node.getMessage());
423                    getDiskList().addLast(node.getMessageId().toString(), bs);
424                } catch (IOException e) {
425                    LOG.error("Failed to write to disk list", e);
426                    throw new RuntimeException(e);
427                }
428
429            }
430            memoryList.clear();
431            setCacheEnabled(false);
432        }
433    }
434
435    protected boolean isDiskListEmpty() {
436        return diskList == null || diskList.isEmpty();
437    }
438
439    protected PList getDiskList() {
440        if (diskList == null) {
441            try {
442                diskList = store.getPList(name);
443            } catch (Exception e) {
444                LOG.error("Caught an IO Exception getting the DiskList " + name, e);
445                throw new RuntimeException(e);
446            }
447        }
448        return diskList;
449    }
450
451    protected void discard(MessageReference message) {
452        message.decrementReferenceCount();
453        if (LOG.isDebugEnabled()) {
454            LOG.debug("Discarding message " + message);
455        }
456        ConnectionContext ctx = new ConnectionContext(new NonCachedMessageEvaluationContext());
457        ctx.setBroker(broker);
458        broker.getRoot().sendToDeadLetterQueue(ctx, message, null);
459    }
460
461    protected ByteSequence getByteSequence(Message message) throws IOException {
462        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
463        return new ByteSequence(packet.data, packet.offset, packet.length);
464    }
465
466    protected Message getMessage(ByteSequence bs) throws IOException {
467        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
468                .getOffset(), bs.getLength());
469        return (Message) this.wireFormat.unmarshal(packet);
470
471    }
472
473    final class DiskIterator implements Iterator<MessageReference> {
474        private PListEntry next = null;
475        private PListEntry current = null;
476        PList list;
477
478        DiskIterator() {
479            try {
480                this.list = getDiskList();
481                synchronized (this.list) {
482                    this.current = this.list.getFirst();
483                    this.next = this.current;
484                }
485            } catch (Exception e) {
486                throw new RuntimeException(e);
487            }
488        }
489
490        public boolean hasNext() {
491            return this.next != null;
492        }
493
494        public MessageReference next() {
495            this.current = next;
496            try {
497                ByteSequence bs = this.current.getByteSequence();
498                synchronized (this.list) {
499                    this.current = this.list.refresh(this.current);
500                    this.next = this.list.getNext(this.current);
501                }
502                return getMessage(bs);
503            } catch (IOException e) {
504                LOG.error("I/O error", e);
505                throw new RuntimeException(e);
506            }
507        }
508
509        public void remove() {
510            try {
511                synchronized (this.list) {
512                    this.current = this.list.refresh(this.current);
513                    this.list.remove(this.current);
514                }
515
516            } catch (IOException e) {
517                LOG.error("I/O error", e);
518                throw new RuntimeException(e);
519            }
520
521        }
522
523    }
524}