001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.pool;
018
019import javax.jms.Connection;
020import javax.jms.ConnectionConsumer;
021import javax.jms.ConnectionMetaData;
022import javax.jms.Destination;
023import javax.jms.ExceptionListener;
024import javax.jms.JMSException;
025import javax.jms.Queue;
026import javax.jms.QueueConnection;
027import javax.jms.QueueSession;
028import javax.jms.ServerSessionPool;
029import javax.jms.Session;
030import javax.jms.Topic;
031import javax.jms.TopicConnection;
032import javax.jms.TopicSession;
033
034import org.apache.activemq.ActiveMQConnection;
035import org.apache.activemq.ActiveMQSession;
036import org.apache.activemq.AlreadyClosedException;
037import org.apache.activemq.EnhancedConnection;
038import org.apache.activemq.advisory.DestinationSource;
039
040/**
041 * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
042 * {@link QueueConnection} which is pooled and on {@link #close()} will return
043 * itself to the sessionPool.
044 * 
045 * <b>NOTE</b> this implementation is only intended for use when sending
046 * messages. It does not deal with pooling of consumers; for that look at a
047 * library like <a href="http://jencks.org/">Jencks</a> such as in <a
048 * href="http://jencks.org/Message+Driven+POJOs">this example</a>
049 * 
050 * 
051 */
052public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection {
053
054    private ConnectionPool pool;
055    private boolean stopped;
056
057    public PooledConnection(ConnectionPool pool) {
058        this.pool = pool;
059        this.pool.incrementReferenceCount();
060    }
061
062    /**
063     * Factory method to create a new instance.
064     */
065    public PooledConnection newInstance() {
066        return new PooledConnection(pool);
067    }
068
069    public void close() throws JMSException {
070        if (this.pool != null) {
071            this.pool.decrementReferenceCount();
072            this.pool = null;
073        }
074    }
075
076    public void start() throws JMSException {
077        assertNotClosed();
078        pool.start();
079    }
080
081    public void stop() throws JMSException {
082        stopped = true;
083    }
084
085    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector,
086                                                       ServerSessionPool serverSessionPool, int maxMessages)
087        throws JMSException {
088        return getConnection()
089            .createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
090    }
091
092    public ConnectionConsumer createConnectionConsumer(Topic topic, String s,
093                                                       ServerSessionPool serverSessionPool, int maxMessages)
094        throws JMSException {
095        return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
096    }
097
098    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1,
099                                                              ServerSessionPool serverSessionPool, int i)
100        throws JMSException {
101        return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
102    }
103
104    public String getClientID() throws JMSException {
105        return getConnection().getClientID();
106    }
107
108    public ExceptionListener getExceptionListener() throws JMSException {
109        return getConnection().getExceptionListener();
110    }
111
112    public ConnectionMetaData getMetaData() throws JMSException {
113        return getConnection().getMetaData();
114    }
115
116    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
117        getConnection().setExceptionListener(exceptionListener);
118    }
119
120    public void setClientID(String clientID) throws JMSException {
121        getConnection().setClientID(clientID);
122    }
123
124    public ConnectionConsumer createConnectionConsumer(Queue queue, String selector,
125                                                       ServerSessionPool serverSessionPool, int maxMessages)
126        throws JMSException {
127        return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
128    }
129
130    // Session factory methods
131    // -------------------------------------------------------------------------
132    public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
133        return (QueueSession)createSession(transacted, ackMode);
134    }
135
136    public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
137        return (TopicSession)createSession(transacted, ackMode);
138    }
139
140    public Session createSession(boolean transacted, int ackMode) throws JMSException {
141        return pool.createSession(transacted, ackMode);
142    }
143
144    // EnhancedCollection API
145    // -------------------------------------------------------------------------
146    
147    public DestinationSource getDestinationSource() throws JMSException {
148        return getConnection().getDestinationSource();
149    }
150
151    // Implementation methods
152    // -------------------------------------------------------------------------
153
154    public ActiveMQConnection getConnection() throws JMSException {
155        assertNotClosed();
156        return pool.getConnection();
157    }
158
159    protected void assertNotClosed() throws AlreadyClosedException {
160        if (stopped || pool == null) {
161            throw new AlreadyClosedException();
162        }
163    }
164
165    protected ActiveMQSession createSession(SessionKey key) throws JMSException {
166        return (ActiveMQSession)getConnection().createSession(key.isTransacted(), key.getAckMode());
167    }
168
169    public String toString() {
170        return "PooledConnection { " + pool + " }";
171    }
172
173}