001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.util.HashSet; 020import java.util.Iterator; 021import java.util.List; 022import java.util.Set; 023import java.util.concurrent.ConcurrentHashMap; 024import javax.jms.InvalidDestinationException; 025import javax.jms.JMSException; 026import org.apache.activemq.advisory.AdvisorySupport; 027import org.apache.activemq.broker.ConnectionContext; 028import org.apache.activemq.broker.region.policy.PolicyEntry; 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.command.ConnectionId; 031import org.apache.activemq.command.ConsumerId; 032import org.apache.activemq.command.ConsumerInfo; 033import org.apache.activemq.command.RemoveSubscriptionInfo; 034import org.apache.activemq.command.SessionId; 035import org.apache.activemq.command.SubscriptionInfo; 036import org.apache.activemq.store.TopicMessageStore; 037import org.apache.activemq.thread.TaskRunnerFactory; 038import org.apache.activemq.usage.SystemUsage; 039import org.apache.activemq.util.LongSequenceGenerator; 040import org.apache.activemq.util.SubscriptionKey; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * 046 */ 047public class TopicRegion extends AbstractRegion { 048 private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class); 049 protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 050 private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator(); 051 private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId()); 052 private boolean keepDurableSubsActive; 053 054 public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, 055 DestinationFactory destinationFactory) { 056 super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 057 058 } 059 060 @Override 061 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 062 if (info.isDurable()) { 063 ActiveMQDestination destination = info.getDestination(); 064 if (!destination.isPattern()) { 065 // Make sure the destination is created. 066 lookup(context, destination,true); 067 } 068 String clientId = context.getClientId(); 069 String subscriptionName = info.getSubscriptionName(); 070 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 071 DurableTopicSubscription sub = durableSubscriptions.get(key); 072 if (sub != null) { 073 if (sub.isActive()) { 074 throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName); 075 } 076 // Has the selector changed?? 077 if (hasDurableSubChanged(info, sub.getConsumerInfo())) { 078 // Remove the consumer first then add it. 079 durableSubscriptions.remove(key); 080 synchronized (destinationsMutex) { 081 for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { 082 Destination dest = iter.next(); 083 //Account for virtual destinations 084 if (dest instanceof Topic){ 085 Topic topic = (Topic)dest; 086 topic.deleteSubscription(context, key); 087 } 088 } 089 } 090 super.removeConsumer(context, sub.getConsumerInfo()); 091 super.addConsumer(context, info); 092 sub = durableSubscriptions.get(key); 093 } else { 094 // Change the consumer id key of the durable sub. 095 if (sub.getConsumerInfo().getConsumerId() != null) { 096 subscriptions.remove(sub.getConsumerInfo().getConsumerId()); 097 } 098 subscriptions.put(info.getConsumerId(), sub); 099 } 100 } else { 101 super.addConsumer(context, info); 102 sub = durableSubscriptions.get(key); 103 if (sub == null) { 104 throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId() 105 + " subscriberName: " + key.getSubscriptionName()); 106 } 107 } 108 sub.activate(usageManager, context, info); 109 return sub; 110 } else { 111 return super.addConsumer(context, info); 112 } 113 } 114 115 @Override 116 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 117 if (info.isDurable()) { 118 119 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 120 DurableTopicSubscription sub = durableSubscriptions.get(key); 121 if (sub != null) { 122 sub.deactivate(keepDurableSubsActive); 123 } 124 125 } else { 126 super.removeConsumer(context, info); 127 } 128 } 129 130 @Override 131 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 132 SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName()); 133 DurableTopicSubscription sub = durableSubscriptions.remove(key); 134 if (sub == null) { 135 throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName()); 136 } 137 if (sub.isActive()) { 138 throw new JMSException("Durable consumer is in use"); 139 } 140 141 synchronized (destinationsMutex) { 142 for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { 143 Destination dest = iter.next(); 144 //Account for virtual destinations 145 if (dest instanceof Topic){ 146 Topic topic = (Topic)dest; 147 topic.deleteSubscription(context, key); 148 } 149 } 150 } 151 if (subscriptions.get(sub.getConsumerInfo()) != null) { 152 super.removeConsumer(context, sub.getConsumerInfo()); 153 } else { 154 // try destroying inactive subscriptions 155 destroySubscription(sub); 156 } 157 } 158 159 @Override 160 public String toString() { 161 return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%"; 162 } 163 164 @Override 165 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { 166 List<Subscription> rc = super.addSubscriptionsForDestination(context, dest); 167 Set<Subscription> dupChecker = new HashSet<Subscription>(rc); 168 169 TopicMessageStore store = (TopicMessageStore)dest.getMessageStore(); 170 // Eagerly recover the durable subscriptions 171 if (store != null) { 172 SubscriptionInfo[] infos = store.getAllSubscriptions(); 173 for (int i = 0; i < infos.length; i++) { 174 175 SubscriptionInfo info = infos[i]; 176 LOG.debug("Restoring durable subscription: " + info); 177 SubscriptionKey key = new SubscriptionKey(info); 178 179 // A single durable sub may be subscribing to multiple topics. 180 // so it might exist already. 181 DurableTopicSubscription sub = durableSubscriptions.get(key); 182 ConsumerInfo consumerInfo = createInactiveConsumerInfo(info); 183 if (sub == null) { 184 ConnectionContext c = new ConnectionContext(); 185 c.setBroker(context.getBroker()); 186 c.setClientId(key.getClientId()); 187 c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId()); 188 sub = (DurableTopicSubscription)createSubscription(c, consumerInfo); 189 } 190 191 if (dupChecker.contains(sub)) { 192 continue; 193 } 194 195 dupChecker.add(sub); 196 rc.add(sub); 197 dest.addSubscription(context, sub); 198 } 199 200 // Now perhaps there other durable subscriptions (via wild card) 201 // that would match this destination.. 202 durableSubscriptions.values(); 203 for (Iterator<DurableTopicSubscription> iterator = durableSubscriptions.values().iterator(); iterator.hasNext();) { 204 DurableTopicSubscription sub = iterator.next(); 205 // Skip over subscriptions that we allready added.. 206 if (dupChecker.contains(sub)) { 207 continue; 208 } 209 210 if (sub.matches(dest.getActiveMQDestination())) { 211 rc.add(sub); 212 dest.addSubscription(context, sub); 213 } 214 } 215 } 216 return rc; 217 } 218 219 private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) { 220 ConsumerInfo rc = new ConsumerInfo(); 221 rc.setSelector(info.getSelector()); 222 rc.setSubscriptionName(info.getSubscriptionName()); 223 rc.setDestination(info.getSubscribedDestination()); 224 rc.setConsumerId(createConsumerId()); 225 return rc; 226 } 227 228 private ConsumerId createConsumerId() { 229 return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId()); 230 } 231 232 protected void configureTopic(Topic topic, ActiveMQDestination destination) { 233 if (broker.getDestinationPolicy() != null) { 234 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 235 if (entry != null) { 236 entry.configure(broker,topic); 237 } 238 } 239 } 240 241 @Override 242 protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { 243 ActiveMQDestination destination = info.getDestination(); 244 245 if (info.isDurable()) { 246 if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 247 throw new JMSException("Cannot create a durable subscription for an advisory Topic"); 248 } 249 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 250 DurableTopicSubscription sub = durableSubscriptions.get(key); 251 252 if (sub == null) { 253 254 sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive); 255 256 if (destination != null && broker.getDestinationPolicy() != null) { 257 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 258 if (entry != null) { 259 entry.configure(broker, usageManager, sub); 260 } 261 } 262 durableSubscriptions.put(key, sub); 263 } else { 264 throw new JMSException("That durable subscription is already active."); 265 } 266 return sub; 267 } 268 try { 269 TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager); 270 // lets configure the subscription depending on the destination 271 if (destination != null && broker.getDestinationPolicy() != null) { 272 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 273 if (entry != null) { 274 entry.configure(broker, usageManager, answer); 275 } 276 } 277 answer.init(); 278 return answer; 279 } catch (Exception e) { 280 LOG.error("Failed to create TopicSubscription ", e); 281 JMSException jmsEx = new JMSException("Couldn't create TopicSubscription"); 282 jmsEx.setLinkedException(e); 283 throw jmsEx; 284 } 285 } 286 287 /** 288 */ 289 private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) { 290 if (info1.getSelector() != null ^ info2.getSelector() != null) { 291 return true; 292 } 293 if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { 294 return true; 295 } 296 return !info1.getDestination().equals(info2.getDestination()); 297 } 298 299 @Override 300 protected Set<ActiveMQDestination> getInactiveDestinations() { 301 Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations(); 302 for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) { 303 ActiveMQDestination dest = iter.next(); 304 if (!dest.isTopic()) { 305 iter.remove(); 306 } 307 } 308 return inactiveDestinations; 309 } 310 311 public boolean isKeepDurableSubsActive() { 312 return keepDurableSubsActive; 313 } 314 315 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 316 this.keepDurableSubsActive = keepDurableSubsActive; 317 } 318 319 public boolean durableSubscriptionExists(SubscriptionKey key) { 320 return this.durableSubscriptions.containsKey(key); 321 } 322 323}