001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.net.URI;
020import java.net.URISyntaxException;
021import java.util.HashMap;
022import java.util.Map;
023import java.util.Properties;
024import java.util.concurrent.Executor;
025import java.util.concurrent.ScheduledThreadPoolExecutor;
026import java.util.concurrent.ThreadFactory;
027import javax.jms.Connection;
028import javax.jms.ConnectionFactory;
029import javax.jms.ExceptionListener;
030import javax.jms.JMSException;
031import javax.jms.QueueConnection;
032import javax.jms.QueueConnectionFactory;
033import javax.jms.TopicConnection;
034import javax.jms.TopicConnectionFactory;
035import javax.naming.Context;
036import org.apache.activemq.blob.BlobTransferPolicy;
037import org.apache.activemq.jndi.JNDIBaseStorable;
038import org.apache.activemq.management.JMSStatsImpl;
039import org.apache.activemq.management.StatsCapable;
040import org.apache.activemq.management.StatsImpl;
041import org.apache.activemq.transport.Transport;
042import org.apache.activemq.transport.TransportFactory;
043import org.apache.activemq.transport.TransportListener;
044import org.apache.activemq.util.IdGenerator;
045import org.apache.activemq.util.IntrospectionSupport;
046import org.apache.activemq.util.JMSExceptionSupport;
047import org.apache.activemq.util.URISupport;
048import org.apache.activemq.util.URISupport.CompositeData;
049
050/**
051 * A ConnectionFactory is an an Administered object, and is used for creating
052 * Connections. <p/> This class also implements QueueConnectionFactory and
053 * TopicConnectionFactory. You can use this connection to create both
054 * QueueConnections and TopicConnections.
055 * 
056 * 
057 * @see javax.jms.ConnectionFactory
058 */
059public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
060
061    public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616";
062    public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
063    public static final String DEFAULT_USER = null;
064    public static final String DEFAULT_PASSWORD = null;
065    public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
066
067    protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
068        public Thread newThread(Runnable run) {
069            Thread thread = new Thread(run);
070            thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION);
071            return thread;
072        }
073    });
074
075    protected URI brokerURL;
076    protected String userName;
077    protected String password;
078    protected String clientID;
079    protected boolean dispatchAsync=true;
080    protected boolean alwaysSessionAsync=true;
081
082    JMSStatsImpl factoryStats = new JMSStatsImpl();
083
084    private IdGenerator clientIdGenerator;
085    private String clientIDPrefix;
086
087    // client policies
088    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
089    private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
090    private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
091    private MessageTransformer transformer;
092
093    private boolean disableTimeStampsByDefault;
094    private boolean optimizedMessageDispatch = true;
095    private boolean copyMessageOnSend = true;
096    private boolean useCompression;
097    private boolean objectMessageSerializationDefered;
098    private boolean useAsyncSend;
099    private boolean optimizeAcknowledge;
100    private int closeTimeout = 15000;
101    private boolean useRetroactiveConsumer;
102    private boolean exclusiveConsumer;
103    private boolean nestedMapAndListEnabled = true;
104    private boolean alwaysSyncSend;
105    private boolean watchTopicAdvisories = true;
106    private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
107    private long warnAboutUnstartedConnectionTimeout = 500L;
108    private int sendTimeout = 0;
109    private boolean sendAcksAsync=true;
110    private TransportListener transportListener;
111    private ExceptionListener exceptionListener;
112    private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
113    private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
114    private boolean useDedicatedTaskRunner;
115    private long consumerFailoverRedeliveryWaitPeriod = 0;
116    private boolean checkForDuplicates = true;
117    private ClientInternalExceptionListener clientInternalExceptionListener;
118    private boolean messagePrioritySupported = true;
119
120    // /////////////////////////////////////////////
121    //
122    // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
123    //
124    // /////////////////////////////////////////////
125
126    public ActiveMQConnectionFactory() {
127        this(DEFAULT_BROKER_URL);
128    }
129
130    public ActiveMQConnectionFactory(String brokerURL) {
131        this(createURI(brokerURL));
132    }
133
134    public ActiveMQConnectionFactory(URI brokerURL) {
135        setBrokerURL(brokerURL.toString());
136    }
137
138    public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
139        setUserName(userName);
140        setPassword(password);
141        setBrokerURL(brokerURL.toString());
142    }
143
144    public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
145        setUserName(userName);
146        setPassword(password);
147        setBrokerURL(brokerURL);
148    }
149
150    /**
151     * Returns a copy of the given connection factory
152     */
153    public ActiveMQConnectionFactory copy() {
154        try {
155            return (ActiveMQConnectionFactory)super.clone();
156        } catch (CloneNotSupportedException e) {
157            throw new RuntimeException("This should never happen: " + e, e);
158        }
159    }
160
161    /**
162     * @param brokerURL
163     * @return
164     * @throws URISyntaxException
165     */
166    private static URI createURI(String brokerURL) {
167        try {
168            return new URI(brokerURL);
169        } catch (URISyntaxException e) {
170            throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
171        }
172    }
173
174    /**
175     * @return Returns the Connection.
176     */
177    public Connection createConnection() throws JMSException {
178        return createActiveMQConnection();
179    }
180
181    /**
182     * @return Returns the Connection.
183     */
184    public Connection createConnection(String userName, String password) throws JMSException {
185        return createActiveMQConnection(userName, password);
186    }
187
188    /**
189     * @return Returns the QueueConnection.
190     * @throws JMSException
191     */
192    public QueueConnection createQueueConnection() throws JMSException {
193        return createActiveMQConnection();
194    }
195
196    /**
197     * @return Returns the QueueConnection.
198     */
199    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
200        return createActiveMQConnection(userName, password);
201    }
202
203    /**
204     * @return Returns the TopicConnection.
205     * @throws JMSException
206     */
207    public TopicConnection createTopicConnection() throws JMSException {
208        return createActiveMQConnection();
209    }
210
211    /**
212     * @return Returns the TopicConnection.
213     */
214    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
215        return createActiveMQConnection(userName, password);
216    }
217
218    public StatsImpl getStats() {
219        // TODO
220        return null;
221    }
222
223    // /////////////////////////////////////////////
224    //
225    // Implementation methods.
226    //
227    // /////////////////////////////////////////////
228
229    protected ActiveMQConnection createActiveMQConnection() throws JMSException {
230        return createActiveMQConnection(userName, password);
231    }
232
233    /**
234     * Creates a Transport based on this object's connection settings. Separated
235     * from createActiveMQConnection to allow for subclasses to override.
236     * 
237     * @return The newly created Transport.
238     * @throws JMSException If unable to create trasnport.
239     * @author sepandm@gmail.com
240     */
241    protected Transport createTransport() throws JMSException {
242        try {
243            return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);
244        } catch (Exception e) {
245            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
246        }
247    }
248
249    /**
250     * @return Returns the Connection.
251     */
252    protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
253        if (brokerURL == null) {
254            throw new ConfigurationException("brokerURL not set.");
255        }
256        ActiveMQConnection connection = null;
257        try {
258            Transport transport = createTransport();
259            connection = createActiveMQConnection(transport, factoryStats);
260
261            connection.setUserName(userName);
262            connection.setPassword(password);
263
264            configureConnection(connection);
265
266            transport.start();
267
268            if (clientID != null) {
269                connection.setDefaultClientID(clientID);
270            }
271
272            return connection;
273        } catch (JMSException e) {
274            // Clean up!
275            try {
276                connection.close();
277            } catch (Throwable ignore) {
278            }
279            throw e;
280        } catch (Exception e) {
281            // Clean up!
282            try {
283                connection.close();
284            } catch (Throwable ignore) {
285            }
286            throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
287        }
288    }
289
290    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
291        ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats);
292        return connection;
293    }
294
295    protected void configureConnection(ActiveMQConnection connection) throws JMSException {
296        connection.setPrefetchPolicy(getPrefetchPolicy());
297        connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
298        connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
299        connection.setCopyMessageOnSend(isCopyMessageOnSend());
300        connection.setUseCompression(isUseCompression());
301        connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
302        connection.setDispatchAsync(isDispatchAsync());
303        connection.setUseAsyncSend(isUseAsyncSend());
304        connection.setAlwaysSyncSend(isAlwaysSyncSend());
305        connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
306        connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
307        connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
308        connection.setExclusiveConsumer(isExclusiveConsumer());
309        connection.setRedeliveryPolicy(getRedeliveryPolicy());
310        connection.setTransformer(getTransformer());
311        connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
312        connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
313        connection.setProducerWindowSize(getProducerWindowSize());
314        connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
315        connection.setSendTimeout(getSendTimeout());
316        connection.setCloseTimeout(getCloseTimeout());
317        connection.setSendAcksAsync(isSendAcksAsync());
318        connection.setAuditDepth(getAuditDepth());
319        connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
320        connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
321        connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
322        connection.setCheckForDuplicates(isCheckForDuplicates());
323        connection.setMessagePrioritySupported(isMessagePrioritySupported());
324        if (transportListener != null) {
325            connection.addTransportListener(transportListener);
326        }
327        if (exceptionListener != null) {
328                connection.setExceptionListener(exceptionListener);
329        }
330        if (clientInternalExceptionListener != null) {
331            connection.setClientInternalExceptionListener(clientInternalExceptionListener);
332        }
333    }
334
335    // /////////////////////////////////////////////
336    //
337    // Property Accessors
338    //
339    // /////////////////////////////////////////////
340
341    public String getBrokerURL() {
342        return brokerURL == null ? null : brokerURL.toString();
343    }
344
345    /**
346     * Sets the <a
347     * href="http://activemq.apache.org/configuring-transports.html">connection
348     * URL</a> used to connect to the ActiveMQ broker.
349     */
350    public void setBrokerURL(String brokerURL) {
351        this.brokerURL = createURI(brokerURL);
352
353        // Use all the properties prefixed with 'jms.' to set the connection
354        // factory
355        // options.
356        if (this.brokerURL.getQuery() != null) {
357            // It might be a standard URI or...
358            try {
359
360                Map map = URISupport.parseQuery(this.brokerURL.getQuery());
361                if (buildFromMap(IntrospectionSupport.extractProperties(map, "jms."))) {
362                    this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
363                }
364
365            } catch (URISyntaxException e) {
366            }
367
368        } else {
369
370            // It might be a composite URI.
371            try {
372                CompositeData data = URISupport.parseComposite(this.brokerURL);
373                if (buildFromMap(IntrospectionSupport.extractProperties(data.getParameters(), "jms."))) {
374                    this.brokerURL = data.toURI();
375                }
376            } catch (URISyntaxException e) {
377            }
378        }
379    }
380
381    public String getClientID() {
382        return clientID;
383    }
384
385    /**
386     * Sets the JMS clientID to use for the created connection. Note that this
387     * can only be used by one connection at once so generally its a better idea
388     * to set the clientID on a Connection
389     */
390    public void setClientID(String clientID) {
391        this.clientID = clientID;
392    }
393
394    public boolean isCopyMessageOnSend() {
395        return copyMessageOnSend;
396    }
397
398    /**
399     * Should a JMS message be copied to a new JMS Message object as part of the
400     * send() method in JMS. This is enabled by default to be compliant with the
401     * JMS specification. You can disable it if you do not mutate JMS messages
402     * after they are sent for a performance boost
403     */
404    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
405        this.copyMessageOnSend = copyMessageOnSend;
406    }
407
408    public boolean isDisableTimeStampsByDefault() {
409        return disableTimeStampsByDefault;
410    }
411
412    /**
413     * Sets whether or not timestamps on messages should be disabled or not. If
414     * you disable them it adds a small performance boost.
415     */
416    public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
417        this.disableTimeStampsByDefault = disableTimeStampsByDefault;
418    }
419
420    public boolean isOptimizedMessageDispatch() {
421        return optimizedMessageDispatch;
422    }
423
424    /**
425     * If this flag is set then an larger prefetch limit is used - only
426     * applicable for durable topic subscribers.
427     */
428    public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
429        this.optimizedMessageDispatch = optimizedMessageDispatch;
430    }
431
432    public String getPassword() {
433        return password;
434    }
435
436    /**
437     * Sets the JMS password used for connections created from this factory
438     */
439    public void setPassword(String password) {
440        this.password = password;
441    }
442
443    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
444        return prefetchPolicy;
445    }
446
447    /**
448     * Sets the <a
449     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
450     * policy</a> for consumers created by this connection.
451     */
452    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
453        this.prefetchPolicy = prefetchPolicy;
454    }
455
456    public boolean isUseAsyncSend() {
457        return useAsyncSend;
458    }
459
460    public BlobTransferPolicy getBlobTransferPolicy() {
461        return blobTransferPolicy;
462    }
463
464    /**
465     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
466     * OBjects) are transferred from producers to brokers to consumers
467     */
468    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
469        this.blobTransferPolicy = blobTransferPolicy;
470    }
471
472    /**
473     * Forces the use of <a
474     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
475     * adds a massive performance boost; but means that the send() method will
476     * return immediately whether the message has been sent or not which could
477     * lead to message loss.
478     */
479    public void setUseAsyncSend(boolean useAsyncSend) {
480        this.useAsyncSend = useAsyncSend;
481    }
482
483    public synchronized boolean isWatchTopicAdvisories() {
484        return watchTopicAdvisories;
485    }
486
487    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
488        this.watchTopicAdvisories = watchTopicAdvisories;
489    }
490
491    /**
492     * @return true if always sync send messages
493     */
494    public boolean isAlwaysSyncSend() {
495        return this.alwaysSyncSend;
496    }
497
498    /**
499     * Set true if always require messages to be sync sent
500     * 
501     * @param alwaysSyncSend
502     */
503    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
504        this.alwaysSyncSend = alwaysSyncSend;
505    }
506
507    public String getUserName() {
508        return userName;
509    }
510
511    /**
512     * Sets the JMS userName used by connections created by this factory
513     */
514    public void setUserName(String userName) {
515        this.userName = userName;
516    }
517
518    public boolean isUseRetroactiveConsumer() {
519        return useRetroactiveConsumer;
520    }
521
522    /**
523     * Sets whether or not retroactive consumers are enabled. Retroactive
524     * consumers allow non-durable topic subscribers to receive old messages
525     * that were published before the non-durable subscriber started.
526     */
527    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
528        this.useRetroactiveConsumer = useRetroactiveConsumer;
529    }
530
531    public boolean isExclusiveConsumer() {
532        return exclusiveConsumer;
533    }
534
535    /**
536     * Enables or disables whether or not queue consumers should be exclusive or
537     * not for example to preserve ordering when not using <a
538     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
539     * 
540     * @param exclusiveConsumer
541     */
542    public void setExclusiveConsumer(boolean exclusiveConsumer) {
543        this.exclusiveConsumer = exclusiveConsumer;
544    }
545
546    public RedeliveryPolicy getRedeliveryPolicy() {
547        return redeliveryPolicy;
548    }
549
550    /**
551     * Sets the global redelivery policy to be used when a message is delivered
552     * but the session is rolled back
553     */
554    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
555        this.redeliveryPolicy = redeliveryPolicy;
556    }
557
558    public MessageTransformer getTransformer() {
559        return transformer;
560    }
561    
562    /**
563     * @return the sendTimeout
564     */
565    public int getSendTimeout() {
566        return sendTimeout;
567    }
568
569    /**
570     * @param sendTimeout the sendTimeout to set
571     */
572    public void setSendTimeout(int sendTimeout) {
573        this.sendTimeout = sendTimeout;
574    }
575    
576    /**
577     * @return the sendAcksAsync
578     */
579    public boolean isSendAcksAsync() {
580        return sendAcksAsync;
581    }
582
583    /**
584     * @param sendAcksAsync the sendAcksAsync to set
585     */
586    public void setSendAcksAsync(boolean sendAcksAsync) {
587        this.sendAcksAsync = sendAcksAsync;
588    }
589    
590    /**
591     * @return the messagePrioritySupported
592     */
593    public boolean isMessagePrioritySupported() {
594        return this.messagePrioritySupported;
595    }
596
597    /**
598     * @param messagePrioritySupported the messagePrioritySupported to set
599     */
600    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
601        this.messagePrioritySupported = messagePrioritySupported;
602    }
603
604
605    /**
606     * Sets the transformer used to transform messages before they are sent on
607     * to the JMS bus or when they are received from the bus but before they are
608     * delivered to the JMS client
609     */
610    public void setTransformer(MessageTransformer transformer) {
611        this.transformer = transformer;
612    }
613
614    @Override
615    public void buildFromProperties(Properties properties) {
616
617        if (properties == null) {
618            properties = new Properties();
619        }
620
621        String temp = properties.getProperty(Context.PROVIDER_URL);
622        if (temp == null || temp.length() == 0) {
623            temp = properties.getProperty("brokerURL");
624        }
625        if (temp != null && temp.length() > 0) {
626            setBrokerURL(temp);
627        }
628
629        Map<String, Object> p = new HashMap(properties);
630        buildFromMap(p);
631    }
632
633    public boolean buildFromMap(Map<String, Object> properties) {
634        boolean rc = false;
635
636        ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
637        if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
638            setPrefetchPolicy(p);
639            rc = true;
640        }
641
642        RedeliveryPolicy rp = new RedeliveryPolicy();
643        if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
644            setRedeliveryPolicy(rp);
645            rc = true;
646        }
647
648        BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
649        if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
650            setBlobTransferPolicy(blobTransferPolicy);
651            rc = true;
652        }
653
654        rc |= IntrospectionSupport.setProperties(this, properties);
655
656        return rc;
657    }
658
659    @Override
660    public void populateProperties(Properties props) {
661        props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
662
663        if (getBrokerURL() != null) {
664            props.setProperty(Context.PROVIDER_URL, getBrokerURL());
665            props.setProperty("brokerURL", getBrokerURL());
666        }
667
668        if (getClientID() != null) {
669            props.setProperty("clientID", getClientID());
670        }
671
672        IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
673        IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
674        IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
675
676        props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
677        props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
678        props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
679        props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
680
681        if (getPassword() != null) {
682            props.setProperty("password", getPassword());
683        }
684
685        props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
686        props.setProperty("useCompression", Boolean.toString(isUseCompression()));
687        props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
688        props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
689
690        if (getUserName() != null) {
691            props.setProperty("userName", getUserName());
692        }
693
694        props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
695        props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
696        props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
697        props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
698        props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
699        props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
700        props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
701        props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
702        props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
703        props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
704        props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
705        props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
706    }
707
708    public boolean isUseCompression() {
709        return useCompression;
710    }
711
712    /**
713     * Enables the use of compression of the message bodies
714     */
715    public void setUseCompression(boolean useCompression) {
716        this.useCompression = useCompression;
717    }
718
719    public boolean isObjectMessageSerializationDefered() {
720        return objectMessageSerializationDefered;
721    }
722
723    /**
724     * When an object is set on an ObjectMessage, the JMS spec requires the
725     * object to be serialized by that set method. Enabling this flag causes the
726     * object to not get serialized. The object may subsequently get serialized
727     * if the message needs to be sent over a socket or stored to disk.
728     */
729    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
730        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
731    }
732
733    public boolean isDispatchAsync() {
734        return dispatchAsync;
735    }
736
737    /**
738     * Enables or disables the default setting of whether or not consumers have
739     * their messages <a
740     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
741     * synchronously or asynchronously by the broker</a>. For non-durable
742     * topics for example we typically dispatch synchronously by default to
743     * minimize context switches which boost performance. However sometimes its
744     * better to go slower to ensure that a single blocked consumer socket does
745     * not block delivery to other consumers.
746     * 
747     * @param asyncDispatch If true then consumers created on this connection
748     *                will default to having their messages dispatched
749     *                asynchronously. The default value is false.
750     */
751    public void setDispatchAsync(boolean asyncDispatch) {
752        this.dispatchAsync = asyncDispatch;
753    }
754
755    /**
756     * @return Returns the closeTimeout.
757     */
758    public int getCloseTimeout() {
759        return closeTimeout;
760    }
761
762    /**
763     * Sets the timeout before a close is considered complete. Normally a
764     * close() on a connection waits for confirmation from the broker; this
765     * allows that operation to timeout to save the client hanging if there is
766     * no broker
767     */
768    public void setCloseTimeout(int closeTimeout) {
769        this.closeTimeout = closeTimeout;
770    }
771
772    /**
773     * @return Returns the alwaysSessionAsync.
774     */
775    public boolean isAlwaysSessionAsync() {
776        return alwaysSessionAsync;
777    }
778
779    /**
780     * If this flag is set then a separate thread is not used for dispatching
781     * messages for each Session in the Connection. However, a separate thread
782     * is always used if there is more than one session, or the session isn't in
783     * auto acknowledge or duplicates ok mode
784     */
785    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
786        this.alwaysSessionAsync = alwaysSessionAsync;
787    }
788
789    /**
790     * @return Returns the optimizeAcknowledge.
791     */
792    public boolean isOptimizeAcknowledge() {
793        return optimizeAcknowledge;
794    }
795
796    /**
797     * @param optimizeAcknowledge The optimizeAcknowledge to set.
798     */
799    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
800        this.optimizeAcknowledge = optimizeAcknowledge;
801    }
802
803    public boolean isNestedMapAndListEnabled() {
804        return nestedMapAndListEnabled;
805    }
806
807    /**
808     * Enables/disables whether or not Message properties and MapMessage entries
809     * support <a
810     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
811     * Structures</a> of Map and List objects
812     */
813    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
814        this.nestedMapAndListEnabled = structuredMapsEnabled;
815    }
816
817    public String getClientIDPrefix() {
818        return clientIDPrefix;
819    }
820
821    /**
822     * Sets the prefix used by autogenerated JMS Client ID values which are used
823     * if the JMS client does not explicitly specify on.
824     * 
825     * @param clientIDPrefix
826     */
827    public void setClientIDPrefix(String clientIDPrefix) {
828        this.clientIDPrefix = clientIDPrefix;
829    }
830
831    protected synchronized IdGenerator getClientIdGenerator() {
832        if (clientIdGenerator == null) {
833            if (clientIDPrefix != null) {
834                clientIdGenerator = new IdGenerator(clientIDPrefix);
835            } else {
836                clientIdGenerator = new IdGenerator();
837            }
838        }
839        return clientIdGenerator;
840    }
841
842    protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
843        this.clientIdGenerator = clientIdGenerator;
844    }
845
846    /**
847     * @return the statsEnabled
848     */
849    public boolean isStatsEnabled() {
850        return this.factoryStats.isEnabled();
851    }
852
853    /**
854     * @param statsEnabled the statsEnabled to set
855     */
856    public void setStatsEnabled(boolean statsEnabled) {
857        this.factoryStats.setEnabled(statsEnabled);
858    }
859
860    public synchronized int getProducerWindowSize() {
861        return producerWindowSize;
862    }
863
864    public synchronized void setProducerWindowSize(int producerWindowSize) {
865        this.producerWindowSize = producerWindowSize;
866    }
867
868    public long getWarnAboutUnstartedConnectionTimeout() {
869        return warnAboutUnstartedConnectionTimeout;
870    }
871
872    /**
873     * Enables the timeout from a connection creation to when a warning is
874     * generated if the connection is not properly started via
875     * {@link Connection#start()} and a message is received by a consumer. It is
876     * a very common gotcha to forget to <a
877     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
878     * the connection</a> so this option makes the default case to create a
879     * warning if the user forgets. To disable the warning just set the value to <
880     * 0 (say -1).
881     */
882    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
883        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
884    }
885
886    public TransportListener getTransportListener() {
887        return transportListener;
888    }
889
890    /**
891     * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
892     * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
893     * a transport listener.
894     *
895     * @param transportListener sets the listener to be registered on all connections
896     * created by this factory
897     */
898    public void setTransportListener(TransportListener transportListener) {
899        this.transportListener = transportListener;
900    }
901    
902    
903    public ExceptionListener getExceptionListener() {
904        return exceptionListener;
905    }
906    
907    /**
908     * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
909     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
910     * an exception listener.
911     * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
912     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
913     * @param exceptionListener sets the exception listener to be registered on all connections
914     * created by this factory
915     */
916    public void setExceptionListener(ExceptionListener exceptionListener) {
917        this.exceptionListener = exceptionListener;
918    }
919
920        public int getAuditDepth() {
921                return auditDepth;
922        }
923
924        public void setAuditDepth(int auditDepth) {
925                this.auditDepth = auditDepth;
926        }
927
928        public int getAuditMaximumProducerNumber() {
929                return auditMaximumProducerNumber;
930        }
931
932        public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
933                this.auditMaximumProducerNumber = auditMaximumProducerNumber;
934        }
935
936    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
937        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
938    }
939    
940    public boolean isUseDedicatedTaskRunner() {
941        return useDedicatedTaskRunner;
942    }
943    
944    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
945        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
946    }
947    
948    public long getConsumerFailoverRedeliveryWaitPeriod() {
949        return consumerFailoverRedeliveryWaitPeriod;
950    }
951
952    public ClientInternalExceptionListener getClientInternalExceptionListener() {
953        return clientInternalExceptionListener;
954    }
955    
956    /**
957     * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
958     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
959     * an exception listener.
960     * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
961     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
962     * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
963     * created by this factory
964     */
965    public void setClientInternalExceptionListener(
966            ClientInternalExceptionListener clientInternalExceptionListener) {
967        this.clientInternalExceptionListener = clientInternalExceptionListener;
968    }
969
970    /**
971     * @return the checkForDuplicates
972     */
973    public boolean isCheckForDuplicates() {
974        return this.checkForDuplicates;
975    }
976
977    /**
978     * @param checkForDuplicates the checkForDuplicates to set
979     */
980    public void setCheckForDuplicates(boolean checkForDuplicates) {
981        this.checkForDuplicates = checkForDuplicates;
982    }
983}