001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.pool;
018
019import java.util.HashMap;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.Map;
023import java.util.concurrent.atomic.AtomicBoolean;
024import javax.jms.Connection;
025import javax.jms.ConnectionFactory;
026import javax.jms.JMSException;
027import org.apache.activemq.ActiveMQConnection;
028import org.apache.activemq.ActiveMQConnectionFactory;
029import org.apache.activemq.Service;
030import org.apache.activemq.util.IOExceptionSupport;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.apache.commons.pool.ObjectPoolFactory;
034import org.apache.commons.pool.impl.GenericObjectPoolFactory;
035
036/**
037 * A JMS provider which pools Connection, Session and MessageProducer instances
038 * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a
039 * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
040 * Connections, sessions and producers are returned to a pool after use so that they can be reused later
041 * without having to undergo the cost of creating them again.
042 * 
043 * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
044 * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which 
045 * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
046 * just created at startup and left active, handling incoming messages as they come. When a consumer is
047 * complete, it is best to close it rather than return it to a pool for later reuse: this is because, 
048 * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
049 * where they'll get held until the consumer is active again.
050 * 
051 * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
052 * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that 
053 * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail: 
054 * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
055 * 
056 * @org.apache.xbean.XBean element="pooledConnectionFactory"
057 * 
058 * 
059 */
060public class PooledConnectionFactory implements ConnectionFactory, Service {
061    private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
062    private ConnectionFactory connectionFactory;
063    private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
064    private ObjectPoolFactory poolFactory;
065    private int maximumActive = 500;
066    private int maxConnections = 1;
067    private int idleTimeout = 30 * 1000;
068    private AtomicBoolean stopped = new AtomicBoolean(false);
069    private long expiryTimeout = 0l;
070
071    public PooledConnectionFactory() {
072        this(new ActiveMQConnectionFactory());
073    }
074
075    public PooledConnectionFactory(String brokerURL) {
076        this(new ActiveMQConnectionFactory(brokerURL));
077    }
078
079    public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
080        this.connectionFactory = connectionFactory;
081    }
082
083    public ConnectionFactory getConnectionFactory() {
084        return connectionFactory;
085    }
086
087    public void setConnectionFactory(ConnectionFactory connectionFactory) {
088        this.connectionFactory = connectionFactory;
089    }
090
091    public Connection createConnection() throws JMSException {
092        return createConnection(null, null);
093    }
094
095    public synchronized Connection createConnection(String userName, String password) throws JMSException {
096        if (stopped.get()) {
097            LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
098            return null;
099        }
100        
101        ConnectionKey key = new ConnectionKey(userName, password);
102        LinkedList<ConnectionPool> pools = cache.get(key);
103
104        if (pools == null) {
105            pools = new LinkedList<ConnectionPool>();
106            cache.put(key, pools);
107        }
108
109        ConnectionPool connection = null;
110        if (pools.size() == maxConnections) {
111            connection = pools.removeFirst();
112        }
113
114        // Now.. we might get a connection, but it might be that we need to
115        // dump it..
116        if (connection != null && connection.expiredCheck()) {
117            connection = null;
118        }
119
120        if (connection == null) {
121            ActiveMQConnection delegate = createConnection(key);
122            connection = createConnectionPool(delegate);
123        }
124        pools.add(connection);
125        return new PooledConnection(connection);
126    }
127
128    protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
129        ConnectionPool result =  new ConnectionPool(connection, getPoolFactory());
130        result.setIdleTimeout(getIdleTimeout());
131        result.setExpiryTimeout(getExpiryTimeout());
132        return result;
133    }
134
135    protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
136        if (key.getUserName() == null && key.getPassword() == null) {
137            return (ActiveMQConnection)connectionFactory.createConnection();
138        } else {
139            return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
140        }
141    }
142
143    /**
144     * @see org.apache.activemq.service.Service#start()
145     */
146    public void start() {
147        try {
148            stopped.set(false);
149            createConnection();
150        } catch (JMSException e) {
151            LOG.warn("Create pooled connection during start failed.", e);
152            IOExceptionSupport.create(e);
153        }
154    }
155
156    public void stop() {
157        LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size());
158        stopped.set(true);
159        for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
160            for (ConnectionPool connection : iter.next()) {
161                try {
162                    connection.close();
163                }catch(Exception e) {
164                    LOG.warn("Close connection failed",e);
165                }
166            }
167        }
168        cache.clear();
169    }
170
171    public ObjectPoolFactory getPoolFactory() {
172        if (poolFactory == null) {
173            poolFactory = createPoolFactory();
174        }
175        return poolFactory;
176    }
177
178    /**
179     * Sets the object pool factory used to create individual session pools for
180     * each connection
181     */
182    public void setPoolFactory(ObjectPoolFactory poolFactory) {
183        this.poolFactory = poolFactory;
184    }
185
186    public int getMaximumActive() {
187        return maximumActive;
188    }
189
190    /**
191     * Sets the maximum number of active sessions per connection
192     */
193    public void setMaximumActive(int maximumActive) {
194        this.maximumActive = maximumActive;
195    }
196
197    /**
198     * @return the maxConnections
199     */
200    public int getMaxConnections() {
201        return maxConnections;
202    }
203
204    /**
205     * @param maxConnections the maxConnections to set
206     */
207    public void setMaxConnections(int maxConnections) {
208        this.maxConnections = maxConnections;
209    }
210
211    protected ObjectPoolFactory createPoolFactory() {
212        return new GenericObjectPoolFactory(null, maximumActive);
213    }
214
215    public int getIdleTimeout() {
216        return idleTimeout;
217    }
218
219    public void setIdleTimeout(int idleTimeout) {
220        this.idleTimeout = idleTimeout;
221    }
222
223    /**
224     * allow connections to expire, irrespective of load or idle time. This is useful with failover
225     * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
226     * 
227     * @param expiryTimeout non zero in milliseconds
228     */
229    public void setExpiryTimeout(long expiryTimeout) {
230        this.expiryTimeout = expiryTimeout;   
231    }
232    
233    public long getExpiryTimeout() {
234        return expiryTimeout;
235    }
236}