001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.io.OutputStream; 022import java.net.URI; 023import java.net.URISyntaxException; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.CopyOnWriteArrayList; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.ThreadFactory; 032import java.util.concurrent.ThreadPoolExecutor; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import javax.jms.Connection; 037import javax.jms.ConnectionConsumer; 038import javax.jms.ConnectionMetaData; 039import javax.jms.DeliveryMode; 040import javax.jms.Destination; 041import javax.jms.ExceptionListener; 042import javax.jms.IllegalStateException; 043import javax.jms.InvalidDestinationException; 044import javax.jms.JMSException; 045import javax.jms.Queue; 046import javax.jms.QueueConnection; 047import 
javax.jms.QueueSession; 048import javax.jms.ServerSessionPool; 049import javax.jms.Session; 050import javax.jms.Topic; 051import javax.jms.TopicConnection; 052import javax.jms.TopicSession; 053import javax.jms.XAConnection; 054import org.apache.activemq.advisory.DestinationSource; 055import org.apache.activemq.blob.BlobTransferPolicy; 056import org.apache.activemq.command.ActiveMQDestination; 057import org.apache.activemq.command.ActiveMQMessage; 058import org.apache.activemq.command.ActiveMQTempDestination; 059import org.apache.activemq.command.ActiveMQTempQueue; 060import org.apache.activemq.command.ActiveMQTempTopic; 061import org.apache.activemq.command.BrokerInfo; 062import org.apache.activemq.command.Command; 063import org.apache.activemq.command.CommandTypes; 064import org.apache.activemq.command.ConnectionControl; 065import org.apache.activemq.command.ConnectionError; 066import org.apache.activemq.command.ConnectionId; 067import org.apache.activemq.command.ConnectionInfo; 068import org.apache.activemq.command.ConsumerControl; 069import org.apache.activemq.command.ConsumerId; 070import org.apache.activemq.command.ConsumerInfo; 071import org.apache.activemq.command.ControlCommand; 072import org.apache.activemq.command.DestinationInfo; 073import org.apache.activemq.command.ExceptionResponse; 074import org.apache.activemq.command.Message; 075import org.apache.activemq.command.MessageDispatch; 076import org.apache.activemq.command.MessageId; 077import org.apache.activemq.command.ProducerAck; 078import org.apache.activemq.command.ProducerId; 079import org.apache.activemq.command.RemoveInfo; 080import org.apache.activemq.command.RemoveSubscriptionInfo; 081import org.apache.activemq.command.Response; 082import org.apache.activemq.command.SessionId; 083import org.apache.activemq.command.ShutdownInfo; 084import org.apache.activemq.command.WireFormatInfo; 085import org.apache.activemq.management.JMSConnectionStatsImpl; 086import 
org.apache.activemq.management.JMSStatsImpl; 087import org.apache.activemq.management.StatsCapable; 088import org.apache.activemq.management.StatsImpl; 089import org.apache.activemq.state.CommandVisitorAdapter; 090import org.apache.activemq.thread.Scheduler; 091import org.apache.activemq.thread.TaskRunnerFactory; 092import org.apache.activemq.transport.Transport; 093import org.apache.activemq.transport.TransportListener; 094import org.apache.activemq.transport.failover.FailoverTransport; 095import org.apache.activemq.util.IdGenerator; 096import org.apache.activemq.util.IntrospectionSupport; 097import org.apache.activemq.util.JMSExceptionSupport; 098import org.apache.activemq.util.LongSequenceGenerator; 099import org.apache.activemq.util.ServiceSupport; 100import org.slf4j.Logger; 101import org.slf4j.LoggerFactory; 102 103public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection { 104 105 public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; 106 public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; 107 public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; 108 109 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class); 110 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 111 112 public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>(); 113 114 protected boolean dispatchAsync=true; 115 protected boolean alwaysSessionAsync = true; 116 117 private TaskRunnerFactory sessionTaskRunner; 118 private final ThreadPoolExecutor executor; 119 120 // Connection state variables 121 private final ConnectionInfo info; 122 private ExceptionListener exceptionListener; 123 private 
ClientInternalExceptionListener clientInternalExceptionListener; 124 private boolean clientIDSet; 125 private boolean isConnectionInfoSentToBroker; 126 private boolean userSpecifiedClientID; 127 128 // Configuration options variables 129 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 130 private BlobTransferPolicy blobTransferPolicy; 131 private RedeliveryPolicy redeliveryPolicy; 132 private MessageTransformer transformer; 133 134 private boolean disableTimeStampsByDefault; 135 private boolean optimizedMessageDispatch = true; 136 private boolean copyMessageOnSend = true; 137 private boolean useCompression; 138 private boolean objectMessageSerializationDefered; 139 private boolean useAsyncSend; 140 private boolean optimizeAcknowledge; 141 private boolean nestedMapAndListEnabled = true; 142 private boolean useRetroactiveConsumer; 143 private boolean exclusiveConsumer; 144 private boolean alwaysSyncSend; 145 private int closeTimeout = 15000; 146 private boolean watchTopicAdvisories = true; 147 private long warnAboutUnstartedConnectionTimeout = 500L; 148 private int sendTimeout =0; 149 private boolean sendAcksAsync=true; 150 private boolean checkForDuplicates = true; 151 152 private final Transport transport; 153 private final IdGenerator clientIdGenerator; 154 private final JMSStatsImpl factoryStats; 155 private final JMSConnectionStatsImpl stats; 156 157 private final AtomicBoolean started = new AtomicBoolean(false); 158 private final AtomicBoolean closing = new AtomicBoolean(false); 159 private final AtomicBoolean closed = new AtomicBoolean(false); 160 private final AtomicBoolean transportFailed = new AtomicBoolean(false); 161 private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>(); 162 private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>(); 163 private final CopyOnWriteArrayList<ActiveMQInputStream> 
inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>(); 164 private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>(); 165 private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>(); 166 167 // Maps ConsumerIds to ActiveMQConsumer objects 168 private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>(); 169 private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>(); 170 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); 171 private final SessionId connectionSessionId; 172 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 173 private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 174 private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator(); 175 private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator(); 176 177 private AdvisoryConsumer advisoryConsumer; 178 private final CountDownLatch brokerInfoReceived = new CountDownLatch(1); 179 private BrokerInfo brokerInfo; 180 private IOException firstFailureError; 181 private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE; 182 183 // Assume that protocol is the latest. Change to the actual protocol 184 // version when a WireFormatInfo is received. 
185 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 186 private final long timeCreated; 187 private final ConnectionAudit connectionAudit = new ConnectionAudit(); 188 private DestinationSource destinationSource; 189 private final Object ensureConnectionInfoSentMutex = new Object(); 190 private boolean useDedicatedTaskRunner; 191 protected volatile CountDownLatch transportInterruptionProcessingComplete; 192 private long consumerFailoverRedeliveryWaitPeriod; 193 private final Scheduler scheduler; 194 private boolean messagePrioritySupported=true; 195 196 /** 197 * Construct an <code>ActiveMQConnection</code> 198 * 199 * @param transport 200 * @param factoryStats 201 * @throws Exception 202 */ 203 protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception { 204 205 this.transport = transport; 206 this.clientIdGenerator = clientIdGenerator; 207 this.factoryStats = factoryStats; 208 209 // Configure a single threaded executor who's core thread can timeout if 210 // idle 211 executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 212 public Thread newThread(Runnable r) { 213 Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); 214 thread.setDaemon(true); 215 return thread; 216 } 217 }); 218 // asyncConnectionThread.allowCoreThreadTimeOut(true); 219 String uniqueId = CONNECTION_ID_GENERATOR.generateId(); 220 this.info = new ConnectionInfo(new ConnectionId(uniqueId)); 221 this.info.setManageable(true); 222 this.info.setFaultTolerant(transport.isFaultTolerant()); 223 this.connectionSessionId = new SessionId(info.getConnectionId(), -1); 224 225 this.transport.setTransportListener(this); 226 227 this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); 228 this.factoryStats.addConnection(this); 229 this.timeCreated = System.currentTimeMillis(); 230 
this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); 231 this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler"); 232 this.scheduler.start(); 233 } 234 235 protected void setUserName(String userName) { 236 this.info.setUserName(userName); 237 } 238 239 protected void setPassword(String password) { 240 this.info.setPassword(password); 241 } 242 243 /** 244 * A static helper method to create a new connection 245 * 246 * @return an ActiveMQConnection 247 * @throws JMSException 248 */ 249 public static ActiveMQConnection makeConnection() throws JMSException { 250 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); 251 return (ActiveMQConnection)factory.createConnection(); 252 } 253 254 /** 255 * A static helper method to create a new connection 256 * 257 * @param uri 258 * @return and ActiveMQConnection 259 * @throws JMSException 260 */ 261 public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException { 262 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); 263 return (ActiveMQConnection)factory.createConnection(); 264 } 265 266 /** 267 * A static helper method to create a new connection 268 * 269 * @param user 270 * @param password 271 * @param uri 272 * @return an ActiveMQConnection 273 * @throws JMSException 274 */ 275 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException { 276 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri)); 277 return (ActiveMQConnection)factory.createConnection(); 278 } 279 280 /** 281 * @return a number unique for this connection 282 */ 283 public JMSConnectionStatsImpl getConnectionStats() { 284 return stats; 285 } 286 287 /** 288 * Creates a <CODE>Session</CODE> object. 
289 * 290 * @param transacted indicates whether the session is transacted 291 * @param acknowledgeMode indicates whether the consumer or the client will 292 * acknowledge any messages it receives; ignored if the 293 * session is transacted. Legal values are 294 * <code>Session.AUTO_ACKNOWLEDGE</code>, 295 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 296 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 297 * @return a newly created session 298 * @throws JMSException if the <CODE>Connection</CODE> object fails to 299 * create a session due to some internal error or lack of 300 * support for the specific transaction and acknowledgement 301 * mode. 302 * @see Session#AUTO_ACKNOWLEDGE 303 * @see Session#CLIENT_ACKNOWLEDGE 304 * @see Session#DUPS_OK_ACKNOWLEDGE 305 * @since 1.1 306 */ 307 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { 308 checkClosedOrFailed(); 309 ensureConnectionInfoSent(); 310 if(!transacted) { 311 if (acknowledgeMode==Session.SESSION_TRANSACTED) { 312 throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session"); 313 } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) { 314 throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " + 315 "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)"); 316 } 317 } 318 return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED 319 ? 
Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync()); 320 } 321 322 /** 323 * @return sessionId 324 */ 325 protected SessionId getNextSessionId() { 326 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId()); 327 } 328 329 /** 330 * Gets the client identifier for this connection. 331 * <P> 332 * This value is specific to the JMS provider. It is either preconfigured by 333 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned 334 * dynamically by the application by calling the <code>setClientID</code> 335 * method. 336 * 337 * @return the unique client identifier 338 * @throws JMSException if the JMS provider fails to return the client ID 339 * for this connection due to some internal error. 340 */ 341 public String getClientID() throws JMSException { 342 checkClosedOrFailed(); 343 return this.info.getClientId(); 344 } 345 346 /** 347 * Sets the client identifier for this connection. 348 * <P> 349 * The preferred way to assign a JMS client's client identifier is for it to 350 * be configured in a client-specific <CODE>ConnectionFactory</CODE> 351 * object and transparently assigned to the <CODE>Connection</CODE> object 352 * it creates. 353 * <P> 354 * Alternatively, a client can set a connection's client identifier using a 355 * provider-specific value. The facility to set a connection's client 356 * identifier explicitly is not a mechanism for overriding the identifier 357 * that has been administratively configured. It is provided for the case 358 * where no administratively specified identifier exists. If one does exist, 359 * an attempt to change it by setting it must throw an 360 * <CODE>IllegalStateException</CODE>. If a client sets the client 361 * identifier explicitly, it must do so immediately after it creates the 362 * connection and before any other action on the connection is taken. 
After 363 * this point, setting the client identifier is a programming error that 364 * should throw an <CODE>IllegalStateException</CODE>. 365 * <P> 366 * The purpose of the client identifier is to associate a connection and its 367 * objects with a state maintained on behalf of the client by a provider. 368 * The only such state identified by the JMS API is that required to support 369 * durable subscriptions. 370 * <P> 371 * If another connection with the same <code>clientID</code> is already 372 * running when this method is called, the JMS provider should detect the 373 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>. 374 * 375 * @param newClientID the unique client identifier 376 * @throws JMSException if the JMS provider fails to set the client ID for 377 * this connection due to some internal error. 378 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an 379 * invalid or duplicate client ID. 380 * @throws javax.jms.IllegalStateException if the JMS client attempts to set 381 * a connection's client ID at the wrong time or when it has 382 * been administratively configured. 383 */ 384 public void setClientID(String newClientID) throws JMSException { 385 checkClosedOrFailed(); 386 387 if (this.clientIDSet) { 388 throw new IllegalStateException("The clientID has already been set"); 389 } 390 391 if (this.isConnectionInfoSentToBroker) { 392 throw new IllegalStateException("Setting clientID on a used Connection is not allowed"); 393 } 394 395 this.info.setClientId(newClientID); 396 this.userSpecifiedClientID = true; 397 ensureConnectionInfoSent(); 398 } 399 400 /** 401 * Sets the default client id that the connection will use if explicitly not 402 * set with the setClientId() call. 403 */ 404 public void setDefaultClientID(String clientID) throws JMSException { 405 this.info.setClientId(clientID); 406 this.userSpecifiedClientID = true; 407 } 408 409 /** 410 * Gets the metadata for this connection. 
411 * 412 * @return the connection metadata 413 * @throws JMSException if the JMS provider fails to get the connection 414 * metadata for this connection. 415 * @see javax.jms.ConnectionMetaData 416 */ 417 public ConnectionMetaData getMetaData() throws JMSException { 418 checkClosedOrFailed(); 419 return ActiveMQConnectionMetaData.INSTANCE; 420 } 421 422 /** 423 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not 424 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE> 425 * associated with it. 426 * 427 * @return the <CODE>ExceptionListener</CODE> for this connection, or 428 * null, if no <CODE>ExceptionListener</CODE> is associated with 429 * this connection. 430 * @throws JMSException if the JMS provider fails to get the 431 * <CODE>ExceptionListener</CODE> for this connection. 432 * @see javax.jms.Connection#setExceptionListener(ExceptionListener) 433 */ 434 public ExceptionListener getExceptionListener() throws JMSException { 435 checkClosedOrFailed(); 436 return this.exceptionListener; 437 } 438 439 /** 440 * Sets an exception listener for this connection. 441 * <P> 442 * If a JMS provider detects a serious problem with a connection, it informs 443 * the connection's <CODE> ExceptionListener</CODE>, if one has been 444 * registered. It does this by calling the listener's <CODE>onException 445 * </CODE> 446 * method, passing it a <CODE>JMSException</CODE> object describing the 447 * problem. 448 * <P> 449 * An exception listener allows a client to be notified of a problem 450 * asynchronously. Some connections only consume messages, so they would 451 * have no other way to learn their connection has failed. 452 * <P> 453 * A connection serializes execution of its <CODE>ExceptionListener</CODE>. 454 * <P> 455 * A JMS provider should attempt to resolve connection problems itself 456 * before it notifies the client of them. 
457 * 458 * @param listener the exception listener 459 * @throws JMSException if the JMS provider fails to set the exception 460 * listener for this connection. 461 */ 462 public void setExceptionListener(ExceptionListener listener) throws JMSException { 463 checkClosedOrFailed(); 464 this.exceptionListener = listener; 465 } 466 467 /** 468 * Gets the <code>ClientInternalExceptionListener</code> object for this connection. 469 * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE> 470 * associated with it. 471 * 472 * @return the listener or <code>null</code> if no listener is registered with the connection. 473 */ 474 public ClientInternalExceptionListener getClientInternalExceptionListener() 475 { 476 return clientInternalExceptionListener; 477 } 478 479 /** 480 * Sets a client internal exception listener for this connection. 481 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components 482 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message. 483 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code> 484 * describing the problem. 485 * 486 * @param listener the exception listener 487 */ 488 public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) 489 { 490 this.clientInternalExceptionListener = listener; 491 } 492 493 /** 494 * Starts (or restarts) a connection's delivery of incoming messages. A call 495 * to <CODE>start</CODE> on a connection that has already been started is 496 * ignored. 497 * 498 * @throws JMSException if the JMS provider fails to start message delivery 499 * due to some internal error. 
500 * @see javax.jms.Connection#stop() 501 */ 502 public void start() throws JMSException { 503 checkClosedOrFailed(); 504 ensureConnectionInfoSent(); 505 if (started.compareAndSet(false, true)) { 506 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 507 ActiveMQSession session = i.next(); 508 session.start(); 509 } 510 } 511 } 512 513 /** 514 * Temporarily stops a connection's delivery of incoming messages. Delivery 515 * can be restarted using the connection's <CODE>start</CODE> method. When 516 * the connection is stopped, delivery to all the connection's message 517 * consumers is inhibited: synchronous receives block, and messages are not 518 * delivered to message listeners. 519 * <P> 520 * This call blocks until receives and/or message listeners in progress have 521 * completed. 522 * <P> 523 * Stopping a connection has no effect on its ability to send messages. A 524 * call to <CODE>stop</CODE> on a connection that has already been stopped 525 * is ignored. 526 * <P> 527 * A call to <CODE>stop</CODE> must not return until delivery of messages 528 * has paused. This means that a client can rely on the fact that none of 529 * its message listeners will be called and that all threads of control 530 * waiting for <CODE>receive</CODE> calls to return will not return with a 531 * message until the connection is restarted. The receive timers for a 532 * stopped connection continue to advance, so receives may time out while 533 * the connection is stopped. 534 * <P> 535 * If message listeners are running when <CODE>stop</CODE> is invoked, the 536 * <CODE>stop</CODE> call must wait until all of them have returned before 537 * it may return. While these message listeners are completing, they must 538 * have the full services of the connection available to them. 539 * 540 * @throws JMSException if the JMS provider fails to stop message delivery 541 * due to some internal error. 
542 * @see javax.jms.Connection#start() 543 */ 544 public void stop() throws JMSException { 545 checkClosedOrFailed(); 546 if (started.compareAndSet(true, false)) { 547 synchronized(sessions) { 548 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 549 ActiveMQSession s = i.next(); 550 s.stop(); 551 } 552 } 553 } 554 } 555 556 /** 557 * Closes the connection. 558 * <P> 559 * Since a provider typically allocates significant resources outside the 560 * JVM on behalf of a connection, clients should close these resources when 561 * they are not needed. Relying on garbage collection to eventually reclaim 562 * these resources may not be timely enough. 563 * <P> 564 * There is no need to close the sessions, producers, and consumers of a 565 * closed connection. 566 * <P> 567 * Closing a connection causes all temporary destinations to be deleted. 568 * <P> 569 * When this method is invoked, it should not return until message 570 * processing has been shut down in an orderly fashion. This means that all 571 * message listeners that may have been running have returned, and that all 572 * pending receives have returned. A close terminates all pending message 573 * receives on the connection's sessions' consumers. The receives may return 574 * with a message or with null, depending on whether there was a message 575 * available at the time of the close. If one or more of the connection's 576 * sessions' message listeners is processing a message at the time when 577 * connection <CODE>close</CODE> is invoked, all the facilities of the 578 * connection and its sessions must remain available to those listeners 579 * until they return control to the JMS provider. 580 * <P> 581 * Closing a connection causes any of its sessions' transactions in progress 582 * to be rolled back. 
In the case where a session's work is coordinated by 583 * an external transaction manager, a session's <CODE>commit</CODE> and 584 * <CODE> rollback</CODE> methods are not used and the result of a closed 585 * session's work is determined later by the transaction manager. Closing a 586 * connection does NOT force an acknowledgment of client-acknowledged 587 * sessions. 588 * <P> 589 * Invoking the <CODE>acknowledge</CODE> method of a received message from 590 * a closed connection's session must throw an 591 * <CODE>IllegalStateException</CODE>. Closing a closed connection must 592 * NOT throw an exception. 593 * 594 * @throws JMSException if the JMS provider fails to close the connection 595 * due to some internal error. For example, a failure to 596 * release resources or to close a socket connection can 597 * cause this exception to be thrown. 598 */ 599 public void close() throws JMSException { 600 try { 601 // If we were running, lets stop first. 602 if (!closed.get() && !transportFailed.get()) { 603 stop(); 604 } 605 606 synchronized (this) { 607 if (!closed.get()) { 608 closing.set(true); 609 610 if (destinationSource != null) { 611 destinationSource.stop(); 612 destinationSource = null; 613 } 614 if (advisoryConsumer != null) { 615 advisoryConsumer.dispose(); 616 advisoryConsumer = null; 617 } 618 if (this.scheduler != null) { 619 try { 620 this.scheduler.stop(); 621 } catch (Exception e) { 622 JMSException ex = JMSExceptionSupport.create(e); 623 throw ex; 624 } 625 } 626 627 long lastDeliveredSequenceId = 0; 628 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 629 ActiveMQSession s = i.next(); 630 s.dispose(); 631 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId()); 632 } 633 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 634 ActiveMQConnectionConsumer c = i.next(); 635 c.dispose(); 636 } 637 for (Iterator<ActiveMQInputStream> i = 
this.inputStreams.iterator(); i.hasNext();) { 638 ActiveMQInputStream c = i.next(); 639 c.dispose(); 640 } 641 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) { 642 ActiveMQOutputStream c = i.next(); 643 c.dispose(); 644 } 645 646 // As TemporaryQueue and TemporaryTopic instances are bound 647 // to a connection we should just delete them after the connection 648 // is closed to free up memory 649 for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) { 650 ActiveMQTempDestination c = i.next(); 651 c.delete(); 652 } 653 654 if (isConnectionInfoSentToBroker) { 655 // If we announced ourselfs to the broker.. Try to let 656 // the broker 657 // know that the connection is being shutdown. 658 RemoveInfo removeCommand = info.createRemoveCommand(); 659 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 660 doSyncSendPacket(info.createRemoveCommand(), closeTimeout); 661 doAsyncSendPacket(new ShutdownInfo()); 662 } 663 664 ServiceSupport.dispose(this.transport); 665 666 started.set(false); 667 668 // TODO if we move the TaskRunnerFactory to the connection 669 // factory 670 // then we may need to call 671 // factory.onConnectionClose(this); 672 if (sessionTaskRunner != null) { 673 sessionTaskRunner.shutdown(); 674 } 675 closed.set(true); 676 closing.set(false); 677 } 678 } 679 } finally { 680 try { 681 if (executor != null){ 682 executor.shutdown(); 683 } 684 }catch(Throwable e) { 685 LOG.error("Error shutting down thread pool " + e,e); 686 } 687 factoryStats.removeConnection(this); 688 } 689 } 690 691 /** 692 * Tells the broker to terminate its VM. This can be used to cleanly 693 * terminate a broker running in a standalone java process. Server must have 694 * property enable.vm.shutdown=true defined to allow this to work. 695 */ 696 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet 697 // implemented. 
698 /* 699 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand 700 * command = new BrokerAdminCommand(); 701 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM); 702 * asyncSendPacket(command); } 703 */ 704 705 /** 706 * Create a durable connection consumer for this connection (optional 707 * operation). This is an expert facility not used by regular JMS clients. 708 * 709 * @param topic topic to access 710 * @param subscriptionName durable subscription name 711 * @param messageSelector only messages with properties matching the message 712 * selector expression are delivered. A value of null or an 713 * empty string indicates that there is no message selector 714 * for the message consumer. 715 * @param sessionPool the server session pool to associate with this durable 716 * connection consumer 717 * @param maxMessages the maximum number of messages that can be assigned to 718 * a server session at one time 719 * @return the durable connection consumer 720 * @throws JMSException if the <CODE>Connection</CODE> object fails to 721 * create a connection consumer due to some internal error 722 * or invalid arguments for <CODE>sessionPool</CODE> and 723 * <CODE>messageSelector</CODE>. 724 * @throws javax.jms.InvalidDestinationException if an invalid destination 725 * is specified. 726 * @throws javax.jms.InvalidSelectorException if the message selector is 727 * invalid. 728 * @see javax.jms.ConnectionConsumer 729 * @since 1.1 730 */ 731 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) 732 throws JMSException { 733 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false); 734 } 735 736 /** 737 * Create a durable connection consumer for this connection (optional 738 * operation). This is an expert facility not used by regular JMS clients. 
739 * 740 * @param topic topic to access 741 * @param subscriptionName durable subscription name 742 * @param messageSelector only messages with properties matching the message 743 * selector expression are delivered. A value of null or an 744 * empty string indicates that there is no message selector 745 * for the message consumer. 746 * @param sessionPool the server session pool to associate with this durable 747 * connection consumer 748 * @param maxMessages the maximum number of messages that can be assigned to 749 * a server session at one time 750 * @param noLocal set true if you want to filter out messages published 751 * locally 752 * @return the durable connection consumer 753 * @throws JMSException if the <CODE>Connection</CODE> object fails to 754 * create a connection consumer due to some internal error 755 * or invalid arguments for <CODE>sessionPool</CODE> and 756 * <CODE>messageSelector</CODE>. 757 * @throws javax.jms.InvalidDestinationException if an invalid destination 758 * is specified. 759 * @throws javax.jms.InvalidSelectorException if the message selector is 760 * invalid. 
761 * @see javax.jms.ConnectionConsumer 762 * @since 1.1 763 */ 764 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages, 765 boolean noLocal) throws JMSException { 766 checkClosedOrFailed(); 767 ensureConnectionInfoSent(); 768 SessionId sessionId = new SessionId(info.getConnectionId(), -1); 769 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId())); 770 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic)); 771 info.setSubscriptionName(subscriptionName); 772 info.setSelector(messageSelector); 773 info.setPrefetchSize(maxMessages); 774 info.setDispatchAsync(isDispatchAsync()); 775 776 // Allows the options on the destination to configure the consumerInfo 777 if (info.getDestination().getOptions() != null) { 778 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions()); 779 IntrospectionSupport.setProperties(this.info, options, "consumer."); 780 } 781 782 return new ActiveMQConnectionConsumer(this, sessionPool, info); 783 } 784 785 // Properties 786 // ------------------------------------------------------------------------- 787 788 /** 789 * Returns true if this connection has been started 790 * 791 * @return true if this Connection is started 792 */ 793 public boolean isStarted() { 794 return started.get(); 795 } 796 797 /** 798 * Returns true if the connection is closed 799 */ 800 public boolean isClosed() { 801 return closed.get(); 802 } 803 804 /** 805 * Returns true if the connection is in the process of being closed 806 */ 807 public boolean isClosing() { 808 return closing.get(); 809 } 810 811 /** 812 * Returns true if the underlying transport has failed 813 */ 814 public boolean isTransportFailed() { 815 return transportFailed.get(); 816 } 817 818 /** 819 * @return Returns the prefetchPolicy. 
820 */ 821 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 822 return prefetchPolicy; 823 } 824 825 /** 826 * Sets the <a 827 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 828 * policy</a> for consumers created by this connection. 829 */ 830 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 831 this.prefetchPolicy = prefetchPolicy; 832 } 833 834 /** 835 */ 836 public Transport getTransportChannel() { 837 return transport; 838 } 839 840 /** 841 * @return Returns the clientID of the connection, forcing one to be 842 * generated if one has not yet been configured. 843 */ 844 public String getInitializedClientID() throws JMSException { 845 ensureConnectionInfoSent(); 846 return info.getClientId(); 847 } 848 849 /** 850 * @return Returns the timeStampsDisableByDefault. 851 */ 852 public boolean isDisableTimeStampsByDefault() { 853 return disableTimeStampsByDefault; 854 } 855 856 /** 857 * Sets whether or not timestamps on messages should be disabled or not. If 858 * you disable them it adds a small performance boost. 859 */ 860 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) { 861 this.disableTimeStampsByDefault = timeStampsDisableByDefault; 862 } 863 864 /** 865 * @return Returns the dispatchOptimizedMessage. 866 */ 867 public boolean isOptimizedMessageDispatch() { 868 return optimizedMessageDispatch; 869 } 870 871 /** 872 * If this flag is set then an larger prefetch limit is used - only 873 * applicable for durable topic subscribers. 874 */ 875 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) { 876 this.optimizedMessageDispatch = dispatchOptimizedMessage; 877 } 878 879 /** 880 * @return Returns the closeTimeout. 881 */ 882 public int getCloseTimeout() { 883 return closeTimeout; 884 } 885 886 /** 887 * Sets the timeout before a close is considered complete. 
Normally a 888 * close() on a connection waits for confirmation from the broker; this 889 * allows that operation to timeout to save the client hanging if there is 890 * no broker 891 */ 892 public void setCloseTimeout(int closeTimeout) { 893 this.closeTimeout = closeTimeout; 894 } 895 896 /** 897 * @return ConnectionInfo 898 */ 899 public ConnectionInfo getConnectionInfo() { 900 return this.info; 901 } 902 903 public boolean isUseRetroactiveConsumer() { 904 return useRetroactiveConsumer; 905 } 906 907 /** 908 * Sets whether or not retroactive consumers are enabled. Retroactive 909 * consumers allow non-durable topic subscribers to receive old messages 910 * that were published before the non-durable subscriber started. 911 */ 912 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 913 this.useRetroactiveConsumer = useRetroactiveConsumer; 914 } 915 916 public boolean isNestedMapAndListEnabled() { 917 return nestedMapAndListEnabled; 918 } 919 920 /** 921 * Enables/disables whether or not Message properties and MapMessage entries 922 * support <a 923 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 924 * Structures</a> of Map and List objects 925 */ 926 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 927 this.nestedMapAndListEnabled = structuredMapsEnabled; 928 } 929 930 public boolean isExclusiveConsumer() { 931 return exclusiveConsumer; 932 } 933 934 /** 935 * Enables or disables whether or not queue consumers should be exclusive or 936 * not for example to preserve ordering when not using <a 937 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 938 * 939 * @param exclusiveConsumer 940 */ 941 public void setExclusiveConsumer(boolean exclusiveConsumer) { 942 this.exclusiveConsumer = exclusiveConsumer; 943 } 944 945 /** 946 * Adds a transport listener so that a client can be notified of events in 947 * the underlying transport 948 */ 949 public void 
addTransportListener(TransportListener transportListener) { 950 transportListeners.add(transportListener); 951 } 952 953 public void removeTransportListener(TransportListener transportListener) { 954 transportListeners.remove(transportListener); 955 } 956 957 public boolean isUseDedicatedTaskRunner() { 958 return useDedicatedTaskRunner; 959 } 960 961 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 962 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 963 } 964 965 public TaskRunnerFactory getSessionTaskRunner() { 966 synchronized (this) { 967 if (sessionTaskRunner == null) { 968 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner()); 969 } 970 } 971 return sessionTaskRunner; 972 } 973 974 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { 975 this.sessionTaskRunner = sessionTaskRunner; 976 } 977 978 public MessageTransformer getTransformer() { 979 return transformer; 980 } 981 982 /** 983 * Sets the transformer used to transform messages before they are sent on 984 * to the JMS bus or when they are received from the bus but before they are 985 * delivered to the JMS client 986 */ 987 public void setTransformer(MessageTransformer transformer) { 988 this.transformer = transformer; 989 } 990 991 /** 992 * @return the statsEnabled 993 */ 994 public boolean isStatsEnabled() { 995 return this.stats.isEnabled(); 996 } 997 998 /** 999 * @param statsEnabled the statsEnabled to set 1000 */ 1001 public void setStatsEnabled(boolean statsEnabled) { 1002 this.stats.setEnabled(statsEnabled); 1003 } 1004 1005 /** 1006 * Returns the {@link DestinationSource} object which can be used to listen to destinations 1007 * being created or destroyed or to enquire about the current destinations available on the broker 1008 * 1009 * @return a lazily created destination source 1010 * @throws JMSException 1011 */ 1012 public DestinationSource 
getDestinationSource() throws JMSException { 1013 if (destinationSource == null) { 1014 destinationSource = new DestinationSource(this); 1015 destinationSource.start(); 1016 } 1017 return destinationSource; 1018 } 1019 1020 // Implementation methods 1021 // ------------------------------------------------------------------------- 1022 1023 /** 1024 * Used internally for adding Sessions to the Connection 1025 * 1026 * @param session 1027 * @throws JMSException 1028 * @throws JMSException 1029 */ 1030 protected void addSession(ActiveMQSession session) throws JMSException { 1031 this.sessions.add(session); 1032 if (sessions.size() > 1 || session.isTransacted()) { 1033 optimizedMessageDispatch = false; 1034 } 1035 } 1036 1037 /** 1038 * Used interanlly for removing Sessions from a Connection 1039 * 1040 * @param session 1041 */ 1042 protected void removeSession(ActiveMQSession session) { 1043 this.sessions.remove(session); 1044 this.removeDispatcher(session); 1045 } 1046 1047 /** 1048 * Add a ConnectionConsumer 1049 * 1050 * @param connectionConsumer 1051 * @throws JMSException 1052 */ 1053 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException { 1054 this.connectionConsumers.add(connectionConsumer); 1055 } 1056 1057 /** 1058 * Remove a ConnectionConsumer 1059 * 1060 * @param connectionConsumer 1061 */ 1062 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) { 1063 this.connectionConsumers.remove(connectionConsumer); 1064 this.removeDispatcher(connectionConsumer); 1065 } 1066 1067 /** 1068 * Creates a <CODE>TopicSession</CODE> object. 1069 * 1070 * @param transacted indicates whether the session is transacted 1071 * @param acknowledgeMode indicates whether the consumer or the client will 1072 * acknowledge any messages it receives; ignored if the 1073 * session is transacted. 
Legal values are 1074 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1075 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1076 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1077 * @return a newly created topic session 1078 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1079 * to create a session due to some internal error or lack of 1080 * support for the specific transaction and acknowledgement 1081 * mode. 1082 * @see Session#AUTO_ACKNOWLEDGE 1083 * @see Session#CLIENT_ACKNOWLEDGE 1084 * @see Session#DUPS_OK_ACKNOWLEDGE 1085 */ 1086 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { 1087 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1088 } 1089 1090 /** 1091 * Creates a connection consumer for this connection (optional operation). 1092 * This is an expert facility not used by regular JMS clients. 1093 * 1094 * @param topic the topic to access 1095 * @param messageSelector only messages with properties matching the message 1096 * selector expression are delivered. A value of null or an 1097 * empty string indicates that there is no message selector 1098 * for the message consumer. 1099 * @param sessionPool the server session pool to associate with this 1100 * connection consumer 1101 * @param maxMessages the maximum number of messages that can be assigned to 1102 * a server session at one time 1103 * @return the connection consumer 1104 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1105 * to create a connection consumer due to some internal 1106 * error or invalid arguments for <CODE>sessionPool</CODE> 1107 * and <CODE>messageSelector</CODE>. 1108 * @throws javax.jms.InvalidDestinationException if an invalid topic is 1109 * specified. 1110 * @throws javax.jms.InvalidSelectorException if the message selector is 1111 * invalid. 
1112 * @see javax.jms.ConnectionConsumer 1113 */ 1114 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1115 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false); 1116 } 1117 1118 /** 1119 * Creates a connection consumer for this connection (optional operation). 1120 * This is an expert facility not used by regular JMS clients. 1121 * 1122 * @param queue the queue to access 1123 * @param messageSelector only messages with properties matching the message 1124 * selector expression are delivered. A value of null or an 1125 * empty string indicates that there is no message selector 1126 * for the message consumer. 1127 * @param sessionPool the server session pool to associate with this 1128 * connection consumer 1129 * @param maxMessages the maximum number of messages that can be assigned to 1130 * a server session at one time 1131 * @return the connection consumer 1132 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1133 * to create a connection consumer due to some internal 1134 * error or invalid arguments for <CODE>sessionPool</CODE> 1135 * and <CODE>messageSelector</CODE>. 1136 * @throws javax.jms.InvalidDestinationException if an invalid queue is 1137 * specified. 1138 * @throws javax.jms.InvalidSelectorException if the message selector is 1139 * invalid. 1140 * @see javax.jms.ConnectionConsumer 1141 */ 1142 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1143 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false); 1144 } 1145 1146 /** 1147 * Creates a connection consumer for this connection (optional operation). 1148 * This is an expert facility not used by regular JMS clients. 
1149 * 1150 * @param destination the destination to access 1151 * @param messageSelector only messages with properties matching the message 1152 * selector expression are delivered. A value of null or an 1153 * empty string indicates that there is no message selector 1154 * for the message consumer. 1155 * @param sessionPool the server session pool to associate with this 1156 * connection consumer 1157 * @param maxMessages the maximum number of messages that can be assigned to 1158 * a server session at one time 1159 * @return the connection consumer 1160 * @throws JMSException if the <CODE>Connection</CODE> object fails to 1161 * create a connection consumer due to some internal error 1162 * or invalid arguments for <CODE>sessionPool</CODE> and 1163 * <CODE>messageSelector</CODE>. 1164 * @throws javax.jms.InvalidDestinationException if an invalid destination 1165 * is specified. 1166 * @throws javax.jms.InvalidSelectorException if the message selector is 1167 * invalid. 1168 * @see javax.jms.ConnectionConsumer 1169 * @since 1.1 1170 */ 1171 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1172 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false); 1173 } 1174 1175 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) 1176 throws JMSException { 1177 1178 checkClosedOrFailed(); 1179 ensureConnectionInfoSent(); 1180 1181 ConsumerId consumerId = createConsumerId(); 1182 ConsumerInfo consumerInfo = new ConsumerInfo(consumerId); 1183 consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination)); 1184 consumerInfo.setSelector(messageSelector); 1185 consumerInfo.setPrefetchSize(maxMessages); 1186 consumerInfo.setNoLocal(noLocal); 1187 
consumerInfo.setDispatchAsync(isDispatchAsync()); 1188 1189 // Allows the options on the destination to configure the consumerInfo 1190 if (consumerInfo.getDestination().getOptions() != null) { 1191 Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions()); 1192 IntrospectionSupport.setProperties(consumerInfo, options, "consumer."); 1193 } 1194 1195 return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo); 1196 } 1197 1198 /** 1199 * @return 1200 */ 1201 private ConsumerId createConsumerId() { 1202 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId()); 1203 } 1204 1205 /** 1206 * @return 1207 */ 1208 private ProducerId createProducerId() { 1209 return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId()); 1210 } 1211 1212 /** 1213 * Creates a <CODE>QueueSession</CODE> object. 1214 * 1215 * @param transacted indicates whether the session is transacted 1216 * @param acknowledgeMode indicates whether the consumer or the client will 1217 * acknowledge any messages it receives; ignored if the 1218 * session is transacted. Legal values are 1219 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1220 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1221 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1222 * @return a newly created queue session 1223 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1224 * to create a session due to some internal error or lack of 1225 * support for the specific transaction and acknowledgement 1226 * mode. 
1227 * @see Session#AUTO_ACKNOWLEDGE 1228 * @see Session#CLIENT_ACKNOWLEDGE 1229 * @see Session#DUPS_OK_ACKNOWLEDGE 1230 */ 1231 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { 1232 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1233 } 1234 1235 /** 1236 * Ensures that the clientID was manually specified and not auto-generated. 1237 * If the clientID was not specified this method will throw an exception. 1238 * This method is used to ensure that the clientID + durableSubscriber name 1239 * are used correctly. 1240 * 1241 * @throws JMSException 1242 */ 1243 public void checkClientIDWasManuallySpecified() throws JMSException { 1244 if (!userSpecifiedClientID) { 1245 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection"); 1246 } 1247 } 1248 1249 /** 1250 * send a Packet through the Connection - for internal use only 1251 * 1252 * @param command 1253 * @throws JMSException 1254 */ 1255 public void asyncSendPacket(Command command) throws JMSException { 1256 if (isClosed()) { 1257 throw new ConnectionClosedException(); 1258 } else { 1259 doAsyncSendPacket(command); 1260 } 1261 } 1262 1263 private void doAsyncSendPacket(Command command) throws JMSException { 1264 try { 1265 this.transport.oneway(command); 1266 } catch (IOException e) { 1267 throw JMSExceptionSupport.create(e); 1268 } 1269 } 1270 1271 /** 1272 * Send a packet through a Connection - for internal use only 1273 * 1274 * @param command 1275 * @return 1276 * @throws JMSException 1277 */ 1278 public Response syncSendPacket(Command command) throws JMSException { 1279 if (isClosed()) { 1280 throw new ConnectionClosedException(); 1281 } else { 1282 1283 try { 1284 Response response = (Response)this.transport.request(command); 1285 if (response.isException()) { 1286 ExceptionResponse er = (ExceptionResponse)response; 1287 if (er.getException() instanceof 
JMSException) { 1288 throw (JMSException)er.getException(); 1289 } else { 1290 if (isClosed()||closing.get()) { 1291 LOG.debug("Received an exception but connection is closing"); 1292 } 1293 JMSException jmsEx = null; 1294 try { 1295 jmsEx = JMSExceptionSupport.create(er.getException()); 1296 }catch(Throwable e) { 1297 LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e); 1298 } 1299 //dispose of transport for security exceptions 1300 if (er.getException() instanceof SecurityException){ 1301 Transport t = this.transport; 1302 if (null != t){ 1303 ServiceSupport.dispose(t); 1304 } 1305 } 1306 if(jmsEx !=null) { 1307 throw jmsEx; 1308 } 1309 } 1310 } 1311 return response; 1312 } catch (IOException e) { 1313 throw JMSExceptionSupport.create(e); 1314 } 1315 } 1316 } 1317 1318 /** 1319 * Send a packet through a Connection - for internal use only 1320 * 1321 * @param command 1322 * @return 1323 * @throws JMSException 1324 */ 1325 public Response syncSendPacket(Command command, int timeout) throws JMSException { 1326 if (isClosed() || closing.get()) { 1327 throw new ConnectionClosedException(); 1328 } else { 1329 return doSyncSendPacket(command, timeout); 1330 } 1331 } 1332 1333 private Response doSyncSendPacket(Command command, int timeout) 1334 throws JMSException { 1335 try { 1336 Response response = (Response) (timeout > 0 1337 ? 
this.transport.request(command, timeout) 1338 : this.transport.request(command)); 1339 if (response != null && response.isException()) { 1340 ExceptionResponse er = (ExceptionResponse)response; 1341 if (er.getException() instanceof JMSException) { 1342 throw (JMSException)er.getException(); 1343 } else { 1344 throw JMSExceptionSupport.create(er.getException()); 1345 } 1346 } 1347 return response; 1348 } catch (IOException e) { 1349 throw JMSExceptionSupport.create(e); 1350 } 1351 } 1352 1353 /** 1354 * @return statistics for this Connection 1355 */ 1356 public StatsImpl getStats() { 1357 return stats; 1358 } 1359 1360 /** 1361 * simply throws an exception if the Connection is already closed or the 1362 * Transport has failed 1363 * 1364 * @throws JMSException 1365 */ 1366 protected synchronized void checkClosedOrFailed() throws JMSException { 1367 checkClosed(); 1368 if (transportFailed.get()) { 1369 throw new ConnectionFailedException(firstFailureError); 1370 } 1371 } 1372 1373 /** 1374 * simply throws an exception if the Connection is already closed 1375 * 1376 * @throws JMSException 1377 */ 1378 protected synchronized void checkClosed() throws JMSException { 1379 if (closed.get()) { 1380 throw new ConnectionClosedException(); 1381 } 1382 } 1383 1384 /** 1385 * Send the ConnectionInfo to the Broker 1386 * 1387 * @throws JMSException 1388 */ 1389 protected void ensureConnectionInfoSent() throws JMSException { 1390 synchronized(this.ensureConnectionInfoSentMutex) { 1391 // Can we skip sending the ConnectionInfo packet?? 1392 if (isConnectionInfoSentToBroker || closed.get()) { 1393 return; 1394 } 1395 //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID? 
1396 if (info.getClientId() == null || info.getClientId().trim().length() == 0) { 1397 info.setClientId(clientIdGenerator.generateId()); 1398 } 1399 syncSendPacket(info.copy()); 1400 1401 this.isConnectionInfoSentToBroker = true; 1402 // Add a temp destination advisory consumer so that 1403 // We know what the valid temporary destinations are on the 1404 // broker without having to do an RPC to the broker. 1405 1406 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId()); 1407 if (watchTopicAdvisories) { 1408 advisoryConsumer = new AdvisoryConsumer(this, consumerId); 1409 } 1410 } 1411 } 1412 1413 public synchronized boolean isWatchTopicAdvisories() { 1414 return watchTopicAdvisories; 1415 } 1416 1417 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 1418 this.watchTopicAdvisories = watchTopicAdvisories; 1419 } 1420 1421 /** 1422 * @return Returns the useAsyncSend. 1423 */ 1424 public boolean isUseAsyncSend() { 1425 return useAsyncSend; 1426 } 1427 1428 /** 1429 * Forces the use of <a 1430 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 1431 * adds a massive performance boost; but means that the send() method will 1432 * return immediately whether the message has been sent or not which could 1433 * lead to message loss. 
1434 */ 1435 public void setUseAsyncSend(boolean useAsyncSend) { 1436 this.useAsyncSend = useAsyncSend; 1437 } 1438 1439 /** 1440 * @return true if always sync send messages 1441 */ 1442 public boolean isAlwaysSyncSend() { 1443 return this.alwaysSyncSend; 1444 } 1445 1446 /** 1447 * Set true if always require messages to be sync sent 1448 * 1449 * @param alwaysSyncSend 1450 */ 1451 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 1452 this.alwaysSyncSend = alwaysSyncSend; 1453 } 1454 1455 /** 1456 * @return the messagePrioritySupported 1457 */ 1458 public boolean isMessagePrioritySupported() { 1459 return this.messagePrioritySupported; 1460 } 1461 1462 /** 1463 * @param messagePrioritySupported the messagePrioritySupported to set 1464 */ 1465 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 1466 this.messagePrioritySupported = messagePrioritySupported; 1467 } 1468 1469 /** 1470 * Cleans up this connection so that it's state is as if the connection was 1471 * just created. This allows the Resource Adapter to clean up a connection 1472 * so that it can be reused without having to close and recreate the 1473 * connection. 
1474 */ 1475 public void cleanup() throws JMSException { 1476 1477 if (advisoryConsumer != null && !isTransportFailed()) { 1478 advisoryConsumer.dispose(); 1479 advisoryConsumer = null; 1480 } 1481 1482 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1483 ActiveMQSession s = i.next(); 1484 s.dispose(); 1485 } 1486 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 1487 ActiveMQConnectionConsumer c = i.next(); 1488 c.dispose(); 1489 } 1490 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { 1491 ActiveMQInputStream c = i.next(); 1492 c.dispose(); 1493 } 1494 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) { 1495 ActiveMQOutputStream c = i.next(); 1496 c.dispose(); 1497 } 1498 1499 if (isConnectionInfoSentToBroker) { 1500 if (!transportFailed.get() && !closing.get()) { 1501 syncSendPacket(info.createRemoveCommand()); 1502 } 1503 isConnectionInfoSentToBroker = false; 1504 } 1505 if (userSpecifiedClientID) { 1506 info.setClientId(null); 1507 userSpecifiedClientID = false; 1508 } 1509 clientIDSet = false; 1510 1511 started.set(false); 1512 } 1513 1514 public void finalize() throws Throwable{ 1515 if (scheduler != null){ 1516 scheduler.stop(); 1517 } 1518 } 1519 1520 /** 1521 * Changes the associated username/password that is associated with this 1522 * connection. If the connection has been used, you must called cleanup() 1523 * before calling this method. 1524 * 1525 * @throws IllegalStateException if the connection is in used. 1526 */ 1527 public void changeUserInfo(String userName, String password) throws JMSException { 1528 if (isConnectionInfoSentToBroker) { 1529 throw new IllegalStateException("changeUserInfo used Connection is not allowed"); 1530 } 1531 this.info.setUserName(userName); 1532 this.info.setPassword(password); 1533 } 1534 1535 /** 1536 * @return Returns the resourceManagerId. 
1537 * @throws JMSException 1538 */ 1539 public String getResourceManagerId() throws JMSException { 1540 waitForBrokerInfo(); 1541 if (brokerInfo == null) { 1542 throw new JMSException("Connection failed before Broker info was received."); 1543 } 1544 return brokerInfo.getBrokerId().getValue(); 1545 } 1546 1547 /** 1548 * Returns the broker name if one is available or null if one is not 1549 * available yet. 1550 */ 1551 public String getBrokerName() { 1552 try { 1553 brokerInfoReceived.await(5, TimeUnit.SECONDS); 1554 if (brokerInfo == null) { 1555 return null; 1556 } 1557 return brokerInfo.getBrokerName(); 1558 } catch (InterruptedException e) { 1559 Thread.currentThread().interrupt(); 1560 return null; 1561 } 1562 } 1563 1564 /** 1565 * Returns the broker information if it is available or null if it is not 1566 * available yet. 1567 */ 1568 public BrokerInfo getBrokerInfo() { 1569 return brokerInfo; 1570 } 1571 1572 /** 1573 * @return Returns the RedeliveryPolicy. 1574 * @throws JMSException 1575 */ 1576 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException { 1577 return redeliveryPolicy; 1578 } 1579 1580 /** 1581 * Sets the redelivery policy to be used when messages are rolled back 1582 */ 1583 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 1584 this.redeliveryPolicy = redeliveryPolicy; 1585 } 1586 1587 public BlobTransferPolicy getBlobTransferPolicy() { 1588 if (blobTransferPolicy == null) { 1589 blobTransferPolicy = createBlobTransferPolicy(); 1590 } 1591 return blobTransferPolicy; 1592 } 1593 1594 /** 1595 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 1596 * OBjects) are transferred from producers to brokers to consumers 1597 */ 1598 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 1599 this.blobTransferPolicy = blobTransferPolicy; 1600 } 1601 1602 /** 1603 * @return Returns the alwaysSessionAsync. 
1604 */ 1605 public boolean isAlwaysSessionAsync() { 1606 return alwaysSessionAsync; 1607 } 1608 1609 /** 1610 * If this flag is set then a separate thread is not used for dispatching 1611 * messages for each Session in the Connection. However, a separate thread 1612 * is always used if there is more than one session, or the session isn't in 1613 * auto acknowledge or duplicates ok mode 1614 */ 1615 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 1616 this.alwaysSessionAsync = alwaysSessionAsync; 1617 } 1618 1619 /** 1620 * @return Returns the optimizeAcknowledge. 1621 */ 1622 public boolean isOptimizeAcknowledge() { 1623 return optimizeAcknowledge; 1624 } 1625 1626 /** 1627 * Enables an optimised acknowledgement mode where messages are acknowledged 1628 * in batches rather than individually 1629 * 1630 * @param optimizeAcknowledge The optimizeAcknowledge to set. 1631 */ 1632 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 1633 this.optimizeAcknowledge = optimizeAcknowledge; 1634 } 1635 1636 public long getWarnAboutUnstartedConnectionTimeout() { 1637 return warnAboutUnstartedConnectionTimeout; 1638 } 1639 1640 /** 1641 * Enables the timeout from a connection creation to when a warning is 1642 * generated if the connection is not properly started via {@link #start()} 1643 * and a message is received by a consumer. It is a very common gotcha to 1644 * forget to <a 1645 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 1646 * the connection</a> so this option makes the default case to create a 1647 * warning if the user forgets. To disable the warning just set the value to < 1648 * 0 (say -1). 
1649 */ 1650 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 1651 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 1652 } 1653 1654 /** 1655 * @return the sendTimeout 1656 */ 1657 public int getSendTimeout() { 1658 return sendTimeout; 1659 } 1660 1661 /** 1662 * @param sendTimeout the sendTimeout to set 1663 */ 1664 public void setSendTimeout(int sendTimeout) { 1665 this.sendTimeout = sendTimeout; 1666 } 1667 1668 /** 1669 * @return the sendAcksAsync 1670 */ 1671 public boolean isSendAcksAsync() { 1672 return sendAcksAsync; 1673 } 1674 1675 /** 1676 * @param sendAcksAsync the sendAcksAsync to set 1677 */ 1678 public void setSendAcksAsync(boolean sendAcksAsync) { 1679 this.sendAcksAsync = sendAcksAsync; 1680 } 1681 1682 1683 /** 1684 * Returns the time this connection was created 1685 */ 1686 public long getTimeCreated() { 1687 return timeCreated; 1688 } 1689 1690 private void waitForBrokerInfo() throws JMSException { 1691 try { 1692 brokerInfoReceived.await(); 1693 } catch (InterruptedException e) { 1694 Thread.currentThread().interrupt(); 1695 throw JMSExceptionSupport.create(e); 1696 } 1697 } 1698 1699 // Package protected so that it can be used in unit tests 1700 public Transport getTransport() { 1701 return transport; 1702 } 1703 1704 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) { 1705 producers.put(producerId, producer); 1706 } 1707 1708 public void removeProducer(ProducerId producerId) { 1709 producers.remove(producerId); 1710 } 1711 1712 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) { 1713 dispatchers.put(consumerId, dispatcher); 1714 } 1715 1716 public void removeDispatcher(ConsumerId consumerId) { 1717 dispatchers.remove(consumerId); 1718 } 1719 1720 /** 1721 * @param o - the command to consume 1722 */ 1723 public void onCommand(final Object o) { 1724 final Command command = (Command)o; 1725 if (!closed.get() 
&& command != null) { 1726 try { 1727 command.visit(new CommandVisitorAdapter() { 1728 @Override 1729 public Response processMessageDispatch(MessageDispatch md) throws Exception { 1730 waitForTransportInterruptionProcessingToComplete(); 1731 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId()); 1732 if (dispatcher != null) { 1733 // Copy in case a embedded broker is dispatching via 1734 // vm:// 1735 // md.getMessage() == null to signal end of queue 1736 // browse. 1737 Message msg = md.getMessage(); 1738 if (msg != null) { 1739 msg = msg.copy(); 1740 msg.setReadOnlyBody(true); 1741 msg.setReadOnlyProperties(true); 1742 msg.setRedeliveryCounter(md.getRedeliveryCounter()); 1743 msg.setConnection(ActiveMQConnection.this); 1744 md.setMessage(msg); 1745 } 1746 dispatcher.dispatch(md); 1747 } 1748 return null; 1749 } 1750 1751 @Override 1752 public Response processProducerAck(ProducerAck pa) throws Exception { 1753 if (pa != null && pa.getProducerId() != null) { 1754 ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); 1755 if (producer != null) { 1756 producer.onProducerAck(pa); 1757 } 1758 } 1759 return null; 1760 } 1761 1762 @Override 1763 public Response processBrokerInfo(BrokerInfo info) throws Exception { 1764 brokerInfo = info; 1765 brokerInfoReceived.countDown(); 1766 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration(); 1767 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl()); 1768 return null; 1769 } 1770 1771 @Override 1772 public Response processConnectionError(final ConnectionError error) throws Exception { 1773 executor.execute(new Runnable() { 1774 public void run() { 1775 onAsyncException(error.getException()); 1776 } 1777 }); 1778 return null; 1779 } 1780 1781 @Override 1782 public Response processControlCommand(ControlCommand command) throws Exception { 1783 onControlCommand(command); 1784 return null; 1785 } 1786 1787 @Override 1788 public Response 
processConnectionControl(ConnectionControl control) throws Exception { 1789 onConnectionControl((ConnectionControl)command); 1790 return null; 1791 } 1792 1793 @Override 1794 public Response processConsumerControl(ConsumerControl control) throws Exception { 1795 onConsumerControl((ConsumerControl)command); 1796 return null; 1797 } 1798 1799 @Override 1800 public Response processWireFormat(WireFormatInfo info) throws Exception { 1801 onWireFormatInfo((WireFormatInfo)command); 1802 return null; 1803 } 1804 }); 1805 } catch (Exception e) { 1806 onClientInternalException(e); 1807 } 1808 1809 } 1810 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1811 TransportListener listener = iter.next(); 1812 listener.onCommand(command); 1813 } 1814 } 1815 1816 protected void onWireFormatInfo(WireFormatInfo info) { 1817 protocolVersion.set(info.getVersion()); 1818 } 1819 1820 /** 1821 * Handles async client internal exceptions. 1822 * A client internal exception is usually one that has been thrown 1823 * by a container runtime component during asynchronous processing of a 1824 * message that does not affect the connection itself. 1825 * This method notifies the <code>ClientInternalExceptionListener</code> by invoking 1826 * its <code>onException</code> method, if one has been registered with this connection. 
1827 * 1828 * @param error the exception that the problem 1829 */ 1830 public void onClientInternalException(final Throwable error) { 1831 if ( !closed.get() && !closing.get() ) { 1832 if ( this.clientInternalExceptionListener != null ) { 1833 executor.execute(new Runnable() { 1834 public void run() { 1835 ActiveMQConnection.this.clientInternalExceptionListener.onException(error); 1836 } 1837 }); 1838 } else { 1839 LOG.debug("Async client internal exception occurred with no exception listener registered: " 1840 + error, error); 1841 } 1842 } 1843 } 1844 /** 1845 * Used for handling async exceptions 1846 * 1847 * @param error 1848 */ 1849 public void onAsyncException(Throwable error) { 1850 if (!closed.get() && !closing.get()) { 1851 if (this.exceptionListener != null) { 1852 1853 if (!(error instanceof JMSException)) { 1854 error = JMSExceptionSupport.create(error); 1855 } 1856 final JMSException e = (JMSException)error; 1857 1858 executor.execute(new Runnable() { 1859 public void run() { 1860 ActiveMQConnection.this.exceptionListener.onException(e); 1861 } 1862 }); 1863 1864 } else { 1865 LOG.debug("Async exception with no exception listener: " + error, error); 1866 } 1867 } 1868 } 1869 1870 public void onException(final IOException error) { 1871 onAsyncException(error); 1872 if (!closing.get() && !closed.get()) { 1873 executor.execute(new Runnable() { 1874 public void run() { 1875 transportFailed(error); 1876 ServiceSupport.dispose(ActiveMQConnection.this.transport); 1877 brokerInfoReceived.countDown(); 1878 try { 1879 cleanup(); 1880 } catch (JMSException e) { 1881 LOG.warn("Exception during connection cleanup, " + e, e); 1882 } 1883 for (Iterator<TransportListener> iter = transportListeners 1884 .iterator(); iter.hasNext();) { 1885 TransportListener listener = iter.next(); 1886 listener.onException(error); 1887 } 1888 } 1889 }); 1890 } 1891 } 1892 1893 public void transportInterupted() { 1894 this.transportInterruptionProcessingComplete = new 
CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0)); 1895 if (LOG.isDebugEnabled()) { 1896 LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount()); 1897 } 1898 signalInterruptionProcessingNeeded(); 1899 1900 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1901 ActiveMQSession s = i.next(); 1902 s.clearMessagesInProgress(); 1903 } 1904 1905 for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) { 1906 connectionConsumer.clearMessagesInProgress(); 1907 } 1908 1909 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1910 TransportListener listener = iter.next(); 1911 listener.transportInterupted(); 1912 } 1913 } 1914 1915 public void transportResumed() { 1916 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1917 TransportListener listener = iter.next(); 1918 listener.transportResumed(); 1919 } 1920 } 1921 1922 /** 1923 * Create the DestinationInfo object for the temporary destination. 1924 * 1925 * @param topic - if its true topic, else queue. 1926 * @return DestinationInfo 1927 * @throws JMSException 1928 */ 1929 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException { 1930 1931 // Check if Destination info is of temporary type. 
1932 ActiveMQTempDestination dest; 1933 if (topic) { 1934 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 1935 } else { 1936 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 1937 } 1938 1939 DestinationInfo info = new DestinationInfo(); 1940 info.setConnectionId(this.info.getConnectionId()); 1941 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 1942 info.setDestination(dest); 1943 syncSendPacket(info); 1944 1945 dest.setConnection(this); 1946 activeTempDestinations.put(dest, dest); 1947 return dest; 1948 } 1949 1950 /** 1951 * @param destination 1952 * @throws JMSException 1953 */ 1954 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException { 1955 1956 checkClosedOrFailed(); 1957 1958 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1959 ActiveMQSession s = i.next(); 1960 if (s.isInUse(destination)) { 1961 throw new JMSException("A consumer is consuming from the temporary destination"); 1962 } 1963 } 1964 1965 activeTempDestinations.remove(destination); 1966 1967 DestinationInfo destInfo = new DestinationInfo(); 1968 destInfo.setConnectionId(this.info.getConnectionId()); 1969 destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 1970 destInfo.setDestination(destination); 1971 destInfo.setTimeout(0); 1972 syncSendPacket(destInfo); 1973 } 1974 1975 public boolean isDeleted(ActiveMQDestination dest) { 1976 1977 // If we are not watching the advisories.. then 1978 // we will assume that the temp destination does exist. 
1979 if (advisoryConsumer == null) { 1980 return false; 1981 } 1982 1983 return !activeTempDestinations.contains(dest); 1984 } 1985 1986 public boolean isCopyMessageOnSend() { 1987 return copyMessageOnSend; 1988 } 1989 1990 public LongSequenceGenerator getLocalTransactionIdGenerator() { 1991 return localTransactionIdGenerator; 1992 } 1993 1994 public boolean isUseCompression() { 1995 return useCompression; 1996 } 1997 1998 /** 1999 * Enables the use of compression of the message bodies 2000 */ 2001 public void setUseCompression(boolean useCompression) { 2002 this.useCompression = useCompression; 2003 } 2004 2005 public void destroyDestination(ActiveMQDestination destination) throws JMSException { 2006 2007 checkClosedOrFailed(); 2008 ensureConnectionInfoSent(); 2009 2010 DestinationInfo info = new DestinationInfo(); 2011 info.setConnectionId(this.info.getConnectionId()); 2012 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 2013 info.setDestination(destination); 2014 info.setTimeout(0); 2015 syncSendPacket(info); 2016 2017 } 2018 2019 public boolean isDispatchAsync() { 2020 return dispatchAsync; 2021 } 2022 2023 /** 2024 * Enables or disables the default setting of whether or not consumers have 2025 * their messages <a 2026 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 2027 * synchronously or asynchronously by the broker</a>. For non-durable 2028 * topics for example we typically dispatch synchronously by default to 2029 * minimize context switches which boost performance. However sometimes its 2030 * better to go slower to ensure that a single blocked consumer socket does 2031 * not block delivery to other consumers. 2032 * 2033 * @param asyncDispatch If true then consumers created on this connection 2034 * will default to having their messages dispatched 2035 * asynchronously. The default value is false. 
2036 */ 2037 public void setDispatchAsync(boolean asyncDispatch) { 2038 this.dispatchAsync = asyncDispatch; 2039 } 2040 2041 public boolean isObjectMessageSerializationDefered() { 2042 return objectMessageSerializationDefered; 2043 } 2044 2045 /** 2046 * When an object is set on an ObjectMessage, the JMS spec requires the 2047 * object to be serialized by that set method. Enabling this flag causes the 2048 * object to not get serialized. The object may subsequently get serialized 2049 * if the message needs to be sent over a socket or stored to disk. 2050 */ 2051 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 2052 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 2053 } 2054 2055 public InputStream createInputStream(Destination dest) throws JMSException { 2056 return createInputStream(dest, null); 2057 } 2058 2059 public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException { 2060 return createInputStream(dest, messageSelector, false); 2061 } 2062 2063 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { 2064 return createInputStream(dest, messageSelector, noLocal, -1); 2065 } 2066 2067 2068 2069 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException { 2070 return doCreateInputStream(dest, messageSelector, noLocal, null, timeout); 2071 } 2072 2073 public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { 2074 return createInputStream(dest, null, false); 2075 } 2076 2077 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException { 2078 return createDurableInputStream(dest, name, messageSelector, false); 2079 } 2080 2081 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws 
JMSException { 2082 return createDurableInputStream(dest, name, messageSelector, noLocal, -1); 2083 } 2084 2085 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException { 2086 return doCreateInputStream(dest, messageSelector, noLocal, name, timeout); 2087 } 2088 2089 private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException { 2090 checkClosedOrFailed(); 2091 ensureConnectionInfoSent(); 2092 return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout); 2093 } 2094 2095 /** 2096 * Creates a persistent output stream; individual messages will be written 2097 * to disk/database by the broker 2098 */ 2099 public OutputStream createOutputStream(Destination dest) throws JMSException { 2100 return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 2101 } 2102 2103 /** 2104 * Creates a non persistent output stream; messages will not be written to 2105 * disk 2106 */ 2107 public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException { 2108 return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); 2109 } 2110 2111 /** 2112 * Creates an output stream allowing full control over the delivery mode, 2113 * the priority and time to live of the messages and the properties added to 2114 * messages on the stream. 
2115 * 2116 * @param streamProperties defines a map of key-value pairs where the keys 2117 * are strings and the values are primitive values (numbers 2118 * and strings) which are appended to the messages similarly 2119 * to using the 2120 * {@link javax.jms.Message#setObjectProperty(String, Object)} 2121 * method 2122 */ 2123 public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException { 2124 checkClosedOrFailed(); 2125 ensureConnectionInfoSent(); 2126 return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive); 2127 } 2128 2129 /** 2130 * Unsubscribes a durable subscription that has been created by a client. 2131 * <P> 2132 * This method deletes the state being maintained on behalf of the 2133 * subscriber by its provider. 2134 * <P> 2135 * It is erroneous for a client to delete a durable subscription while there 2136 * is an active <CODE>MessageConsumer </CODE> or 2137 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 2138 * message is part of a pending transaction or has not been acknowledged in 2139 * the session. 2140 * 2141 * @param name the name used to identify this subscription 2142 * @throws JMSException if the session fails to unsubscribe to the durable 2143 * subscription due to some internal error. 2144 * @throws InvalidDestinationException if an invalid subscription name is 2145 * specified. 
2146 * @since 1.1 2147 */ 2148 public void unsubscribe(String name) throws InvalidDestinationException, JMSException { 2149 checkClosedOrFailed(); 2150 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 2151 rsi.setConnectionId(getConnectionInfo().getConnectionId()); 2152 rsi.setSubscriptionName(name); 2153 rsi.setClientId(getConnectionInfo().getClientId()); 2154 syncSendPacket(rsi); 2155 } 2156 2157 /** 2158 * Internal send method optimized: - It does not copy the message - It can 2159 * only handle ActiveMQ messages. - You can specify if the send is async or 2160 * sync - Does not allow you to send /w a transaction. 2161 */ 2162 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException { 2163 checkClosedOrFailed(); 2164 2165 if (destination.isTemporary() && isDeleted(destination)) { 2166 throw new JMSException("Cannot publish to a deleted Destination: " + destination); 2167 } 2168 2169 msg.setJMSDestination(destination); 2170 msg.setJMSDeliveryMode(deliveryMode); 2171 long expiration = 0L; 2172 2173 if (!isDisableTimeStampsByDefault()) { 2174 long timeStamp = System.currentTimeMillis(); 2175 msg.setJMSTimestamp(timeStamp); 2176 if (timeToLive > 0) { 2177 expiration = timeToLive + timeStamp; 2178 } 2179 } 2180 2181 msg.setJMSExpiration(expiration); 2182 msg.setJMSPriority(priority); 2183 2184 msg.setJMSRedelivered(false); 2185 msg.setMessageId(messageId); 2186 2187 msg.onSend(); 2188 2189 msg.setProducerId(msg.getMessageId().getProducerId()); 2190 2191 if (LOG.isDebugEnabled()) { 2192 LOG.debug("Sending message: " + msg); 2193 } 2194 2195 if (async) { 2196 asyncSendPacket(msg); 2197 } else { 2198 syncSendPacket(msg); 2199 } 2200 2201 } 2202 2203 public void addOutputStream(ActiveMQOutputStream stream) { 2204 outputStreams.add(stream); 2205 } 2206 2207 public void removeOutputStream(ActiveMQOutputStream stream) { 2208 outputStreams.remove(stream); 
2209 } 2210 2211 public void addInputStream(ActiveMQInputStream stream) { 2212 inputStreams.add(stream); 2213 } 2214 2215 public void removeInputStream(ActiveMQInputStream stream) { 2216 inputStreams.remove(stream); 2217 } 2218 2219 protected void onControlCommand(ControlCommand command) { 2220 String text = command.getCommand(); 2221 if (text != null) { 2222 if ("shutdown".equals(text)) { 2223 LOG.info("JVM told to shutdown"); 2224 System.exit(0); 2225 } 2226 if (false && "close".equals(text)){ 2227 LOG.error("Broker " + getBrokerInfo() + "shutdown connection"); 2228 try { 2229 close(); 2230 } catch (JMSException e) { 2231 } 2232 } 2233 } 2234 } 2235 2236 protected void onConnectionControl(ConnectionControl command) { 2237 if (command.isFaultTolerant()) { 2238 this.optimizeAcknowledge = false; 2239 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2240 ActiveMQSession s = i.next(); 2241 s.setOptimizeAcknowledge(false); 2242 } 2243 } 2244 } 2245 2246 protected void onConsumerControl(ConsumerControl command) { 2247 if (command.isClose()) { 2248 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2249 ActiveMQSession s = i.next(); 2250 s.close(command.getConsumerId()); 2251 } 2252 } else { 2253 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2254 ActiveMQSession s = i.next(); 2255 s.setPrefetchSize(command.getConsumerId(), command.getPrefetch()); 2256 } 2257 } 2258 } 2259 2260 protected void transportFailed(IOException error) { 2261 transportFailed.set(true); 2262 if (firstFailureError == null) { 2263 firstFailureError = error; 2264 } 2265 } 2266 2267 /** 2268 * Should a JMS message be copied to a new JMS Message object as part of the 2269 * send() method in JMS. This is enabled by default to be compliant with the 2270 * JMS specification. 
You can disable it if you do not mutate JMS messages 2271 * after they are sent for a performance boost 2272 */ 2273 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 2274 this.copyMessageOnSend = copyMessageOnSend; 2275 } 2276 2277 @Override 2278 public String toString() { 2279 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}"; 2280 } 2281 2282 protected BlobTransferPolicy createBlobTransferPolicy() { 2283 return new BlobTransferPolicy(); 2284 } 2285 2286 public int getProtocolVersion() { 2287 return protocolVersion.get(); 2288 } 2289 2290 public int getProducerWindowSize() { 2291 return producerWindowSize; 2292 } 2293 2294 public void setProducerWindowSize(int producerWindowSize) { 2295 this.producerWindowSize = producerWindowSize; 2296 } 2297 2298 public void setAuditDepth(int auditDepth) { 2299 connectionAudit.setAuditDepth(auditDepth); 2300 } 2301 2302 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 2303 connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber); 2304 } 2305 2306 protected void removeDispatcher(ActiveMQDispatcher dispatcher) { 2307 connectionAudit.removeDispatcher(dispatcher); 2308 } 2309 2310 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2311 return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message); 2312 } 2313 2314 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2315 connectionAudit.rollbackDuplicate(dispatcher, message); 2316 } 2317 2318 public IOException getFirstFailureError() { 2319 return firstFailureError; 2320 } 2321 2322 protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException { 2323 CountDownLatch cdl = this.transportInterruptionProcessingComplete; 2324 if (cdl != null) { 2325 if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) { 2326 LOG.warn("dispatch paused, 
waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete.."); 2327 cdl.await(10, TimeUnit.SECONDS); 2328 } 2329 signalInterruptionProcessingComplete(); 2330 } 2331 } 2332 2333 protected void transportInterruptionProcessingComplete() { 2334 CountDownLatch cdl = this.transportInterruptionProcessingComplete; 2335 if (cdl != null) { 2336 cdl.countDown(); 2337 try { 2338 signalInterruptionProcessingComplete(); 2339 } catch (InterruptedException ignored) {} 2340 } 2341 } 2342 2343 private void signalInterruptionProcessingComplete() throws InterruptedException { 2344 CountDownLatch cdl = this.transportInterruptionProcessingComplete; 2345 if (cdl.getCount()==0) { 2346 if (LOG.isDebugEnabled()) { 2347 LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId()); 2348 } 2349 this.transportInterruptionProcessingComplete = null; 2350 2351 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2352 if (failoverTransport != null) { 2353 failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId()); 2354 if (LOG.isDebugEnabled()) { 2355 LOG.debug("notified failover transport (" + failoverTransport 2356 + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId()); 2357 } 2358 } 2359 2360 } 2361 } 2362 2363 private void signalInterruptionProcessingNeeded() { 2364 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2365 if (failoverTransport != null) { 2366 failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId()); 2367 if (LOG.isDebugEnabled()) { 2368 LOG.debug("notified failover transport (" + failoverTransport 2369 + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId()); 2370 } 2371 } 2372 } 2373 2374 /* 2375 * specify the amount of time in milliseconds that a consumer with a transaction pending recovery 
* will wait to receive re dispatched messages.
     * default value is 0 so there is no wait by default.
     */
    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
    }

    /**
     * @return the configured consumerFailoverRedeliveryWaitPeriod value
     */
    public long getConsumerFailoverRedeliveryWaitPeriod() {
        return this.consumerFailoverRedeliveryWaitPeriod;
    }

    /**
     * @return the scheduler held by this connection
     */
    protected Scheduler getScheduler() {
        return scheduler;
    }

    /**
     * @return the executor held by this connection
     */
    protected ThreadPoolExecutor getExecutor() {
        return executor;
    }

    /**
     * @return the current value of the checkForDuplicates flag
     */
    public boolean isCheckForDuplicates() {
        return checkForDuplicates;
    }

    /**
     * @param checkForDuplicates the new value of the checkForDuplicates flag
     */
    public void setCheckForDuplicates(boolean checkForDuplicates) {
        this.checkForDuplicates = checkForDuplicates;
    }

}