001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.net.URI;
020import java.util.Map;
021import java.util.Set;
022import java.util.concurrent.ThreadPoolExecutor;
023import java.util.concurrent.atomic.AtomicReference;
024import org.apache.activemq.broker.region.Destination;
025import org.apache.activemq.broker.region.MessageReference;
026import org.apache.activemq.broker.region.Subscription;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.BrokerId;
029import org.apache.activemq.command.BrokerInfo;
030import org.apache.activemq.command.ConnectionInfo;
031import org.apache.activemq.command.ConsumerControl;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.DestinationInfo;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageDispatch;
037import org.apache.activemq.command.MessageDispatchNotification;
038import org.apache.activemq.command.MessagePull;
039import org.apache.activemq.command.ProducerInfo;
040import org.apache.activemq.command.RemoveSubscriptionInfo;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.command.SessionInfo;
043import org.apache.activemq.command.TransactionId;
044import org.apache.activemq.store.kahadb.plist.PListStore;
045import org.apache.activemq.thread.Scheduler;
046import org.apache.activemq.usage.Usage;
047
048/**
049 * Like a BrokerFilter but it allows you to switch the getNext().broker. This
050 * has more overhead than a BrokerFilter since access to the getNext().broker
051 * has to synchronized since it is mutable
052 * 
053 * 
054 */
055public class MutableBrokerFilter implements Broker {
056
057    protected AtomicReference<Broker> next = new AtomicReference<Broker>();
058
059    public MutableBrokerFilter(Broker next) {
060        this.next.set(next);
061    }
062
063    public Broker getAdaptor(Class type) {
064        if (type.isInstance(this)) {
065            return this;
066        }
067        return next.get().getAdaptor(type);
068    }
069
070    public Broker getNext() {
071        return next.get();
072    }
073
074    public void setNext(Broker next) {
075        this.next.set(next);
076    }
077
078    public Map<ActiveMQDestination, Destination> getDestinationMap() {
079        return getNext().getDestinationMap();
080    }
081
082    public Set getDestinations(ActiveMQDestination destination) {
083        return getNext().getDestinations(destination);
084    }
085
086    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
087        getNext().acknowledge(consumerExchange, ack);
088    }
089
090    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
091        getNext().addConnection(context, info);
092    }
093
094    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
095        return getNext().addConsumer(context, info);
096    }
097
098    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
099        getNext().addProducer(context, info);
100    }
101
102    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
103        getNext().commitTransaction(context, xid, onePhase);
104    }
105
106    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
107        getNext().removeSubscription(context, info);
108    }
109
110    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
111        return getNext().getPreparedTransactions(context);
112    }
113
114    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
115        return getNext().prepareTransaction(context, xid);
116    }
117
118    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
119        getNext().removeConnection(context, info, error);
120    }
121
122    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
123        getNext().removeConsumer(context, info);
124    }
125
126    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
127        getNext().removeProducer(context, info);
128    }
129
130    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
131        getNext().rollbackTransaction(context, xid);
132    }
133
134    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
135        getNext().send(producerExchange, messageSend);
136    }
137
138    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
139        getNext().beginTransaction(context, xid);
140    }
141
142    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
143        getNext().forgetTransaction(context, transactionId);
144    }
145
146    public Connection[] getClients() throws Exception {
147        return getNext().getClients();
148    }
149
150    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
151        return getNext().addDestination(context, destination,createIfTemporary);
152    }
153
154    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
155        getNext().removeDestination(context, destination, timeout);
156    }
157
158    public ActiveMQDestination[] getDestinations() throws Exception {
159        return getNext().getDestinations();
160    }
161
162    public void start() throws Exception {
163        getNext().start();
164    }
165
166    public void stop() throws Exception {
167        getNext().stop();
168    }
169
170    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
171        getNext().addSession(context, info);
172    }
173
174    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
175        getNext().removeSession(context, info);
176    }
177
178    public BrokerId getBrokerId() {
179        return getNext().getBrokerId();
180    }
181
182    public String getBrokerName() {
183        return getNext().getBrokerName();
184    }
185
186    public void gc() {
187        getNext().gc();
188    }
189
190    public void addBroker(Connection connection, BrokerInfo info) {
191        getNext().addBroker(connection, info);
192    }
193
194    public void removeBroker(Connection connection, BrokerInfo info) {
195        getNext().removeBroker(connection, info);
196    }
197
198    public BrokerInfo[] getPeerBrokerInfos() {
199        return getNext().getPeerBrokerInfos();
200    }
201
202    public void preProcessDispatch(MessageDispatch messageDispatch) {
203        getNext().preProcessDispatch(messageDispatch);
204    }
205
206    public void postProcessDispatch(MessageDispatch messageDispatch) {
207        getNext().postProcessDispatch(messageDispatch);
208    }
209
210    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
211        getNext().processDispatchNotification(messageDispatchNotification);
212    }
213
214    public boolean isStopped() {
215        return getNext().isStopped();
216    }
217
218    public Set<ActiveMQDestination> getDurableDestinations() {
219        return getNext().getDurableDestinations();
220    }
221
222    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
223        getNext().addDestinationInfo(context, info);
224
225    }
226
227    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
228        getNext().removeDestinationInfo(context, info);
229
230    }
231
232    public boolean isFaultTolerantConfiguration() {
233        return getNext().isFaultTolerantConfiguration();
234    }
235
236    public ConnectionContext getAdminConnectionContext() {
237        return getNext().getAdminConnectionContext();
238    }
239
240    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
241        getNext().setAdminConnectionContext(adminConnectionContext);
242    }
243
244    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
245        return getNext().messagePull(context, pull);
246    }
247
248    public PListStore getTempDataStore() {
249        return getNext().getTempDataStore();
250    }
251
252    public URI getVmConnectorURI() {
253        return getNext().getVmConnectorURI();
254    }
255
256    public void brokerServiceStarted() {
257        getNext().brokerServiceStarted();
258    }
259
260    public BrokerService getBrokerService() {
261        return getNext().getBrokerService();
262    }
263
264    public boolean isExpired(MessageReference messageReference) {
265        return getNext().isExpired(messageReference);
266    }
267
268    public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
269        getNext().messageExpired(context, message, subscription);
270    }
271
272    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
273                                      Subscription subscription) {
274        getNext().sendToDeadLetterQueue(context, messageReference, subscription);
275    }
276
277    public Broker getRoot() {
278        return getNext().getRoot();
279    }
280    
281    public long getBrokerSequenceId() {
282        return getNext().getBrokerSequenceId();
283    }
284    
285    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
286        getNext().fastProducer(context, producerInfo);
287    }
288
289    public void isFull(ConnectionContext context,Destination destination, Usage usage) {
290        getNext().isFull(context,destination, usage);
291    }
292
293    public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
294        getNext().messageConsumed(context, messageReference);
295    }
296
297    public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
298        getNext().messageDelivered(context, messageReference);
299    }
300
301    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
302        getNext().messageDiscarded(context, sub, messageReference);
303    }
304
305    public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
306        getNext().slowConsumer(context, dest,subs);
307    }
308    
309    public void nowMasterBroker() {   
310       getNext().nowMasterBroker();
311    }
312
313    public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
314            ConsumerControl control) {
315        getNext().processConsumerControl(consumerExchange, control);
316    }
317
318    public Scheduler getScheduler() {
319       return getNext().getScheduler();
320    }
321
322    public ThreadPoolExecutor getExecutor() {
323       return getNext().getExecutor();
324    }
325
326    public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex) {
327        getNext().networkBridgeStarted(brokerInfo, createdByDuplex);
328    }
329
330    public void networkBridgeStopped(BrokerInfo brokerInfo) {
331        getNext().networkBridgeStopped(brokerInfo);
332    }
333}