001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.pool;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.concurrent.atomic.AtomicBoolean;
025
026import javax.jms.JMSException;
027import javax.jms.Session;
028
029import org.apache.activemq.ActiveMQConnection;
030import org.apache.activemq.transport.TransportListener;
031import org.apache.commons.pool.ObjectPoolFactory;
032
033/**
034 * Holds a real JMS connection along with the session pools associated with it.
035 * 
036 * 
037 */
038public class ConnectionPool {
039
040    private ActiveMQConnection connection;
041    private Map<SessionKey, SessionPool> cache;
042    private AtomicBoolean started = new AtomicBoolean(false);
043    private int referenceCount;
044    private ObjectPoolFactory poolFactory;
045    private long lastUsed = System.currentTimeMillis();
046    private long firstUsed = lastUsed;
047    private boolean hasFailed;
048    private boolean hasExpired;
049    private int idleTimeout = 30 * 1000;
050    private long expiryTimeout = 0l;
051
052    public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
053        this(connection, new HashMap<SessionKey, SessionPool>(), poolFactory);
054        // Add a transport Listener so that we can notice if this connection
055        // should be expired due to
056        // a connection failure.
057        connection.addTransportListener(new TransportListener() {
058            public void onCommand(Object command) {
059            }
060
061            public void onException(IOException error) {
062                synchronized (ConnectionPool.this) {
063                    hasFailed = true;
064                }
065            }
066
067            public void transportInterupted() {
068            }
069
070            public void transportResumed() {
071            }
072        });       
073        //
074        // make sure that we set the hasFailed flag, in case the transport already failed
075        // prior to the addition of our new TransportListener
076        //
077        if(connection.isTransportFailed()) {
078            hasFailed = true;
079        }
080    }
081
082    public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
083        this.connection = connection;
084        this.cache = cache;
085        this.poolFactory = poolFactory;
086    }
087
088    public void start() throws JMSException {
089        if (started.compareAndSet(false, true)) {
090                try {
091                        connection.start();
092                } catch (JMSException e) {
093                        started.set(false);
094                        throw(e);
095                }
096        }
097    }
098
099    public synchronized ActiveMQConnection getConnection() {
100        return connection;
101    }
102
103    public Session createSession(boolean transacted, int ackMode) throws JMSException {
104        SessionKey key = new SessionKey(transacted, ackMode);
105        SessionPool pool = cache.get(key);
106        if (pool == null) {
107            pool = createSessionPool(key);
108            cache.put(key, pool);
109        }
110        PooledSession session = pool.borrowSession();
111        return session;
112    }
113
114    public synchronized void close() {
115        if (connection != null) {
116            try {
117                Iterator<SessionPool> i = cache.values().iterator();
118                while (i.hasNext()) {
119                    SessionPool pool = i.next();
120                    i.remove();
121                    try {
122                        pool.close();
123                    } catch (Exception e) {
124                    }
125                }
126            } finally {
127                try {
128                    connection.close();
129                } catch (Exception e) {
130                } finally {
131                    connection = null;
132                }
133            }
134        }
135    }
136
137    public synchronized void incrementReferenceCount() {
138        referenceCount++;
139        lastUsed = System.currentTimeMillis();
140    }
141
142    public synchronized void decrementReferenceCount() {
143        referenceCount--;
144        lastUsed = System.currentTimeMillis();
145        if (referenceCount == 0) {
146            expiredCheck();
147        }
148    }
149
150    /**
151     * @return true if this connection has expired.
152     */
153    public synchronized boolean expiredCheck() {
154        if (connection == null) {
155            return true;
156        }
157        if (hasExpired) {
158            if (referenceCount == 0) {
159                close();
160            }
161            return true;
162        }
163        if (hasFailed 
164                || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
165                || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
166            hasExpired = true;
167            if (referenceCount == 0) {
168                close();
169            }
170            return true;
171        }
172        return false;
173    }
174
175    public int getIdleTimeout() {
176        return idleTimeout;
177    }
178
179    public void setIdleTimeout(int idleTimeout) {
180        this.idleTimeout = idleTimeout;
181    }
182
183    protected SessionPool createSessionPool(SessionKey key) {
184        return new SessionPool(this, key, poolFactory.createPool());
185    }
186
187    public void setExpiryTimeout(long expiryTimeout) {
188        this.expiryTimeout  = expiryTimeout;
189    }
190    
191    public long getExpiryTimeout() {
192        return expiryTimeout;
193    }
194
195}