001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.util.ArrayList; 020import java.util.HashMap; 021import java.util.Iterator; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import java.util.concurrent.ConcurrentHashMap; 026import javax.jms.JMSException; 027import org.apache.activemq.broker.ConnectionContext; 028import org.apache.activemq.broker.ConsumerBrokerExchange; 029import org.apache.activemq.broker.DestinationAlreadyExistsException; 030import org.apache.activemq.broker.ProducerBrokerExchange; 031import org.apache.activemq.command.ActiveMQDestination; 032import org.apache.activemq.command.ConsumerControl; 033import org.apache.activemq.command.ConsumerId; 034import org.apache.activemq.command.ConsumerInfo; 035import org.apache.activemq.command.Message; 036import org.apache.activemq.command.MessageAck; 037import org.apache.activemq.command.MessageDispatchNotification; 038import org.apache.activemq.command.MessagePull; 039import org.apache.activemq.command.ProducerInfo; 040import org.apache.activemq.command.RemoveSubscriptionInfo; 041import org.apache.activemq.command.Response; 042import org.apache.activemq.filter.DestinationFilter; 043import org.apache.activemq.filter.DestinationMap; 044import org.apache.activemq.security.SecurityContext; 045import org.apache.activemq.thread.TaskRunnerFactory; 046import org.apache.activemq.usage.SystemUsage; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * 052 */ 053public abstract class AbstractRegion implements Region { 054 055 private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class); 056 057 protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 058 protected final DestinationMap destinationMap = new DestinationMap(); 059 protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>(); 060 protected final SystemUsage usageManager; 061 protected final DestinationFactory destinationFactory; 062 protected final DestinationStatistics destinationStatistics; 063 protected final RegionBroker broker; 064 protected boolean autoCreateDestinations = true; 065 protected final TaskRunnerFactory taskRunnerFactory; 066 protected final Object destinationsMutex = new Object(); 067 protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>(); 068 protected boolean started; 069 070 public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, 071 TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 072 if (broker == null) { 073 throw new IllegalArgumentException("null broker"); 074 } 075 this.broker = broker; 076 this.destinationStatistics = destinationStatistics; 077 this.usageManager = memoryManager; 078 this.taskRunnerFactory = taskRunnerFactory; 079 if (broker == null) { 080 throw new IllegalArgumentException("null destinationFactory"); 081 } 082 this.destinationFactory = destinationFactory; 083 } 084 085 public final void start() throws Exception { 086 started = true; 087 088 Set<ActiveMQDestination> inactiveDests = getInactiveDestinations(); 089 for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) { 090 ActiveMQDestination dest = iter.next(); 091 092 ConnectionContext context = new ConnectionContext(); 093 context.setBroker(broker.getBrokerService().getBroker()); 094 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 095 context.getBroker().addDestination(context, dest, false); 096 } 097 synchronized (destinationsMutex) { 098 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { 099 Destination dest = i.next(); 100 dest.start(); 101 } 102 } 103 } 104 105 public void stop() throws Exception { 106 started = false; 107 synchronized (destinationsMutex) { 108 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { 109 Destination dest = i.next(); 110 dest.stop(); 111 } 112 } 113 destinations.clear(); 114 } 115 116 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, 117 boolean createIfTemporary) throws Exception { 118 LOG.debug(broker.getBrokerName() + " adding destination: " + destination); 119 synchronized (destinationsMutex) { 120 Destination dest = destinations.get(destination); 121 if (dest == null) { 122 if (destination.isTemporary() == false || createIfTemporary) { 123 dest = createDestination(context, destination); 124 // intercept if there is a valid interceptor defined 125 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 126 if (destinationInterceptor != null) { 127 dest = destinationInterceptor.intercept(dest); 128 } 129 dest.start(); 130 destinations.put(destination, dest); 131 destinationMap.put(destination, dest); 132 addSubscriptionsForDestination(context, dest); 133 } 134 if (dest == null) { 135 throw new JMSException("The destination " + destination + " does not exist."); 136 } 137 } 138 return dest; 139 } 140 } 141 142 public Map<ConsumerId, Subscription> getSubscriptions() { 143 return subscriptions; 144 } 145 146 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) 147 throws Exception { 148 149 List<Subscription> rc = new ArrayList<Subscription>(); 150 // Add all consumers that are interested in the destination. 151 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 152 Subscription sub = iter.next(); 153 if (sub.matches(dest.getActiveMQDestination())) { 154 dest.addSubscription(context, sub); 155 rc.add(sub); 156 } 157 } 158 return rc; 159 160 } 161 162 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) 163 throws Exception { 164 165 // No timeout.. then try to shut down right way, fails if there are 166 // current subscribers. 167 if (timeout == 0) { 168 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 169 Subscription sub = iter.next(); 170 if (sub.matches(destination)) { 171 throw new JMSException("Destination still has an active subscription: " + destination); 172 } 173 } 174 } 175 176 if (timeout > 0) { 177 // TODO: implement a way to notify the subscribers that we want to 178 // take the down 179 // the destination and that they should un-subscribe.. Then wait up 180 // to timeout time before 181 // dropping the subscription. 182 } 183 184 LOG.debug("Removing destination: " + destination); 185 186 synchronized (destinationsMutex) { 187 Destination dest = destinations.remove(destination); 188 if (dest != null) { 189 // timeout<0 or we timed out, we now force any remaining 190 // subscriptions to un-subscribe. 191 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 192 Subscription sub = iter.next(); 193 if (sub.matches(destination)) { 194 dest.removeSubscription(context, sub, 0l); 195 } 196 } 197 destinationMap.removeAll(destination); 198 dispose(context, dest); 199 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 200 if (destinationInterceptor != null) { 201 destinationInterceptor.remove(dest); 202 } 203 204 } else { 205 LOG.debug("Destination doesn't exist: " + dest); 206 } 207 } 208 } 209 210 /** 211 * Provide an exact or wildcard lookup of destinations in the region 212 * 213 * @return a set of matching destination objects. 214 */ 215 public Set<Destination> getDestinations(ActiveMQDestination destination) { 216 synchronized (destinationsMutex) { 217 return destinationMap.get(destination); 218 } 219 } 220 221 public Map<ActiveMQDestination, Destination> getDestinationMap() { 222 synchronized (destinationsMutex) { 223 return new HashMap<ActiveMQDestination, Destination>(destinations); 224 } 225 } 226 227 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 228 LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " 229 + info.getDestination()); 230 ActiveMQDestination destination = info.getDestination(); 231 if (destination != null && !destination.isPattern() && !destination.isComposite()) { 232 // lets auto-create the destination 233 lookup(context, destination,true); 234 } 235 236 Object addGuard; 237 synchronized (consumerChangeMutexMap) { 238 addGuard = consumerChangeMutexMap.get(info.getConsumerId()); 239 if (addGuard == null) { 240 addGuard = new Object(); 241 consumerChangeMutexMap.put(info.getConsumerId(), addGuard); 242 } 243 } 244 synchronized (addGuard) { 245 Subscription o = subscriptions.get(info.getConsumerId()); 246 if (o != null) { 247 LOG 248 .warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this."); 249 return o; 250 } 251 252 // We may need to add some destinations that are in persistent store 253 // but not active 254 // in the broker. 255 // 256 // TODO: think about this a little more. This is good cause 257 // destinations are not loaded into 258 // memory until a client needs to use the queue, but a management 259 // agent viewing the 260 // broker will not see a destination that exists in persistent 261 // store. We may want to 262 // eagerly load all destinations into the broker but have an 263 // inactive state for the 264 // destination which has reduced memory usage. 265 // 266 DestinationFilter.parseFilter(info.getDestination()); 267 268 Subscription sub = createSubscription(context, info); 269 270 subscriptions.put(info.getConsumerId(), sub); 271 272 // At this point we're done directly manipulating subscriptions, 273 // but we need to retain the synchronized block here. Consider 274 // otherwise what would happen if at this point a second 275 // thread added, then removed, as would be allowed with 276 // no mutex held. Remove is only essentially run once 277 // so everything after this point would be leaked. 278 279 // Add the subscription to all the matching queues. 280 // But copy the matches first - to prevent deadlocks 281 List<Destination> addList = new ArrayList<Destination>(); 282 synchronized (destinationsMutex) { 283 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { 284 Destination dest = (Destination) iter.next(); 285 addList.add(dest); 286 } 287 } 288 289 for (Destination dest : addList) { 290 dest.addSubscription(context, sub); 291 } 292 293 if (info.isBrowser()) { 294 ((QueueBrowserSubscription) sub).destinationsAdded(); 295 } 296 297 return sub; 298 } 299 } 300 301 /** 302 * Get all the Destinations that are in storage 303 * 304 * @return Set of all stored destinations 305 */ 306 public Set getDurableDestinations() { 307 return destinationFactory.getDestinations(); 308 } 309 310 /** 311 * @return all Destinations that don't have active consumers 312 */ 313 protected Set<ActiveMQDestination> getInactiveDestinations() { 314 Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations(); 315 synchronized (destinationsMutex) { 316 inactiveDests.removeAll(destinations.keySet()); 317 } 318 return inactiveDests; 319 } 320 321 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 322 LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " 323 + info.getDestination()); 324 325 Subscription sub = subscriptions.remove(info.getConsumerId()); 326 // The sub could be removed elsewhere - see ConnectionSplitBroker 327 if (sub != null) { 328 329 // remove the subscription from all the matching queues. 330 List<Destination> removeList = new ArrayList<Destination>(); 331 synchronized (destinationsMutex) { 332 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { 333 Destination dest = (Destination) iter.next(); 334 removeList.add(dest); 335 336 } 337 } 338 for (Destination dest : removeList) { 339 dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); 340 } 341 342 destroySubscription(sub); 343 } 344 synchronized (consumerChangeMutexMap) { 345 consumerChangeMutexMap.remove(info.getConsumerId()); 346 } 347 } 348 349 protected void destroySubscription(Subscription sub) { 350 sub.destroy(); 351 } 352 353 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 354 throw new JMSException("Invalid operation."); 355 } 356 357 public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 358 final ConnectionContext context = producerExchange.getConnectionContext(); 359 360 if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) { 361 final Destination regionDestination = lookup(context, messageSend.getDestination(),false); 362 producerExchange.setRegionDestination(regionDestination); 363 } 364 365 producerExchange.getRegionDestination().send(producerExchange, messageSend); 366 } 367 368 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 369 Subscription sub = consumerExchange.getSubscription(); 370 if (sub == null) { 371 sub = subscriptions.get(ack.getConsumerId()); 372 if (sub == null) { 373 if (!consumerExchange.getConnectionContext().isInRecoveryMode()) { 374 LOG.warn("Ack for non existent subscription, ack:" + ack); 375 throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId()); 376 } else { 377 LOG.debug("Ack for non existent subscription in recovery, ack:" + ack); 378 return; 379 } 380 } 381 consumerExchange.setSubscription(sub); 382 } 383 sub.acknowledge(consumerExchange.getConnectionContext(), ack); 384 } 385 386 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 387 Subscription sub = subscriptions.get(pull.getConsumerId()); 388 if (sub == null) { 389 throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId()); 390 } 391 return sub.pullMessage(context, pull); 392 } 393 394 protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception { 395 Destination dest = null; 396 synchronized (destinationsMutex) { 397 dest = destinations.get(destination); 398 } 399 if (dest == null) { 400 if (isAutoCreateDestinations()) { 401 // Try to auto create the destination... re-invoke broker 402 // from the 403 // top so that the proper security checks are performed. 404 try { 405 context.getBroker().addDestination(context, destination, createTemporary); 406 dest = addDestination(context, destination, false); 407 } catch (DestinationAlreadyExistsException e) { 408 // if the destination already exists then lets ignore 409 // this error 410 } 411 // We should now have the dest created. 412 synchronized (destinationsMutex) { 413 dest = destinations.get(destination); 414 } 415 } 416 if (dest == null) { 417 throw new JMSException("The destination " + destination + " does not exist."); 418 } 419 } 420 return dest; 421 } 422 423 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 424 Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId()); 425 if (sub != null) { 426 sub.processMessageDispatchNotification(messageDispatchNotification); 427 } else { 428 throw new JMSException("Slave broker out of sync with master - Subscription: " 429 + messageDispatchNotification.getConsumerId() + " on " 430 + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: " 431 + messageDispatchNotification.getMessageId()); 432 } 433 } 434 435 /* 436 * For a Queue/TempQueue, dispatch order is imperative to match acks, so the 437 * dispatch is deferred till the notification to ensure that the 438 * subscription chosen by the master is used. AMQ-2102 439 */ 440 protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) 441 throws Exception { 442 Destination dest = null; 443 synchronized (destinationsMutex) { 444 dest = destinations.get(messageDispatchNotification.getDestination()); 445 } 446 if (dest != null) { 447 dest.processDispatchNotification(messageDispatchNotification); 448 } else { 449 throw new JMSException("Slave broker out of sync with master - Destination: " 450 + messageDispatchNotification.getDestination() + " does not exist for consumer " 451 + messageDispatchNotification.getConsumerId() + " with message: " 452 + messageDispatchNotification.getMessageId()); 453 } 454 } 455 456 public void gc() { 457 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 458 Subscription sub = iter.next(); 459 sub.gc(); 460 } 461 synchronized (destinationsMutex) { 462 for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { 463 Destination dest = iter.next(); 464 dest.gc(); 465 } 466 } 467 } 468 469 protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception; 470 471 protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) 472 throws Exception { 473 return destinationFactory.createDestination(context, destination, destinationStatistics); 474 } 475 476 public boolean isAutoCreateDestinations() { 477 return autoCreateDestinations; 478 } 479 480 public void setAutoCreateDestinations(boolean autoCreateDestinations) { 481 this.autoCreateDestinations = autoCreateDestinations; 482 } 483 484 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 485 synchronized (destinationsMutex) { 486 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { 487 Destination dest = (Destination) iter.next(); 488 dest.addProducer(context, info); 489 } 490 } 491 } 492 493 /** 494 * Removes a Producer. 495 * 496 * @param context 497 * the environment the operation is being executed under. 498 * @throws Exception 499 * TODO 500 */ 501 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 502 synchronized (destinationsMutex) { 503 for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { 504 Destination dest = (Destination) iter.next(); 505 dest.removeProducer(context, info); 506 } 507 } 508 } 509 510 protected void dispose(ConnectionContext context, Destination dest) throws Exception { 511 dest.dispose(context); 512 dest.stop(); 513 destinationFactory.removeDestination(dest); 514 } 515 516 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { 517 Subscription sub = subscriptions.get(control.getConsumerId()); 518 if (sub != null && sub instanceof AbstractSubscription) { 519 ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch()); 520 if (LOG.isDebugEnabled()) { 521 LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: " 522 + control.getConsumerId()); 523 } 524 try { 525 lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup(); 526 } catch (Exception e) { 527 LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e); 528 } 529 } 530 } 531}