001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.net.URI; 020import java.net.URISyntaxException; 021import java.util.HashMap; 022import java.util.Map; 023import java.util.Properties; 024import java.util.concurrent.Executor; 025import java.util.concurrent.ScheduledThreadPoolExecutor; 026import java.util.concurrent.ThreadFactory; 027import javax.jms.Connection; 028import javax.jms.ConnectionFactory; 029import javax.jms.ExceptionListener; 030import javax.jms.JMSException; 031import javax.jms.QueueConnection; 032import javax.jms.QueueConnectionFactory; 033import javax.jms.TopicConnection; 034import javax.jms.TopicConnectionFactory; 035import javax.naming.Context; 036import org.apache.activemq.blob.BlobTransferPolicy; 037import org.apache.activemq.jndi.JNDIBaseStorable; 038import org.apache.activemq.management.JMSStatsImpl; 039import org.apache.activemq.management.StatsCapable; 040import org.apache.activemq.management.StatsImpl; 041import org.apache.activemq.transport.Transport; 042import org.apache.activemq.transport.TransportFactory; 043import org.apache.activemq.transport.TransportListener; 044import org.apache.activemq.util.IdGenerator; 045import org.apache.activemq.util.IntrospectionSupport; 046import org.apache.activemq.util.JMSExceptionSupport; 047import org.apache.activemq.util.URISupport; 048import org.apache.activemq.util.URISupport.CompositeData; 049 050/** 051 * A ConnectionFactory is an an Administered object, and is used for creating 052 * Connections. <p/> This class also implements QueueConnectionFactory and 053 * TopicConnectionFactory. You can use this connection to create both 054 * QueueConnections and TopicConnections. 055 * 056 * 057 * @see javax.jms.ConnectionFactory 058 */ 059public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable { 060 061 public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616"; 062 public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL; 063 public static final String DEFAULT_USER = null; 064 public static final String DEFAULT_PASSWORD = null; 065 public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; 066 067 protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { 068 public Thread newThread(Runnable run) { 069 Thread thread = new Thread(run); 070 thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION); 071 return thread; 072 } 073 }); 074 075 protected URI brokerURL; 076 protected String userName; 077 protected String password; 078 protected String clientID; 079 protected boolean dispatchAsync=true; 080 protected boolean alwaysSessionAsync=true; 081 082 JMSStatsImpl factoryStats = new JMSStatsImpl(); 083 084 private IdGenerator clientIdGenerator; 085 private String clientIDPrefix; 086 087 // client policies 088 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 089 private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 090 private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 091 private MessageTransformer transformer; 092 093 private boolean disableTimeStampsByDefault; 094 private boolean optimizedMessageDispatch = true; 095 private boolean copyMessageOnSend = true; 096 private boolean useCompression; 097 private boolean objectMessageSerializationDefered; 098 private boolean useAsyncSend; 099 private boolean optimizeAcknowledge; 100 private int closeTimeout = 15000; 101 private boolean useRetroactiveConsumer; 102 private boolean exclusiveConsumer; 103 private boolean nestedMapAndListEnabled = true; 104 private boolean alwaysSyncSend; 105 private boolean watchTopicAdvisories = true; 106 private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; 107 private long warnAboutUnstartedConnectionTimeout = 500L; 108 private int sendTimeout = 0; 109 private boolean sendAcksAsync=true; 110 private TransportListener transportListener; 111 private ExceptionListener exceptionListener; 112 private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; 113 private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; 114 private boolean useDedicatedTaskRunner; 115 private long consumerFailoverRedeliveryWaitPeriod = 0; 116 private boolean checkForDuplicates = true; 117 private ClientInternalExceptionListener clientInternalExceptionListener; 118 private boolean messagePrioritySupported = true; 119 120 // ///////////////////////////////////////////// 121 // 122 // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods 123 // 124 // ///////////////////////////////////////////// 125 126 public ActiveMQConnectionFactory() { 127 this(DEFAULT_BROKER_URL); 128 } 129 130 public ActiveMQConnectionFactory(String brokerURL) { 131 this(createURI(brokerURL)); 132 } 133 134 public ActiveMQConnectionFactory(URI brokerURL) { 135 setBrokerURL(brokerURL.toString()); 136 } 137 138 public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) { 139 setUserName(userName); 140 setPassword(password); 141 setBrokerURL(brokerURL.toString()); 142 } 143 144 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) { 145 setUserName(userName); 146 setPassword(password); 147 setBrokerURL(brokerURL); 148 } 149 150 /** 151 * Returns a copy of the given connection factory 152 */ 153 public ActiveMQConnectionFactory copy() { 154 try { 155 return (ActiveMQConnectionFactory)super.clone(); 156 } catch (CloneNotSupportedException e) { 157 throw new RuntimeException("This should never happen: " + e, e); 158 } 159 } 160 161 /** 162 * @param brokerURL 163 * @return 164 * @throws URISyntaxException 165 */ 166 private static URI createURI(String brokerURL) { 167 try { 168 return new URI(brokerURL); 169 } catch (URISyntaxException e) { 170 throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e); 171 } 172 } 173 174 /** 175 * @return Returns the Connection. 176 */ 177 public Connection createConnection() throws JMSException { 178 return createActiveMQConnection(); 179 } 180 181 /** 182 * @return Returns the Connection. 183 */ 184 public Connection createConnection(String userName, String password) throws JMSException { 185 return createActiveMQConnection(userName, password); 186 } 187 188 /** 189 * @return Returns the QueueConnection. 190 * @throws JMSException 191 */ 192 public QueueConnection createQueueConnection() throws JMSException { 193 return createActiveMQConnection(); 194 } 195 196 /** 197 * @return Returns the QueueConnection. 198 */ 199 public QueueConnection createQueueConnection(String userName, String password) throws JMSException { 200 return createActiveMQConnection(userName, password); 201 } 202 203 /** 204 * @return Returns the TopicConnection. 205 * @throws JMSException 206 */ 207 public TopicConnection createTopicConnection() throws JMSException { 208 return createActiveMQConnection(); 209 } 210 211 /** 212 * @return Returns the TopicConnection. 213 */ 214 public TopicConnection createTopicConnection(String userName, String password) throws JMSException { 215 return createActiveMQConnection(userName, password); 216 } 217 218 public StatsImpl getStats() { 219 // TODO 220 return null; 221 } 222 223 // ///////////////////////////////////////////// 224 // 225 // Implementation methods. 226 // 227 // ///////////////////////////////////////////// 228 229 protected ActiveMQConnection createActiveMQConnection() throws JMSException { 230 return createActiveMQConnection(userName, password); 231 } 232 233 /** 234 * Creates a Transport based on this object's connection settings. Separated 235 * from createActiveMQConnection to allow for subclasses to override. 236 * 237 * @return The newly created Transport. 238 * @throws JMSException If unable to create trasnport. 239 * @author sepandm@gmail.com 240 */ 241 protected Transport createTransport() throws JMSException { 242 try { 243 return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR); 244 } catch (Exception e) { 245 throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); 246 } 247 } 248 249 /** 250 * @return Returns the Connection. 251 */ 252 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { 253 if (brokerURL == null) { 254 throw new ConfigurationException("brokerURL not set."); 255 } 256 ActiveMQConnection connection = null; 257 try { 258 Transport transport = createTransport(); 259 connection = createActiveMQConnection(transport, factoryStats); 260 261 connection.setUserName(userName); 262 connection.setPassword(password); 263 264 configureConnection(connection); 265 266 transport.start(); 267 268 if (clientID != null) { 269 connection.setDefaultClientID(clientID); 270 } 271 272 return connection; 273 } catch (JMSException e) { 274 // Clean up! 275 try { 276 connection.close(); 277 } catch (Throwable ignore) { 278 } 279 throw e; 280 } catch (Exception e) { 281 // Clean up! 282 try { 283 connection.close(); 284 } catch (Throwable ignore) { 285 } 286 throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); 287 } 288 } 289 290 protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { 291 ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats); 292 return connection; 293 } 294 295 protected void configureConnection(ActiveMQConnection connection) throws JMSException { 296 connection.setPrefetchPolicy(getPrefetchPolicy()); 297 connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); 298 connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); 299 connection.setCopyMessageOnSend(isCopyMessageOnSend()); 300 connection.setUseCompression(isUseCompression()); 301 connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); 302 connection.setDispatchAsync(isDispatchAsync()); 303 connection.setUseAsyncSend(isUseAsyncSend()); 304 connection.setAlwaysSyncSend(isAlwaysSyncSend()); 305 connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); 306 connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); 307 connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); 308 connection.setExclusiveConsumer(isExclusiveConsumer()); 309 connection.setRedeliveryPolicy(getRedeliveryPolicy()); 310 connection.setTransformer(getTransformer()); 311 connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); 312 connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); 313 connection.setProducerWindowSize(getProducerWindowSize()); 314 connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); 315 connection.setSendTimeout(getSendTimeout()); 316 connection.setCloseTimeout(getCloseTimeout()); 317 connection.setSendAcksAsync(isSendAcksAsync()); 318 connection.setAuditDepth(getAuditDepth()); 319 connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); 320 connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); 321 connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod()); 322 connection.setCheckForDuplicates(isCheckForDuplicates()); 323 connection.setMessagePrioritySupported(isMessagePrioritySupported()); 324 if (transportListener != null) { 325 connection.addTransportListener(transportListener); 326 } 327 if (exceptionListener != null) { 328 connection.setExceptionListener(exceptionListener); 329 } 330 if (clientInternalExceptionListener != null) { 331 connection.setClientInternalExceptionListener(clientInternalExceptionListener); 332 } 333 } 334 335 // ///////////////////////////////////////////// 336 // 337 // Property Accessors 338 // 339 // ///////////////////////////////////////////// 340 341 public String getBrokerURL() { 342 return brokerURL == null ? null : brokerURL.toString(); 343 } 344 345 /** 346 * Sets the <a 347 * href="http://activemq.apache.org/configuring-transports.html">connection 348 * URL</a> used to connect to the ActiveMQ broker. 349 */ 350 public void setBrokerURL(String brokerURL) { 351 this.brokerURL = createURI(brokerURL); 352 353 // Use all the properties prefixed with 'jms.' to set the connection 354 // factory 355 // options. 356 if (this.brokerURL.getQuery() != null) { 357 // It might be a standard URI or... 358 try { 359 360 Map map = URISupport.parseQuery(this.brokerURL.getQuery()); 361 if (buildFromMap(IntrospectionSupport.extractProperties(map, "jms."))) { 362 this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map); 363 } 364 365 } catch (URISyntaxException e) { 366 } 367 368 } else { 369 370 // It might be a composite URI. 371 try { 372 CompositeData data = URISupport.parseComposite(this.brokerURL); 373 if (buildFromMap(IntrospectionSupport.extractProperties(data.getParameters(), "jms."))) { 374 this.brokerURL = data.toURI(); 375 } 376 } catch (URISyntaxException e) { 377 } 378 } 379 } 380 381 public String getClientID() { 382 return clientID; 383 } 384 385 /** 386 * Sets the JMS clientID to use for the created connection. Note that this 387 * can only be used by one connection at once so generally its a better idea 388 * to set the clientID on a Connection 389 */ 390 public void setClientID(String clientID) { 391 this.clientID = clientID; 392 } 393 394 public boolean isCopyMessageOnSend() { 395 return copyMessageOnSend; 396 } 397 398 /** 399 * Should a JMS message be copied to a new JMS Message object as part of the 400 * send() method in JMS. This is enabled by default to be compliant with the 401 * JMS specification. You can disable it if you do not mutate JMS messages 402 * after they are sent for a performance boost 403 */ 404 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 405 this.copyMessageOnSend = copyMessageOnSend; 406 } 407 408 public boolean isDisableTimeStampsByDefault() { 409 return disableTimeStampsByDefault; 410 } 411 412 /** 413 * Sets whether or not timestamps on messages should be disabled or not. If 414 * you disable them it adds a small performance boost. 415 */ 416 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) { 417 this.disableTimeStampsByDefault = disableTimeStampsByDefault; 418 } 419 420 public boolean isOptimizedMessageDispatch() { 421 return optimizedMessageDispatch; 422 } 423 424 /** 425 * If this flag is set then an larger prefetch limit is used - only 426 * applicable for durable topic subscribers. 427 */ 428 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { 429 this.optimizedMessageDispatch = optimizedMessageDispatch; 430 } 431 432 public String getPassword() { 433 return password; 434 } 435 436 /** 437 * Sets the JMS password used for connections created from this factory 438 */ 439 public void setPassword(String password) { 440 this.password = password; 441 } 442 443 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 444 return prefetchPolicy; 445 } 446 447 /** 448 * Sets the <a 449 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 450 * policy</a> for consumers created by this connection. 451 */ 452 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 453 this.prefetchPolicy = prefetchPolicy; 454 } 455 456 public boolean isUseAsyncSend() { 457 return useAsyncSend; 458 } 459 460 public BlobTransferPolicy getBlobTransferPolicy() { 461 return blobTransferPolicy; 462 } 463 464 /** 465 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 466 * OBjects) are transferred from producers to brokers to consumers 467 */ 468 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 469 this.blobTransferPolicy = blobTransferPolicy; 470 } 471 472 /** 473 * Forces the use of <a 474 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 475 * adds a massive performance boost; but means that the send() method will 476 * return immediately whether the message has been sent or not which could 477 * lead to message loss. 478 */ 479 public void setUseAsyncSend(boolean useAsyncSend) { 480 this.useAsyncSend = useAsyncSend; 481 } 482 483 public synchronized boolean isWatchTopicAdvisories() { 484 return watchTopicAdvisories; 485 } 486 487 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 488 this.watchTopicAdvisories = watchTopicAdvisories; 489 } 490 491 /** 492 * @return true if always sync send messages 493 */ 494 public boolean isAlwaysSyncSend() { 495 return this.alwaysSyncSend; 496 } 497 498 /** 499 * Set true if always require messages to be sync sent 500 * 501 * @param alwaysSyncSend 502 */ 503 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 504 this.alwaysSyncSend = alwaysSyncSend; 505 } 506 507 public String getUserName() { 508 return userName; 509 } 510 511 /** 512 * Sets the JMS userName used by connections created by this factory 513 */ 514 public void setUserName(String userName) { 515 this.userName = userName; 516 } 517 518 public boolean isUseRetroactiveConsumer() { 519 return useRetroactiveConsumer; 520 } 521 522 /** 523 * Sets whether or not retroactive consumers are enabled. Retroactive 524 * consumers allow non-durable topic subscribers to receive old messages 525 * that were published before the non-durable subscriber started. 526 */ 527 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 528 this.useRetroactiveConsumer = useRetroactiveConsumer; 529 } 530 531 public boolean isExclusiveConsumer() { 532 return exclusiveConsumer; 533 } 534 535 /** 536 * Enables or disables whether or not queue consumers should be exclusive or 537 * not for example to preserve ordering when not using <a 538 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 539 * 540 * @param exclusiveConsumer 541 */ 542 public void setExclusiveConsumer(boolean exclusiveConsumer) { 543 this.exclusiveConsumer = exclusiveConsumer; 544 } 545 546 public RedeliveryPolicy getRedeliveryPolicy() { 547 return redeliveryPolicy; 548 } 549 550 /** 551 * Sets the global redelivery policy to be used when a message is delivered 552 * but the session is rolled back 553 */ 554 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 555 this.redeliveryPolicy = redeliveryPolicy; 556 } 557 558 public MessageTransformer getTransformer() { 559 return transformer; 560 } 561 562 /** 563 * @return the sendTimeout 564 */ 565 public int getSendTimeout() { 566 return sendTimeout; 567 } 568 569 /** 570 * @param sendTimeout the sendTimeout to set 571 */ 572 public void setSendTimeout(int sendTimeout) { 573 this.sendTimeout = sendTimeout; 574 } 575 576 /** 577 * @return the sendAcksAsync 578 */ 579 public boolean isSendAcksAsync() { 580 return sendAcksAsync; 581 } 582 583 /** 584 * @param sendAcksAsync the sendAcksAsync to set 585 */ 586 public void setSendAcksAsync(boolean sendAcksAsync) { 587 this.sendAcksAsync = sendAcksAsync; 588 } 589 590 /** 591 * @return the messagePrioritySupported 592 */ 593 public boolean isMessagePrioritySupported() { 594 return this.messagePrioritySupported; 595 } 596 597 /** 598 * @param messagePrioritySupported the messagePrioritySupported to set 599 */ 600 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 601 this.messagePrioritySupported = messagePrioritySupported; 602 } 603 604 605 /** 606 * Sets the transformer used to transform messages before they are sent on 607 * to the JMS bus or when they are received from the bus but before they are 608 * delivered to the JMS client 609 */ 610 public void setTransformer(MessageTransformer transformer) { 611 this.transformer = transformer; 612 } 613 614 @Override 615 public void buildFromProperties(Properties properties) { 616 617 if (properties == null) { 618 properties = new Properties(); 619 } 620 621 String temp = properties.getProperty(Context.PROVIDER_URL); 622 if (temp == null || temp.length() == 0) { 623 temp = properties.getProperty("brokerURL"); 624 } 625 if (temp != null && temp.length() > 0) { 626 setBrokerURL(temp); 627 } 628 629 Map<String, Object> p = new HashMap(properties); 630 buildFromMap(p); 631 } 632 633 public boolean buildFromMap(Map<String, Object> properties) { 634 boolean rc = false; 635 636 ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy(); 637 if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) { 638 setPrefetchPolicy(p); 639 rc = true; 640 } 641 642 RedeliveryPolicy rp = new RedeliveryPolicy(); 643 if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) { 644 setRedeliveryPolicy(rp); 645 rc = true; 646 } 647 648 BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 649 if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) { 650 setBlobTransferPolicy(blobTransferPolicy); 651 rc = true; 652 } 653 654 rc |= IntrospectionSupport.setProperties(this, properties); 655 656 return rc; 657 } 658 659 @Override 660 public void populateProperties(Properties props) { 661 props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync())); 662 663 if (getBrokerURL() != null) { 664 props.setProperty(Context.PROVIDER_URL, getBrokerURL()); 665 props.setProperty("brokerURL", getBrokerURL()); 666 } 667 668 if (getClientID() != null) { 669 props.setProperty("clientID", getClientID()); 670 } 671 672 IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy."); 673 IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy."); 674 IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy."); 675 676 props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend())); 677 props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault())); 678 props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered())); 679 props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch())); 680 681 if (getPassword() != null) { 682 props.setProperty("password", getPassword()); 683 } 684 685 props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend())); 686 props.setProperty("useCompression", Boolean.toString(isUseCompression())); 687 props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); 688 props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories())); 689 690 if (getUserName() != null) { 691 props.setProperty("userName", getUserName()); 692 } 693 694 props.setProperty("closeTimeout", Integer.toString(getCloseTimeout())); 695 props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync())); 696 props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); 697 props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled())); 698 props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend())); 699 props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); 700 props.setProperty("sendTimeout", Integer.toString(getSendTimeout())); 701 props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync())); 702 props.setProperty("auditDepth", Integer.toString(getAuditDepth())); 703 props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber())); 704 props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates())); 705 props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported())); 706 } 707 708 public boolean isUseCompression() { 709 return useCompression; 710 } 711 712 /** 713 * Enables the use of compression of the message bodies 714 */ 715 public void setUseCompression(boolean useCompression) { 716 this.useCompression = useCompression; 717 } 718 719 public boolean isObjectMessageSerializationDefered() { 720 return objectMessageSerializationDefered; 721 } 722 723 /** 724 * When an object is set on an ObjectMessage, the JMS spec requires the 725 * object to be serialized by that set method. Enabling this flag causes the 726 * object to not get serialized. The object may subsequently get serialized 727 * if the message needs to be sent over a socket or stored to disk. 728 */ 729 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 730 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 731 } 732 733 public boolean isDispatchAsync() { 734 return dispatchAsync; 735 } 736 737 /** 738 * Enables or disables the default setting of whether or not consumers have 739 * their messages <a 740 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 741 * synchronously or asynchronously by the broker</a>. For non-durable 742 * topics for example we typically dispatch synchronously by default to 743 * minimize context switches which boost performance. However sometimes its 744 * better to go slower to ensure that a single blocked consumer socket does 745 * not block delivery to other consumers. 746 * 747 * @param asyncDispatch If true then consumers created on this connection 748 * will default to having their messages dispatched 749 * asynchronously. The default value is false. 750 */ 751 public void setDispatchAsync(boolean asyncDispatch) { 752 this.dispatchAsync = asyncDispatch; 753 } 754 755 /** 756 * @return Returns the closeTimeout. 757 */ 758 public int getCloseTimeout() { 759 return closeTimeout; 760 } 761 762 /** 763 * Sets the timeout before a close is considered complete. Normally a 764 * close() on a connection waits for confirmation from the broker; this 765 * allows that operation to timeout to save the client hanging if there is 766 * no broker 767 */ 768 public void setCloseTimeout(int closeTimeout) { 769 this.closeTimeout = closeTimeout; 770 } 771 772 /** 773 * @return Returns the alwaysSessionAsync. 774 */ 775 public boolean isAlwaysSessionAsync() { 776 return alwaysSessionAsync; 777 } 778 779 /** 780 * If this flag is set then a separate thread is not used for dispatching 781 * messages for each Session in the Connection. However, a separate thread 782 * is always used if there is more than one session, or the session isn't in 783 * auto acknowledge or duplicates ok mode 784 */ 785 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 786 this.alwaysSessionAsync = alwaysSessionAsync; 787 } 788 789 /** 790 * @return Returns the optimizeAcknowledge. 791 */ 792 public boolean isOptimizeAcknowledge() { 793 return optimizeAcknowledge; 794 } 795 796 /** 797 * @param optimizeAcknowledge The optimizeAcknowledge to set. 798 */ 799 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 800 this.optimizeAcknowledge = optimizeAcknowledge; 801 } 802 803 public boolean isNestedMapAndListEnabled() { 804 return nestedMapAndListEnabled; 805 } 806 807 /** 808 * Enables/disables whether or not Message properties and MapMessage entries 809 * support <a 810 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 811 * Structures</a> of Map and List objects 812 */ 813 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 814 this.nestedMapAndListEnabled = structuredMapsEnabled; 815 } 816 817 public String getClientIDPrefix() { 818 return clientIDPrefix; 819 } 820 821 /** 822 * Sets the prefix used by autogenerated JMS Client ID values which are used 823 * if the JMS client does not explicitly specify on. 824 * 825 * @param clientIDPrefix 826 */ 827 public void setClientIDPrefix(String clientIDPrefix) { 828 this.clientIDPrefix = clientIDPrefix; 829 } 830 831 protected synchronized IdGenerator getClientIdGenerator() { 832 if (clientIdGenerator == null) { 833 if (clientIDPrefix != null) { 834 clientIdGenerator = new IdGenerator(clientIDPrefix); 835 } else { 836 clientIdGenerator = new IdGenerator(); 837 } 838 } 839 return clientIdGenerator; 840 } 841 842 protected void setClientIdGenerator(IdGenerator clientIdGenerator) { 843 this.clientIdGenerator = clientIdGenerator; 844 } 845 846 /** 847 * @return the statsEnabled 848 */ 849 public boolean isStatsEnabled() { 850 return this.factoryStats.isEnabled(); 851 } 852 853 /** 854 * @param statsEnabled the statsEnabled to set 855 */ 856 public void setStatsEnabled(boolean statsEnabled) { 857 this.factoryStats.setEnabled(statsEnabled); 858 } 859 860 public synchronized int getProducerWindowSize() { 861 return producerWindowSize; 862 } 863 864 public synchronized void setProducerWindowSize(int producerWindowSize) { 865 this.producerWindowSize = producerWindowSize; 866 } 867 868 public long getWarnAboutUnstartedConnectionTimeout() { 869 return warnAboutUnstartedConnectionTimeout; 870 } 871 872 /** 873 * Enables the timeout from a connection creation to when a warning is 874 * generated if the connection is not properly started via 875 * {@link Connection#start()} and a message is received by a consumer. It is 876 * a very common gotcha to forget to <a 877 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 878 * the connection</a> so this option makes the default case to create a 879 * warning if the user forgets. To disable the warning just set the value to < 880 * 0 (say -1). 881 */ 882 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 883 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 884 } 885 886 public TransportListener getTransportListener() { 887 return transportListener; 888 } 889 890 /** 891 * Allows a listener to be configured on the ConnectionFactory so that when this factory is used 892 * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register 893 * a transport listener. 894 * 895 * @param transportListener sets the listener to be registered on all connections 896 * created by this factory 897 */ 898 public void setTransportListener(TransportListener transportListener) { 899 this.transportListener = transportListener; 900 } 901 902 903 public ExceptionListener getExceptionListener() { 904 return exceptionListener; 905 } 906 907 /** 908 * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory 909 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 910 * an exception listener. 911 * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than 912 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 913 * @param exceptionListener sets the exception listener to be registered on all connections 914 * created by this factory 915 */ 916 public void setExceptionListener(ExceptionListener exceptionListener) { 917 this.exceptionListener = exceptionListener; 918 } 919 920 public int getAuditDepth() { 921 return auditDepth; 922 } 923 924 public void setAuditDepth(int auditDepth) { 925 this.auditDepth = auditDepth; 926 } 927 928 public int getAuditMaximumProducerNumber() { 929 return auditMaximumProducerNumber; 930 } 931 932 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 933 this.auditMaximumProducerNumber = auditMaximumProducerNumber; 934 } 935 936 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 937 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 938 } 939 940 public boolean isUseDedicatedTaskRunner() { 941 return useDedicatedTaskRunner; 942 } 943 944 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 945 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 946 } 947 948 public long getConsumerFailoverRedeliveryWaitPeriod() { 949 return consumerFailoverRedeliveryWaitPeriod; 950 } 951 952 public ClientInternalExceptionListener getClientInternalExceptionListener() { 953 return clientInternalExceptionListener; 954 } 955 956 /** 957 * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory 958 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 959 * an exception listener. 960 * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than 961 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 962 * @param clientInternalExceptionListener sets the exception listener to be registered on all connections 963 * created by this factory 964 */ 965 public void setClientInternalExceptionListener( 966 ClientInternalExceptionListener clientInternalExceptionListener) { 967 this.clientInternalExceptionListener = clientInternalExceptionListener; 968 } 969 970 /** 971 * @return the checkForDuplicates 972 */ 973 public boolean isCheckForDuplicates() { 974 return this.checkForDuplicates; 975 } 976 977 /** 978 * @param checkForDuplicates the checkForDuplicates to set 979 */ 980 public void setCheckForDuplicates(boolean checkForDuplicates) { 981 this.checkForDuplicates = checkForDuplicates; 982 } 983}