001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.JMSException;
022import javax.jms.Session;
023import javax.jms.Topic;
024import javax.jms.TopicConnection;
025import javax.jms.TopicConnectionFactory;
026import javax.jms.TopicSession;
027import javax.naming.NamingException;
028
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * A Bridge to other JMS Topic providers
034 * 
035 * @org.apache.xbean.XBean
036 * 
037 * 
038 */
039public class JmsTopicConnector extends JmsConnector {
040    private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class);
041    private String outboundTopicConnectionFactoryName;
042    private String localConnectionFactoryName;
043    private TopicConnectionFactory outboundTopicConnectionFactory;
044    private TopicConnectionFactory localTopicConnectionFactory;
045    private TopicConnection outboundTopicConnection;
046    private TopicConnection localTopicConnection;
047    private InboundTopicBridge[] inboundTopicBridges;
048    private OutboundTopicBridge[] outboundTopicBridges;
049
050    public boolean init() {
051        boolean result = super.init();
052        if (result) {
053            try {
054                initializeForeignTopicConnection();
055                initializeLocalTopicConnection();
056                initializeInboundJmsMessageConvertor();
057                initializeOutboundJmsMessageConvertor();
058                initializeInboundTopicBridges();
059                initializeOutboundTopicBridges();
060            } catch (Exception e) {
061                LOG.error("Failed to initialize the JMSConnector", e);
062            }
063        }
064        return result;
065    }
066
067    /**
068     * @return Returns the inboundTopicBridges.
069     */
070    public InboundTopicBridge[] getInboundTopicBridges() {
071        return inboundTopicBridges;
072    }
073
074    /**
075     * @param inboundTopicBridges The inboundTopicBridges to set.
076     */
077    public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) {
078        this.inboundTopicBridges = inboundTopicBridges;
079    }
080
081    /**
082     * @return Returns the outboundTopicBridges.
083     */
084    public OutboundTopicBridge[] getOutboundTopicBridges() {
085        return outboundTopicBridges;
086    }
087
088    /**
089     * @param outboundTopicBridges The outboundTopicBridges to set.
090     */
091    public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) {
092        this.outboundTopicBridges = outboundTopicBridges;
093    }
094
095    /**
096     * @return Returns the localTopicConnectionFactory.
097     */
098    public TopicConnectionFactory getLocalTopicConnectionFactory() {
099        return localTopicConnectionFactory;
100    }
101
102    /**
103     * @param localTopicConnectionFactory The localTopicConnectionFactory to
104     *                set.
105     */
106    public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
107        this.localTopicConnectionFactory = localConnectionFactory;
108    }
109
110    /**
111     * @return Returns the outboundTopicConnectionFactory.
112     */
113    public TopicConnectionFactory getOutboundTopicConnectionFactory() {
114        return outboundTopicConnectionFactory;
115    }
116
117    /**
118     * @return Returns the outboundTopicConnectionFactoryName.
119     */
120    public String getOutboundTopicConnectionFactoryName() {
121        return outboundTopicConnectionFactoryName;
122    }
123
124    /**
125     * @param outboundTopicConnectionFactoryName The
126     *                outboundTopicConnectionFactoryName to set.
127     */
128    public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
129        this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
130    }
131
132    /**
133     * @return Returns the localConnectionFactoryName.
134     */
135    public String getLocalConnectionFactoryName() {
136        return localConnectionFactoryName;
137    }
138
139    /**
140     * @param localConnectionFactoryName The localConnectionFactoryName to set.
141     */
142    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
143        this.localConnectionFactoryName = localConnectionFactoryName;
144    }
145
146    /**
147     * @return Returns the localTopicConnection.
148     */
149    public TopicConnection getLocalTopicConnection() {
150        return localTopicConnection;
151    }
152
153    /**
154     * @param localTopicConnection The localTopicConnection to set.
155     */
156    public void setLocalTopicConnection(TopicConnection localTopicConnection) {
157        this.localTopicConnection = localTopicConnection;
158    }
159
160    /**
161     * @return Returns the outboundTopicConnection.
162     */
163    public TopicConnection getOutboundTopicConnection() {
164        return outboundTopicConnection;
165    }
166
167    /**
168     * @param outboundTopicConnection The outboundTopicConnection to set.
169     */
170    public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
171        this.outboundTopicConnection = foreignTopicConnection;
172    }
173
174    /**
175     * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory
176     *                to set.
177     */
178    public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
179        this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
180    }
181
182    public void restartProducerConnection() throws NamingException, JMSException {
183        outboundTopicConnection = null;
184        initializeForeignTopicConnection();
185    }
186
187    protected void initializeForeignTopicConnection() throws NamingException, JMSException {
188        if (outboundTopicConnection == null) {
189            // get the connection factories
190            if (outboundTopicConnectionFactory == null) {
191                // look it up from JNDI
192                if (outboundTopicConnectionFactoryName != null) {
193                    outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
194                        .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
195                    if (outboundUsername != null) {
196                        outboundTopicConnection = outboundTopicConnectionFactory
197                            .createTopicConnection(outboundUsername, outboundPassword);
198                    } else {
199                        outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection();
200                    }
201                } else {
202                    throw new JMSException("Cannot create localConnection - no information");
203                }
204            } else {
205                if (outboundUsername != null) {
206                    outboundTopicConnection = outboundTopicConnectionFactory
207                        .createTopicConnection(outboundUsername, outboundPassword);
208                } else {
209                    outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection();
210                }
211            }
212        }
213        if (localClientId != null && localClientId.length() > 0) {
214            outboundTopicConnection.setClientID(getOutboundClientId());
215        }
216        outboundTopicConnection.start();
217    }
218
219    protected void initializeLocalTopicConnection() throws NamingException, JMSException {
220        if (localTopicConnection == null) {
221            // get the connection factories
222            if (localTopicConnectionFactory == null) {
223                if (embeddedConnectionFactory == null) {
224                    // look it up from JNDI
225                    if (localConnectionFactoryName != null) {
226                        localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
227                            .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
228                        if (localUsername != null) {
229                            localTopicConnection = localTopicConnectionFactory
230                                .createTopicConnection(localUsername, localPassword);
231                        } else {
232                            localTopicConnection = localTopicConnectionFactory.createTopicConnection();
233                        }
234                    } else {
235                        throw new JMSException("Cannot create localConnection - no information");
236                    }
237                } else {
238                    localTopicConnection = embeddedConnectionFactory.createTopicConnection();
239                }
240            } else {
241                if (localUsername != null) {
242                    localTopicConnection = localTopicConnectionFactory.createTopicConnection(localUsername,
243                                                                                             localPassword);
244                } else {
245                    localTopicConnection = localTopicConnectionFactory.createTopicConnection();
246                }
247            }
248        }
249        if (localClientId != null && localClientId.length() > 0) {
250            localTopicConnection.setClientID(getLocalClientId());
251        }
252        localTopicConnection.start();
253    }
254
255    protected void initializeInboundJmsMessageConvertor() {
256        inboundMessageConvertor.setConnection(localTopicConnection);
257    }
258
259    protected void initializeOutboundJmsMessageConvertor() {
260        outboundMessageConvertor.setConnection(outboundTopicConnection);
261    }
262
263    protected void initializeInboundTopicBridges() throws JMSException {
264        if (inboundTopicBridges != null) {
265            TopicSession outboundSession = outboundTopicConnection
266                .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
267            TopicSession localSession = localTopicConnection.createTopicSession(false,
268                                                                                Session.AUTO_ACKNOWLEDGE);
269            for (int i = 0; i < inboundTopicBridges.length; i++) {
270                InboundTopicBridge bridge = inboundTopicBridges[i];
271                String localTopicName = bridge.getLocalTopicName();
272                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
273                String topicName = bridge.getInboundTopicName();
274                Topic foreignTopic = createForeignTopic(outboundSession, topicName);
275                bridge.setConsumerTopic(foreignTopic);
276                bridge.setProducerTopic(activemqTopic);
277                bridge.setProducerConnection(localTopicConnection);
278                bridge.setConsumerConnection(outboundTopicConnection);
279                if (bridge.getJmsMessageConvertor() == null) {
280                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
281                }
282                bridge.setJmsConnector(this);
283                addInboundBridge(bridge);
284            }
285            outboundSession.close();
286            localSession.close();
287        }
288    }
289
290    protected void initializeOutboundTopicBridges() throws JMSException {
291        if (outboundTopicBridges != null) {
292            TopicSession outboundSession = outboundTopicConnection
293                .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
294            TopicSession localSession = localTopicConnection.createTopicSession(false,
295                                                                                Session.AUTO_ACKNOWLEDGE);
296            for (int i = 0; i < outboundTopicBridges.length; i++) {
297                OutboundTopicBridge bridge = outboundTopicBridges[i];
298                String localTopicName = bridge.getLocalTopicName();
299                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
300                String topicName = bridge.getOutboundTopicName();
301                Topic foreignTopic = createForeignTopic(outboundSession, topicName);
302                bridge.setConsumerTopic(activemqTopic);
303                bridge.setProducerTopic(foreignTopic);
304                bridge.setProducerConnection(outboundTopicConnection);
305                bridge.setConsumerConnection(localTopicConnection);
306                if (bridge.getJmsMessageConvertor() == null) {
307                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
308                }
309                bridge.setJmsConnector(this);
310                addOutboundBridge(bridge);
311            }
312            outboundSession.close();
313            localSession.close();
314        }
315    }
316
317    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
318                                              Connection replyToConsumerConnection) {
319        Topic replyToProducerTopic = (Topic)destination;
320        boolean isInbound = replyToProducerConnection.equals(localTopicConnection);
321
322        if (isInbound) {
323            InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
324            if (bridge == null) {
325                bridge = new InboundTopicBridge() {
326                    protected Destination processReplyToDestination(Destination destination) {
327                        return null;
328                    }
329                };
330                try {
331                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
332                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
333                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
334                    replyToConsumerSession.close();
335                    bridge.setConsumerTopic(replyToConsumerTopic);
336                    bridge.setProducerTopic(replyToProducerTopic);
337                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
338                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
339                    bridge.setDoHandleReplyTo(false);
340                    if (bridge.getJmsMessageConvertor() == null) {
341                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
342                    }
343                    bridge.setJmsConnector(this);
344                    bridge.start();
345                    LOG.info("Created replyTo bridge for " + replyToProducerTopic);
346                } catch (Exception e) {
347                    LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
348                    return null;
349                }
350                replyToBridges.put(replyToProducerTopic, bridge);
351            }
352            return bridge.getConsumerTopic();
353        } else {
354            OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
355            if (bridge == null) {
356                bridge = new OutboundTopicBridge() {
357                    protected Destination processReplyToDestination(Destination destination) {
358                        return null;
359                    }
360                };
361                try {
362                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
363                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
364                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
365                    replyToConsumerSession.close();
366                    bridge.setConsumerTopic(replyToConsumerTopic);
367                    bridge.setProducerTopic(replyToProducerTopic);
368                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
369                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
370                    bridge.setDoHandleReplyTo(false);
371                    if (bridge.getJmsMessageConvertor() == null) {
372                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
373                    }
374                    bridge.setJmsConnector(this);
375                    bridge.start();
376                    LOG.info("Created replyTo bridge for " + replyToProducerTopic);
377                } catch (Exception e) {
378                    LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
379                    return null;
380                }
381                replyToBridges.put(replyToProducerTopic, bridge);
382            }
383            return bridge.getConsumerTopic();
384        }
385    }
386
387    protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException {
388        return session.createTopic(topicName);
389    }
390
391    protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException {
392        Topic result = null;
393        try {
394            result = session.createTopic(topicName);
395        } catch (JMSException e) {
396            // look-up the Topic
397            try {
398                result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
399            } catch (NamingException e1) {
400                String errStr = "Failed to look-up Topic for name: " + topicName;
401                LOG.error(errStr, e);
402                JMSException jmsEx = new JMSException(errStr);
403                jmsEx.setLinkedException(e1);
404                throw jmsEx;
405            }
406        }
407        return result;
408    }
409
410}