001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import javax.jms.Connection; 020import javax.jms.Destination; 021import javax.jms.JMSException; 022import javax.jms.Queue; 023import javax.jms.QueueConnection; 024import javax.jms.QueueConnectionFactory; 025import javax.jms.QueueSession; 026import javax.jms.Session; 027import javax.naming.NamingException; 028 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * A Bridge to other JMS Queue providers 034 * 035 * @org.apache.xbean.XBean 036 * 037 * 038 */ 039public class JmsQueueConnector extends JmsConnector { 040 private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class); 041 private String outboundQueueConnectionFactoryName; 042 private String localConnectionFactoryName; 043 private QueueConnectionFactory outboundQueueConnectionFactory; 044 private QueueConnectionFactory localQueueConnectionFactory; 045 private QueueConnection outboundQueueConnection; 046 private QueueConnection localQueueConnection; 047 private InboundQueueBridge[] inboundQueueBridges; 048 private OutboundQueueBridge[] outboundQueueBridges; 049 050 public boolean init() { 051 boolean result = super.init(); 052 if (result) { 053 try { 054 initializeForeignQueueConnection(); 055 initializeLocalQueueConnection(); 056 initializeInboundJmsMessageConvertor(); 057 initializeOutboundJmsMessageConvertor(); 058 initializeInboundQueueBridges(); 059 initializeOutboundQueueBridges(); 060 } catch (Exception e) { 061 LOG.error("Failed to initialize the JMSConnector", e); 062 } 063 } 064 return result; 065 } 066 067 /** 068 * @return Returns the inboundQueueBridges. 069 */ 070 public InboundQueueBridge[] getInboundQueueBridges() { 071 return inboundQueueBridges; 072 } 073 074 /** 075 * @param inboundQueueBridges The inboundQueueBridges to set. 076 */ 077 public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) { 078 this.inboundQueueBridges = inboundQueueBridges; 079 } 080 081 /** 082 * @return Returns the outboundQueueBridges. 083 */ 084 public OutboundQueueBridge[] getOutboundQueueBridges() { 085 return outboundQueueBridges; 086 } 087 088 /** 089 * @param outboundQueueBridges The outboundQueueBridges to set. 090 */ 091 public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) { 092 this.outboundQueueBridges = outboundQueueBridges; 093 } 094 095 /** 096 * @return Returns the localQueueConnectionFactory. 097 */ 098 public QueueConnectionFactory getLocalQueueConnectionFactory() { 099 return localQueueConnectionFactory; 100 } 101 102 /** 103 * @param localQueueConnectionFactory The localQueueConnectionFactory to 104 * set. 105 */ 106 public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) { 107 this.localQueueConnectionFactory = localConnectionFactory; 108 } 109 110 /** 111 * @return Returns the outboundQueueConnectionFactory. 112 */ 113 public QueueConnectionFactory getOutboundQueueConnectionFactory() { 114 return outboundQueueConnectionFactory; 115 } 116 117 /** 118 * @return Returns the outboundQueueConnectionFactoryName. 119 */ 120 public String getOutboundQueueConnectionFactoryName() { 121 return outboundQueueConnectionFactoryName; 122 } 123 124 /** 125 * @param outboundQueueConnectionFactoryName The 126 * outboundQueueConnectionFactoryName to set. 127 */ 128 public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) { 129 this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName; 130 } 131 132 /** 133 * @return Returns the localConnectionFactoryName. 134 */ 135 public String getLocalConnectionFactoryName() { 136 return localConnectionFactoryName; 137 } 138 139 /** 140 * @param localConnectionFactoryName The localConnectionFactoryName to set. 141 */ 142 public void setLocalConnectionFactoryName(String localConnectionFactoryName) { 143 this.localConnectionFactoryName = localConnectionFactoryName; 144 } 145 146 /** 147 * @return Returns the localQueueConnection. 148 */ 149 public QueueConnection getLocalQueueConnection() { 150 return localQueueConnection; 151 } 152 153 /** 154 * @param localQueueConnection The localQueueConnection to set. 155 */ 156 public void setLocalQueueConnection(QueueConnection localQueueConnection) { 157 this.localQueueConnection = localQueueConnection; 158 } 159 160 /** 161 * @return Returns the outboundQueueConnection. 162 */ 163 public QueueConnection getOutboundQueueConnection() { 164 return outboundQueueConnection; 165 } 166 167 /** 168 * @param outboundQueueConnection The outboundQueueConnection to set. 169 */ 170 public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) { 171 this.outboundQueueConnection = foreignQueueConnection; 172 } 173 174 /** 175 * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory 176 * to set. 177 */ 178 public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) { 179 this.outboundQueueConnectionFactory = foreignQueueConnectionFactory; 180 } 181 182 public void restartProducerConnection() throws NamingException, JMSException { 183 outboundQueueConnection = null; 184 initializeForeignQueueConnection(); 185 186 // the outboundQueueConnection was reestablished - publish the new connection to the bridges 187 if (inboundQueueBridges != null) { 188 for (int i = 0; i < inboundQueueBridges.length; i++) { 189 InboundQueueBridge bridge = inboundQueueBridges[i]; 190 bridge.setConsumerConnection(outboundQueueConnection); 191 } 192 } 193 if (outboundQueueBridges != null) { 194 for (int i = 0; i < outboundQueueBridges.length; i++) { 195 OutboundQueueBridge bridge = outboundQueueBridges[i]; 196 bridge.setProducerConnection(outboundQueueConnection); 197 } 198 } 199 } 200 201 protected void initializeForeignQueueConnection() throws NamingException, JMSException { 202 if (outboundQueueConnection == null) { 203 // get the connection factories 204 if (outboundQueueConnectionFactory == null) { 205 // look it up from JNDI 206 if (outboundQueueConnectionFactoryName != null) { 207 outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate 208 .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class); 209 if (outboundUsername != null) { 210 outboundQueueConnection = outboundQueueConnectionFactory 211 .createQueueConnection(outboundUsername, outboundPassword); 212 } else { 213 outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection(); 214 } 215 } else { 216 throw new JMSException("Cannot create foreignConnection - no information"); 217 } 218 } else { 219 if (outboundUsername != null) { 220 outboundQueueConnection = outboundQueueConnectionFactory 221 .createQueueConnection(outboundUsername, outboundPassword); 222 } else { 223 outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection(); 224 } 225 } 226 } 227 if (localClientId != null && localClientId.length() > 0) { 228 outboundQueueConnection.setClientID(getOutboundClientId()); 229 } 230 outboundQueueConnection.start(); 231 } 232 233 protected void initializeLocalQueueConnection() throws NamingException, JMSException { 234 if (localQueueConnection == null) { 235 // get the connection factories 236 if (localQueueConnectionFactory == null) { 237 if (embeddedConnectionFactory == null) { 238 // look it up from JNDI 239 if (localConnectionFactoryName != null) { 240 localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate 241 .lookup(localConnectionFactoryName, QueueConnectionFactory.class); 242 if (localUsername != null) { 243 localQueueConnection = localQueueConnectionFactory 244 .createQueueConnection(localUsername, localPassword); 245 } else { 246 localQueueConnection = localQueueConnectionFactory.createQueueConnection(); 247 } 248 } else { 249 throw new JMSException("Cannot create localConnection - no information"); 250 } 251 } else { 252 localQueueConnection = embeddedConnectionFactory.createQueueConnection(); 253 } 254 } else { 255 if (localUsername != null) { 256 localQueueConnection = localQueueConnectionFactory.createQueueConnection(localUsername, 257 localPassword); 258 } else { 259 localQueueConnection = localQueueConnectionFactory.createQueueConnection(); 260 } 261 } 262 } 263 if (localClientId != null && localClientId.length() > 0) { 264 localQueueConnection.setClientID(getLocalClientId()); 265 } 266 localQueueConnection.start(); 267 } 268 269 protected void initializeInboundJmsMessageConvertor() { 270 inboundMessageConvertor.setConnection(localQueueConnection); 271 } 272 273 protected void initializeOutboundJmsMessageConvertor() { 274 outboundMessageConvertor.setConnection(outboundQueueConnection); 275 } 276 277 protected void initializeInboundQueueBridges() throws JMSException { 278 if (inboundQueueBridges != null) { 279 QueueSession outboundSession = outboundQueueConnection 280 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 281 QueueSession localSession = localQueueConnection.createQueueSession(false, 282 Session.AUTO_ACKNOWLEDGE); 283 for (int i = 0; i < inboundQueueBridges.length; i++) { 284 InboundQueueBridge bridge = inboundQueueBridges[i]; 285 String localQueueName = bridge.getLocalQueueName(); 286 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); 287 String queueName = bridge.getInboundQueueName(); 288 Queue foreignQueue = createForeignQueue(outboundSession, queueName); 289 bridge.setConsumerQueue(foreignQueue); 290 bridge.setProducerQueue(activemqQueue); 291 bridge.setProducerConnection(localQueueConnection); 292 bridge.setConsumerConnection(outboundQueueConnection); 293 if (bridge.getJmsMessageConvertor() == null) { 294 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 295 } 296 bridge.setJmsConnector(this); 297 addInboundBridge(bridge); 298 } 299 outboundSession.close(); 300 localSession.close(); 301 } 302 } 303 304 protected void initializeOutboundQueueBridges() throws JMSException { 305 if (outboundQueueBridges != null) { 306 QueueSession outboundSession = outboundQueueConnection 307 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 308 QueueSession localSession = localQueueConnection.createQueueSession(false, 309 Session.AUTO_ACKNOWLEDGE); 310 for (int i = 0; i < outboundQueueBridges.length; i++) { 311 OutboundQueueBridge bridge = outboundQueueBridges[i]; 312 String localQueueName = bridge.getLocalQueueName(); 313 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); 314 String queueName = bridge.getOutboundQueueName(); 315 Queue foreignQueue = createForeignQueue(outboundSession, queueName); 316 bridge.setConsumerQueue(activemqQueue); 317 bridge.setProducerQueue(foreignQueue); 318 bridge.setProducerConnection(outboundQueueConnection); 319 bridge.setConsumerConnection(localQueueConnection); 320 if (bridge.getJmsMessageConvertor() == null) { 321 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 322 } 323 bridge.setJmsConnector(this); 324 addOutboundBridge(bridge); 325 } 326 outboundSession.close(); 327 localSession.close(); 328 } 329 } 330 331 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, 332 Connection replyToConsumerConnection) { 333 Queue replyToProducerQueue = (Queue)destination; 334 boolean isInbound = replyToProducerConnection.equals(localQueueConnection); 335 336 if (isInbound) { 337 InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue); 338 if (bridge == null) { 339 bridge = new InboundQueueBridge() { 340 protected Destination processReplyToDestination(Destination destination) { 341 return null; 342 } 343 }; 344 try { 345 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) 346 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 347 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 348 replyToConsumerSession.close(); 349 bridge.setConsumerQueue(replyToConsumerQueue); 350 bridge.setProducerQueue(replyToProducerQueue); 351 bridge.setProducerConnection((QueueConnection)replyToProducerConnection); 352 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); 353 bridge.setDoHandleReplyTo(false); 354 if (bridge.getJmsMessageConvertor() == null) { 355 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 356 } 357 bridge.setJmsConnector(this); 358 bridge.start(); 359 LOG.info("Created replyTo bridge for " + replyToProducerQueue); 360 } catch (Exception e) { 361 LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e); 362 return null; 363 } 364 replyToBridges.put(replyToProducerQueue, bridge); 365 } 366 return bridge.getConsumerQueue(); 367 } else { 368 OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue); 369 if (bridge == null) { 370 bridge = new OutboundQueueBridge() { 371 protected Destination processReplyToDestination(Destination destination) { 372 return null; 373 } 374 }; 375 try { 376 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) 377 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 378 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 379 replyToConsumerSession.close(); 380 bridge.setConsumerQueue(replyToConsumerQueue); 381 bridge.setProducerQueue(replyToProducerQueue); 382 bridge.setProducerConnection((QueueConnection)replyToProducerConnection); 383 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); 384 bridge.setDoHandleReplyTo(false); 385 if (bridge.getJmsMessageConvertor() == null) { 386 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 387 } 388 bridge.setJmsConnector(this); 389 bridge.start(); 390 LOG.info("Created replyTo bridge for " + replyToProducerQueue); 391 } catch (Exception e) { 392 LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e); 393 return null; 394 } 395 replyToBridges.put(replyToProducerQueue, bridge); 396 } 397 return bridge.getConsumerQueue(); 398 } 399 } 400 401 protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException { 402 return session.createQueue(queueName); 403 } 404 405 protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException { 406 Queue result = null; 407 try { 408 result = session.createQueue(queueName); 409 } catch (JMSException e) { 410 // look-up the Queue 411 try { 412 result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class); 413 } catch (NamingException e1) { 414 String errStr = "Failed to look-up Queue for name: " + queueName; 415 LOG.error(errStr, e); 416 JMSException jmsEx = new JMSException(errStr); 417 jmsEx.setLinkedException(e1); 418 throw jmsEx; 419 } 420 } 421 return result; 422 } 423 424}