001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import javax.jms.Connection; 020import javax.jms.Destination; 021import javax.jms.JMSException; 022import javax.jms.Session; 023import javax.jms.Topic; 024import javax.jms.TopicConnection; 025import javax.jms.TopicConnectionFactory; 026import javax.jms.TopicSession; 027import javax.naming.NamingException; 028 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * A Bridge to other JMS Topic providers 034 * 035 * @org.apache.xbean.XBean 036 * 037 * 038 */ 039public class JmsTopicConnector extends JmsConnector { 040 private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class); 041 private String outboundTopicConnectionFactoryName; 042 private String localConnectionFactoryName; 043 private TopicConnectionFactory outboundTopicConnectionFactory; 044 private TopicConnectionFactory localTopicConnectionFactory; 045 private TopicConnection outboundTopicConnection; 046 private TopicConnection localTopicConnection; 047 private InboundTopicBridge[] inboundTopicBridges; 048 private OutboundTopicBridge[] outboundTopicBridges; 049 050 public boolean init() { 051 boolean result = super.init(); 052 if (result) { 053 try { 054 initializeForeignTopicConnection(); 055 initializeLocalTopicConnection(); 056 initializeInboundJmsMessageConvertor(); 057 initializeOutboundJmsMessageConvertor(); 058 initializeInboundTopicBridges(); 059 initializeOutboundTopicBridges(); 060 } catch (Exception e) { 061 LOG.error("Failed to initialize the JMSConnector", e); 062 } 063 } 064 return result; 065 } 066 067 /** 068 * @return Returns the inboundTopicBridges. 069 */ 070 public InboundTopicBridge[] getInboundTopicBridges() { 071 return inboundTopicBridges; 072 } 073 074 /** 075 * @param inboundTopicBridges The inboundTopicBridges to set. 076 */ 077 public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) { 078 this.inboundTopicBridges = inboundTopicBridges; 079 } 080 081 /** 082 * @return Returns the outboundTopicBridges. 083 */ 084 public OutboundTopicBridge[] getOutboundTopicBridges() { 085 return outboundTopicBridges; 086 } 087 088 /** 089 * @param outboundTopicBridges The outboundTopicBridges to set. 090 */ 091 public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) { 092 this.outboundTopicBridges = outboundTopicBridges; 093 } 094 095 /** 096 * @return Returns the localTopicConnectionFactory. 097 */ 098 public TopicConnectionFactory getLocalTopicConnectionFactory() { 099 return localTopicConnectionFactory; 100 } 101 102 /** 103 * @param localTopicConnectionFactory The localTopicConnectionFactory to 104 * set. 105 */ 106 public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) { 107 this.localTopicConnectionFactory = localConnectionFactory; 108 } 109 110 /** 111 * @return Returns the outboundTopicConnectionFactory. 112 */ 113 public TopicConnectionFactory getOutboundTopicConnectionFactory() { 114 return outboundTopicConnectionFactory; 115 } 116 117 /** 118 * @return Returns the outboundTopicConnectionFactoryName. 119 */ 120 public String getOutboundTopicConnectionFactoryName() { 121 return outboundTopicConnectionFactoryName; 122 } 123 124 /** 125 * @param outboundTopicConnectionFactoryName The 126 * outboundTopicConnectionFactoryName to set. 127 */ 128 public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) { 129 this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName; 130 } 131 132 /** 133 * @return Returns the localConnectionFactoryName. 134 */ 135 public String getLocalConnectionFactoryName() { 136 return localConnectionFactoryName; 137 } 138 139 /** 140 * @param localConnectionFactoryName The localConnectionFactoryName to set. 141 */ 142 public void setLocalConnectionFactoryName(String localConnectionFactoryName) { 143 this.localConnectionFactoryName = localConnectionFactoryName; 144 } 145 146 /** 147 * @return Returns the localTopicConnection. 148 */ 149 public TopicConnection getLocalTopicConnection() { 150 return localTopicConnection; 151 } 152 153 /** 154 * @param localTopicConnection The localTopicConnection to set. 155 */ 156 public void setLocalTopicConnection(TopicConnection localTopicConnection) { 157 this.localTopicConnection = localTopicConnection; 158 } 159 160 /** 161 * @return Returns the outboundTopicConnection. 162 */ 163 public TopicConnection getOutboundTopicConnection() { 164 return outboundTopicConnection; 165 } 166 167 /** 168 * @param outboundTopicConnection The outboundTopicConnection to set. 169 */ 170 public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) { 171 this.outboundTopicConnection = foreignTopicConnection; 172 } 173 174 /** 175 * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory 176 * to set. 177 */ 178 public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) { 179 this.outboundTopicConnectionFactory = foreignTopicConnectionFactory; 180 } 181 182 public void restartProducerConnection() throws NamingException, JMSException { 183 outboundTopicConnection = null; 184 initializeForeignTopicConnection(); 185 } 186 187 protected void initializeForeignTopicConnection() throws NamingException, JMSException { 188 if (outboundTopicConnection == null) { 189 // get the connection factories 190 if (outboundTopicConnectionFactory == null) { 191 // look it up from JNDI 192 if (outboundTopicConnectionFactoryName != null) { 193 outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate 194 .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class); 195 if (outboundUsername != null) { 196 outboundTopicConnection = outboundTopicConnectionFactory 197 .createTopicConnection(outboundUsername, outboundPassword); 198 } else { 199 outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection(); 200 } 201 } else { 202 throw new JMSException("Cannot create localConnection - no information"); 203 } 204 } else { 205 if (outboundUsername != null) { 206 outboundTopicConnection = outboundTopicConnectionFactory 207 .createTopicConnection(outboundUsername, outboundPassword); 208 } else { 209 outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection(); 210 } 211 } 212 } 213 if (localClientId != null && localClientId.length() > 0) { 214 outboundTopicConnection.setClientID(getOutboundClientId()); 215 } 216 outboundTopicConnection.start(); 217 } 218 219 protected void initializeLocalTopicConnection() throws NamingException, JMSException { 220 if (localTopicConnection == null) { 221 // get the connection factories 222 if (localTopicConnectionFactory == null) { 223 if (embeddedConnectionFactory == null) { 224 // look it up from JNDI 225 if (localConnectionFactoryName != null) { 226 localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate 227 .lookup(localConnectionFactoryName, TopicConnectionFactory.class); 228 if (localUsername != null) { 229 localTopicConnection = localTopicConnectionFactory 230 .createTopicConnection(localUsername, localPassword); 231 } else { 232 localTopicConnection = localTopicConnectionFactory.createTopicConnection(); 233 } 234 } else { 235 throw new JMSException("Cannot create localConnection - no information"); 236 } 237 } else { 238 localTopicConnection = embeddedConnectionFactory.createTopicConnection(); 239 } 240 } else { 241 if (localUsername != null) { 242 localTopicConnection = localTopicConnectionFactory.createTopicConnection(localUsername, 243 localPassword); 244 } else { 245 localTopicConnection = localTopicConnectionFactory.createTopicConnection(); 246 } 247 } 248 } 249 if (localClientId != null && localClientId.length() > 0) { 250 localTopicConnection.setClientID(getLocalClientId()); 251 } 252 localTopicConnection.start(); 253 } 254 255 protected void initializeInboundJmsMessageConvertor() { 256 inboundMessageConvertor.setConnection(localTopicConnection); 257 } 258 259 protected void initializeOutboundJmsMessageConvertor() { 260 outboundMessageConvertor.setConnection(outboundTopicConnection); 261 } 262 263 protected void initializeInboundTopicBridges() throws JMSException { 264 if (inboundTopicBridges != null) { 265 TopicSession outboundSession = outboundTopicConnection 266 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 267 TopicSession localSession = localTopicConnection.createTopicSession(false, 268 Session.AUTO_ACKNOWLEDGE); 269 for (int i = 0; i < inboundTopicBridges.length; i++) { 270 InboundTopicBridge bridge = inboundTopicBridges[i]; 271 String localTopicName = bridge.getLocalTopicName(); 272 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); 273 String topicName = bridge.getInboundTopicName(); 274 Topic foreignTopic = createForeignTopic(outboundSession, topicName); 275 bridge.setConsumerTopic(foreignTopic); 276 bridge.setProducerTopic(activemqTopic); 277 bridge.setProducerConnection(localTopicConnection); 278 bridge.setConsumerConnection(outboundTopicConnection); 279 if (bridge.getJmsMessageConvertor() == null) { 280 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 281 } 282 bridge.setJmsConnector(this); 283 addInboundBridge(bridge); 284 } 285 outboundSession.close(); 286 localSession.close(); 287 } 288 } 289 290 protected void initializeOutboundTopicBridges() throws JMSException { 291 if (outboundTopicBridges != null) { 292 TopicSession outboundSession = outboundTopicConnection 293 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 294 TopicSession localSession = localTopicConnection.createTopicSession(false, 295 Session.AUTO_ACKNOWLEDGE); 296 for (int i = 0; i < outboundTopicBridges.length; i++) { 297 OutboundTopicBridge bridge = outboundTopicBridges[i]; 298 String localTopicName = bridge.getLocalTopicName(); 299 Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); 300 String topicName = bridge.getOutboundTopicName(); 301 Topic foreignTopic = createForeignTopic(outboundSession, topicName); 302 bridge.setConsumerTopic(activemqTopic); 303 bridge.setProducerTopic(foreignTopic); 304 bridge.setProducerConnection(outboundTopicConnection); 305 bridge.setConsumerConnection(localTopicConnection); 306 if (bridge.getJmsMessageConvertor() == null) { 307 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 308 } 309 bridge.setJmsConnector(this); 310 addOutboundBridge(bridge); 311 } 312 outboundSession.close(); 313 localSession.close(); 314 } 315 } 316 317 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, 318 Connection replyToConsumerConnection) { 319 Topic replyToProducerTopic = (Topic)destination; 320 boolean isInbound = replyToProducerConnection.equals(localTopicConnection); 321 322 if (isInbound) { 323 InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic); 324 if (bridge == null) { 325 bridge = new InboundTopicBridge() { 326 protected Destination processReplyToDestination(Destination destination) { 327 return null; 328 } 329 }; 330 try { 331 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection) 332 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 333 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); 334 replyToConsumerSession.close(); 335 bridge.setConsumerTopic(replyToConsumerTopic); 336 bridge.setProducerTopic(replyToProducerTopic); 337 bridge.setProducerConnection((TopicConnection)replyToProducerConnection); 338 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); 339 bridge.setDoHandleReplyTo(false); 340 if (bridge.getJmsMessageConvertor() == null) { 341 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 342 } 343 bridge.setJmsConnector(this); 344 bridge.start(); 345 LOG.info("Created replyTo bridge for " + replyToProducerTopic); 346 } catch (Exception e) { 347 LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e); 348 return null; 349 } 350 replyToBridges.put(replyToProducerTopic, bridge); 351 } 352 return bridge.getConsumerTopic(); 353 } else { 354 OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic); 355 if (bridge == null) { 356 bridge = new OutboundTopicBridge() { 357 protected Destination processReplyToDestination(Destination destination) { 358 return null; 359 } 360 }; 361 try { 362 TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection) 363 .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 364 Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); 365 replyToConsumerSession.close(); 366 bridge.setConsumerTopic(replyToConsumerTopic); 367 bridge.setProducerTopic(replyToProducerTopic); 368 bridge.setProducerConnection((TopicConnection)replyToProducerConnection); 369 bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); 370 bridge.setDoHandleReplyTo(false); 371 if (bridge.getJmsMessageConvertor() == null) { 372 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 373 } 374 bridge.setJmsConnector(this); 375 bridge.start(); 376 LOG.info("Created replyTo bridge for " + replyToProducerTopic); 377 } catch (Exception e) { 378 LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e); 379 return null; 380 } 381 replyToBridges.put(replyToProducerTopic, bridge); 382 } 383 return bridge.getConsumerTopic(); 384 } 385 } 386 387 protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException { 388 return session.createTopic(topicName); 389 } 390 391 protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException { 392 Topic result = null; 393 try { 394 result = session.createTopic(topicName); 395 } catch (JMSException e) { 396 // look-up the Topic 397 try { 398 result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class); 399 } catch (NamingException e1) { 400 String errStr = "Failed to look-up Topic for name: " + topicName; 401 LOG.error(errStr, e); 402 JMSException jmsEx = new JMSException(errStr); 403 jmsEx.setLinkedException(e1); 404 throw jmsEx; 405 } 406 } 407 return result; 408 } 409 410}