001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.File;
020import java.io.IOException;
021import java.net.URI;
022import java.net.URISyntaxException;
023import java.net.UnknownHostException;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.CopyOnWriteArrayList;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.LinkedBlockingQueue;
033import java.util.concurrent.SynchronousQueue;
034import java.util.concurrent.ThreadFactory;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicBoolean;
038
039import javax.annotation.PostConstruct;
040import javax.annotation.PreDestroy;
041import javax.management.MalformedObjectNameException;
042import javax.management.ObjectName;
043
044import org.apache.activemq.ActiveMQConnectionMetaData;
045import org.apache.activemq.ConfigurationException;
046import org.apache.activemq.Service;
047import org.apache.activemq.advisory.AdvisoryBroker;
048import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
049import org.apache.activemq.broker.ft.MasterConnector;
050import org.apache.activemq.broker.jmx.AnnotatedMBean;
051import org.apache.activemq.broker.jmx.BrokerView;
052import org.apache.activemq.broker.jmx.ConnectorView;
053import org.apache.activemq.broker.jmx.ConnectorViewMBean;
054import org.apache.activemq.broker.jmx.FTConnectorView;
055import org.apache.activemq.broker.jmx.JmsConnectorView;
056import org.apache.activemq.broker.jmx.JobSchedulerView;
057import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
058import org.apache.activemq.broker.jmx.ManagedRegionBroker;
059import org.apache.activemq.broker.jmx.ManagementContext;
060import org.apache.activemq.broker.jmx.NetworkConnectorView;
061import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
062import org.apache.activemq.broker.jmx.ProxyConnectorView;
063import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
064import org.apache.activemq.broker.region.Destination;
065import org.apache.activemq.broker.region.DestinationFactory;
066import org.apache.activemq.broker.region.DestinationFactoryImpl;
067import org.apache.activemq.broker.region.DestinationInterceptor;
068import org.apache.activemq.broker.region.RegionBroker;
069import org.apache.activemq.broker.region.policy.PolicyMap;
070import org.apache.activemq.broker.region.virtual.MirroredQueue;
071import org.apache.activemq.broker.region.virtual.VirtualDestination;
072import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
073import org.apache.activemq.broker.region.virtual.VirtualTopic;
074import org.apache.activemq.broker.scheduler.SchedulerBroker;
075import org.apache.activemq.command.ActiveMQDestination;
076import org.apache.activemq.command.BrokerId;
077import org.apache.activemq.network.ConnectionFilter;
078import org.apache.activemq.network.DiscoveryNetworkConnector;
079import org.apache.activemq.network.NetworkConnector;
080import org.apache.activemq.network.jms.JmsConnector;
081import org.apache.activemq.proxy.ProxyConnector;
082import org.apache.activemq.security.MessageAuthorizationPolicy;
083import org.apache.activemq.selector.SelectorParser;
084import org.apache.activemq.store.PersistenceAdapter;
085import org.apache.activemq.store.PersistenceAdapterFactory;
086import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
087import org.apache.activemq.store.kahadb.plist.PListStore;
088import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
089import org.apache.activemq.thread.Scheduler;
090import org.apache.activemq.thread.TaskRunnerFactory;
091import org.apache.activemq.transport.TransportFactory;
092import org.apache.activemq.transport.TransportServer;
093import org.apache.activemq.transport.vm.VMTransportFactory;
094import org.apache.activemq.usage.SystemUsage;
095import org.apache.activemq.util.*;
096import org.slf4j.Logger;
097import org.slf4j.LoggerFactory;
098import org.slf4j.MDC;
099
100/**
101 * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
102 * number of transport connectors, network connectors and a bunch of properties
 * which can be used to configure the broker as it is lazily created.
104 * 
105 * 
106 * @org.apache.xbean.XBean
107 */
108public class BrokerService implements Service {
109    protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
110    public static final String DEFAULT_PORT = "61616";
111    public static final String LOCAL_HOST_NAME;
112    public static final String DEFAULT_BROKER_NAME = "localhost";
113    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
114    private static final long serialVersionUID = 7353129142305630237L;
115    private boolean useJmx = true;
116    private boolean enableStatistics = true;
117    private boolean persistent = true;
118    private boolean populateJMSXUserID;
119    private boolean useAuthenticatedPrincipalForJMSXUserID;
120
121    private boolean useShutdownHook = true;
122    private boolean useLoggingForShutdownErrors;
123    private boolean shutdownOnMasterFailure;
124    private boolean shutdownOnSlaveFailure;
125    private boolean waitForSlave;
126    private long waitForSlaveTimeout = 600000L;
127    private boolean passiveSlave;
128    private String brokerName = DEFAULT_BROKER_NAME;
129    private File dataDirectoryFile;
130    private File tmpDataDirectory;
131    private Broker broker;
132    private BrokerView adminView;
133    private ManagementContext managementContext;
134    private ObjectName brokerObjectName;
135    private TaskRunnerFactory taskRunnerFactory;
136    private TaskRunnerFactory persistenceTaskRunnerFactory;
137    private SystemUsage systemUsage;
138    private SystemUsage producerSystemUsage;
139    private SystemUsage consumerSystemUsaage;
140    private PersistenceAdapter persistenceAdapter;
141    private PersistenceAdapterFactory persistenceFactory;
142    protected DestinationFactory destinationFactory;
143    private MessageAuthorizationPolicy messageAuthorizationPolicy;
144    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
145    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
146    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
147    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
148    private final List<Service> services = new ArrayList<Service>();
149    private MasterConnector masterConnector;
150    private String masterConnectorURI;
151    private transient Thread shutdownHook;
152    private String[] transportConnectorURIs;
153    private String[] networkConnectorURIs;
154    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
155    // to other jms messaging
156    // systems
157    private boolean deleteAllMessagesOnStartup;
158    private boolean advisorySupport = true;
159    private URI vmConnectorURI;
160    private String defaultSocketURIString;
161    private PolicyMap destinationPolicy;
162    private final AtomicBoolean started = new AtomicBoolean(false);
163    private final AtomicBoolean stopped = new AtomicBoolean(false);
164    private BrokerPlugin[] plugins;
165    private boolean keepDurableSubsActive = true;
166    private boolean useVirtualTopics = true;
167    private boolean useMirroredQueues = false;
168    private boolean useTempMirroredQueues = true;
169    private BrokerId brokerId;
170    private DestinationInterceptor[] destinationInterceptors;
171    private ActiveMQDestination[] destinations;
172    private PListStore tempDataStore;
173    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
174    private boolean useLocalHostBrokerName;
175    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
176    private final CountDownLatch startedLatch = new CountDownLatch(1);
177    private boolean supportFailOver;
178    private Broker regionBroker;
179    private int producerSystemUsagePortion = 60;
180    private int consumerSystemUsagePortion = 40;
181    private boolean splitSystemUsageForProducersConsumers;
182    private boolean monitorConnectionSplits = false;
183    private int taskRunnerPriority = Thread.NORM_PRIORITY;
184    private boolean dedicatedTaskRunner;
185    private boolean cacheTempDestinations = false;// useful for failover
186    private int timeBeforePurgeTempDestinations = 5000;
187    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
188    private boolean systemExitOnShutdown;
189    private int systemExitOnShutdownExitCode;
190    private SslContext sslContext;
191    private boolean forceStart = false;
192    private IOExceptionHandler ioExceptionHandler;
193    private boolean schedulerSupport = false;
194    private File schedulerDirectoryFile;
195    private Scheduler scheduler;
196    private ThreadPoolExecutor executor;
197    private boolean slave = true;
198    private int schedulePeriodForDestinationPurge=5000;
199    private BrokerContext brokerContext;
200    private boolean networkConnectorStartAsync = false;
201
202        static {
203        String localHostName = "localhost";
204        try {
205            localHostName =  InetAddressUtil.getLocalHostName();
206        } catch (UnknownHostException e) {
207            LOG.error("Failed to resolve localhost");
208        }
209        LOCAL_HOST_NAME = localHostName;
210    }
211
212    @Override
213    public String toString() {
214        return "BrokerService[" + getBrokerName() + "]";
215    }
216
217    /**
218     * Adds a new transport connector for the given bind address
219     * 
220     * @return the newly created and added transport connector
221     * @throws Exception
222     */
223    public TransportConnector addConnector(String bindAddress) throws Exception {
224        return addConnector(new URI(bindAddress));
225    }
226
227    /**
228     * Adds a new transport connector for the given bind address
229     * 
230     * @return the newly created and added transport connector
231     * @throws Exception
232     */
233    public TransportConnector addConnector(URI bindAddress) throws Exception {
234        return addConnector(createTransportConnector(bindAddress));
235    }
236
237    /**
238     * Adds a new transport connector for the given TransportServer transport
239     * 
240     * @return the newly created and added transport connector
241     * @throws Exception
242     */
243    public TransportConnector addConnector(TransportServer transport) throws Exception {
244        return addConnector(new TransportConnector(transport));
245    }
246
247    /**
248     * Adds a new transport connector
249     * 
250     * @return the transport connector
251     * @throws Exception
252     */
253    public TransportConnector addConnector(TransportConnector connector) throws Exception {
254        transportConnectors.add(connector);
255        return connector;
256    }
257
258    /**
259     * Stops and removes a transport connector from the broker.
260     * 
261     * @param connector
262     * @return true if the connector has been previously added to the broker
263     * @throws Exception
264     */
265    public boolean removeConnector(TransportConnector connector) throws Exception {
266        boolean rc = transportConnectors.remove(connector);
267        if (rc) {
268            unregisterConnectorMBean(connector);
269        }
270        return rc;
271    }
272
273    /**
274     * Adds a new network connector using the given discovery address
275     * 
276     * @return the newly created and added network connector
277     * @throws Exception
278     */
279    public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
280        return addNetworkConnector(new URI(discoveryAddress));
281    }
282
283    /**
284     * Adds a new proxy connector using the given bind address
285     * 
286     * @return the newly created and added network connector
287     * @throws Exception
288     */
289    public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
290        return addProxyConnector(new URI(bindAddress));
291    }
292
293    /**
294     * Adds a new network connector using the given discovery address
295     * 
296     * @return the newly created and added network connector
297     * @throws Exception
298     */
299    public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
300        if (!isAdvisorySupport()) {
301            throw new javax.jms.IllegalStateException(
302                    "Networks require advisory messages to function - advisories are currently disabled");
303        }
304        NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
305        return addNetworkConnector(connector);
306    }
307
308    /**
309     * Adds a new proxy connector using the given bind address
310     * 
311     * @return the newly created and added network connector
312     * @throws Exception
313     */
314    public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
315        ProxyConnector connector = new ProxyConnector();
316        connector.setBind(bindAddress);
317        connector.setRemote(new URI("fanout:multicast://default"));
318        return addProxyConnector(connector);
319    }
320
321    /**
322     * Adds a new network connector to connect this broker to a federated
323     * network
324     */
325    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
326        connector.setBrokerService(this);
327        URI uri = getVmConnectorURI();
328        Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
329        map.put("network", "true");
330        uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
331        connector.setLocalUri(uri);
332        // Set a connection filter so that the connector does not establish loop
333        // back connections.
334        connector.setConnectionFilter(new ConnectionFilter() {
335            public boolean connectTo(URI location) {
336                List<TransportConnector> transportConnectors = getTransportConnectors();
337                for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
338                    try {
339                        TransportConnector tc = iter.next();
340                        if (location.equals(tc.getConnectUri())) {
341                            return false;
342                        }
343                    } catch (Throwable e) {
344                    }
345                }
346                return true;
347            }
348        });
349        networkConnectors.add(connector);
350        if (isUseJmx()) {
351            registerNetworkConnectorMBean(connector);
352        }
353        return connector;
354    }
355
356    /**
357     * Removes the given network connector without stopping it. The caller
358     * should call {@link NetworkConnector#stop()} to close the connector
359     */
360    public boolean removeNetworkConnector(NetworkConnector connector) {
361        boolean answer = networkConnectors.remove(connector);
362        if (answer) {
363            unregisterNetworkConnectorMBean(connector);
364        }
365        return answer;
366    }
367
368    public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
369        URI uri = getVmConnectorURI();
370        connector.setLocalUri(uri);
371        proxyConnectors.add(connector);
372        if (isUseJmx()) {
373            registerProxyConnectorMBean(connector);
374        }
375        return connector;
376    }
377
378    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
379        connector.setBrokerService(this);
380        jmsConnectors.add(connector);
381        if (isUseJmx()) {
382            registerJmsConnectorMBean(connector);
383        }
384        return connector;
385    }
386
387    public JmsConnector removeJmsConnector(JmsConnector connector) {
388        if (jmsConnectors.remove(connector)) {
389            return connector;
390        }
391        return null;
392    }
393
394    /**
395     * @return Returns the masterConnectorURI.
396     */
397    public String getMasterConnectorURI() {
398        return masterConnectorURI;
399    }
400
401    /**
402     * @param masterConnectorURI
403     *            The masterConnectorURI to set.
404     */
405    public void setMasterConnectorURI(String masterConnectorURI) {
406        this.masterConnectorURI = masterConnectorURI;
407    }
408
409    /**
410     * @return true if this Broker is a slave to a Master
411     */
412    public boolean isSlave() {
413        return (masterConnector != null && masterConnector.isSlave()) ||
414            (masterConnector != null && masterConnector.isStoppedBeforeStart()) ||
415            (masterConnector == null && slave);
416    }
417
418    public void masterFailed() {
419        if (shutdownOnMasterFailure) {
420            LOG.error("The Master has failed ... shutting down");
421            try {
422                stop();
423            } catch (Exception e) {
424                LOG.error("Failed to stop for master failure", e);
425            }
426        } else {
427            LOG.warn("Master Failed - starting all connectors");
428            try {
429                startAllConnectors();
430                broker.nowMasterBroker();
431            } catch (Exception e) {
432                LOG.error("Failed to startAllConnectors", e);
433            }
434        }
435    }
436
437    public boolean isStarted() {
438        return started.get();
439    }
440
441    public void start(boolean force) throws Exception {
442        forceStart = force;
443        stopped.set(false);
444        started.set(false);
445        start();
446    }
447
448    // Service interface
449    // -------------------------------------------------------------------------
450
451    protected boolean shouldAutostart() {
452        return true;
453    }
454
455    /**
456     *
457     * @throws Exception
458     * @org. apache.xbean.InitMethod
459     */
460    @PostConstruct
461    public void autoStart() throws Exception {
462        if(shouldAutostart()) {
463            start();
464        }
465    }
466
467    public void start() throws Exception {
468        if (stopped.get() || !started.compareAndSet(false, true)) {
469            // lets just ignore redundant start() calls
470            // as its way too easy to not be completely sure if start() has been
471            // called or not with the gazillion of different configuration
472            // mechanisms
473            // throw new IllegalStateException("Allready started.");
474            return;
475        }
476
477        MDC.put("activemq.broker", brokerName);
478
479        try {
480                if (systemExitOnShutdown && useShutdownHook) {
481                        throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
482                }
483            processHelperProperties();
484            if (isUseJmx()) {
485                startManagementContext();
486            }
487            getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
488            getPersistenceAdapter().setBrokerName(getBrokerName());
489            LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
490            if (deleteAllMessagesOnStartup) {
491                deleteAllMessages();
492            }
493            getPersistenceAdapter().start();
494            slave = false;
495            startDestinations();
496            addShutdownHook();
497            getBroker().start();
498            if (isUseJmx()) {
499                if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
500                        // try to restart management context
501                        // typical for slaves that use the same ports as master
502                        managementContext.stop();
503                        startManagementContext();
504                }
505                ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
506                managedBroker.setContextBroker(broker);
507                adminView.setBroker(managedBroker);
508            }
509            BrokerRegistry.getInstance().bind(getBrokerName(), this);
510            // see if there is a MasterBroker service and if so, configure
511            // it and start it.
512            for (Service service : services) {
513                if (service instanceof MasterConnector) {
514                    configureService(service);
515                    service.start();
516                }
517            }
518            if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) {
519                startAllConnectors();
520            }
521            if (!stopped.get()) {
522                if (isUseJmx() && masterConnector != null) {
523                    registerFTConnectorMBean(masterConnector);
524                }
525            }
526            if (brokerId == null) {
527                brokerId = broker.getBrokerId();
528            }
529            if (ioExceptionHandler == null) {
530                setIoExceptionHandler(new DefaultIOExceptionHandler());
531            }
532            LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
533            getBroker().brokerServiceStarted();
534            startedLatch.countDown();
535        } catch (Exception e) {
536            LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
537            try {
538                if (!stopped.get()) {
539                    stop();
540                }
541            } catch (Exception ex) {
542                LOG.warn("Failed to stop broker after failure in start ", ex);
543            }
544            throw e;
545        } finally {
546            MDC.remove("activemq.broker");
547        }
548    }
549
550    /**
551     *
552     * @throws Exception
553     * @org.apache .xbean.DestroyMethod
554     */
555    @PreDestroy
556    public void stop() throws Exception {
557        if (!started.get()) {
558            return;
559        }
560
561        MDC.put("activemq.broker", brokerName);
562
563        if (systemExitOnShutdown) {
564                new Thread() {
565                        @Override
566                public void run() {
567                                System.exit(systemExitOnShutdownExitCode);
568                        }
569                }.start();
570        }
571
572        LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down");
573        removeShutdownHook();
574        ServiceStopper stopper = new ServiceStopper();
575        if (services != null) {
576            for (Service service : services) {
577                stopper.stop(service);
578            }
579        }
580        stopAllConnectors(stopper);
581        // remove any VMTransports connected
582        // this has to be done after services are stopped,
583        // to avoid timimg issue with discovery (spinning up a new instance)
584        BrokerRegistry.getInstance().unbind(getBrokerName());
585        VMTransportFactory.stopped(getBrokerName());
586        if (broker != null) {
587            stopper.stop(broker);
588            broker = null;
589        }
590
591        if (tempDataStore != null) {
592            tempDataStore.stop();
593            tempDataStore = null;
594        }
595        stopper.stop(persistenceAdapter);
596        persistenceAdapter = null;
597        slave = true;
598        if (isUseJmx()) {
599            stopper.stop(getManagementContext());
600            managementContext = null;
601        }
602        // Clear SelectorParser cache to free memory
603        SelectorParser.clearCache();
604        stopped.set(true);
605        stoppedLatch.countDown();
606        if (masterConnectorURI == null) {
607            // master start has not finished yet
608            if (slaveStartSignal.getCount() == 1) {
609                started.set(false);
610                slaveStartSignal.countDown();
611            }
612        } else {
613            for (Service service : services) {
614                if (service instanceof MasterConnector) {
615                    MasterConnector mConnector = (MasterConnector) service;
616                    if (!mConnector.isSlave()) {
617                        // means should be slave but not connected to master yet
618                        started.set(false);
619                        mConnector.stopBeforeConnected();
620                    }
621                }
622            }
623        }
624        if (this.taskRunnerFactory != null) {
625            this.taskRunnerFactory.shutdown();
626            this.taskRunnerFactory = null;
627        }
628        if (this.scheduler != null) {
629            this.scheduler.stop();
630            this.scheduler = null;
631        }
632        if (this.executor != null) {
633            this.executor.shutdownNow();
634            this.executor = null;
635        }
636
637        this.destinationInterceptors = null;
638        this.destinationFactory = null;
639
640        LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
641        synchronized (shutdownHooks) {
642            for (Runnable hook : shutdownHooks) {
643                try {
644                    hook.run();
645                } catch (Throwable e) {
646                    stopper.onException(hook, e);
647                }
648            }
649        }
650
651        MDC.remove("activemq.broker");
652
653        stopper.throwFirstException();
654    }
655    
656    public boolean checkQueueSize(String queueName) {
657        long count = 0;
658        long queueSize = 0;
659        Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
660        for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
661            if (entry.getKey().isQueue()) {
662                if (entry.getValue().getName().matches(queueName)) {
663                    queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
664                    count += queueSize;
665                    if (queueSize > 0) {
666                        LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
667                                + queueSize);
668                    }
669                }
670            }
671        }
672        return count == 0;
673    }
674
675    /**
676     * This method (both connectorName and queueName are using regex to match)
677     * 1. stop the connector (supposed the user input the connector which the
678     * clients connect to) 2. to check whether there is any pending message on
679     * the queues defined by queueName 3. supposedly, after stop the connector,
680     * client should failover to other broker and pending messages should be
681     * forwarded. if no pending messages, the method finally call stop to stop
682     * the broker.
683     * 
684     * @param connectorName
685     * @param queueName
686     * @param timeout
687     * @param pollInterval
688     * @throws Exception
689     */
690    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
691            throws Exception {
692        if (isUseJmx()) {
693            if (connectorName == null || queueName == null || timeout <= 0) {
694                throw new Exception(
695                        "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
696            }
697            if (pollInterval <= 0) {
698                pollInterval = 30;
699            }
700            LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
701                    + timeout + " pollInterval:" + pollInterval);
702            TransportConnector connector;
703            for (int i = 0; i < transportConnectors.size(); i++) {
704                connector = transportConnectors.get(i);
705                if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
706                    connector.stop();
707                }
708            }
709            long start = System.currentTimeMillis();
710            while (System.currentTimeMillis() - start < timeout * 1000) {
711                // check quesize until it gets zero
712                if (checkQueueSize(queueName)) {
713                    stop();
714                    break;
715                } else {
716                    Thread.sleep(pollInterval * 1000);
717                }
718            }
719            if (stopped.get()) {
720                LOG.info("Successfully stop the broker.");
721            } else {
722                LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
723            }
724        }
725    }
726
727    /**
728     * A helper method to block the caller thread until the broker has been
729     * stopped
730     */
731    public void waitUntilStopped() {
732        while (isStarted() && !stopped.get()) {
733            try {
734                stoppedLatch.await();
735            } catch (InterruptedException e) {
736                // ignore
737            }
738        }
739    }
740
741    /**
742     * A helper method to block the caller thread until the broker has fully started
743     * @return boolean true if wait succeeded false if broker was not started or was stopped
744     */
745    public boolean waitUntilStarted() {
746        boolean waitSucceeded = false;
747        while (isStarted() && !stopped.get() && !waitSucceeded) {
748            try {
749                waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
750            } catch (InterruptedException ignore) {
751            }
752        }
753        return waitSucceeded;
754    }
755
756    // Properties
757    // -------------------------------------------------------------------------
758    /**
759     * Returns the message broker
760     */
761    public Broker getBroker() throws Exception {
762        if (broker == null) {
763            LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker ("
764                    + getBrokerName() + ") is starting");
765            LOG.info("For help or more information please see: http://activemq.apache.org/");
766            broker = createBroker();
767        }
768        return broker;
769    }
770
771    /**
772     * Returns the administration view of the broker; used to create and destroy
773     * resources such as queues and topics. Note this method returns null if JMX
774     * is disabled.
775     */
776    public BrokerView getAdminView() throws Exception {
777        if (adminView == null) {
778            // force lazy creation
779            getBroker();
780        }
781        return adminView;
782    }
783
784    public void setAdminView(BrokerView adminView) {
785        this.adminView = adminView;
786    }
787
788    public String getBrokerName() {
789        return brokerName;
790    }
791
792    /**
793     * Sets the name of this broker; which must be unique in the network
794     * 
795     * @param brokerName
796     */
797    public void setBrokerName(String brokerName) {
798        if (brokerName == null) {
799            throw new NullPointerException("The broker name cannot be null");
800        }
801        String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
802        if (!str.equals(brokerName)) {
803            LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
804        }
805        this.brokerName = str.trim();
806    }
807
808    public PersistenceAdapterFactory getPersistenceFactory() {
809        return persistenceFactory;
810    }
811
812    public File getDataDirectoryFile() {
813        if (dataDirectoryFile == null) {
814            dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
815        }
816        return dataDirectoryFile;
817    }
818
819    public File getBrokerDataDirectory() {
820        String brokerDir = getBrokerName();
821        return new File(getDataDirectoryFile(), brokerDir);
822    }
823
824    /**
825     * Sets the directory in which the data files will be stored by default for
826     * the JDBC and Journal persistence adaptors.
827     * 
828     * @param dataDirectory
829     *            the directory to store data files
830     */
831    public void setDataDirectory(String dataDirectory) {
832        setDataDirectoryFile(new File(dataDirectory));
833    }
834
835    /**
836     * Sets the directory in which the data files will be stored by default for
837     * the JDBC and Journal persistence adaptors.
838     * 
839     * @param dataDirectoryFile
840     *            the directory to store data files
841     */
842    public void setDataDirectoryFile(File dataDirectoryFile) {
843        this.dataDirectoryFile = dataDirectoryFile;
844    }
845
846    /**
847     * @return the tmpDataDirectory
848     */
849    public File getTmpDataDirectory() {
850        if (tmpDataDirectory == null) {
851            tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
852        }
853        return tmpDataDirectory;
854    }
855
856    /**
857     * @param tmpDataDirectory
858     *            the tmpDataDirectory to set
859     */
860    public void setTmpDataDirectory(File tmpDataDirectory) {
861        this.tmpDataDirectory = tmpDataDirectory;
862    }
863
864    public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
865        this.persistenceFactory = persistenceFactory;
866    }
867
868    public void setDestinationFactory(DestinationFactory destinationFactory) {
869        this.destinationFactory = destinationFactory;
870    }
871
872    public boolean isPersistent() {
873        return persistent;
874    }
875
876    /**
877     * Sets whether or not persistence is enabled or disabled.
878     */
879    public void setPersistent(boolean persistent) {
880        this.persistent = persistent;
881    }
882
883    public boolean isPopulateJMSXUserID() {
884        return populateJMSXUserID;
885    }
886
887    /**
888     * Sets whether or not the broker should populate the JMSXUserID header.
889     */
890    public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
891        this.populateJMSXUserID = populateJMSXUserID;
892    }
893
894    public SystemUsage getSystemUsage() {
895        try {
896            if (systemUsage == null) {
897                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
898                systemUsage.setExecutor(getExecutor());
899                systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default
900                                                                         // 64
901                                                                         // Meg
902                systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10
903                                                                                // Gb
904                systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100
905                                                                                 // GB
906                addService(this.systemUsage);
907            }
908            return systemUsage;
909        } catch (IOException e) {
910            LOG.error("Cannot create SystemUsage", e);
911            throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
912        }
913    }
914
915    public void setSystemUsage(SystemUsage memoryManager) {
916        if (this.systemUsage != null) {
917            removeService(this.systemUsage);
918        }
919        this.systemUsage = memoryManager;
920        if (this.systemUsage.getExecutor()==null) {
921            this.systemUsage.setExecutor(getExecutor());
922        }
923        addService(this.systemUsage);
924    }
925
926    /**
927     * @return the consumerUsageManager
928     * @throws IOException
929     */
930    public SystemUsage getConsumerSystemUsage() throws IOException {
931        if (this.consumerSystemUsaage == null) {
932            if (splitSystemUsageForProducersConsumers) {
933                this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
934                float portion = consumerSystemUsagePortion / 100f;
935                this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
936                addService(this.consumerSystemUsaage);
937            } else {
938                consumerSystemUsaage = getSystemUsage();
939            }
940        }
941        return this.consumerSystemUsaage;
942    }
943
944    /**
945     * @param consumerSystemUsaage
946     *            the storeSystemUsage to set
947     */
948    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
949        if (this.consumerSystemUsaage != null) {
950            removeService(this.consumerSystemUsaage);
951        }
952        this.consumerSystemUsaage = consumerSystemUsaage;
953        addService(this.consumerSystemUsaage);
954    }
955
956    /**
957     * @return the producerUsageManager
958     * @throws IOException
959     */
960    public SystemUsage getProducerSystemUsage() throws IOException {
961        if (producerSystemUsage == null) {
962            if (splitSystemUsageForProducersConsumers) {
963                producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
964                float portion = producerSystemUsagePortion / 100f;
965                producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
966                addService(producerSystemUsage);
967            } else {
968                producerSystemUsage = getSystemUsage();
969            }
970        }
971        return producerSystemUsage;
972    }
973
974    /**
975     * @param producerUsageManager
976     *            the producerUsageManager to set
977     */
978    public void setProducerSystemUsage(SystemUsage producerUsageManager) {
979        if (this.producerSystemUsage != null) {
980            removeService(this.producerSystemUsage);
981        }
982        this.producerSystemUsage = producerUsageManager;
983        addService(this.producerSystemUsage);
984    }
985
986    public PersistenceAdapter getPersistenceAdapter() throws IOException {
987        if (persistenceAdapter == null) {
988            persistenceAdapter = createPersistenceAdapter();
989            configureService(persistenceAdapter);
990            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
991        }
992        return persistenceAdapter;
993    }
994
995    /**
996     * Sets the persistence adaptor implementation to use for this broker
997     * 
998     * @throws IOException
999     */
1000    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1001        this.persistenceAdapter = persistenceAdapter;
1002        configureService(this.persistenceAdapter);
1003        this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1004    }
1005
1006    public TaskRunnerFactory getTaskRunnerFactory() {
1007        if (this.taskRunnerFactory == null) {
1008            this.taskRunnerFactory = new TaskRunnerFactory("BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1009                    isDedicatedTaskRunner());
1010        }
1011        return this.taskRunnerFactory;
1012    }
1013
1014    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1015        this.taskRunnerFactory = taskRunnerFactory;
1016    }
1017
1018    public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1019        if (taskRunnerFactory == null) {
1020            persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1021                    true, 1000, isDedicatedTaskRunner());
1022        }
1023        return persistenceTaskRunnerFactory;
1024    }
1025
1026    public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1027        this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1028    }
1029
1030    public boolean isUseJmx() {
1031        return useJmx;
1032    }
1033
1034    public boolean isEnableStatistics() {
1035        return enableStatistics;
1036    }
1037
1038    /**
1039     * Sets whether or not the Broker's services enable statistics or not.
1040     */
1041    public void setEnableStatistics(boolean enableStatistics) {
1042        this.enableStatistics = enableStatistics;
1043    }
1044
1045    /**
1046     * Sets whether or not the Broker's services should be exposed into JMX or
1047     * not.
1048     */
1049    public void setUseJmx(boolean useJmx) {
1050        this.useJmx = useJmx;
1051    }
1052
1053    public ObjectName getBrokerObjectName() throws IOException {
1054        if (brokerObjectName == null) {
1055            brokerObjectName = createBrokerObjectName();
1056        }
1057        return brokerObjectName;
1058    }
1059
1060    /**
1061     * Sets the JMX ObjectName for this broker
1062     */
1063    public void setBrokerObjectName(ObjectName brokerObjectName) {
1064        this.brokerObjectName = brokerObjectName;
1065    }
1066
1067    public ManagementContext getManagementContext() {
1068        if (managementContext == null) {
1069            managementContext = new ManagementContext();
1070        }
1071        return managementContext;
1072    }
1073
1074    public void setManagementContext(ManagementContext managementContext) {
1075        this.managementContext = managementContext;
1076    }
1077
1078    public NetworkConnector getNetworkConnectorByName(String connectorName) {
1079        for (NetworkConnector connector : networkConnectors) {
1080            if (connector.getName().equals(connectorName)) {
1081                return connector;
1082            }
1083        }
1084        return null;
1085    }
1086
1087    public String[] getNetworkConnectorURIs() {
1088        return networkConnectorURIs;
1089    }
1090
1091    public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1092        this.networkConnectorURIs = networkConnectorURIs;
1093    }
1094
1095    public TransportConnector getConnectorByName(String connectorName) {
1096        for (TransportConnector connector : transportConnectors) {
1097            if (connector.getName().equals(connectorName)) {
1098                return connector;
1099            }
1100        }
1101        return null;
1102    }
1103    
1104    public Map<String, String> getTransportConnectorURIsAsMap() {
1105        Map<String, String> answer = new HashMap<String, String>();
1106        for (TransportConnector connector : transportConnectors) {
1107            try {
1108                URI uri = connector.getConnectUri();
1109                String scheme = uri.getScheme();
1110                if (scheme != null) {
1111                    answer.put(scheme.toLowerCase(), uri.toString());
1112                }
1113            } catch (Exception e) {
1114                LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1115            }
1116        }
1117        return answer;
1118    }
1119
1120    public String[] getTransportConnectorURIs() {
1121        return transportConnectorURIs;
1122    }
1123
1124    public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1125        this.transportConnectorURIs = transportConnectorURIs;
1126    }
1127
1128    /**
1129     * @return Returns the jmsBridgeConnectors.
1130     */
1131    public JmsConnector[] getJmsBridgeConnectors() {
1132        return jmsBridgeConnectors;
1133    }
1134
1135    /**
1136     * @param jmsConnectors
1137     *            The jmsBridgeConnectors to set.
1138     */
1139    public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1140        this.jmsBridgeConnectors = jmsConnectors;
1141    }
1142
1143    public Service[] getServices() {
1144        return services.toArray(new Service[0]);
1145    }
1146
1147    /**
1148     * Sets the services associated with this broker such as a
1149     * {@link MasterConnector}
1150     */
1151    public void setServices(Service[] services) {
1152        this.services.clear();
1153        if (services != null) {
1154            for (int i = 0; i < services.length; i++) {
1155                this.services.add(services[i]);
1156            }
1157        }
1158    }
1159
1160    /**
1161     * Adds a new service so that it will be started as part of the broker
1162     * lifecycle
1163     */
1164    public void addService(Service service) {
1165        services.add(service);
1166    }
1167
1168    public void removeService(Service service) {
1169        services.remove(service);
1170    }
1171
1172    public boolean isUseLoggingForShutdownErrors() {
1173        return useLoggingForShutdownErrors;
1174    }
1175
1176    /**
1177     * Sets whether or not we should use commons-logging when reporting errors
1178     * when shutting down the broker
1179     */
1180    public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1181        this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1182    }
1183
1184    public boolean isUseShutdownHook() {
1185        return useShutdownHook;
1186    }
1187
1188    /**
1189     * Sets whether or not we should use a shutdown handler to close down the
1190     * broker cleanly if the JVM is terminated. It is recommended you leave this
1191     * enabled.
1192     */
1193    public void setUseShutdownHook(boolean useShutdownHook) {
1194        this.useShutdownHook = useShutdownHook;
1195    }
1196
1197    public boolean isAdvisorySupport() {
1198        return advisorySupport;
1199    }
1200
1201    /**
1202     * Allows the support of advisory messages to be disabled for performance
1203     * reasons.
1204     */
1205    public void setAdvisorySupport(boolean advisorySupport) {
1206        this.advisorySupport = advisorySupport;
1207    }
1208
1209    public List<TransportConnector> getTransportConnectors() {
1210        return new ArrayList<TransportConnector>(transportConnectors);
1211    }
1212
1213    /**
1214     * Sets the transport connectors which this broker will listen on for new
1215     * clients
1216     * 
1217     * @org.apache.xbean.Property 
1218     *                            nestedType="org.apache.activemq.broker.TransportConnector"
1219     */
1220    public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1221        for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
1222            TransportConnector connector = iter.next();
1223            addConnector(connector);
1224        }
1225    }
1226
1227    public List<NetworkConnector> getNetworkConnectors() {
1228        return new ArrayList<NetworkConnector>(networkConnectors);
1229    }
1230
1231    public List<ProxyConnector> getProxyConnectors() {
1232        return new ArrayList<ProxyConnector>(proxyConnectors);
1233    }
1234
1235    /**
1236     * Sets the network connectors which this broker will use to connect to
1237     * other brokers in a federated network
1238     * 
1239     * @org.apache.xbean.Property 
1240     *                            nestedType="org.apache.activemq.network.NetworkConnector"
1241     */
1242    public void setNetworkConnectors(List networkConnectors) throws Exception {
1243        for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
1244            NetworkConnector connector = (NetworkConnector) iter.next();
1245            addNetworkConnector(connector);
1246        }
1247    }
1248
1249    /**
1250     * Sets the network connectors which this broker will use to connect to
1251     * other brokers in a federated network
1252     */
1253    public void setProxyConnectors(List proxyConnectors) throws Exception {
1254        for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
1255            ProxyConnector connector = (ProxyConnector) iter.next();
1256            addProxyConnector(connector);
1257        }
1258    }
1259
1260    public PolicyMap getDestinationPolicy() {
1261        return destinationPolicy;
1262    }
1263
1264    /**
1265     * Sets the destination specific policies available either for exact
1266     * destinations or for wildcard areas of destinations.
1267     */
1268    public void setDestinationPolicy(PolicyMap policyMap) {
1269        this.destinationPolicy = policyMap;
1270    }
1271
1272    public BrokerPlugin[] getPlugins() {
1273        return plugins;
1274    }
1275
1276    /**
1277     * Sets a number of broker plugins to install such as for security
1278     * authentication or authorization
1279     */
1280    public void setPlugins(BrokerPlugin[] plugins) {
1281        this.plugins = plugins;
1282    }
1283
1284    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1285        return messageAuthorizationPolicy;
1286    }
1287
1288    /**
1289     * Sets the policy used to decide if the current connection is authorized to
1290     * consume a given message
1291     */
1292    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1293        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1294    }
1295
1296    /**
1297     * Delete all messages from the persistent store
1298     * 
1299     * @throws IOException
1300     */
1301    public void deleteAllMessages() throws IOException {
1302        getPersistenceAdapter().deleteAllMessages();
1303    }
1304
1305    public boolean isDeleteAllMessagesOnStartup() {
1306        return deleteAllMessagesOnStartup;
1307    }
1308
1309    /**
1310     * Sets whether or not all messages are deleted on startup - mostly only
1311     * useful for testing.
1312     */
1313    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1314        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1315    }
1316
1317    public URI getVmConnectorURI() {
1318        if (vmConnectorURI == null) {
1319            try {
1320                vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1321            } catch (URISyntaxException e) {
1322                LOG.error("Badly formed URI from " + getBrokerName(), e);
1323            }
1324        }
1325        return vmConnectorURI;
1326    }
1327
1328    public void setVmConnectorURI(URI vmConnectorURI) {
1329        this.vmConnectorURI = vmConnectorURI;
1330    }
1331    
1332    public String getDefaultSocketURIString() {
1333       
1334            if (started.get()) {
1335                if (this.defaultSocketURIString ==null) {
1336                    for (TransportConnector tc:this.transportConnectors) {
1337                        String result = null;
1338                        try {
1339                            result = tc.getPublishableConnectString();
1340                        } catch (Exception e) {
1341                          LOG.warn("Failed to get the ConnectURI for "+tc,e);
1342                        }
1343                        if (result != null) {
1344                            this.defaultSocketURIString =result;
1345                            break;
1346                        }
1347                    }
1348                }
1349                return this.defaultSocketURIString;
1350            }
1351       return null;
1352    }
1353
1354    /**
1355     * @return Returns the shutdownOnMasterFailure.
1356     */
1357    public boolean isShutdownOnMasterFailure() {
1358        return shutdownOnMasterFailure;
1359    }
1360
1361    /**
1362     * @param shutdownOnMasterFailure
1363     *            The shutdownOnMasterFailure to set.
1364     */
1365    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1366        this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1367    }
1368
1369    public boolean isKeepDurableSubsActive() {
1370        return keepDurableSubsActive;
1371    }
1372
1373    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1374        this.keepDurableSubsActive = keepDurableSubsActive;
1375    }
1376
1377    public boolean isUseVirtualTopics() {
1378        return useVirtualTopics;
1379    }
1380
1381    /**
1382     * Sets whether or not <a
1383     * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1384     * Topics</a> should be supported by default if they have not been
1385     * explicitly configured.
1386     */
1387    public void setUseVirtualTopics(boolean useVirtualTopics) {
1388        this.useVirtualTopics = useVirtualTopics;
1389    }
1390
1391    public DestinationInterceptor[] getDestinationInterceptors() {
1392        return destinationInterceptors;
1393    }
1394
1395    public boolean isUseMirroredQueues() {
1396        return useMirroredQueues;
1397    }
1398
1399    /**
1400     * Sets whether or not <a
1401     * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1402     * Queues</a> should be supported by default if they have not been
1403     * explicitly configured.
1404     */
1405    public void setUseMirroredQueues(boolean useMirroredQueues) {
1406        this.useMirroredQueues = useMirroredQueues;
1407    }
1408
1409    /**
1410     * Sets the destination interceptors to use
1411     */
1412    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1413        this.destinationInterceptors = destinationInterceptors;
1414    }
1415
1416    public ActiveMQDestination[] getDestinations() {
1417        return destinations;
1418    }
1419
1420    /**
1421     * Sets the destinations which should be loaded/created on startup
1422     */
1423    public void setDestinations(ActiveMQDestination[] destinations) {
1424        this.destinations = destinations;
1425    }
1426
1427    /**
1428     * @return the tempDataStore
1429     */
1430    public synchronized PListStore getTempDataStore() {
1431        if (tempDataStore == null) {
1432            if (!isPersistent()) {
1433                return null;
1434            }
1435            boolean result = true;
1436            boolean empty = true;
1437            try {
1438                File directory = getTmpDataDirectory();
1439                if (directory.exists() && directory.isDirectory()) {
1440                    File[] files = directory.listFiles();
1441                    if (files != null && files.length > 0) {
1442                        empty = false;
1443                        for (int i = 0; i < files.length; i++) {
1444                            File file = files[i];
1445                            if (!file.isDirectory()) {
1446                                result &= file.delete();
1447                            }
1448                        }
1449                    }
1450                }
1451                if (!empty) {
1452                    String str = result ? "Successfully deleted" : "Failed to delete";
1453                    LOG.info(str + " temporary storage");
1454                }
1455                this.tempDataStore = new PListStore();
1456                this.tempDataStore.setDirectory(getTmpDataDirectory());
1457                this.tempDataStore.start();
1458            } catch (Exception e) {
1459                throw new RuntimeException(e);
1460            }
1461        }
1462        return tempDataStore;
1463    }
1464
1465    /**
1466     * @param tempDataStore
1467     *            the tempDataStore to set
1468     */
1469    public void setTempDataStore(PListStore tempDataStore) {
1470        this.tempDataStore = tempDataStore;
1471    }
1472
1473    public int getPersistenceThreadPriority() {
1474        return persistenceThreadPriority;
1475    }
1476
1477    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1478        this.persistenceThreadPriority = persistenceThreadPriority;
1479    }
1480
1481    /**
1482     * @return the useLocalHostBrokerName
1483     */
1484    public boolean isUseLocalHostBrokerName() {
1485        return this.useLocalHostBrokerName;
1486    }
1487
1488    /**
1489     * @param useLocalHostBrokerName
1490     *            the useLocalHostBrokerName to set
1491     */
1492    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1493        this.useLocalHostBrokerName = useLocalHostBrokerName;
1494        if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1495            brokerName = LOCAL_HOST_NAME;
1496        }
1497    }
1498
1499    /**
1500     * @return the supportFailOver
1501     */
1502    public boolean isSupportFailOver() {
1503        return this.supportFailOver;
1504    }
1505
1506    /**
1507     * @param supportFailOver
1508     *            the supportFailOver to set
1509     */
1510    public void setSupportFailOver(boolean supportFailOver) {
1511        this.supportFailOver = supportFailOver;
1512    }
1513
1514    /**
1515     * Looks up and lazily creates if necessary the destination for the given
1516     * JMS name
1517     */
1518    public Destination getDestination(ActiveMQDestination destination) throws Exception {
1519        return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1520    }
1521
1522    public void removeDestination(ActiveMQDestination destination) throws Exception {
1523        getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1524    }
1525
1526    public int getProducerSystemUsagePortion() {
1527        return producerSystemUsagePortion;
1528    }
1529
1530    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1531        this.producerSystemUsagePortion = producerSystemUsagePortion;
1532    }
1533
1534    public int getConsumerSystemUsagePortion() {
1535        return consumerSystemUsagePortion;
1536    }
1537
1538    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1539        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1540    }
1541
1542    public boolean isSplitSystemUsageForProducersConsumers() {
1543        return splitSystemUsageForProducersConsumers;
1544    }
1545
1546    public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1547        this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1548    }
1549
1550    public boolean isMonitorConnectionSplits() {
1551        return monitorConnectionSplits;
1552    }
1553
1554    public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1555        this.monitorConnectionSplits = monitorConnectionSplits;
1556    }
1557
1558    public int getTaskRunnerPriority() {
1559        return taskRunnerPriority;
1560    }
1561
1562    public void setTaskRunnerPriority(int taskRunnerPriority) {
1563        this.taskRunnerPriority = taskRunnerPriority;
1564    }
1565
1566    public boolean isDedicatedTaskRunner() {
1567        return dedicatedTaskRunner;
1568    }
1569
1570    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1571        this.dedicatedTaskRunner = dedicatedTaskRunner;
1572    }
1573
1574    public boolean isCacheTempDestinations() {
1575        return cacheTempDestinations;
1576    }
1577
1578    public void setCacheTempDestinations(boolean cacheTempDestinations) {
1579        this.cacheTempDestinations = cacheTempDestinations;
1580    }
1581
1582    public int getTimeBeforePurgeTempDestinations() {
1583        return timeBeforePurgeTempDestinations;
1584    }
1585
1586    public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1587        this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1588    }
1589
1590    public boolean isUseTempMirroredQueues() {
1591        return useTempMirroredQueues;
1592    }
1593
1594    public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1595        this.useTempMirroredQueues = useTempMirroredQueues;
1596    }
1597
1598    //
1599    // Implementation methods
1600    // -------------------------------------------------------------------------
1601    /**
1602     * Handles any lazy-creation helper properties which are added to make
1603     * things easier to configure inside environments such as Spring
1604     * 
1605     * @throws Exception
1606     */
1607    protected void processHelperProperties() throws Exception {
1608        boolean masterServiceExists = false;
1609        if (transportConnectorURIs != null) {
1610            for (int i = 0; i < transportConnectorURIs.length; i++) {
1611                String uri = transportConnectorURIs[i];
1612                addConnector(uri);
1613            }
1614        }
1615        if (networkConnectorURIs != null) {
1616            for (int i = 0; i < networkConnectorURIs.length; i++) {
1617                String uri = networkConnectorURIs[i];
1618                addNetworkConnector(uri);
1619            }
1620        }
1621        if (jmsBridgeConnectors != null) {
1622            for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1623                addJmsConnector(jmsBridgeConnectors[i]);
1624            }
1625        }
1626        for (Service service : services) {
1627            if (service instanceof MasterConnector) {
1628                masterServiceExists = true;
1629                break;
1630            }
1631        }
1632        if (masterConnectorURI != null) {
1633            if (masterServiceExists) {
1634                throw new IllegalStateException(
1635                        "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1636            } else {
1637                addService(new MasterConnector(masterConnectorURI));
1638            }
1639        }
1640    }
1641
1642    public void stopAllConnectors(ServiceStopper stopper) {
1643        for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1644            NetworkConnector connector = iter.next();
1645            unregisterNetworkConnectorMBean(connector);
1646            stopper.stop(connector);
1647        }
1648        for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1649            ProxyConnector connector = iter.next();
1650            stopper.stop(connector);
1651        }
1652        for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1653            JmsConnector connector = iter.next();
1654            stopper.stop(connector);
1655        }
1656        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1657            TransportConnector connector = iter.next();
1658            stopper.stop(connector);
1659        }
1660    }
1661
1662    protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1663        try {
1664            ObjectName objectName = createConnectorObjectName(connector);
1665            connector = connector.asManagedConnector(getManagementContext(), objectName);
1666            ConnectorViewMBean view = new ConnectorView(connector);
1667            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1668            return connector;
1669        } catch (Throwable e) {
1670            throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1671        }
1672    }
1673
1674    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1675        if (isUseJmx()) {
1676            try {
1677                ObjectName objectName = createConnectorObjectName(connector);
1678                getManagementContext().unregisterMBean(objectName);
1679            } catch (Throwable e) {
1680                throw IOExceptionSupport.create(
1681                        "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
1682            }
1683        }
1684    }
1685
    /**
     * Hook for registering a persistence adapter in JMX. This implementation
     * performs no registration and simply returns the adapter it was given.
     *
     * @param adaptor the persistence adapter
     * @return the same adapter instance
     * @throws IOException declared for overriding implementations; never thrown here
     */
    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
        return adaptor;
    }
1689
    /**
     * Hook for unregistering a persistence adapter from JMX. This
     * implementation is effectively a no-op: the JMX branch is empty.
     *
     * @param adaptor the persistence adapter
     * @throws IOException declared for overriding implementations; never thrown here
     */
    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
        if (isUseJmx()) {
            // intentionally empty: nothing is registered for persistence adapters
        }
    }
1694
1695    private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
1696        return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1697                + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName="
1698                + JMXSupport.encodeObjectNamePart(connector.getName()));
1699    }
1700
1701    protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
1702        NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
1703        try {
1704            ObjectName objectName = createNetworkConnectorObjectName(connector);
1705            connector.setObjectName(objectName);
1706            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1707        } catch (Throwable e) {
1708            throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
1709        }
1710    }
1711
1712    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
1713            throws MalformedObjectNameException {
1714        return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1715                + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1716                + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1717    }
1718
1719
1720    public ObjectName createDuplexNetworkConnectorObjectName(String transport)
1721            throws MalformedObjectNameException {
1722        return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1723                + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1724                + "NetworkConnectorName=duplex" + JMXSupport.encodeObjectNamePart(transport));
1725    }
1726
1727    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
1728        if (isUseJmx()) {
1729            try {
1730                ObjectName objectName = createNetworkConnectorObjectName(connector);
1731                getManagementContext().unregisterMBean(objectName);
1732            } catch (Exception e) {
1733                LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
1734            }
1735        }
1736    }
1737
1738    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
1739        ProxyConnectorView view = new ProxyConnectorView(connector);
1740        try {
1741            ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1742                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector,"
1743                    + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1744            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1745        } catch (Throwable e) {
1746            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1747        }
1748    }
1749
1750    protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
1751        FTConnectorView view = new FTConnectorView(connector);
1752        try {
1753            ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1754                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
1755            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1756        } catch (Throwable e) {
1757            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1758        }
1759    }
1760
1761    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
1762        JmsConnectorView view = new JmsConnectorView(connector);
1763        try {
1764            ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1765                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector,"
1766                    + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1767            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1768        } catch (Throwable e) {
1769            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1770        }
1771    }
1772
1773    /**
1774     * Factory method to create a new broker
1775     * 
1776     * @throws Exception
1777     * @throws
1778     * @throws
1779     */
1780    protected Broker createBroker() throws Exception {
1781        regionBroker = createRegionBroker();
1782        Broker broker = addInterceptors(regionBroker);
1783        // Add a filter that will stop access to the broker once stopped
1784        broker = new MutableBrokerFilter(broker) {
1785            Broker old;
1786
1787            @Override
1788            public void stop() throws Exception {
1789                old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
1790                    // Just ignore additional stop actions.
1791                    @Override
1792                    public void stop() throws Exception {
1793                    }
1794                });
1795                old.stop();
1796            }
1797
1798            @Override
1799            public void start() throws Exception {
1800                if (forceStart && old != null) {
1801                    this.next.set(old);
1802                }
1803                getNext().start();
1804            }
1805        };
1806        return broker;
1807    }
1808
1809    /**
1810     * Factory method to create the core region broker onto which interceptors
1811     * are added
1812     * 
1813     * @throws Exception
1814     */
1815    protected Broker createRegionBroker() throws Exception {
1816        if (destinationInterceptors == null) {
1817            destinationInterceptors = createDefaultDestinationInterceptor();
1818        }
1819        configureServices(destinationInterceptors);
1820        DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
1821        if (destinationFactory == null) {
1822            destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
1823        }
1824        return createRegionBroker(destinationInterceptor);
1825    }
1826
1827    protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
1828        RegionBroker regionBroker;
1829        if (isUseJmx()) {
1830            regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
1831                    getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
1832        } else {
1833            regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
1834                    destinationInterceptor,getScheduler(),getExecutor());
1835        }
1836        destinationFactory.setRegionBroker(regionBroker);
1837        regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
1838        regionBroker.setBrokerName(getBrokerName());
1839        regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
1840        if (brokerId != null) {
1841            regionBroker.setBrokerId(brokerId);
1842        }
1843        return regionBroker;
1844    }
1845
1846    /**
1847     * Create the default destination interceptor
1848     */
1849    protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
1850        List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
1851        if (isUseVirtualTopics()) {
1852            VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
1853            VirtualTopic virtualTopic = new VirtualTopic();
1854            virtualTopic.setName("VirtualTopic.>");
1855            VirtualDestination[] virtualDestinations = { virtualTopic };
1856            interceptor.setVirtualDestinations(virtualDestinations);
1857            answer.add(interceptor);
1858        }
1859        if (isUseMirroredQueues()) {
1860            MirroredQueue interceptor = new MirroredQueue();
1861            answer.add(interceptor);
1862        }
1863        DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
1864        answer.toArray(array);
1865        return array;
1866    }
1867
1868    /**
1869     * Strategy method to add interceptors to the broker
1870     * 
1871     * @throws IOException
1872     */
1873    protected Broker addInterceptors(Broker broker) throws Exception {
1874        if (isSchedulerSupport()) {
1875            SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
1876            if (isUseJmx()) {
1877                JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
1878                try {
1879                    ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":"
1880                            + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
1881                            + "Type=jobScheduler," + "jobSchedulerName=JMS");
1882
1883                    AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1884                    this.adminView.setJMSJobScheduler(objectName);
1885                } catch (Throwable e) {
1886                    throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
1887                            + e.getMessage(), e);
1888                }
1889
1890            }
1891            broker = sb;
1892        }
1893        if (isAdvisorySupport()) {
1894            broker = new AdvisoryBroker(broker);
1895        }
1896        broker = new CompositeDestinationBroker(broker);
1897        broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
1898        if (isPopulateJMSXUserID()) {
1899            UserIDBroker userIDBroker = new UserIDBroker(broker);
1900            userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
1901            broker = userIDBroker;
1902        }
1903        if (isMonitorConnectionSplits()) {
1904            broker = new ConnectionSplitBroker(broker);
1905        }
1906        if (plugins != null) {
1907            for (int i = 0; i < plugins.length; i++) {
1908                BrokerPlugin plugin = plugins[i];
1909                broker = plugin.installPlugin(broker);
1910            }
1911        }
1912        return broker;
1913    }
1914
1915    protected PersistenceAdapter createPersistenceAdapter() throws IOException {
1916        if (isPersistent()) {
1917            PersistenceAdapterFactory fac = getPersistenceFactory();
1918            if (fac != null) {
1919                return fac.createPersistenceAdapter();
1920            }else {
1921                KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
1922                File dir = new File(getBrokerDataDirectory(),"KahaDB");
1923                adaptor.setDirectory(dir);
1924                return adaptor;
1925            }
1926        } else {
1927            return new MemoryPersistenceAdapter();
1928        }
1929    }
1930
1931    protected ObjectName createBrokerObjectName() throws IOException {
1932        try {
1933            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1934                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
1935        } catch (Throwable e) {
1936            throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
1937        }
1938    }
1939
1940    protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
1941        TransportServer transport = TransportFactory.bind(this, brokerURI);
1942        return new TransportConnector(transport);
1943    }
1944
1945    /**
1946     * Extracts the port from the options
1947     */
1948    protected Object getPort(Map options) {
1949        Object port = options.get("port");
1950        if (port == null) {
1951            port = DEFAULT_PORT;
1952            LOG.warn("No port specified so defaulting to: " + port);
1953        }
1954        return port;
1955    }
1956
1957    protected void addShutdownHook() {
1958        if (useShutdownHook) {
1959            shutdownHook = new Thread("ActiveMQ ShutdownHook") {
1960                @Override
1961                public void run() {
1962                    containerShutdown();
1963                }
1964            };
1965            Runtime.getRuntime().addShutdownHook(shutdownHook);
1966        }
1967    }
1968
1969    protected void removeShutdownHook() {
1970        if (shutdownHook != null) {
1971            try {
1972                Runtime.getRuntime().removeShutdownHook(shutdownHook);
1973            } catch (Exception e) {
1974                LOG.debug("Caught exception, must be shutting down: " + e);
1975            }
1976        }
1977    }
1978
1979    /**
1980     * Sets hooks to be executed when broker shut down
1981     * 
1982     * @org.apache.xbean.Property
1983     */
1984    public void setShutdownHooks(List<Runnable> hooks) throws Exception {
1985        for (Runnable hook : hooks) {
1986            addShutdownHook(hook);
1987        }
1988    }
1989    
1990    /**
1991     * Causes a clean shutdown of the container when the VM is being shut down
1992     */
1993    protected void containerShutdown() {
1994        try {
1995            stop();
1996        } catch (IOException e) {
1997            Throwable linkedException = e.getCause();
1998            if (linkedException != null) {
1999                logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2000            } else {
2001                logError("Failed to shut down: " + e, e);
2002            }
2003            if (!useLoggingForShutdownErrors) {
2004                e.printStackTrace(System.err);
2005            }
2006        } catch (Exception e) {
2007            logError("Failed to shut down: " + e, e);
2008        }
2009    }
2010
2011    protected void logError(String message, Throwable e) {
2012        if (useLoggingForShutdownErrors) {
2013            LOG.error("Failed to shut down: " + e);
2014        } else {
2015            System.err.println("Failed to shut down: " + e);
2016        }
2017    }
2018
2019    /**
2020     * Starts any configured destinations on startup
2021     */
2022    protected void startDestinations() throws Exception {
2023        if (destinations != null) {
2024            ConnectionContext adminConnectionContext = getAdminConnectionContext();
2025            for (int i = 0; i < destinations.length; i++) {
2026                ActiveMQDestination destination = destinations[i];
2027                getBroker().addDestination(adminConnectionContext, destination,true);
2028            }
2029        }
2030    }
2031
2032    /**
2033     * Returns the broker's administration connection context used for
2034     * configuring the broker at startup
2035     */
2036    public ConnectionContext getAdminConnectionContext() throws Exception {
2037        return BrokerSupport.getConnectionContext(getBroker());
2038    }
2039
2040    protected void waitForSlave() {
2041        try {
2042            if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
2043                throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds."); 
2044            }
2045        } catch (InterruptedException e) {
2046            LOG.error("Exception waiting for slave:" + e);
2047        }
2048    }
2049
    /**
     * Counts down the slave start signal, which is the latch that
     * {@link #waitForSlave()} waits on.
     */
    protected void slaveConnectionEstablished() {
        slaveStartSignal.countDown();
    }
2053    
    /**
     * Starts the management context, creates the {@code BrokerView} admin
     * view and registers it in JMX under the broker object name.
     *
     * @throws Exception if the context cannot be started or the view cannot
     *         be registered
     */
    protected void startManagementContext() throws Exception {
        getManagementContext().start();
        adminView = new BrokerView(this, null);
        ObjectName objectName = getBrokerObjectName();
        AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
    }
2060
2061    /**
2062     * Start all transport and network connections, proxies and bridges
2063     * 
2064     * @throws Exception
2065     */
2066    public void startAllConnectors() throws Exception {
2067        if (!isSlave()) {
2068            Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2069            List<TransportConnector> al = new ArrayList<TransportConnector>();
2070            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2071                TransportConnector connector = iter.next();
2072                connector.setBrokerService(this);
2073                al.add(startTransportConnector(connector));
2074            }
2075            if (al.size() > 0) {
2076                // let's clear the transportConnectors list and replace it with
2077                // the started transportConnector instances
2078                this.transportConnectors.clear();
2079                setTransportConnectors(al);
2080            }
2081            URI uri = getVmConnectorURI();
2082            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2083            map.put("network", "true");
2084            map.put("async", "false");
2085            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2086            if (isWaitForSlave()) {
2087                waitForSlave();
2088            }
2089            if (!stopped.get()) {
2090                ThreadPoolExecutor networkConnectorStartExecutor = null;
2091                if (isNetworkConnectorStartAsync()) {
2092                    // spin up as many threads as needed
2093                    networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2094                            10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2095                            new ThreadFactory() {
2096                                int count=0;
2097                                public Thread newThread(Runnable runnable) {
2098                                    Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2099                                    thread.setDaemon(true);
2100                                    return thread;
2101                                }
2102                            });
2103                }
2104
2105                for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2106                    final NetworkConnector connector = iter.next();
2107                    connector.setLocalUri(uri);
2108                    connector.setBrokerName(getBrokerName());
2109                    connector.setDurableDestinations(durableDestinations);
2110                    if (getDefaultSocketURIString() != null) {
2111                        connector.setBrokerURL(getDefaultSocketURIString());
2112                    }
2113                    if (networkConnectorStartExecutor != null) {
2114                        final Map context = MDCHelper.getCopyOfContextMap();
2115                        networkConnectorStartExecutor.execute(new Runnable() {
2116                            public void run() {
2117                                try {
2118                                    MDCHelper.setContextMap(context);
2119                                    LOG.info("Async start of " + connector);
2120                                    connector.start();
2121                                } catch(Exception e) {
2122                                    LOG.error("Async start of network connector: " + connector + " failed", e);
2123                                }
2124                            }
2125                        });
2126                    } else {
2127                        connector.start();
2128                    }
2129                }
2130                if (networkConnectorStartExecutor != null) {
2131                    // executor done when enqueued tasks are complete
2132                    networkConnectorStartExecutor.shutdown();
2133                    networkConnectorStartExecutor = null;
2134                }
2135
2136                for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2137                    ProxyConnector connector = iter.next();
2138                    connector.start();
2139                }
2140                for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2141                    JmsConnector connector = iter.next();
2142                    connector.start();
2143                }
2144                for (Service service : services) {
2145                    configureService(service);
2146                    service.start();
2147                }
2148            }
2149        }
2150    }
2151
2152    protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2153        connector.setTaskRunnerFactory(getTaskRunnerFactory());
2154        MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2155        if (policy != null) {
2156            connector.setMessageAuthorizationPolicy(policy);
2157        }
2158        if (isUseJmx()) {
2159            connector = registerConnectorMBean(connector);
2160        }
2161        connector.getStatistics().setEnabled(enableStatistics);
2162        connector.start();
2163        return connector;
2164    }
2165
2166    /**
2167     * Perform any custom dependency injection
2168     */
2169    protected void configureServices(Object[] services) {
2170        for (Object service : services) {
2171            configureService(service);
2172        }
2173    }
2174
2175    /**
2176     * Perform any custom dependency injection
2177     */
2178    protected void configureService(Object service) {
2179        if (service instanceof BrokerServiceAware) {
2180            BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2181            serviceAware.setBrokerService(this);
2182        }
2183        if (masterConnector == null) {
2184            if (service instanceof MasterConnector) {
2185                masterConnector = (MasterConnector) service;
2186                supportFailOver = true;
2187            }
2188        }
2189    }
2190    
2191    public void handleIOException(IOException exception) {
2192        if (ioExceptionHandler != null) {
2193            ioExceptionHandler.handle(exception);
2194         } else {
2195            LOG.info("Ignoring IO exception, " + exception, exception);
2196         }
2197    }
2198
2199    /**
2200     * Starts all destiantions in persistence store. This includes all inactive
2201     * destinations
2202     */
2203    protected void startDestinationsInPersistenceStore(Broker broker) throws Exception {
2204        Set destinations = destinationFactory.getDestinations();
2205        if (destinations != null) {
2206            Iterator iter = destinations.iterator();
2207            ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
2208            if (adminConnectionContext == null) {
2209                ConnectionContext context = new ConnectionContext();
2210                context.setBroker(broker);
2211                adminConnectionContext = context;
2212                broker.setAdminConnectionContext(adminConnectionContext);
2213            }
2214            while (iter.hasNext()) {
2215                ActiveMQDestination destination = (ActiveMQDestination) iter.next();
2216                broker.addDestination(adminConnectionContext, destination,false);
2217            }
2218        }
2219    }
2220    
2221    protected synchronized ThreadPoolExecutor getExecutor() {
2222        if (this.executor == null) {
2223        this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2224            public Thread newThread(Runnable runnable) {
2225                Thread thread = new Thread(runnable, "Usage Async Task");
2226                thread.setDaemon(true);
2227                return thread;
2228            }
2229        });
2230        }
2231        return this.executor;
2232    }
2233    
2234    public synchronized Scheduler getScheduler() {
2235        if (this.scheduler==null) {
2236            this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2237            try {
2238                this.scheduler.start();
2239            } catch (Exception e) {
2240               LOG.error("Failed to start Scheduler ",e);
2241            }
2242        }
2243        return this.scheduler;
2244    }
2245
    /**
     * @return the current value of the {@code regionBroker} field
     */
    public Broker getRegionBroker() {
        return regionBroker;
    }
2249
    /**
     * @param regionBroker the region broker to store in this service
     */
    public void setRegionBroker(Broker regionBroker) {
        this.regionBroker = regionBroker;
    }
2253
    /**
     * Adds a hook to the list of shutdown hooks. The list is modified while
     * holding its own monitor.
     *
     * @param hook the hook to add
     */
    public void addShutdownHook(Runnable hook) {
        synchronized (shutdownHooks) {
            shutdownHooks.add(hook);
        }
    }
2259
    /**
     * Removes a hook from the list of shutdown hooks. The list is modified
     * while holding its own monitor.
     *
     * @param hook the hook to remove
     */
    public void removeShutdownHook(Runnable hook) {
        synchronized (shutdownHooks) {
            shutdownHooks.remove(hook);
        }
    }
2265
    /**
     * @return the value of the {@code systemExitOnShutdown} flag
     */
    public boolean isSystemExitOnShutdown() {
        return systemExitOnShutdown;
    }
2269
    /**
     * @param systemExitOnShutdown the new value of the {@code systemExitOnShutdown} flag
     */
    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
        this.systemExitOnShutdown = systemExitOnShutdown;
    }
2273
    /**
     * @return the value of the {@code systemExitOnShutdownExitCode} field
     */
    public int getSystemExitOnShutdownExitCode() {
        return systemExitOnShutdownExitCode;
    }
2277
    /**
     * @param systemExitOnShutdownExitCode the new value of the
     *        {@code systemExitOnShutdownExitCode} field
     */
    public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
        this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
    }
2281
    /**
     * @return the SSL context stored in this service, as last set
     */
    public SslContext getSslContext() {
        return sslContext;
    }
2285
    /**
     * @param sslContext the SSL context to store in this service
     */
    public void setSslContext(SslContext sslContext) {
        this.sslContext = sslContext;
    }
2289
    /**
     * @return the value of the {@code shutdownOnSlaveFailure} flag
     */
    public boolean isShutdownOnSlaveFailure() {
        return shutdownOnSlaveFailure;
    }
2293
    /**
     * @param shutdownOnSlaveFailure the new value of the
     *        {@code shutdownOnSlaveFailure} flag
     */
    public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
        this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
    }
2297
    /**
     * @return the value of the {@code waitForSlave} flag; when true,
     *         {@code startAllConnectors()} calls {@code waitForSlave()}
     */
    public boolean isWaitForSlave() {
        return waitForSlave;
    }
2301
    /**
     * @param waitForSlave the new value of the {@code waitForSlave} flag
     */
    public void setWaitForSlave(boolean waitForSlave) {
        this.waitForSlave = waitForSlave;
    }
2305  
    /**
     * @return the timeout, in milliseconds, that {@code waitForSlave()}
     *         waits for the slave start signal
     */
    public long getWaitForSlaveTimeout() {
        return this.waitForSlaveTimeout;
    }
2309    
    /**
     * @param waitForSlaveTimeout the timeout, in milliseconds, that
     *        {@code waitForSlave()} waits for the slave start signal
     */
    public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
        this.waitForSlaveTimeout = waitForSlaveTimeout;
    }
2313    
    /**
     * @return the latch that {@code waitForSlave()} awaits and
     *         {@code slaveConnectionEstablished()} counts down
     */
    public CountDownLatch getSlaveStartSignal() {
        return slaveStartSignal;
    }
2317
2318    /**
2319     * Get the passiveSlave
2320     * @return the passiveSlave
2321     */
2322    public boolean isPassiveSlave() {
2323        return this.passiveSlave;
2324    }
2325
2326    /**
2327     * Set the passiveSlave
2328     * @param passiveSlave the passiveSlave to set
2329     */
2330    public void setPassiveSlave(boolean passiveSlave) {
2331        this.passiveSlave = passiveSlave;
2332    }
2333
2334    /**
2335     * override the Default IOException handler, called when persistence adapter
2336     * has experiences File or JDBC I/O Exceptions
2337     *
2338     * @param ioExceptionHandler
2339     */
2340    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2341        configureService(ioExceptionHandler);
2342        this.ioExceptionHandler = ioExceptionHandler;
2343    }
2344
    /**
     * @return the IO exception handler stored in this service, as last set
     */
    public IOExceptionHandler getIoExceptionHandler() {
        return ioExceptionHandler;
    }
2348
2349    /**
2350     * @return the schedulerSupport
2351     */
2352    public boolean isSchedulerSupport() {
2353        return this.schedulerSupport;
2354    }
2355
2356    /**
2357     * @param schedulerSupport the schedulerSupport to set
2358     */
2359    public void setSchedulerSupport(boolean schedulerSupport) {
2360        this.schedulerSupport = schedulerSupport;
2361    }
2362
2363    /**
2364     * @return the schedulerDirectory
2365     */
2366    public File getSchedulerDirectoryFile() {
2367        if (this.schedulerDirectoryFile == null) {
2368            this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2369        }
2370        return schedulerDirectoryFile;
2371    }
2372
2373    /**
2374     * @param schedulerDirectory the schedulerDirectory to set
2375     */
2376    public void setSchedulerDirectoryFile(File schedulerDirectory) {
2377        this.schedulerDirectoryFile = schedulerDirectory;
2378    }
2379    
    /**
     * Sets the scheduler directory from a path string by wrapping it in a
     * {@code File} and delegating to {@link #setSchedulerDirectoryFile(File)}.
     *
     * @param schedulerDirectory the path of the scheduler directory
     */
    public void setSchedulerDirectory(String schedulerDirectory) {
        setSchedulerDirectoryFile(new File(schedulerDirectory));
    }
2383
    /**
     * @return the value of the {@code schedulePeriodForDestinationPurge} field
     */
    public int getSchedulePeriodForDestinationPurge() {
        return this.schedulePeriodForDestinationPurge;
    }
2387
    /**
     * @param schedulePeriodForDestinationPurge the new value of the
     *        {@code schedulePeriodForDestinationPurge} field
     */
    public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
        this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
    }
2391
    /**
     * @return the broker context stored in this service, as last set
     */
    public BrokerContext getBrokerContext() {
        return brokerContext;
    }
2395
    /**
     * @param brokerContext the broker context to store in this service
     */
    public void setBrokerContext(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }
2399
    /**
     * Sets the broker id by wrapping the given string in a new
     * {@code BrokerId}.
     *
     * @param brokerId the string value of the broker id
     */
    public void setBrokerId(String brokerId) {
        this.brokerId = new BrokerId(brokerId);
    }
2403
    /**
     * @return the value of the {@code useAuthenticatedPrincipalForJMSXUserID}
     *         flag, which {@code addInterceptors} passes on to the
     *         {@code UserIDBroker}
     */
    public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
        return useAuthenticatedPrincipalForJMSXUserID;
    }
2407
    /**
     * @param useAuthenticatedPrincipalForJMSXUserID the new value of the
     *        {@code useAuthenticatedPrincipalForJMSXUserID} flag
     */
    public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
        this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
    }
2411
    /**
     * @return the value of the {@code networkConnectorStartAsync} flag; when
     *         true, {@code startAllConnectors()} starts network connectors on
     *         a separate executor
     */
    public boolean isNetworkConnectorStartAsync() {
        return networkConnectorStartAsync;
    }
2415
    /**
     * @param networkConnectorStartAsync the new value of the
     *        {@code networkConnectorStartAsync} flag
     */
    public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
        this.networkConnectorStartAsync = networkConnectorStartAsync;
    }
2419}