001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.pool;
018
019import java.io.Serializable;
020import java.util.Iterator;
021import java.util.concurrent.CopyOnWriteArrayList;
022
023import javax.jms.BytesMessage;
024import javax.jms.Destination;
025import javax.jms.JMSException;
026import javax.jms.MapMessage;
027import javax.jms.Message;
028import javax.jms.MessageConsumer;
029import javax.jms.MessageListener;
030import javax.jms.MessageProducer;
031import javax.jms.ObjectMessage;
032import javax.jms.Queue;
033import javax.jms.QueueBrowser;
034import javax.jms.QueueReceiver;
035import javax.jms.QueueSender;
036import javax.jms.QueueSession;
037import javax.jms.StreamMessage;
038import javax.jms.TemporaryQueue;
039import javax.jms.TemporaryTopic;
040import javax.jms.TextMessage;
041import javax.jms.Topic;
042import javax.jms.TopicPublisher;
043import javax.jms.TopicSession;
044import javax.jms.TopicSubscriber;
045import javax.jms.XASession;
046import javax.jms.Session;
047import javax.transaction.xa.XAResource;
048
049import org.apache.activemq.ActiveMQMessageProducer;
050import org.apache.activemq.ActiveMQQueueSender;
051import org.apache.activemq.ActiveMQSession;
052import org.apache.activemq.ActiveMQTopicPublisher;
053import org.apache.activemq.AlreadyClosedException;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057/**
058 * 
059 */
060public class PooledSession implements Session, TopicSession, QueueSession, XASession {
061    private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
062
063    private ActiveMQSession session;
064    private SessionPool sessionPool;
065    private ActiveMQMessageProducer messageProducer;
066    private ActiveMQQueueSender queueSender;
067    private ActiveMQTopicPublisher topicPublisher;
068    private boolean transactional = true;
069    private boolean ignoreClose;
070
071    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
072    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
073
074    public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
075        this.session = aSession;
076        this.sessionPool = sessionPool;
077        this.transactional = session.isTransacted();
078    }
079
080    protected boolean isIgnoreClose() {
081        return ignoreClose;
082    }
083
084    protected void setIgnoreClose(boolean ignoreClose) {
085        this.ignoreClose = ignoreClose;
086    }
087
088    public void close() throws JMSException {
089        if (!ignoreClose) {
090            // TODO a cleaner way to reset??
091
092            // lets reset the session
093            getInternalSession().setMessageListener(null);
094
095            // Close any consumers and browsers that may have been created.
096            for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
097                MessageConsumer consumer = iter.next();
098                consumer.close();
099            }
100            consumers.clear();
101
102            for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
103                QueueBrowser browser = iter.next();
104                browser.close();
105            }
106            browsers.clear();
107
108            // maybe do a rollback?
109            if (transactional) {
110                try {
111                    getInternalSession().rollback();
112                } catch (JMSException e) {
113                    LOG.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
114
115                    // lets close the session and not put the session back into
116                    // the pool
117                    try {
118                        session.close();
119                    } catch (JMSException e1) {
120                        LOG.trace("Ignoring exception as discarding session: " + e1, e1);
121                    }
122                    session = null;
123                    sessionPool.invalidateSession(this);
124                    return;
125                }
126            }
127
128            sessionPool.returnSession(this);
129        }
130    }
131
132    public void commit() throws JMSException {
133        getInternalSession().commit();
134    }
135
136    public BytesMessage createBytesMessage() throws JMSException {
137        return getInternalSession().createBytesMessage();
138    }
139
140    public MapMessage createMapMessage() throws JMSException {
141        return getInternalSession().createMapMessage();
142    }
143
144    public Message createMessage() throws JMSException {
145        return getInternalSession().createMessage();
146    }
147
148    public ObjectMessage createObjectMessage() throws JMSException {
149        return getInternalSession().createObjectMessage();
150    }
151
152    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
153        return getInternalSession().createObjectMessage(serializable);
154    }
155
156    public Queue createQueue(String s) throws JMSException {
157        return getInternalSession().createQueue(s);
158    }
159
160    public StreamMessage createStreamMessage() throws JMSException {
161        return getInternalSession().createStreamMessage();
162    }
163
164    public TemporaryQueue createTemporaryQueue() throws JMSException {
165        return getInternalSession().createTemporaryQueue();
166    }
167
168    public TemporaryTopic createTemporaryTopic() throws JMSException {
169        return getInternalSession().createTemporaryTopic();
170    }
171
172    public void unsubscribe(String s) throws JMSException {
173        getInternalSession().unsubscribe(s);
174    }
175
176    public TextMessage createTextMessage() throws JMSException {
177        return getInternalSession().createTextMessage();
178    }
179
180    public TextMessage createTextMessage(String s) throws JMSException {
181        return getInternalSession().createTextMessage(s);
182    }
183
184    public Topic createTopic(String s) throws JMSException {
185        return getInternalSession().createTopic(s);
186    }
187
188    public int getAcknowledgeMode() throws JMSException {
189        return getInternalSession().getAcknowledgeMode();
190    }
191
192    public boolean getTransacted() throws JMSException {
193        return getInternalSession().getTransacted();
194    }
195
196    public void recover() throws JMSException {
197        getInternalSession().recover();
198    }
199
200    public void rollback() throws JMSException {
201        getInternalSession().rollback();
202    }
203
204    public XAResource getXAResource() {
205        if (session == null) {
206            throw new IllegalStateException("Session is closed");
207        }
208        return session.getTransactionContext();
209    }
210
211    public Session getSession() {
212        return this;
213    }
214
215    public void run() {
216        if (session != null) {
217            session.run();
218        }
219    }
220
221    // Consumer related methods
222    // -------------------------------------------------------------------------
223    public QueueBrowser createBrowser(Queue queue) throws JMSException {
224        return addQueueBrowser(getInternalSession().createBrowser(queue));
225    }
226
227    public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
228        return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
229    }
230
231    public MessageConsumer createConsumer(Destination destination) throws JMSException {
232        return addConsumer(getInternalSession().createConsumer(destination));
233    }
234
235    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
236        return addConsumer(getInternalSession().createConsumer(destination, selector));
237    }
238
239    public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
240        return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
241    }
242
243    public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
244        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
245    }
246
247    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
248        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
249    }
250
251    public MessageListener getMessageListener() throws JMSException {
252        return getInternalSession().getMessageListener();
253    }
254
255    public void setMessageListener(MessageListener messageListener) throws JMSException {
256        getInternalSession().setMessageListener(messageListener);
257    }
258
259    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
260        return addTopicSubscriber(getInternalSession().createSubscriber(topic));
261    }
262
263    public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
264        return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
265    }
266
267    public QueueReceiver createReceiver(Queue queue) throws JMSException {
268        return addQueueReceiver(getInternalSession().createReceiver(queue));
269    }
270
271    public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
272        return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
273    }
274
275    // Producer related methods
276    // -------------------------------------------------------------------------
277    public MessageProducer createProducer(Destination destination) throws JMSException {
278        return new PooledProducer(getMessageProducer(), destination);
279    }
280
281    public QueueSender createSender(Queue queue) throws JMSException {
282        return new PooledQueueSender(getQueueSender(), queue);
283    }
284
285    public TopicPublisher createPublisher(Topic topic) throws JMSException {
286        return new PooledTopicPublisher(getTopicPublisher(), topic);
287    }
288
289    public ActiveMQSession getInternalSession() throws AlreadyClosedException {
290        if (session == null) {
291            throw new AlreadyClosedException("The session has already been closed");
292        }
293        return session;
294    }
295
296    public ActiveMQMessageProducer getMessageProducer() throws JMSException {
297        if (messageProducer == null) {
298            messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null);
299        }
300        return messageProducer;
301    }
302
303    public ActiveMQQueueSender getQueueSender() throws JMSException {
304        if (queueSender == null) {
305            queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null);
306        }
307        return queueSender;
308    }
309
310    public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
311        if (topicPublisher == null) {
312            topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null);
313        }
314        return topicPublisher;
315    }
316
317    private QueueBrowser addQueueBrowser(QueueBrowser browser) {
318        browsers.add(browser);
319        return browser;
320    }
321
322    private MessageConsumer addConsumer(MessageConsumer consumer) {
323        consumers.add(consumer);
324        return consumer;
325    }
326
327    private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
328        consumers.add(subscriber);
329        return subscriber;
330    }
331
332    private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
333        consumers.add(receiver);
334        return receiver;
335    }
336
337    public String toString() {
338        return "PooledSession { " + session + " }";
339    }
340}