001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import java.util.Iterator; 020import java.util.List; 021import java.util.Map; 022import java.util.concurrent.CopyOnWriteArrayList; 023import java.util.concurrent.atomic.AtomicBoolean; 024 025import javax.jms.Connection; 026import javax.jms.Destination; 027import javax.jms.JMSException; 028import javax.naming.NamingException; 029 030import org.apache.activemq.ActiveMQConnectionFactory; 031import org.apache.activemq.Service; 032import org.apache.activemq.broker.BrokerService; 033import org.apache.activemq.util.LRUCache; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.springframework.jndi.JndiTemplate; 037 038/** 039 * This bridge joins the gap between foreign JMS providers and ActiveMQ As some 040 * JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be 041 * JMS 1.0.2 compliant. 042 * 043 * 044 */ 045public abstract class JmsConnector implements Service { 046 047 private static int nextId; 048 private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class); 049 050 protected JndiTemplate jndiLocalTemplate; 051 protected JndiTemplate jndiOutboundTemplate; 052 protected JmsMesageConvertor inboundMessageConvertor; 053 protected JmsMesageConvertor outboundMessageConvertor; 054 protected AtomicBoolean initialized = new AtomicBoolean(false); 055 protected AtomicBoolean started = new AtomicBoolean(false); 056 protected ActiveMQConnectionFactory embeddedConnectionFactory; 057 protected int replyToDestinationCacheSize = 10000; 058 protected String outboundUsername; 059 protected String outboundPassword; 060 protected String localUsername; 061 protected String localPassword; 062 protected String outboundClientId; 063 protected String localClientId; 064 protected LRUCache replyToBridges = createLRUCache(); 065 066 private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>(); 067 private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>(); 068 private String name; 069 070 071 private static LRUCache createLRUCache() { 072 return new LRUCache() { 073 private static final long serialVersionUID = -7446792754185879286L; 074 075 protected boolean removeEldestEntry(Map.Entry enty) { 076 if (size() > maxCacheSize) { 077 Iterator iter = entrySet().iterator(); 078 Map.Entry lru = (Map.Entry)iter.next(); 079 remove(lru.getKey()); 080 DestinationBridge bridge = (DestinationBridge)lru.getValue(); 081 try { 082 bridge.stop(); 083 LOG.info("Expired bridge: " + bridge); 084 } catch (Exception e) { 085 LOG.warn("stopping expired bridge" + bridge + " caused an exception", e); 086 } 087 } 088 return false; 089 } 090 }; 091 } 092 093 /** 094 */ 095 public boolean init() { 096 boolean result = initialized.compareAndSet(false, true); 097 if (result) { 098 if (jndiLocalTemplate == null) { 099 jndiLocalTemplate = new JndiTemplate(); 100 } 101 if (jndiOutboundTemplate == null) { 102 jndiOutboundTemplate = new JndiTemplate(); 103 } 104 if (inboundMessageConvertor == null) { 105 inboundMessageConvertor = new SimpleJmsMessageConvertor(); 106 } 107 if (outboundMessageConvertor == null) { 108 outboundMessageConvertor = new SimpleJmsMessageConvertor(); 109 } 110 replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize()); 111 } 112 return result; 113 } 114 115 public void start() throws Exception { 116 init(); 117 if (started.compareAndSet(false, true)) { 118 for (int i = 0; i < inboundBridges.size(); i++) { 119 DestinationBridge bridge = inboundBridges.get(i); 120 bridge.start(); 121 } 122 for (int i = 0; i < outboundBridges.size(); i++) { 123 DestinationBridge bridge = outboundBridges.get(i); 124 bridge.start(); 125 } 126 LOG.info("JMS Connector " + getName() + " Started"); 127 } 128 } 129 130 public void stop() throws Exception { 131 if (started.compareAndSet(true, false)) { 132 for (int i = 0; i < inboundBridges.size(); i++) { 133 DestinationBridge bridge = inboundBridges.get(i); 134 bridge.stop(); 135 } 136 for (int i = 0; i < outboundBridges.size(); i++) { 137 DestinationBridge bridge = outboundBridges.get(i); 138 bridge.stop(); 139 } 140 LOG.info("JMS Connector " + getName() + " Stopped"); 141 } 142 } 143 144 public void clearBridges() { 145 inboundBridges.clear(); 146 outboundBridges.clear(); 147 } 148 149 protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection); 150 151 /** 152 * One way to configure the local connection - this is called by The 153 * BrokerService when the Connector is embedded 154 * 155 * @param service 156 */ 157 public void setBrokerService(BrokerService service) { 158 embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI()); 159 } 160 161 /** 162 * @return Returns the jndiTemplate. 163 */ 164 public JndiTemplate getJndiLocalTemplate() { 165 return jndiLocalTemplate; 166 } 167 168 /** 169 * @param jndiTemplate The jndiTemplate to set. 170 */ 171 public void setJndiLocalTemplate(JndiTemplate jndiTemplate) { 172 this.jndiLocalTemplate = jndiTemplate; 173 } 174 175 /** 176 * @return Returns the jndiOutboundTemplate. 177 */ 178 public JndiTemplate getJndiOutboundTemplate() { 179 return jndiOutboundTemplate; 180 } 181 182 /** 183 * @param jndiOutboundTemplate The jndiOutboundTemplate to set. 184 */ 185 public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) { 186 this.jndiOutboundTemplate = jndiOutboundTemplate; 187 } 188 189 /** 190 * @return Returns the inboundMessageConvertor. 191 */ 192 public JmsMesageConvertor getInboundMessageConvertor() { 193 return inboundMessageConvertor; 194 } 195 196 /** 197 * @param inboundMessageConvertor The inboundMessageConvertor to set. 198 */ 199 public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) { 200 this.inboundMessageConvertor = jmsMessageConvertor; 201 } 202 203 /** 204 * @return Returns the outboundMessageConvertor. 205 */ 206 public JmsMesageConvertor getOutboundMessageConvertor() { 207 return outboundMessageConvertor; 208 } 209 210 /** 211 * @param outboundMessageConvertor The outboundMessageConvertor to set. 212 */ 213 public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) { 214 this.outboundMessageConvertor = outboundMessageConvertor; 215 } 216 217 /** 218 * @return Returns the replyToDestinationCacheSize. 219 */ 220 public int getReplyToDestinationCacheSize() { 221 return replyToDestinationCacheSize; 222 } 223 224 /** 225 * @param replyToDestinationCacheSize The replyToDestinationCacheSize to 226 * set. 227 */ 228 public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) { 229 this.replyToDestinationCacheSize = replyToDestinationCacheSize; 230 } 231 232 /** 233 * @return Returns the localPassword. 234 */ 235 public String getLocalPassword() { 236 return localPassword; 237 } 238 239 /** 240 * @param localPassword The localPassword to set. 241 */ 242 public void setLocalPassword(String localPassword) { 243 this.localPassword = localPassword; 244 } 245 246 /** 247 * @return Returns the localUsername. 248 */ 249 public String getLocalUsername() { 250 return localUsername; 251 } 252 253 /** 254 * @param localUsername The localUsername to set. 255 */ 256 public void setLocalUsername(String localUsername) { 257 this.localUsername = localUsername; 258 } 259 260 /** 261 * @return Returns the outboundPassword. 262 */ 263 public String getOutboundPassword() { 264 return outboundPassword; 265 } 266 267 /** 268 * @param outboundPassword The outboundPassword to set. 269 */ 270 public void setOutboundPassword(String outboundPassword) { 271 this.outboundPassword = outboundPassword; 272 } 273 274 /** 275 * @return Returns the outboundUsername. 276 */ 277 public String getOutboundUsername() { 278 return outboundUsername; 279 } 280 281 /** 282 * @param outboundUsername The outboundUsername to set. 283 */ 284 public void setOutboundUsername(String outboundUsername) { 285 this.outboundUsername = outboundUsername; 286 } 287 288 /** 289 * @return the outboundClientId 290 */ 291 public String getOutboundClientId() { 292 return outboundClientId; 293 } 294 295 /** 296 * @param outboundClientId the outboundClientId to set 297 */ 298 public void setOutboundClientId(String outboundClientId) { 299 this.outboundClientId = outboundClientId; 300 } 301 302 /** 303 * @return the localClientId 304 */ 305 public String getLocalClientId() { 306 return localClientId; 307 } 308 309 /** 310 * @param localClientId the localClientId to set 311 */ 312 public void setLocalClientId(String localClientId) { 313 this.localClientId = localClientId; 314 } 315 316 317 protected void addInboundBridge(DestinationBridge bridge) { 318 inboundBridges.add(bridge); 319 } 320 321 protected void addOutboundBridge(DestinationBridge bridge) { 322 outboundBridges.add(bridge); 323 } 324 325 protected void removeInboundBridge(DestinationBridge bridge) { 326 inboundBridges.remove(bridge); 327 } 328 329 protected void removeOutboundBridge(DestinationBridge bridge) { 330 outboundBridges.remove(bridge); 331 } 332 333 public String getName() { 334 if (name == null) { 335 name = "Connector:" + getNextId(); 336 } 337 return name; 338 } 339 340 private static synchronized int getNextId() { 341 return nextId++; 342 } 343 344 public void setName(String name) { 345 this.name = name; 346 } 347 348 public abstract void restartProducerConnection() throws NamingException, JMSException; 349}