001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import org.apache.activemq.broker.region.DurableTopicSubscription;
020import org.apache.activemq.broker.region.Subscription;
021import org.apache.activemq.command.ActiveMQDestination;
022import org.apache.activemq.command.ActiveMQQueue;
023import org.apache.activemq.command.ActiveMQTopic;
024import org.apache.activemq.command.Message;
025
026/**
027 * A {@link DeadLetterStrategy} where each destination has its own individual
028 * DLQ using the subject naming hierarchy.
029 * 
030 * @org.apache.xbean.XBean
031 * 
032 */
033public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
034
035    private String topicPrefix = "ActiveMQ.DLQ.Topic.";
036    private String queuePrefix = "ActiveMQ.DLQ.Queue.";
037    private boolean useQueueForQueueMessages = true;
038    private boolean useQueueForTopicMessages = true;
039    private boolean destinationPerDurableSubscriber;
040
041    public ActiveMQDestination getDeadLetterQueueFor(Message message,
042                                                     Subscription subscription) {
043        if (message.getDestination().isQueue()) {
044            return createDestination(message, queuePrefix, useQueueForQueueMessages, subscription);
045        } else {
046            return createDestination(message, topicPrefix, useQueueForTopicMessages, subscription);
047        }
048    }
049
050    // Properties
051    // -------------------------------------------------------------------------
052
053    public String getQueuePrefix() {
054        return queuePrefix;
055    }
056
057    /**
058     * Sets the prefix to use for all dead letter queues for queue messages
059     */
060    public void setQueuePrefix(String queuePrefix) {
061        this.queuePrefix = queuePrefix;
062    }
063
064    public String getTopicPrefix() {
065        return topicPrefix;
066    }
067
068    /**
069     * Sets the prefix to use for all dead letter queues for topic messages
070     */
071    public void setTopicPrefix(String topicPrefix) {
072        this.topicPrefix = topicPrefix;
073    }
074
075    public boolean isUseQueueForQueueMessages() {
076        return useQueueForQueueMessages;
077    }
078
079    /**
080     * Sets whether a queue or topic should be used for queue messages sent to a
081     * DLQ. The default is to use a Queue
082     */
083    public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) {
084        this.useQueueForQueueMessages = useQueueForQueueMessages;
085    }
086
087    public boolean isUseQueueForTopicMessages() {
088        return useQueueForTopicMessages;
089    }
090
091    /**
092     * Sets whether a queue or topic should be used for topic messages sent to a
093     * DLQ. The default is to use a Queue
094     */
095    public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) {
096        this.useQueueForTopicMessages = useQueueForTopicMessages;
097    }
098
099    public boolean isDestinationPerDurableSubscriber() {
100        return destinationPerDurableSubscriber;
101    }
102
103    /**
104     * sets whether durable topic subscriptions are to get individual dead letter destinations.
105     * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName'
106     * The default is false.
107     * @param destinationPerDurableSubscriber
108     */
109    public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) {
110        this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
111    }
112
113    // Implementation methods
114    // -------------------------------------------------------------------------
115    protected ActiveMQDestination createDestination(Message message,
116                                                    String prefix,
117                                                    boolean useQueue,
118                                                    Subscription subscription ) {
119        String name = prefix + message.getDestination().getPhysicalName();
120        if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) {
121            name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey();
122        }
123        if (useQueue) {
124            return new ActiveMQQueue(name);
125        } else {
126            return new ActiveMQTopic(name);
127        }
128    }
129
130}