001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.File;
020import java.io.InputStream;
021import java.io.Serializable;
022import java.net.URL;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.atomic.AtomicBoolean;
029import javax.jms.BytesMessage;
030import javax.jms.Destination;
031import javax.jms.IllegalStateException;
032import javax.jms.InvalidDestinationException;
033import javax.jms.InvalidSelectorException;
034import javax.jms.JMSException;
035import javax.jms.MapMessage;
036import javax.jms.Message;
037import javax.jms.MessageConsumer;
038import javax.jms.MessageListener;
039import javax.jms.MessageProducer;
040import javax.jms.ObjectMessage;
041import javax.jms.Queue;
042import javax.jms.QueueBrowser;
043import javax.jms.QueueReceiver;
044import javax.jms.QueueSender;
045import javax.jms.QueueSession;
046import javax.jms.Session;
047import javax.jms.StreamMessage;
048import javax.jms.TemporaryQueue;
049import javax.jms.TemporaryTopic;
050import javax.jms.TextMessage;
051import javax.jms.Topic;
052import javax.jms.TopicPublisher;
053import javax.jms.TopicSession;
054import javax.jms.TopicSubscriber;
055import javax.jms.TransactionRolledBackException;
056import org.apache.activemq.blob.BlobDownloader;
057import org.apache.activemq.blob.BlobTransferPolicy;
058import org.apache.activemq.blob.BlobUploader;
059import org.apache.activemq.command.ActiveMQBlobMessage;
060import org.apache.activemq.command.ActiveMQBytesMessage;
061import org.apache.activemq.command.ActiveMQDestination;
062import org.apache.activemq.command.ActiveMQMapMessage;
063import org.apache.activemq.command.ActiveMQMessage;
064import org.apache.activemq.command.ActiveMQObjectMessage;
065import org.apache.activemq.command.ActiveMQQueue;
066import org.apache.activemq.command.ActiveMQStreamMessage;
067import org.apache.activemq.command.ActiveMQTempDestination;
068import org.apache.activemq.command.ActiveMQTempQueue;
069import org.apache.activemq.command.ActiveMQTempTopic;
070import org.apache.activemq.command.ActiveMQTextMessage;
071import org.apache.activemq.command.ActiveMQTopic;
072import org.apache.activemq.command.Command;
073import org.apache.activemq.command.ConsumerId;
074import org.apache.activemq.command.MessageAck;
075import org.apache.activemq.command.MessageDispatch;
076import org.apache.activemq.command.MessageId;
077import org.apache.activemq.command.ProducerId;
078import org.apache.activemq.command.RemoveInfo;
079import org.apache.activemq.command.Response;
080import org.apache.activemq.command.SessionId;
081import org.apache.activemq.command.SessionInfo;
082import org.apache.activemq.command.TransactionId;
083import org.apache.activemq.management.JMSSessionStatsImpl;
084import org.apache.activemq.management.StatsCapable;
085import org.apache.activemq.management.StatsImpl;
086import org.apache.activemq.thread.Scheduler;
087import org.apache.activemq.transaction.Synchronization;
088import org.apache.activemq.usage.MemoryUsage;
089import org.apache.activemq.util.Callback;
090import org.apache.activemq.util.LongSequenceGenerator;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094/**
095 * <P>
096 * A <CODE>Session</CODE> object is a single-threaded context for producing
097 * and consuming messages. Although it may allocate provider resources outside
098 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
099 * <P>
100 * A session serves several purposes:
101 * <UL>
102 * <LI>It is a factory for its message producers and consumers.
103 * <LI>It supplies provider-optimized message factories.
104 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
105 * <CODE>TemporaryQueues</CODE>.
106 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
107 * objects for those clients that need to dynamically manipulate
108 * provider-specific destination names.
109 * <LI>It supports a single series of transactions that combine work spanning
110 * its producers and consumers into atomic units.
111 * <LI>It defines a serial order for the messages it consumes and the messages
112 * it produces.
113 * <LI>It retains messages it consumes until they have been acknowledged.
114 * <LI>It serializes execution of message listeners registered with its message
115 * consumers.
116 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
117 * </UL>
118 * <P>
119 * A session can create and service multiple message producers and consumers.
120 * <P>
121 * One typical use is to have a thread block on a synchronous
122 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
123 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
124 * <P>
125 * If a client desires to have one thread produce messages while others consume
126 * them, the client should use a separate session for its producing thread.
127 * <P>
128 * Once a connection has been started, any session with one or more registered
129 * message listeners is dedicated to the thread of control that delivers
130 * messages to it. It is erroneous for client code to use this session or any of
131 * its constituent objects from another thread of control. The only exception to
132 * this rule is the use of the session or connection <CODE>close</CODE>
133 * method.
134 * <P>
135 * It should be easy for most clients to partition their work naturally into
136 * sessions. This model allows clients to start simply and incrementally add
137 * message processing complexity as their need for concurrency grows.
138 * <P>
139 * The <CODE>close</CODE> method is the only session method that can be called
140 * while some other session method is being executed in another thread.
141 * <P>
142 * A session may be specified as transacted. Each transacted session supports a
143 * single series of transactions. Each transaction groups a set of message sends
144 * and a set of message receives into an atomic unit of work. In effect,
145 * transactions organize a session's input message stream and output message
146 * stream into series of atomic units. When a transaction commits, its atomic
147 * unit of input is acknowledged and its associated atomic unit of output is
148 * sent. If a transaction rollback is done, the transaction's sent messages are
149 * destroyed and the session's input is automatically recovered.
150 * <P>
151 * The content of a transaction's input and output units is simply those
152 * messages that have been produced and consumed within the session's current
153 * transaction.
154 * <P>
155 * A transaction is completed using either its session's <CODE>commit</CODE>
156 * method or its session's <CODE>rollback </CODE> method. The completion of a
157 * session's current transaction automatically begins the next. The result is
158 * that a transacted session always has a current transaction within which its
159 * work is done.
160 * <P>
161 * The Java Transaction Service (JTS) or some other transaction monitor may be
162 * used to combine a session's transaction with transactions on other resources
163 * (databases, other JMS sessions, etc.). Since Java distributed transactions
164 * are controlled via the Java Transaction API (JTA), use of the session's
165 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
166 * prohibited.
167 * <P>
168 * The JMS API does not require support for JTA; however, it does define how a
169 * provider supplies this support.
170 * <P>
171 * Although it is also possible for a JMS client to handle distributed
172 * transactions directly, it is unlikely that many JMS clients will do this.
173 * Support for JTA in the JMS API is targeted at systems vendors who will be
174 * integrating the JMS API into their application server products.
175 * 
176 * 
177 * @see javax.jms.Session
178 * @see javax.jms.QueueSession
179 * @see javax.jms.TopicSession
180 * @see javax.jms.XASession
181 */
182public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
183        
184        /**
185         * Only acknowledge an individual message - using message.acknowledge()
186         * as opposed to CLIENT_ACKNOWLEDGE which 
187         * acknowledges all messages consumed by a session at when acknowledge()
188         * is called
189         */
190    public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
191    public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
192
193    public static interface DeliveryListener {
194        void beforeDelivery(ActiveMQSession session, Message msg);
195
196        void afterDelivery(ActiveMQSession session, Message msg);
197    }
198
199    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
200    private final Scheduler scheduler;
201    private final ThreadPoolExecutor connectionExecutor;
202
203    protected int acknowledgementMode;
204    protected final ActiveMQConnection connection;
205    protected final SessionInfo info;
206    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
207    protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
208    protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
209    protected final ActiveMQSessionExecutor executor;
210    protected final AtomicBoolean started = new AtomicBoolean(false);
211
212    protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
213    protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
214
215    protected boolean closed;
216    private volatile boolean synchronizationRegistered;
217    protected boolean asyncDispatch;
218    protected boolean sessionAsyncDispatch;
219    protected final boolean debug;
220    protected Object sendMutex = new Object();
221
222    private MessageListener messageListener;
223    private final JMSSessionStatsImpl stats;
224    private TransactionContext transactionContext;
225    private DeliveryListener deliveryListener;
226    private MessageTransformer transformer;
227    private BlobTransferPolicy blobTransferPolicy;
228    private long lastDeliveredSequenceId;
229
230    /**
231     * Construct the Session
232     * 
233     * @param connection
234     * @param sessionId
235     * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
236     *                Session.SESSION_TRANSACTED
237     * @param asyncDispatch
238     * @param sessionAsyncDispatch
239     * @throws JMSException on internal error
240     */
241    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
242        this.debug = LOG.isDebugEnabled();
243        this.connection = connection;
244        this.acknowledgementMode = acknowledgeMode;
245        this.asyncDispatch = asyncDispatch;
246        this.sessionAsyncDispatch = sessionAsyncDispatch;
247        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
248        setTransactionContext(new TransactionContext(connection));
249        stats = new JMSSessionStatsImpl(producers, consumers);
250        this.connection.asyncSendPacket(info);
251        setTransformer(connection.getTransformer());
252        setBlobTransferPolicy(connection.getBlobTransferPolicy());
253        this.scheduler=connection.getScheduler();
254        this.connectionExecutor=connection.getExecutor();
255        this.executor = new ActiveMQSessionExecutor(this);
256        connection.addSession(this);        
257        if (connection.isStarted()) {
258            start();
259        }
260
261    }
262
263    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
264        this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
265    }
266
267    /**
268     * Sets the transaction context of the session.
269     * 
270     * @param transactionContext - provides the means to control a JMS
271     *                transaction.
272     */
273    public void setTransactionContext(TransactionContext transactionContext) {
274        this.transactionContext = transactionContext;
275    }
276
277    /**
278     * Returns the transaction context of the session.
279     * 
280     * @return transactionContext - session's transaction context.
281     */
282    public TransactionContext getTransactionContext() {
283        return transactionContext;
284    }
285
286    /*
287     * (non-Javadoc)
288     * 
289     * @see org.apache.activemq.management.StatsCapable#getStats()
290     */
291    public StatsImpl getStats() {
292        return stats;
293    }
294
295    /**
296     * Returns the session's statistics.
297     * 
298     * @return stats - session's statistics.
299     */
300    public JMSSessionStatsImpl getSessionStats() {
301        return stats;
302    }
303
304    /**
305     * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
306     * object is used to send a message containing a stream of uninterpreted
307     * bytes.
308     * 
309     * @return the an ActiveMQBytesMessage
310     * @throws JMSException if the JMS provider fails to create this message due
311     *                 to some internal error.
312     */
313    public BytesMessage createBytesMessage() throws JMSException {
314        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
315        configureMessage(message);
316        return message;
317    }
318
319    /**
320     * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
321     * object is used to send a self-defining set of name-value pairs, where
322     * names are <CODE>String</CODE> objects and values are primitive values
323     * in the Java programming language.
324     * 
325     * @return an ActiveMQMapMessage
326     * @throws JMSException if the JMS provider fails to create this message due
327     *                 to some internal error.
328     */
329    public MapMessage createMapMessage() throws JMSException {
330        ActiveMQMapMessage message = new ActiveMQMapMessage();
331        configureMessage(message);
332        return message;
333    }
334
335    /**
336     * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
337     * interface is the root interface of all JMS messages. A
338     * <CODE>Message</CODE> object holds all the standard message header
339     * information. It can be sent when a message containing only header
340     * information is sufficient.
341     * 
342     * @return an ActiveMQMessage
343     * @throws JMSException if the JMS provider fails to create this message due
344     *                 to some internal error.
345     */
346    public Message createMessage() throws JMSException {
347        ActiveMQMessage message = new ActiveMQMessage();
348        configureMessage(message);
349        return message;
350    }
351
352    /**
353     * Creates an <CODE>ObjectMessage</CODE> object. An
354     * <CODE>ObjectMessage</CODE> object is used to send a message that
355     * contains a serializable Java object.
356     * 
357     * @return an ActiveMQObjectMessage
358     * @throws JMSException if the JMS provider fails to create this message due
359     *                 to some internal error.
360     */
361    public ObjectMessage createObjectMessage() throws JMSException {
362        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
363        configureMessage(message);
364        return message;
365    }
366
367    /**
368     * Creates an initialized <CODE>ObjectMessage</CODE> object. An
369     * <CODE>ObjectMessage</CODE> object is used to send a message that
370     * contains a serializable Java object.
371     * 
372     * @param object the object to use to initialize this message
373     * @return an ActiveMQObjectMessage
374     * @throws JMSException if the JMS provider fails to create this message due
375     *                 to some internal error.
376     */
377    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
378        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
379        configureMessage(message);
380        message.setObject(object);
381        return message;
382    }
383
384    /**
385     * Creates a <CODE>StreamMessage</CODE> object. A
386     * <CODE>StreamMessage</CODE> object is used to send a self-defining
387     * stream of primitive values in the Java programming language.
388     * 
389     * @return an ActiveMQStreamMessage
390     * @throws JMSException if the JMS provider fails to create this message due
391     *                 to some internal error.
392     */
393    public StreamMessage createStreamMessage() throws JMSException {
394        ActiveMQStreamMessage message = new ActiveMQStreamMessage();
395        configureMessage(message);
396        return message;
397    }
398
399    /**
400     * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
401     * object is used to send a message containing a <CODE>String</CODE>
402     * object.
403     * 
404     * @return an ActiveMQTextMessage
405     * @throws JMSException if the JMS provider fails to create this message due
406     *                 to some internal error.
407     */
408    public TextMessage createTextMessage() throws JMSException {
409        ActiveMQTextMessage message = new ActiveMQTextMessage();
410        configureMessage(message);
411        return message;
412    }
413
414    /**
415     * Creates an initialized <CODE>TextMessage</CODE> object. A
416     * <CODE>TextMessage</CODE> object is used to send a message containing a
417     * <CODE>String</CODE>.
418     * 
419     * @param text the string used to initialize this message
420     * @return an ActiveMQTextMessage
421     * @throws JMSException if the JMS provider fails to create this message due
422     *                 to some internal error.
423     */
424    public TextMessage createTextMessage(String text) throws JMSException {
425        ActiveMQTextMessage message = new ActiveMQTextMessage();
426        message.setText(text);
427        configureMessage(message);
428        return message;
429    }
430
431    /**
432     * Creates an initialized <CODE>BlobMessage</CODE> object. A
433     * <CODE>BlobMessage</CODE> object is used to send a message containing a
434     * <CODE>URL</CODE> which points to some network addressible BLOB.
435     * 
436     * @param url the network addressable URL used to pass directly to the
437     *                consumer
438     * @return a BlobMessage
439     * @throws JMSException if the JMS provider fails to create this message due
440     *                 to some internal error.
441     */
442    public BlobMessage createBlobMessage(URL url) throws JMSException {
443        return createBlobMessage(url, false);
444    }
445
446    /**
447     * Creates an initialized <CODE>BlobMessage</CODE> object. A
448     * <CODE>BlobMessage</CODE> object is used to send a message containing a
449     * <CODE>URL</CODE> which points to some network addressible BLOB.
450     * 
451     * @param url the network addressable URL used to pass directly to the
452     *                consumer
453     * @param deletedByBroker indicates whether or not the resource is deleted
454     *                by the broker when the message is acknowledged
455     * @return a BlobMessage
456     * @throws JMSException if the JMS provider fails to create this message due
457     *                 to some internal error.
458     */
459    public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
460        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
461        configureMessage(message);
462        message.setURL(url);
463        message.setDeletedByBroker(deletedByBroker);
464        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
465        return message;
466    }
467
468    /**
469     * Creates an initialized <CODE>BlobMessage</CODE> object. A
470     * <CODE>BlobMessage</CODE> object is used to send a message containing
471     * the <CODE>File</CODE> content. Before the message is sent the file
472     * conent will be uploaded to the broker or some other remote repository
473     * depending on the {@link #getBlobTransferPolicy()}.
474     * 
475     * @param file the file to be uploaded to some remote repo (or the broker)
476     *                depending on the strategy
477     * @return a BlobMessage
478     * @throws JMSException if the JMS provider fails to create this message due
479     *                 to some internal error.
480     */
481    public BlobMessage createBlobMessage(File file) throws JMSException {
482        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
483        configureMessage(message);
484        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
485        message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
486        message.setDeletedByBroker(true);
487        message.setName(file.getName());
488        return message;
489    }
490
491    /**
492     * Creates an initialized <CODE>BlobMessage</CODE> object. A
493     * <CODE>BlobMessage</CODE> object is used to send a message containing
494     * the <CODE>File</CODE> content. Before the message is sent the file
495     * conent will be uploaded to the broker or some other remote repository
496     * depending on the {@link #getBlobTransferPolicy()}.
497     * 
498     * @param in the stream to be uploaded to some remote repo (or the broker)
499     *                depending on the strategy
500     * @return a BlobMessage
501     * @throws JMSException if the JMS provider fails to create this message due
502     *                 to some internal error.
503     */
504    public BlobMessage createBlobMessage(InputStream in) throws JMSException {
505        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
506        configureMessage(message);
507        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
508        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
509        message.setDeletedByBroker(true);
510        return message;
511    }
512
513    /**
514     * Indicates whether the session is in transacted mode.
515     * 
516     * @return true if the session is in transacted mode
517     * @throws JMSException if there is some internal error.
518     */
519    public boolean getTransacted() throws JMSException {
520        checkClosed();
521        return isTransacted();
522    }
523
524    /**
525     * Returns the acknowledgement mode of the session. The acknowledgement mode
526     * is set at the time that the session is created. If the session is
527     * transacted, the acknowledgement mode is ignored.
528     * 
529     * @return If the session is not transacted, returns the current
530     *         acknowledgement mode for the session. If the session is
531     *         transacted, returns SESSION_TRANSACTED.
532     * @throws JMSException
533     * @see javax.jms.Connection#createSession(boolean,int)
534     * @since 1.1 exception JMSException if there is some internal error.
535     */
536    public int getAcknowledgeMode() throws JMSException {
537        checkClosed();
538        return this.acknowledgementMode;
539    }
540
541    /**
542     * Commits all messages done in this transaction and releases any locks
543     * currently held.
544     * 
545     * @throws JMSException if the JMS provider fails to commit the transaction
546     *                 due to some internal error.
547     * @throws TransactionRolledBackException if the transaction is rolled back
548     *                 due to some internal error during commit.
549     * @throws javax.jms.IllegalStateException if the method is not called by a
550     *                 transacted session.
551     */
552    public void commit() throws JMSException {
553        checkClosed();
554        if (!getTransacted()) {
555            throw new javax.jms.IllegalStateException("Not a transacted session");
556        }
557        if (LOG.isDebugEnabled()) {
558            LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
559        }
560        transactionContext.commit();
561    }
562
563    /**
564     * Rolls back any messages done in this transaction and releases any locks
565     * currently held.
566     * 
567     * @throws JMSException if the JMS provider fails to roll back the
568     *                 transaction due to some internal error.
569     * @throws javax.jms.IllegalStateException if the method is not called by a
570     *                 transacted session.
571     */
572    public void rollback() throws JMSException {
573        checkClosed();
574        if (!getTransacted()) {
575            throw new javax.jms.IllegalStateException("Not a transacted session");
576        }
577        if (LOG.isDebugEnabled()) {
578            LOG.debug(getSessionId() + " Transaction Rollback");
579        }
580        transactionContext.rollback();
581    }
582
583    /**
584     * Closes the session.
585     * <P>
586     * Since a provider may allocate some resources on behalf of a session
587     * outside the JVM, clients should close the resources when they are not
588     * needed. Relying on garbage collection to eventually reclaim these
589     * resources may not be timely enough.
590     * <P>
591     * There is no need to close the producers and consumers of a closed
592     * session.
593     * <P>
594     * This call will block until a <CODE>receive</CODE> call or message
595     * listener in progress has completed. A blocked message consumer
596     * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
597     * is closed.
598     * <P>
599     * Closing a transacted session must roll back the transaction in progress.
600     * <P>
601     * This method is the only <CODE>Session</CODE> method that can be called
602     * concurrently.
603     * <P>
604     * Invoking any other <CODE>Session</CODE> method on a closed session must
605     * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
606     * closed session must <I>not </I> throw an exception.
607     * 
608     * @throws JMSException if the JMS provider fails to close the session due
609     *                 to some internal error.
610     */
611    public void close() throws JMSException {
612        if (!closed) {
613            if (getTransactionContext().isInXATransaction()) {
614                if (!synchronizationRegistered) {
615                    synchronizationRegistered = true;
616                    getTransactionContext().addSynchronization(new Synchronization() {
617
618                                        @Override
619                                        public void afterCommit() throws Exception {
620                                            doClose();
621                                            synchronizationRegistered = false;
622                                        }
623
624                                        @Override
625                                        public void afterRollback() throws Exception {
626                                            doClose();
627                                            synchronizationRegistered = false;
628                                        }
629                                    });
630                }
631
632            } else {
633                doClose();
634            }
635        }
636    }
637
638    private void doClose() throws JMSException {
639        dispose();
640        RemoveInfo removeCommand = info.createRemoveCommand();
641        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
642        connection.asyncSendPacket(removeCommand);
643    }
644
645    void clearMessagesInProgress() {
646        executor.clearMessagesInProgress();        
647        // we are called from inside the transport reconnection logic
648        // which involves us clearing all the connections' consumers
649        // dispatch and delivered lists. So rather than trying to 
650        // grab a mutex (which could be already owned by the message 
651        // listener calling the send or an ack) we allow it to complete in 
652        // a separate thread via the scheduler and notify us via 
653        // connection.transportInterruptionProcessingComplete()
654        //
655        for (final ActiveMQMessageConsumer consumer : consumers) {
656            consumer.inProgressClearRequired();
657            scheduler.executeAfterDelay(new Runnable() {
658                public void run() {
659                    consumer.clearMessagesInProgress();
660                }}, 0l);
661        }
662    }
663
664    void deliverAcks() {
665        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
666            ActiveMQMessageConsumer consumer = iter.next();
667            consumer.deliverAcks();
668        }
669    }
670
671    public synchronized void dispose() throws JMSException {
672        if (!closed) {
673
674            try {
675                executor.stop();
676
677                for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
678                    ActiveMQMessageConsumer consumer = iter.next();
679                    consumer.setFailureError(connection.getFirstFailureError());
680                    consumer.dispose();
681                    lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
682                }
683                consumers.clear();
684
685                for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
686                    ActiveMQMessageProducer producer = iter.next();
687                    producer.dispose();
688                }
689                producers.clear();
690
691                try {
692                    if (getTransactionContext().isInLocalTransaction()) {
693                        rollback();
694                    }
695                } catch (JMSException e) {
696                }
697
698            } finally {
699                connection.removeSession(this);
700                this.transactionContext = null;
701                closed = true;
702            }
703        }
704    }
705
706    /**
707     * Checks that the session is not closed then configures the message
708     */
709    protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
710        checkClosed();
711        message.setConnection(connection);
712    }
713
714    /**
715     * Check if the session is closed. It is used for ensuring that the session
716     * is open before performing various operations.
717     * 
718     * @throws IllegalStateException if the Session is closed
719     */
720    protected void checkClosed() throws IllegalStateException {
721        if (closed) {
722            throw new IllegalStateException("The Session is closed");
723        }
724    }
725
726    /**
727     * Stops message delivery in this session, and restarts message delivery
728     * with the oldest unacknowledged message.
729     * <P>
730     * All consumers deliver messages in a serial order. Acknowledging a
731     * received message automatically acknowledges all messages that have been
732     * delivered to the client.
733     * <P>
734     * Restarting a session causes it to take the following actions:
735     * <UL>
736     * <LI>Stop message delivery
737     * <LI>Mark all messages that might have been delivered but not
738     * acknowledged as "redelivered"
739     * <LI>Restart the delivery sequence including all unacknowledged messages
740     * that had been previously delivered. Redelivered messages do not have to
741     * be delivered in exactly their original delivery order.
742     * </UL>
743     * 
744     * @throws JMSException if the JMS provider fails to stop and restart
745     *                 message delivery due to some internal error.
746     * @throws IllegalStateException if the method is called by a transacted
747     *                 session.
748     */
749    public void recover() throws JMSException {
750
751        checkClosed();
752        if (getTransacted()) {
753            throw new IllegalStateException("This session is transacted");
754        }
755
756        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
757            ActiveMQMessageConsumer c = iter.next();
758            c.rollback();
759        }
760
761    }
762
763    /**
764     * Returns the session's distinguished message listener (optional).
765     * 
766     * @return the message listener associated with this session
767     * @throws JMSException if the JMS provider fails to get the message
768     *                 listener due to an internal error.
769     * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
770     * @see javax.jms.ServerSessionPool
771     * @see javax.jms.ServerSession
772     */
773    public MessageListener getMessageListener() throws JMSException {
774        checkClosed();
775        return this.messageListener;
776    }
777
778    /**
779     * Sets the session's distinguished message listener (optional).
780     * <P>
781     * When the distinguished message listener is set, no other form of message
782     * receipt in the session can be used; however, all forms of sending
783     * messages are still supported.
784     * <P>
785     * This is an expert facility not used by regular JMS clients.
786     * 
787     * @param listener the message listener to associate with this session
788     * @throws JMSException if the JMS provider fails to set the message
789     *                 listener due to an internal error.
790     * @see javax.jms.Session#getMessageListener()
791     * @see javax.jms.ServerSessionPool
792     * @see javax.jms.ServerSession
793     */
794    public void setMessageListener(MessageListener listener) throws JMSException {
795        checkClosed();
796        this.messageListener = listener;
797
798        if (listener != null) {
799            executor.setDispatchedBySessionPool(true);
800        }
801    }
802
803    /**
804     * Optional operation, intended to be used only by Application Servers, not
805     * by ordinary JMS clients.
806     * 
807     * @see javax.jms.ServerSession
808     */
809    public void run() {
810        MessageDispatch messageDispatch;
811        while ((messageDispatch = executor.dequeueNoWait()) != null) {
812            final MessageDispatch md = messageDispatch;
813            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
814            if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
815                // TODO: Ack it without delivery to client
816                continue;
817            }
818
819            if (isClientAcknowledge()||isIndividualAcknowledge()) {
820                message.setAcknowledgeCallback(new Callback() {
821                    public void execute() throws Exception {
822                    }
823                });
824            }
825
826            if (deliveryListener != null) {
827                deliveryListener.beforeDelivery(this, message);
828            }
829
830            md.setDeliverySequenceId(getNextDeliveryId());
831
832            try {
833                messageListener.onMessage(message);
834            } catch (RuntimeException e) {
835                LOG.error("error dispatching message: ", e);
836                // A problem while invoking the MessageListener does not
837                // in general indicate a problem with the connection to the broker, i.e.
838                // it will usually be sufficient to let the afterDelivery() method either
839                // commit or roll back in order to deal with the exception.
840                // However, we notify any registered client internal exception listener
841                // of the problem.
842                connection.onClientInternalException(e);
843            }
844
845            try {
846                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
847                ack.setFirstMessageId(md.getMessage().getMessageId());
848                doStartTransaction();
849                ack.setTransactionId(getTransactionContext().getTransactionId());
850                if (ack.getTransactionId() != null) {
851                    getTransactionContext().addSynchronization(new Synchronization() {
852
853                        @Override
854                        public void afterRollback() throws Exception {
855                            md.getMessage().onMessageRolledBack();
856                            // ensure we don't filter this as a duplicate
857                            connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
858                            RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
859                            int redeliveryCounter = md.getMessage().getRedeliveryCounter();
860                            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
861                                && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
862                                // We need to NACK the messages so that they get
863                                // sent to the
864                                // DLQ.
865                                // Acknowledge the last message.
866                                MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
867                                ack.setFirstMessageId(md.getMessage().getMessageId());
868                                asyncSendPacket(ack);
869                            } else {
870                                
871                                MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
872                                ack.setFirstMessageId(md.getMessage().getMessageId());
873                                asyncSendPacket(ack);
874
875                                // Figure out how long we should wait to resend
876                                // this message.
877                                long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
878                                for (int i = 0; i < redeliveryCounter; i++) {
879                                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
880                                }
881                                scheduler.executeAfterDelay(new Runnable() {
882
883                                    public void run() {
884                                        ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
885                                    }
886                                }, redeliveryDelay);
887                            }
888                        }
889                    });
890                }
891                asyncSendPacket(ack);
892            } catch (Throwable e) {
893                connection.onClientInternalException(e);
894            }
895
896            if (deliveryListener != null) {
897                deliveryListener.afterDelivery(this, message);
898            }
899        }
900    }
901
902    /**
903     * Creates a <CODE>MessageProducer</CODE> to send messages to the
904     * specified destination.
905     * <P>
906     * A client uses a <CODE>MessageProducer</CODE> object to send messages to
907     * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
908     * inherit from <CODE>Destination</CODE>, they can be used in the
909     * destination parameter to create a <CODE>MessageProducer</CODE> object.
910     * 
911     * @param destination the <CODE>Destination</CODE> to send to, or null if
912     *                this is a producer which does not have a specified
913     *                destination.
914     * @return the MessageProducer
915     * @throws JMSException if the session fails to create a MessageProducer due
916     *                 to some internal error.
917     * @throws InvalidDestinationException if an invalid destination is
918     *                 specified.
919     * @since 1.1
920     */
921    public MessageProducer createProducer(Destination destination) throws JMSException {
922        checkClosed();
923        if (destination instanceof CustomDestination) {
924            CustomDestination customDestination = (CustomDestination)destination;
925            return customDestination.createProducer(this);
926        }
927        int timeSendOut = connection.getSendTimeout();
928        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
929    }
930
931    /**
932     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
933     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
934     * <CODE>Destination</CODE>, they can be used in the destination
935     * parameter to create a <CODE>MessageConsumer</CODE>.
936     * 
937     * @param destination the <CODE>Destination</CODE> to access.
938     * @return the MessageConsumer
939     * @throws JMSException if the session fails to create a consumer due to
940     *                 some internal error.
941     * @throws InvalidDestinationException if an invalid destination is
942     *                 specified.
943     * @since 1.1
944     */
945    public MessageConsumer createConsumer(Destination destination) throws JMSException {
946        return createConsumer(destination, (String) null);
947    }
948
949    /**
950     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
951     * using a message selector. Since <CODE> Queue</CODE> and
952     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
953     * can be used in the destination parameter to create a
954     * <CODE>MessageConsumer</CODE>.
955     * <P>
956     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
957     * that have been sent to a destination.
958     * 
959     * @param destination the <CODE>Destination</CODE> to access
960     * @param messageSelector only messages with properties matching the message
961     *                selector expression are delivered. A value of null or an
962     *                empty string indicates that there is no message selector
963     *                for the message consumer.
964     * @return the MessageConsumer
965     * @throws JMSException if the session fails to create a MessageConsumer due
966     *                 to some internal error.
967     * @throws InvalidDestinationException if an invalid destination is
968     *                 specified.
969     * @throws InvalidSelectorException if the message selector is invalid.
970     * @since 1.1
971     */
972    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
973        return createConsumer(destination, messageSelector, false);
974    }
975
976    /**
977     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
978     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
979     * <CODE>Destination</CODE>, they can be used in the destination
980     * parameter to create a <CODE>MessageConsumer</CODE>.
981     *
982     * @param destination the <CODE>Destination</CODE> to access.
983     * @param messageListener the listener to use for async consumption of messages
984     * @return the MessageConsumer
985     * @throws JMSException if the session fails to create a consumer due to
986     *                 some internal error.
987     * @throws InvalidDestinationException if an invalid destination is
988     *                 specified.
989     * @since 1.1
990     */
991    public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
992        return createConsumer(destination, null, messageListener);
993    }
994
995    /**
996     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
997     * using a message selector. Since <CODE> Queue</CODE> and
998     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
999     * can be used in the destination parameter to create a
1000     * <CODE>MessageConsumer</CODE>.
1001     * <P>
1002     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1003     * that have been sent to a destination.
1004     *
1005     * @param destination the <CODE>Destination</CODE> to access
1006     * @param messageSelector only messages with properties matching the message
1007     *                selector expression are delivered. A value of null or an
1008     *                empty string indicates that there is no message selector
1009     *                for the message consumer.
1010     * @param messageListener the listener to use for async consumption of messages
1011     * @return the MessageConsumer
1012     * @throws JMSException if the session fails to create a MessageConsumer due
1013     *                 to some internal error.
1014     * @throws InvalidDestinationException if an invalid destination is
1015     *                 specified.
1016     * @throws InvalidSelectorException if the message selector is invalid.
1017     * @since 1.1
1018     */
1019    public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1020        return createConsumer(destination, messageSelector, false, messageListener);
1021    }
1022
1023    /**
1024     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1025     * using a message selector. This method can specify whether messages
1026     * published by its own connection should be delivered to it, if the
1027     * destination is a topic.
1028     * <P>
1029     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1030     * <CODE>Destination</CODE>, they can be used in the destination
1031     * parameter to create a <CODE>MessageConsumer</CODE>.
1032     * <P>
1033     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1034     * that have been published to a destination.
1035     * <P>
1036     * In some cases, a connection may both publish and subscribe to a topic.
1037     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1038     * inhibit the delivery of messages published by its own connection. The
1039     * default value for this attribute is False. The <CODE>noLocal</CODE>
1040     * value must be supported by destinations that are topics.
1041     * 
1042     * @param destination the <CODE>Destination</CODE> to access
1043     * @param messageSelector only messages with properties matching the message
1044     *                selector expression are delivered. A value of null or an
1045     *                empty string indicates that there is no message selector
1046     *                for the message consumer.
1047     * @param noLocal - if true, and the destination is a topic, inhibits the
1048     *                delivery of messages published by its own connection. The
1049     *                behavior for <CODE>NoLocal</CODE> is not specified if
1050     *                the destination is a queue.
1051     * @return the MessageConsumer
1052     * @throws JMSException if the session fails to create a MessageConsumer due
1053     *                 to some internal error.
1054     * @throws InvalidDestinationException if an invalid destination is
1055     *                 specified.
1056     * @throws InvalidSelectorException if the message selector is invalid.
1057     * @since 1.1
1058     */
1059    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1060        return createConsumer(destination, messageSelector, noLocal, null);
1061    }
1062
1063    /**
1064     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1065     * using a message selector. This method can specify whether messages
1066     * published by its own connection should be delivered to it, if the
1067     * destination is a topic.
1068     * <P>
1069     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1070     * <CODE>Destination</CODE>, they can be used in the destination
1071     * parameter to create a <CODE>MessageConsumer</CODE>.
1072     * <P>
1073     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1074     * that have been published to a destination.
1075     * <P>
1076     * In some cases, a connection may both publish and subscribe to a topic.
1077     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1078     * inhibit the delivery of messages published by its own connection. The
1079     * default value for this attribute is False. The <CODE>noLocal</CODE>
1080     * value must be supported by destinations that are topics.
1081     *
1082     * @param destination the <CODE>Destination</CODE> to access
1083     * @param messageSelector only messages with properties matching the message
1084     *                selector expression are delivered. A value of null or an
1085     *                empty string indicates that there is no message selector
1086     *                for the message consumer.
1087     * @param noLocal - if true, and the destination is a topic, inhibits the
1088     *                delivery of messages published by its own connection. The
1089     *                behavior for <CODE>NoLocal</CODE> is not specified if
1090     *                the destination is a queue.
1091     * @param messageListener the listener to use for async consumption of messages
1092     * @return the MessageConsumer
1093     * @throws JMSException if the session fails to create a MessageConsumer due
1094     *                 to some internal error.
1095     * @throws InvalidDestinationException if an invalid destination is
1096     *                 specified.
1097     * @throws InvalidSelectorException if the message selector is invalid.
1098     * @since 1.1
1099     */
1100    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1101        checkClosed();
1102
1103        if (destination instanceof CustomDestination) {
1104            CustomDestination customDestination = (CustomDestination)destination;
1105            return customDestination.createConsumer(this, messageSelector, noLocal);
1106        }
1107
1108        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1109        int prefetch = 0;
1110        if (destination instanceof Topic) {
1111            prefetch = prefetchPolicy.getTopicPrefetch();
1112        } else {
1113            prefetch = prefetchPolicy.getQueuePrefetch();
1114        }
1115        ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1116        return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1117                prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1118    }
1119
1120    /**
1121     * Creates a queue identity given a <CODE>Queue</CODE> name.
1122     * <P>
1123     * This facility is provided for the rare cases where clients need to
1124     * dynamically manipulate queue identity. It allows the creation of a queue
1125     * identity with a provider-specific name. Clients that depend on this
1126     * ability are not portable.
1127     * <P>
1128     * Note that this method is not for creating the physical queue. The
1129     * physical creation of queues is an administrative task and is not to be
1130     * initiated by the JMS API. The one exception is the creation of temporary
1131     * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1132     * method.
1133     * 
1134     * @param queueName the name of this <CODE>Queue</CODE>
1135     * @return a <CODE>Queue</CODE> with the given name
1136     * @throws JMSException if the session fails to create a queue due to some
1137     *                 internal error.
1138     * @since 1.1
1139     */
1140    public Queue createQueue(String queueName) throws JMSException {
1141        checkClosed();
1142        if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1143            return new ActiveMQTempQueue(queueName);
1144        }
1145        return new ActiveMQQueue(queueName);
1146    }
1147
1148    /**
1149     * Creates a topic identity given a <CODE>Topic</CODE> name.
1150     * <P>
1151     * This facility is provided for the rare cases where clients need to
1152     * dynamically manipulate topic identity. This allows the creation of a
1153     * topic identity with a provider-specific name. Clients that depend on this
1154     * ability are not portable.
1155     * <P>
1156     * Note that this method is not for creating the physical topic. The
1157     * physical creation of topics is an administrative task and is not to be
1158     * initiated by the JMS API. The one exception is the creation of temporary
1159     * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1160     * method.
1161     * 
1162     * @param topicName the name of this <CODE>Topic</CODE>
1163     * @return a <CODE>Topic</CODE> with the given name
1164     * @throws JMSException if the session fails to create a topic due to some
1165     *                 internal error.
1166     * @since 1.1
1167     */
1168    public Topic createTopic(String topicName) throws JMSException {
1169        checkClosed();
1170        if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1171            return new ActiveMQTempTopic(topicName);
1172        }
1173        return new ActiveMQTopic(topicName);
1174    }
1175
1176    /**
1177     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1178     * the specified queue.
1179     * 
1180     * @param queue the <CODE>queue</CODE> to access
1181     * @exception InvalidDestinationException if an invalid destination is
1182     *                    specified
1183     * @since 1.1
1184     */
1185    /**
1186     * Creates a durable subscriber to the specified topic.
1187     * <P>
1188     * If a client needs to receive all the messages published on a topic,
1189     * including the ones published while the subscriber is inactive, it uses a
1190     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1191     * record of this durable subscription and insures that all messages from
1192     * the topic's publishers are retained until they are acknowledged by this
1193     * durable subscriber or they have expired.
1194     * <P>
1195     * Sessions with durable subscribers must always provide the same client
1196     * identifier. In addition, each client must specify a name that uniquely
1197     * identifies (within client identifier) each durable subscription it
1198     * creates. Only one session at a time can have a
1199     * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1200     * <P>
1201     * A client can change an existing durable subscription by creating a
1202     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1203     * and/or message selector. Changing a durable subscriber is equivalent to
1204     * unsubscribing (deleting) the old one and creating a new one.
1205     * <P>
1206     * In some cases, a connection may both publish and subscribe to a topic.
1207     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1208     * inhibit the delivery of messages published by its own connection. The
1209     * default value for this attribute is false.
1210     * 
1211     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1212     * @param name the name used to identify this subscription
1213     * @return the TopicSubscriber
1214     * @throws JMSException if the session fails to create a subscriber due to
1215     *                 some internal error.
1216     * @throws InvalidDestinationException if an invalid topic is specified.
1217     * @since 1.1
1218     */
1219    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1220        checkClosed();
1221        return createDurableSubscriber(topic, name, null, false);
1222    }
1223
1224    /**
1225     * Creates a durable subscriber to the specified topic, using a message
1226     * selector and specifying whether messages published by its own connection
1227     * should be delivered to it.
1228     * <P>
1229     * If a client needs to receive all the messages published on a topic,
1230     * including the ones published while the subscriber is inactive, it uses a
1231     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1232     * record of this durable subscription and insures that all messages from
1233     * the topic's publishers are retained until they are acknowledged by this
1234     * durable subscriber or they have expired.
1235     * <P>
1236     * Sessions with durable subscribers must always provide the same client
1237     * identifier. In addition, each client must specify a name which uniquely
1238     * identifies (within client identifier) each durable subscription it
1239     * creates. Only one session at a time can have a
1240     * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1241     * inactive durable subscriber is one that exists but does not currently
1242     * have a message consumer associated with it.
1243     * <P>
1244     * A client can change an existing durable subscription by creating a
1245     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1246     * and/or message selector. Changing a durable subscriber is equivalent to
1247     * unsubscribing (deleting) the old one and creating a new one.
1248     * 
1249     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1250     * @param name the name used to identify this subscription
1251     * @param messageSelector only messages with properties matching the message
1252     *                selector expression are delivered. A value of null or an
1253     *                empty string indicates that there is no message selector
1254     *                for the message consumer.
1255     * @param noLocal if set, inhibits the delivery of messages published by its
1256     *                own connection
1257     * @return the Queue Browser
1258     * @throws JMSException if the session fails to create a subscriber due to
1259     *                 some internal error.
1260     * @throws InvalidDestinationException if an invalid topic is specified.
1261     * @throws InvalidSelectorException if the message selector is invalid.
1262     * @since 1.1
1263     */
1264    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1265        checkClosed();
1266
1267        if (topic instanceof CustomDestination) {
1268            CustomDestination customDestination = (CustomDestination)topic;
1269            return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1270        }
1271
1272        connection.checkClientIDWasManuallySpecified();
1273        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1274        int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1275        int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1276        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1277                                           noLocal, false, asyncDispatch);
1278    }
1279
1280    /**
1281     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1282     * the specified queue.
1283     * 
1284     * @param queue the <CODE>queue</CODE> to access
1285     * @return the Queue Browser
1286     * @throws JMSException if the session fails to create a browser due to some
1287     *                 internal error.
1288     * @throws InvalidDestinationException if an invalid destination is
1289     *                 specified
1290     * @since 1.1
1291     */
1292    public QueueBrowser createBrowser(Queue queue) throws JMSException {
1293        checkClosed();
1294        return createBrowser(queue, null);
1295    }
1296
1297    /**
1298     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1299     * the specified queue using a message selector.
1300     * 
1301     * @param queue the <CODE>queue</CODE> to access
1302     * @param messageSelector only messages with properties matching the message
1303     *                selector expression are delivered. A value of null or an
1304     *                empty string indicates that there is no message selector
1305     *                for the message consumer.
1306     * @return the Queue Browser
1307     * @throws JMSException if the session fails to create a browser due to some
1308     *                 internal error.
1309     * @throws InvalidDestinationException if an invalid destination is
1310     *                 specified
1311     * @throws InvalidSelectorException if the message selector is invalid.
1312     * @since 1.1
1313     */
1314    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1315        checkClosed();
1316        return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1317    }
1318
1319    /**
1320     * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1321     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1322     * 
1323     * @return a temporary queue identity
1324     * @throws JMSException if the session fails to create a temporary queue due
1325     *                 to some internal error.
1326     * @since 1.1
1327     */
1328    public TemporaryQueue createTemporaryQueue() throws JMSException {
1329        checkClosed();
1330        return (TemporaryQueue)connection.createTempDestination(false);
1331    }
1332
1333    /**
1334     * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1335     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1336     * 
1337     * @return a temporary topic identity
1338     * @throws JMSException if the session fails to create a temporary topic due
1339     *                 to some internal error.
1340     * @since 1.1
1341     */
1342    public TemporaryTopic createTemporaryTopic() throws JMSException {
1343        checkClosed();
1344        return (TemporaryTopic)connection.createTempDestination(true);
1345    }
1346
1347    /**
1348     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1349     * the specified queue.
1350     * 
1351     * @param queue the <CODE>Queue</CODE> to access
1352     * @return
1353     * @throws JMSException if the session fails to create a receiver due to
1354     *                 some internal error.
1355     * @throws JMSException
1356     * @throws InvalidDestinationException if an invalid queue is specified.
1357     */
1358    public QueueReceiver createReceiver(Queue queue) throws JMSException {
1359        checkClosed();
1360        return createReceiver(queue, null);
1361    }
1362
1363    /**
1364     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1365     * the specified queue using a message selector.
1366     * 
1367     * @param queue the <CODE>Queue</CODE> to access
1368     * @param messageSelector only messages with properties matching the message
1369     *                selector expression are delivered. A value of null or an
1370     *                empty string indicates that there is no message selector
1371     *                for the message consumer.
1372     * @return QueueReceiver
1373     * @throws JMSException if the session fails to create a receiver due to
1374     *                 some internal error.
1375     * @throws InvalidDestinationException if an invalid queue is specified.
1376     * @throws InvalidSelectorException if the message selector is invalid.
1377     */
1378    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1379        checkClosed();
1380
1381        if (queue instanceof CustomDestination) {
1382            CustomDestination customDestination = (CustomDestination)queue;
1383            return customDestination.createReceiver(this, messageSelector);
1384        }
1385
1386        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1387        return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1388                                         prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1389    }
1390
1391    /**
1392     * Creates a <CODE>QueueSender</CODE> object to send messages to the
1393     * specified queue.
1394     * 
1395     * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1396     *                unidentified producer
1397     * @return QueueSender
1398     * @throws JMSException if the session fails to create a sender due to some
1399     *                 internal error.
1400     * @throws InvalidDestinationException if an invalid queue is specified.
1401     */
1402    public QueueSender createSender(Queue queue) throws JMSException {
1403        checkClosed();
1404        if (queue instanceof CustomDestination) {
1405            CustomDestination customDestination = (CustomDestination)queue;
1406            return customDestination.createSender(this);
1407        }
1408        int timeSendOut = connection.getSendTimeout();
1409        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1410    }
1411
1412    /**
1413     * Creates a nondurable subscriber to the specified topic. <p/>
1414     * <P>
1415     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1416     * that have been published to a topic. <p/>
1417     * <P>
1418     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1419     * receive only messages that are published while they are active. <p/>
1420     * <P>
1421     * In some cases, a connection may both publish and subscribe to a topic.
1422     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1423     * inhibit the delivery of messages published by its own connection. The
1424     * default value for this attribute is false.
1425     * 
1426     * @param topic the <CODE>Topic</CODE> to subscribe to
1427     * @return TopicSubscriber
1428     * @throws JMSException if the session fails to create a subscriber due to
1429     *                 some internal error.
1430     * @throws InvalidDestinationException if an invalid topic is specified.
1431     */
1432    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1433        checkClosed();
1434        return createSubscriber(topic, null, false);
1435    }
1436
1437    /**
1438     * Creates a nondurable subscriber to the specified topic, using a message
1439     * selector or specifying whether messages published by its own connection
1440     * should be delivered to it. <p/>
1441     * <P>
1442     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1443     * that have been published to a topic. <p/>
1444     * <P>
1445     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1446     * receive only messages that are published while they are active. <p/>
1447     * <P>
1448     * Messages filtered out by a subscriber's message selector will never be
1449     * delivered to the subscriber. From the subscriber's perspective, they do
1450     * not exist. <p/>
1451     * <P>
1452     * In some cases, a connection may both publish and subscribe to a topic.
1453     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1454     * inhibit the delivery of messages published by its own connection. The
1455     * default value for this attribute is false.
1456     * 
1457     * @param topic the <CODE>Topic</CODE> to subscribe to
1458     * @param messageSelector only messages with properties matching the message
1459     *                selector expression are delivered. A value of null or an
1460     *                empty string indicates that there is no message selector
1461     *                for the message consumer.
1462     * @param noLocal if set, inhibits the delivery of messages published by its
1463     *                own connection
1464     * @return TopicSubscriber
1465     * @throws JMSException if the session fails to create a subscriber due to
1466     *                 some internal error.
1467     * @throws InvalidDestinationException if an invalid topic is specified.
1468     * @throws InvalidSelectorException if the message selector is invalid.
1469     */
1470    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1471        checkClosed();
1472
1473        if (topic instanceof CustomDestination) {
1474            CustomDestination customDestination = (CustomDestination)topic;
1475            return customDestination.createSubscriber(this, messageSelector, noLocal);
1476        }
1477
1478        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1479        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1480            .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1481    }
1482
1483    /**
1484     * Creates a publisher for the specified topic. <p/>
1485     * <P>
1486     * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1487     * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1488     * a topic, it defines a new sequence of messages that have no ordering
1489     * relationship with the messages it has previously sent.
1490     * 
1491     * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1492     *                an unidentified producer
1493     * @return TopicPublisher
1494     * @throws JMSException if the session fails to create a publisher due to
1495     *                 some internal error.
1496     * @throws InvalidDestinationException if an invalid topic is specified.
1497     */
1498    public TopicPublisher createPublisher(Topic topic) throws JMSException {
1499        checkClosed();
1500
1501        if (topic instanceof CustomDestination) {
1502            CustomDestination customDestination = (CustomDestination)topic;
1503            return customDestination.createPublisher(this);
1504        }
1505        int timeSendOut = connection.getSendTimeout();
1506        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1507    }
1508
1509    /**
1510     * Unsubscribes a durable subscription that has been created by a client.
1511     * <P>
1512     * This method deletes the state being maintained on behalf of the
1513     * subscriber by its provider.
1514     * <P>
1515     * It is erroneous for a client to delete a durable subscription while there
1516     * is an active <CODE>MessageConsumer </CODE> or
1517     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1518     * message is part of a pending transaction or has not been acknowledged in
1519     * the session.
1520     * 
1521     * @param name the name used to identify this subscription
1522     * @throws JMSException if the session fails to unsubscribe to the durable
1523     *                 subscription due to some internal error.
1524     * @throws InvalidDestinationException if an invalid subscription name is
1525     *                 specified.
1526     * @since 1.1
1527     */
1528    public void unsubscribe(String name) throws JMSException {
1529        checkClosed();
1530        connection.unsubscribe(name);
1531    }
1532
1533    public void dispatch(MessageDispatch messageDispatch) {
1534        try {
1535            executor.execute(messageDispatch);
1536        } catch (InterruptedException e) {
1537            Thread.currentThread().interrupt();
1538            connection.onClientInternalException(e);
1539        }
1540    }
1541
1542    /**
1543     * Acknowledges all consumed messages of the session of this consumed
1544     * message.
1545     * <P>
1546     * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1547     * for use when a client has specified that its JMS session's consumed
1548     * messages are to be explicitly acknowledged. By invoking
1549     * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1550     * all messages consumed by the session that the message was delivered to.
1551     * <P>
1552     * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1553     * sessions and sessions specified to use implicit acknowledgement modes.
1554     * <P>
1555     * A client may individually acknowledge each message as it is consumed, or
1556     * it may choose to acknowledge messages as an application-defined group
1557     * (which is done by calling acknowledge on the last received message of the
1558     * group, thereby acknowledging all messages consumed by the session.)
1559     * <P>
1560     * Messages that have been received but not acknowledged may be redelivered.
1561     * 
1562     * @throws JMSException if the JMS provider fails to acknowledge the
1563     *                 messages due to some internal error.
1564     * @throws javax.jms.IllegalStateException if this method is called on a
1565     *                 closed session.
1566     * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1567     */
1568    public void acknowledge() throws JMSException {
1569        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1570            ActiveMQMessageConsumer c = iter.next();
1571            c.acknowledge();
1572        }
1573    }
1574
1575    /**
1576     * Add a message consumer.
1577     * 
1578     * @param consumer - message consumer.
1579     * @throws JMSException
1580     */
1581    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1582        this.consumers.add(consumer);
1583        if (consumer.isDurableSubscriber()) {
1584            stats.onCreateDurableSubscriber();
1585        }
1586        this.connection.addDispatcher(consumer.getConsumerId(), this);
1587    }
1588
1589    /**
1590     * Remove the message consumer.
1591     * 
1592     * @param consumer - consumer to be removed.
1593     * @throws JMSException
1594     */
1595    protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1596        this.connection.removeDispatcher(consumer.getConsumerId());
1597        if (consumer.isDurableSubscriber()) {
1598            stats.onRemoveDurableSubscriber();
1599        }
1600        this.consumers.remove(consumer);
1601        this.connection.removeDispatcher(consumer);
1602    }
1603
1604    /**
1605     * Adds a message producer.
1606     * 
1607     * @param producer - message producer to be added.
1608     * @throws JMSException
1609     */
1610    protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1611        this.producers.add(producer);
1612        this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1613    }
1614
1615    /**
1616     * Removes a message producer.
1617     * 
1618     * @param producer - message producer to be removed.
1619     * @throws JMSException
1620     */
1621    protected void removeProducer(ActiveMQMessageProducer producer) {
1622        this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1623        this.producers.remove(producer);
1624    }
1625
1626    /**
1627     * Start this Session.
1628     * 
1629     * @throws JMSException
1630     */
1631    protected void start() throws JMSException {
1632        started.set(true);
1633        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1634            ActiveMQMessageConsumer c = iter.next();
1635            c.start();
1636        }
1637        executor.start();
1638    }
1639
1640    /**
1641     * Stops this session.
1642     * 
1643     * @throws JMSException
1644     */
1645    protected void stop() throws JMSException {
1646
1647        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1648            ActiveMQMessageConsumer c = iter.next();
1649            c.stop();
1650        }
1651
1652        started.set(false);
1653        executor.stop();
1654    }
1655
1656    /**
1657     * Returns the session id.
1658     * 
1659     * @return value - session id.
1660     */
1661    protected SessionId getSessionId() {
1662        return info.getSessionId();
1663    }
1664
1665    /**
1666     * @return
1667     */
1668    protected ConsumerId getNextConsumerId() {
1669        return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1670    }
1671
1672    /**
1673     * @return
1674     */
1675    protected ProducerId getNextProducerId() {
1676        return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1677    }
1678
1679    /**
1680     * Sends the message for dispatch by the broker.
1681     * 
1682     * @param producer - message producer.
1683     * @param destination - message destination.
1684     * @param message - message to be sent.
1685     * @param deliveryMode - JMS messsage delivery mode.
1686     * @param priority - message priority.
1687     * @param timeToLive - message expiration.
1688     * @param producerWindow
1689     * @throws JMSException
1690     */
1691    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1692                        MemoryUsage producerWindow, int sendTimeout) throws JMSException {
1693
1694        checkClosed();
1695        if (destination.isTemporary() && connection.isDeleted(destination)) {
1696            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1697        }
1698        synchronized (sendMutex) {
1699            // tell the Broker we are about to start a new transaction
1700            doStartTransaction();
1701            TransactionId txid = transactionContext.getTransactionId();
1702            long sequenceNumber = producer.getMessageSequence();
1703
1704            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1705            message.setJMSDeliveryMode(deliveryMode);
1706            long expiration = 0L;
1707            if (!producer.getDisableMessageTimestamp()) {
1708                long timeStamp = System.currentTimeMillis();
1709                message.setJMSTimestamp(timeStamp);
1710                if (timeToLive > 0) {
1711                    expiration = timeToLive + timeStamp;
1712                }
1713            }
1714            message.setJMSExpiration(expiration);
1715            message.setJMSPriority(priority);
1716            message.setJMSRedelivered(false);
1717
1718            // transform to our own message format here
1719            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1720
1721            // Set the message id.
1722            if (msg == message) {
1723                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1724            } else {
1725                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1726                message.setJMSMessageID(msg.getMessageId().toString());
1727            }
1728            //clear the brokerPath in case we are re-sending this message
1729            msg.setBrokerPath(null);
1730            // destination format is provider specific so only set on transformed message
1731            msg.setJMSDestination(destination);
1732
1733            msg.setTransactionId(txid);
1734            if (connection.isCopyMessageOnSend()) {
1735                msg = (ActiveMQMessage)msg.copy();
1736            }
1737            msg.setConnection(connection);
1738            msg.onSend();
1739            msg.setProducerId(msg.getMessageId().getProducerId());
1740            if (LOG.isTraceEnabled()) {
1741                LOG.trace(getSessionId() + " sending message: " + msg);
1742            }
1743            if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1744                this.connection.asyncSendPacket(msg);
1745                if (producerWindow != null) {
1746                    // Since we defer lots of the marshaling till we hit the
1747                    // wire, this might not
1748                    // provide and accurate size. We may change over to doing
1749                    // more aggressive marshaling,
1750                    // to get more accurate sizes.. this is more important once
1751                    // users start using producer window
1752                    // flow control.
1753                    int size = msg.getSize();
1754                    producerWindow.increaseUsage(size);
1755                }
1756            } else {
1757                if (sendTimeout > 0) {
1758                    this.connection.syncSendPacket(msg,sendTimeout);
1759                }else {
1760                    this.connection.syncSendPacket(msg);
1761                }
1762            }
1763
1764        }
1765    }
1766
1767    /**
1768     * Send TransactionInfo to indicate transaction has started
1769     * 
1770     * @throws JMSException if some internal error occurs
1771     */
1772    protected void doStartTransaction() throws JMSException {
1773        if (getTransacted() && !transactionContext.isInXATransaction()) {
1774            transactionContext.begin();
1775        }
1776    }
1777
1778    /**
1779     * Checks whether the session has unconsumed messages.
1780     * 
1781     * @return true - if there are unconsumed messages.
1782     */
1783    public boolean hasUncomsumedMessages() {
1784        return executor.hasUncomsumedMessages();
1785    }
1786
1787    /**
1788     * Checks whether the session uses transactions.
1789     * 
1790     * @return true - if the session uses transactions.
1791     */
1792    public boolean isTransacted() {
1793        return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1794    }
1795
1796    /**
1797     * Checks whether the session used client acknowledgment.
1798     * 
1799     * @return true - if the session uses client acknowledgment.
1800     */
1801    protected boolean isClientAcknowledge() {
1802        return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1803    }
1804
1805    /**
1806     * Checks whether the session used auto acknowledgment.
1807     * 
1808     * @return true - if the session uses client acknowledgment.
1809     */
1810    public boolean isAutoAcknowledge() {
1811        return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1812    }
1813
1814    /**
1815     * Checks whether the session used dup ok acknowledgment.
1816     * 
1817     * @return true - if the session uses client acknowledgment.
1818     */
1819    public boolean isDupsOkAcknowledge() {
1820        return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1821    }
1822    
1823    public boolean isIndividualAcknowledge(){
1824        return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1825    }
1826
1827    /**
1828     * Returns the message delivery listener.
1829     * 
1830     * @return deliveryListener - message delivery listener.
1831     */
1832    public DeliveryListener getDeliveryListener() {
1833        return deliveryListener;
1834    }
1835
1836    /**
1837     * Sets the message delivery listener.
1838     * 
1839     * @param deliveryListener - message delivery listener.
1840     */
1841    public void setDeliveryListener(DeliveryListener deliveryListener) {
1842        this.deliveryListener = deliveryListener;
1843    }
1844
1845    /**
1846     * Returns the SessionInfo bean.
1847     * 
1848     * @return info - SessionInfo bean.
1849     * @throws JMSException
1850     */
1851    protected SessionInfo getSessionInfo() throws JMSException {
1852        SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1853        return info;
1854    }
1855
1856    /**
1857     * Send the asynchronus command.
1858     * 
1859     * @param command - command to be executed.
1860     * @throws JMSException
1861     */
1862    public void asyncSendPacket(Command command) throws JMSException {
1863        connection.asyncSendPacket(command);
1864    }
1865
1866    /**
1867     * Send the synchronus command.
1868     * 
1869     * @param command - command to be executed.
1870     * @return Response
1871     * @throws JMSException
1872     */
1873    public Response syncSendPacket(Command command) throws JMSException {
1874        return connection.syncSendPacket(command);
1875    }
1876
1877    public long getNextDeliveryId() {
1878        return deliveryIdGenerator.getNextSequenceId();
1879    }
1880
1881    public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1882
1883        List<MessageDispatch> c = unconsumedMessages.removeAll();
1884        for (MessageDispatch md : c) {
1885            this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1886        }
1887        Collections.reverse(c);
1888
1889        for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1890            MessageDispatch md = iter.next();
1891            executor.executeFirst(md);
1892        }
1893
1894    }
1895
1896    public boolean isRunning() {
1897        return started.get();
1898    }
1899
1900    public boolean isAsyncDispatch() {
1901        return asyncDispatch;
1902    }
1903
1904    public void setAsyncDispatch(boolean asyncDispatch) {
1905        this.asyncDispatch = asyncDispatch;
1906    }
1907
1908    /**
1909     * @return Returns the sessionAsyncDispatch.
1910     */
1911    public boolean isSessionAsyncDispatch() {
1912        return sessionAsyncDispatch;
1913    }
1914
1915    /**
1916     * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1917     */
1918    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
1919        this.sessionAsyncDispatch = sessionAsyncDispatch;
1920    }
1921
1922    public MessageTransformer getTransformer() {
1923        return transformer;
1924    }
1925
1926    public ActiveMQConnection getConnection() {
1927        return connection;
1928    }
1929
1930    /**
1931     * Sets the transformer used to transform messages before they are sent on
1932     * to the JMS bus or when they are received from the bus but before they are
1933     * delivered to the JMS client
1934     */
1935    public void setTransformer(MessageTransformer transformer) {
1936        this.transformer = transformer;
1937    }
1938
1939    public BlobTransferPolicy getBlobTransferPolicy() {
1940        return blobTransferPolicy;
1941    }
1942
1943    /**
1944     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1945     * OBjects) are transferred from producers to brokers to consumers
1946     */
1947    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1948        this.blobTransferPolicy = blobTransferPolicy;
1949    }
1950
1951    public List getUnconsumedMessages() {
1952        return executor.getUnconsumedMessages();
1953    }
1954
1955    @Override
1956    public String toString() {
1957        return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1958    }
1959
1960    public void checkMessageListener() throws JMSException {
1961        if (messageListener != null) {
1962            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1963        }
1964        for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
1965            ActiveMQMessageConsumer consumer = i.next();
1966            if (consumer.getMessageListener() != null) {
1967                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1968            }
1969        }
1970    }
1971
1972    protected void setOptimizeAcknowledge(boolean value) {
1973        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1974            ActiveMQMessageConsumer c = iter.next();
1975            c.setOptimizeAcknowledge(value);
1976        }
1977    }
1978
1979    protected void setPrefetchSize(ConsumerId id, int prefetch) {
1980        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1981            ActiveMQMessageConsumer c = iter.next();
1982            if (c.getConsumerId().equals(id)) {
1983                c.setPrefetchSize(prefetch);
1984                break;
1985            }
1986        }
1987    }
1988
1989    protected void close(ConsumerId id) {
1990        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1991            ActiveMQMessageConsumer c = iter.next();
1992            if (c.getConsumerId().equals(id)) {
1993                try {
1994                    c.close();
1995                } catch (JMSException e) {
1996                    LOG.warn("Exception closing consumer", e);
1997                }
1998                LOG.warn("Closed consumer on Command");
1999                break;
2000            }
2001        }
2002    }
2003
2004    public boolean isInUse(ActiveMQTempDestination destination) {
2005        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2006            ActiveMQMessageConsumer c = iter.next();
2007            if (c.isInUse(destination)) {
2008                return true;
2009            }
2010        }
2011        return false;
2012    }
2013    
2014    /**
2015     * highest sequence id of the last message delivered by this session.
2016     * Passed to the broker in the close command, maintained by dispose()
2017     * @return lastDeliveredSequenceId
2018     */
2019    public long getLastDeliveredSequenceId() {
2020        return lastDeliveredSequenceId;
2021    }
2022    
2023    protected void sendAck(MessageAck ack) throws JMSException {
2024        sendAck(ack,false);
2025    }
2026    
2027    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2028        if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2029            asyncSendPacket(ack);
2030        } else {
2031            syncSendPacket(ack);
2032        }
2033    }
2034    
2035    protected Scheduler getScheduler() {
2036        return this.scheduler;
2037    }
2038    
2039    protected ThreadPoolExecutor getConnectionExecutor() {
2040        return this.connectionExecutor;
2041    }
2042}