/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.ra;

import java.lang.reflect.Method;

import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Owns the broker connection and the {@link ConnectionConsumer} of a single
 * activated message endpoint.
 * <p>
 * {@link #start()} schedules an internal {@link Work} item on the
 * {@link WorkManager}. That work item repeatedly tries to create a connection
 * and a connection consumer until it succeeds or the worker is stopped,
 * waiting between attempts with a delay that doubles from
 * {@link #INITIAL_RECONNECT_DELAY} up to {@link #MAX_RECONNECT_DELAY}.
 * A connection exception reported by the broker connection triggers the same
 * work item again, unless a connection attempt is already in progress.
 *
 * $Date$
 */
public class ActiveMQEndpointWorker {

    /** Reflective handle on {@link MessageListener#onMessage(Message)}. */
    public static final Method ON_MESSAGE_METHOD;
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQEndpointWorker.class);

    private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second.
    private static final long MAX_RECONNECT_DELAY = 1000 * 30; // 30 seconds.
    /** Session registered for the current thread; written here but never read in this class. */
    private static final ThreadLocal<Session> THREAD_LOCAL = new ThreadLocal<Session>();

    static {
        try {
            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", Message.class);
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    protected final ActiveMQEndpointActivationKey endpointActivationKey;
    protected final MessageEndpointFactory endpointFactory;
    protected final WorkManager workManager;
    /** Result of {@code endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD)}. */
    protected final boolean transacted;

    private final ActiveMQDestination dest;
    /** The (re)connect task; its monitor also guards {@code connection} and {@code consumer}. */
    private final Work connectWork;
    /** {@code true} while a connection attempt has been requested and has not yet succeeded. */
    private final AtomicBoolean connecting = new AtomicBoolean(false);
    // new String(..) yields a distinct, non-interned instance, so this monitor is private to the worker.
    private final Object shutdownMutex = new String("shutdownMutex");

    private ActiveMQConnection connection;
    private ConnectionConsumer consumer;
    private ServerSessionPoolImpl serverSessionPool;
    private boolean running;

    /**
     * Creates a worker for the given endpoint activation.
     *
     * @param adapter the resource adapter used to obtain the work manager and to create connections
     * @param key the activation key providing the endpoint factory and the activation spec
     * @throws ResourceException if the endpoint factory does not know the {@code onMessage} method,
     *         or if the activation spec's destination type is neither {@code javax.jms.Queue}
     *         nor {@code javax.jms.Topic}
     */
    protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
        this.endpointActivationKey = key;
        this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
        this.workManager = adapter.getBootstrapContext().getWorkManager();
        try {
            this.transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
        } catch (NoSuchMethodException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new ResourceException("Endpoint does not implement the onMessage method.", e);
        }

        connectWork = new Work() {
            long currentReconnectDelay = INITIAL_RECONNECT_DELAY;

            public void release() {
                //
            }

            /**
             * Tries to establish the connection and the connection consumer, retrying
             * with a growing delay for as long as a connection attempt is requested
             * and the worker is running.
             * <p>
             * NOTE(review): if the loop ends because the worker was stopped,
             * {@code connecting} is left {@code true}; a later {@code start()} would then
             * be ignored - confirm that restarting a stopped worker is not required.
             */
            public synchronized void run() {
                currentReconnectDelay = INITIAL_RECONNECT_DELAY;
                MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
                if (LOG.isInfoEnabled()) {
                    LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                }

                while (connecting.get() && running) {
                    try {
                        connection = adapter.makeConnection(activationSpec);
                        connection.setExceptionListener(new ExceptionListener() {
                            public void onException(JMSException error) {
                                if (!serverSessionPool.isClosing()) {
                                    // initiate reconnection only once, i.e. on initial exception
                                    // and only if not already trying to connect
                                    LOG.error("Connection to broker failed: " + error.getMessage(), error);
                                    if (connecting.compareAndSet(false, true)) {
                                        synchronized (connectWork) {
                                            disconnect();
                                            serverSessionPool.closeIdleSessions();
                                            connect();
                                        }
                                    } else {
                                        // connection attempt has already been initiated
                                        LOG.info("Connection attempt already in progress, ignoring connection exception");
                                    }
                                }
                            }
                        });
                        connection.start();

                        // Prefetch is sized as messages-per-session times the number of sessions.
                        int prefetchSize = activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
                        if (activationSpec.isDurableSubscription()) {
                            // NOTE(review): the cast fails with ClassCastException if a durable
                            // subscription is configured on a queue destination - verify that
                            // the activation spec is validated before it reaches this point.
                            consumer = connection.createDurableConnectionConsumer(
                                    (Topic) dest,
                                    activationSpec.getSubscriptionName(),
                                    emptyToNull(activationSpec.getMessageSelector()),
                                    serverSessionPool,
                                    prefetchSize,
                                    activationSpec.getNoLocalBooleanValue());
                        } else {
                            consumer = connection.createConnectionConsumer(
                                    dest,
                                    emptyToNull(activationSpec.getMessageSelector()),
                                    serverSessionPool,
                                    prefetchSize,
                                    activationSpec.getNoLocalBooleanValue());
                        }

                        // Clearing the flag ends the retry loop and re-enables the exception listener.
                        if (connecting.compareAndSet(true, false)) {
                            if (LOG.isInfoEnabled()) {
                                LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                            }
                        } else {
                            LOG.error("Could not release connection lock");
                        }
                    } catch (JMSException error) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Failed to connect: " + error.getMessage(), error);
                        }
                        disconnect();
                        pause(error);
                    }
                }
            }

            /**
             * Waits before the next connection attempt and doubles the delay for the
             * following one, capped at {@code MAX_RECONNECT_DELAY}. The failure is logged
             * at error level only once the delay has reached that maximum.
             *
             * @param error the exception that made the last connection attempt fail
             */
            private void pause(JMSException error) {
                if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
                    LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: "
                            + error.getMessage(), error);
                    LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
                }
                try {
                    synchronized (shutdownMutex) {
                        // shutdownMutex will be notified by stop() method in
                        // order to accelerate shutdown of endpoint
                        shutdownMutex.wait(currentReconnectDelay);
                    }
                } catch (InterruptedException e) {
                    // The interrupt status is left cleared so the retry loop keeps being
                    // governed by the running/connecting flags only.
                    // NOTE(review): the interrupt is swallowed here - confirm this is intended.
                    Thread.interrupted();
                }
                currentReconnectDelay *= 2;
                if (currentReconnectDelay > MAX_RECONNECT_DELAY) {
                    currentReconnectDelay = MAX_RECONNECT_DELAY;
                }
            }
        };

        MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
        if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
            dest = new ActiveMQQueue(activationSpec.getDestination());
        } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
            dest = new ActiveMQTopic(activationSpec.getDestination());
        } else {
            throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
        }

    }

    /**
     * Closes the given connection on a best-effort basis; a failure to close is
     * logged at debug level and otherwise ignored.
     *
     * @param c the connection to close, may be {@code null}
     */
    public static void safeClose(Connection c) {
        try {
            if (c != null) {
                LOG.debug("Closing connection to broker");
                c.close();
            }
        } catch (JMSException e) {
            // Best effort: the caller cannot do anything about it, but leave a trace.
            LOG.debug("Ignoring exception while closing connection to broker: " + e.getMessage(), e);
        }
    }

    /**
     * Closes the given connection consumer on a best-effort basis; a failure to
     * close is logged at debug level and otherwise ignored.
     *
     * @param cc the connection consumer to close, may be {@code null}
     */
    public static void safeClose(ConnectionConsumer cc) {
        try {
            if (cc != null) {
                LOG.debug("Closing ConnectionConsumer");
                cc.close();
            }
        } catch (JMSException e) {
            // Best effort: the caller cannot do anything about it, but leave a trace.
            LOG.debug("Ignoring exception while closing ConnectionConsumer: " + e.getMessage(), e);
        }
    }

    /**
     * Marks the worker as running, creates the server session pool and schedules
     * the connect work. Does nothing if the worker is already running; only logs
     * a warning if a connection attempt is already flagged as in progress.
     *
     * @throws ResourceException declared for callers; not thrown by the code in this method itself
     */
    public void start() throws ResourceException {
        synchronized (connectWork) {
            if (running) {
                return;
            }
            running = true;

            if (connecting.compareAndSet(false, true)) {
                LOG.info("Starting");
                serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
                connect();
            } else {
                LOG.warn("Ignoring start command, EndpointWorker is already trying to connect");
            }
        }
    }

    /**
     * Marks the worker as stopped, wakes up a reconnect attempt that is waiting
     * between retries, closes the server session pool and finally closes the
     * consumer and the connection. Does nothing if the worker is not running.
     *
     * @throws InterruptedException declared for callers; not thrown by the code in this method itself
     */
    public void stop() throws InterruptedException {
        synchronized (shutdownMutex) {
            if (!running) {
                return;
            }
            running = false;
            LOG.info("Stopping");
            // wake up pausing reconnect attempt
            shutdownMutex.notifyAll();
            serverSessionPool.close();
        }
        // Needs the connectWork monitor, so this waits for a running connect work to finish.
        disconnect();
    }

    private boolean isRunning() {
        return running;
    }

    /**
     * Schedules the connect work on the work manager if the worker is running.
     * If the work manager rejects the work, the worker is marked as not running.
     * <p>
     * NOTE(review): on rejection {@code connecting} is left {@code true}, so a
     * later {@code start()} would be ignored - confirm this is intended.
     */
    private void connect() {
        synchronized (connectWork) {
            if (!running) {
                return;
            }

            try {
                workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
            } catch (WorkException e) {
                running = false;
                LOG.error("Work Manager did not accept work: ", e);
            }
        }
    }

    /**
     * Closes the consumer and the connection (in that order) and clears both
     * references.
     */
    private void disconnect() {
        synchronized (connectWork) {
            safeClose(consumer);
            consumer = null;
            safeClose(connection);
            connection = null;
        }
    }

    /**
     * Associates the given session with the calling thread.
     *
     * @param session the session to register
     */
    protected void registerThreadSession(Session session) {
        THREAD_LOCAL.set(session);
    }

    /**
     * Removes the session association of the calling thread.
     *
     * @param session unused; whatever session is registered for the calling thread is removed
     */
    protected void unregisterThreadSession(Session session) {
        // remove() drops the thread's map entry instead of keeping a null-valued one.
        THREAD_LOCAL.remove();
    }

    /**
     * Returns the current broker connection.
     * <p>
     * The read happens while holding the {@code connectWork} monitor, so this
     * call blocks while the connect work is running or a disconnect is in
     * progress.
     *
     * @return the current connection, or {@code null} if none is established
     *         (for example after the worker has disconnected)
     */
    protected ActiveMQConnection getConnection() {
        // Taking the connectWork monitor makes callers wait for an ongoing
        // (re)connect attempt instead of observing the connection mid-change.
        synchronized (connectWork) {
            return connection;
        }
    }

    /**
     * Normalizes an empty string to {@code null}.
     *
     * @param value the value to normalize, may be {@code null}
     * @return {@code null} if {@code value} is {@code null} or empty, otherwise {@code value}
     */
    private String emptyToNull(String value) {
        if (value == null || value.length() == 0) {
            return null;
        }
        return value;
    }

}