001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.util.HashSet;
020import java.util.Iterator;
021import java.util.List;
022import java.util.Set;
023import java.util.concurrent.ConcurrentHashMap;
024import javax.jms.InvalidDestinationException;
025import javax.jms.JMSException;
026import org.apache.activemq.advisory.AdvisorySupport;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.broker.region.policy.PolicyEntry;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ConnectionId;
031import org.apache.activemq.command.ConsumerId;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.RemoveSubscriptionInfo;
034import org.apache.activemq.command.SessionId;
035import org.apache.activemq.command.SubscriptionInfo;
036import org.apache.activemq.store.TopicMessageStore;
037import org.apache.activemq.thread.TaskRunnerFactory;
038import org.apache.activemq.usage.SystemUsage;
039import org.apache.activemq.util.LongSequenceGenerator;
040import org.apache.activemq.util.SubscriptionKey;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * 
046 */
047public class TopicRegion extends AbstractRegion {
048    private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
049    protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
050    private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
051    private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
052    private boolean keepDurableSubsActive;
053
054    public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
055                       DestinationFactory destinationFactory) {
056        super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
057
058    }
059
060    @Override
061    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
062        if (info.isDurable()) {
063            ActiveMQDestination destination = info.getDestination();
064            if (!destination.isPattern()) {
065                // Make sure the destination is created.
066                lookup(context, destination,true);
067            }
068            String clientId = context.getClientId();
069            String subscriptionName = info.getSubscriptionName();
070            SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
071            DurableTopicSubscription sub = durableSubscriptions.get(key);
072            if (sub != null) {
073                if (sub.isActive()) {
074                    throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
075                }
076                // Has the selector changed??
077                if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
078                    // Remove the consumer first then add it.
079                    durableSubscriptions.remove(key);
080                    synchronized (destinationsMutex) {
081                        for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
082                            Destination dest = iter.next();
083                            //Account for virtual destinations
084                            if (dest instanceof Topic){
085                                Topic topic = (Topic)dest;
086                                topic.deleteSubscription(context, key);
087                            }
088                        }
089                    }
090                    super.removeConsumer(context, sub.getConsumerInfo());
091                    super.addConsumer(context, info);
092                    sub = durableSubscriptions.get(key);
093                } else {
094                    // Change the consumer id key of the durable sub.
095                    if (sub.getConsumerInfo().getConsumerId() != null) {
096                        subscriptions.remove(sub.getConsumerInfo().getConsumerId());
097                    }
098                    subscriptions.put(info.getConsumerId(), sub);
099                }
100            } else {
101                super.addConsumer(context, info);
102                sub = durableSubscriptions.get(key);
103                if (sub == null) {
104                    throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
105                                           + " subscriberName: " + key.getSubscriptionName());
106                }
107            }
108            sub.activate(usageManager, context, info);
109            return sub;
110        } else {
111            return super.addConsumer(context, info);
112        }
113    }
114
115    @Override
116    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
117        if (info.isDurable()) {
118
119            SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
120            DurableTopicSubscription sub = durableSubscriptions.get(key);
121            if (sub != null) {
122                sub.deactivate(keepDurableSubsActive);
123            }
124
125        } else {
126            super.removeConsumer(context, info);
127        }
128    }
129
130    @Override
131    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
132        SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
133        DurableTopicSubscription sub = durableSubscriptions.remove(key);
134        if (sub == null) {
135            throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
136        }
137        if (sub.isActive()) {
138            throw new JMSException("Durable consumer is in use");
139        }
140
141        synchronized (destinationsMutex) {
142            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
143                Destination dest = iter.next();
144                //Account for virtual destinations
145                if (dest instanceof Topic){
146                    Topic topic = (Topic)dest;
147                    topic.deleteSubscription(context, key);
148                }
149            }
150        }
151        if (subscriptions.get(sub.getConsumerInfo()) != null) {
152            super.removeConsumer(context, sub.getConsumerInfo());
153        } else {
154            // try destroying inactive subscriptions
155            destroySubscription(sub);
156        }
157    }
158
159    @Override
160    public String toString() {
161        return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
162    }
163
164    @Override
165    protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
166        List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
167        Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
168
169        TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
170        // Eagerly recover the durable subscriptions
171        if (store != null) {
172            SubscriptionInfo[] infos = store.getAllSubscriptions();
173            for (int i = 0; i < infos.length; i++) {
174
175                SubscriptionInfo info = infos[i];
176                LOG.debug("Restoring durable subscription: " + info);
177                SubscriptionKey key = new SubscriptionKey(info);
178
179                // A single durable sub may be subscribing to multiple topics.
180                // so it might exist already.
181                DurableTopicSubscription sub = durableSubscriptions.get(key);
182                ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
183                if (sub == null) {
184                    ConnectionContext c = new ConnectionContext();
185                    c.setBroker(context.getBroker());
186                    c.setClientId(key.getClientId());
187                    c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
188                    sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
189                }
190
191                if (dupChecker.contains(sub)) {
192                    continue;
193                }
194
195                dupChecker.add(sub);
196                rc.add(sub);
197                dest.addSubscription(context, sub);
198            }
199
200            // Now perhaps there other durable subscriptions (via wild card)
201            // that would match this destination..
202            durableSubscriptions.values();
203            for (Iterator<DurableTopicSubscription> iterator = durableSubscriptions.values().iterator(); iterator.hasNext();) {
204                DurableTopicSubscription sub = iterator.next();
205                // Skip over subscriptions that we allready added..
206                if (dupChecker.contains(sub)) {
207                    continue;
208                }
209
210                if (sub.matches(dest.getActiveMQDestination())) {
211                    rc.add(sub);
212                    dest.addSubscription(context, sub);
213                }
214            }
215        }
216        return rc;
217    }
218
219    private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
220        ConsumerInfo rc = new ConsumerInfo();
221        rc.setSelector(info.getSelector());
222        rc.setSubscriptionName(info.getSubscriptionName());
223        rc.setDestination(info.getSubscribedDestination());
224        rc.setConsumerId(createConsumerId());
225        return rc;
226    }
227
228    private ConsumerId createConsumerId() {
229        return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId());
230    }
231
232    protected void configureTopic(Topic topic, ActiveMQDestination destination) {
233        if (broker.getDestinationPolicy() != null) {
234            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
235            if (entry != null) {
236                entry.configure(broker,topic);
237            }
238        }
239    }
240
241    @Override
242    protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
243        ActiveMQDestination destination = info.getDestination();
244        
245        if (info.isDurable()) {
246            if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
247                throw new JMSException("Cannot create a durable subscription for an advisory Topic");
248            }
249            SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
250            DurableTopicSubscription sub = durableSubscriptions.get(key);
251            
252            if (sub == null) {
253                
254                sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
255
256                if (destination != null && broker.getDestinationPolicy() != null) {
257                    PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
258                    if (entry != null) {
259                        entry.configure(broker, usageManager, sub);
260                    }
261                }
262                durableSubscriptions.put(key, sub);
263            } else {
264                throw new JMSException("That durable subscription is already active.");
265            }
266            return sub;
267        }
268        try {
269            TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
270            // lets configure the subscription depending on the destination
271            if (destination != null && broker.getDestinationPolicy() != null) {
272                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
273                if (entry != null) {
274                    entry.configure(broker, usageManager, answer);
275                }
276            }
277            answer.init();
278            return answer;
279        } catch (Exception e) {
280            LOG.error("Failed to create TopicSubscription ", e);
281            JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
282            jmsEx.setLinkedException(e);
283            throw jmsEx;
284        }
285    }
286
287    /**
288     */
289    private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
290        if (info1.getSelector() != null ^ info2.getSelector() != null) {
291            return true;
292        }
293        if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
294            return true;
295        }
296        return !info1.getDestination().equals(info2.getDestination());
297    }
298
299    @Override
300    protected Set<ActiveMQDestination> getInactiveDestinations() {
301        Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
302        for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
303            ActiveMQDestination dest = iter.next();
304            if (!dest.isTopic()) {
305                iter.remove();
306            }
307        }
308        return inactiveDestinations;
309    }
310
311    public boolean isKeepDurableSubsActive() {
312        return keepDurableSubsActive;
313    }
314
315    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
316        this.keepDurableSubsActive = keepDurableSubsActive;
317    }
318
319    public boolean durableSubscriptionExists(SubscriptionKey key) {
320        return this.durableSubscriptions.containsKey(key);
321    }
322
323}