001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.pool; 018 019import java.util.HashMap; 020import java.util.Iterator; 021import java.util.LinkedList; 022import java.util.Map; 023import java.util.concurrent.atomic.AtomicBoolean; 024import javax.jms.Connection; 025import javax.jms.ConnectionFactory; 026import javax.jms.JMSException; 027import org.apache.activemq.ActiveMQConnection; 028import org.apache.activemq.ActiveMQConnectionFactory; 029import org.apache.activemq.Service; 030import org.apache.activemq.util.IOExceptionSupport; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.apache.commons.pool.ObjectPoolFactory; 034import org.apache.commons.pool.impl.GenericObjectPoolFactory; 035 036/** 037 * A JMS provider which pools Connection, Session and MessageProducer instances 038 * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a 039 * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>. 040 * Connections, sessions and producers are returned to a pool after use so that they can be reused later 041 * without having to undergo the cost of creating them again. 042 * 043 * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers, 044 * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which 045 * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually 046 * just created at startup and left active, handling incoming messages as they come. When a consumer is 047 * complete, it is best to close it rather than return it to a pool for later reuse: this is because, 048 * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer, 049 * where they'll get held until the consumer is active again. 050 * 051 * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you 052 * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that 053 * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail: 054 * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html 055 * 056 * @org.apache.xbean.XBean element="pooledConnectionFactory" 057 * 058 * 059 */ 060public class PooledConnectionFactory implements ConnectionFactory, Service { 061 private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class); 062 private ConnectionFactory connectionFactory; 063 private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>(); 064 private ObjectPoolFactory poolFactory; 065 private int maximumActive = 500; 066 private int maxConnections = 1; 067 private int idleTimeout = 30 * 1000; 068 private AtomicBoolean stopped = new AtomicBoolean(false); 069 private long expiryTimeout = 0l; 070 071 public PooledConnectionFactory() { 072 this(new ActiveMQConnectionFactory()); 073 } 074 075 public PooledConnectionFactory(String brokerURL) { 076 this(new ActiveMQConnectionFactory(brokerURL)); 077 } 078 079 public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) { 080 this.connectionFactory = connectionFactory; 081 } 082 083 public ConnectionFactory getConnectionFactory() { 084 return connectionFactory; 085 } 086 087 public void setConnectionFactory(ConnectionFactory connectionFactory) { 088 this.connectionFactory = connectionFactory; 089 } 090 091 public Connection createConnection() throws JMSException { 092 return createConnection(null, null); 093 } 094 095 public synchronized Connection createConnection(String userName, String password) throws JMSException { 096 if (stopped.get()) { 097 LOG.debug("PooledConnectionFactory is stopped, skip create new connection."); 098 return null; 099 } 100 101 ConnectionKey key = new ConnectionKey(userName, password); 102 LinkedList<ConnectionPool> pools = cache.get(key); 103 104 if (pools == null) { 105 pools = new LinkedList<ConnectionPool>(); 106 cache.put(key, pools); 107 } 108 109 ConnectionPool connection = null; 110 if (pools.size() == maxConnections) { 111 connection = pools.removeFirst(); 112 } 113 114 // Now.. we might get a connection, but it might be that we need to 115 // dump it.. 116 if (connection != null && connection.expiredCheck()) { 117 connection = null; 118 } 119 120 if (connection == null) { 121 ActiveMQConnection delegate = createConnection(key); 122 connection = createConnectionPool(delegate); 123 } 124 pools.add(connection); 125 return new PooledConnection(connection); 126 } 127 128 protected ConnectionPool createConnectionPool(ActiveMQConnection connection) { 129 ConnectionPool result = new ConnectionPool(connection, getPoolFactory()); 130 result.setIdleTimeout(getIdleTimeout()); 131 result.setExpiryTimeout(getExpiryTimeout()); 132 return result; 133 } 134 135 protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException { 136 if (key.getUserName() == null && key.getPassword() == null) { 137 return (ActiveMQConnection)connectionFactory.createConnection(); 138 } else { 139 return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword()); 140 } 141 } 142 143 /** 144 * @see org.apache.activemq.service.Service#start() 145 */ 146 public void start() { 147 try { 148 stopped.set(false); 149 createConnection(); 150 } catch (JMSException e) { 151 LOG.warn("Create pooled connection during start failed.", e); 152 IOExceptionSupport.create(e); 153 } 154 } 155 156 public void stop() { 157 LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size()); 158 stopped.set(true); 159 for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) { 160 for (ConnectionPool connection : iter.next()) { 161 try { 162 connection.close(); 163 }catch(Exception e) { 164 LOG.warn("Close connection failed",e); 165 } 166 } 167 } 168 cache.clear(); 169 } 170 171 public ObjectPoolFactory getPoolFactory() { 172 if (poolFactory == null) { 173 poolFactory = createPoolFactory(); 174 } 175 return poolFactory; 176 } 177 178 /** 179 * Sets the object pool factory used to create individual session pools for 180 * each connection 181 */ 182 public void setPoolFactory(ObjectPoolFactory poolFactory) { 183 this.poolFactory = poolFactory; 184 } 185 186 public int getMaximumActive() { 187 return maximumActive; 188 } 189 190 /** 191 * Sets the maximum number of active sessions per connection 192 */ 193 public void setMaximumActive(int maximumActive) { 194 this.maximumActive = maximumActive; 195 } 196 197 /** 198 * @return the maxConnections 199 */ 200 public int getMaxConnections() { 201 return maxConnections; 202 } 203 204 /** 205 * @param maxConnections the maxConnections to set 206 */ 207 public void setMaxConnections(int maxConnections) { 208 this.maxConnections = maxConnections; 209 } 210 211 protected ObjectPoolFactory createPoolFactory() { 212 return new GenericObjectPoolFactory(null, maximumActive); 213 } 214 215 public int getIdleTimeout() { 216 return idleTimeout; 217 } 218 219 public void setIdleTimeout(int idleTimeout) { 220 this.idleTimeout = idleTimeout; 221 } 222 223 /** 224 * allow connections to expire, irrespective of load or idle time. This is useful with failover 225 * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery 226 * 227 * @param expiryTimeout non zero in milliseconds 228 */ 229 public void setExpiryTimeout(long expiryTimeout) { 230 this.expiryTimeout = expiryTimeout; 231 } 232 233 public long getExpiryTimeout() { 234 return expiryTimeout; 235 } 236}