001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Hashtable;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.Map.Entry;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.CopyOnWriteArraySet;
030import java.util.concurrent.ThreadPoolExecutor;
031import javax.management.InstanceNotFoundException;
032import javax.management.MalformedObjectNameException;
033import javax.management.ObjectName;
034import javax.management.openmbean.CompositeData;
035import javax.management.openmbean.CompositeDataSupport;
036import javax.management.openmbean.CompositeType;
037import javax.management.openmbean.OpenDataException;
038import javax.management.openmbean.TabularData;
039import javax.management.openmbean.TabularDataSupport;
040import javax.management.openmbean.TabularType;
041import org.apache.activemq.broker.Broker;
042import org.apache.activemq.broker.BrokerService;
043import org.apache.activemq.broker.ConnectionContext;
044import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
045import org.apache.activemq.broker.region.Destination;
046import org.apache.activemq.broker.region.DestinationFactory;
047import org.apache.activemq.broker.region.DestinationFactoryImpl;
048import org.apache.activemq.broker.region.DestinationInterceptor;
049import org.apache.activemq.broker.region.Queue;
050import org.apache.activemq.broker.region.Region;
051import org.apache.activemq.broker.region.RegionBroker;
052import org.apache.activemq.broker.region.Subscription;
053import org.apache.activemq.broker.region.Topic;
054import org.apache.activemq.broker.region.TopicRegion;
055import org.apache.activemq.broker.region.TopicSubscription;
056import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
057import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
058import org.apache.activemq.command.ActiveMQDestination;
059import org.apache.activemq.command.ActiveMQMessage;
060import org.apache.activemq.command.ActiveMQTopic;
061import org.apache.activemq.command.ConsumerInfo;
062import org.apache.activemq.command.Message;
063import org.apache.activemq.command.MessageId;
064import org.apache.activemq.command.SubscriptionInfo;
065import org.apache.activemq.store.MessageRecoveryListener;
066import org.apache.activemq.store.PersistenceAdapter;
067import org.apache.activemq.store.TopicMessageStore;
068import org.apache.activemq.thread.Scheduler;
069import org.apache.activemq.thread.TaskRunnerFactory;
070import org.apache.activemq.usage.SystemUsage;
071import org.apache.activemq.util.JMXSupport;
072import org.apache.activemq.util.ServiceStopper;
073import org.apache.activemq.util.SubscriptionKey;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077public class ManagedRegionBroker extends RegionBroker {
078    private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
079    private final ManagementContext managementContext;
080    private final ObjectName brokerObjectName;
081    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
082    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
083    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
084    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
085    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
086    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
087    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
088    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
089    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
090    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
091    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
092    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
093    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
094    /* This is the first broker in the broker interceptor chain. */
095    private Broker contextBroker;
096
097    public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
098                               DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
099        super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
100        this.managementContext = context;
101        this.brokerObjectName = brokerObjectName;
102    }
103
104    @Override
105    public void start() throws Exception {
106        super.start();
107        // build all existing durable subscriptions
108        buildExistingSubscriptions();
109    }
110
111    @Override
112    protected void doStop(ServiceStopper stopper) {
113        super.doStop(stopper);
114        // lets remove any mbeans not yet removed
115        for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
116            ObjectName name = iter.next();
117            try {
118                managementContext.unregisterMBean(name);
119            } catch (InstanceNotFoundException e) {
120                LOG.warn("The MBean: " + name + " is no longer registered with JMX");
121            } catch (Exception e) {
122                stopper.onException(this, e);
123            }
124        }
125        registeredMBeans.clear();
126    }
127
128    @Override
129    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
130        return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
131    }
132
133    @Override
134    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
135        return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
136    }
137
138    @Override
139    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
140        return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
141    }
142
143    @Override
144    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
145        return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
146    }
147
148    public void register(ActiveMQDestination destName, Destination destination) {
149        // TODO refactor to allow views for custom destinations
150        try {
151            ObjectName objectName = createObjectName(destName);
152            DestinationView view;
153            if (destination instanceof Queue) {
154                view = new QueueView(this, (Queue)destination);
155            } else if (destination instanceof Topic) {
156                view = new TopicView(this, (Topic)destination);
157            } else {
158                view = null;
159                LOG.warn("JMX View is not supported for custom destination: " + destination);
160            }
161            if (view != null) {
162                registerDestination(objectName, destName, view);
163            }
164        } catch (Exception e) {
165            LOG.error("Failed to register destination " + destName, e);
166        }
167    }
168
169    public void unregister(ActiveMQDestination destName) {
170        try {
171            ObjectName objectName = createObjectName(destName);
172            unregisterDestination(objectName);
173        } catch (Exception e) {
174            LOG.error("Failed to unregister " + destName, e);
175        }
176    }
177
178    public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
179        String connectionClientId = context.getClientId();
180        ObjectName brokerJmxObjectName = brokerObjectName;
181        String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
182        SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
183        try {
184            ObjectName objectName = new ObjectName(objectNameStr);
185            SubscriptionView view;
186            if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
187                // add offline subscribers to inactive list
188                SubscriptionInfo info = new SubscriptionInfo();
189                info.setClientId(context.getClientId());
190                info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
191                info.setDestination(sub.getConsumerInfo().getDestination());
192                addInactiveSubscription(key, info);
193            } else {
194                if (sub.getConsumerInfo().isDurable()) {
195                    view = new DurableSubscriptionView(this, context.getClientId(), sub);
196                } else {
197                    if (sub instanceof TopicSubscription) {
198                        view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription) sub);
199                    } else {
200                        view = new SubscriptionView(context.getClientId(), sub);
201                    }
202                }
203                registerSubscription(objectName, sub.getConsumerInfo(), key, view);
204            }
205            subscriptionMap.put(sub, objectName);
206            return objectName;
207        } catch (Exception e) {
208            LOG.error("Failed to register subscription " + sub, e);
209            return null;
210        }
211    }
212
213    public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) {
214        Hashtable map = brokerJmxObjectName.getKeyPropertyList();
215        String brokerDomain = brokerJmxObjectName.getDomain();
216        String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
217        String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
218        String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
219        String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
220        String persistentMode = "persistentMode=";
221        String consumerId = "";
222        if (sub.getConsumerInfo().isDurable()) {
223            persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
224        } else {
225            persistentMode += "Non-Durable";
226            if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) {
227                consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
228            }
229        }
230        objectNameStr += persistentMode + ",";
231        objectNameStr += destinationType + ",";
232        objectNameStr += destinationName + ",";
233        objectNameStr += clientId;
234        objectNameStr += consumerId;
235        return objectNameStr;
236    }
237
238    @Override
239    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
240        Subscription sub = super.addConsumer(context, info);
241        SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
242        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
243        if (inactiveName != null) {
244            // if it was inactive, register it
245            registerSubscription(context, sub);
246        }
247        return sub;
248    }
249
250    @Override
251    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
252        for (Subscription sub : subscriptionMap.keySet()) {
253            if (sub.getConsumerInfo().equals(info)) {
254               // unregister all consumer subs
255               unregisterSubscription(subscriptionMap.get(sub), true);
256            }
257        }
258        super.removeConsumer(context, info);
259    }
260
261    public void unregisterSubscription(Subscription sub) {
262        ObjectName name = subscriptionMap.remove(sub);
263        if (name != null) {
264            try {
265                SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
266                ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
267                if (inactiveName != null) {
268                    inactiveDurableTopicSubscribers.remove(inactiveName);
269                    managementContext.unregisterMBean(inactiveName);
270                }
271            } catch (Exception e) {
272                LOG.error("Failed to unregister subscription " + sub, e);
273            }
274        }
275    }
276
277    protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
278        if (dest.isQueue()) {
279            if (dest.isTemporary()) {
280                temporaryQueues.put(key, view);
281            } else {
282                queues.put(key, view);
283            }
284        } else {
285            if (dest.isTemporary()) {
286                temporaryTopics.put(key, view);
287            } else {
288                topics.put(key, view);
289            }
290        }
291        try {
292            AnnotatedMBean.registerMBean(managementContext, view, key);
293            registeredMBeans.add(key);
294        } catch (Throwable e) {
295            LOG.warn("Failed to register MBean: " + key);
296            LOG.debug("Failure reason: " + e, e);
297        }
298    }
299
300    protected void unregisterDestination(ObjectName key) throws Exception {
301
302        DestinationView view = null;
303        removeAndRemember(topics, key, view);
304        removeAndRemember(queues, key, view);
305        removeAndRemember(temporaryQueues, key, view);
306        removeAndRemember(temporaryTopics, key, view);
307        if (registeredMBeans.remove(key)) {
308            try {
309                managementContext.unregisterMBean(key);
310            } catch (Throwable e) {
311                LOG.warn("Failed to unregister MBean: " + key);
312                LOG.debug("Failure reason: " + e, e);
313            }
314        }
315        if (view != null) {
316            key = view.getSlowConsumerStrategy();
317            if (key!= null && registeredMBeans.remove(key)) {
318                try {
319                    managementContext.unregisterMBean(key);
320                } catch (Throwable e) {
321                    LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
322                    LOG.debug("Failure reason: " + e, e);
323                }
324            }
325        }
326    }
327
328    private void removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
329        DestinationView candidate = map.remove(key);
330        if (candidate != null && view == null) {
331            view = candidate;
332        }
333    }
334
335    protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
336        ActiveMQDestination dest = info.getDestination();
337        if (dest.isQueue()) {
338            if (dest.isTemporary()) {
339                temporaryQueueSubscribers.put(key, view);
340            } else {
341                queueSubscribers.put(key, view);
342            }
343        } else {
344            if (dest.isTemporary()) {
345                temporaryTopicSubscribers.put(key, view);
346            } else {
347                if (info.isDurable()) {
348                    durableTopicSubscribers.put(key, view);
349                    // unregister any inactive durable subs
350                    try {
351                        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
352                        if (inactiveName != null) {
353                            inactiveDurableTopicSubscribers.remove(inactiveName);
354                            registeredMBeans.remove(inactiveName);
355                            managementContext.unregisterMBean(inactiveName);
356                        }
357                    } catch (Throwable e) {
358                        LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
359                    }
360                } else {
361                    topicSubscribers.put(key, view);
362                }
363            }
364        }
365
366        try {
367            AnnotatedMBean.registerMBean(managementContext, view, key);
368            registeredMBeans.add(key);
369        } catch (Throwable e) {
370            LOG.warn("Failed to register MBean: " + key);
371            LOG.debug("Failure reason: " + e, e);
372        }
373
374    }
375
376    protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
377        queueSubscribers.remove(key);
378        topicSubscribers.remove(key);
379        temporaryQueueSubscribers.remove(key);
380        temporaryTopicSubscribers.remove(key);
381        if (registeredMBeans.remove(key)) {
382            try {
383                managementContext.unregisterMBean(key);
384            } catch (Throwable e) {
385                LOG.warn("Failed to unregister MBean: " + key);
386                LOG.debug("Failure reason: " + e, e);
387            }
388        }
389        DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
390        if (view != null) {
391            // need to put this back in the inactive list
392            SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
393            if (addToInactive) {
394                SubscriptionInfo info = new SubscriptionInfo();
395                info.setClientId(subscriptionKey.getClientId());
396                info.setSubscriptionName(subscriptionKey.getSubscriptionName());
397                info.setDestination(new ActiveMQTopic(view.getDestinationName()));
398                addInactiveSubscription(subscriptionKey, info);
399            }
400        }
401    }
402
403    protected void buildExistingSubscriptions() throws Exception {
404        Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
405        Set destinations = destinationFactory.getDestinations();
406        if (destinations != null) {
407            for (Iterator iter = destinations.iterator(); iter.hasNext();) {
408                ActiveMQDestination dest = (ActiveMQDestination)iter.next();
409                if (dest.isTopic()) {                
410                    SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
411                    if (infos != null) {
412                        for (int i = 0; i < infos.length; i++) {
413                            SubscriptionInfo info = infos[i];
414                            SubscriptionKey key = new SubscriptionKey(info);
415                            if (!alreadyKnown(key)) {
416                                LOG.debug("Restoring durable subscription mbean: " + info);
417                                subscriptions.put(key, info);
418                            }
419                        }
420                    }
421                }
422            }
423        }
424        for (Iterator i = subscriptions.entrySet().iterator(); i.hasNext();) {
425            Map.Entry entry = (Entry)i.next();
426            SubscriptionKey key = (SubscriptionKey)entry.getKey();
427            SubscriptionInfo info = (SubscriptionInfo)entry.getValue();
428            addInactiveSubscription(key, info);
429        }
430    }
431
432    private boolean alreadyKnown(SubscriptionKey key) {
433        boolean known = false;
434        known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
435        if (LOG.isTraceEnabled()) {
436            LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") +  " already registered");
437        }
438        return known;
439    }
440
441    protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) {
442        Hashtable map = brokerObjectName.getKeyPropertyList();
443        try {
444            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=Subscription," + "active=false,"
445                                                   + "name=" + JMXSupport.encodeObjectNamePart(key.toString()) + "");
446            SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info);
447
448            try {
449                AnnotatedMBean.registerMBean(managementContext, view, objectName);
450                registeredMBeans.add(objectName);
451            } catch (Throwable e) {
452                LOG.warn("Failed to register MBean: " + key);
453                LOG.debug("Failure reason: " + e, e);
454            }
455
456            inactiveDurableTopicSubscribers.put(objectName, view);
457            subscriptionKeys.put(key, objectName);
458        } catch (Exception e) {
459            LOG.error("Failed to register subscription " + info, e);
460        }
461    }
462
463    public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
464        List<Message> messages = getSubscriberMessages(view);
465        CompositeData c[] = new CompositeData[messages.size()];
466        for (int i = 0; i < c.length; i++) {
467            try {
468                c[i] = OpenTypeSupport.convert(messages.get(i));
469            } catch (Throwable e) {
470                LOG.error("failed to browse : " + view, e);
471            }
472        }
473        return c;
474    }
475
476    public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
477        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
478        List<Message> messages = getSubscriberMessages(view);
479        CompositeType ct = factory.getCompositeType();
480        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
481        TabularDataSupport rc = new TabularDataSupport(tt);
482        for (int i = 0; i < messages.size(); i++) {
483            rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
484        }
485        return rc;
486    }
487
488    protected List<Message> getSubscriberMessages(SubscriptionView view) {
489        // TODO It is very dangerous operation for big backlogs
490        if (!(destinationFactory instanceof DestinationFactoryImpl)) {
491            throw new RuntimeException("unsupported by " + destinationFactory);
492        }
493        PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
494        final List<Message> result = new ArrayList<Message>();
495        try {
496            ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
497            TopicMessageStore store = adapter.createTopicMessageStore(topic);
498            store.recover(new MessageRecoveryListener() {
499                public boolean recoverMessage(Message message) throws Exception {
500                    result.add(message);
501                    return true;
502                }
503
504                public boolean recoverMessageReference(MessageId messageReference) throws Exception {
505                    throw new RuntimeException("Should not be called.");
506                }
507
508                public boolean hasSpace() {
509                    return true;
510                }
511                
512                public boolean isDuplicate(MessageId id) {
513                    return false;
514                }
515            });
516        } catch (Throwable e) {
517            LOG.error("Failed to browse messages for Subscription " + view, e);
518        }
519        return result;
520
521    }
522
523    protected ObjectName[] getTopics() {
524        Set<ObjectName> set = topics.keySet();
525        return set.toArray(new ObjectName[set.size()]);
526    }
527
528    protected ObjectName[] getQueues() {
529        Set<ObjectName> set = queues.keySet();
530        return set.toArray(new ObjectName[set.size()]);
531    }
532
533    protected ObjectName[] getTemporaryTopics() {
534        Set<ObjectName> set = temporaryTopics.keySet();
535        return set.toArray(new ObjectName[set.size()]);
536    }
537
538    protected ObjectName[] getTemporaryQueues() {
539        Set<ObjectName> set = temporaryQueues.keySet();
540        return set.toArray(new ObjectName[set.size()]);
541    }
542
543    protected ObjectName[] getTopicSubscribers() {
544        Set<ObjectName> set = topicSubscribers.keySet();
545        return set.toArray(new ObjectName[set.size()]);
546    }
547
548    protected ObjectName[] getDurableTopicSubscribers() {
549        Set<ObjectName> set = durableTopicSubscribers.keySet();
550        return set.toArray(new ObjectName[set.size()]);
551    }
552
553    protected ObjectName[] getQueueSubscribers() {
554        Set<ObjectName> set = queueSubscribers.keySet();
555        return set.toArray(new ObjectName[set.size()]);
556    }
557
558    protected ObjectName[] getTemporaryTopicSubscribers() {
559        Set<ObjectName> set = temporaryTopicSubscribers.keySet();
560        return set.toArray(new ObjectName[set.size()]);
561    }
562
563    protected ObjectName[] getTemporaryQueueSubscribers() {
564        Set<ObjectName> set = temporaryQueueSubscribers.keySet();
565        return set.toArray(new ObjectName[set.size()]);
566    }
567
568    protected ObjectName[] getInactiveDurableTopicSubscribers() {
569        Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
570        return set.toArray(new ObjectName[set.size()]);
571    }
572
573    public Broker getContextBroker() {
574        return contextBroker;
575    }
576
577    public void setContextBroker(Broker contextBroker) {
578        this.contextBroker = contextBroker;
579    }
580
581    protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
582        // Build the object name for the destination
583        Hashtable map = brokerObjectName.getKeyPropertyList();
584        ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type="
585                                               + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination="
586                                               + JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
587        return objectName;
588    }
589
590    public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
591        ObjectName objectName = null;
592        try {
593            objectName = createObjectName(strategy);
594            if (!registeredMBeans.contains(objectName))  {
595                AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
596                AnnotatedMBean.registerMBean(managementContext, view, objectName);
597                registeredMBeans.add(objectName);
598            }
599        } catch (Exception e) {
600            LOG.warn("Failed to register MBean: " + strategy);
601            LOG.debug("Failure reason: " + e, e);
602        }
603        return objectName;
604    }
605
606    private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
607        Hashtable map = brokerObjectName.getKeyPropertyList();
608        ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
609                            + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
610        return objectName;            
611    }
612
613    public ObjectName getSubscriberObjectName(Subscription key) {
614        return subscriptionMap.get(key);
615    }
616
617    public Subscription getSubscriber(ObjectName key) {
618        Subscription sub = null;
619        for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
620            if (entry.getValue().equals(key)) {
621                sub = entry.getKey();
622                break;
623            }
624        }
625        return sub;
626    }
627}