001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import javax.jms.ResourceAllocationException;
021import org.apache.activemq.advisory.AdvisorySupport;
022import org.apache.activemq.broker.Broker;
023import org.apache.activemq.broker.BrokerService;
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.broker.ProducerBrokerExchange;
026import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
027import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.ActiveMQTopic;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.command.MessageDispatchNotification;
032import org.apache.activemq.command.ProducerInfo;
033import org.apache.activemq.state.ProducerState;
034import org.apache.activemq.store.MessageStore;
035import org.apache.activemq.usage.MemoryUsage;
036import org.apache.activemq.usage.SystemUsage;
037import org.apache.activemq.usage.Usage;
038import org.slf4j.Logger;
039
040/**
041 * 
042 */
043public abstract class BaseDestination implements Destination {
044    /**
045     * The maximum number of messages to page in to the destination from
046     * persistent storage
047     */
048    public static final int MAX_PAGE_SIZE = 200;
049    public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
050    public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
051    public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
052    public static final int MAX_PRODUCERS_TO_AUDIT = 64;
053    public static final int MAX_AUDIT_DEPTH = 2048;
054
055    protected final ActiveMQDestination destination;
056    protected final Broker broker;
057    protected final MessageStore store;
058    protected SystemUsage systemUsage;
059    protected MemoryUsage memoryUsage;
060    private boolean producerFlowControl = true;
061    protected boolean warnOnProducerFlowControl = true;
062    protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
063
064    private int maxProducersToAudit = 1024;
065    private int maxAuditDepth = 2048;
066    private boolean enableAudit = true;
067    private int maxPageSize = MAX_PAGE_SIZE;
068    private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
069    private boolean useCache = true;
070    private int minimumMessageSize = 1024;
071    private boolean lazyDispatch = false;
072    private boolean advisoryForSlowConsumers;
073    private boolean advisdoryForFastProducers;
074    private boolean advisoryForDiscardingMessages;
075    private boolean advisoryWhenFull;
076    private boolean advisoryForDelivery;
077    private boolean advisoryForConsumed;
078    private boolean sendAdvisoryIfNoConsumers;
079    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
080    protected final BrokerService brokerService;
081    protected final Broker regionBroker;
082    protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
083    protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
084    private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
085    protected int cursorMemoryHighWaterMark = 70;
086    protected int storeUsageHighWaterMark = 100;
087    private SlowConsumerStrategy slowConsumerStrategy;
088    private boolean prioritizedMessages;
089    private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
090    private boolean gcIfInactive;
091    private long lastActiveTime=0l;
092    private boolean reduceMemoryFootprint = false;
093
094    /**
095     * @param broker
096     * @param store
097     * @param destination
098     * @param parentStats
099     * @throws Exception
100     */
101    public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
102        this.brokerService = brokerService;
103        this.broker = brokerService.getBroker();
104        this.store = store;
105        this.destination = destination;
106        // let's copy the enabled property from the parent DestinationStatistics
107        this.destinationStatistics.setEnabled(parentStats.isEnabled());
108        this.destinationStatistics.setParent(parentStats);
109        this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
110        this.memoryUsage = this.systemUsage.getMemoryUsage();
111        this.memoryUsage.setUsagePortion(1.0f);
112        this.regionBroker = brokerService.getRegionBroker();
113    }
114
115    /**
116     * initialize the destination
117     * 
118     * @throws Exception
119     */
120    public void initialize() throws Exception {
121        // Let the store know what usage manager we are using so that he can
122        // flush messages to disk when usage gets high.
123        if (store != null) {
124            store.setMemoryUsage(this.memoryUsage);
125        }
126    }
127
128    /**
129     * @return the producerFlowControl
130     */
131    public boolean isProducerFlowControl() {
132        return producerFlowControl;
133    }
134
135    /**
136     * @param producerFlowControl the producerFlowControl to set
137     */
138    public void setProducerFlowControl(boolean producerFlowControl) {
139        this.producerFlowControl = producerFlowControl;
140    }
141
142    /**
143     * Set's the interval at which warnings about producers being blocked by
144     * resource usage will be triggered. Values of 0 or less will disable
145     * warnings
146     * 
147     * @param blockedProducerWarningInterval the interval at which warning about
148     *            blocked producers will be triggered.
149     */
150    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
151        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
152    }
153
154    /**
155     * 
156     * @return the interval at which warning about blocked producers will be
157     *         triggered.
158     */
159    public long getBlockedProducerWarningInterval() {
160        return blockedProducerWarningInterval;
161    }
162
163    /**
164     * @return the maxProducersToAudit
165     */
166    public int getMaxProducersToAudit() {
167        return maxProducersToAudit;
168    }
169
170    /**
171     * @param maxProducersToAudit the maxProducersToAudit to set
172     */
173    public void setMaxProducersToAudit(int maxProducersToAudit) {
174        this.maxProducersToAudit = maxProducersToAudit;
175    }
176
177    /**
178     * @return the maxAuditDepth
179     */
180    public int getMaxAuditDepth() {
181        return maxAuditDepth;
182    }
183
184    /**
185     * @param maxAuditDepth the maxAuditDepth to set
186     */
187    public void setMaxAuditDepth(int maxAuditDepth) {
188        this.maxAuditDepth = maxAuditDepth;
189    }
190
191    /**
192     * @return the enableAudit
193     */
194    public boolean isEnableAudit() {
195        return enableAudit;
196    }
197
198    /**
199     * @param enableAudit the enableAudit to set
200     */
201    public void setEnableAudit(boolean enableAudit) {
202        this.enableAudit = enableAudit;
203    }
204
205    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
206        destinationStatistics.getProducers().increment();
207        this.lastActiveTime=0l;
208    }
209
210    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
211        destinationStatistics.getProducers().decrement();
212    }
213    
214    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
215        destinationStatistics.getConsumers().increment();
216        this.lastActiveTime=0l;
217    }
218
219    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
220        destinationStatistics.getConsumers().decrement();
221    }
222
223
224    public final MemoryUsage getMemoryUsage() {
225        return memoryUsage;
226    }
227
228    public DestinationStatistics getDestinationStatistics() {
229        return destinationStatistics;
230    }
231
232    public ActiveMQDestination getActiveMQDestination() {
233        return destination;
234    }
235
236    public final String getName() {
237        return getActiveMQDestination().getPhysicalName();
238    }
239
240    public final MessageStore getMessageStore() {
241        return store;
242    }
243
244    public final boolean isActive() {
245        return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0;
246    }
247
248    public int getMaxPageSize() {
249        return maxPageSize;
250    }
251
252    public void setMaxPageSize(int maxPageSize) {
253        this.maxPageSize = maxPageSize;
254    }
255
256    public int getMaxBrowsePageSize() {
257        return this.maxBrowsePageSize;
258    }
259
260    public void setMaxBrowsePageSize(int maxPageSize) {
261        this.maxBrowsePageSize = maxPageSize;
262    }
263
264    public int getMaxExpirePageSize() {
265        return this.maxExpirePageSize;
266    }
267
268    public void setMaxExpirePageSize(int maxPageSize) {
269        this.maxExpirePageSize = maxPageSize;
270    }
271
272    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
273        this.expireMessagesPeriod = expireMessagesPeriod;
274    }
275
276    public long getExpireMessagesPeriod() {
277        return expireMessagesPeriod;
278    }
279
280    public boolean isUseCache() {
281        return useCache;
282    }
283
284    public void setUseCache(boolean useCache) {
285        this.useCache = useCache;
286    }
287
288    public int getMinimumMessageSize() {
289        return minimumMessageSize;
290    }
291
292    public void setMinimumMessageSize(int minimumMessageSize) {
293        this.minimumMessageSize = minimumMessageSize;
294    }
295
296    public boolean isLazyDispatch() {
297        return lazyDispatch;
298    }
299
300    public void setLazyDispatch(boolean lazyDispatch) {
301        this.lazyDispatch = lazyDispatch;
302    }
303
304    protected long getDestinationSequenceId() {
305        return regionBroker.getBrokerSequenceId();
306    }
307
308    /**
309     * @return the advisoryForSlowConsumers
310     */
311    public boolean isAdvisoryForSlowConsumers() {
312        return advisoryForSlowConsumers;
313    }
314
315    /**
316     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
317     */
318    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
319        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
320    }
321
322    /**
323     * @return the advisoryForDiscardingMessages
324     */
325    public boolean isAdvisoryForDiscardingMessages() {
326        return advisoryForDiscardingMessages;
327    }
328
329    /**
330     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
331     *            set
332     */
333    public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
334        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
335    }
336
337    /**
338     * @return the advisoryWhenFull
339     */
340    public boolean isAdvisoryWhenFull() {
341        return advisoryWhenFull;
342    }
343
344    /**
345     * @param advisoryWhenFull the advisoryWhenFull to set
346     */
347    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
348        this.advisoryWhenFull = advisoryWhenFull;
349    }
350
351    /**
352     * @return the advisoryForDelivery
353     */
354    public boolean isAdvisoryForDelivery() {
355        return advisoryForDelivery;
356    }
357
358    /**
359     * @param advisoryForDelivery the advisoryForDelivery to set
360     */
361    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
362        this.advisoryForDelivery = advisoryForDelivery;
363    }
364
365    /**
366     * @return the advisoryForConsumed
367     */
368    public boolean isAdvisoryForConsumed() {
369        return advisoryForConsumed;
370    }
371
372    /**
373     * @param advisoryForConsumed the advisoryForConsumed to set
374     */
375    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
376        this.advisoryForConsumed = advisoryForConsumed;
377    }
378
379    /**
380     * @return the advisdoryForFastProducers
381     */
382    public boolean isAdvisdoryForFastProducers() {
383        return advisdoryForFastProducers;
384    }
385
386    /**
387     * @param advisdoryForFastProducers the advisdoryForFastProducers to set
388     */
389    public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
390        this.advisdoryForFastProducers = advisdoryForFastProducers;
391    }
392
393    public boolean isSendAdvisoryIfNoConsumers() {
394        return sendAdvisoryIfNoConsumers;
395    }
396
397    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
398        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
399    }
400
401    /**
402     * @return the dead letter strategy
403     */
404    public DeadLetterStrategy getDeadLetterStrategy() {
405        return deadLetterStrategy;
406    }
407
408    /**
409     * set the dead letter strategy
410     * 
411     * @param deadLetterStrategy
412     */
413    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
414        this.deadLetterStrategy = deadLetterStrategy;
415    }
416
417    public int getCursorMemoryHighWaterMark() {
418        return this.cursorMemoryHighWaterMark;
419    }
420
421    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
422        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
423    }
424
425    /**
426     * called when message is consumed
427     * 
428     * @param context
429     * @param messageReference
430     */
431    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
432        if (advisoryForConsumed) {
433            broker.messageConsumed(context, messageReference);
434        }
435    }
436
437    /**
438     * Called when message is delivered to the broker
439     * 
440     * @param context
441     * @param messageReference
442     */
443    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
444        if (advisoryForDelivery) {
445            broker.messageDelivered(context, messageReference);
446        }
447    }
448
449    /**
450     * Called when a message is discarded - e.g. running low on memory This will
451     * happen only if the policy is enabled - e.g. non durable topics
452     * 
453     * @param context
454     * @param messageReference
455     */
456    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
457        if (advisoryForDiscardingMessages) {
458            broker.messageDiscarded(context, sub, messageReference);
459        }
460    }
461
462    /**
463     * Called when there is a slow consumer
464     * 
465     * @param context
466     * @param subs
467     */
468    public void slowConsumer(ConnectionContext context, Subscription subs) {
469        if (advisoryForSlowConsumers) {
470            broker.slowConsumer(context, this, subs);
471        }
472        if (slowConsumerStrategy != null) {
473            slowConsumerStrategy.slowConsumer(context, subs);
474        }
475    }
476
477    /**
478     * Called to notify a producer is too fast
479     * 
480     * @param context
481     * @param producerInfo
482     */
483    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
484        if (advisdoryForFastProducers) {
485            broker.fastProducer(context, producerInfo);
486        }
487    }
488
489    /**
490     * Called when a Usage reaches a limit
491     * 
492     * @param context
493     * @param usage
494     */
495    public void isFull(ConnectionContext context, Usage usage) {
496        if (advisoryWhenFull) {
497            broker.isFull(context, this, usage);
498        }
499    }
500
501    public void dispose(ConnectionContext context) throws IOException {
502        if (this.store != null) {
503            this.store.removeAllMessages(context);
504            this.store.dispose(context);
505        }
506        this.destinationStatistics.setParent(null);
507        this.memoryUsage.stop();
508    }
509
510    /**
511     * Provides a hook to allow messages with no consumer to be processed in
512     * some way - such as to send to a dead letter queue or something..
513     */
514    protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
515        if (!msg.isPersistent()) {
516            if (isSendAdvisoryIfNoConsumers()) {
517                // allow messages with no consumers to be dispatched to a dead
518                // letter queue
519                if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
520
521                    Message message = msg.copy();
522                    // The original destination and transaction id do not get
523                    // filled when the message is first sent,
524                    // it is only populated if the message is routed to another
525                    // destination like the DLQ
526                    if (message.getOriginalDestination() != null) {
527                        message.setOriginalDestination(message.getDestination());
528                    }
529                    if (message.getOriginalTransactionId() != null) {
530                        message.setOriginalTransactionId(message.getTransactionId());
531                    }
532
533                    ActiveMQTopic advisoryTopic;
534                    if (destination.isQueue()) {
535                        advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
536                    } else {
537                        advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
538                    }
539                    message.setDestination(advisoryTopic);
540                    message.setTransactionId(null);
541
542                    // Disable flow control for this since since we don't want
543                    // to block.
544                    boolean originalFlowControl = context.isProducerFlowControl();
545                    try {
546                        context.setProducerFlowControl(false);
547                        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
548                        producerExchange.setMutable(false);
549                        producerExchange.setConnectionContext(context);
550                        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
551                        context.getBroker().send(producerExchange, message);
552                    } finally {
553                        context.setProducerFlowControl(originalFlowControl);
554                    }
555
556                }
557            }
558        }
559    }
560
561    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
562    }
563
564    public final int getStoreUsageHighWaterMark() {
565        return this.storeUsageHighWaterMark;
566    }
567
568    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
569        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
570    }
571
572    protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
573        waitForSpace(context, usage, 100, warning);
574    }
575    
576    protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
577        if (systemUsage.isSendFailIfNoSpace()) {
578            getLog().debug("sendFailIfNoSpace, forcing exception on send: " + warning);
579            throw new ResourceAllocationException(warning);
580        }
581        if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
582            if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
583                getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send: " + warning);
584                throw new ResourceAllocationException(warning);
585            }
586        } else {
587            long start = System.currentTimeMillis();
588            long nextWarn = start;
589            while (!usage.waitForSpace(1000, highWaterMark)) {
590                if (context.getStopping().get()) {
591                    throw new IOException("Connection closed, send aborted.");
592                }
593    
594                long now = System.currentTimeMillis();
595                if (now >= nextWarn) {
596                    getLog().info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
597                    nextWarn = now + blockedProducerWarningInterval;
598                }
599            }
600        }
601    }
602
603    protected abstract Logger getLog();
604
605    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
606        this.slowConsumerStrategy = slowConsumerStrategy;
607    }
608
609    public SlowConsumerStrategy getSlowConsumerStrategy() {
610        return this.slowConsumerStrategy;
611    }
612
613   
614    public boolean isPrioritizedMessages() {
615        return this.prioritizedMessages;
616    }
617
618    public void setPrioritizedMessages(boolean prioritizedMessages) {
619        this.prioritizedMessages = prioritizedMessages;
620        if (store != null) {
621            store.setPrioritizedMessages(prioritizedMessages);
622        }
623    }
624
625    /**
626     * @return the inactiveTimoutBeforeGC
627     */
628    public long getInactiveTimoutBeforeGC() {
629        return this.inactiveTimoutBeforeGC;
630    }
631
632    /**
633     * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
634     */
635    public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
636        this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
637    }
638
639    /**
640     * @return the gcIfInactive
641     */
642    public boolean isGcIfInactive() {
643        return this.gcIfInactive;
644    }
645
646    /**
647     * @param gcIfInactive the gcIfInactive to set
648     */
649    public void setGcIfInactive(boolean gcIfInactive) {
650        this.gcIfInactive = gcIfInactive;
651    }
652    
653    public void markForGC(long timeStamp) {
654        if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
655                && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
656            this.lastActiveTime = timeStamp;
657        }
658    }
659
660    public boolean canGC() {
661        boolean result = false;
662        if (isGcIfInactive()&& this.lastActiveTime != 0l) {
663            if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) {
664                result = true;
665            }
666        }
667        return result;
668    }
669
670    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
671        this.reduceMemoryFootprint = reduceMemoryFootprint;
672    }
673
674    protected boolean isReduceMemoryFootprint() {
675        return this.reduceMemoryFootprint;
676    }
677}