001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.LinkedList;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicReference;
032import javax.jms.IllegalStateException;
033import javax.jms.InvalidDestinationException;
034import javax.jms.JMSException;
035import javax.jms.Message;
036import javax.jms.MessageConsumer;
037import javax.jms.MessageListener;
038import javax.jms.TransactionRolledBackException;
039import org.apache.activemq.blob.BlobDownloader;
040import org.apache.activemq.command.ActiveMQBlobMessage;
041import org.apache.activemq.command.ActiveMQDestination;
042import org.apache.activemq.command.ActiveMQMessage;
043import org.apache.activemq.command.ActiveMQTempDestination;
044import org.apache.activemq.command.CommandTypes;
045import org.apache.activemq.command.ConsumerId;
046import org.apache.activemq.command.ConsumerInfo;
047import org.apache.activemq.command.MessageAck;
048import org.apache.activemq.command.MessageDispatch;
049import org.apache.activemq.command.MessageId;
050import org.apache.activemq.command.MessagePull;
051import org.apache.activemq.command.RemoveInfo;
052import org.apache.activemq.command.TransactionId;
053import org.apache.activemq.management.JMSConsumerStatsImpl;
054import org.apache.activemq.management.StatsCapable;
055import org.apache.activemq.management.StatsImpl;
056import org.apache.activemq.selector.SelectorParser;
057import org.apache.activemq.thread.Scheduler;
058import org.apache.activemq.transaction.Synchronization;
059import org.apache.activemq.util.Callback;
060import org.apache.activemq.util.IntrospectionSupport;
061import org.apache.activemq.util.JMSExceptionSupport;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
067 * from a destination. A <CODE> MessageConsumer</CODE> object is created by
068 * passing a <CODE>Destination</CODE> object to a message-consumer creation
069 * method supplied by a session.
070 * <P>
071 * <CODE>MessageConsumer</CODE> is the parent interface for all message
072 * consumers.
073 * <P>
074 * A message consumer can be created with a message selector. A message selector
075 * allows the client to restrict the messages delivered to the message consumer
076 * to those that match the selector.
077 * <P>
078 * A client may either synchronously receive a message consumer's messages or
079 * have the consumer asynchronously deliver them as they arrive.
080 * <P>
081 * For synchronous receipt, a client can request the next message from a message
082 * consumer using one of its <CODE> receive</CODE> methods. There are several
083 * variations of <CODE>receive</CODE> that allow a client to poll or wait for
084 * the next message.
085 * <P>
086 * For asynchronous delivery, a client can register a
087 * <CODE>MessageListener</CODE> object with a message consumer. As messages
088 * arrive at the message consumer, it delivers them by calling the
089 * <CODE>MessageListener</CODE>'s<CODE>
090 * onMessage</CODE> method.
091 * <P>
092 * It is a client programming error for a <CODE>MessageListener</CODE> to
093 * throw an exception.
094 * 
095 * 
096 * @see javax.jms.MessageConsumer
097 * @see javax.jms.QueueReceiver
098 * @see javax.jms.TopicSubscriber
099 * @see javax.jms.Session
100 */
101public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
102
103    @SuppressWarnings("serial")
104    class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
105        final TransactionId transactionId;
106        public PreviouslyDeliveredMap(TransactionId transactionId) {
107            this.transactionId = transactionId;
108        }
109    }
110
111    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
112    protected final Scheduler scheduler;
113    protected final ActiveMQSession session;
114    protected final ConsumerInfo info;
115
116    // These are the messages waiting to be delivered to the client
117    protected final MessageDispatchChannel unconsumedMessages;
118
119    // The are the messages that were delivered to the consumer but that have
120    // not been acknowledged. It's kept in reverse order since we
121    // Always walk list in reverse order.
122    private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
123    // track duplicate deliveries in a transaction such that the tx integrity can be validated
124    private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
125    private int deliveredCounter;
126    private int additionalWindowSize;
127    private long redeliveryDelay;
128    private int ackCounter;
129    private int dispatchedCount;
130    private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
131    private final JMSConsumerStatsImpl stats;
132
133    private final String selector;
134    private boolean synchronizationRegistered;
135    private final AtomicBoolean started = new AtomicBoolean(false);
136
137    private MessageAvailableListener availableListener;
138
139    private RedeliveryPolicy redeliveryPolicy;
140    private boolean optimizeAcknowledge;
141    private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
142    private ExecutorService executorService;
143    private MessageTransformer transformer;
144    private boolean clearDispatchList;
145    boolean inProgressClearRequiredFlag;
146
147    private MessageAck pendingAck;
148    private long lastDeliveredSequenceId;
149
150    private IOException failureError;
151    
152    private long optimizeAckTimestamp = System.currentTimeMillis();
153    private final long optimizeAckTimeout = 300;
154    private long failoverRedeliveryWaitPeriod = 0;
155
156    /**
157     * Create a MessageConsumer
158     * 
159     * @param session
160     * @param dest
161     * @param name
162     * @param selector
163     * @param prefetch
164     * @param maximumPendingMessageCount
165     * @param noLocal
166     * @param browser
167     * @param dispatchAsync
168     * @param messageListener
169     * @throws JMSException
170     */
171    public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
172            String name, String selector, int prefetch,
173            int maximumPendingMessageCount, boolean noLocal, boolean browser,
174            boolean dispatchAsync, MessageListener messageListener) throws JMSException {
175        if (dest == null) {
176            throw new InvalidDestinationException("Don't understand null destinations");
177        } else if (dest.getPhysicalName() == null) {
178            throw new InvalidDestinationException("The destination object was not given a physical name.");
179        } else if (dest.isTemporary()) {
180            String physicalName = dest.getPhysicalName();
181
182            if (physicalName == null) {
183                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
184            }
185
186            String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
187
188            if (physicalName.indexOf(connectionID) < 0) {
189                throw new InvalidDestinationException(
190                                                      "Cannot use a Temporary destination from another Connection");
191            }
192
193            if (session.connection.isDeleted(dest)) {
194                throw new InvalidDestinationException(
195                                                      "Cannot use a Temporary destination that has been deleted");
196            }
197            if (prefetch < 0) {
198                throw new JMSException("Cannot have a prefetch size less than zero");
199            }
200        }
201        if (session.connection.isMessagePrioritySupported()) {
202            this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
203        }else {
204            this.unconsumedMessages = new FifoMessageDispatchChannel();
205        }
206
207        this.session = session;
208        this.scheduler = session.getScheduler();
209        this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
210        setTransformer(session.getTransformer());
211
212        this.info = new ConsumerInfo(consumerId);
213        this.info.setExclusive(this.session.connection.isExclusiveConsumer());
214        this.info.setSubscriptionName(name);
215        this.info.setPrefetchSize(prefetch);
216        this.info.setCurrentPrefetchSize(prefetch);
217        this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
218        this.info.setNoLocal(noLocal);
219        this.info.setDispatchAsync(dispatchAsync);
220        this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
221        this.info.setSelector(null);
222
223        // Allows the options on the destination to configure the consumerInfo
224        if (dest.getOptions() != null) {
225            Map<String, String> options = new HashMap<String, String>(dest.getOptions());
226            IntrospectionSupport.setProperties(this.info, options, "consumer.");
227        }
228
229        this.info.setDestination(dest);
230        this.info.setBrowser(browser);
231        if (selector != null && selector.trim().length() != 0) {
232            // Validate the selector
233            SelectorParser.parse(selector);
234            this.info.setSelector(selector);
235            this.selector = selector;
236        } else if (info.getSelector() != null) {
237            // Validate the selector
238            SelectorParser.parse(this.info.getSelector());
239            this.selector = this.info.getSelector();
240        } else {
241            this.selector = null;
242        }
243
244        this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
245        this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
246                                   && !info.isBrowser();
247        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
248        this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
249        if (messageListener != null) {
250            setMessageListener(messageListener);
251        }
252        try {
253            this.session.addConsumer(this);
254            this.session.syncSendPacket(info);
255        } catch (JMSException e) {
256            this.session.removeConsumer(this);
257            throw e;
258        }
259
260        if (session.connection.isStarted()) {
261            start();
262        }
263    }
264
265    private boolean isAutoAcknowledgeEach() {
266        return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
267    }
268
269    private boolean isAutoAcknowledgeBatch() {
270        return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
271    }
272
273    public StatsImpl getStats() {
274        return stats;
275    }
276
277    public JMSConsumerStatsImpl getConsumerStats() {
278        return stats;
279    }
280
281    public RedeliveryPolicy getRedeliveryPolicy() {
282        return redeliveryPolicy;
283    }
284
285    /**
286     * Sets the redelivery policy used when messages are redelivered
287     */
288    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
289        this.redeliveryPolicy = redeliveryPolicy;
290    }
291
292    public MessageTransformer getTransformer() {
293        return transformer;
294    }
295
296    /**
297     * Sets the transformer used to transform messages before they are sent on
298     * to the JMS bus
299     */
300    public void setTransformer(MessageTransformer transformer) {
301        this.transformer = transformer;
302    }
303
304    /**
305     * @return Returns the value.
306     */
307    public ConsumerId getConsumerId() {
308        return info.getConsumerId();
309    }
310
311    /**
312     * @return the consumer name - used for durable consumers
313     */
314    public String getConsumerName() {
315        return this.info.getSubscriptionName();
316    }
317
318    /**
319     * @return true if this consumer does not accept locally produced messages
320     */
321    protected boolean isNoLocal() {
322        return info.isNoLocal();
323    }
324
325    /**
326     * Retrieve is a browser
327     * 
328     * @return true if a browser
329     */
330    protected boolean isBrowser() {
331        return info.isBrowser();
332    }
333
334    /**
335     * @return ActiveMQDestination
336     */
337    protected ActiveMQDestination getDestination() {
338        return info.getDestination();
339    }
340
341    /**
342     * @return Returns the prefetchNumber.
343     */
344    public int getPrefetchNumber() {
345        return info.getPrefetchSize();
346    }
347
348    /**
349     * @return true if this is a durable topic subscriber
350     */
351    public boolean isDurableSubscriber() {
352        return info.getSubscriptionName() != null && info.getDestination().isTopic();
353    }
354
355    /**
356     * Gets this message consumer's message selector expression.
357     * 
358     * @return this message consumer's message selector, or null if no message
359     *         selector exists for the message consumer (that is, if the message
360     *         selector was not set or was set to null or the empty string)
361     * @throws JMSException if the JMS provider fails to receive the next
362     *                 message due to some internal error.
363     */
364    public String getMessageSelector() throws JMSException {
365        checkClosed();
366        return selector;
367    }
368
369    /**
370     * Gets the message consumer's <CODE>MessageListener</CODE>.
371     * 
372     * @return the listener for the message consumer, or null if no listener is
373     *         set
374     * @throws JMSException if the JMS provider fails to get the message
375     *                 listener due to some internal error.
376     * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
377     */
378    public MessageListener getMessageListener() throws JMSException {
379        checkClosed();
380        return this.messageListener.get();
381    }
382
383    /**
384     * Sets the message consumer's <CODE>MessageListener</CODE>.
385     * <P>
386     * Setting the message listener to null is the equivalent of unsetting the
387     * message listener for the message consumer.
388     * <P>
389     * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
390     * while messages are being consumed by an existing listener or the consumer
391     * is being used to consume messages synchronously is undefined.
392     * 
393     * @param listener the listener to which the messages are to be delivered
394     * @throws JMSException if the JMS provider fails to receive the next
395     *                 message due to some internal error.
396     * @see javax.jms.MessageConsumer#getMessageListener
397     */
398    public void setMessageListener(MessageListener listener) throws JMSException {
399        checkClosed();
400        if (info.getPrefetchSize() == 0) {
401            throw new JMSException(
402                                   "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
403        }
404        if (listener != null) {
405            boolean wasRunning = session.isRunning();
406            if (wasRunning) {
407                session.stop();
408            }
409
410            this.messageListener.set(listener);
411            session.redispatch(this, unconsumedMessages);
412
413            if (wasRunning) {
414                session.start();
415            }
416        } else {
417            this.messageListener.set(null);
418        }
419    }
420
421    public MessageAvailableListener getAvailableListener() {
422        return availableListener;
423    }
424
425    /**
426     * Sets the listener used to notify synchronous consumers that there is a
427     * message available so that the {@link MessageConsumer#receiveNoWait()} can
428     * be called.
429     */
430    public void setAvailableListener(MessageAvailableListener availableListener) {
431        this.availableListener = availableListener;
432    }
433
434    /**
435     * Used to get an enqueued message from the unconsumedMessages list. The
436     * amount of time this method blocks is based on the timeout value. - if
437     * timeout==-1 then it blocks until a message is received. - if timeout==0
438     * then it it tries to not block at all, it returns a message if it is
439     * available - if timeout>0 then it blocks up to timeout amount of time.
440     * Expired messages will consumed by this method.
441     * 
442     * @throws JMSException
443     * @return null if we timeout or if the consumer is closed.
444     */
445    private MessageDispatch dequeue(long timeout) throws JMSException {
446        try {
447            long deadline = 0;
448            if (timeout > 0) {
449                deadline = System.currentTimeMillis() + timeout;
450            }
451            while (true) {
452                MessageDispatch md = unconsumedMessages.dequeue(timeout);
453                if (md == null) {
454                    if (timeout > 0 && !unconsumedMessages.isClosed()) {
455                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
456                    } else {
457                        if (failureError != null) {
458                                throw JMSExceptionSupport.create(failureError);
459                        } else {
460                                return null;
461                        }
462                    }
463                } else if (md.getMessage() == null) {
464                    return null;
465                } else if (md.getMessage().isExpired()) {
466                    if (LOG.isDebugEnabled()) {
467                        LOG.debug(getConsumerId() + " received expired message: " + md);
468                    }
469                    beforeMessageIsConsumed(md);
470                    afterMessageIsConsumed(md, true);
471                    if (timeout > 0) {
472                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
473                    }
474                } else {
475                    if (LOG.isTraceEnabled()) {
476                        LOG.trace(getConsumerId() + " received message: " + md);
477                    }
478                    return md;
479                }
480            }
481        } catch (InterruptedException e) {
482            Thread.currentThread().interrupt();
483            throw JMSExceptionSupport.create(e);
484        }
485    }
486
487    /**
488     * Receives the next message produced for this message consumer.
489     * <P>
490     * This call blocks indefinitely until a message is produced or until this
491     * message consumer is closed.
492     * <P>
493     * If this <CODE>receive</CODE> is done within a transaction, the consumer
494     * retains the message until the transaction commits.
495     * 
496     * @return the next message produced for this message consumer, or null if
497     *         this message consumer is concurrently closed
498     */
499    public Message receive() throws JMSException {
500        checkClosed();
501        checkMessageListener();
502
503        sendPullCommand(0);
504        MessageDispatch md = dequeue(-1);
505        if (md == null) {
506            return null;
507        }
508
509        beforeMessageIsConsumed(md);
510        afterMessageIsConsumed(md, false);
511
512        return createActiveMQMessage(md);
513    }
514
515    /**
516     * @param md
517     * @return
518     */
519    private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
520        ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
521        if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
522                ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
523        }
524        if (transformer != null) {
525            Message transformedMessage = transformer.consumerTransform(session, this, m);
526            if (transformedMessage != null) {
527                m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
528            }
529        }
530        if (session.isClientAcknowledge()) {
531            m.setAcknowledgeCallback(new Callback() {
532                public void execute() throws Exception {
533                    session.checkClosed();
534                    session.acknowledge();
535                }
536            });
537        }else if (session.isIndividualAcknowledge()) {
538            m.setAcknowledgeCallback(new Callback() {
539                public void execute() throws Exception {
540                    session.checkClosed();
541                    acknowledge(md);
542                }
543            });
544        }
545        return m;
546    }
547
548    /**
549     * Receives the next message that arrives within the specified timeout
550     * interval.
551     * <P>
552     * This call blocks until a message arrives, the timeout expires, or this
553     * message consumer is closed. A <CODE>timeout</CODE> of zero never
554     * expires, and the call blocks indefinitely.
555     * 
556     * @param timeout the timeout value (in milliseconds), a time out of zero
557     *                never expires.
558     * @return the next message produced for this message consumer, or null if
559     *         the timeout expires or this message consumer is concurrently
560     *         closed
561     */
562    public Message receive(long timeout) throws JMSException {
563        checkClosed();
564        checkMessageListener();
565        if (timeout == 0) {
566            return this.receive();
567
568        }
569
570        sendPullCommand(timeout);
571        while (timeout > 0) {
572
573            MessageDispatch md;
574            if (info.getPrefetchSize() == 0) {
575                md = dequeue(-1); // We let the broker let us know when we timeout.
576            } else {
577                md = dequeue(timeout);
578            }
579
580            if (md == null) {
581                return null;
582            }
583
584            beforeMessageIsConsumed(md);
585            afterMessageIsConsumed(md, false);
586            return createActiveMQMessage(md);
587        }
588        return null;
589    }
590
591    /**
592     * Receives the next message if one is immediately available.
593     * 
594     * @return the next message produced for this message consumer, or null if
595     *         one is not available
596     * @throws JMSException if the JMS provider fails to receive the next
597     *                 message due to some internal error.
598     */
599    public Message receiveNoWait() throws JMSException {
600        checkClosed();
601        checkMessageListener();
602        sendPullCommand(-1);
603
604        MessageDispatch md;
605        if (info.getPrefetchSize() == 0) {
606            md = dequeue(-1); // We let the broker let us know when we
607            // timeout.
608        } else {
609            md = dequeue(0);
610        }
611
612        if (md == null) {
613            return null;
614        }
615
616        beforeMessageIsConsumed(md);
617        afterMessageIsConsumed(md, false);
618        return createActiveMQMessage(md);
619    }
620
621    /**
622     * Closes the message consumer.
623     * <P>
624     * Since a provider may allocate some resources on behalf of a <CODE>
625     * MessageConsumer</CODE>
626     * outside the Java virtual machine, clients should close them when they are
627     * not needed. Relying on garbage collection to eventually reclaim these
628     * resources may not be timely enough.
629     * <P>
630     * This call blocks until a <CODE>receive</CODE> or message listener in
631     * progress has completed. A blocked message consumer <CODE>receive </CODE>
632     * call returns null when this message consumer is closed.
633     * 
634     * @throws JMSException if the JMS provider fails to close the consumer due
635     *                 to some internal error.
636     */
637    public void close() throws JMSException {
638        if (!unconsumedMessages.isClosed()) {
639            if (session.getTransactionContext().isInTransaction()) {
640                session.getTransactionContext().addSynchronization(new Synchronization() {
641                    @Override
642                    public void afterCommit() throws Exception {
643                        doClose();
644                    }
645
646                    @Override
647                    public void afterRollback() throws Exception {
648                        doClose();
649                    }
650                });
651            } else {
652                doClose();
653            } 
654        }
655    }
656
657    void doClose() throws JMSException {
658        dispose();
659        RemoveInfo removeCommand = info.createRemoveCommand();
660        if (LOG.isDebugEnabled()) {
661            LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
662        }
663        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
664        this.session.asyncSendPacket(removeCommand);
665    }
666    
667    void inProgressClearRequired() {
668        inProgressClearRequiredFlag = true;
669        // deal with delivered messages async to avoid lock contention with in progress acks
670        clearDispatchList = true;
671    }
672    
673    void clearMessagesInProgress() {
674        if (inProgressClearRequiredFlag) {
675            synchronized (unconsumedMessages.getMutex()) {
676                if (inProgressClearRequiredFlag) {
677                    if (LOG.isDebugEnabled()) {
678                        LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt");
679                    }
680                    // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
681                    List<MessageDispatch> list = unconsumedMessages.removeAll();
682                    if (!this.info.isBrowser()) {
683                        for (MessageDispatch old : list) {
684                            session.connection.rollbackDuplicate(this, old.getMessage());
685                        }
686                    }
687                    // allow dispatch on this connection to resume
688                    session.connection.transportInterruptionProcessingComplete();
689                    inProgressClearRequiredFlag = false;
690                }
691            }
692        }
693    }
694
695    void deliverAcks() {
696        MessageAck ack = null;
697        if (deliveryingAcknowledgements.compareAndSet(false, true)) {
698            if (isAutoAcknowledgeEach()) {
699                synchronized(deliveredMessages) {
700                    ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
701                    if (ack != null) {
702                        deliveredMessages.clear();
703                        ackCounter = 0;
704                        } else {
705                            ack = pendingAck;
706                            pendingAck = null;
707                        }
708                }
709            } else if (pendingAck != null && pendingAck.isStandardAck()) {
710                ack = pendingAck;
711                pendingAck = null;
712            }
713            if (ack != null) {
714                final MessageAck ackToSend = ack;
715                
716                if (executorService == null) {
717                    executorService = Executors.newSingleThreadExecutor();
718                }
719                executorService.submit(new Runnable() {
720                    public void run() {
721                        try {
722                            session.sendAck(ackToSend,true);
723                        } catch (JMSException e) {
724                            LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
725                        } finally {
726                            deliveryingAcknowledgements.set(false);
727                        }
728                    }
729                });
730            } else {
731                deliveryingAcknowledgements.set(false);
732            }
733        }
734    }
735
736    public void dispose() throws JMSException {
737        if (!unconsumedMessages.isClosed()) {
738            
739            // Do we have any acks we need to send out before closing?
740            // Ack any delivered messages now.
741            if (!session.getTransacted()) { 
742                deliverAcks();
743                if (isAutoAcknowledgeBatch()) {
744                    acknowledge();
745                }
746            }
747            if (executorService != null) {
748                executorService.shutdown();
749                try {
750                    executorService.awaitTermination(60, TimeUnit.SECONDS);
751                } catch (InterruptedException e) {
752                    Thread.currentThread().interrupt();
753                }
754            }
755            
756            if (session.isClientAcknowledge()) {
757                if (!this.info.isBrowser()) {
758                    // rollback duplicates that aren't acknowledged
759                    List<MessageDispatch> tmp = null;
760                    synchronized (this.deliveredMessages) {
761                        tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
762                    }
763                    for (MessageDispatch old : tmp) {
764                        this.session.connection.rollbackDuplicate(this, old.getMessage());
765                    }
766                    tmp.clear();
767                }
768            }
769            if (!session.isTransacted()) {
770                synchronized(deliveredMessages) {
771                    deliveredMessages.clear();
772                }
773            }
774            unconsumedMessages.close();
775            this.session.removeConsumer(this);
776            List<MessageDispatch> list = unconsumedMessages.removeAll();
777            if (!this.info.isBrowser()) {
778                for (MessageDispatch old : list) {
779                    // ensure we don't filter this as a duplicate
780                    session.connection.rollbackDuplicate(this, old.getMessage());
781                }
782            }
783        }
784    }
785
786    /**
787     * @throws IllegalStateException
788     */
789    protected void checkClosed() throws IllegalStateException {
790        if (unconsumedMessages.isClosed()) {
791            throw new IllegalStateException("The Consumer is closed");
792        }
793    }
794
795    /**
796     * If we have a zero prefetch specified then send a pull command to the
797     * broker to pull a message we are about to receive
798     */
799    protected void sendPullCommand(long timeout) throws JMSException {
800        clearDispatchList();
801        if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
802            MessagePull messagePull = new MessagePull();
803            messagePull.configure(info);
804            messagePull.setTimeout(timeout);
805            session.asyncSendPacket(messagePull);
806        }
807    }
808
809    protected void checkMessageListener() throws JMSException {
810        session.checkMessageListener();
811    }
812
813    protected void setOptimizeAcknowledge(boolean value) {
814        if (optimizeAcknowledge && !value) {
815            deliverAcks();
816        }
817        optimizeAcknowledge = value;
818    }
819
820    protected void setPrefetchSize(int prefetch) {
821        deliverAcks();
822        this.info.setCurrentPrefetchSize(prefetch);
823    }
824
825    private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
826        md.setDeliverySequenceId(session.getNextDeliveryId());
827        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
828        if (!isAutoAcknowledgeBatch()) {
829            synchronized(deliveredMessages) {
830                deliveredMessages.addFirst(md);
831            }
832            if (session.getTransacted()) {
833                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
834            }
835        }
836    }
837    
838    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
839        if (unconsumedMessages.isClosed()) {
840            return;
841        }
842        if (messageExpired) {
843            synchronized (deliveredMessages) {
844                deliveredMessages.remove(md);
845            }
846            stats.getExpiredMessageCount().increment();
847            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
848        } else {
849            stats.onMessage();
850            if (session.getTransacted()) {
851                // Do nothing.
852            } else if (isAutoAcknowledgeEach()) {
853                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
854                    synchronized (deliveredMessages) {
855                        if (!deliveredMessages.isEmpty()) {
856                            if (optimizeAcknowledge) {
857                                ackCounter++;
858                                if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
859                                        MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
860                                        if (ack != null) {
861                                            deliveredMessages.clear();
862                                            ackCounter = 0;
863                                            session.sendAck(ack);
864                                            optimizeAckTimestamp = System.currentTimeMillis();
865                                        }
866                                }
867                            } else {
868                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
869                                if (ack!=null) {
870                                    deliveredMessages.clear();
871                                    session.sendAck(ack);
872                                }
873                            }
874                        }
875                    }
876                    deliveryingAcknowledgements.set(false);
877                }
878            } else if (isAutoAcknowledgeBatch()) {
879                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
880            } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
881                boolean messageUnackedByConsumer = false;
882                synchronized (deliveredMessages) {
883                    messageUnackedByConsumer = deliveredMessages.contains(md);
884                }
885                if (messageUnackedByConsumer) {
886                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
887                }
888            } 
889            else {
890                throw new IllegalStateException("Invalid session state.");
891            }
892        }
893    }
894
895    /**
896     * Creates a MessageAck for all messages contained in deliveredMessages.
897     * Caller should hold the lock for deliveredMessages.
898     * 
899     * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 
900     * @return <code>null</code> if nothing to ack.
901     */
902        private MessageAck makeAckForAllDeliveredMessages(byte type) {
903                synchronized (deliveredMessages) {
904                        if (deliveredMessages.isEmpty())
905                                return null;
906                            
907                        MessageDispatch md = deliveredMessages.getFirst();
908                    MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
909                    ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
910                    return ack;
911                }
912        }
913
914    private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
915
916        // Don't acknowledge now, but we may need to let the broker know the
917        // consumer got the message to expand the pre-fetch window
918        if (session.getTransacted()) {
919            session.doStartTransaction();
920            if (!synchronizationRegistered) {
921                synchronizationRegistered = true;
922                session.getTransactionContext().addSynchronization(new Synchronization() {
923                    @Override
924                    public void beforeEnd() throws Exception {
925                        acknowledge();
926                        synchronizationRegistered = false;
927                    }
928
929                    @Override
930                    public void afterCommit() throws Exception {
931                        commit();
932                        synchronizationRegistered = false;
933                    }
934
935                    @Override
936                    public void afterRollback() throws Exception {
937                        rollback();
938                        synchronizationRegistered = false;
939                    }
940                });
941            }
942        }
943
944        deliveredCounter++;
945        
946        MessageAck oldPendingAck = pendingAck;
947        pendingAck = new MessageAck(md, ackType, deliveredCounter);
948        pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
949        if( oldPendingAck==null ) {
950            pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
951        } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
952            pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
953        } else {
954            // old pending ack being superseded by ack of another type, if is is not a delivered
955            // ack and hence important, send it now so it is not lost.
956            if ( !oldPendingAck.isDeliveredAck()) {
957                if (LOG.isDebugEnabled()) {
958                    LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
959                }
960                session.sendAck(oldPendingAck);
961            } else {
962                if (LOG.isDebugEnabled()) {
963                    LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
964                }
965            }
966        }
967        
968        if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
969            session.sendAck(pendingAck);
970            pendingAck=null;
971            deliveredCounter = 0;
972            additionalWindowSize = 0;
973        }
974    }
975
976    /**
977     * Acknowledge all the messages that have been delivered to the client up to
978     * this point.
979     * 
980     * @throws JMSException
981     */
982    public void acknowledge() throws JMSException {
983        clearDispatchList();
984        waitForRedeliveries();
985        synchronized(deliveredMessages) {
986            // Acknowledge all messages so far.
987            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
988            if (ack == null)
989                return; // no msgs
990            
991            if (session.getTransacted()) {
992                rollbackOnFailedRecoveryRedelivery();
993                session.doStartTransaction();
994                ack.setTransactionId(session.getTransactionContext().getTransactionId());
995            }
996            session.sendAck(ack);
997            pendingAck = null;
998            
999            // Adjust the counters
1000            deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
1001            additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1002            
1003            if (!session.getTransacted()) {  
1004                deliveredMessages.clear();
1005            } 
1006        }
1007    }
1008    
1009    private void waitForRedeliveries() {
1010        if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
1011            long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
1012            int numberNotReplayed;
1013            do {
1014                numberNotReplayed = 0;
1015                synchronized(deliveredMessages) {
1016                    if (previouslyDeliveredMessages != null) { 
1017                        for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1018                            if (!entry.getValue()) {
1019                                numberNotReplayed++;
1020                            }
1021                        }
1022                    }
1023                }
1024                if (numberNotReplayed > 0) {
1025                    LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: "
1026                            + previouslyDeliveredMessages.transactionId +  ", to consumer :" + this.getConsumerId());
1027                    try {
1028                        Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1029                    } catch (InterruptedException outOfhere) {
1030                        break;
1031                    }
1032                }
1033            } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1034        }
1035    }
1036
1037    /*
1038     * called with deliveredMessages locked
1039     */
1040    private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1041        if (previouslyDeliveredMessages != null) {
1042            // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1043            // as messages have been dispatched else where.
1044            int numberNotReplayed = 0;
1045            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1046                if (!entry.getValue()) {
1047                    numberNotReplayed++;
1048                    if (LOG.isDebugEnabled()) {
1049                        LOG.debug("previously delivered message has not been replayed in transaction: "
1050                                + previouslyDeliveredMessages.transactionId 
1051                                + " , messageId: " + entry.getKey());
1052                    }
1053                }
1054            }
1055            if (numberNotReplayed > 0) {
1056                String message = "rolling back transaction (" 
1057                    + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1058                    + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1059                LOG.warn(message);
1060                throw new TransactionRolledBackException(message);   
1061            }
1062        }
1063    }
1064
1065    void acknowledge(MessageDispatch md) throws JMSException {
1066        MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
1067        session.sendAck(ack);
1068        synchronized(deliveredMessages){
1069            deliveredMessages.remove(md);
1070        }
1071    }
1072
1073    public void commit() throws JMSException {
1074        synchronized (deliveredMessages) {
1075            deliveredMessages.clear();
1076            clearPreviouslyDelivered();
1077        }
1078        redeliveryDelay = 0;
1079    }
1080
1081    public void rollback() throws JMSException {
1082        synchronized (unconsumedMessages.getMutex()) {
1083            if (optimizeAcknowledge) {
1084                // remove messages read but not acked at the broker yet through
1085                // optimizeAcknowledge
1086                if (!this.info.isBrowser()) {
1087                    synchronized(deliveredMessages) {
1088                        for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
1089                            // ensure we don't filter this as a duplicate
1090                            MessageDispatch md = deliveredMessages.removeLast();
1091                            session.connection.rollbackDuplicate(this, md.getMessage());
1092                        }
1093                    }
1094                }
1095            }
1096            synchronized(deliveredMessages) {
1097                rollbackPreviouslyDeliveredAndNotRedelivered();
1098                if (deliveredMessages.isEmpty()) {
1099                    return;
1100                }
1101    
1102                // use initial delay for first redelivery
1103                MessageDispatch lastMd = deliveredMessages.getFirst();
1104                final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
1105                if (currentRedeliveryCount > 0) {
1106                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
1107                } else {
1108                    redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
1109                }
1110                MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
1111    
1112                for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1113                    MessageDispatch md = iter.next();
1114                    md.getMessage().onMessageRolledBack();
1115                    // ensure we don't filter this as a duplicate
1116                    session.connection.rollbackDuplicate(this, md.getMessage());
1117                }
1118    
1119                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
1120                    && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
1121                    // We need to NACK the messages so that they get sent to the
1122                    // DLQ.
1123                    // Acknowledge the last message.
1124                    
1125                    MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
1126                    ack.setPoisonCause(lastMd.getRollbackCause());
1127                                        ack.setFirstMessageId(firstMsgId);
1128                    session.sendAck(ack,true);
1129                    // Adjust the window size.
1130                    additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1131                    redeliveryDelay = 0;
1132                } else {
1133                    
1134                    // only redelivery_ack after first delivery
1135                    if (currentRedeliveryCount > 0) {
1136                        MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
1137                        ack.setFirstMessageId(firstMsgId);
1138                        session.sendAck(ack,true);
1139                    }
1140    
1141                    // stop the delivery of messages.
1142                    unconsumedMessages.stop();
1143    
1144                    for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1145                        MessageDispatch md = iter.next();
1146                        unconsumedMessages.enqueueFirst(md);
1147                    }
1148    
1149                    if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
1150                        // Start up the delivery again a little later.
1151                        scheduler.executeAfterDelay(new Runnable() {
1152                            public void run() {
1153                                try {
1154                                    if (started.get()) {
1155                                        start();
1156                                    }
1157                                } catch (JMSException e) {
1158                                    session.connection.onAsyncException(e);
1159                                }
1160                            }
1161                        }, redeliveryDelay);
1162                    } else {
1163                        start();
1164                    }
1165    
1166                }
1167                deliveredCounter -= deliveredMessages.size();
1168                deliveredMessages.clear();
1169            }
1170        }
1171        if (messageListener.get() != null) {
1172            session.redispatch(this, unconsumedMessages);
1173        }
1174    }
1175
1176    /*
1177     * called with unconsumedMessages && deliveredMessages locked
1178     * remove any message not re-delivered as they can't be replayed to this 
1179     * consumer on rollback
1180     */
1181    private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1182        if (previouslyDeliveredMessages != null) {
1183            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1184                if (!entry.getValue()) {              
1185                    removeFromDeliveredMessages(entry.getKey());
1186                }
1187            }
1188            clearPreviouslyDelivered();
1189        }
1190    }
1191
1192    /*
1193     * called with deliveredMessages locked
1194     */
1195    private void removeFromDeliveredMessages(MessageId key) {
1196        Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1197        while (iterator.hasNext()) {
1198            MessageDispatch candidate = iterator.next();
1199            if (key.equals(candidate.getMessage().getMessageId())) {
1200                session.connection.rollbackDuplicate(this, candidate.getMessage());
1201                iterator.remove();
1202                break;
1203            }
1204        }
1205    }
1206    /*
1207     * called with deliveredMessages locked
1208     */
1209    private void clearPreviouslyDelivered() {
1210        if (previouslyDeliveredMessages != null) {
1211            previouslyDeliveredMessages.clear();
1212            previouslyDeliveredMessages = null;
1213        }
1214    }
1215
1216    public void dispatch(MessageDispatch md) {
1217        MessageListener listener = this.messageListener.get();
1218        try {
1219            clearMessagesInProgress();
1220            clearDispatchList();
1221            synchronized (unconsumedMessages.getMutex()) {
1222                if (!unconsumedMessages.isClosed()) {
1223                    if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
1224                        if (listener != null && unconsumedMessages.isRunning()) {
1225                            ActiveMQMessage message = createActiveMQMessage(md);
1226                            beforeMessageIsConsumed(md);
1227                            try {
1228                                boolean expired = message.isExpired();
1229                                if (!expired) {
1230                                    listener.onMessage(message);
1231                                }
1232                                afterMessageIsConsumed(md, expired);
1233                            } catch (RuntimeException e) {
1234                                LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
1235                                if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
1236                                    // schedual redelivery and possible dlq processing
1237                                    md.setRollbackCause(e);
1238                                    rollback();
1239                                } else {
1240                                    // Transacted or Client ack: Deliver the
1241                                    // next message.
1242                                    afterMessageIsConsumed(md, false);
1243                                }
1244                            }
1245                        } else {
1246                            if (!unconsumedMessages.isRunning()) {
1247                                // delayed redelivery, ensure it can be re delivered
1248                                session.connection.rollbackDuplicate(this, md.getMessage());
1249                            }
1250                            unconsumedMessages.enqueue(md);
1251                            if (availableListener != null) {
1252                                availableListener.onMessageAvailable(this);
1253                            }
1254                        }
1255                    } else {
1256                        if (!session.isTransacted()) {
1257                            if (LOG.isDebugEnabled()) {
1258                                LOG.debug(getConsumerId() + " ignoring (auto acking) duplicate: " + md.getMessage());
1259                            }
1260                            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
1261                            session.sendAck(ack);
1262                        } else {
1263                            if (LOG.isDebugEnabled()) {
1264                                LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
1265                            }
1266                            boolean needsPoisonAck = false;
1267                            synchronized (deliveredMessages) {
1268                                if (previouslyDeliveredMessages != null) {
1269                                    previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
1270                                } else {
1271                                    // delivery while pending redelivery to another consumer on the same connection
1272                                    // not waiting for redelivery will help here
1273                                    needsPoisonAck = true;
1274                                }
1275                            }
1276                            if (needsPoisonAck) {
1277                                LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
1278                                        + " consumer on this connection, failoverRedeliveryWaitPeriod=" 
1279                                        + failoverRedeliveryWaitPeriod + ". Message: " + md);
1280                                MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
1281                                poisonAck.setFirstMessageId(md.getMessage().getMessageId());
1282                                session.sendAck(poisonAck);
1283                            } else {
1284                                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
1285                            }
1286                        }
1287                    }
1288                }
1289            }
1290            if (++dispatchedCount % 1000 == 0) {
1291                dispatchedCount = 0;
1292                Thread.yield();
1293            }
1294        } catch (Exception e) {
1295            session.connection.onClientInternalException(e);
1296        }
1297    }
1298
1299    // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1300    private void clearDispatchList() {
1301        if (clearDispatchList) {
1302            synchronized (deliveredMessages) {  
1303                if (clearDispatchList) {
1304                    if (!deliveredMessages.isEmpty()) {
1305                        if (session.isTransacted()) {    
1306                            if (LOG.isDebugEnabled()) {
1307                                LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1308                            }
1309                            if (previouslyDeliveredMessages == null) {
1310                                previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
1311                            }
1312                            for (MessageDispatch delivered : deliveredMessages) {
1313                                previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
1314                            }
1315                        } else {
1316                            if (LOG.isDebugEnabled()) {
1317                                LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1318                            }
1319                            deliveredMessages.clear();
1320                            pendingAck = null;
1321                        }
1322                    }
1323                    clearDispatchList = false;
1324                }
1325            }
1326        }
1327    }
1328
1329    public int getMessageSize() {
1330        return unconsumedMessages.size();
1331    }
1332
1333    public void start() throws JMSException {
1334        if (unconsumedMessages.isClosed()) {
1335            return;
1336        }
1337        started.set(true);
1338        unconsumedMessages.start();
1339        session.executor.wakeup();
1340    }
1341
1342    public void stop() {
1343        started.set(false);
1344        unconsumedMessages.stop();
1345    }
1346
1347    @Override
1348    public String toString() {
1349        return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1350               + " }";
1351    }
1352
1353    /**
1354     * Delivers a message to the message listener.
1355     * 
1356     * @return
1357     * @throws JMSException
1358     */
1359    public boolean iterate() {
1360        MessageListener listener = this.messageListener.get();
1361        if (listener != null) {
1362            MessageDispatch md = unconsumedMessages.dequeueNoWait();
1363            if (md != null) {
1364                dispatch(md);
1365                return true;
1366            }
1367        }
1368        return false;
1369    }
1370
1371    public boolean isInUse(ActiveMQTempDestination destination) {
1372        return info.getDestination().equals(destination);
1373    }
1374
1375    public long getLastDeliveredSequenceId() {
1376        return lastDeliveredSequenceId;
1377    }
1378
1379        public IOException getFailureError() {
1380                return failureError;
1381        }
1382
1383        public void setFailureError(IOException failureError) {
1384                this.failureError = failureError;
1385        }
1386}