001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.List;
023import java.util.concurrent.CopyOnWriteArrayList;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.TimeUnit;
026import javax.jms.InvalidSelectorException;
027import javax.jms.JMSException;
028import org.apache.activemq.broker.Broker;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
031import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
032import org.apache.activemq.command.ActiveMQMessage;
033import org.apache.activemq.command.ConsumerControl;
034import org.apache.activemq.command.ConsumerInfo;
035import org.apache.activemq.command.Message;
036import org.apache.activemq.command.MessageAck;
037import org.apache.activemq.command.MessageDispatch;
038import org.apache.activemq.command.MessageDispatchNotification;
039import org.apache.activemq.command.MessageId;
040import org.apache.activemq.command.MessagePull;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.thread.Scheduler;
043import org.apache.activemq.transaction.Synchronization;
044import org.apache.activemq.usage.SystemUsage;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * A subscription that honors the pre-fetch option of the ConsumerInfo.
050 * 
051 * 
052 */
053public abstract class PrefetchSubscription extends AbstractSubscription {
054
055    private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
056    protected final Scheduler scheduler;
057    
058    protected PendingMessageCursor pending;
059    protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
060    protected int prefetchExtension;
061    protected boolean usePrefetchExtension = true;
062    protected long enqueueCounter;
063    protected long dispatchCounter;
064    protected long dequeueCounter;
065    private int maxProducersToAudit=32;
066    private int maxAuditDepth=2048;
067    protected final SystemUsage usageManager;
068    protected final Object pendingLock = new Object();
069    private final Object dispatchLock = new Object();
070    private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
071    
072    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
073        super(broker,context, info);
074        this.usageManager=usageManager;
075        pending = cursor;
076        this.scheduler = broker.getScheduler();
077    }
078
079    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
080        this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
081    }
082
083    /**
084     * Allows a message to be pulled on demand by a client
085     */
086    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
087        // The slave should not deliver pull messages. TODO: when the slave
088        // becomes a master,
089        // He should send a NULL message to all the consumers to 'wake them up'
090        // in case
091        // they were waiting for a message.
092        if (getPrefetchSize() == 0 && !isSlave()) {
093            final long dispatchCounterBeforePull;
094                synchronized(this) {
095                        prefetchExtension++;
096                        dispatchCounterBeforePull = dispatchCounter;
097                }
098            
099                // Have the destination push us some messages.
100                for (Destination dest : destinations) {
101                                dest.iterate();
102                        }
103                dispatchPending();
104            
105            synchronized(this) {
106                    // If there was nothing dispatched.. we may need to setup a timeout.
107                    if (dispatchCounterBeforePull == dispatchCounter) {
108                        // immediate timeout used by receiveNoWait()
109                        if (pull.getTimeout() == -1) {
110                            // Send a NULL message.
111                            add(QueueMessageReference.NULL_MESSAGE);
112                            dispatchPending();
113                        }
114                        if (pull.getTimeout() > 0) {
115                            scheduler.executeAfterDelay(new Runnable() {
116        
117                                public void run() {
118                                    pullTimeout(dispatchCounterBeforePull);
119                                }
120                            }, pull.getTimeout());
121                        }
122                    }
123            }
124        }
125        return null;
126    }
127
128    /**
129     * Occurs when a pull times out. If nothing has been dispatched since the
130     * timeout was setup, then send the NULL message.
131     */
132    final void pullTimeout(long dispatchCounterBeforePull) {
133        synchronized (pendingLock) {
134                if (dispatchCounterBeforePull == dispatchCounter) {
135                try {
136                    add(QueueMessageReference.NULL_MESSAGE);
137                    dispatchPending();
138                } catch (Exception e) {
139                    context.getConnection().serviceException(e);
140                }
141            }
142        }
143    }
144
145    public void add(MessageReference node) throws Exception {
146        synchronized (pendingLock) {
147            // The destination may have just been removed...  
148            if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
149                // perhaps we should inform the caller that we are no longer valid to dispatch to?
150                return;
151            }
152            enqueueCounter++;
153            pending.addMessageLast(node);    
154        }
155        dispatchPending();
156    }
157
158    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
159        synchronized(pendingLock) {
160            try {
161                pending.reset();
162                while (pending.hasNext()) {
163                    MessageReference node = pending.next();
164                    node.decrementReferenceCount();
165                    if (node.getMessageId().equals(mdn.getMessageId())) {
166                        // Synchronize between dispatched list and removal of messages from pending list
167                        // related to remove subscription action
168                        synchronized(dispatchLock) {
169                            pending.remove();
170                            createMessageDispatch(node, node.getMessage());
171                            dispatched.add(node);
172                            onDispatch(node, node.getMessage());
173                        }
174                        return;
175                    }
176                }
177            } finally {
178                pending.release();
179            }
180        }
181        throw new JMSException(
182                "Slave broker out of sync with master: Dispatched message ("
183                        + mdn.getMessageId() + ") was not in the pending list for "
184                        + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
185    }
186
187    public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
188        // Handle the standard acknowledgment case.
189        boolean callDispatchMatched = false;
190        Destination destination = null;
191        
192        if (!isSlave()) {
193            if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
194                // suppress unexpected ack exception in this expected case
195                LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
196                return;
197            }
198        }
199        if (LOG.isTraceEnabled()) {
200            LOG.trace("ack:" + ack);
201        }
202        synchronized(dispatchLock) {
203            if (ack.isStandardAck()) {
204                // First check if the ack matches the dispatched. When using failover this might
205                // not be the case. We don't ever want to ack the wrong messages.
206                assertAckMatchesDispatched(ack);
207                
208                // Acknowledge all dispatched messages up till the message id of
209                // the acknowledgment.
210                int index = 0;
211                boolean inAckRange = false;
212                List<MessageReference> removeList = new ArrayList<MessageReference>();
213                for (final MessageReference node : dispatched) {
214                    MessageId messageId = node.getMessageId();
215                    if (ack.getFirstMessageId() == null
216                            || ack.getFirstMessageId().equals(messageId)) {
217                        inAckRange = true;
218                    }
219                    if (inAckRange) {
220                        // Don't remove the nodes until we are committed.  
221                        if (!context.isInTransaction()) {
222                            dequeueCounter++;
223                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
224                            removeList.add(node);
225                        } else {
226                            // setup a Synchronization to remove nodes from the
227                            // dispatched list.
228                            context.getTransaction().addSynchronization(
229                                    new Synchronization() {
230
231                                        @Override
232                                        public void afterCommit()
233                                                throws Exception {
234                                            synchronized(dispatchLock) {
235                                                dequeueCounter++;
236                                                dispatched.remove(node);
237                                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
238                                            }
239                                        }
240
241                                        @Override
242                                        public void afterRollback() throws Exception {
243                                            synchronized(dispatchLock) {
244                                                if (isSlave()) {
245                                                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
246                                                } else {
247                                                    // poisionAck will decrement - otherwise still inflight on client
248                                                }
249                                            }
250                                        }
251                                    });
252                        }
253                        index++;
254                        acknowledge(context, ack, node);
255                        if (ack.getLastMessageId().equals(messageId)) {                  
256                            // contract prefetch if dispatch required a pull
257                            if (getPrefetchSize() == 0) {
258                                prefetchExtension = Math.max(0, prefetchExtension - index);
259                            } else if (usePrefetchExtension && context.isInTransaction()) {
260                                // extend prefetch window only if not a pulling consumer
261                                prefetchExtension = Math.max(prefetchExtension, index);
262                            }
263                            destination = node.getRegionDestination();
264                            callDispatchMatched = true;
265                            break;
266                        }
267                    }
268                }
269                for (final MessageReference node : removeList) {
270                    dispatched.remove(node);
271                }
272                // this only happens after a reconnect - get an ack which is not
273                // valid
274                if (!callDispatchMatched) {
275                    LOG.warn("Could not correlate acknowledgment with dispatched message: "
276                                  + ack);
277                }
278            } else if (ack.isIndividualAck()) {
279                // Message was delivered and acknowledge - but only delete the
280                // individual message
281                for (final MessageReference node : dispatched) {
282                    MessageId messageId = node.getMessageId();
283                    if (ack.getLastMessageId().equals(messageId)) {
284                        // this should never be within a transaction
285                        dequeueCounter++;
286                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
287                        destination = node.getRegionDestination();
288                        acknowledge(context, ack, node);
289                        dispatched.remove(node);
290                        prefetchExtension = Math.max(0, prefetchExtension - 1);
291                        callDispatchMatched = true;
292                        break;
293                    }
294                }
295            }else if (ack.isDeliveredAck()) {
296                // Message was delivered but not acknowledged: update pre-fetch
297                // counters.
298                int index = 0;
299                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
300                    final MessageReference node = iter.next();
301                    if (node.isExpired()) {
302                        if (broker.isExpired(node)) {
303                            node.getRegionDestination().messageExpired(context, this, node);
304                        }
305                        dispatched.remove(node);
306                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
307                    }
308                    if (ack.getLastMessageId().equals(node.getMessageId())) {
309                        if (usePrefetchExtension) {
310                            prefetchExtension = Math.max(prefetchExtension, index + 1);
311                        }
312                        destination = node.getRegionDestination();
313                        callDispatchMatched = true;
314                        break;
315                    }
316                }
317                if (!callDispatchMatched) {
318                    throw new JMSException(
319                            "Could not correlate acknowledgment with dispatched message: "
320                                    + ack);
321                }
322            } else if (ack.isRedeliveredAck()) {
323                // Message was re-delivered but it was not yet considered to be
324                // a DLQ message.
325                boolean inAckRange = false;
326                for (final MessageReference node : dispatched) {
327                    MessageId messageId = node.getMessageId();
328                    if (ack.getFirstMessageId() == null
329                            || ack.getFirstMessageId().equals(messageId)) {
330                        inAckRange = true;
331                    }
332                    if (inAckRange) {
333                        if (ack.getLastMessageId().equals(messageId)) {
334                            destination = node.getRegionDestination();
335                            callDispatchMatched = true;
336                            break;
337                        }
338                    }
339                }
340                if (!callDispatchMatched) {
341                    throw new JMSException(
342                            "Could not correlate acknowledgment with dispatched message: "
343                                    + ack);
344                }
345            } else if (ack.isPoisonAck()) {
346                // TODO: what if the message is already in a DLQ???
347                // Handle the poison ACK case: we need to send the message to a
348                // DLQ
349                if (ack.isInTransaction()) {
350                    throw new JMSException("Poison ack cannot be transacted: "
351                            + ack);
352                }
353                int index = 0;
354                boolean inAckRange = false;
355                List<MessageReference> removeList = new ArrayList<MessageReference>();
356                for (final MessageReference node : dispatched) {
357                    MessageId messageId = node.getMessageId();
358                    if (ack.getFirstMessageId() == null
359                            || ack.getFirstMessageId().equals(messageId)) {
360                        inAckRange = true;
361                    }
362                    if (inAckRange) {
363                        if (ack.getPoisonCause() != null) {
364                            node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
365                                    ack.getPoisonCause().toString());
366                        }
367                        sendToDLQ(context, node);
368                        node.getRegionDestination().getDestinationStatistics()
369                                .getInflight().decrement();
370                        removeList.add(node);
371                        dequeueCounter++;
372                        index++;
373                        acknowledge(context, ack, node);
374                        if (ack.getLastMessageId().equals(messageId)) {
375                            prefetchExtension = Math.max(0, prefetchExtension
376                                    - (index + 1));
377                            destination = node.getRegionDestination();
378                            callDispatchMatched = true;
379                            break;
380                        }
381                    }
382                }
383                for (final MessageReference node : removeList) {
384                    dispatched.remove(node);
385                }
386                if (!callDispatchMatched) {
387                    throw new JMSException(
388                            "Could not correlate acknowledgment with dispatched message: "
389                                    + ack);
390                }
391            }
392        }
393        if (callDispatchMatched && destination != null) {    
394            destination.wakeup();
395            dispatchPending();
396        } else {
397            if (isSlave()) {
398                throw new JMSException(
399                        "Slave broker out of sync with master: Acknowledgment ("
400                                + ack + ") was not in the dispatch list: "
401                                + dispatched);
402            } else {
403                LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
404                        + ack);
405            }
406        }
407    }
408
409    /**
410     * Checks an ack versus the contents of the dispatched list.
411     * 
412     * @param ack
413     * @throws JMSException if it does not match
414     */
415        protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
416        MessageId firstAckedMsg = ack.getFirstMessageId();
417        MessageId lastAckedMsg = ack.getLastMessageId();
418        int checkCount = 0;
419        boolean checkFoundStart = false;
420        boolean checkFoundEnd = false;
421        for (MessageReference node : dispatched) {
422
423            if (firstAckedMsg == null) {
424                checkFoundStart = true;
425            } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
426                checkFoundStart = true;
427            }
428
429            if (checkFoundStart) {
430                checkCount++;
431            }
432
433            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
434                checkFoundEnd = true;
435                break;
436            }
437        }
438        if (!checkFoundStart && firstAckedMsg != null)
439            throw new JMSException("Unmatched acknowledge: " + ack
440                    + "; Could not find Message-ID " + firstAckedMsg
441                    + " in dispatched-list (start of ack)");
442        if (!checkFoundEnd && lastAckedMsg != null)
443            throw new JMSException("Unmatched acknowledge: " + ack
444                    + "; Could not find Message-ID " + lastAckedMsg
445                    + " in dispatched-list (end of ack)");
446        if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
447            throw new JMSException("Unmatched acknowledge: " + ack
448                    + "; Expected message count (" + ack.getMessageCount()
449                    + ") differs from count in dispatched-list (" + checkCount
450                    + ")");
451        }
452    }
453
454    /**
455     * @param context
456     * @param node
457     * @throws IOException
458     * @throws Exception
459     */
460    protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
461        broker.getRoot().sendToDeadLetterQueue(context, node, this);
462    }
463    
464    public int getInFlightSize() {
465        return dispatched.size();
466    }
467    
468    /**
469     * Used to determine if the broker can dispatch to the consumer.
470     * 
471     * @return
472     */
473    public boolean isFull() {
474        return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
475    }
476
477    /**
478     * @return true when 60% or more room is left for dispatching messages
479     */
480    public boolean isLowWaterMark() {
481        return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
482    }
483
484    /**
485     * @return true when 10% or less room is left for dispatching messages
486     */
487    public boolean isHighWaterMark() {
488        return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
489    }
490
491    @Override
492    public int countBeforeFull() {
493        return info.getPrefetchSize() + prefetchExtension - dispatched.size();
494    }
495
496    public int getPendingQueueSize() {
497        return pending.size();
498    }
499
500    public int getDispatchedQueueSize() {
501        return dispatched.size();
502    }
503
504    public long getDequeueCounter() {
505        return dequeueCounter;
506    }
507
508    public long getDispatchedCounter() {
509        return dispatchCounter;
510    }
511
512    public long getEnqueueCounter() {
513        return enqueueCounter;
514    }
515
516    @Override
517    public boolean isRecoveryRequired() {
518        return pending.isRecoveryRequired();
519    }
520
521    public PendingMessageCursor getPending() {
522        return this.pending;
523    }
524
525    public void setPending(PendingMessageCursor pending) {
526        this.pending = pending;
527        if (this.pending!=null) {
528            this.pending.setSystemUsage(usageManager);
529            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
530        }
531    }
532
533   @Override
534    public void add(ConnectionContext context, Destination destination) throws Exception {
535        synchronized(pendingLock) {
536            super.add(context, destination);
537            pending.add(context, destination);
538        }
539    }
540
541    @Override
542    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
543        List<MessageReference> rc = new ArrayList<MessageReference>();
544        synchronized(pendingLock) {
545            super.remove(context, destination);
546            // Here is a potential problem concerning Inflight stat:
547            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
548            // Except if each commit or rollback callback action comes before remove of subscriber.
549            rc.addAll(pending.remove(context, destination));
550
551            // Synchronized to DispatchLock
552            synchronized(dispatchLock) {
553                ArrayList<MessageReference> references = new ArrayList<MessageReference>();
554                    for (MessageReference r : dispatched) {
555                        if( r.getRegionDestination() == destination) {
556                        references.add(r);
557                        }
558                    }
559                rc.addAll(references);
560                destination.getDestinationStatistics().getDispatched().subtract(references.size());
561                destination.getDestinationStatistics().getInflight().subtract(references.size());
562                dispatched.removeAll(references);
563            }            
564        }
565        return rc;
566    }
567
568    protected void dispatchPending() throws IOException {
569        if (!isSlave()) {
570           synchronized(pendingLock) {
571                try {
572                    int numberToDispatch = countBeforeFull();
573                    if (numberToDispatch > 0) {
574                        setSlowConsumer(false);
575                        setPendingBatchSize(pending, numberToDispatch);
576                        int count = 0;
577                        pending.reset();
578                        while (pending.hasNext() && !isFull()
579                                && count < numberToDispatch) {
580                            MessageReference node = pending.next();
581                            if (node == null) {
582                                break;
583                            }
584                            
585                            // Synchronize between dispatched list and remove of message from pending list
586                            // related to remove subscription action
587                            synchronized(dispatchLock) {
588                                pending.remove();
589                                node.decrementReferenceCount();
590                                if( !isDropped(node) && canDispatch(node)) {
591
592                                    // Message may have been sitting in the pending
593                                    // list a while waiting for the consumer to ak the message.
594                                    if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
595                                        //increment number to dispatch
596                                        numberToDispatch++;
597                                        if (broker.isExpired(node)) {
598                                            node.getRegionDestination().messageExpired(context, this, node);
599                                        }
600                                        continue;
601                                    }
602                                    dispatch(node);
603                                    count++;
604                                }
605                            }
606                        }
607                    } else if (!isSlowConsumer()) {
608                        setSlowConsumer(true);
609                        for (Destination dest :destinations) {
610                            dest.slowConsumer(context, this);
611                        }
612                    }
613                } finally {
614                    pending.release();
615                }
616            }
617        }
618    }
619
620    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
621        pending.setMaxBatchSize(numberToDispatch);
622    }
623
624    protected boolean dispatch(final MessageReference node) throws IOException {
625        final Message message = node.getMessage();
626        if (message == null) {
627            return false;
628        }
629        
630        okForAckAsDispatchDone.countDown();
631        
632        // No reentrant lock - Patch needed to IndirectMessageReference on method lock
633        if (!isSlave()) {
634
635            MessageDispatch md = createMessageDispatch(node, message);
636            // NULL messages don't count... they don't get Acked.
637            if (node != QueueMessageReference.NULL_MESSAGE) {
638                dispatchCounter++;
639                dispatched.add(node);
640            } else {
641                prefetchExtension = Math.max(0, prefetchExtension - 1);
642            }
643            if (info.isDispatchAsync()) {
644                md.setTransmitCallback(new Runnable() {
645
646                    public void run() {
647                        // Since the message gets queued up in async dispatch,
648                        // we don't want to
649                        // decrease the reference count until it gets put on the
650                        // wire.
651                        onDispatch(node, message);
652                    }
653                });
654                context.getConnection().dispatchAsync(md);
655            } else {
656                context.getConnection().dispatchSync(md);
657                onDispatch(node, message);
658            }
659            return true;
660        } else {
661            return false;
662        }
663    }
664
665    protected void onDispatch(final MessageReference node, final Message message) {
666        if (node.getRegionDestination() != null) {
667            if (node != QueueMessageReference.NULL_MESSAGE) {
668                node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
669                node.getRegionDestination().getDestinationStatistics().getInflight().increment();   
670                if (LOG.isTraceEnabled()) {
671                    LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - "
672                            + message.getDestination()  + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
673                }
674            }
675        }
676        
677        if (info.isDispatchAsync()) {
678            try {
679                dispatchPending();
680            } catch (IOException e) {
681                context.getConnection().serviceExceptionAsync(e);
682            }
683        }
684    }
685
686    /**
687     * inform the MessageConsumer on the client to change it's prefetch
688     * 
689     * @param newPrefetch
690     */
691    public void updateConsumerPrefetch(int newPrefetch) {
692        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
693            ConsumerControl cc = new ConsumerControl();
694            cc.setConsumerId(info.getConsumerId());
695            cc.setPrefetch(newPrefetch);
696            context.getConnection().dispatchAsync(cc);
697        }
698    }
699
700    /**
701     * @param node
702     * @param message
703     * @return MessageDispatch
704     */
705    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
706        if (node == QueueMessageReference.NULL_MESSAGE) {
707            MessageDispatch md = new MessageDispatch();
708            md.setMessage(null);
709            md.setConsumerId(info.getConsumerId());
710            md.setDestination(null);
711            return md;
712        } else {
713            MessageDispatch md = new MessageDispatch();
714            md.setConsumerId(info.getConsumerId());
715            md.setDestination(node.getRegionDestination().getActiveMQDestination());
716            md.setMessage(message);
717            md.setRedeliveryCounter(node.getRedeliveryCounter());
718            return md;
719        }
720    }
721
722    /**
723     * Use when a matched message is about to be dispatched to the client.
724     * 
725     * @param node
726     * @return false if the message should not be dispatched to the client
727     *         (another sub may have already dispatched it for example).
728     * @throws IOException
729     */
730    protected abstract boolean canDispatch(MessageReference node) throws IOException;
731    
732    protected abstract boolean isDropped(MessageReference node);
733
734    /**
735     * Used during acknowledgment to remove the message.
736     * 
737     * @throws IOException
738     */
739    protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
740
741    
742    public int getMaxProducersToAudit() {
743        return maxProducersToAudit;
744    }
745
746    public void setMaxProducersToAudit(int maxProducersToAudit) {
747        this.maxProducersToAudit = maxProducersToAudit;
748    }
749
750    public int getMaxAuditDepth() {
751        return maxAuditDepth;
752    }
753
754    public void setMaxAuditDepth(int maxAuditDepth) {
755        this.maxAuditDepth = maxAuditDepth;
756    }
757    
758    public boolean isUsePrefetchExtension() {
759        return usePrefetchExtension;
760    }
761
762    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
763        this.usePrefetchExtension = usePrefetchExtension;
764    }
765}