001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.EmptyBroker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
072
073/**
074 * Routes Broker operations to the correct messaging regions for processing.
075 * 
076 * 
077 */
078public class RegionBroker extends EmptyBroker {
    // Constant string "originalExpiration". Its users lie outside this part of the file --
    // presumably a message property key (TODO confirm against the expiry handling code).
    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
    private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
    // Supplies the id used by getBrokerId() when no broker id has been set explicitly.
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();

    // Single statistics instance passed to each region by the create*Region() factory methods.
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    // Checked for null in the constructor; also backs getDurableDestinations().
    protected DestinationFactory destinationFactory;
    // Synchronized map handed out as-is by getConnectionStates().
    protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());

    // One region per destination type; the type-based switches below dispatch to these.
    private final Region queueRegion;
    private final Region topicRegion;
    private final Region tempQueueRegion;
    private final Region tempTopicRegion;
    protected final BrokerService brokerService;
    // Set by start(), cleared by stop(), read (negated) by isStopped().
    private boolean started;
    // Forwarded to the TopicRegion at the beginning of start().
    private boolean keepDurableSubsActive;

    // Connections registered through addConnection(); getClients() returns a snapshot of it.
    private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
    // Destinations already resolved by addDestination(), keyed by destination.
    private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
    // Peer brokers by id. Accessed from the synchronized addBroker()/removeBroker()/
    // getPeerBrokerInfos() methods. NOTE(review): stop() clears it without holding that monitor.
    private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();

    // Seeded in the constructor with destinationFactory.getLastMessageBrokerSequenceId().
    private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
    // brokerId and brokerName are both lazily defaulted by their getters when left null.
    private BrokerId brokerId;
    private String brokerName;
    // Client id -> owning connection context; addConnection()/removeConnection() access it
    // while synchronized on the map itself.
    private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
    private final DestinationInterceptor destinationInterceptor;
    private ConnectionContext adminConnectionContext;
    // Runs purgeInactiveDestinationsTask periodically (scheduled in start(), cancelled in stop()).
    private final Scheduler scheduler;
    // Stored from the constructor argument; not used within the portion of the class visible here.
    private final ThreadPoolExecutor executor;
    
    // Scheduled task that calls purgeInactiveDestinations(). The same object also serves as the
    // monitor for the producer/consumer/subscription add and remove methods below.
    private final Runnable purgeInactiveDestinationsTask = new Runnable() {
        public void run() {
            purgeInactiveDestinations();
        }
    };
113
114    public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
115                        DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
116        this.brokerService = brokerService;
117        this.executor=executor;
118        this.scheduler = scheduler;
119        if (destinationFactory == null) {
120            throw new IllegalArgumentException("null destinationFactory");
121        }
122        this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
123        this.destinationFactory = destinationFactory;
124        queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
125        topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
126        this.destinationInterceptor = destinationInterceptor;
127        tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
128        tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
129    }
130
131    @Override
132    public Map<ActiveMQDestination, Destination> getDestinationMap() {
133        Map<ActiveMQDestination, Destination> answer = getQueueRegion().getDestinationMap();
134        answer.putAll(getTopicRegion().getDestinationMap());
135        return answer;
136    }
137
138    @Override
139    public Set <Destination> getDestinations(ActiveMQDestination destination) {
140        switch (destination.getDestinationType()) {
141        case ActiveMQDestination.QUEUE_TYPE:
142            return queueRegion.getDestinations(destination);
143        case ActiveMQDestination.TOPIC_TYPE:
144            return topicRegion.getDestinations(destination);
145        case ActiveMQDestination.TEMP_QUEUE_TYPE:
146            return tempQueueRegion.getDestinations(destination);
147        case ActiveMQDestination.TEMP_TOPIC_TYPE:
148            return tempTopicRegion.getDestinations(destination);
149        default:
150            return Collections.emptySet();
151        }
152    }
153
154    @Override
155    public Broker getAdaptor(Class type) {
156        if (type.isInstance(this)) {
157            return this;
158        }
159        return null;
160    }
161
162    public Region getQueueRegion() {
163        return queueRegion;
164    }
165
166    public Region getTempQueueRegion() {
167        return tempQueueRegion;
168    }
169
170    public Region getTempTopicRegion() {
171        return tempTopicRegion;
172    }
173
174    public Region getTopicRegion() {
175        return topicRegion;
176    }
177
178    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
179        return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
180    }
181
182    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
183        return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
184    }
185
186    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
187        return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
188    }
189
190    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
191        return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
192    }
193
194    @Override
195    public void start() throws Exception {
196        ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
197        started = true;
198        queueRegion.start();
199        topicRegion.start();
200        tempQueueRegion.start();
201        tempTopicRegion.start();
202        int period = this.brokerService.getSchedulePeriodForDestinationPurge();
203        if (period > 0) {
204            this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
205        }
206    }
207
208    @Override
209    public void stop() throws Exception {
210        started = false;
211        this.scheduler.cancel(purgeInactiveDestinationsTask);
212        ServiceStopper ss = new ServiceStopper();
213        doStop(ss);
214        ss.throwFirstException();
215        // clear the state
216        clientIdSet.clear();
217        connections.clear();
218        destinations.clear();
219        brokerInfos.clear();
220    }
221
222    public PolicyMap getDestinationPolicy() {
223        return brokerService != null ? brokerService.getDestinationPolicy() : null;
224    }
225
226    @Override
227    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
228        String clientId = info.getClientId();
229        if (clientId == null) {
230            throw new InvalidClientIDException("No clientID specified for connection request");
231        }
232        synchronized (clientIdSet) {
233            ConnectionContext oldContext = clientIdSet.get(clientId);
234            if (oldContext != null) {
235                if (context.isFaultTolerant() || context.isNetworkConnection()){
236                        //remove the old connection
237                        try{
238                                removeConnection(oldContext, info, new Exception("remove stale client"));
239                        }catch(Exception e){
240                                LOG.warn("Failed to remove stale connection ",e);
241                        }
242                }else{
243                throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
244                                                   + oldContext.getConnection().getRemoteAddress());
245                }
246            } else {
247                clientIdSet.put(clientId, context);
248            }
249        }
250
251        connections.add(context.getConnection());
252    }
253
254    @Override
255    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
256        String clientId = info.getClientId();
257        if (clientId == null) {
258            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
259        }
260        synchronized (clientIdSet) {
261            ConnectionContext oldValue = clientIdSet.get(clientId);
262            // we may be removing the duplicate connection, not the first
263            // connection to be created
264            // so lets check that their connection IDs are the same
265            if (oldValue == context) {
266                if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
267                    clientIdSet.remove(clientId);
268                }
269            }
270        }
271        connections.remove(context.getConnection());
272    }
273
274    protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
275        return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
276    }
277
278    @Override
279    public Connection[] getClients() throws Exception {
280        ArrayList<Connection> l = new ArrayList<Connection>(connections);
281        Connection rc[] = new Connection[l.size()];
282        l.toArray(rc);
283        return rc;
284    }
285
286    @Override
287    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
288
289        Destination answer;
290
291        answer = destinations.get(destination);
292        if (answer != null) {
293            return answer;
294        }
295
296        switch (destination.getDestinationType()) {
297        case ActiveMQDestination.QUEUE_TYPE:
298            answer = queueRegion.addDestination(context, destination,true);
299            break;
300        case ActiveMQDestination.TOPIC_TYPE:
301            answer = topicRegion.addDestination(context, destination,true);
302            break;
303        case ActiveMQDestination.TEMP_QUEUE_TYPE:
304            answer = tempQueueRegion.addDestination(context, destination,create);
305            break;
306        case ActiveMQDestination.TEMP_TOPIC_TYPE:
307            answer = tempTopicRegion.addDestination(context, destination,create);
308            break;
309        default:
310            throw createUnknownDestinationTypeException(destination);
311        }
312
313        destinations.put(destination, answer);
314        return answer;
315
316    }
317
318    @Override
319    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
320
321        if (destinations.containsKey(destination)) {
322            switch (destination.getDestinationType()) {
323            case ActiveMQDestination.QUEUE_TYPE:
324                queueRegion.removeDestination(context, destination, timeout);
325                removeAdvisoryTopics("Queue.", context, destination, timeout);
326                break;
327            case ActiveMQDestination.TOPIC_TYPE:
328                topicRegion.removeDestination(context, destination, timeout);
329                removeAdvisoryTopics("Topic.", context, destination, timeout);
330                break;
331            case ActiveMQDestination.TEMP_QUEUE_TYPE:
332                tempQueueRegion.removeDestination(context, destination, timeout);
333                break;
334            case ActiveMQDestination.TEMP_TOPIC_TYPE:
335                tempTopicRegion.removeDestination(context, destination, timeout);
336                break;
337            default:
338                throw createUnknownDestinationTypeException(destination);
339            }
340            destinations.remove(destination);
341
342        }
343
344    }
345
346    public void removeAdvisoryTopics(String destinationType, ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
347        if (this.brokerService.isAdvisorySupport()) {
348            String producerAdvisoryTopic = AdvisorySupport.PRODUCER_ADVISORY_TOPIC_PREFIX + destinationType + destination.getPhysicalName();
349            String consumerAdvisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + destinationType + destination.getPhysicalName();
350
351            ActiveMQDestination dests[] = getDestinations();
352            for (ActiveMQDestination dest: dests) {
353                String name = dest.getPhysicalName();
354                if ( name.equals(producerAdvisoryTopic) || name.equals(consumerAdvisoryTopic) ) {
355                    try {
356                        removeDestination(context, dest, timeout);
357                    } catch (JMSException ignore) {
358                        // at least ignore the Unknown Destination Type JMSException
359                    }
360                }
361            }
362        }
363    }
364
365    @Override
366    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
367        addDestination(context, info.getDestination(),true);
368
369    }
370
371    @Override
372    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
373        removeDestination(context, info.getDestination(), info.getTimeout());
374
375    }
376
377    @Override
378    public ActiveMQDestination[] getDestinations() throws Exception {
379        ArrayList<ActiveMQDestination> l;
380
381        l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
382
383        ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
384        l.toArray(rc);
385        return rc;
386    }
387
388    @Override
389    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
390        ActiveMQDestination destination = info.getDestination();
391        synchronized (purgeInactiveDestinationsTask) {
392            if (destination != null) {
393
394                // This seems to cause the destination to be added but without
395                // advisories firing...
396                context.getBroker().addDestination(context, destination, false);
397                switch (destination.getDestinationType()) {
398                case ActiveMQDestination.QUEUE_TYPE:
399                    queueRegion.addProducer(context, info);
400                    break;
401                case ActiveMQDestination.TOPIC_TYPE:
402                    topicRegion.addProducer(context, info);
403                    break;
404                case ActiveMQDestination.TEMP_QUEUE_TYPE:
405                    tempQueueRegion.addProducer(context, info);
406                    break;
407                case ActiveMQDestination.TEMP_TOPIC_TYPE:
408                    tempTopicRegion.addProducer(context, info);
409                    break;
410                }
411            }
412        }
413    }
414
415    @Override
416    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
417        ActiveMQDestination destination = info.getDestination();
418        synchronized (purgeInactiveDestinationsTask) {
419            if (destination != null) {
420                switch (destination.getDestinationType()) {
421                case ActiveMQDestination.QUEUE_TYPE:
422                    queueRegion.removeProducer(context, info);
423                    break;
424                case ActiveMQDestination.TOPIC_TYPE:
425                    topicRegion.removeProducer(context, info);
426                    break;
427                case ActiveMQDestination.TEMP_QUEUE_TYPE:
428                    tempQueueRegion.removeProducer(context, info);
429                    break;
430                case ActiveMQDestination.TEMP_TOPIC_TYPE:
431                    tempTopicRegion.removeProducer(context, info);
432                    break;
433                }
434            }
435        }
436    }
437
438    @Override
439    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
440        ActiveMQDestination destination = info.getDestination();
441        if (destinationInterceptor != null) {
442            destinationInterceptor.create(this, context, destination);
443        }
444        synchronized (purgeInactiveDestinationsTask) {
445            switch (destination.getDestinationType()) {
446            case ActiveMQDestination.QUEUE_TYPE:
447                return queueRegion.addConsumer(context, info);
448
449            case ActiveMQDestination.TOPIC_TYPE:
450                return topicRegion.addConsumer(context, info);
451
452            case ActiveMQDestination.TEMP_QUEUE_TYPE:
453                return tempQueueRegion.addConsumer(context, info);
454
455            case ActiveMQDestination.TEMP_TOPIC_TYPE:
456                return tempTopicRegion.addConsumer(context, info);
457
458            default:
459                throw createUnknownDestinationTypeException(destination);
460            }
461        }
462    }
463
464    @Override
465    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
466        ActiveMQDestination destination = info.getDestination();
467        synchronized (purgeInactiveDestinationsTask) {
468            switch (destination.getDestinationType()) {
469
470            case ActiveMQDestination.QUEUE_TYPE:
471                queueRegion.removeConsumer(context, info);
472                break;
473            case ActiveMQDestination.TOPIC_TYPE:
474                topicRegion.removeConsumer(context, info);
475                break;
476            case ActiveMQDestination.TEMP_QUEUE_TYPE:
477                tempQueueRegion.removeConsumer(context, info);
478                break;
479            case ActiveMQDestination.TEMP_TOPIC_TYPE:
480                tempTopicRegion.removeConsumer(context, info);
481                break;
482            default:
483                throw createUnknownDestinationTypeException(destination);
484            }
485        }
486    }
487
488    @Override
489    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
490        synchronized (purgeInactiveDestinationsTask) {
491            topicRegion.removeSubscription(context, info);
492        }
493    }
494
495    @Override
496    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
497        message.setBrokerInTime(System.currentTimeMillis());
498        if (producerExchange.isMutable() || producerExchange.getRegion() == null
499                || (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)) {
500            ActiveMQDestination destination = message.getDestination();
501            // ensure the destination is registered with the RegionBroker
502            producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination,false);
503            Region region;
504            switch (destination.getDestinationType()) {
505            case ActiveMQDestination.QUEUE_TYPE:
506                region = queueRegion;
507                break;
508            case ActiveMQDestination.TOPIC_TYPE:
509                region = topicRegion;
510                break;
511            case ActiveMQDestination.TEMP_QUEUE_TYPE:
512                region = tempQueueRegion;
513                break;
514            case ActiveMQDestination.TEMP_TOPIC_TYPE:
515                region = tempTopicRegion;
516                break;
517            default:
518                throw createUnknownDestinationTypeException(destination);
519            }
520            producerExchange.setRegion(region);
521            producerExchange.setRegionDestination(null);
522        }
523        producerExchange.getRegion().send(producerExchange, message);
524    }
525
526    @Override
527    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
528        if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
529            ActiveMQDestination destination = ack.getDestination();
530            Region region;
531            switch (destination.getDestinationType()) {
532            case ActiveMQDestination.QUEUE_TYPE:
533                region = queueRegion;
534                break;
535            case ActiveMQDestination.TOPIC_TYPE:
536                region = topicRegion;
537                break;
538            case ActiveMQDestination.TEMP_QUEUE_TYPE:
539                region = tempQueueRegion;
540                break;
541            case ActiveMQDestination.TEMP_TOPIC_TYPE:
542                region = tempTopicRegion;
543                break;
544            default:
545                throw createUnknownDestinationTypeException(destination);
546            }
547            consumerExchange.setRegion(region);
548        }
549        consumerExchange.getRegion().acknowledge(consumerExchange, ack);
550    }
551
552    @Override
553    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
554        ActiveMQDestination destination = pull.getDestination();
555        switch (destination.getDestinationType()) {
556        case ActiveMQDestination.QUEUE_TYPE:
557            return queueRegion.messagePull(context, pull);
558
559        case ActiveMQDestination.TOPIC_TYPE:
560            return topicRegion.messagePull(context, pull);
561
562        case ActiveMQDestination.TEMP_QUEUE_TYPE:
563            return tempQueueRegion.messagePull(context, pull);
564
565        case ActiveMQDestination.TEMP_TOPIC_TYPE:
566            return tempTopicRegion.messagePull(context, pull);
567        default:
568            throw createUnknownDestinationTypeException(destination);
569        }
570    }
571
572    @Override
573    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
574        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
575    }
576
577    @Override
578    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
579        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
580    }
581
582    @Override
583    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
584        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
585    }
586
587    @Override
588    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
589        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
590    }
591
592    @Override
593    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
594        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
595    }
596
597    @Override
598    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
599        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
600    }
601
602    @Override
603    public void gc() {
604        queueRegion.gc();
605        topicRegion.gc();
606    }
607
608    @Override
609    public BrokerId getBrokerId() {
610        if (brokerId == null) {
611            brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
612        }
613        return brokerId;
614    }
615
616    public void setBrokerId(BrokerId brokerId) {
617        this.brokerId = brokerId;
618    }
619
620    @Override
621    public String getBrokerName() {
622        if (brokerName == null) {
623            try {
624                brokerName = InetAddressUtil.getLocalHostName().toLowerCase();
625            } catch (Exception e) {
626                brokerName = "localhost";
627            }
628        }
629        return brokerName;
630    }
631
632    public void setBrokerName(String brokerName) {
633        this.brokerName = brokerName;
634    }
635
636    public DestinationStatistics getDestinationStatistics() {
637        return destinationStatistics;
638    }
639
640    protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
641        return new JMSException("Unknown destination type: " + destination.getDestinationType());
642    }
643
644    @Override
645    public synchronized void addBroker(Connection connection, BrokerInfo info) {
646        BrokerInfo existing = brokerInfos.get(info.getBrokerId());
647        if (existing == null) {
648            existing = info.copy();
649            existing.setPeerBrokerInfos(null);
650            brokerInfos.put(info.getBrokerId(), existing);
651        }
652        existing.incrementRefCount();
653        LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
654        addBrokerInClusterUpdate();
655    }
656
657    @Override
658    public synchronized void removeBroker(Connection connection, BrokerInfo info) {
659        if (info != null) {
660            BrokerInfo existing = brokerInfos.get(info.getBrokerId());
661            if (existing != null && existing.decrementRefCount() == 0) {
662               brokerInfos.remove(info.getBrokerId());
663            }
664            LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
665            removeBrokerInClusterUpdate();
666        }
667    }
668
669    @Override
670    public synchronized BrokerInfo[] getPeerBrokerInfos() {
671        BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
672        result = brokerInfos.values().toArray(result);
673        return result;
674    }
675
676    @Override
677    public void preProcessDispatch(MessageDispatch messageDispatch) {
678        Message message = messageDispatch.getMessage();
679        if (message != null) {
680            long endTime = System.currentTimeMillis();
681            message.setBrokerOutTime(endTime);
682            if (getBrokerService().isEnableStatistics()) {
683                long totalTime = endTime - message.getBrokerInTime();
684                message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime);
685            }
686        }
687    }
688
689    @Override
690    public void postProcessDispatch(MessageDispatch messageDispatch) {
691    }
692
693    @Override
694    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
695        ActiveMQDestination destination = messageDispatchNotification.getDestination();
696        switch (destination.getDestinationType()) {
697        case ActiveMQDestination.QUEUE_TYPE:
698            queueRegion.processDispatchNotification(messageDispatchNotification);
699            break;
700        case ActiveMQDestination.TOPIC_TYPE:
701            topicRegion.processDispatchNotification(messageDispatchNotification);
702            break;
703        case ActiveMQDestination.TEMP_QUEUE_TYPE:
704            tempQueueRegion.processDispatchNotification(messageDispatchNotification);
705            break;
706        case ActiveMQDestination.TEMP_TOPIC_TYPE:
707            tempTopicRegion.processDispatchNotification(messageDispatchNotification);
708            break;
709        default:
710            throw createUnknownDestinationTypeException(destination);
711        }
712    }
713
714    public boolean isSlaveBroker() {
715        return brokerService.isSlave();
716    }
717
718    @Override
719    public boolean isStopped() {
720        return !started;
721    }
722
723    @Override
724    public Set<ActiveMQDestination> getDurableDestinations() {
725        return destinationFactory.getDestinations();
726    }
727
728    protected void doStop(ServiceStopper ss) {
729        ss.stop(queueRegion);
730        ss.stop(topicRegion);
731        ss.stop(tempQueueRegion);
732        ss.stop(tempTopicRegion);
733    }
734
735    public boolean isKeepDurableSubsActive() {
736        return keepDurableSubsActive;
737    }
738
739    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
740        this.keepDurableSubsActive = keepDurableSubsActive;
741    }
742
743    public DestinationInterceptor getDestinationInterceptor() {
744        return destinationInterceptor;
745    }
746
747    @Override
748    public ConnectionContext getAdminConnectionContext() {
749        return adminConnectionContext;
750    }
751
752    @Override
753    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
754        this.adminConnectionContext = adminConnectionContext;
755    }
756
757    public Map<ConnectionId, ConnectionState> getConnectionStates() {
758        return connectionStates;
759    }
760
761    @Override
762    public PListStore getTempDataStore() {
763        return brokerService.getTempDataStore();
764    }
765
766    @Override
767    public URI getVmConnectorURI() {
768        return brokerService.getVmConnectorURI();
769    }
770
771    @Override
772    public void brokerServiceStarted() {
773    }
774
775    @Override
776    public BrokerService getBrokerService() {
777        return brokerService;
778    }
779
780    @Override
781    public boolean isExpired(MessageReference messageReference) {
782        boolean expired = false;
783        if (messageReference.isExpired()) {
784            try {
785                // prevent duplicate expiry processing
786                Message message = messageReference.getMessage();
787                synchronized (message) {
788                    expired = stampAsExpired(message);
789                }
790            } catch (IOException e) {
791                LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
792            }
793        }
794        return expired;
795    }
796   
797    private boolean stampAsExpired(Message message) throws IOException {
798        boolean stamped=false;
799        if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
800            long expiration=message.getExpiration();     
801            message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
802            stamped = true;
803        }
804        return stamped;
805    }
806
807    
808    @Override
809    public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
810        if (LOG.isDebugEnabled()) {
811            LOG.debug("Message expired " + node);
812        }
813        getRoot().sendToDeadLetterQueue(context, node, subscription);
814    }
815    
816    @Override
817    public void sendToDeadLetterQueue(ConnectionContext context,
818                MessageReference node, Subscription subscription){
819                try{
820                        if(node!=null){
821                                Message message=node.getMessage();
822                                if(message!=null && node.getRegionDestination()!=null){
823                                        DeadLetterStrategy deadLetterStrategy=node
824                                                .getRegionDestination().getDeadLetterStrategy();
825                                        if(deadLetterStrategy!=null){
826                                                if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
827                                                    // message may be inflight to other subscriptions so do not modify
828                                                    message = message.copy();
829                                                    stampAsExpired(message);
830                                                    message.setExpiration(0);
831                                                    if(!message.isPersistent()){
832                                                            message.setPersistent(true);
833                                                            message.setProperty("originalDeliveryMode",
834                                                                        "NON_PERSISTENT");
835                                                        }
836                                                        // The original destination and transaction id do
837                                                        // not get filled when the message is first sent,
838                                                        // it is only populated if the message is routed to
839                                                        // another destination like the DLQ
840                                                        ActiveMQDestination deadLetterDestination=deadLetterStrategy
841                                                                .getDeadLetterQueueFor(message, subscription);
842                                                        if (context.getBroker()==null) {
843                                                                context.setBroker(getRoot());
844                                                        }
845                                                        BrokerSupport.resendNoCopy(context,message,
846                                                                deadLetterDestination);
847                                                }
848                                        } else {
849                                            if (LOG.isDebugEnabled()) {
850                                                LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
851                                    + message.getMessageId() + ", destination: " + message.getDestination());
852                                            }
853                                        }
854                                }
855                        }
856                }catch(Exception e){
857                        LOG.warn("Caught an exception sending to DLQ: "+node,e);
858                }
859        }
860
861    @Override
862    public Broker getRoot() {
863        try {
864            return getBrokerService().getBroker();
865        } catch (Exception e) {
866            LOG.error("Trying to get Root Broker " + e);
867            throw new RuntimeException("The broker from the BrokerService should not throw an exception");
868        }
869    }
870    
871    /**
872     * @return the broker sequence id
873     */
874    @Override
875    public long getBrokerSequenceId() {
876        synchronized(sequenceGenerator) {
877            return sequenceGenerator.getNextSequenceId();
878        }
879    }
880    
881    
882    @Override
883    public Scheduler getScheduler() {
884        return this.scheduler;
885    }
886    
887    public ThreadPoolExecutor getExecutor() {
888        return this.executor;
889    }
890    
891    @Override
892    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
893        ActiveMQDestination destination = control.getDestination();
894        switch (destination.getDestinationType()) {
895        case ActiveMQDestination.QUEUE_TYPE:
896            queueRegion.processConsumerControl(consumerExchange, control);
897            break;
898
899        case ActiveMQDestination.TOPIC_TYPE:
900            topicRegion.processConsumerControl(consumerExchange, control);
901            break;
902            
903        case ActiveMQDestination.TEMP_QUEUE_TYPE:
904            tempQueueRegion.processConsumerControl(consumerExchange, control);
905            break;
906            
907        case ActiveMQDestination.TEMP_TOPIC_TYPE:
908            tempTopicRegion.processConsumerControl(consumerExchange, control);
909            break;
910            
911        default:
912            LOG.warn("unmatched destination: " + destination + ", in consumerControl: "  + control);
913        }
914    }
915    
916    protected void addBrokerInClusterUpdate() {
917        List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
918        for (TransportConnector connector : connectors) {
919            if (connector.isUpdateClusterClients()) {
920                connector.updateClientClusterInfo();
921            }
922        }
923    }
924
925    protected void removeBrokerInClusterUpdate() {
926        List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
927        for (TransportConnector connector : connectors) {
928            if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
929                connector.updateClientClusterInfo();
930            }
931        }
932    }
933    
934    protected void purgeInactiveDestinations() {
935        synchronized (purgeInactiveDestinationsTask) {
936            List<BaseDestination> list = new ArrayList<BaseDestination>();
937            Map<ActiveMQDestination, Destination> map = getDestinationMap();
938            long timeStamp = System.currentTimeMillis();
939            for (Destination d : map.values()) {
940                if (d instanceof BaseDestination) {
941                    BaseDestination bd = (BaseDestination) d;
942                    bd.markForGC(timeStamp);
943                    if (bd.canGC()) {
944                        list.add(bd);
945                    }
946                }
947            }
948
949            if (list.isEmpty() == false) {
950
951                ConnectionContext context = BrokerSupport.getConnectionContext(this);
952                context.setBroker(this);
953
954                for (BaseDestination dest : list) {
955                    dest.getLog().info(
956                            dest.getName() + " Inactive for longer than " + dest.getInactiveTimoutBeforeGC()
957                                    + " ms - removing ...");
958                    try {
959                        getRoot().removeDestination(context, dest.getActiveMQDestination(), 0);
960                    } catch (Exception e) {
961                        LOG.error("Failed to remove inactive destination " + dest, e);
962                    }
963                }
964            }
965        }
966    }
967}