001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import java.util.concurrent.atomic.AtomicBoolean;
020import javax.jms.Connection;
021import javax.jms.Destination;
022import javax.jms.JMSException;
023import javax.jms.Message;
024import javax.jms.MessageConsumer;
025import javax.jms.MessageListener;
026import javax.jms.MessageProducer;
027import javax.naming.NamingException;
028import org.apache.activemq.Service;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * A Destination bridge is used to bridge between to different JMS systems
034 * 
035 * 
036 */
037public abstract class DestinationBridge implements Service, MessageListener {
038    private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);
039    protected MessageConsumer consumer;
040    protected AtomicBoolean started = new AtomicBoolean(false);
041    protected JmsMesageConvertor jmsMessageConvertor;
042    protected boolean doHandleReplyTo = true;
043    protected JmsConnector jmsConnector;
044    private int maximumRetries = 10;
045
046    /**
047     * @return Returns the consumer.
048     */
049    public MessageConsumer getConsumer() {
050        return consumer;
051    }
052
053    /**
054     * @param consumer The consumer to set.
055     */
056    public void setConsumer(MessageConsumer consumer) {
057        this.consumer = consumer;
058    }
059
060    /**
061     * @param connector
062     */
063    public void setJmsConnector(JmsConnector connector) {
064        this.jmsConnector = connector;
065    }
066
067    /**
068     * @return Returns the inboundMessageConvertor.
069     */
070    public JmsMesageConvertor getJmsMessageConvertor() {
071        return jmsMessageConvertor;
072    }
073
074    /**
075     * @param jmsMessageConvertor
076     */
077    public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
078        this.jmsMessageConvertor = jmsMessageConvertor;
079    }
080
081    public int getMaximumRetries() {
082        return maximumRetries;
083    }
084
085    /**
086     * Sets the maximum number of retries if a send fails before closing the
087     * bridge
088     */
089    public void setMaximumRetries(int maximumRetries) {
090        this.maximumRetries = maximumRetries;
091    }
092
093    protected Destination processReplyToDestination(Destination destination) {
094        return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
095    }
096
097    public void start() throws Exception {
098        if (started.compareAndSet(false, true)) {
099            MessageConsumer consumer = createConsumer();
100            consumer.setMessageListener(this);
101            createProducer();
102        }
103    }
104
105    public void stop() throws Exception {
106        started.set(false);
107    }
108
109    public void onMessage(Message message) {
110        int attempt = 0;
111        while (started.get() && message != null) {
112           
113            try {
114                if (attempt > 0) {
115                    restartProducer();
116                }
117                Message converted;
118                if (doHandleReplyTo) {
119                    Destination replyTo = message.getJMSReplyTo();
120                    if (replyTo != null) {
121                        converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
122                    } else {
123                        converted = jmsMessageConvertor.convert(message);
124                    }
125                } else {
126                    message.setJMSReplyTo(null);
127                    converted = jmsMessageConvertor.convert(message);
128                }
129                sendMessage(converted);
130                message.acknowledge();
131                return;
132            } catch (Exception e) {
133                LOG.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e);
134                if (maximumRetries > 0 && attempt >= maximumRetries) {
135                    try {
136                        stop();
137                    } catch (Exception e1) {
138                        LOG.warn("Failed to stop cleanly", e1);
139                    }
140                }
141            }
142        }
143    }
144
145    /**
146     * @return Returns the doHandleReplyTo.
147     */
148    protected boolean isDoHandleReplyTo() {
149        return doHandleReplyTo;
150    }
151
152    /**
153     * @param doHandleReplyTo The doHandleReplyTo to set.
154     */
155    protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
156        this.doHandleReplyTo = doHandleReplyTo;
157    }
158
159    protected abstract MessageConsumer createConsumer() throws JMSException;
160
161    protected abstract MessageProducer createProducer() throws JMSException;
162
163    protected abstract void sendMessage(Message message) throws JMSException;
164
165    protected abstract Connection getConnnectionForConsumer();
166
167    protected abstract Connection getConnectionForProducer();
168
169    protected void restartProducer() throws JMSException, NamingException {
170        try {
171            //don't reconnect immediately
172            Thread.sleep(1000);
173            getConnectionForProducer().close();
174        } catch (Exception e) {
175            LOG.debug("Ignoring failure to close producer connection: " + e, e);
176        }
177        jmsConnector.restartProducerConnection();
178        createProducer();
179    }
180}