001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.pool; 018 019import java.io.Serializable; 020import java.util.Iterator; 021import java.util.concurrent.CopyOnWriteArrayList; 022 023import javax.jms.BytesMessage; 024import javax.jms.Destination; 025import javax.jms.JMSException; 026import javax.jms.MapMessage; 027import javax.jms.Message; 028import javax.jms.MessageConsumer; 029import javax.jms.MessageListener; 030import javax.jms.MessageProducer; 031import javax.jms.ObjectMessage; 032import javax.jms.Queue; 033import javax.jms.QueueBrowser; 034import javax.jms.QueueReceiver; 035import javax.jms.QueueSender; 036import javax.jms.QueueSession; 037import javax.jms.StreamMessage; 038import javax.jms.TemporaryQueue; 039import javax.jms.TemporaryTopic; 040import javax.jms.TextMessage; 041import javax.jms.Topic; 042import javax.jms.TopicPublisher; 043import javax.jms.TopicSession; 044import javax.jms.TopicSubscriber; 045import javax.jms.XASession; 046import javax.jms.Session; 047import javax.transaction.xa.XAResource; 048 049import org.apache.activemq.ActiveMQMessageProducer; 050import org.apache.activemq.ActiveMQQueueSender; 051import org.apache.activemq.ActiveMQSession; 052import org.apache.activemq.ActiveMQTopicPublisher; 053import org.apache.activemq.AlreadyClosedException; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057/** 058 * 059 */ 060public class PooledSession implements Session, TopicSession, QueueSession, XASession { 061 private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class); 062 063 private ActiveMQSession session; 064 private SessionPool sessionPool; 065 private ActiveMQMessageProducer messageProducer; 066 private ActiveMQQueueSender queueSender; 067 private ActiveMQTopicPublisher topicPublisher; 068 private boolean transactional = true; 069 private boolean ignoreClose; 070 071 private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>(); 072 private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>(); 073 074 public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) { 075 this.session = aSession; 076 this.sessionPool = sessionPool; 077 this.transactional = session.isTransacted(); 078 } 079 080 protected boolean isIgnoreClose() { 081 return ignoreClose; 082 } 083 084 protected void setIgnoreClose(boolean ignoreClose) { 085 this.ignoreClose = ignoreClose; 086 } 087 088 public void close() throws JMSException { 089 if (!ignoreClose) { 090 // TODO a cleaner way to reset?? 091 092 // lets reset the session 093 getInternalSession().setMessageListener(null); 094 095 // Close any consumers and browsers that may have been created. 096 for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 097 MessageConsumer consumer = iter.next(); 098 consumer.close(); 099 } 100 consumers.clear(); 101 102 for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) { 103 QueueBrowser browser = iter.next(); 104 browser.close(); 105 } 106 browsers.clear(); 107 108 // maybe do a rollback? 109 if (transactional) { 110 try { 111 getInternalSession().rollback(); 112 } catch (JMSException e) { 113 LOG.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e); 114 115 // lets close the session and not put the session back into 116 // the pool 117 try { 118 session.close(); 119 } catch (JMSException e1) { 120 LOG.trace("Ignoring exception as discarding session: " + e1, e1); 121 } 122 session = null; 123 sessionPool.invalidateSession(this); 124 return; 125 } 126 } 127 128 sessionPool.returnSession(this); 129 } 130 } 131 132 public void commit() throws JMSException { 133 getInternalSession().commit(); 134 } 135 136 public BytesMessage createBytesMessage() throws JMSException { 137 return getInternalSession().createBytesMessage(); 138 } 139 140 public MapMessage createMapMessage() throws JMSException { 141 return getInternalSession().createMapMessage(); 142 } 143 144 public Message createMessage() throws JMSException { 145 return getInternalSession().createMessage(); 146 } 147 148 public ObjectMessage createObjectMessage() throws JMSException { 149 return getInternalSession().createObjectMessage(); 150 } 151 152 public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { 153 return getInternalSession().createObjectMessage(serializable); 154 } 155 156 public Queue createQueue(String s) throws JMSException { 157 return getInternalSession().createQueue(s); 158 } 159 160 public StreamMessage createStreamMessage() throws JMSException { 161 return getInternalSession().createStreamMessage(); 162 } 163 164 public TemporaryQueue createTemporaryQueue() throws JMSException { 165 return getInternalSession().createTemporaryQueue(); 166 } 167 168 public TemporaryTopic createTemporaryTopic() throws JMSException { 169 return getInternalSession().createTemporaryTopic(); 170 } 171 172 public void unsubscribe(String s) throws JMSException { 173 getInternalSession().unsubscribe(s); 174 } 175 176 public TextMessage createTextMessage() throws JMSException { 177 return getInternalSession().createTextMessage(); 178 } 179 180 public TextMessage createTextMessage(String s) throws JMSException { 181 return getInternalSession().createTextMessage(s); 182 } 183 184 public Topic createTopic(String s) throws JMSException { 185 return getInternalSession().createTopic(s); 186 } 187 188 public int getAcknowledgeMode() throws JMSException { 189 return getInternalSession().getAcknowledgeMode(); 190 } 191 192 public boolean getTransacted() throws JMSException { 193 return getInternalSession().getTransacted(); 194 } 195 196 public void recover() throws JMSException { 197 getInternalSession().recover(); 198 } 199 200 public void rollback() throws JMSException { 201 getInternalSession().rollback(); 202 } 203 204 public XAResource getXAResource() { 205 if (session == null) { 206 throw new IllegalStateException("Session is closed"); 207 } 208 return session.getTransactionContext(); 209 } 210 211 public Session getSession() { 212 return this; 213 } 214 215 public void run() { 216 if (session != null) { 217 session.run(); 218 } 219 } 220 221 // Consumer related methods 222 // ------------------------------------------------------------------------- 223 public QueueBrowser createBrowser(Queue queue) throws JMSException { 224 return addQueueBrowser(getInternalSession().createBrowser(queue)); 225 } 226 227 public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException { 228 return addQueueBrowser(getInternalSession().createBrowser(queue, selector)); 229 } 230 231 public MessageConsumer createConsumer(Destination destination) throws JMSException { 232 return addConsumer(getInternalSession().createConsumer(destination)); 233 } 234 235 public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { 236 return addConsumer(getInternalSession().createConsumer(destination, selector)); 237 } 238 239 public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { 240 return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal)); 241 } 242 243 public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException { 244 return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector)); 245 } 246 247 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { 248 return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal)); 249 } 250 251 public MessageListener getMessageListener() throws JMSException { 252 return getInternalSession().getMessageListener(); 253 } 254 255 public void setMessageListener(MessageListener messageListener) throws JMSException { 256 getInternalSession().setMessageListener(messageListener); 257 } 258 259 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 260 return addTopicSubscriber(getInternalSession().createSubscriber(topic)); 261 } 262 263 public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { 264 return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local)); 265 } 266 267 public QueueReceiver createReceiver(Queue queue) throws JMSException { 268 return addQueueReceiver(getInternalSession().createReceiver(queue)); 269 } 270 271 public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { 272 return addQueueReceiver(getInternalSession().createReceiver(queue, selector)); 273 } 274 275 // Producer related methods 276 // ------------------------------------------------------------------------- 277 public MessageProducer createProducer(Destination destination) throws JMSException { 278 return new PooledProducer(getMessageProducer(), destination); 279 } 280 281 public QueueSender createSender(Queue queue) throws JMSException { 282 return new PooledQueueSender(getQueueSender(), queue); 283 } 284 285 public TopicPublisher createPublisher(Topic topic) throws JMSException { 286 return new PooledTopicPublisher(getTopicPublisher(), topic); 287 } 288 289 public ActiveMQSession getInternalSession() throws AlreadyClosedException { 290 if (session == null) { 291 throw new AlreadyClosedException("The session has already been closed"); 292 } 293 return session; 294 } 295 296 public ActiveMQMessageProducer getMessageProducer() throws JMSException { 297 if (messageProducer == null) { 298 messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null); 299 } 300 return messageProducer; 301 } 302 303 public ActiveMQQueueSender getQueueSender() throws JMSException { 304 if (queueSender == null) { 305 queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null); 306 } 307 return queueSender; 308 } 309 310 public ActiveMQTopicPublisher getTopicPublisher() throws JMSException { 311 if (topicPublisher == null) { 312 topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null); 313 } 314 return topicPublisher; 315 } 316 317 private QueueBrowser addQueueBrowser(QueueBrowser browser) { 318 browsers.add(browser); 319 return browser; 320 } 321 322 private MessageConsumer addConsumer(MessageConsumer consumer) { 323 consumers.add(consumer); 324 return consumer; 325 } 326 327 private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) { 328 consumers.add(subscriber); 329 return subscriber; 330 } 331 332 private QueueReceiver addQueueReceiver(QueueReceiver receiver) { 333 consumers.add(receiver); 334 return receiver; 335 } 336 337 public String toString() { 338 return "PooledSession { " + session + " }"; 339 } 340}