001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import java.util.concurrent.atomic.AtomicBoolean; 020import javax.jms.Connection; 021import javax.jms.Destination; 022import javax.jms.JMSException; 023import javax.jms.Message; 024import javax.jms.MessageConsumer; 025import javax.jms.MessageListener; 026import javax.jms.MessageProducer; 027import javax.naming.NamingException; 028import org.apache.activemq.Service; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * A Destination bridge is used to bridge between to different JMS systems 034 * 035 * 036 */ 037public abstract class DestinationBridge implements Service, MessageListener { 038 private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class); 039 protected MessageConsumer consumer; 040 protected AtomicBoolean started = new AtomicBoolean(false); 041 protected JmsMesageConvertor jmsMessageConvertor; 042 protected boolean doHandleReplyTo = true; 043 protected JmsConnector jmsConnector; 044 private int maximumRetries = 10; 045 046 /** 047 * @return Returns the consumer. 048 */ 049 public MessageConsumer getConsumer() { 050 return consumer; 051 } 052 053 /** 054 * @param consumer The consumer to set. 055 */ 056 public void setConsumer(MessageConsumer consumer) { 057 this.consumer = consumer; 058 } 059 060 /** 061 * @param connector 062 */ 063 public void setJmsConnector(JmsConnector connector) { 064 this.jmsConnector = connector; 065 } 066 067 /** 068 * @return Returns the inboundMessageConvertor. 069 */ 070 public JmsMesageConvertor getJmsMessageConvertor() { 071 return jmsMessageConvertor; 072 } 073 074 /** 075 * @param jmsMessageConvertor 076 */ 077 public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) { 078 this.jmsMessageConvertor = jmsMessageConvertor; 079 } 080 081 public int getMaximumRetries() { 082 return maximumRetries; 083 } 084 085 /** 086 * Sets the maximum number of retries if a send fails before closing the 087 * bridge 088 */ 089 public void setMaximumRetries(int maximumRetries) { 090 this.maximumRetries = maximumRetries; 091 } 092 093 protected Destination processReplyToDestination(Destination destination) { 094 return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer()); 095 } 096 097 public void start() throws Exception { 098 if (started.compareAndSet(false, true)) { 099 MessageConsumer consumer = createConsumer(); 100 consumer.setMessageListener(this); 101 createProducer(); 102 } 103 } 104 105 public void stop() throws Exception { 106 started.set(false); 107 } 108 109 public void onMessage(Message message) { 110 int attempt = 0; 111 while (started.get() && message != null) { 112 113 try { 114 if (attempt > 0) { 115 restartProducer(); 116 } 117 Message converted; 118 if (doHandleReplyTo) { 119 Destination replyTo = message.getJMSReplyTo(); 120 if (replyTo != null) { 121 converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo)); 122 } else { 123 converted = jmsMessageConvertor.convert(message); 124 } 125 } else { 126 message.setJMSReplyTo(null); 127 converted = jmsMessageConvertor.convert(message); 128 } 129 sendMessage(converted); 130 message.acknowledge(); 131 return; 132 } catch (Exception e) { 133 LOG.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e); 134 if (maximumRetries > 0 && attempt >= maximumRetries) { 135 try { 136 stop(); 137 } catch (Exception e1) { 138 LOG.warn("Failed to stop cleanly", e1); 139 } 140 } 141 } 142 } 143 } 144 145 /** 146 * @return Returns the doHandleReplyTo. 147 */ 148 protected boolean isDoHandleReplyTo() { 149 return doHandleReplyTo; 150 } 151 152 /** 153 * @param doHandleReplyTo The doHandleReplyTo to set. 154 */ 155 protected void setDoHandleReplyTo(boolean doHandleReplyTo) { 156 this.doHandleReplyTo = doHandleReplyTo; 157 } 158 159 protected abstract MessageConsumer createConsumer() throws JMSException; 160 161 protected abstract MessageProducer createProducer() throws JMSException; 162 163 protected abstract void sendMessage(Message message) throws JMSException; 164 165 protected abstract Connection getConnnectionForConsumer(); 166 167 protected abstract Connection getConnectionForProducer(); 168 169 protected void restartProducer() throws JMSException, NamingException { 170 try { 171 //don't reconnect immediately 172 Thread.sleep(1000); 173 getConnectionForProducer().close(); 174 } catch (Exception e) { 175 LOG.debug("Ignoring failure to close producer connection: " + e, e); 176 } 177 jmsConnector.restartProducerConnection(); 178 createProducer(); 179 } 180}