001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.JMSException;
022import javax.jms.Queue;
023import javax.jms.QueueConnection;
024import javax.jms.QueueConnectionFactory;
025import javax.jms.QueueSession;
026import javax.jms.Session;
027import javax.naming.NamingException;
028
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * A Bridge to other JMS Queue providers
034 * 
035 * @org.apache.xbean.XBean
036 * 
037 * 
038 */
039public class JmsQueueConnector extends JmsConnector {
040    private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class);
041    private String outboundQueueConnectionFactoryName;
042    private String localConnectionFactoryName;
043    private QueueConnectionFactory outboundQueueConnectionFactory;
044    private QueueConnectionFactory localQueueConnectionFactory;
045    private QueueConnection outboundQueueConnection;
046    private QueueConnection localQueueConnection;
047    private InboundQueueBridge[] inboundQueueBridges;
048    private OutboundQueueBridge[] outboundQueueBridges;
049
050    public boolean init() {
051        boolean result = super.init();
052        if (result) {
053            try {
054                initializeForeignQueueConnection();
055                initializeLocalQueueConnection();
056                initializeInboundJmsMessageConvertor();
057                initializeOutboundJmsMessageConvertor();
058                initializeInboundQueueBridges();
059                initializeOutboundQueueBridges();
060            } catch (Exception e) {
061                LOG.error("Failed to initialize the JMSConnector", e);
062            }
063        }
064        return result;
065    }
066
067    /**
068     * @return Returns the inboundQueueBridges.
069     */
070    public InboundQueueBridge[] getInboundQueueBridges() {
071        return inboundQueueBridges;
072    }
073
074    /**
075     * @param inboundQueueBridges The inboundQueueBridges to set.
076     */
077    public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
078        this.inboundQueueBridges = inboundQueueBridges;
079    }
080
081    /**
082     * @return Returns the outboundQueueBridges.
083     */
084    public OutboundQueueBridge[] getOutboundQueueBridges() {
085        return outboundQueueBridges;
086    }
087
088    /**
089     * @param outboundQueueBridges The outboundQueueBridges to set.
090     */
091    public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
092        this.outboundQueueBridges = outboundQueueBridges;
093    }
094
095    /**
096     * @return Returns the localQueueConnectionFactory.
097     */
098    public QueueConnectionFactory getLocalQueueConnectionFactory() {
099        return localQueueConnectionFactory;
100    }
101
102    /**
103     * @param localQueueConnectionFactory The localQueueConnectionFactory to
104     *                set.
105     */
106    public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
107        this.localQueueConnectionFactory = localConnectionFactory;
108    }
109
110    /**
111     * @return Returns the outboundQueueConnectionFactory.
112     */
113    public QueueConnectionFactory getOutboundQueueConnectionFactory() {
114        return outboundQueueConnectionFactory;
115    }
116
117    /**
118     * @return Returns the outboundQueueConnectionFactoryName.
119     */
120    public String getOutboundQueueConnectionFactoryName() {
121        return outboundQueueConnectionFactoryName;
122    }
123
124    /**
125     * @param outboundQueueConnectionFactoryName The
126     *                outboundQueueConnectionFactoryName to set.
127     */
128    public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
129        this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
130    }
131
132    /**
133     * @return Returns the localConnectionFactoryName.
134     */
135    public String getLocalConnectionFactoryName() {
136        return localConnectionFactoryName;
137    }
138
139    /**
140     * @param localConnectionFactoryName The localConnectionFactoryName to set.
141     */
142    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
143        this.localConnectionFactoryName = localConnectionFactoryName;
144    }
145
146    /**
147     * @return Returns the localQueueConnection.
148     */
149    public QueueConnection getLocalQueueConnection() {
150        return localQueueConnection;
151    }
152
153    /**
154     * @param localQueueConnection The localQueueConnection to set.
155     */
156    public void setLocalQueueConnection(QueueConnection localQueueConnection) {
157        this.localQueueConnection = localQueueConnection;
158    }
159
160    /**
161     * @return Returns the outboundQueueConnection.
162     */
163    public QueueConnection getOutboundQueueConnection() {
164        return outboundQueueConnection;
165    }
166
167    /**
168     * @param outboundQueueConnection The outboundQueueConnection to set.
169     */
170    public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
171        this.outboundQueueConnection = foreignQueueConnection;
172    }
173
174    /**
175     * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory
176     *                to set.
177     */
178    public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
179        this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
180    }
181
182    public void restartProducerConnection() throws NamingException, JMSException {
183        outboundQueueConnection = null;
184        initializeForeignQueueConnection();
185
186        // the outboundQueueConnection was reestablished - publish the new connection to the bridges
187        if (inboundQueueBridges != null) {
188                for (int i = 0; i < inboundQueueBridges.length; i++) {
189                        InboundQueueBridge bridge = inboundQueueBridges[i];
190                        bridge.setConsumerConnection(outboundQueueConnection);
191                }
192        }
193        if (outboundQueueBridges != null) {
194                for (int i = 0; i < outboundQueueBridges.length; i++) {
195                        OutboundQueueBridge bridge = outboundQueueBridges[i];
196                        bridge.setProducerConnection(outboundQueueConnection);
197                }
198        }
199    }
200
201    protected void initializeForeignQueueConnection() throws NamingException, JMSException {
202        if (outboundQueueConnection == null) {
203            // get the connection factories
204            if (outboundQueueConnectionFactory == null) {
205                // look it up from JNDI
206                if (outboundQueueConnectionFactoryName != null) {
207                    outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
208                        .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
209                    if (outboundUsername != null) {
210                        outboundQueueConnection = outboundQueueConnectionFactory
211                            .createQueueConnection(outboundUsername, outboundPassword);
212                    } else {
213                        outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection();
214                    }
215                } else {
216                    throw new JMSException("Cannot create foreignConnection - no information");
217                }
218            } else {
219                if (outboundUsername != null) {
220                    outboundQueueConnection = outboundQueueConnectionFactory
221                        .createQueueConnection(outboundUsername, outboundPassword);
222                } else {
223                    outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection();
224                }
225            }
226        }
227        if (localClientId != null && localClientId.length() > 0) {
228            outboundQueueConnection.setClientID(getOutboundClientId());
229        }
230        outboundQueueConnection.start();
231    }
232
233    protected void initializeLocalQueueConnection() throws NamingException, JMSException {
234        if (localQueueConnection == null) {
235            // get the connection factories
236            if (localQueueConnectionFactory == null) {
237                if (embeddedConnectionFactory == null) {
238                    // look it up from JNDI
239                    if (localConnectionFactoryName != null) {
240                        localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
241                            .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
242                        if (localUsername != null) {
243                            localQueueConnection = localQueueConnectionFactory
244                                .createQueueConnection(localUsername, localPassword);
245                        } else {
246                            localQueueConnection = localQueueConnectionFactory.createQueueConnection();
247                        }
248                    } else {
249                        throw new JMSException("Cannot create localConnection - no information");
250                    }
251                } else {
252                    localQueueConnection = embeddedConnectionFactory.createQueueConnection();
253                }
254            } else {
255                if (localUsername != null) {
256                    localQueueConnection = localQueueConnectionFactory.createQueueConnection(localUsername,
257                                                                                             localPassword);
258                } else {
259                    localQueueConnection = localQueueConnectionFactory.createQueueConnection();
260                }
261            }
262        }
263        if (localClientId != null && localClientId.length() > 0) {
264            localQueueConnection.setClientID(getLocalClientId());
265        }
266        localQueueConnection.start();
267    }
268
269    protected void initializeInboundJmsMessageConvertor() {
270        inboundMessageConvertor.setConnection(localQueueConnection);
271    }
272
273    protected void initializeOutboundJmsMessageConvertor() {
274        outboundMessageConvertor.setConnection(outboundQueueConnection);
275    }
276
277    protected void initializeInboundQueueBridges() throws JMSException {
278        if (inboundQueueBridges != null) {
279            QueueSession outboundSession = outboundQueueConnection
280                .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
281            QueueSession localSession = localQueueConnection.createQueueSession(false,
282                                                                                Session.AUTO_ACKNOWLEDGE);
283            for (int i = 0; i < inboundQueueBridges.length; i++) {
284                InboundQueueBridge bridge = inboundQueueBridges[i];
285                String localQueueName = bridge.getLocalQueueName();
286                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
287                String queueName = bridge.getInboundQueueName();
288                Queue foreignQueue = createForeignQueue(outboundSession, queueName);
289                bridge.setConsumerQueue(foreignQueue);
290                bridge.setProducerQueue(activemqQueue);
291                bridge.setProducerConnection(localQueueConnection);
292                bridge.setConsumerConnection(outboundQueueConnection);
293                if (bridge.getJmsMessageConvertor() == null) {
294                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
295                }
296                bridge.setJmsConnector(this);
297                addInboundBridge(bridge);
298            }
299            outboundSession.close();
300            localSession.close();
301        }
302    }
303
304    protected void initializeOutboundQueueBridges() throws JMSException {
305        if (outboundQueueBridges != null) {
306            QueueSession outboundSession = outboundQueueConnection
307                .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
308            QueueSession localSession = localQueueConnection.createQueueSession(false,
309                                                                                Session.AUTO_ACKNOWLEDGE);
310            for (int i = 0; i < outboundQueueBridges.length; i++) {
311                OutboundQueueBridge bridge = outboundQueueBridges[i];
312                String localQueueName = bridge.getLocalQueueName();
313                Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
314                String queueName = bridge.getOutboundQueueName();
315                Queue foreignQueue = createForeignQueue(outboundSession, queueName);
316                bridge.setConsumerQueue(activemqQueue);
317                bridge.setProducerQueue(foreignQueue);
318                bridge.setProducerConnection(outboundQueueConnection);
319                bridge.setConsumerConnection(localQueueConnection);
320                if (bridge.getJmsMessageConvertor() == null) {
321                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
322                }
323                bridge.setJmsConnector(this);
324                addOutboundBridge(bridge);
325            }
326            outboundSession.close();
327            localSession.close();
328        }
329    }
330
331    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
332                                              Connection replyToConsumerConnection) {
333        Queue replyToProducerQueue = (Queue)destination;
334        boolean isInbound = replyToProducerConnection.equals(localQueueConnection);
335
336        if (isInbound) {
337            InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
338            if (bridge == null) {
339                bridge = new InboundQueueBridge() {
340                    protected Destination processReplyToDestination(Destination destination) {
341                        return null;
342                    }
343                };
344                try {
345                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
346                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
347                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
348                    replyToConsumerSession.close();
349                    bridge.setConsumerQueue(replyToConsumerQueue);
350                    bridge.setProducerQueue(replyToProducerQueue);
351                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
352                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
353                    bridge.setDoHandleReplyTo(false);
354                    if (bridge.getJmsMessageConvertor() == null) {
355                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
356                    }
357                    bridge.setJmsConnector(this);
358                    bridge.start();
359                    LOG.info("Created replyTo bridge for " + replyToProducerQueue);
360                } catch (Exception e) {
361                    LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
362                    return null;
363                }
364                replyToBridges.put(replyToProducerQueue, bridge);
365            }
366            return bridge.getConsumerQueue();
367        } else {
368            OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
369            if (bridge == null) {
370                bridge = new OutboundQueueBridge() {
371                    protected Destination processReplyToDestination(Destination destination) {
372                        return null;
373                    }
374                };
375                try {
376                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
377                        .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
378                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
379                    replyToConsumerSession.close();
380                    bridge.setConsumerQueue(replyToConsumerQueue);
381                    bridge.setProducerQueue(replyToProducerQueue);
382                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
383                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
384                    bridge.setDoHandleReplyTo(false);
385                    if (bridge.getJmsMessageConvertor() == null) {
386                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
387                    }
388                    bridge.setJmsConnector(this);
389                    bridge.start();
390                    LOG.info("Created replyTo bridge for " + replyToProducerQueue);
391                } catch (Exception e) {
392                    LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
393                    return null;
394                }
395                replyToBridges.put(replyToProducerQueue, bridge);
396            }
397            return bridge.getConsumerQueue();
398        }
399    }
400
401    protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
402        return session.createQueue(queueName);
403    }
404
405    protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
406        Queue result = null;
407        try {
408            result = session.createQueue(queueName);
409        } catch (JMSException e) {
410            // look-up the Queue
411            try {
412                result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
413            } catch (NamingException e1) {
414                String errStr = "Failed to look-up Queue for name: " + queueName;
415                LOG.error(errStr, e);
416                JMSException jmsEx = new JMSException(errStr);
417                jmsEx.setLinkedException(e1);
418                throw jmsEx;
419            }
420        }
421        return result;
422    }
423
424}