001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.File; 020import java.io.InputStream; 021import java.io.Serializable; 022import java.net.URL; 023import java.util.Collections; 024import java.util.Iterator; 025import java.util.List; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.atomic.AtomicBoolean; 029import javax.jms.BytesMessage; 030import javax.jms.Destination; 031import javax.jms.IllegalStateException; 032import javax.jms.InvalidDestinationException; 033import javax.jms.InvalidSelectorException; 034import javax.jms.JMSException; 035import javax.jms.MapMessage; 036import javax.jms.Message; 037import javax.jms.MessageConsumer; 038import javax.jms.MessageListener; 039import javax.jms.MessageProducer; 040import javax.jms.ObjectMessage; 041import javax.jms.Queue; 042import javax.jms.QueueBrowser; 043import javax.jms.QueueReceiver; 044import javax.jms.QueueSender; 045import javax.jms.QueueSession; 046import javax.jms.Session; 047import javax.jms.StreamMessage; 048import javax.jms.TemporaryQueue; 049import javax.jms.TemporaryTopic; 050import 
javax.jms.TextMessage; 051import javax.jms.Topic; 052import javax.jms.TopicPublisher; 053import javax.jms.TopicSession; 054import javax.jms.TopicSubscriber; 055import javax.jms.TransactionRolledBackException; 056import org.apache.activemq.blob.BlobDownloader; 057import org.apache.activemq.blob.BlobTransferPolicy; 058import org.apache.activemq.blob.BlobUploader; 059import org.apache.activemq.command.ActiveMQBlobMessage; 060import org.apache.activemq.command.ActiveMQBytesMessage; 061import org.apache.activemq.command.ActiveMQDestination; 062import org.apache.activemq.command.ActiveMQMapMessage; 063import org.apache.activemq.command.ActiveMQMessage; 064import org.apache.activemq.command.ActiveMQObjectMessage; 065import org.apache.activemq.command.ActiveMQQueue; 066import org.apache.activemq.command.ActiveMQStreamMessage; 067import org.apache.activemq.command.ActiveMQTempDestination; 068import org.apache.activemq.command.ActiveMQTempQueue; 069import org.apache.activemq.command.ActiveMQTempTopic; 070import org.apache.activemq.command.ActiveMQTextMessage; 071import org.apache.activemq.command.ActiveMQTopic; 072import org.apache.activemq.command.Command; 073import org.apache.activemq.command.ConsumerId; 074import org.apache.activemq.command.MessageAck; 075import org.apache.activemq.command.MessageDispatch; 076import org.apache.activemq.command.MessageId; 077import org.apache.activemq.command.ProducerId; 078import org.apache.activemq.command.RemoveInfo; 079import org.apache.activemq.command.Response; 080import org.apache.activemq.command.SessionId; 081import org.apache.activemq.command.SessionInfo; 082import org.apache.activemq.command.TransactionId; 083import org.apache.activemq.management.JMSSessionStatsImpl; 084import org.apache.activemq.management.StatsCapable; 085import org.apache.activemq.management.StatsImpl; 086import org.apache.activemq.thread.Scheduler; 087import org.apache.activemq.transaction.Synchronization; 088import org.apache.activemq.usage.MemoryUsage; 
089import org.apache.activemq.util.Callback; 090import org.apache.activemq.util.LongSequenceGenerator; 091import org.slf4j.Logger; 092import org.slf4j.LoggerFactory; 093 094/** 095 * <P> 096 * A <CODE>Session</CODE> object is a single-threaded context for producing 097 * and consuming messages. Although it may allocate provider resources outside 098 * the Java virtual machine (JVM), it is considered a lightweight JMS object. 099 * <P> 100 * A session serves several purposes: 101 * <UL> 102 * <LI>It is a factory for its message producers and consumers. 103 * <LI>It supplies provider-optimized message factories. 104 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and 105 * <CODE>TemporaryQueues</CODE>. 106 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> 107 * objects for those clients that need to dynamically manipulate 108 * provider-specific destination names. 109 * <LI>It supports a single series of transactions that combine work spanning 110 * its producers and consumers into atomic units. 111 * <LI>It defines a serial order for the messages it consumes and the messages 112 * it produces. 113 * <LI>It retains messages it consumes until they have been acknowledged. 114 * <LI>It serializes execution of message listeners registered with its message 115 * consumers. 116 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. 117 * </UL> 118 * <P> 119 * A session can create and service multiple message producers and consumers. 120 * <P> 121 * One typical use is to have a thread block on a synchronous 122 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then 123 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. 124 * <P> 125 * If a client desires to have one thread produce messages while others consume 126 * them, the client should use a separate session for its producing thread. 
127 * <P> 128 * Once a connection has been started, any session with one or more registered 129 * message listeners is dedicated to the thread of control that delivers 130 * messages to it. It is erroneous for client code to use this session or any of 131 * its constituent objects from another thread of control. The only exception to 132 * this rule is the use of the session or connection <CODE>close</CODE> 133 * method. 134 * <P> 135 * It should be easy for most clients to partition their work naturally into 136 * sessions. This model allows clients to start simply and incrementally add 137 * message processing complexity as their need for concurrency grows. 138 * <P> 139 * The <CODE>close</CODE> method is the only session method that can be called 140 * while some other session method is being executed in another thread. 141 * <P> 142 * A session may be specified as transacted. Each transacted session supports a 143 * single series of transactions. Each transaction groups a set of message sends 144 * and a set of message receives into an atomic unit of work. In effect, 145 * transactions organize a session's input message stream and output message 146 * stream into series of atomic units. When a transaction commits, its atomic 147 * unit of input is acknowledged and its associated atomic unit of output is 148 * sent. If a transaction rollback is done, the transaction's sent messages are 149 * destroyed and the session's input is automatically recovered. 150 * <P> 151 * The content of a transaction's input and output units is simply those 152 * messages that have been produced and consumed within the session's current 153 * transaction. 154 * <P> 155 * A transaction is completed using either its session's <CODE>commit</CODE> 156 * method or its session's <CODE>rollback </CODE> method. The completion of a 157 * session's current transaction automatically begins the next. 
The result is 158 * that a transacted session always has a current transaction within which its 159 * work is done. 160 * <P> 161 * The Java Transaction Service (JTS) or some other transaction monitor may be 162 * used to combine a session's transaction with transactions on other resources 163 * (databases, other JMS sessions, etc.). Since Java distributed transactions 164 * are controlled via the Java Transaction API (JTA), use of the session's 165 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is 166 * prohibited. 167 * <P> 168 * The JMS API does not require support for JTA; however, it does define how a 169 * provider supplies this support. 170 * <P> 171 * Although it is also possible for a JMS client to handle distributed 172 * transactions directly, it is unlikely that many JMS clients will do this. 173 * Support for JTA in the JMS API is targeted at systems vendors who will be 174 * integrating the JMS API into their application server products. 175 * 176 * 177 * @see javax.jms.Session 178 * @see javax.jms.QueueSession 179 * @see javax.jms.TopicSession 180 * @see javax.jms.XASession 181 */ 182public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { 183 184 /** 185 * Only acknowledge an individual message - using message.acknowledge() 186 * as opposed to CLIENT_ACKNOWLEDGE which 187 * acknowledges all messages consumed by a session at when acknowledge() 188 * is called 189 */ 190 public static final int INDIVIDUAL_ACKNOWLEDGE = 4; 191 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; 192 193 public static interface DeliveryListener { 194 void beforeDelivery(ActiveMQSession session, Message msg); 195 196 void afterDelivery(ActiveMQSession session, Message msg); 197 } 198 199 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); 200 private final Scheduler scheduler; 201 private final ThreadPoolExecutor connectionExecutor; 202 203 
protected int acknowledgementMode; 204 protected final ActiveMQConnection connection; 205 protected final SessionInfo info; 206 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 207 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 208 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 209 protected final ActiveMQSessionExecutor executor; 210 protected final AtomicBoolean started = new AtomicBoolean(false); 211 212 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); 213 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); 214 215 protected boolean closed; 216 private volatile boolean synchronizationRegistered; 217 protected boolean asyncDispatch; 218 protected boolean sessionAsyncDispatch; 219 protected final boolean debug; 220 protected Object sendMutex = new Object(); 221 222 private MessageListener messageListener; 223 private final JMSSessionStatsImpl stats; 224 private TransactionContext transactionContext; 225 private DeliveryListener deliveryListener; 226 private MessageTransformer transformer; 227 private BlobTransferPolicy blobTransferPolicy; 228 private long lastDeliveredSequenceId; 229 230 /** 231 * Construct the Session 232 * 233 * @param connection 234 * @param sessionId 235 * @param acknowledgeMode n.b if transacted - the acknowledgeMode == 236 * Session.SESSION_TRANSACTED 237 * @param asyncDispatch 238 * @param sessionAsyncDispatch 239 * @throws JMSException on internal error 240 */ 241 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { 242 this.debug = LOG.isDebugEnabled(); 243 this.connection = connection; 244 this.acknowledgementMode = acknowledgeMode; 245 
this.asyncDispatch = asyncDispatch; 246 this.sessionAsyncDispatch = sessionAsyncDispatch; 247 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 248 setTransactionContext(new TransactionContext(connection)); 249 stats = new JMSSessionStatsImpl(producers, consumers); 250 this.connection.asyncSendPacket(info); 251 setTransformer(connection.getTransformer()); 252 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 253 this.scheduler=connection.getScheduler(); 254 this.connectionExecutor=connection.getExecutor(); 255 this.executor = new ActiveMQSessionExecutor(this); 256 connection.addSession(this); 257 if (connection.isStarted()) { 258 start(); 259 } 260 261 } 262 263 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { 264 this(connection, sessionId, acknowledgeMode, asyncDispatch, true); 265 } 266 267 /** 268 * Sets the transaction context of the session. 269 * 270 * @param transactionContext - provides the means to control a JMS 271 * transaction. 272 */ 273 public void setTransactionContext(TransactionContext transactionContext) { 274 this.transactionContext = transactionContext; 275 } 276 277 /** 278 * Returns the transaction context of the session. 279 * 280 * @return transactionContext - session's transaction context. 281 */ 282 public TransactionContext getTransactionContext() { 283 return transactionContext; 284 } 285 286 /* 287 * (non-Javadoc) 288 * 289 * @see org.apache.activemq.management.StatsCapable#getStats() 290 */ 291 public StatsImpl getStats() { 292 return stats; 293 } 294 295 /** 296 * Returns the session's statistics. 297 * 298 * @return stats - session's statistics. 299 */ 300 public JMSSessionStatsImpl getSessionStats() { 301 return stats; 302 } 303 304 /** 305 * Creates a <CODE>BytesMessage</CODE> object. 
A <CODE>BytesMessage</CODE> 306 * object is used to send a message containing a stream of uninterpreted 307 * bytes. 308 * 309 * @return the an ActiveMQBytesMessage 310 * @throws JMSException if the JMS provider fails to create this message due 311 * to some internal error. 312 */ 313 public BytesMessage createBytesMessage() throws JMSException { 314 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 315 configureMessage(message); 316 return message; 317 } 318 319 /** 320 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> 321 * object is used to send a self-defining set of name-value pairs, where 322 * names are <CODE>String</CODE> objects and values are primitive values 323 * in the Java programming language. 324 * 325 * @return an ActiveMQMapMessage 326 * @throws JMSException if the JMS provider fails to create this message due 327 * to some internal error. 328 */ 329 public MapMessage createMapMessage() throws JMSException { 330 ActiveMQMapMessage message = new ActiveMQMapMessage(); 331 configureMessage(message); 332 return message; 333 } 334 335 /** 336 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> 337 * interface is the root interface of all JMS messages. A 338 * <CODE>Message</CODE> object holds all the standard message header 339 * information. It can be sent when a message containing only header 340 * information is sufficient. 341 * 342 * @return an ActiveMQMessage 343 * @throws JMSException if the JMS provider fails to create this message due 344 * to some internal error. 345 */ 346 public Message createMessage() throws JMSException { 347 ActiveMQMessage message = new ActiveMQMessage(); 348 configureMessage(message); 349 return message; 350 } 351 352 /** 353 * Creates an <CODE>ObjectMessage</CODE> object. An 354 * <CODE>ObjectMessage</CODE> object is used to send a message that 355 * contains a serializable Java object. 
356 * 357 * @return an ActiveMQObjectMessage 358 * @throws JMSException if the JMS provider fails to create this message due 359 * to some internal error. 360 */ 361 public ObjectMessage createObjectMessage() throws JMSException { 362 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 363 configureMessage(message); 364 return message; 365 } 366 367 /** 368 * Creates an initialized <CODE>ObjectMessage</CODE> object. An 369 * <CODE>ObjectMessage</CODE> object is used to send a message that 370 * contains a serializable Java object. 371 * 372 * @param object the object to use to initialize this message 373 * @return an ActiveMQObjectMessage 374 * @throws JMSException if the JMS provider fails to create this message due 375 * to some internal error. 376 */ 377 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 378 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 379 configureMessage(message); 380 message.setObject(object); 381 return message; 382 } 383 384 /** 385 * Creates a <CODE>StreamMessage</CODE> object. A 386 * <CODE>StreamMessage</CODE> object is used to send a self-defining 387 * stream of primitive values in the Java programming language. 388 * 389 * @return an ActiveMQStreamMessage 390 * @throws JMSException if the JMS provider fails to create this message due 391 * to some internal error. 392 */ 393 public StreamMessage createStreamMessage() throws JMSException { 394 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 395 configureMessage(message); 396 return message; 397 } 398 399 /** 400 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> 401 * object is used to send a message containing a <CODE>String</CODE> 402 * object. 403 * 404 * @return an ActiveMQTextMessage 405 * @throws JMSException if the JMS provider fails to create this message due 406 * to some internal error. 
407 */ 408 public TextMessage createTextMessage() throws JMSException { 409 ActiveMQTextMessage message = new ActiveMQTextMessage(); 410 configureMessage(message); 411 return message; 412 } 413 414 /** 415 * Creates an initialized <CODE>TextMessage</CODE> object. A 416 * <CODE>TextMessage</CODE> object is used to send a message containing a 417 * <CODE>String</CODE>. 418 * 419 * @param text the string used to initialize this message 420 * @return an ActiveMQTextMessage 421 * @throws JMSException if the JMS provider fails to create this message due 422 * to some internal error. 423 */ 424 public TextMessage createTextMessage(String text) throws JMSException { 425 ActiveMQTextMessage message = new ActiveMQTextMessage(); 426 message.setText(text); 427 configureMessage(message); 428 return message; 429 } 430 431 /** 432 * Creates an initialized <CODE>BlobMessage</CODE> object. A 433 * <CODE>BlobMessage</CODE> object is used to send a message containing a 434 * <CODE>URL</CODE> which points to some network addressible BLOB. 435 * 436 * @param url the network addressable URL used to pass directly to the 437 * consumer 438 * @return a BlobMessage 439 * @throws JMSException if the JMS provider fails to create this message due 440 * to some internal error. 441 */ 442 public BlobMessage createBlobMessage(URL url) throws JMSException { 443 return createBlobMessage(url, false); 444 } 445 446 /** 447 * Creates an initialized <CODE>BlobMessage</CODE> object. A 448 * <CODE>BlobMessage</CODE> object is used to send a message containing a 449 * <CODE>URL</CODE> which points to some network addressible BLOB. 450 * 451 * @param url the network addressable URL used to pass directly to the 452 * consumer 453 * @param deletedByBroker indicates whether or not the resource is deleted 454 * by the broker when the message is acknowledged 455 * @return a BlobMessage 456 * @throws JMSException if the JMS provider fails to create this message due 457 * to some internal error. 
458 */ 459 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 460 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 461 configureMessage(message); 462 message.setURL(url); 463 message.setDeletedByBroker(deletedByBroker); 464 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 465 return message; 466 } 467 468 /** 469 * Creates an initialized <CODE>BlobMessage</CODE> object. A 470 * <CODE>BlobMessage</CODE> object is used to send a message containing 471 * the <CODE>File</CODE> content. Before the message is sent the file 472 * conent will be uploaded to the broker or some other remote repository 473 * depending on the {@link #getBlobTransferPolicy()}. 474 * 475 * @param file the file to be uploaded to some remote repo (or the broker) 476 * depending on the strategy 477 * @return a BlobMessage 478 * @throws JMSException if the JMS provider fails to create this message due 479 * to some internal error. 480 */ 481 public BlobMessage createBlobMessage(File file) throws JMSException { 482 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 483 configureMessage(message); 484 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 485 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); 486 message.setDeletedByBroker(true); 487 message.setName(file.getName()); 488 return message; 489 } 490 491 /** 492 * Creates an initialized <CODE>BlobMessage</CODE> object. A 493 * <CODE>BlobMessage</CODE> object is used to send a message containing 494 * the <CODE>File</CODE> content. Before the message is sent the file 495 * conent will be uploaded to the broker or some other remote repository 496 * depending on the {@link #getBlobTransferPolicy()}. 
497 * 498 * @param in the stream to be uploaded to some remote repo (or the broker) 499 * depending on the strategy 500 * @return a BlobMessage 501 * @throws JMSException if the JMS provider fails to create this message due 502 * to some internal error. 503 */ 504 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 505 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 506 configureMessage(message); 507 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 508 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 509 message.setDeletedByBroker(true); 510 return message; 511 } 512 513 /** 514 * Indicates whether the session is in transacted mode. 515 * 516 * @return true if the session is in transacted mode 517 * @throws JMSException if there is some internal error. 518 */ 519 public boolean getTransacted() throws JMSException { 520 checkClosed(); 521 return isTransacted(); 522 } 523 524 /** 525 * Returns the acknowledgement mode of the session. The acknowledgement mode 526 * is set at the time that the session is created. If the session is 527 * transacted, the acknowledgement mode is ignored. 528 * 529 * @return If the session is not transacted, returns the current 530 * acknowledgement mode for the session. If the session is 531 * transacted, returns SESSION_TRANSACTED. 532 * @throws JMSException 533 * @see javax.jms.Connection#createSession(boolean,int) 534 * @since 1.1 exception JMSException if there is some internal error. 535 */ 536 public int getAcknowledgeMode() throws JMSException { 537 checkClosed(); 538 return this.acknowledgementMode; 539 } 540 541 /** 542 * Commits all messages done in this transaction and releases any locks 543 * currently held. 544 * 545 * @throws JMSException if the JMS provider fails to commit the transaction 546 * due to some internal error. 547 * @throws TransactionRolledBackException if the transaction is rolled back 548 * due to some internal error during commit. 
549 * @throws javax.jms.IllegalStateException if the method is not called by a 550 * transacted session. 551 */ 552 public void commit() throws JMSException { 553 checkClosed(); 554 if (!getTransacted()) { 555 throw new javax.jms.IllegalStateException("Not a transacted session"); 556 } 557 if (LOG.isDebugEnabled()) { 558 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); 559 } 560 transactionContext.commit(); 561 } 562 563 /** 564 * Rolls back any messages done in this transaction and releases any locks 565 * currently held. 566 * 567 * @throws JMSException if the JMS provider fails to roll back the 568 * transaction due to some internal error. 569 * @throws javax.jms.IllegalStateException if the method is not called by a 570 * transacted session. 571 */ 572 public void rollback() throws JMSException { 573 checkClosed(); 574 if (!getTransacted()) { 575 throw new javax.jms.IllegalStateException("Not a transacted session"); 576 } 577 if (LOG.isDebugEnabled()) { 578 LOG.debug(getSessionId() + " Transaction Rollback"); 579 } 580 transactionContext.rollback(); 581 } 582 583 /** 584 * Closes the session. 585 * <P> 586 * Since a provider may allocate some resources on behalf of a session 587 * outside the JVM, clients should close the resources when they are not 588 * needed. Relying on garbage collection to eventually reclaim these 589 * resources may not be timely enough. 590 * <P> 591 * There is no need to close the producers and consumers of a closed 592 * session. 593 * <P> 594 * This call will block until a <CODE>receive</CODE> call or message 595 * listener in progress has completed. A blocked message consumer 596 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session 597 * is closed. 598 * <P> 599 * Closing a transacted session must roll back the transaction in progress. 600 * <P> 601 * This method is the only <CODE>Session</CODE> method that can be called 602 * concurrently. 
603 * <P> 604 * Invoking any other <CODE>Session</CODE> method on a closed session must 605 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a 606 * closed session must <I>not </I> throw an exception. 607 * 608 * @throws JMSException if the JMS provider fails to close the session due 609 * to some internal error. 610 */ 611 public void close() throws JMSException { 612 if (!closed) { 613 if (getTransactionContext().isInXATransaction()) { 614 if (!synchronizationRegistered) { 615 synchronizationRegistered = true; 616 getTransactionContext().addSynchronization(new Synchronization() { 617 618 @Override 619 public void afterCommit() throws Exception { 620 doClose(); 621 synchronizationRegistered = false; 622 } 623 624 @Override 625 public void afterRollback() throws Exception { 626 doClose(); 627 synchronizationRegistered = false; 628 } 629 }); 630 } 631 632 } else { 633 doClose(); 634 } 635 } 636 } 637 638 private void doClose() throws JMSException { 639 dispose(); 640 RemoveInfo removeCommand = info.createRemoveCommand(); 641 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 642 connection.asyncSendPacket(removeCommand); 643 } 644 645 void clearMessagesInProgress() { 646 executor.clearMessagesInProgress(); 647 // we are called from inside the transport reconnection logic 648 // which involves us clearing all the connections' consumers 649 // dispatch and delivered lists. 
So rather than trying to 650 // grab a mutex (which could be already owned by the message 651 // listener calling the send or an ack) we allow it to complete in 652 // a separate thread via the scheduler and notify us via 653 // connection.transportInterruptionProcessingComplete() 654 // 655 for (final ActiveMQMessageConsumer consumer : consumers) { 656 consumer.inProgressClearRequired(); 657 scheduler.executeAfterDelay(new Runnable() { 658 public void run() { 659 consumer.clearMessagesInProgress(); 660 }}, 0l); 661 } 662 } 663 664 void deliverAcks() { 665 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 666 ActiveMQMessageConsumer consumer = iter.next(); 667 consumer.deliverAcks(); 668 } 669 } 670 671 public synchronized void dispose() throws JMSException { 672 if (!closed) { 673 674 try { 675 executor.stop(); 676 677 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 678 ActiveMQMessageConsumer consumer = iter.next(); 679 consumer.setFailureError(connection.getFirstFailureError()); 680 consumer.dispose(); 681 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); 682 } 683 consumers.clear(); 684 685 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { 686 ActiveMQMessageProducer producer = iter.next(); 687 producer.dispose(); 688 } 689 producers.clear(); 690 691 try { 692 if (getTransactionContext().isInLocalTransaction()) { 693 rollback(); 694 } 695 } catch (JMSException e) { 696 } 697 698 } finally { 699 connection.removeSession(this); 700 this.transactionContext = null; 701 closed = true; 702 } 703 } 704 } 705 706 /** 707 * Checks that the session is not closed then configures the message 708 */ 709 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 710 checkClosed(); 711 message.setConnection(connection); 712 } 713 714 /** 715 * Check if the session is closed. 
It is used for ensuring that the session 716 * is open before performing various operations. 717 * 718 * @throws IllegalStateException if the Session is closed 719 */ 720 protected void checkClosed() throws IllegalStateException { 721 if (closed) { 722 throw new IllegalStateException("The Session is closed"); 723 } 724 } 725 726 /** 727 * Stops message delivery in this session, and restarts message delivery 728 * with the oldest unacknowledged message. 729 * <P> 730 * All consumers deliver messages in a serial order. Acknowledging a 731 * received message automatically acknowledges all messages that have been 732 * delivered to the client. 733 * <P> 734 * Restarting a session causes it to take the following actions: 735 * <UL> 736 * <LI>Stop message delivery 737 * <LI>Mark all messages that might have been delivered but not 738 * acknowledged as "redelivered" 739 * <LI>Restart the delivery sequence including all unacknowledged messages 740 * that had been previously delivered. Redelivered messages do not have to 741 * be delivered in exactly their original delivery order. 742 * </UL> 743 * 744 * @throws JMSException if the JMS provider fails to stop and restart 745 * message delivery due to some internal error. 746 * @throws IllegalStateException if the method is called by a transacted 747 * session. 748 */ 749 public void recover() throws JMSException { 750 751 checkClosed(); 752 if (getTransacted()) { 753 throw new IllegalStateException("This session is transacted"); 754 } 755 756 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 757 ActiveMQMessageConsumer c = iter.next(); 758 c.rollback(); 759 } 760 761 } 762 763 /** 764 * Returns the session's distinguished message listener (optional). 765 * 766 * @return the message listener associated with this session 767 * @throws JMSException if the JMS provider fails to get the message 768 * listener due to an internal error. 
769 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 770 * @see javax.jms.ServerSessionPool 771 * @see javax.jms.ServerSession 772 */ 773 public MessageListener getMessageListener() throws JMSException { 774 checkClosed(); 775 return this.messageListener; 776 } 777 778 /** 779 * Sets the session's distinguished message listener (optional). 780 * <P> 781 * When the distinguished message listener is set, no other form of message 782 * receipt in the session can be used; however, all forms of sending 783 * messages are still supported. 784 * <P> 785 * This is an expert facility not used by regular JMS clients. 786 * 787 * @param listener the message listener to associate with this session 788 * @throws JMSException if the JMS provider fails to set the message 789 * listener due to an internal error. 790 * @see javax.jms.Session#getMessageListener() 791 * @see javax.jms.ServerSessionPool 792 * @see javax.jms.ServerSession 793 */ 794 public void setMessageListener(MessageListener listener) throws JMSException { 795 checkClosed(); 796 this.messageListener = listener; 797 798 if (listener != null) { 799 executor.setDispatchedBySessionPool(true); 800 } 801 } 802 803 /** 804 * Optional operation, intended to be used only by Application Servers, not 805 * by ordinary JMS clients. 
806 * 807 * @see javax.jms.ServerSession 808 */ 809 public void run() { 810 MessageDispatch messageDispatch; 811 while ((messageDispatch = executor.dequeueNoWait()) != null) { 812 final MessageDispatch md = messageDispatch; 813 ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); 814 if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) { 815 // TODO: Ack it without delivery to client 816 continue; 817 } 818 819 if (isClientAcknowledge()||isIndividualAcknowledge()) { 820 message.setAcknowledgeCallback(new Callback() { 821 public void execute() throws Exception { 822 } 823 }); 824 } 825 826 if (deliveryListener != null) { 827 deliveryListener.beforeDelivery(this, message); 828 } 829 830 md.setDeliverySequenceId(getNextDeliveryId()); 831 832 try { 833 messageListener.onMessage(message); 834 } catch (RuntimeException e) { 835 LOG.error("error dispatching message: ", e); 836 // A problem while invoking the MessageListener does not 837 // in general indicate a problem with the connection to the broker, i.e. 838 // it will usually be sufficient to let the afterDelivery() method either 839 // commit or roll back in order to deal with the exception. 840 // However, we notify any registered client internal exception listener 841 // of the problem. 
842 connection.onClientInternalException(e); 843 } 844 845 try { 846 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 847 ack.setFirstMessageId(md.getMessage().getMessageId()); 848 doStartTransaction(); 849 ack.setTransactionId(getTransactionContext().getTransactionId()); 850 if (ack.getTransactionId() != null) { 851 getTransactionContext().addSynchronization(new Synchronization() { 852 853 @Override 854 public void afterRollback() throws Exception { 855 md.getMessage().onMessageRolledBack(); 856 // ensure we don't filter this as a duplicate 857 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); 858 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); 859 int redeliveryCounter = md.getMessage().getRedeliveryCounter(); 860 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 861 && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) { 862 // We need to NACK the messages so that they get 863 // sent to the 864 // DLQ. 865 // Acknowledge the last message. 866 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 867 ack.setFirstMessageId(md.getMessage().getMessageId()); 868 asyncSendPacket(ack); 869 } else { 870 871 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); 872 ack.setFirstMessageId(md.getMessage().getMessageId()); 873 asyncSendPacket(ack); 874 875 // Figure out how long we should wait to resend 876 // this message. 
877 long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); 878 for (int i = 0; i < redeliveryCounter; i++) { 879 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); 880 } 881 scheduler.executeAfterDelay(new Runnable() { 882 883 public void run() { 884 ((ActiveMQDispatcher)md.getConsumer()).dispatch(md); 885 } 886 }, redeliveryDelay); 887 } 888 } 889 }); 890 } 891 asyncSendPacket(ack); 892 } catch (Throwable e) { 893 connection.onClientInternalException(e); 894 } 895 896 if (deliveryListener != null) { 897 deliveryListener.afterDelivery(this, message); 898 } 899 } 900 } 901 902 /** 903 * Creates a <CODE>MessageProducer</CODE> to send messages to the 904 * specified destination. 905 * <P> 906 * A client uses a <CODE>MessageProducer</CODE> object to send messages to 907 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both 908 * inherit from <CODE>Destination</CODE>, they can be used in the 909 * destination parameter to create a <CODE>MessageProducer</CODE> object. 910 * 911 * @param destination the <CODE>Destination</CODE> to send to, or null if 912 * this is a producer which does not have a specified 913 * destination. 914 * @return the MessageProducer 915 * @throws JMSException if the session fails to create a MessageProducer due 916 * to some internal error. 917 * @throws InvalidDestinationException if an invalid destination is 918 * specified. 
919 * @since 1.1 920 */ 921 public MessageProducer createProducer(Destination destination) throws JMSException { 922 checkClosed(); 923 if (destination instanceof CustomDestination) { 924 CustomDestination customDestination = (CustomDestination)destination; 925 return customDestination.createProducer(this); 926 } 927 int timeSendOut = connection.getSendTimeout(); 928 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut); 929 } 930 931 /** 932 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 933 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 934 * <CODE>Destination</CODE>, they can be used in the destination 935 * parameter to create a <CODE>MessageConsumer</CODE>. 936 * 937 * @param destination the <CODE>Destination</CODE> to access. 938 * @return the MessageConsumer 939 * @throws JMSException if the session fails to create a consumer due to 940 * some internal error. 941 * @throws InvalidDestinationException if an invalid destination is 942 * specified. 943 * @since 1.1 944 */ 945 public MessageConsumer createConsumer(Destination destination) throws JMSException { 946 return createConsumer(destination, (String) null); 947 } 948 949 /** 950 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 951 * using a message selector. Since <CODE> Queue</CODE> and 952 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 953 * can be used in the destination parameter to create a 954 * <CODE>MessageConsumer</CODE>. 955 * <P> 956 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 957 * that have been sent to a destination. 958 * 959 * @param destination the <CODE>Destination</CODE> to access 960 * @param messageSelector only messages with properties matching the message 961 * selector expression are delivered. 
A value of null or an 962 * empty string indicates that there is no message selector 963 * for the message consumer. 964 * @return the MessageConsumer 965 * @throws JMSException if the session fails to create a MessageConsumer due 966 * to some internal error. 967 * @throws InvalidDestinationException if an invalid destination is 968 * specified. 969 * @throws InvalidSelectorException if the message selector is invalid. 970 * @since 1.1 971 */ 972 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 973 return createConsumer(destination, messageSelector, false); 974 } 975 976 /** 977 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 978 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 979 * <CODE>Destination</CODE>, they can be used in the destination 980 * parameter to create a <CODE>MessageConsumer</CODE>. 981 * 982 * @param destination the <CODE>Destination</CODE> to access. 983 * @param messageListener the listener to use for async consumption of messages 984 * @return the MessageConsumer 985 * @throws JMSException if the session fails to create a consumer due to 986 * some internal error. 987 * @throws InvalidDestinationException if an invalid destination is 988 * specified. 989 * @since 1.1 990 */ 991 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { 992 return createConsumer(destination, null, messageListener); 993 } 994 995 /** 996 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 997 * using a message selector. Since <CODE> Queue</CODE> and 998 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 999 * can be used in the destination parameter to create a 1000 * <CODE>MessageConsumer</CODE>. 1001 * <P> 1002 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1003 * that have been sent to a destination. 
1004 * 1005 * @param destination the <CODE>Destination</CODE> to access 1006 * @param messageSelector only messages with properties matching the message 1007 * selector expression are delivered. A value of null or an 1008 * empty string indicates that there is no message selector 1009 * for the message consumer. 1010 * @param messageListener the listener to use for async consumption of messages 1011 * @return the MessageConsumer 1012 * @throws JMSException if the session fails to create a MessageConsumer due 1013 * to some internal error. 1014 * @throws InvalidDestinationException if an invalid destination is 1015 * specified. 1016 * @throws InvalidSelectorException if the message selector is invalid. 1017 * @since 1.1 1018 */ 1019 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { 1020 return createConsumer(destination, messageSelector, false, messageListener); 1021 } 1022 1023 /** 1024 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1025 * using a message selector. This method can specify whether messages 1026 * published by its own connection should be delivered to it, if the 1027 * destination is a topic. 1028 * <P> 1029 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1030 * <CODE>Destination</CODE>, they can be used in the destination 1031 * parameter to create a <CODE>MessageConsumer</CODE>. 1032 * <P> 1033 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1034 * that have been published to a destination. 1035 * <P> 1036 * In some cases, a connection may both publish and subscribe to a topic. 1037 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1038 * inhibit the delivery of messages published by its own connection. The 1039 * default value for this attribute is False. The <CODE>noLocal</CODE> 1040 * value must be supported by destinations that are topics. 
1041 * 1042 * @param destination the <CODE>Destination</CODE> to access 1043 * @param messageSelector only messages with properties matching the message 1044 * selector expression are delivered. A value of null or an 1045 * empty string indicates that there is no message selector 1046 * for the message consumer. 1047 * @param noLocal - if true, and the destination is a topic, inhibits the 1048 * delivery of messages published by its own connection. The 1049 * behavior for <CODE>NoLocal</CODE> is not specified if 1050 * the destination is a queue. 1051 * @return the MessageConsumer 1052 * @throws JMSException if the session fails to create a MessageConsumer due 1053 * to some internal error. 1054 * @throws InvalidDestinationException if an invalid destination is 1055 * specified. 1056 * @throws InvalidSelectorException if the message selector is invalid. 1057 * @since 1.1 1058 */ 1059 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1060 return createConsumer(destination, messageSelector, noLocal, null); 1061 } 1062 1063 /** 1064 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1065 * using a message selector. This method can specify whether messages 1066 * published by its own connection should be delivered to it, if the 1067 * destination is a topic. 1068 * <P> 1069 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1070 * <CODE>Destination</CODE>, they can be used in the destination 1071 * parameter to create a <CODE>MessageConsumer</CODE>. 1072 * <P> 1073 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1074 * that have been published to a destination. 1075 * <P> 1076 * In some cases, a connection may both publish and subscribe to a topic. 1077 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1078 * inhibit the delivery of messages published by its own connection. 
The 1079 * default value for this attribute is False. The <CODE>noLocal</CODE> 1080 * value must be supported by destinations that are topics. 1081 * 1082 * @param destination the <CODE>Destination</CODE> to access 1083 * @param messageSelector only messages with properties matching the message 1084 * selector expression are delivered. A value of null or an 1085 * empty string indicates that there is no message selector 1086 * for the message consumer. 1087 * @param noLocal - if true, and the destination is a topic, inhibits the 1088 * delivery of messages published by its own connection. The 1089 * behavior for <CODE>NoLocal</CODE> is not specified if 1090 * the destination is a queue. 1091 * @param messageListener the listener to use for async consumption of messages 1092 * @return the MessageConsumer 1093 * @throws JMSException if the session fails to create a MessageConsumer due 1094 * to some internal error. 1095 * @throws InvalidDestinationException if an invalid destination is 1096 * specified. 1097 * @throws InvalidSelectorException if the message selector is invalid. 
1098 * @since 1.1 1099 */ 1100 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { 1101 checkClosed(); 1102 1103 if (destination instanceof CustomDestination) { 1104 CustomDestination customDestination = (CustomDestination)destination; 1105 return customDestination.createConsumer(this, messageSelector, noLocal); 1106 } 1107 1108 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1109 int prefetch = 0; 1110 if (destination instanceof Topic) { 1111 prefetch = prefetchPolicy.getTopicPrefetch(); 1112 } else { 1113 prefetch = prefetchPolicy.getQueuePrefetch(); 1114 } 1115 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); 1116 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, 1117 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); 1118 } 1119 1120 /** 1121 * Creates a queue identity given a <CODE>Queue</CODE> name. 1122 * <P> 1123 * This facility is provided for the rare cases where clients need to 1124 * dynamically manipulate queue identity. It allows the creation of a queue 1125 * identity with a provider-specific name. Clients that depend on this 1126 * ability are not portable. 1127 * <P> 1128 * Note that this method is not for creating the physical queue. The 1129 * physical creation of queues is an administrative task and is not to be 1130 * initiated by the JMS API. The one exception is the creation of temporary 1131 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> 1132 * method. 1133 * 1134 * @param queueName the name of this <CODE>Queue</CODE> 1135 * @return a <CODE>Queue</CODE> with the given name 1136 * @throws JMSException if the session fails to create a queue due to some 1137 * internal error. 
1138 * @since 1.1 1139 */ 1140 public Queue createQueue(String queueName) throws JMSException { 1141 checkClosed(); 1142 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1143 return new ActiveMQTempQueue(queueName); 1144 } 1145 return new ActiveMQQueue(queueName); 1146 } 1147 1148 /** 1149 * Creates a topic identity given a <CODE>Topic</CODE> name. 1150 * <P> 1151 * This facility is provided for the rare cases where clients need to 1152 * dynamically manipulate topic identity. This allows the creation of a 1153 * topic identity with a provider-specific name. Clients that depend on this 1154 * ability are not portable. 1155 * <P> 1156 * Note that this method is not for creating the physical topic. The 1157 * physical creation of topics is an administrative task and is not to be 1158 * initiated by the JMS API. The one exception is the creation of temporary 1159 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> 1160 * method. 1161 * 1162 * @param topicName the name of this <CODE>Topic</CODE> 1163 * @return a <CODE>Topic</CODE> with the given name 1164 * @throws JMSException if the session fails to create a topic due to some 1165 * internal error. 1166 * @since 1.1 1167 */ 1168 public Topic createTopic(String topicName) throws JMSException { 1169 checkClosed(); 1170 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1171 return new ActiveMQTempTopic(topicName); 1172 } 1173 return new ActiveMQTopic(topicName); 1174 } 1175 1176 /** 1177 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1178 * the specified queue. 1179 * 1180 * @param queue the <CODE>queue</CODE> to access 1181 * @exception InvalidDestinationException if an invalid destination is 1182 * specified 1183 * @since 1.1 1184 */ 1185 /** 1186 * Creates a durable subscriber to the specified topic. 
1187 * <P> 1188 * If a client needs to receive all the messages published on a topic, 1189 * including the ones published while the subscriber is inactive, it uses a 1190 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1191 * record of this durable subscription and insures that all messages from 1192 * the topic's publishers are retained until they are acknowledged by this 1193 * durable subscriber or they have expired. 1194 * <P> 1195 * Sessions with durable subscribers must always provide the same client 1196 * identifier. In addition, each client must specify a name that uniquely 1197 * identifies (within client identifier) each durable subscription it 1198 * creates. Only one session at a time can have a 1199 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. 1200 * <P> 1201 * A client can change an existing durable subscription by creating a 1202 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1203 * and/or message selector. Changing a durable subscriber is equivalent to 1204 * unsubscribing (deleting) the old one and creating a new one. 1205 * <P> 1206 * In some cases, a connection may both publish and subscribe to a topic. 1207 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1208 * inhibit the delivery of messages published by its own connection. The 1209 * default value for this attribute is false. 1210 * 1211 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1212 * @param name the name used to identify this subscription 1213 * @return the TopicSubscriber 1214 * @throws JMSException if the session fails to create a subscriber due to 1215 * some internal error. 1216 * @throws InvalidDestinationException if an invalid topic is specified. 
1217 * @since 1.1 1218 */ 1219 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1220 checkClosed(); 1221 return createDurableSubscriber(topic, name, null, false); 1222 } 1223 1224 /** 1225 * Creates a durable subscriber to the specified topic, using a message 1226 * selector and specifying whether messages published by its own connection 1227 * should be delivered to it. 1228 * <P> 1229 * If a client needs to receive all the messages published on a topic, 1230 * including the ones published while the subscriber is inactive, it uses a 1231 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1232 * record of this durable subscription and insures that all messages from 1233 * the topic's publishers are retained until they are acknowledged by this 1234 * durable subscriber or they have expired. 1235 * <P> 1236 * Sessions with durable subscribers must always provide the same client 1237 * identifier. In addition, each client must specify a name which uniquely 1238 * identifies (within client identifier) each durable subscription it 1239 * creates. Only one session at a time can have a 1240 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An 1241 * inactive durable subscriber is one that exists but does not currently 1242 * have a message consumer associated with it. 1243 * <P> 1244 * A client can change an existing durable subscription by creating a 1245 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1246 * and/or message selector. Changing a durable subscriber is equivalent to 1247 * unsubscribing (deleting) the old one and creating a new one. 1248 * 1249 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1250 * @param name the name used to identify this subscription 1251 * @param messageSelector only messages with properties matching the message 1252 * selector expression are delivered. 
A value of null or an 1253 * empty string indicates that there is no message selector 1254 * for the message consumer. 1255 * @param noLocal if set, inhibits the delivery of messages published by its 1256 * own connection 1257 * @return the Queue Browser 1258 * @throws JMSException if the session fails to create a subscriber due to 1259 * some internal error. 1260 * @throws InvalidDestinationException if an invalid topic is specified. 1261 * @throws InvalidSelectorException if the message selector is invalid. 1262 * @since 1.1 1263 */ 1264 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1265 checkClosed(); 1266 1267 if (topic instanceof CustomDestination) { 1268 CustomDestination customDestination = (CustomDestination)topic; 1269 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); 1270 } 1271 1272 connection.checkClientIDWasManuallySpecified(); 1273 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1274 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); 1275 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); 1276 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, 1277 noLocal, false, asyncDispatch); 1278 } 1279 1280 /** 1281 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1282 * the specified queue. 1283 * 1284 * @param queue the <CODE>queue</CODE> to access 1285 * @return the Queue Browser 1286 * @throws JMSException if the session fails to create a browser due to some 1287 * internal error. 
1288 * @throws InvalidDestinationException if an invalid destination is 1289 * specified 1290 * @since 1.1 1291 */ 1292 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1293 checkClosed(); 1294 return createBrowser(queue, null); 1295 } 1296 1297 /** 1298 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1299 * the specified queue using a message selector. 1300 * 1301 * @param queue the <CODE>queue</CODE> to access 1302 * @param messageSelector only messages with properties matching the message 1303 * selector expression are delivered. A value of null or an 1304 * empty string indicates that there is no message selector 1305 * for the message consumer. 1306 * @return the Queue Browser 1307 * @throws JMSException if the session fails to create a browser due to some 1308 * internal error. 1309 * @throws InvalidDestinationException if an invalid destination is 1310 * specified 1311 * @throws InvalidSelectorException if the message selector is invalid. 1312 * @since 1.1 1313 */ 1314 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1315 checkClosed(); 1316 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); 1317 } 1318 1319 /** 1320 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that 1321 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1322 * 1323 * @return a temporary queue identity 1324 * @throws JMSException if the session fails to create a temporary queue due 1325 * to some internal error. 1326 * @since 1.1 1327 */ 1328 public TemporaryQueue createTemporaryQueue() throws JMSException { 1329 checkClosed(); 1330 return (TemporaryQueue)connection.createTempDestination(false); 1331 } 1332 1333 /** 1334 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that 1335 * of the <CODE>Connection</CODE> unless it is deleted earlier. 
1336 * 1337 * @return a temporary topic identity 1338 * @throws JMSException if the session fails to create a temporary topic due 1339 * to some internal error. 1340 * @since 1.1 1341 */ 1342 public TemporaryTopic createTemporaryTopic() throws JMSException { 1343 checkClosed(); 1344 return (TemporaryTopic)connection.createTempDestination(true); 1345 } 1346 1347 /** 1348 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1349 * the specified queue. 1350 * 1351 * @param queue the <CODE>Queue</CODE> to access 1352 * @return 1353 * @throws JMSException if the session fails to create a receiver due to 1354 * some internal error. 1355 * @throws JMSException 1356 * @throws InvalidDestinationException if an invalid queue is specified. 1357 */ 1358 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1359 checkClosed(); 1360 return createReceiver(queue, null); 1361 } 1362 1363 /** 1364 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1365 * the specified queue using a message selector. 1366 * 1367 * @param queue the <CODE>Queue</CODE> to access 1368 * @param messageSelector only messages with properties matching the message 1369 * selector expression are delivered. A value of null or an 1370 * empty string indicates that there is no message selector 1371 * for the message consumer. 1372 * @return QueueReceiver 1373 * @throws JMSException if the session fails to create a receiver due to 1374 * some internal error. 1375 * @throws InvalidDestinationException if an invalid queue is specified. 1376 * @throws InvalidSelectorException if the message selector is invalid. 
1377 */ 1378 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1379 checkClosed(); 1380 1381 if (queue instanceof CustomDestination) { 1382 CustomDestination customDestination = (CustomDestination)queue; 1383 return customDestination.createReceiver(this, messageSelector); 1384 } 1385 1386 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1387 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1388 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1389 } 1390 1391 /** 1392 * Creates a <CODE>QueueSender</CODE> object to send messages to the 1393 * specified queue. 1394 * 1395 * @param queue the <CODE>Queue</CODE> to access, or null if this is an 1396 * unidentified producer 1397 * @return QueueSender 1398 * @throws JMSException if the session fails to create a sender due to some 1399 * internal error. 1400 * @throws InvalidDestinationException if an invalid queue is specified. 1401 */ 1402 public QueueSender createSender(Queue queue) throws JMSException { 1403 checkClosed(); 1404 if (queue instanceof CustomDestination) { 1405 CustomDestination customDestination = (CustomDestination)queue; 1406 return customDestination.createSender(this); 1407 } 1408 int timeSendOut = connection.getSendTimeout(); 1409 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); 1410 } 1411 1412 /** 1413 * Creates a nondurable subscriber to the specified topic. <p/> 1414 * <P> 1415 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1416 * that have been published to a topic. <p/> 1417 * <P> 1418 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1419 * receive only messages that are published while they are active. 
<p/> 1420 * <P> 1421 * In some cases, a connection may both publish and subscribe to a topic. 1422 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1423 * inhibit the delivery of messages published by its own connection. The 1424 * default value for this attribute is false. 1425 * 1426 * @param topic the <CODE>Topic</CODE> to subscribe to 1427 * @return TopicSubscriber 1428 * @throws JMSException if the session fails to create a subscriber due to 1429 * some internal error. 1430 * @throws InvalidDestinationException if an invalid topic is specified. 1431 */ 1432 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1433 checkClosed(); 1434 return createSubscriber(topic, null, false); 1435 } 1436 1437 /** 1438 * Creates a nondurable subscriber to the specified topic, using a message 1439 * selector or specifying whether messages published by its own connection 1440 * should be delivered to it. <p/> 1441 * <P> 1442 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1443 * that have been published to a topic. <p/> 1444 * <P> 1445 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1446 * receive only messages that are published while they are active. <p/> 1447 * <P> 1448 * Messages filtered out by a subscriber's message selector will never be 1449 * delivered to the subscriber. From the subscriber's perspective, they do 1450 * not exist. <p/> 1451 * <P> 1452 * In some cases, a connection may both publish and subscribe to a topic. 1453 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1454 * inhibit the delivery of messages published by its own connection. The 1455 * default value for this attribute is false. 1456 * 1457 * @param topic the <CODE>Topic</CODE> to subscribe to 1458 * @param messageSelector only messages with properties matching the message 1459 * selector expression are delivered. 
A value of null or an 1460 * empty string indicates that there is no message selector 1461 * for the message consumer. 1462 * @param noLocal if set, inhibits the delivery of messages published by its 1463 * own connection 1464 * @return TopicSubscriber 1465 * @throws JMSException if the session fails to create a subscriber due to 1466 * some internal error. 1467 * @throws InvalidDestinationException if an invalid topic is specified. 1468 * @throws InvalidSelectorException if the message selector is invalid. 1469 */ 1470 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1471 checkClosed(); 1472 1473 if (topic instanceof CustomDestination) { 1474 CustomDestination customDestination = (CustomDestination)topic; 1475 return customDestination.createSubscriber(this, messageSelector, noLocal); 1476 } 1477 1478 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1479 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy 1480 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1481 } 1482 1483 /** 1484 * Creates a publisher for the specified topic. <p/> 1485 * <P> 1486 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages 1487 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on 1488 * a topic, it defines a new sequence of messages that have no ordering 1489 * relationship with the messages it has previously sent. 1490 * 1491 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is 1492 * an unidentified producer 1493 * @return TopicPublisher 1494 * @throws JMSException if the session fails to create a publisher due to 1495 * some internal error. 1496 * @throws InvalidDestinationException if an invalid topic is specified. 
1497 */ 1498 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1499 checkClosed(); 1500 1501 if (topic instanceof CustomDestination) { 1502 CustomDestination customDestination = (CustomDestination)topic; 1503 return customDestination.createPublisher(this); 1504 } 1505 int timeSendOut = connection.getSendTimeout(); 1506 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); 1507 } 1508 1509 /** 1510 * Unsubscribes a durable subscription that has been created by a client. 1511 * <P> 1512 * This method deletes the state being maintained on behalf of the 1513 * subscriber by its provider. 1514 * <P> 1515 * It is erroneous for a client to delete a durable subscription while there 1516 * is an active <CODE>MessageConsumer </CODE> or 1517 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 1518 * message is part of a pending transaction or has not been acknowledged in 1519 * the session. 1520 * 1521 * @param name the name used to identify this subscription 1522 * @throws JMSException if the session fails to unsubscribe to the durable 1523 * subscription due to some internal error. 1524 * @throws InvalidDestinationException if an invalid subscription name is 1525 * specified. 1526 * @since 1.1 1527 */ 1528 public void unsubscribe(String name) throws JMSException { 1529 checkClosed(); 1530 connection.unsubscribe(name); 1531 } 1532 1533 public void dispatch(MessageDispatch messageDispatch) { 1534 try { 1535 executor.execute(messageDispatch); 1536 } catch (InterruptedException e) { 1537 Thread.currentThread().interrupt(); 1538 connection.onClientInternalException(e); 1539 } 1540 } 1541 1542 /** 1543 * Acknowledges all consumed messages of the session of this consumed 1544 * message. 
1545 * <P> 1546 * All consumed JMS messages support the <CODE>acknowledge</CODE> method 1547 * for use when a client has specified that its JMS session's consumed 1548 * messages are to be explicitly acknowledged. By invoking 1549 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges 1550 * all messages consumed by the session that the message was delivered to. 1551 * <P> 1552 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted 1553 * sessions and sessions specified to use implicit acknowledgement modes. 1554 * <P> 1555 * A client may individually acknowledge each message as it is consumed, or 1556 * it may choose to acknowledge messages as an application-defined group 1557 * (which is done by calling acknowledge on the last received message of the 1558 * group, thereby acknowledging all messages consumed by the session.) 1559 * <P> 1560 * Messages that have been received but not acknowledged may be redelivered. 1561 * 1562 * @throws JMSException if the JMS provider fails to acknowledge the 1563 * messages due to some internal error. 1564 * @throws javax.jms.IllegalStateException if this method is called on a 1565 * closed session. 1566 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1567 */ 1568 public void acknowledge() throws JMSException { 1569 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1570 ActiveMQMessageConsumer c = iter.next(); 1571 c.acknowledge(); 1572 } 1573 } 1574 1575 /** 1576 * Add a message consumer. 1577 * 1578 * @param consumer - message consumer. 1579 * @throws JMSException 1580 */ 1581 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1582 this.consumers.add(consumer); 1583 if (consumer.isDurableSubscriber()) { 1584 stats.onCreateDurableSubscriber(); 1585 } 1586 this.connection.addDispatcher(consumer.getConsumerId(), this); 1587 } 1588 1589 /** 1590 * Remove the message consumer. 1591 * 1592 * @param consumer - consumer to be removed. 
1593 * @throws JMSException 1594 */ 1595 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1596 this.connection.removeDispatcher(consumer.getConsumerId()); 1597 if (consumer.isDurableSubscriber()) { 1598 stats.onRemoveDurableSubscriber(); 1599 } 1600 this.consumers.remove(consumer); 1601 this.connection.removeDispatcher(consumer); 1602 } 1603 1604 /** 1605 * Adds a message producer. 1606 * 1607 * @param producer - message producer to be added. 1608 * @throws JMSException 1609 */ 1610 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1611 this.producers.add(producer); 1612 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1613 } 1614 1615 /** 1616 * Removes a message producer. 1617 * 1618 * @param producer - message producer to be removed. 1619 * @throws JMSException 1620 */ 1621 protected void removeProducer(ActiveMQMessageProducer producer) { 1622 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1623 this.producers.remove(producer); 1624 } 1625 1626 /** 1627 * Start this Session. 1628 * 1629 * @throws JMSException 1630 */ 1631 protected void start() throws JMSException { 1632 started.set(true); 1633 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1634 ActiveMQMessageConsumer c = iter.next(); 1635 c.start(); 1636 } 1637 executor.start(); 1638 } 1639 1640 /** 1641 * Stops this session. 1642 * 1643 * @throws JMSException 1644 */ 1645 protected void stop() throws JMSException { 1646 1647 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1648 ActiveMQMessageConsumer c = iter.next(); 1649 c.stop(); 1650 } 1651 1652 started.set(false); 1653 executor.stop(); 1654 } 1655 1656 /** 1657 * Returns the session id. 1658 * 1659 * @return value - session id. 
1660 */ 1661 protected SessionId getSessionId() { 1662 return info.getSessionId(); 1663 } 1664 1665 /** 1666 * @return 1667 */ 1668 protected ConsumerId getNextConsumerId() { 1669 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 1670 } 1671 1672 /** 1673 * @return 1674 */ 1675 protected ProducerId getNextProducerId() { 1676 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 1677 } 1678 1679 /** 1680 * Sends the message for dispatch by the broker. 1681 * 1682 * @param producer - message producer. 1683 * @param destination - message destination. 1684 * @param message - message to be sent. 1685 * @param deliveryMode - JMS messsage delivery mode. 1686 * @param priority - message priority. 1687 * @param timeToLive - message expiration. 1688 * @param producerWindow 1689 * @throws JMSException 1690 */ 1691 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 1692 MemoryUsage producerWindow, int sendTimeout) throws JMSException { 1693 1694 checkClosed(); 1695 if (destination.isTemporary() && connection.isDeleted(destination)) { 1696 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); 1697 } 1698 synchronized (sendMutex) { 1699 // tell the Broker we are about to start a new transaction 1700 doStartTransaction(); 1701 TransactionId txid = transactionContext.getTransactionId(); 1702 long sequenceNumber = producer.getMessageSequence(); 1703 1704 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 1705 message.setJMSDeliveryMode(deliveryMode); 1706 long expiration = 0L; 1707 if (!producer.getDisableMessageTimestamp()) { 1708 long timeStamp = System.currentTimeMillis(); 1709 message.setJMSTimestamp(timeStamp); 1710 if (timeToLive > 0) { 1711 expiration = timeToLive + timeStamp; 1712 } 1713 } 1714 message.setJMSExpiration(expiration); 
1715 message.setJMSPriority(priority); 1716 message.setJMSRedelivered(false); 1717 1718 // transform to our own message format here 1719 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); 1720 1721 // Set the message id. 1722 if (msg == message) { 1723 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1724 } else { 1725 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1726 message.setJMSMessageID(msg.getMessageId().toString()); 1727 } 1728 //clear the brokerPath in case we are re-sending this message 1729 msg.setBrokerPath(null); 1730 // destination format is provider specific so only set on transformed message 1731 msg.setJMSDestination(destination); 1732 1733 msg.setTransactionId(txid); 1734 if (connection.isCopyMessageOnSend()) { 1735 msg = (ActiveMQMessage)msg.copy(); 1736 } 1737 msg.setConnection(connection); 1738 msg.onSend(); 1739 msg.setProducerId(msg.getMessageId().getProducerId()); 1740 if (LOG.isTraceEnabled()) { 1741 LOG.trace(getSessionId() + " sending message: " + msg); 1742 } 1743 if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { 1744 this.connection.asyncSendPacket(msg); 1745 if (producerWindow != null) { 1746 // Since we defer lots of the marshaling till we hit the 1747 // wire, this might not 1748 // provide and accurate size. We may change over to doing 1749 // more aggressive marshaling, 1750 // to get more accurate sizes.. this is more important once 1751 // users start using producer window 1752 // flow control. 
1753 int size = msg.getSize(); 1754 producerWindow.increaseUsage(size); 1755 } 1756 } else { 1757 if (sendTimeout > 0) { 1758 this.connection.syncSendPacket(msg,sendTimeout); 1759 }else { 1760 this.connection.syncSendPacket(msg); 1761 } 1762 } 1763 1764 } 1765 } 1766 1767 /** 1768 * Send TransactionInfo to indicate transaction has started 1769 * 1770 * @throws JMSException if some internal error occurs 1771 */ 1772 protected void doStartTransaction() throws JMSException { 1773 if (getTransacted() && !transactionContext.isInXATransaction()) { 1774 transactionContext.begin(); 1775 } 1776 } 1777 1778 /** 1779 * Checks whether the session has unconsumed messages. 1780 * 1781 * @return true - if there are unconsumed messages. 1782 */ 1783 public boolean hasUncomsumedMessages() { 1784 return executor.hasUncomsumedMessages(); 1785 } 1786 1787 /** 1788 * Checks whether the session uses transactions. 1789 * 1790 * @return true - if the session uses transactions. 1791 */ 1792 public boolean isTransacted() { 1793 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); 1794 } 1795 1796 /** 1797 * Checks whether the session used client acknowledgment. 1798 * 1799 * @return true - if the session uses client acknowledgment. 1800 */ 1801 protected boolean isClientAcknowledge() { 1802 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 1803 } 1804 1805 /** 1806 * Checks whether the session used auto acknowledgment. 1807 * 1808 * @return true - if the session uses client acknowledgment. 1809 */ 1810 public boolean isAutoAcknowledge() { 1811 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; 1812 } 1813 1814 /** 1815 * Checks whether the session used dup ok acknowledgment. 1816 * 1817 * @return true - if the session uses client acknowledgment. 
1818 */ 1819 public boolean isDupsOkAcknowledge() { 1820 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; 1821 } 1822 1823 public boolean isIndividualAcknowledge(){ 1824 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; 1825 } 1826 1827 /** 1828 * Returns the message delivery listener. 1829 * 1830 * @return deliveryListener - message delivery listener. 1831 */ 1832 public DeliveryListener getDeliveryListener() { 1833 return deliveryListener; 1834 } 1835 1836 /** 1837 * Sets the message delivery listener. 1838 * 1839 * @param deliveryListener - message delivery listener. 1840 */ 1841 public void setDeliveryListener(DeliveryListener deliveryListener) { 1842 this.deliveryListener = deliveryListener; 1843 } 1844 1845 /** 1846 * Returns the SessionInfo bean. 1847 * 1848 * @return info - SessionInfo bean. 1849 * @throws JMSException 1850 */ 1851 protected SessionInfo getSessionInfo() throws JMSException { 1852 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 1853 return info; 1854 } 1855 1856 /** 1857 * Send the asynchronus command. 1858 * 1859 * @param command - command to be executed. 1860 * @throws JMSException 1861 */ 1862 public void asyncSendPacket(Command command) throws JMSException { 1863 connection.asyncSendPacket(command); 1864 } 1865 1866 /** 1867 * Send the synchronus command. 1868 * 1869 * @param command - command to be executed. 
1870 * @return Response 1871 * @throws JMSException 1872 */ 1873 public Response syncSendPacket(Command command) throws JMSException { 1874 return connection.syncSendPacket(command); 1875 } 1876 1877 public long getNextDeliveryId() { 1878 return deliveryIdGenerator.getNextSequenceId(); 1879 } 1880 1881 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { 1882 1883 List<MessageDispatch> c = unconsumedMessages.removeAll(); 1884 for (MessageDispatch md : c) { 1885 this.connection.rollbackDuplicate(dispatcher, md.getMessage()); 1886 } 1887 Collections.reverse(c); 1888 1889 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { 1890 MessageDispatch md = iter.next(); 1891 executor.executeFirst(md); 1892 } 1893 1894 } 1895 1896 public boolean isRunning() { 1897 return started.get(); 1898 } 1899 1900 public boolean isAsyncDispatch() { 1901 return asyncDispatch; 1902 } 1903 1904 public void setAsyncDispatch(boolean asyncDispatch) { 1905 this.asyncDispatch = asyncDispatch; 1906 } 1907 1908 /** 1909 * @return Returns the sessionAsyncDispatch. 1910 */ 1911 public boolean isSessionAsyncDispatch() { 1912 return sessionAsyncDispatch; 1913 } 1914 1915 /** 1916 * @param sessionAsyncDispatch The sessionAsyncDispatch to set. 
*/
    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
        this.sessionAsyncDispatch = sessionAsyncDispatch;
    }

    /**
     * @return the message transformer configured on this session.
     */
    public MessageTransformer getTransformer() {
        return this.transformer;
    }

    /**
     * @return the connection this session belongs to.
     */
    public ActiveMQConnection getConnection() {
        return this.connection;
    }

    /**
     * Sets the transformer used to transform messages before they are sent on
     * to the JMS bus or when they are received from the bus but before they are
     * delivered to the JMS client
     */
    public void setTransformer(MessageTransformer transformer) {
        this.transformer = transformer;
    }

    /**
     * @return the blob transfer policy configured on this session.
     */
    public BlobTransferPolicy getBlobTransferPolicy() {
        return this.blobTransferPolicy;
    }

    /**
     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
     * OBjects) are transferred from producers to brokers to consumers
     */
    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
        this.blobTransferPolicy = blobTransferPolicy;
    }

    /**
     * @return the unconsumed messages as reported by the session executor.
     */
    public List getUnconsumedMessages() {
        return this.executor.getUnconsumedMessages();
    }

    @Override
    public String toString() {
        return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
    }

    /**
     * Verifies that synchronous receives are permitted, i.e. that neither the
     * session nor any of its consumers has a MessageListener set.
     *
     * @throws JMSException (an IllegalStateException) if a listener is set on
     *                 the session or on any consumer.
     */
    public void checkMessageListener() throws JMSException {
        if (messageListener != null) {
            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
        }
        for (ActiveMQMessageConsumer candidate : consumers) {
            if (candidate.getMessageListener() != null) {
                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
            }
        }
    }

    /**
     * Applies the optimize-acknowledge setting to every consumer of this
     * session.
     *
     * @param value the setting to propagate
     */
    protected void setOptimizeAcknowledge(boolean value) {
        for (ActiveMQMessageConsumer consumer : consumers) {
            consumer.setOptimizeAcknowledge(value);
        }
    }

    /**
     * Sets the prefetch size on the first consumer whose id equals the given
     * id; does nothing when no consumer matches.
     *
     * @param id the id of the consumer to update
     * @param prefetch the new prefetch size
     */
    protected void setPrefetchSize(ConsumerId id, int prefetch) {
        for (ActiveMQMessageConsumer consumer : consumers) {
            if (consumer.getConsumerId().equals(id)) {
                consumer.setPrefetchSize(prefetch);
                return;
            }
        }
    }

    /**
     * Closes the first consumer whose id equals the given id; does nothing
     * when no consumer matches.
     * <P>
     * A JMSException raised while closing is logged and not propagated. The
     * "closed" warning is logged whether or not the close succeeded.
     *
     * @param id the id of the consumer to close
     */
    protected void close(ConsumerId id) {
        for (ActiveMQMessageConsumer consumer : consumers) {
            if (!consumer.getConsumerId().equals(id)) {
                continue;
            }
            try {
                consumer.close();
            } catch (JMSException e) {
                LOG.warn("Exception closing consumer", e);
            }
            LOG.warn("Closed consumer on Command");
            return;
        }
    }

    /**
     * Checks whether any consumer of this session reports the given temporary
     * destination as in use.
     *
     * @param destination the temporary destination to look for
     * @return true if at least one consumer is using the destination
     */
    public boolean isInUse(ActiveMQTempDestination destination) {
        for (ActiveMQMessageConsumer consumer : consumers) {
            if (consumer.isInUse(destination)) {
                return true;
            }
        }
        return false;
    }

    /**
     * highest sequence id of the last message delivered by this session.
     * Passed to the broker in the close command, maintained by dispose()
     * @return lastDeliveredSequenceId
     */
    public long getLastDeliveredSequenceId() {
        return this.lastDeliveredSequenceId;
    }

    /**
     * Sends an acknowledgement without requesting lazy delivery; equivalent to
     * <CODE>sendAck(ack, false)</CODE>.
     *
     * @param ack the acknowledgement to send
     * @throws JMSException
     */
    protected void sendAck(MessageAck ack) throws JMSException {
        sendAck(ack, false);
    }

    /**
     * Sends an acknowledgement, asynchronously when <CODE>lazy</CODE> is set,
     * when the connection is configured to send acks asynchronously or when
     * the session is transacted; otherwise synchronously.
     *
     * @param ack the acknowledgement to send
     * @param lazy if true the ack is always sent asynchronously
     * @throws JMSException
     */
    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
        final boolean sendAsync = lazy || connection.isSendAcksAsync() || getTransacted();
        if (sendAsync) {
            asyncSendPacket(ack);
            return;
        }
        syncSendPacket(ack);
    }

    /**
     * @return the scheduler held by this session.
     */
    protected Scheduler getScheduler() {
        return scheduler;
    }

    /**
     * @return the connection executor held by this session.
     */
    protected ThreadPoolExecutor getConnectionExecutor() {
        return connectionExecutor;
    }
}