001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.Iterator;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import javax.jms.JMSException;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.broker.ConsumerBrokerExchange;
029import org.apache.activemq.broker.DestinationAlreadyExistsException;
030import org.apache.activemq.broker.ProducerBrokerExchange;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.ConsumerControl;
033import org.apache.activemq.command.ConsumerId;
034import org.apache.activemq.command.ConsumerInfo;
035import org.apache.activemq.command.Message;
036import org.apache.activemq.command.MessageAck;
037import org.apache.activemq.command.MessageDispatchNotification;
038import org.apache.activemq.command.MessagePull;
039import org.apache.activemq.command.ProducerInfo;
040import org.apache.activemq.command.RemoveSubscriptionInfo;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.filter.DestinationFilter;
043import org.apache.activemq.filter.DestinationMap;
044import org.apache.activemq.security.SecurityContext;
045import org.apache.activemq.thread.TaskRunnerFactory;
046import org.apache.activemq.usage.SystemUsage;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * 
052 */
053public abstract class AbstractRegion implements Region {
054
055    private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class);
056
057    protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
058    protected final DestinationMap destinationMap = new DestinationMap();
059    protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
060    protected final SystemUsage usageManager;
061    protected final DestinationFactory destinationFactory;
062    protected final DestinationStatistics destinationStatistics;
063    protected final RegionBroker broker;
064    protected boolean autoCreateDestinations = true;
065    protected final TaskRunnerFactory taskRunnerFactory;
066    protected final Object destinationsMutex = new Object();
067    protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
068    protected boolean started;
069
070    public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager,
071            TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
072        if (broker == null) {
073            throw new IllegalArgumentException("null broker");
074        }
075        this.broker = broker;
076        this.destinationStatistics = destinationStatistics;
077        this.usageManager = memoryManager;
078        this.taskRunnerFactory = taskRunnerFactory;
079        if (broker == null) {
080            throw new IllegalArgumentException("null destinationFactory");
081        }
082        this.destinationFactory = destinationFactory;
083    }
084
085    public final void start() throws Exception {
086        started = true;
087
088        Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
089        for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
090            ActiveMQDestination dest = iter.next();
091
092            ConnectionContext context = new ConnectionContext();
093            context.setBroker(broker.getBrokerService().getBroker());
094            context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
095            context.getBroker().addDestination(context, dest, false);
096        }
097        synchronized (destinationsMutex) {
098            for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
099                Destination dest = i.next();
100                dest.start();
101            }
102        }
103    }
104
105    public void stop() throws Exception {
106        started = false;
107        synchronized (destinationsMutex) {
108            for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
109                Destination dest = i.next();
110                dest.stop();
111            }
112        }
113        destinations.clear();
114    }
115
116    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
117            boolean createIfTemporary) throws Exception {
118        LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
119        synchronized (destinationsMutex) {
120            Destination dest = destinations.get(destination);
121            if (dest == null) {
122                if (destination.isTemporary() == false || createIfTemporary) {
123                    dest = createDestination(context, destination);
124                    // intercept if there is a valid interceptor defined
125                    DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
126                    if (destinationInterceptor != null) {
127                        dest = destinationInterceptor.intercept(dest);
128                    }
129                    dest.start();
130                    destinations.put(destination, dest);
131                    destinationMap.put(destination, dest);
132                    addSubscriptionsForDestination(context, dest);
133                }
134                if (dest == null) {
135                    throw new JMSException("The destination " + destination + " does not exist.");
136                }
137            }
138            return dest;
139        }
140    }
141
142    public Map<ConsumerId, Subscription> getSubscriptions() {
143        return subscriptions;
144    }
145
146    protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
147            throws Exception {
148
149        List<Subscription> rc = new ArrayList<Subscription>();
150        // Add all consumers that are interested in the destination.
151        for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
152            Subscription sub = iter.next();
153            if (sub.matches(dest.getActiveMQDestination())) {
154                dest.addSubscription(context, sub);
155                rc.add(sub);
156            }
157        }
158        return rc;
159
160    }
161
162    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
163            throws Exception {
164
165        // No timeout.. then try to shut down right way, fails if there are
166        // current subscribers.
167        if (timeout == 0) {
168            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
169                Subscription sub = iter.next();
170                if (sub.matches(destination)) {
171                    throw new JMSException("Destination still has an active subscription: " + destination);
172                }
173            }
174        }
175
176        if (timeout > 0) {
177            // TODO: implement a way to notify the subscribers that we want to
178            // take the down
179            // the destination and that they should un-subscribe.. Then wait up
180            // to timeout time before
181            // dropping the subscription.
182        }
183
184        LOG.debug("Removing destination: " + destination);
185
186        synchronized (destinationsMutex) {
187            Destination dest = destinations.remove(destination);
188            if (dest != null) {
189                // timeout<0 or we timed out, we now force any remaining
190                // subscriptions to un-subscribe.
191                for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
192                    Subscription sub = iter.next();
193                    if (sub.matches(destination)) {
194                        dest.removeSubscription(context, sub, 0l);
195                    }
196                }
197                destinationMap.removeAll(destination);
198                dispose(context, dest);
199                DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
200                if (destinationInterceptor != null) {
201                    destinationInterceptor.remove(dest);
202                }
203
204            } else {
205                LOG.debug("Destination doesn't exist: " + dest);
206            }
207        }
208    }
209
210    /**
211     * Provide an exact or wildcard lookup of destinations in the region
212     * 
213     * @return a set of matching destination objects.
214     */
215    public Set<Destination> getDestinations(ActiveMQDestination destination) {
216        synchronized (destinationsMutex) {
217            return destinationMap.get(destination);
218        }
219    }
220
221    public Map<ActiveMQDestination, Destination> getDestinationMap() {
222        synchronized (destinationsMutex) {
223            return new HashMap<ActiveMQDestination, Destination>(destinations);
224        }
225    }
226
227    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
228        LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
229                + info.getDestination());
230        ActiveMQDestination destination = info.getDestination();
231        if (destination != null && !destination.isPattern() && !destination.isComposite()) {
232            // lets auto-create the destination
233            lookup(context, destination,true);
234        }
235
236        Object addGuard;
237        synchronized (consumerChangeMutexMap) {
238            addGuard = consumerChangeMutexMap.get(info.getConsumerId());
239            if (addGuard == null) {
240                addGuard = new Object();
241                consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
242            }
243        }
244        synchronized (addGuard) {
245            Subscription o = subscriptions.get(info.getConsumerId());
246            if (o != null) {
247                LOG
248                        .warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
249                return o;
250            }
251
252            // We may need to add some destinations that are in persistent store
253            // but not active
254            // in the broker.
255            //
256            // TODO: think about this a little more. This is good cause
257            // destinations are not loaded into
258            // memory until a client needs to use the queue, but a management
259            // agent viewing the
260            // broker will not see a destination that exists in persistent
261            // store. We may want to
262            // eagerly load all destinations into the broker but have an
263            // inactive state for the
264            // destination which has reduced memory usage.
265            //
266            DestinationFilter.parseFilter(info.getDestination());
267
268            Subscription sub = createSubscription(context, info);
269
270            subscriptions.put(info.getConsumerId(), sub);
271
272            // At this point we're done directly manipulating subscriptions,
273            // but we need to retain the synchronized block here. Consider
274            // otherwise what would happen if at this point a second
275            // thread added, then removed, as would be allowed with
276            // no mutex held. Remove is only essentially run once
277            // so everything after this point would be leaked.
278
279            // Add the subscription to all the matching queues.
280            // But copy the matches first - to prevent deadlocks
281            List<Destination> addList = new ArrayList<Destination>();
282            synchronized (destinationsMutex) {
283                for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
284                    Destination dest = (Destination) iter.next();
285                    addList.add(dest);
286                }
287            }
288
289            for (Destination dest : addList) {
290                dest.addSubscription(context, sub);
291            }
292
293            if (info.isBrowser()) {
294                ((QueueBrowserSubscription) sub).destinationsAdded();
295            }
296
297            return sub;
298        }
299    }
300
301    /**
302     * Get all the Destinations that are in storage
303     * 
304     * @return Set of all stored destinations
305     */
306    public Set getDurableDestinations() {
307        return destinationFactory.getDestinations();
308    }
309
310    /**
311     * @return all Destinations that don't have active consumers
312     */
313    protected Set<ActiveMQDestination> getInactiveDestinations() {
314        Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
315        synchronized (destinationsMutex) {
316            inactiveDests.removeAll(destinations.keySet());
317        }
318        return inactiveDests;
319    }
320
321    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
322        LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
323                + info.getDestination());
324
325        Subscription sub = subscriptions.remove(info.getConsumerId());
326        // The sub could be removed elsewhere - see ConnectionSplitBroker
327        if (sub != null) {
328
329            // remove the subscription from all the matching queues.
330            List<Destination> removeList = new ArrayList<Destination>();
331            synchronized (destinationsMutex) {
332                for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
333                    Destination dest = (Destination) iter.next();
334                    removeList.add(dest);
335
336                }
337            }
338            for (Destination dest : removeList) {
339                dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
340            }
341
342            destroySubscription(sub);
343        }
344        synchronized (consumerChangeMutexMap) {
345            consumerChangeMutexMap.remove(info.getConsumerId());
346        }
347    }
348
349    protected void destroySubscription(Subscription sub) {
350        sub.destroy();
351    }
352
353    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
354        throw new JMSException("Invalid operation.");
355    }
356
357    public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
358        final ConnectionContext context = producerExchange.getConnectionContext();
359
360        if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
361            final Destination regionDestination = lookup(context, messageSend.getDestination(),false);
362            producerExchange.setRegionDestination(regionDestination);
363        }
364
365        producerExchange.getRegionDestination().send(producerExchange, messageSend);
366    }
367
368    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
369        Subscription sub = consumerExchange.getSubscription();
370        if (sub == null) {
371            sub = subscriptions.get(ack.getConsumerId());
372            if (sub == null) {
373                if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
374                    LOG.warn("Ack for non existent subscription, ack:" + ack);
375                    throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
376                } else {
377                    LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
378                    return;
379                }
380            }
381            consumerExchange.setSubscription(sub);
382        }
383        sub.acknowledge(consumerExchange.getConnectionContext(), ack);
384    }
385
386    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
387        Subscription sub = subscriptions.get(pull.getConsumerId());
388        if (sub == null) {
389            throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
390        }
391        return sub.pullMessage(context, pull);
392    }
393
394    protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
395        Destination dest = null;
396        synchronized (destinationsMutex) {
397            dest = destinations.get(destination);
398        }
399        if (dest == null) {
400            if (isAutoCreateDestinations()) {
401                // Try to auto create the destination... re-invoke broker
402                // from the
403                // top so that the proper security checks are performed.
404                try {
405                    context.getBroker().addDestination(context, destination, createTemporary);
406                    dest = addDestination(context, destination, false);
407                } catch (DestinationAlreadyExistsException e) {
408                    // if the destination already exists then lets ignore
409                    // this error
410                }
411                // We should now have the dest created.
412                synchronized (destinationsMutex) {
413                    dest = destinations.get(destination);
414                }
415            }
416            if (dest == null) {
417                throw new JMSException("The destination " + destination + " does not exist.");
418            }
419        }
420        return dest;
421    }
422
423    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
424        Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
425        if (sub != null) {
426            sub.processMessageDispatchNotification(messageDispatchNotification);
427        } else {
428            throw new JMSException("Slave broker out of sync with master - Subscription: "
429                    + messageDispatchNotification.getConsumerId() + " on "
430                    + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: "
431                    + messageDispatchNotification.getMessageId());
432        }
433    }
434
435    /*
436     * For a Queue/TempQueue, dispatch order is imperative to match acks, so the
437     * dispatch is deferred till the notification to ensure that the
438     * subscription chosen by the master is used. AMQ-2102
439     */
440    protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
441            throws Exception {
442        Destination dest = null;
443        synchronized (destinationsMutex) {
444            dest = destinations.get(messageDispatchNotification.getDestination());
445        }
446        if (dest != null) {
447            dest.processDispatchNotification(messageDispatchNotification);
448        } else {
449            throw new JMSException("Slave broker out of sync with master - Destination: "
450                    + messageDispatchNotification.getDestination() + " does not exist for consumer "
451                    + messageDispatchNotification.getConsumerId() + " with message: "
452                    + messageDispatchNotification.getMessageId());
453        }
454    }
455
456    public void gc() {
457        for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
458            Subscription sub = iter.next();
459            sub.gc();
460        }
461        synchronized (destinationsMutex) {
462            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
463                Destination dest = iter.next();
464                dest.gc();
465            }
466        }
467    }
468
469    protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
470
471    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination)
472            throws Exception {
473        return destinationFactory.createDestination(context, destination, destinationStatistics);
474    }
475
476    public boolean isAutoCreateDestinations() {
477        return autoCreateDestinations;
478    }
479
480    public void setAutoCreateDestinations(boolean autoCreateDestinations) {
481        this.autoCreateDestinations = autoCreateDestinations;
482    }
483
484    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
485        synchronized (destinationsMutex) {
486            for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
487                Destination dest = (Destination) iter.next();
488                dest.addProducer(context, info);
489            }
490        }
491    }
492
493    /**
494     * Removes a Producer.
495     * 
496     * @param context
497     *            the environment the operation is being executed under.
498     * @throws Exception
499     *             TODO
500     */
501    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
502        synchronized (destinationsMutex) {
503            for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
504                Destination dest = (Destination) iter.next();
505                dest.removeProducer(context, info);
506            }
507        }
508    }
509
510    protected void dispose(ConnectionContext context, Destination dest) throws Exception {
511        dest.dispose(context);
512        dest.stop();
513        destinationFactory.removeDestination(dest);
514    }
515
516    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
517        Subscription sub = subscriptions.get(control.getConsumerId());
518        if (sub != null && sub instanceof AbstractSubscription) {
519            ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch());
520            if (LOG.isDebugEnabled()) {
521                LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: "
522                        + control.getConsumerId());
523            }
524            try {
525                lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
526            } catch (Exception e) {
527                LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e);
528            }
529        }
530    }
531}