001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.List;
021import java.util.Set;
022import org.apache.activemq.broker.Broker;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.ProducerBrokerExchange;
025import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
026import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.Message;
029import org.apache.activemq.command.MessageAck;
030import org.apache.activemq.command.MessageDispatchNotification;
031import org.apache.activemq.command.ProducerInfo;
032import org.apache.activemq.store.MessageStore;
033import org.apache.activemq.usage.MemoryUsage;
034import org.apache.activemq.usage.Usage;
035
036/**
037 * 
038 * 
039 */
040public class DestinationFilter implements Destination {
041
042    private final Destination next;
043
044    public DestinationFilter(Destination next) {
045        this.next = next;
046    }
047
048    public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
049        next.acknowledge(context, sub, ack, node);
050    }
051
052    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
053        next.addSubscription(context, sub);
054    }
055
056    public Message[] browse() {
057        return next.browse();
058    }
059
060    public void dispose(ConnectionContext context) throws IOException {
061        next.dispose(context);
062    }
063
064    public void gc() {
065        next.gc();
066    }
067
068    public ActiveMQDestination getActiveMQDestination() {
069        return next.getActiveMQDestination();
070    }
071
072    public DeadLetterStrategy getDeadLetterStrategy() {
073        return next.getDeadLetterStrategy();
074    }
075
076    public DestinationStatistics getDestinationStatistics() {
077        return next.getDestinationStatistics();
078    }
079
080    public String getName() {
081        return next.getName();
082    }
083
084    public MemoryUsage getMemoryUsage() {
085        return next.getMemoryUsage();
086    }
087
088    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
089        next.removeSubscription(context, sub, lastDeliveredSequenceId);
090    }
091
092    public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
093        next.send(context, messageSend);
094    }
095
096    public void start() throws Exception {
097        next.start();
098    }
099
100    public void stop() throws Exception {
101        next.stop();
102    }
103
104    public List<Subscription> getConsumers() {
105        return next.getConsumers();
106    }
107
108    /**
109     * Sends a message to the given destination which may be a wildcard
110     * 
111     * @param context broker context
112     * @param message message to send
113     * @param destination possibly wildcard destination to send the message to
114     * @throws Exception on error
115     */
116    protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
117        Broker broker = context.getConnectionContext().getBroker();
118        Set<Destination> destinations = broker.getDestinations(destination);
119
120        for (Destination dest : destinations) {
121            dest.send(context, message.copy());
122        }
123    }
124
125    public MessageStore getMessageStore() {
126        return next.getMessageStore();
127    }
128
129    public boolean isProducerFlowControl() {
130        return next.isProducerFlowControl();
131    }
132
133    public void setProducerFlowControl(boolean value) {
134        next.setProducerFlowControl(value);
135    }
136
137    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
138        next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
139    }
140    
141    public long getBlockedProducerWarningInterval() {
142        return next.getBlockedProducerWarningInterval();
143    }
144
145    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
146        next.addProducer(context, info);
147
148    }
149
150    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
151        next.removeProducer(context, info);
152    }
153
154    public int getMaxAuditDepth() {
155        return next.getMaxAuditDepth();
156    }
157
158    public int getMaxProducersToAudit() {
159        return next.getMaxProducersToAudit();
160    }
161
162    public boolean isEnableAudit() {
163        return next.isEnableAudit();
164    }
165
166    public void setEnableAudit(boolean enableAudit) {
167        next.setEnableAudit(enableAudit);
168    }
169
170    public void setMaxAuditDepth(int maxAuditDepth) {
171        next.setMaxAuditDepth(maxAuditDepth);
172    }
173
174    public void setMaxProducersToAudit(int maxProducersToAudit) {
175        next.setMaxProducersToAudit(maxProducersToAudit);
176    }
177
178    public boolean isActive() {
179        return next.isActive();
180    }
181
182    public int getMaxPageSize() {
183        return next.getMaxPageSize();
184    }
185
186    public void setMaxPageSize(int maxPageSize) {
187        next.setMaxPageSize(maxPageSize);
188    }
189
190    public boolean isUseCache() {
191        return next.isUseCache();
192    }
193
194    public void setUseCache(boolean useCache) {
195        next.setUseCache(useCache);
196    }
197
198    public int getMinimumMessageSize() {
199        return next.getMinimumMessageSize();
200    }
201
202    public void setMinimumMessageSize(int minimumMessageSize) {
203        next.setMinimumMessageSize(minimumMessageSize);
204    }
205
206    public void wakeup() {
207        next.wakeup();
208    }
209
210    public boolean isLazyDispatch() {
211        return next.isLazyDispatch();
212    }
213
214    public void setLazyDispatch(boolean value) {
215        next.setLazyDispatch(value);
216    }
217
218    public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
219        next.messageExpired(context, prefetchSubscription, node);
220    }
221
222    public boolean iterate() {
223        return next.iterate();
224    }
225
226    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
227        next.fastProducer(context, producerInfo);
228    }
229
230    public void isFull(ConnectionContext context, Usage usage) {
231        next.isFull(context, usage);
232    }
233
234    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
235        next.messageConsumed(context, messageReference);
236    }
237
238    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
239        next.messageDelivered(context, messageReference);
240    }
241
242    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
243        next.messageDiscarded(context, sub, messageReference);
244    }
245
246    public void slowConsumer(ConnectionContext context, Subscription subs) {
247        next.slowConsumer(context, subs);
248    }
249
250    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) {
251        next.messageExpired(context, subs, node);
252    }
253
254    public int getMaxBrowsePageSize() {
255        return next.getMaxBrowsePageSize();
256    }
257
258    public void setMaxBrowsePageSize(int maxPageSize) {
259        next.setMaxBrowsePageSize(maxPageSize);
260    }
261
262    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
263        next.processDispatchNotification(messageDispatchNotification);
264    }
265
266    public int getCursorMemoryHighWaterMark() {
267        return next.getCursorMemoryHighWaterMark();
268    }
269
270    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
271        next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
272    }
273
274    public boolean isPrioritizedMessages() {
275        return next.isPrioritizedMessages();
276    }
277
278    public SlowConsumerStrategy getSlowConsumerStrategy() {
279        return next.getSlowConsumerStrategy();
280    }
281}