001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.CopyOnWriteArrayList;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import javax.jms.Connection;
037import javax.jms.ConnectionConsumer;
038import javax.jms.ConnectionMetaData;
039import javax.jms.DeliveryMode;
040import javax.jms.Destination;
041import javax.jms.ExceptionListener;
042import javax.jms.IllegalStateException;
043import javax.jms.InvalidDestinationException;
044import javax.jms.JMSException;
045import javax.jms.Queue;
046import javax.jms.QueueConnection;
047import javax.jms.QueueSession;
048import javax.jms.ServerSessionPool;
049import javax.jms.Session;
050import javax.jms.Topic;
051import javax.jms.TopicConnection;
052import javax.jms.TopicSession;
053import javax.jms.XAConnection;
054import org.apache.activemq.advisory.DestinationSource;
055import org.apache.activemq.blob.BlobTransferPolicy;
056import org.apache.activemq.command.ActiveMQDestination;
057import org.apache.activemq.command.ActiveMQMessage;
058import org.apache.activemq.command.ActiveMQTempDestination;
059import org.apache.activemq.command.ActiveMQTempQueue;
060import org.apache.activemq.command.ActiveMQTempTopic;
061import org.apache.activemq.command.BrokerInfo;
062import org.apache.activemq.command.Command;
063import org.apache.activemq.command.CommandTypes;
064import org.apache.activemq.command.ConnectionControl;
065import org.apache.activemq.command.ConnectionError;
066import org.apache.activemq.command.ConnectionId;
067import org.apache.activemq.command.ConnectionInfo;
068import org.apache.activemq.command.ConsumerControl;
069import org.apache.activemq.command.ConsumerId;
070import org.apache.activemq.command.ConsumerInfo;
071import org.apache.activemq.command.ControlCommand;
072import org.apache.activemq.command.DestinationInfo;
073import org.apache.activemq.command.ExceptionResponse;
074import org.apache.activemq.command.Message;
075import org.apache.activemq.command.MessageDispatch;
076import org.apache.activemq.command.MessageId;
077import org.apache.activemq.command.ProducerAck;
078import org.apache.activemq.command.ProducerId;
079import org.apache.activemq.command.RemoveInfo;
080import org.apache.activemq.command.RemoveSubscriptionInfo;
081import org.apache.activemq.command.Response;
082import org.apache.activemq.command.SessionId;
083import org.apache.activemq.command.ShutdownInfo;
084import org.apache.activemq.command.WireFormatInfo;
085import org.apache.activemq.management.JMSConnectionStatsImpl;
086import org.apache.activemq.management.JMSStatsImpl;
087import org.apache.activemq.management.StatsCapable;
088import org.apache.activemq.management.StatsImpl;
089import org.apache.activemq.state.CommandVisitorAdapter;
090import org.apache.activemq.thread.Scheduler;
091import org.apache.activemq.thread.TaskRunnerFactory;
092import org.apache.activemq.transport.Transport;
093import org.apache.activemq.transport.TransportListener;
094import org.apache.activemq.transport.failover.FailoverTransport;
095import org.apache.activemq.util.IdGenerator;
096import org.apache.activemq.util.IntrospectionSupport;
097import org.apache.activemq.util.JMSExceptionSupport;
098import org.apache.activemq.util.LongSequenceGenerator;
099import org.apache.activemq.util.ServiceSupport;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
104
105    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
106    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
107    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
108
109    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
110    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
111
112    public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
113
114    protected boolean dispatchAsync=true;
115    protected boolean alwaysSessionAsync = true;
116
117    private TaskRunnerFactory sessionTaskRunner;
118    private final ThreadPoolExecutor executor;
119
120    // Connection state variables
121    private final ConnectionInfo info;
122    private ExceptionListener exceptionListener;
123    private ClientInternalExceptionListener clientInternalExceptionListener;
124    private boolean clientIDSet;
125    private boolean isConnectionInfoSentToBroker;
126    private boolean userSpecifiedClientID;
127
128    // Configuration options variables
129    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
130    private BlobTransferPolicy blobTransferPolicy;
131    private RedeliveryPolicy redeliveryPolicy;
132    private MessageTransformer transformer;
133
134    private boolean disableTimeStampsByDefault;
135    private boolean optimizedMessageDispatch = true;
136    private boolean copyMessageOnSend = true;
137    private boolean useCompression;
138    private boolean objectMessageSerializationDefered;
139    private boolean useAsyncSend;
140    private boolean optimizeAcknowledge;
141    private boolean nestedMapAndListEnabled = true;
142    private boolean useRetroactiveConsumer;
143    private boolean exclusiveConsumer;
144    private boolean alwaysSyncSend;
145    private int closeTimeout = 15000;
146    private boolean watchTopicAdvisories = true;
147    private long warnAboutUnstartedConnectionTimeout = 500L;
148    private int sendTimeout =0;
149    private boolean sendAcksAsync=true;
150    private boolean checkForDuplicates = true;
151
152    private final Transport transport;
153    private final IdGenerator clientIdGenerator;
154    private final JMSStatsImpl factoryStats;
155    private final JMSConnectionStatsImpl stats;
156
157    private final AtomicBoolean started = new AtomicBoolean(false);
158    private final AtomicBoolean closing = new AtomicBoolean(false);
159    private final AtomicBoolean closed = new AtomicBoolean(false);
160    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
161    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
162    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
163    private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
164    private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
165    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
166
167    // Maps ConsumerIds to ActiveMQConsumer objects
168    private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
169    private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
170    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
171    private final SessionId connectionSessionId;
172    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
173    private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
174    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
175    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
176
177    private AdvisoryConsumer advisoryConsumer;
178    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
179    private BrokerInfo brokerInfo;
180    private IOException firstFailureError;
181    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
182
183    // Assume that protocol is the latest. Change to the actual protocol
184    // version when a WireFormatInfo is received.
185    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
186    private final long timeCreated;
187    private final ConnectionAudit connectionAudit = new ConnectionAudit();
188    private DestinationSource destinationSource;
189    private final Object ensureConnectionInfoSentMutex = new Object();
190    private boolean useDedicatedTaskRunner;
191    protected volatile CountDownLatch transportInterruptionProcessingComplete;
192    private long consumerFailoverRedeliveryWaitPeriod;
193    private final Scheduler scheduler;
194    private boolean messagePrioritySupported=true;
195
196    /**
197     * Construct an <code>ActiveMQConnection</code>
198     * 
199     * @param transport
200     * @param factoryStats
201     * @throws Exception
202     */
203    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
204
205        this.transport = transport;
206        this.clientIdGenerator = clientIdGenerator;
207        this.factoryStats = factoryStats;
208
209        // Configure a single threaded executor who's core thread can timeout if
210        // idle
211        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
212            public Thread newThread(Runnable r) {
213                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
214                thread.setDaemon(true);
215                return thread;
216            }
217        });
218        // asyncConnectionThread.allowCoreThreadTimeOut(true);
219        String uniqueId = CONNECTION_ID_GENERATOR.generateId();
220        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
221        this.info.setManageable(true);
222        this.info.setFaultTolerant(transport.isFaultTolerant());
223        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
224
225        this.transport.setTransportListener(this);
226
227        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
228        this.factoryStats.addConnection(this);
229        this.timeCreated = System.currentTimeMillis();
230        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
231        this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler");
232        this.scheduler.start();
233    }
234
235    protected void setUserName(String userName) {
236        this.info.setUserName(userName);
237    }
238
239    protected void setPassword(String password) {
240        this.info.setPassword(password);
241    }
242
243    /**
244     * A static helper method to create a new connection
245     * 
246     * @return an ActiveMQConnection
247     * @throws JMSException
248     */
249    public static ActiveMQConnection makeConnection() throws JMSException {
250        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
251        return (ActiveMQConnection)factory.createConnection();
252    }
253
254    /**
255     * A static helper method to create a new connection
256     * 
257     * @param uri
258     * @return and ActiveMQConnection
259     * @throws JMSException
260     */
261    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
262        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
263        return (ActiveMQConnection)factory.createConnection();
264    }
265
266    /**
267     * A static helper method to create a new connection
268     * 
269     * @param user
270     * @param password
271     * @param uri
272     * @return an ActiveMQConnection
273     * @throws JMSException
274     */
275    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
276        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
277        return (ActiveMQConnection)factory.createConnection();
278    }
279
280    /**
281     * @return a number unique for this connection
282     */
283    public JMSConnectionStatsImpl getConnectionStats() {
284        return stats;
285    }
286
287    /**
288     * Creates a <CODE>Session</CODE> object.
289     * 
290     * @param transacted indicates whether the session is transacted
291     * @param acknowledgeMode indicates whether the consumer or the client will
292     *                acknowledge any messages it receives; ignored if the
293     *                session is transacted. Legal values are
294     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
295     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
296     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
297     * @return a newly created session
298     * @throws JMSException if the <CODE>Connection</CODE> object fails to
299     *                 create a session due to some internal error or lack of
300     *                 support for the specific transaction and acknowledgement
301     *                 mode.
302     * @see Session#AUTO_ACKNOWLEDGE
303     * @see Session#CLIENT_ACKNOWLEDGE
304     * @see Session#DUPS_OK_ACKNOWLEDGE
305     * @since 1.1
306     */
307    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
308        checkClosedOrFailed();
309        ensureConnectionInfoSent();
310        if(!transacted) {
311            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
312                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
313            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
314                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
315                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
316            }
317        }
318        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
319            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
320    }
321
322    /**
323     * @return sessionId
324     */
325    protected SessionId getNextSessionId() {
326        return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
327    }
328
329    /**
330     * Gets the client identifier for this connection.
331     * <P>
332     * This value is specific to the JMS provider. It is either preconfigured by
333     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
334     * dynamically by the application by calling the <code>setClientID</code>
335     * method.
336     * 
337     * @return the unique client identifier
338     * @throws JMSException if the JMS provider fails to return the client ID
339     *                 for this connection due to some internal error.
340     */
341    public String getClientID() throws JMSException {
342        checkClosedOrFailed();
343        return this.info.getClientId();
344    }
345
346    /**
347     * Sets the client identifier for this connection.
348     * <P>
349     * The preferred way to assign a JMS client's client identifier is for it to
350     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
351     * object and transparently assigned to the <CODE>Connection</CODE> object
352     * it creates.
353     * <P>
354     * Alternatively, a client can set a connection's client identifier using a
355     * provider-specific value. The facility to set a connection's client
356     * identifier explicitly is not a mechanism for overriding the identifier
357     * that has been administratively configured. It is provided for the case
358     * where no administratively specified identifier exists. If one does exist,
359     * an attempt to change it by setting it must throw an
360     * <CODE>IllegalStateException</CODE>. If a client sets the client
361     * identifier explicitly, it must do so immediately after it creates the
362     * connection and before any other action on the connection is taken. After
363     * this point, setting the client identifier is a programming error that
364     * should throw an <CODE>IllegalStateException</CODE>.
365     * <P>
366     * The purpose of the client identifier is to associate a connection and its
367     * objects with a state maintained on behalf of the client by a provider.
368     * The only such state identified by the JMS API is that required to support
369     * durable subscriptions.
370     * <P>
371     * If another connection with the same <code>clientID</code> is already
372     * running when this method is called, the JMS provider should detect the
373     * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
374     * 
375     * @param newClientID the unique client identifier
376     * @throws JMSException if the JMS provider fails to set the client ID for
377     *                 this connection due to some internal error.
378     * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
379     *                 invalid or duplicate client ID.
380     * @throws javax.jms.IllegalStateException if the JMS client attempts to set
381     *                 a connection's client ID at the wrong time or when it has
382     *                 been administratively configured.
383     */
384    public void setClientID(String newClientID) throws JMSException {
385        checkClosedOrFailed();
386
387        if (this.clientIDSet) {
388            throw new IllegalStateException("The clientID has already been set");
389        }
390
391        if (this.isConnectionInfoSentToBroker) {
392            throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
393        }
394
395        this.info.setClientId(newClientID);
396        this.userSpecifiedClientID = true;
397        ensureConnectionInfoSent();
398    }
399
400    /**
401     * Sets the default client id that the connection will use if explicitly not
402     * set with the setClientId() call.
403     */
404    public void setDefaultClientID(String clientID) throws JMSException {
405        this.info.setClientId(clientID);
406        this.userSpecifiedClientID = true;
407    }
408
409    /**
410     * Gets the metadata for this connection.
411     * 
412     * @return the connection metadata
413     * @throws JMSException if the JMS provider fails to get the connection
414     *                 metadata for this connection.
415     * @see javax.jms.ConnectionMetaData
416     */
417    public ConnectionMetaData getMetaData() throws JMSException {
418        checkClosedOrFailed();
419        return ActiveMQConnectionMetaData.INSTANCE;
420    }
421
422    /**
423     * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
424     * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
425     * associated with it.
426     * 
427     * @return the <CODE>ExceptionListener</CODE> for this connection, or
428     *         null, if no <CODE>ExceptionListener</CODE> is associated with
429     *         this connection.
430     * @throws JMSException if the JMS provider fails to get the
431     *                 <CODE>ExceptionListener</CODE> for this connection.
432     * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
433     */
434    public ExceptionListener getExceptionListener() throws JMSException {
435        checkClosedOrFailed();
436        return this.exceptionListener;
437    }
438
439    /**
440     * Sets an exception listener for this connection.
441     * <P>
442     * If a JMS provider detects a serious problem with a connection, it informs
443     * the connection's <CODE> ExceptionListener</CODE>, if one has been
444     * registered. It does this by calling the listener's <CODE>onException
445     * </CODE>
446     * method, passing it a <CODE>JMSException</CODE> object describing the
447     * problem.
448     * <P>
449     * An exception listener allows a client to be notified of a problem
450     * asynchronously. Some connections only consume messages, so they would
451     * have no other way to learn their connection has failed.
452     * <P>
453     * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
454     * <P>
455     * A JMS provider should attempt to resolve connection problems itself
456     * before it notifies the client of them.
457     * 
458     * @param listener the exception listener
459     * @throws JMSException if the JMS provider fails to set the exception
460     *                 listener for this connection.
461     */
462    public void setExceptionListener(ExceptionListener listener) throws JMSException {
463        checkClosedOrFailed();
464        this.exceptionListener = listener;
465    }
466
467    /**
468     * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
469     * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
470     * associated with it.
471     * 
472     * @return the listener or <code>null</code> if no listener is registered with the connection.
473     */
474    public ClientInternalExceptionListener getClientInternalExceptionListener()
475    {
476        return clientInternalExceptionListener;
477    }
478
479    /**
480     * Sets a client internal exception listener for this connection.
481     * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
482     * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
483     * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
484     * describing the problem.
485     * 
486     * @param listener the exception listener
487     */
488    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
489    {
490        this.clientInternalExceptionListener = listener;
491    }
492    
493    /**
494     * Starts (or restarts) a connection's delivery of incoming messages. A call
495     * to <CODE>start</CODE> on a connection that has already been started is
496     * ignored.
497     * 
498     * @throws JMSException if the JMS provider fails to start message delivery
499     *                 due to some internal error.
500     * @see javax.jms.Connection#stop()
501     */
502    public void start() throws JMSException {
503        checkClosedOrFailed();
504        ensureConnectionInfoSent();
505        if (started.compareAndSet(false, true)) {
506            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
507                ActiveMQSession session = i.next();
508                session.start();
509            }
510        }
511    }
512
513    /**
514     * Temporarily stops a connection's delivery of incoming messages. Delivery
515     * can be restarted using the connection's <CODE>start</CODE> method. When
516     * the connection is stopped, delivery to all the connection's message
517     * consumers is inhibited: synchronous receives block, and messages are not
518     * delivered to message listeners.
519     * <P>
520     * This call blocks until receives and/or message listeners in progress have
521     * completed.
522     * <P>
523     * Stopping a connection has no effect on its ability to send messages. A
524     * call to <CODE>stop</CODE> on a connection that has already been stopped
525     * is ignored.
526     * <P>
527     * A call to <CODE>stop</CODE> must not return until delivery of messages
528     * has paused. This means that a client can rely on the fact that none of
529     * its message listeners will be called and that all threads of control
530     * waiting for <CODE>receive</CODE> calls to return will not return with a
531     * message until the connection is restarted. The receive timers for a
532     * stopped connection continue to advance, so receives may time out while
533     * the connection is stopped.
534     * <P>
535     * If message listeners are running when <CODE>stop</CODE> is invoked, the
536     * <CODE>stop</CODE> call must wait until all of them have returned before
537     * it may return. While these message listeners are completing, they must
538     * have the full services of the connection available to them.
539     * 
540     * @throws JMSException if the JMS provider fails to stop message delivery
541     *                 due to some internal error.
542     * @see javax.jms.Connection#start()
543     */
544    public void stop() throws JMSException {
545        checkClosedOrFailed();
546        if (started.compareAndSet(true, false)) {
547            synchronized(sessions) {
548                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
549                    ActiveMQSession s = i.next();
550                    s.stop();
551                }
552            }
553        }
554    }
555
556    /**
557     * Closes the connection.
558     * <P>
559     * Since a provider typically allocates significant resources outside the
560     * JVM on behalf of a connection, clients should close these resources when
561     * they are not needed. Relying on garbage collection to eventually reclaim
562     * these resources may not be timely enough.
563     * <P>
564     * There is no need to close the sessions, producers, and consumers of a
565     * closed connection.
566     * <P>
567     * Closing a connection causes all temporary destinations to be deleted.
568     * <P>
569     * When this method is invoked, it should not return until message
570     * processing has been shut down in an orderly fashion. This means that all
571     * message listeners that may have been running have returned, and that all
572     * pending receives have returned. A close terminates all pending message
573     * receives on the connection's sessions' consumers. The receives may return
574     * with a message or with null, depending on whether there was a message
575     * available at the time of the close. If one or more of the connection's
576     * sessions' message listeners is processing a message at the time when
577     * connection <CODE>close</CODE> is invoked, all the facilities of the
578     * connection and its sessions must remain available to those listeners
579     * until they return control to the JMS provider.
580     * <P>
581     * Closing a connection causes any of its sessions' transactions in progress
582     * to be rolled back. In the case where a session's work is coordinated by
583     * an external transaction manager, a session's <CODE>commit</CODE> and
584     * <CODE> rollback</CODE> methods are not used and the result of a closed
585     * session's work is determined later by the transaction manager. Closing a
586     * connection does NOT force an acknowledgment of client-acknowledged
587     * sessions.
588     * <P>
589     * Invoking the <CODE>acknowledge</CODE> method of a received message from
590     * a closed connection's session must throw an
591     * <CODE>IllegalStateException</CODE>. Closing a closed connection must
592     * NOT throw an exception.
593     * 
594     * @throws JMSException if the JMS provider fails to close the connection
595     *                 due to some internal error. For example, a failure to
596     *                 release resources or to close a socket connection can
597     *                 cause this exception to be thrown.
598     */
599    public void close() throws JMSException {
600        try {
601            // If we were running, lets stop first.
602            if (!closed.get() && !transportFailed.get()) {
603                stop();
604            }
605
606            synchronized (this) {
607                if (!closed.get()) {
608                    closing.set(true);
609
610                    if (destinationSource != null) {
611                        destinationSource.stop();
612                        destinationSource = null;
613                    }
614                    if (advisoryConsumer != null) {
615                        advisoryConsumer.dispose();
616                        advisoryConsumer = null;
617                    }
618                    if (this.scheduler != null) {
619                        try {
620                            this.scheduler.stop();
621                        } catch (Exception e) {
622                            JMSException ex =  JMSExceptionSupport.create(e);
623                            throw ex;
624                        }
625                    }
626
627                    long lastDeliveredSequenceId = 0;
628                    for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
629                        ActiveMQSession s = i.next();
630                        s.dispose();
631                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
632                    }
633                    for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
634                        ActiveMQConnectionConsumer c = i.next();
635                        c.dispose();
636                    }
637                    for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
638                        ActiveMQInputStream c = i.next();
639                        c.dispose();
640                    }
641                    for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
642                        ActiveMQOutputStream c = i.next();
643                        c.dispose();
644                    }
645
646                    // As TemporaryQueue and TemporaryTopic instances are bound
647                    // to a connection we should just delete them after the connection
648                    // is closed to free up memory
649                    for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) {
650                        ActiveMQTempDestination c = i.next();
651                        c.delete();
652                    }
653                    
654                    if (isConnectionInfoSentToBroker) {
655                        // If we announced ourselfs to the broker.. Try to let
656                        // the broker
657                        // know that the connection is being shutdown.
658                        RemoveInfo removeCommand = info.createRemoveCommand();
659                        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
660                        doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
661                        doAsyncSendPacket(new ShutdownInfo());
662                    }
663
664                    ServiceSupport.dispose(this.transport);
665
666                    started.set(false);
667
668                    // TODO if we move the TaskRunnerFactory to the connection
669                    // factory
670                    // then we may need to call
671                    // factory.onConnectionClose(this);
672                    if (sessionTaskRunner != null) {
673                        sessionTaskRunner.shutdown();
674                    }
675                    closed.set(true);
676                    closing.set(false);
677                }
678            }
679        } finally {
680            try {
681                if (executor != null){
682                    executor.shutdown();
683                }
684            }catch(Throwable e) {
685                LOG.error("Error shutting down thread pool " + e,e);
686            }
687            factoryStats.removeConnection(this);
688        }
689    }
690
691    /**
692     * Tells the broker to terminate its VM. This can be used to cleanly
693     * terminate a broker running in a standalone java process. Server must have
694     * property enable.vm.shutdown=true defined to allow this to work.
695     */
696    // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
697    // implemented.
698    /*
699     * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
700     * command = new BrokerAdminCommand();
701     * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
702     * asyncSendPacket(command); }
703     */
704
705    /**
706     * Create a durable connection consumer for this connection (optional
707     * operation). This is an expert facility not used by regular JMS clients.
708     * 
709     * @param topic topic to access
710     * @param subscriptionName durable subscription name
711     * @param messageSelector only messages with properties matching the message
712     *                selector expression are delivered. A value of null or an
713     *                empty string indicates that there is no message selector
714     *                for the message consumer.
715     * @param sessionPool the server session pool to associate with this durable
716     *                connection consumer
717     * @param maxMessages the maximum number of messages that can be assigned to
718     *                a server session at one time
719     * @return the durable connection consumer
720     * @throws JMSException if the <CODE>Connection</CODE> object fails to
721     *                 create a connection consumer due to some internal error
722     *                 or invalid arguments for <CODE>sessionPool</CODE> and
723     *                 <CODE>messageSelector</CODE>.
724     * @throws javax.jms.InvalidDestinationException if an invalid destination
725     *                 is specified.
726     * @throws javax.jms.InvalidSelectorException if the message selector is
727     *                 invalid.
728     * @see javax.jms.ConnectionConsumer
729     * @since 1.1
730     */
731    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
732        throws JMSException {
733        return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
734    }
735
736    /**
737     * Create a durable connection consumer for this connection (optional
738     * operation). This is an expert facility not used by regular JMS clients.
739     * 
740     * @param topic topic to access
741     * @param subscriptionName durable subscription name
742     * @param messageSelector only messages with properties matching the message
743     *                selector expression are delivered. A value of null or an
744     *                empty string indicates that there is no message selector
745     *                for the message consumer.
746     * @param sessionPool the server session pool to associate with this durable
747     *                connection consumer
748     * @param maxMessages the maximum number of messages that can be assigned to
749     *                a server session at one time
750     * @param noLocal set true if you want to filter out messages published
751     *                locally
752     * @return the durable connection consumer
753     * @throws JMSException if the <CODE>Connection</CODE> object fails to
754     *                 create a connection consumer due to some internal error
755     *                 or invalid arguments for <CODE>sessionPool</CODE> and
756     *                 <CODE>messageSelector</CODE>.
757     * @throws javax.jms.InvalidDestinationException if an invalid destination
758     *                 is specified.
759     * @throws javax.jms.InvalidSelectorException if the message selector is
760     *                 invalid.
761     * @see javax.jms.ConnectionConsumer
762     * @since 1.1
763     */
764    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
765                                                              boolean noLocal) throws JMSException {
766        checkClosedOrFailed();
767        ensureConnectionInfoSent();
768        SessionId sessionId = new SessionId(info.getConnectionId(), -1);
769        ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
770        info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
771        info.setSubscriptionName(subscriptionName);
772        info.setSelector(messageSelector);
773        info.setPrefetchSize(maxMessages);
774        info.setDispatchAsync(isDispatchAsync());
775
776        // Allows the options on the destination to configure the consumerInfo
777        if (info.getDestination().getOptions() != null) {
778            Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
779            IntrospectionSupport.setProperties(this.info, options, "consumer.");
780        }
781
782        return new ActiveMQConnectionConsumer(this, sessionPool, info);
783    }
784
785    // Properties
786    // -------------------------------------------------------------------------
787
788    /**
789     * Returns true if this connection has been started
790     * 
791     * @return true if this Connection is started
792     */
793    public boolean isStarted() {
794        return started.get();
795    }
796
797    /**
798     * Returns true if the connection is closed
799     */
800    public boolean isClosed() {
801        return closed.get();
802    }
803
804    /**
805     * Returns true if the connection is in the process of being closed
806     */
807    public boolean isClosing() {
808        return closing.get();
809    }
810
811    /**
812     * Returns true if the underlying transport has failed
813     */
814    public boolean isTransportFailed() {
815        return transportFailed.get();
816    }
817
818    /**
819     * @return Returns the prefetchPolicy.
820     */
821    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
822        return prefetchPolicy;
823    }
824
825    /**
826     * Sets the <a
827     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
828     * policy</a> for consumers created by this connection.
829     */
830    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
831        this.prefetchPolicy = prefetchPolicy;
832    }
833
834    /**
835     */
836    public Transport getTransportChannel() {
837        return transport;
838    }
839
840    /**
841     * @return Returns the clientID of the connection, forcing one to be
842     *         generated if one has not yet been configured.
843     */
844    public String getInitializedClientID() throws JMSException {
845        ensureConnectionInfoSent();
846        return info.getClientId();
847    }
848
849    /**
850     * @return Returns the timeStampsDisableByDefault.
851     */
852    public boolean isDisableTimeStampsByDefault() {
853        return disableTimeStampsByDefault;
854    }
855
856    /**
857     * Sets whether or not timestamps on messages should be disabled or not. If
858     * you disable them it adds a small performance boost.
859     */
860    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
861        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
862    }
863
864    /**
865     * @return Returns the dispatchOptimizedMessage.
866     */
867    public boolean isOptimizedMessageDispatch() {
868        return optimizedMessageDispatch;
869    }
870
871    /**
872     * If this flag is set then an larger prefetch limit is used - only
873     * applicable for durable topic subscribers.
874     */
875    public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
876        this.optimizedMessageDispatch = dispatchOptimizedMessage;
877    }
878
879    /**
880     * @return Returns the closeTimeout.
881     */
882    public int getCloseTimeout() {
883        return closeTimeout;
884    }
885
886    /**
887     * Sets the timeout before a close is considered complete. Normally a
888     * close() on a connection waits for confirmation from the broker; this
889     * allows that operation to timeout to save the client hanging if there is
890     * no broker
891     */
892    public void setCloseTimeout(int closeTimeout) {
893        this.closeTimeout = closeTimeout;
894    }
895
896    /**
897     * @return ConnectionInfo
898     */
899    public ConnectionInfo getConnectionInfo() {
900        return this.info;
901    }
902
903    public boolean isUseRetroactiveConsumer() {
904        return useRetroactiveConsumer;
905    }
906
907    /**
908     * Sets whether or not retroactive consumers are enabled. Retroactive
909     * consumers allow non-durable topic subscribers to receive old messages
910     * that were published before the non-durable subscriber started.
911     */
912    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
913        this.useRetroactiveConsumer = useRetroactiveConsumer;
914    }
915
916    public boolean isNestedMapAndListEnabled() {
917        return nestedMapAndListEnabled;
918    }
919
920    /**
921     * Enables/disables whether or not Message properties and MapMessage entries
922     * support <a
923     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
924     * Structures</a> of Map and List objects
925     */
926    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
927        this.nestedMapAndListEnabled = structuredMapsEnabled;
928    }
929
930    public boolean isExclusiveConsumer() {
931        return exclusiveConsumer;
932    }
933
934    /**
935     * Enables or disables whether or not queue consumers should be exclusive or
936     * not for example to preserve ordering when not using <a
937     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
938     * 
939     * @param exclusiveConsumer
940     */
941    public void setExclusiveConsumer(boolean exclusiveConsumer) {
942        this.exclusiveConsumer = exclusiveConsumer;
943    }
944
945    /**
946     * Adds a transport listener so that a client can be notified of events in
947     * the underlying transport
948     */
949    public void addTransportListener(TransportListener transportListener) {
950        transportListeners.add(transportListener);
951    }
952
953    public void removeTransportListener(TransportListener transportListener) {
954        transportListeners.remove(transportListener);
955    }
956
957    public boolean isUseDedicatedTaskRunner() {
958        return useDedicatedTaskRunner;
959    }
960    
961    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
962        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
963    }
964
965    public TaskRunnerFactory getSessionTaskRunner() {
966        synchronized (this) {
967            if (sessionTaskRunner == null) {
968                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
969            }
970        }
971        return sessionTaskRunner;
972    }
973
974    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
975        this.sessionTaskRunner = sessionTaskRunner;
976    }
977
978    public MessageTransformer getTransformer() {
979        return transformer;
980    }
981
982    /**
983     * Sets the transformer used to transform messages before they are sent on
984     * to the JMS bus or when they are received from the bus but before they are
985     * delivered to the JMS client
986     */
987    public void setTransformer(MessageTransformer transformer) {
988        this.transformer = transformer;
989    }
990
991    /**
992     * @return the statsEnabled
993     */
994    public boolean isStatsEnabled() {
995        return this.stats.isEnabled();
996    }
997
998    /**
999     * @param statsEnabled the statsEnabled to set
1000     */
1001    public void setStatsEnabled(boolean statsEnabled) {
1002        this.stats.setEnabled(statsEnabled);
1003    }
1004
1005    /**
1006     * Returns the {@link DestinationSource} object which can be used to listen to destinations
1007     * being created or destroyed or to enquire about the current destinations available on the broker
1008     *
1009     * @return a lazily created destination source
1010     * @throws JMSException
1011     */
1012    public DestinationSource getDestinationSource() throws JMSException {
1013        if (destinationSource == null) {
1014            destinationSource = new DestinationSource(this);
1015            destinationSource.start();
1016        }
1017        return destinationSource;
1018    }
1019
1020    // Implementation methods
1021    // -------------------------------------------------------------------------
1022
1023    /**
1024     * Used internally for adding Sessions to the Connection
1025     * 
1026     * @param session
1027     * @throws JMSException
1028     * @throws JMSException
1029     */
1030    protected void addSession(ActiveMQSession session) throws JMSException {
1031        this.sessions.add(session);
1032        if (sessions.size() > 1 || session.isTransacted()) {
1033            optimizedMessageDispatch = false;
1034        }
1035    }
1036
1037    /**
1038     * Used interanlly for removing Sessions from a Connection
1039     * 
1040     * @param session
1041     */
1042    protected void removeSession(ActiveMQSession session) {
1043        this.sessions.remove(session);
1044        this.removeDispatcher(session);
1045    }
1046
1047    /**
1048     * Add a ConnectionConsumer
1049     * 
1050     * @param connectionConsumer
1051     * @throws JMSException
1052     */
1053    protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1054        this.connectionConsumers.add(connectionConsumer);
1055    }
1056
1057    /**
1058     * Remove a ConnectionConsumer
1059     * 
1060     * @param connectionConsumer
1061     */
1062    protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1063        this.connectionConsumers.remove(connectionConsumer);
1064        this.removeDispatcher(connectionConsumer);
1065    }
1066
1067    /**
1068     * Creates a <CODE>TopicSession</CODE> object.
1069     * 
1070     * @param transacted indicates whether the session is transacted
1071     * @param acknowledgeMode indicates whether the consumer or the client will
1072     *                acknowledge any messages it receives; ignored if the
1073     *                session is transacted. Legal values are
1074     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1075     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1076     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1077     * @return a newly created topic session
1078     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1079     *                 to create a session due to some internal error or lack of
1080     *                 support for the specific transaction and acknowledgement
1081     *                 mode.
1082     * @see Session#AUTO_ACKNOWLEDGE
1083     * @see Session#CLIENT_ACKNOWLEDGE
1084     * @see Session#DUPS_OK_ACKNOWLEDGE
1085     */
1086    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1087        return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1088    }
1089
1090    /**
1091     * Creates a connection consumer for this connection (optional operation).
1092     * This is an expert facility not used by regular JMS clients.
1093     * 
1094     * @param topic the topic to access
1095     * @param messageSelector only messages with properties matching the message
1096     *                selector expression are delivered. A value of null or an
1097     *                empty string indicates that there is no message selector
1098     *                for the message consumer.
1099     * @param sessionPool the server session pool to associate with this
1100     *                connection consumer
1101     * @param maxMessages the maximum number of messages that can be assigned to
1102     *                a server session at one time
1103     * @return the connection consumer
1104     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1105     *                 to create a connection consumer due to some internal
1106     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1107     *                 and <CODE>messageSelector</CODE>.
1108     * @throws javax.jms.InvalidDestinationException if an invalid topic is
1109     *                 specified.
1110     * @throws javax.jms.InvalidSelectorException if the message selector is
1111     *                 invalid.
1112     * @see javax.jms.ConnectionConsumer
1113     */
1114    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1115        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1116    }
1117
1118    /**
1119     * Creates a connection consumer for this connection (optional operation).
1120     * This is an expert facility not used by regular JMS clients.
1121     * 
1122     * @param queue the queue to access
1123     * @param messageSelector only messages with properties matching the message
1124     *                selector expression are delivered. A value of null or an
1125     *                empty string indicates that there is no message selector
1126     *                for the message consumer.
1127     * @param sessionPool the server session pool to associate with this
1128     *                connection consumer
1129     * @param maxMessages the maximum number of messages that can be assigned to
1130     *                a server session at one time
1131     * @return the connection consumer
1132     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1133     *                 to create a connection consumer due to some internal
1134     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1135     *                 and <CODE>messageSelector</CODE>.
1136     * @throws javax.jms.InvalidDestinationException if an invalid queue is
1137     *                 specified.
1138     * @throws javax.jms.InvalidSelectorException if the message selector is
1139     *                 invalid.
1140     * @see javax.jms.ConnectionConsumer
1141     */
1142    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1143        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1144    }
1145
1146    /**
1147     * Creates a connection consumer for this connection (optional operation).
1148     * This is an expert facility not used by regular JMS clients.
1149     * 
1150     * @param destination the destination to access
1151     * @param messageSelector only messages with properties matching the message
1152     *                selector expression are delivered. A value of null or an
1153     *                empty string indicates that there is no message selector
1154     *                for the message consumer.
1155     * @param sessionPool the server session pool to associate with this
1156     *                connection consumer
1157     * @param maxMessages the maximum number of messages that can be assigned to
1158     *                a server session at one time
1159     * @return the connection consumer
1160     * @throws JMSException if the <CODE>Connection</CODE> object fails to
1161     *                 create a connection consumer due to some internal error
1162     *                 or invalid arguments for <CODE>sessionPool</CODE> and
1163     *                 <CODE>messageSelector</CODE>.
1164     * @throws javax.jms.InvalidDestinationException if an invalid destination
1165     *                 is specified.
1166     * @throws javax.jms.InvalidSelectorException if the message selector is
1167     *                 invalid.
1168     * @see javax.jms.ConnectionConsumer
1169     * @since 1.1
1170     */
1171    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1172        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1173    }
1174
1175    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1176        throws JMSException {
1177
1178        checkClosedOrFailed();
1179        ensureConnectionInfoSent();
1180
1181        ConsumerId consumerId = createConsumerId();
1182        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1183        consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1184        consumerInfo.setSelector(messageSelector);
1185        consumerInfo.setPrefetchSize(maxMessages);
1186        consumerInfo.setNoLocal(noLocal);
1187        consumerInfo.setDispatchAsync(isDispatchAsync());
1188
1189        // Allows the options on the destination to configure the consumerInfo
1190        if (consumerInfo.getDestination().getOptions() != null) {
1191            Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1192            IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1193        }
1194
1195        return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1196    }
1197
1198    /**
1199     * @return
1200     */
1201    private ConsumerId createConsumerId() {
1202        return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1203    }
1204
1205    /**
1206     * @return
1207     */
1208    private ProducerId createProducerId() {
1209        return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1210    }
1211
1212    /**
1213     * Creates a <CODE>QueueSession</CODE> object.
1214     * 
1215     * @param transacted indicates whether the session is transacted
1216     * @param acknowledgeMode indicates whether the consumer or the client will
1217     *                acknowledge any messages it receives; ignored if the
1218     *                session is transacted. Legal values are
1219     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1220     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1221     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1222     * @return a newly created queue session
1223     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1224     *                 to create a session due to some internal error or lack of
1225     *                 support for the specific transaction and acknowledgement
1226     *                 mode.
1227     * @see Session#AUTO_ACKNOWLEDGE
1228     * @see Session#CLIENT_ACKNOWLEDGE
1229     * @see Session#DUPS_OK_ACKNOWLEDGE
1230     */
1231    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1232        return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1233    }
1234
1235    /**
1236     * Ensures that the clientID was manually specified and not auto-generated.
1237     * If the clientID was not specified this method will throw an exception.
1238     * This method is used to ensure that the clientID + durableSubscriber name
1239     * are used correctly.
1240     * 
1241     * @throws JMSException
1242     */
1243    public void checkClientIDWasManuallySpecified() throws JMSException {
1244        if (!userSpecifiedClientID) {
1245            throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1246        }
1247    }
1248
1249    /**
1250     * send a Packet through the Connection - for internal use only
1251     * 
1252     * @param command
1253     * @throws JMSException
1254     */
1255    public void asyncSendPacket(Command command) throws JMSException {
1256        if (isClosed()) {
1257            throw new ConnectionClosedException();
1258        } else {
1259            doAsyncSendPacket(command);
1260        }
1261    }
1262
1263        private void doAsyncSendPacket(Command command) throws JMSException {
1264                try {
1265                    this.transport.oneway(command);
1266                } catch (IOException e) {
1267                    throw JMSExceptionSupport.create(e);
1268                }
1269        }
1270
1271    /**
1272     * Send a packet through a Connection - for internal use only
1273     * 
1274     * @param command
1275     * @return
1276     * @throws JMSException
1277     */
1278    public Response syncSendPacket(Command command) throws JMSException {
1279        if (isClosed()) {
1280            throw new ConnectionClosedException();
1281        } else {
1282
1283            try {
1284                Response response = (Response)this.transport.request(command);
1285                if (response.isException()) {
1286                    ExceptionResponse er = (ExceptionResponse)response;
1287                    if (er.getException() instanceof JMSException) {
1288                        throw (JMSException)er.getException();
1289                    } else {
1290                        if (isClosed()||closing.get()) {
1291                            LOG.debug("Received an exception but connection is closing");
1292                        }
1293                        JMSException jmsEx = null;
1294                        try {
1295                            jmsEx = JMSExceptionSupport.create(er.getException());
1296                        }catch(Throwable e) {
1297                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1298                        }
1299                        //dispose of transport for security exceptions
1300                        if (er.getException() instanceof SecurityException){
1301                            Transport t = this.transport;
1302                            if (null != t){
1303                                ServiceSupport.dispose(t);
1304                            }
1305                        }
1306                        if(jmsEx !=null) {
1307                            throw jmsEx;
1308                        }
1309                    }
1310                }
1311                return response;
1312            } catch (IOException e) {
1313                throw JMSExceptionSupport.create(e);
1314            }
1315        }
1316    }
1317
1318    /**
1319     * Send a packet through a Connection - for internal use only
1320     * 
1321     * @param command
1322     * @return
1323     * @throws JMSException
1324     */
1325    public Response syncSendPacket(Command command, int timeout) throws JMSException {
1326        if (isClosed() || closing.get()) {
1327            throw new ConnectionClosedException();
1328        } else {
1329            return doSyncSendPacket(command, timeout);
1330        }
1331    }
1332
1333        private Response doSyncSendPacket(Command command, int timeout)
1334                        throws JMSException {
1335                try {
1336                    Response response = (Response) (timeout > 0
1337                    ? this.transport.request(command, timeout) 
1338                    : this.transport.request(command));
1339                    if (response != null && response.isException()) {
1340                        ExceptionResponse er = (ExceptionResponse)response;
1341                        if (er.getException() instanceof JMSException) {
1342                            throw (JMSException)er.getException();
1343                        } else {
1344                            throw JMSExceptionSupport.create(er.getException());
1345                        }
1346                    }
1347                    return response;
1348                } catch (IOException e) {
1349                    throw JMSExceptionSupport.create(e);
1350                }
1351        }
1352
1353    /**
1354     * @return statistics for this Connection
1355     */
1356    public StatsImpl getStats() {
1357        return stats;
1358    }
1359
1360    /**
1361     * simply throws an exception if the Connection is already closed or the
1362     * Transport has failed
1363     * 
1364     * @throws JMSException
1365     */
1366    protected synchronized void checkClosedOrFailed() throws JMSException {
1367        checkClosed();
1368        if (transportFailed.get()) {
1369            throw new ConnectionFailedException(firstFailureError);
1370        }
1371    }
1372
1373    /**
1374     * simply throws an exception if the Connection is already closed
1375     * 
1376     * @throws JMSException
1377     */
1378    protected synchronized void checkClosed() throws JMSException {
1379        if (closed.get()) {
1380            throw new ConnectionClosedException();
1381        }
1382    }
1383
1384    /**
1385     * Send the ConnectionInfo to the Broker
1386     * 
1387     * @throws JMSException
1388     */
1389    protected void ensureConnectionInfoSent() throws JMSException {
1390        synchronized(this.ensureConnectionInfoSentMutex) {
1391            // Can we skip sending the ConnectionInfo packet??
1392            if (isConnectionInfoSentToBroker || closed.get()) {
1393                return;
1394            }
1395            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1396            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1397                info.setClientId(clientIdGenerator.generateId());
1398            }
1399            syncSendPacket(info.copy());
1400    
1401            this.isConnectionInfoSentToBroker = true;
1402            // Add a temp destination advisory consumer so that
1403            // We know what the valid temporary destinations are on the
1404            // broker without having to do an RPC to the broker.
1405    
1406            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1407            if (watchTopicAdvisories) {
1408                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1409            }
1410        }
1411    }
1412
1413    public synchronized boolean isWatchTopicAdvisories() {
1414        return watchTopicAdvisories;
1415    }
1416
1417    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1418        this.watchTopicAdvisories = watchTopicAdvisories;
1419    }
1420
1421    /**
1422     * @return Returns the useAsyncSend.
1423     */
1424    public boolean isUseAsyncSend() {
1425        return useAsyncSend;
1426    }
1427
1428    /**
1429     * Forces the use of <a
1430     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1431     * adds a massive performance boost; but means that the send() method will
1432     * return immediately whether the message has been sent or not which could
1433     * lead to message loss.
1434     */
1435    public void setUseAsyncSend(boolean useAsyncSend) {
1436        this.useAsyncSend = useAsyncSend;
1437    }
1438
1439    /**
1440     * @return true if always sync send messages
1441     */
1442    public boolean isAlwaysSyncSend() {
1443        return this.alwaysSyncSend;
1444    }
1445
1446    /**
1447     * Set true if always require messages to be sync sent
1448     * 
1449     * @param alwaysSyncSend
1450     */
1451    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1452        this.alwaysSyncSend = alwaysSyncSend;
1453    }
1454    
1455    /**
1456     * @return the messagePrioritySupported
1457     */
1458    public boolean isMessagePrioritySupported() {
1459        return this.messagePrioritySupported;
1460    }
1461
1462    /**
1463     * @param messagePrioritySupported the messagePrioritySupported to set
1464     */
1465    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1466        this.messagePrioritySupported = messagePrioritySupported;
1467    }
1468
1469    /**
1470     * Cleans up this connection so that it's state is as if the connection was
1471     * just created. This allows the Resource Adapter to clean up a connection
1472     * so that it can be reused without having to close and recreate the
1473     * connection.
1474     */
1475    public void cleanup() throws JMSException {
1476
1477        if (advisoryConsumer != null && !isTransportFailed()) {
1478            advisoryConsumer.dispose();
1479            advisoryConsumer = null;
1480        }
1481
1482        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1483            ActiveMQSession s = i.next();
1484            s.dispose();
1485        }
1486        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1487            ActiveMQConnectionConsumer c = i.next();
1488            c.dispose();
1489        }
1490        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1491            ActiveMQInputStream c = i.next();
1492            c.dispose();
1493        }
1494        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1495            ActiveMQOutputStream c = i.next();
1496            c.dispose();
1497        }
1498
1499        if (isConnectionInfoSentToBroker) {
1500            if (!transportFailed.get() && !closing.get()) {
1501                syncSendPacket(info.createRemoveCommand());
1502            }
1503            isConnectionInfoSentToBroker = false;
1504        }
1505        if (userSpecifiedClientID) {
1506            info.setClientId(null);
1507            userSpecifiedClientID = false;
1508        }
1509        clientIDSet = false;
1510
1511        started.set(false);
1512    }
1513
1514    public void finalize() throws Throwable{
1515        if (scheduler != null){
1516            scheduler.stop();
1517        }
1518    }
1519
1520    /**
1521     * Changes the associated username/password that is associated with this
1522     * connection. If the connection has been used, you must called cleanup()
1523     * before calling this method.
1524     * 
1525     * @throws IllegalStateException if the connection is in used.
1526     */
1527    public void changeUserInfo(String userName, String password) throws JMSException {
1528        if (isConnectionInfoSentToBroker) {
1529            throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1530        }
1531        this.info.setUserName(userName);
1532        this.info.setPassword(password);
1533    }
1534
1535    /**
1536     * @return Returns the resourceManagerId.
1537     * @throws JMSException
1538     */
1539    public String getResourceManagerId() throws JMSException {
1540        waitForBrokerInfo();
1541        if (brokerInfo == null) {
1542            throw new JMSException("Connection failed before Broker info was received.");
1543        }
1544        return brokerInfo.getBrokerId().getValue();
1545    }
1546
1547    /**
1548     * Returns the broker name if one is available or null if one is not
1549     * available yet.
1550     */
1551    public String getBrokerName() {
1552        try {
1553            brokerInfoReceived.await(5, TimeUnit.SECONDS);
1554            if (brokerInfo == null) {
1555                return null;
1556            }
1557            return brokerInfo.getBrokerName();
1558        } catch (InterruptedException e) {
1559            Thread.currentThread().interrupt();
1560            return null;
1561        }
1562    }
1563
1564    /**
1565     * Returns the broker information if it is available or null if it is not
1566     * available yet.
1567     */
1568    public BrokerInfo getBrokerInfo() {
1569        return brokerInfo;
1570    }
1571
1572    /**
1573     * @return Returns the RedeliveryPolicy.
1574     * @throws JMSException
1575     */
1576    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1577        return redeliveryPolicy;
1578    }
1579
1580    /**
1581     * Sets the redelivery policy to be used when messages are rolled back
1582     */
1583    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1584        this.redeliveryPolicy = redeliveryPolicy;
1585    }
1586
1587    public BlobTransferPolicy getBlobTransferPolicy() {
1588        if (blobTransferPolicy == null) {
1589            blobTransferPolicy = createBlobTransferPolicy();
1590        }
1591        return blobTransferPolicy;
1592    }
1593
1594    /**
1595     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1596     * OBjects) are transferred from producers to brokers to consumers
1597     */
1598    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1599        this.blobTransferPolicy = blobTransferPolicy;
1600    }
1601
1602    /**
1603     * @return Returns the alwaysSessionAsync.
1604     */
1605    public boolean isAlwaysSessionAsync() {
1606        return alwaysSessionAsync;
1607    }
1608
1609    /**
1610     * If this flag is set then a separate thread is not used for dispatching
1611     * messages for each Session in the Connection. However, a separate thread
1612     * is always used if there is more than one session, or the session isn't in
1613     * auto acknowledge or duplicates ok mode
1614     */
1615    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1616        this.alwaysSessionAsync = alwaysSessionAsync;
1617    }
1618
1619    /**
1620     * @return Returns the optimizeAcknowledge.
1621     */
1622    public boolean isOptimizeAcknowledge() {
1623        return optimizeAcknowledge;
1624    }
1625
1626    /**
1627     * Enables an optimised acknowledgement mode where messages are acknowledged
1628     * in batches rather than individually
1629     * 
1630     * @param optimizeAcknowledge The optimizeAcknowledge to set.
1631     */
1632    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1633        this.optimizeAcknowledge = optimizeAcknowledge;
1634    }
1635
1636    public long getWarnAboutUnstartedConnectionTimeout() {
1637        return warnAboutUnstartedConnectionTimeout;
1638    }
1639
1640    /**
1641     * Enables the timeout from a connection creation to when a warning is
1642     * generated if the connection is not properly started via {@link #start()}
1643     * and a message is received by a consumer. It is a very common gotcha to
1644     * forget to <a
1645     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1646     * the connection</a> so this option makes the default case to create a
1647     * warning if the user forgets. To disable the warning just set the value to <
1648     * 0 (say -1).
1649     */
1650    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1651        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1652    }
1653    
1654    /**
1655     * @return the sendTimeout
1656     */
1657    public int getSendTimeout() {
1658        return sendTimeout;
1659    }
1660
1661    /**
1662     * @param sendTimeout the sendTimeout to set
1663     */
1664    public void setSendTimeout(int sendTimeout) {
1665        this.sendTimeout = sendTimeout;
1666    }
1667    
1668    /**
1669     * @return the sendAcksAsync
1670     */
1671    public boolean isSendAcksAsync() {
1672        return sendAcksAsync;
1673    }
1674
1675    /**
1676     * @param sendAcksAsync the sendAcksAsync to set
1677     */
1678    public void setSendAcksAsync(boolean sendAcksAsync) {
1679        this.sendAcksAsync = sendAcksAsync;
1680    }
1681
1682
1683    /**
1684     * Returns the time this connection was created
1685     */
1686    public long getTimeCreated() {
1687        return timeCreated;
1688    }
1689
1690    private void waitForBrokerInfo() throws JMSException {
1691        try {
1692            brokerInfoReceived.await();
1693        } catch (InterruptedException e) {
1694            Thread.currentThread().interrupt();
1695            throw JMSExceptionSupport.create(e);
1696        }
1697    }
1698
1699    // Package protected so that it can be used in unit tests
1700    public Transport getTransport() {
1701        return transport;
1702    }
1703
1704    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1705        producers.put(producerId, producer);
1706    }
1707
1708    public void removeProducer(ProducerId producerId) {
1709        producers.remove(producerId);
1710    }
1711
1712    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1713        dispatchers.put(consumerId, dispatcher);
1714    }
1715
1716    public void removeDispatcher(ConsumerId consumerId) {
1717        dispatchers.remove(consumerId);
1718    }
1719
1720    /**
1721     * @param o - the command to consume
1722     */
1723    public void onCommand(final Object o) {
1724        final Command command = (Command)o;
1725        if (!closed.get() && command != null) {
1726            try {
1727                command.visit(new CommandVisitorAdapter() {
1728                    @Override
1729                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
1730                        waitForTransportInterruptionProcessingToComplete();
1731                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1732                        if (dispatcher != null) {
1733                            // Copy in case a embedded broker is dispatching via
1734                            // vm://
1735                            // md.getMessage() == null to signal end of queue
1736                            // browse.
1737                            Message msg = md.getMessage();
1738                            if (msg != null) {
1739                                msg = msg.copy();
1740                                msg.setReadOnlyBody(true);
1741                                msg.setReadOnlyProperties(true);
1742                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
1743                                msg.setConnection(ActiveMQConnection.this);
1744                                md.setMessage(msg);
1745                            }
1746                            dispatcher.dispatch(md);
1747                        }
1748                        return null;
1749                    }
1750
1751                    @Override
1752                    public Response processProducerAck(ProducerAck pa) throws Exception {
1753                        if (pa != null && pa.getProducerId() != null) {
1754                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1755                            if (producer != null) {
1756                                producer.onProducerAck(pa);
1757                            }
1758                        }
1759                        return null;
1760                    }
1761
1762                    @Override
1763                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
1764                        brokerInfo = info;
1765                        brokerInfoReceived.countDown();
1766                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1767                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1768                        return null;
1769                    }
1770
1771                    @Override
1772                    public Response processConnectionError(final ConnectionError error) throws Exception {
1773                        executor.execute(new Runnable() {
1774                            public void run() {
1775                                onAsyncException(error.getException());
1776                            }
1777                        });
1778                        return null;
1779                    }
1780
1781                    @Override
1782                    public Response processControlCommand(ControlCommand command) throws Exception {
1783                        onControlCommand(command);
1784                        return null;
1785                    }
1786
1787                    @Override
1788                    public Response processConnectionControl(ConnectionControl control) throws Exception {
1789                        onConnectionControl((ConnectionControl)command);
1790                        return null;
1791                    }
1792
1793                    @Override
1794                    public Response processConsumerControl(ConsumerControl control) throws Exception {
1795                        onConsumerControl((ConsumerControl)command);
1796                        return null;
1797                    }
1798
1799                    @Override
1800                    public Response processWireFormat(WireFormatInfo info) throws Exception {
1801                        onWireFormatInfo((WireFormatInfo)command);
1802                        return null;
1803                    }
1804                });
1805            } catch (Exception e) {
1806                onClientInternalException(e);
1807            }
1808
1809        }
1810        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1811            TransportListener listener = iter.next();
1812            listener.onCommand(command);
1813        }
1814    }
1815
1816    protected void onWireFormatInfo(WireFormatInfo info) {
1817        protocolVersion.set(info.getVersion());
1818    }
1819
1820    /**
1821     * Handles async client internal exceptions.
1822     * A client internal exception is usually one that has been thrown
1823     * by a container runtime component during asynchronous processing of a
1824     * message that does not affect the connection itself.
1825     * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1826     * its <code>onException</code> method, if one has been registered with this connection.
1827     * 
1828     * @param error the exception that the problem
1829     */
1830    public void onClientInternalException(final Throwable error) {
1831        if ( !closed.get() && !closing.get() ) {
1832            if ( this.clientInternalExceptionListener != null ) {
1833                executor.execute(new Runnable() {
1834                    public void run() {
1835                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1836                    }
1837                });
1838            } else {
1839                LOG.debug("Async client internal exception occurred with no exception listener registered: " 
1840                        + error, error);
1841            }
1842        }
1843    }
1844    /**
1845     * Used for handling async exceptions
1846     * 
1847     * @param error
1848     */
1849    public void onAsyncException(Throwable error) {
1850        if (!closed.get() && !closing.get()) {
1851            if (this.exceptionListener != null) {
1852
1853                if (!(error instanceof JMSException)) {
1854                    error = JMSExceptionSupport.create(error);
1855                }
1856                final JMSException e = (JMSException)error;
1857
1858                executor.execute(new Runnable() {
1859                    public void run() {
1860                        ActiveMQConnection.this.exceptionListener.onException(e);
1861                    }
1862                });
1863
1864            } else {
1865                LOG.debug("Async exception with no exception listener: " + error, error);
1866            }
1867        }
1868    }
1869
1870    public void onException(final IOException error) {
1871                onAsyncException(error);
1872                if (!closing.get() && !closed.get()) {
1873                        executor.execute(new Runnable() {
1874                                public void run() {
1875                                        transportFailed(error);
1876                                        ServiceSupport.dispose(ActiveMQConnection.this.transport);
1877                                        brokerInfoReceived.countDown();
1878                                        try {
1879                                                cleanup();
1880                                        } catch (JMSException e) {
1881                                                LOG.warn("Exception during connection cleanup, " + e, e);
1882                                        }
1883                                        for (Iterator<TransportListener> iter = transportListeners
1884                                                        .iterator(); iter.hasNext();) {
1885                                                TransportListener listener = iter.next();
1886                                                listener.onException(error);
1887                                        }
1888                                }
1889                        });
1890                }
1891        }
1892
1893    public void transportInterupted() {
1894        this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1895        if (LOG.isDebugEnabled()) {
1896            LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1897        }
1898        signalInterruptionProcessingNeeded();
1899
1900        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1901            ActiveMQSession s = i.next();
1902            s.clearMessagesInProgress();
1903        }
1904        
1905        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1906            connectionConsumer.clearMessagesInProgress();    
1907        }
1908        
1909        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1910            TransportListener listener = iter.next();
1911            listener.transportInterupted();
1912        }
1913    }
1914
1915    public void transportResumed() {
1916        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1917            TransportListener listener = iter.next();
1918            listener.transportResumed();
1919        }
1920    }
1921
1922    /**
1923     * Create the DestinationInfo object for the temporary destination.
1924     * 
1925     * @param topic - if its true topic, else queue.
1926     * @return DestinationInfo
1927     * @throws JMSException
1928     */
1929    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
1930
1931        // Check if Destination info is of temporary type.
1932        ActiveMQTempDestination dest;
1933        if (topic) {
1934            dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1935        } else {
1936            dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1937        }
1938
1939        DestinationInfo info = new DestinationInfo();
1940        info.setConnectionId(this.info.getConnectionId());
1941        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
1942        info.setDestination(dest);
1943        syncSendPacket(info);
1944
1945        dest.setConnection(this);
1946        activeTempDestinations.put(dest, dest);
1947        return dest;
1948    }
1949
1950    /**
1951     * @param destination
1952     * @throws JMSException
1953     */
1954    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
1955
1956        checkClosedOrFailed();
1957
1958        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1959            ActiveMQSession s = i.next();
1960            if (s.isInUse(destination)) {
1961                throw new JMSException("A consumer is consuming from the temporary destination");
1962            }
1963        }
1964
1965        activeTempDestinations.remove(destination);
1966
1967        DestinationInfo destInfo = new DestinationInfo();
1968        destInfo.setConnectionId(this.info.getConnectionId());
1969        destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1970        destInfo.setDestination(destination);
1971        destInfo.setTimeout(0);
1972        syncSendPacket(destInfo);
1973    }
1974
1975    public boolean isDeleted(ActiveMQDestination dest) {
1976
1977        // If we are not watching the advisories.. then
1978        // we will assume that the temp destination does exist.
1979        if (advisoryConsumer == null) {
1980            return false;
1981        }
1982
1983        return !activeTempDestinations.contains(dest);
1984    }
1985
1986    public boolean isCopyMessageOnSend() {
1987        return copyMessageOnSend;
1988    }
1989
1990    public LongSequenceGenerator getLocalTransactionIdGenerator() {
1991        return localTransactionIdGenerator;
1992    }
1993
1994    public boolean isUseCompression() {
1995        return useCompression;
1996    }
1997
1998    /**
1999     * Enables the use of compression of the message bodies
2000     */
2001    public void setUseCompression(boolean useCompression) {
2002        this.useCompression = useCompression;
2003    }
2004
2005    public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2006
2007        checkClosedOrFailed();
2008        ensureConnectionInfoSent();
2009
2010        DestinationInfo info = new DestinationInfo();
2011        info.setConnectionId(this.info.getConnectionId());
2012        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2013        info.setDestination(destination);
2014        info.setTimeout(0);
2015        syncSendPacket(info);
2016
2017    }
2018
2019    public boolean isDispatchAsync() {
2020        return dispatchAsync;
2021    }
2022
2023    /**
2024     * Enables or disables the default setting of whether or not consumers have
2025     * their messages <a
2026     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2027     * synchronously or asynchronously by the broker</a>. For non-durable
2028     * topics for example we typically dispatch synchronously by default to
2029     * minimize context switches which boost performance. However sometimes its
2030     * better to go slower to ensure that a single blocked consumer socket does
2031     * not block delivery to other consumers.
2032     * 
2033     * @param asyncDispatch If true then consumers created on this connection
2034     *                will default to having their messages dispatched
2035     *                asynchronously. The default value is false.
2036     */
2037    public void setDispatchAsync(boolean asyncDispatch) {
2038        this.dispatchAsync = asyncDispatch;
2039    }
2040
2041    public boolean isObjectMessageSerializationDefered() {
2042        return objectMessageSerializationDefered;
2043    }
2044
2045    /**
2046     * When an object is set on an ObjectMessage, the JMS spec requires the
2047     * object to be serialized by that set method. Enabling this flag causes the
2048     * object to not get serialized. The object may subsequently get serialized
2049     * if the message needs to be sent over a socket or stored to disk.
2050     */
2051    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2052        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2053    }
2054
2055    public InputStream createInputStream(Destination dest) throws JMSException {
2056        return createInputStream(dest, null);
2057    }
2058
2059    public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2060        return createInputStream(dest, messageSelector, false);
2061    }
2062
2063    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2064        return createInputStream(dest, messageSelector, noLocal,  -1);
2065    }
2066
2067
2068
2069    public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2070        return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2071    }
2072    
2073    public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2074        return createInputStream(dest, null, false);
2075    }
2076
2077    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2078        return createDurableInputStream(dest, name, messageSelector, false);
2079    }
2080
2081    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2082        return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2083    }
2084
2085    public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2086        return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2087    }
2088    
2089    private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2090        checkClosedOrFailed();
2091        ensureConnectionInfoSent();
2092        return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2093    }
2094
2095    /**
2096     * Creates a persistent output stream; individual messages will be written
2097     * to disk/database by the broker
2098     */
2099    public OutputStream createOutputStream(Destination dest) throws JMSException {
2100        return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2101    }
2102
2103    /**
2104     * Creates a non persistent output stream; messages will not be written to
2105     * disk
2106     */
2107    public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2108        return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2109    }
2110
2111    /**
2112     * Creates an output stream allowing full control over the delivery mode,
2113     * the priority and time to live of the messages and the properties added to
2114     * messages on the stream.
2115     * 
2116     * @param streamProperties defines a map of key-value pairs where the keys
2117     *                are strings and the values are primitive values (numbers
2118     *                and strings) which are appended to the messages similarly
2119     *                to using the
2120     *                {@link javax.jms.Message#setObjectProperty(String, Object)}
2121     *                method
2122     */
2123    public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2124        checkClosedOrFailed();
2125        ensureConnectionInfoSent();
2126        return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2127    }
2128
2129    /**
2130     * Unsubscribes a durable subscription that has been created by a client.
2131     * <P>
2132     * This method deletes the state being maintained on behalf of the
2133     * subscriber by its provider.
2134     * <P>
2135     * It is erroneous for a client to delete a durable subscription while there
2136     * is an active <CODE>MessageConsumer </CODE> or
2137     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2138     * message is part of a pending transaction or has not been acknowledged in
2139     * the session.
2140     * 
2141     * @param name the name used to identify this subscription
2142     * @throws JMSException if the session fails to unsubscribe to the durable
2143     *                 subscription due to some internal error.
2144     * @throws InvalidDestinationException if an invalid subscription name is
2145     *                 specified.
2146     * @since 1.1
2147     */
2148    public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2149        checkClosedOrFailed();
2150        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2151        rsi.setConnectionId(getConnectionInfo().getConnectionId());
2152        rsi.setSubscriptionName(name);
2153        rsi.setClientId(getConnectionInfo().getClientId());
2154        syncSendPacket(rsi);
2155    }
2156
2157    /**
2158     * Internal send method optimized: - It does not copy the message - It can
2159     * only handle ActiveMQ messages. - You can specify if the send is async or
2160     * sync - Does not allow you to send /w a transaction.
2161     */
2162    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2163        checkClosedOrFailed();
2164
2165        if (destination.isTemporary() && isDeleted(destination)) {
2166            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2167        }
2168
2169        msg.setJMSDestination(destination);
2170        msg.setJMSDeliveryMode(deliveryMode);
2171        long expiration = 0L;
2172
2173        if (!isDisableTimeStampsByDefault()) {
2174            long timeStamp = System.currentTimeMillis();
2175            msg.setJMSTimestamp(timeStamp);
2176            if (timeToLive > 0) {
2177                expiration = timeToLive + timeStamp;
2178            }
2179        }
2180
2181        msg.setJMSExpiration(expiration);
2182        msg.setJMSPriority(priority);
2183
2184        msg.setJMSRedelivered(false);
2185        msg.setMessageId(messageId);
2186
2187        msg.onSend();
2188
2189        msg.setProducerId(msg.getMessageId().getProducerId());
2190
2191        if (LOG.isDebugEnabled()) {
2192            LOG.debug("Sending message: " + msg);
2193        }
2194
2195        if (async) {
2196            asyncSendPacket(msg);
2197        } else {
2198            syncSendPacket(msg);
2199        }
2200
2201    }
2202
2203    public void addOutputStream(ActiveMQOutputStream stream) {
2204        outputStreams.add(stream);
2205    }
2206
2207    public void removeOutputStream(ActiveMQOutputStream stream) {
2208        outputStreams.remove(stream);
2209    }
2210
2211    public void addInputStream(ActiveMQInputStream stream) {
2212        inputStreams.add(stream);
2213    }
2214
2215    public void removeInputStream(ActiveMQInputStream stream) {
2216        inputStreams.remove(stream);
2217    }
2218
2219    protected void onControlCommand(ControlCommand command) {
2220        String text = command.getCommand();
2221        if (text != null) {
2222            if ("shutdown".equals(text)) {
2223                LOG.info("JVM told to shutdown");
2224                System.exit(0);
2225            }
2226            if (false && "close".equals(text)){
2227                LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2228                try {
2229                    close();
2230                } catch (JMSException e) {
2231                }
2232            }
2233        }
2234    }
2235
2236    protected void onConnectionControl(ConnectionControl command) {
2237        if (command.isFaultTolerant()) {
2238            this.optimizeAcknowledge = false;
2239            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2240                ActiveMQSession s = i.next();
2241                s.setOptimizeAcknowledge(false);
2242            }
2243        }
2244    }
2245
2246    protected void onConsumerControl(ConsumerControl command) {
2247        if (command.isClose()) {
2248            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2249                ActiveMQSession s = i.next();
2250                s.close(command.getConsumerId());
2251            }
2252        } else {
2253            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2254                ActiveMQSession s = i.next();
2255                s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2256            }
2257        }
2258    }
2259
2260    protected void transportFailed(IOException error) {
2261        transportFailed.set(true);
2262        if (firstFailureError == null) {
2263            firstFailureError = error;
2264        }
2265    }
2266
2267    /**
2268     * Should a JMS message be copied to a new JMS Message object as part of the
2269     * send() method in JMS. This is enabled by default to be compliant with the
2270     * JMS specification. You can disable it if you do not mutate JMS messages
2271     * after they are sent for a performance boost
2272     */
2273    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2274        this.copyMessageOnSend = copyMessageOnSend;
2275    }
2276
2277    @Override
2278    public String toString() {
2279        return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2280    }
2281
2282    protected BlobTransferPolicy createBlobTransferPolicy() {
2283        return new BlobTransferPolicy();
2284    }
2285
2286    public int getProtocolVersion() {
2287        return protocolVersion.get();
2288    }
2289
2290    public int getProducerWindowSize() {
2291        return producerWindowSize;
2292    }
2293
2294    public void setProducerWindowSize(int producerWindowSize) {
2295        this.producerWindowSize = producerWindowSize;
2296    }
2297
2298    public void setAuditDepth(int auditDepth) {
2299        connectionAudit.setAuditDepth(auditDepth);
2300        }
2301
2302    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2303        connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2304        }
2305
2306    protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2307        connectionAudit.removeDispatcher(dispatcher);
2308    }
2309
2310    protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2311        return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2312    }
2313
2314    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2315        connectionAudit.rollbackDuplicate(dispatcher, message);
2316    }
2317
2318        public IOException getFirstFailureError() {
2319                return firstFailureError;
2320        }
2321        
2322        protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2323            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2324            if (cdl != null) {
2325            if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2326                LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2327                cdl.await(10, TimeUnit.SECONDS);
2328            }
2329            signalInterruptionProcessingComplete();
2330        }
2331    }
2332        
2333        protected void transportInterruptionProcessingComplete() {
2334            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2335            if (cdl != null) {
2336                cdl.countDown();
2337                try {
2338                    signalInterruptionProcessingComplete();
2339                } catch (InterruptedException ignored) {}
2340            }
2341        }
2342
2343    private void signalInterruptionProcessingComplete() throws InterruptedException {
2344        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2345        if (cdl.getCount()==0) {
2346            if (LOG.isDebugEnabled()) {
2347                LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2348            }
2349            this.transportInterruptionProcessingComplete = null;
2350
2351            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2352            if (failoverTransport != null) {
2353                failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2354                if (LOG.isDebugEnabled()) {
2355                    LOG.debug("notified failover transport (" + failoverTransport
2356                            + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2357                }
2358            }
2359
2360        }
2361    }
2362
2363    private void signalInterruptionProcessingNeeded() {
2364        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2365        if (failoverTransport != null) {
2366            failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2367            if (LOG.isDebugEnabled()) {
2368                LOG.debug("notified failover transport (" + failoverTransport
2369                        + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2370            }
2371        }
2372    }
2373
2374    /*
2375     * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2376     * will wait to receive re dispatched messages.
2377     * default value is 0 so there is no wait by default.
2378     */
2379    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2380        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2381    }
2382    
2383    public long getConsumerFailoverRedeliveryWaitPeriod() {
2384        return consumerFailoverRedeliveryWaitPeriod;
2385    }
2386    
2387    protected Scheduler getScheduler() {
2388        return this.scheduler;
2389    }
2390    
2391    protected ThreadPoolExecutor getExecutor() {
2392        return this.executor;
2393    }
2394
2395    /**
2396     * @return the checkForDuplicates
2397     */
2398    public boolean isCheckForDuplicates() {
2399        return this.checkForDuplicates;
2400    }
2401
2402    /**
2403     * @param checkForDuplicates the checkForDuplicates to set
2404     */
2405    public void setCheckForDuplicates(boolean checkForDuplicates) {
2406        this.checkForDuplicates = checkForDuplicates;
2407    }
2408
2409}