001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import java.util.Iterator;
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.CopyOnWriteArrayList;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import javax.jms.Connection;
026import javax.jms.Destination;
027import javax.jms.JMSException;
028import javax.naming.NamingException;
029
030import org.apache.activemq.ActiveMQConnectionFactory;
031import org.apache.activemq.Service;
032import org.apache.activemq.broker.BrokerService;
033import org.apache.activemq.util.LRUCache;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.springframework.jndi.JndiTemplate;
037
038/**
039 * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
040 * JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be
041 * JMS 1.0.2 compliant.
042 * 
043 * 
044 */
045public abstract class JmsConnector implements Service {
046
047    private static int nextId;
048    private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
049    
050    protected JndiTemplate jndiLocalTemplate;
051    protected JndiTemplate jndiOutboundTemplate;
052    protected JmsMesageConvertor inboundMessageConvertor;
053    protected JmsMesageConvertor outboundMessageConvertor;
054    protected AtomicBoolean initialized = new AtomicBoolean(false);
055    protected AtomicBoolean started = new AtomicBoolean(false);
056    protected ActiveMQConnectionFactory embeddedConnectionFactory;
057    protected int replyToDestinationCacheSize = 10000;
058    protected String outboundUsername;
059    protected String outboundPassword;
060    protected String localUsername;
061    protected String localPassword;
062    protected String outboundClientId;
063    protected String localClientId;
064    protected LRUCache replyToBridges = createLRUCache();
065
066    private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
067    private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
068    private String name;
069
070
071    private static LRUCache createLRUCache() {
072        return new LRUCache() {
073            private static final long serialVersionUID = -7446792754185879286L;
074
075            protected boolean removeEldestEntry(Map.Entry enty) {
076                if (size() > maxCacheSize) {
077                    Iterator iter = entrySet().iterator();
078                    Map.Entry lru = (Map.Entry)iter.next();
079                    remove(lru.getKey());
080                    DestinationBridge bridge = (DestinationBridge)lru.getValue();
081                    try {
082                        bridge.stop();
083                        LOG.info("Expired bridge: " + bridge);
084                    } catch (Exception e) {
085                        LOG.warn("stopping expired bridge" + bridge + " caused an exception", e);
086                    }
087                }
088                return false;
089            }
090        };
091    }
092
093    /**
094     */
095    public boolean init() {
096        boolean result = initialized.compareAndSet(false, true);
097        if (result) {
098            if (jndiLocalTemplate == null) {
099                jndiLocalTemplate = new JndiTemplate();
100            }
101            if (jndiOutboundTemplate == null) {
102                jndiOutboundTemplate = new JndiTemplate();
103            }
104            if (inboundMessageConvertor == null) {
105                inboundMessageConvertor = new SimpleJmsMessageConvertor();
106            }
107            if (outboundMessageConvertor == null) {
108                outboundMessageConvertor = new SimpleJmsMessageConvertor();
109            }
110            replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
111        }
112        return result;
113    }
114
115    public void start() throws Exception {
116        init();
117        if (started.compareAndSet(false, true)) {
118            for (int i = 0; i < inboundBridges.size(); i++) {
119                DestinationBridge bridge = inboundBridges.get(i);
120                bridge.start();
121            }
122            for (int i = 0; i < outboundBridges.size(); i++) {
123                DestinationBridge bridge = outboundBridges.get(i);
124                bridge.start();
125            }
126            LOG.info("JMS Connector " + getName() + " Started");
127        }
128    }
129
130    public void stop() throws Exception {
131        if (started.compareAndSet(true, false)) {
132            for (int i = 0; i < inboundBridges.size(); i++) {
133                DestinationBridge bridge = inboundBridges.get(i);
134                bridge.stop();
135            }
136            for (int i = 0; i < outboundBridges.size(); i++) {
137                DestinationBridge bridge = outboundBridges.get(i);
138                bridge.stop();
139            }
140            LOG.info("JMS Connector " + getName() + " Stopped");
141        }
142    }
143    
144    public void clearBridges() {
145        inboundBridges.clear();
146        outboundBridges.clear();
147    }
148
149    protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
150
151    /**
152     * One way to configure the local connection - this is called by The
153     * BrokerService when the Connector is embedded
154     * 
155     * @param service
156     */
157    public void setBrokerService(BrokerService service) {
158        embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
159    }
160
161    /**
162     * @return Returns the jndiTemplate.
163     */
164    public JndiTemplate getJndiLocalTemplate() {
165        return jndiLocalTemplate;
166    }
167
168    /**
169     * @param jndiTemplate The jndiTemplate to set.
170     */
171    public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
172        this.jndiLocalTemplate = jndiTemplate;
173    }
174
175    /**
176     * @return Returns the jndiOutboundTemplate.
177     */
178    public JndiTemplate getJndiOutboundTemplate() {
179        return jndiOutboundTemplate;
180    }
181
182    /**
183     * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
184     */
185    public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
186        this.jndiOutboundTemplate = jndiOutboundTemplate;
187    }
188
189    /**
190     * @return Returns the inboundMessageConvertor.
191     */
192    public JmsMesageConvertor getInboundMessageConvertor() {
193        return inboundMessageConvertor;
194    }
195
196    /**
197     * @param inboundMessageConvertor The inboundMessageConvertor to set.
198     */
199    public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
200        this.inboundMessageConvertor = jmsMessageConvertor;
201    }
202
203    /**
204     * @return Returns the outboundMessageConvertor.
205     */
206    public JmsMesageConvertor getOutboundMessageConvertor() {
207        return outboundMessageConvertor;
208    }
209
210    /**
211     * @param outboundMessageConvertor The outboundMessageConvertor to set.
212     */
213    public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
214        this.outboundMessageConvertor = outboundMessageConvertor;
215    }
216
217    /**
218     * @return Returns the replyToDestinationCacheSize.
219     */
220    public int getReplyToDestinationCacheSize() {
221        return replyToDestinationCacheSize;
222    }
223
224    /**
225     * @param replyToDestinationCacheSize The replyToDestinationCacheSize to
226     *                set.
227     */
228    public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
229        this.replyToDestinationCacheSize = replyToDestinationCacheSize;
230    }
231
232    /**
233     * @return Returns the localPassword.
234     */
235    public String getLocalPassword() {
236        return localPassword;
237    }
238
239    /**
240     * @param localPassword The localPassword to set.
241     */
242    public void setLocalPassword(String localPassword) {
243        this.localPassword = localPassword;
244    }
245
246    /**
247     * @return Returns the localUsername.
248     */
249    public String getLocalUsername() {
250        return localUsername;
251    }
252
253    /**
254     * @param localUsername The localUsername to set.
255     */
256    public void setLocalUsername(String localUsername) {
257        this.localUsername = localUsername;
258    }
259
260    /**
261     * @return Returns the outboundPassword.
262     */
263    public String getOutboundPassword() {
264        return outboundPassword;
265    }
266
267    /**
268     * @param outboundPassword The outboundPassword to set.
269     */
270    public void setOutboundPassword(String outboundPassword) {
271        this.outboundPassword = outboundPassword;
272    }
273
274    /**
275     * @return Returns the outboundUsername.
276     */
277    public String getOutboundUsername() {
278        return outboundUsername;
279    }
280
281    /**
282     * @param outboundUsername The outboundUsername to set.
283     */
284    public void setOutboundUsername(String outboundUsername) {
285        this.outboundUsername = outboundUsername;
286    }
287    
288    /**
289     * @return the outboundClientId
290     */
291    public String getOutboundClientId() {
292        return outboundClientId;
293    }
294
295    /**
296     * @param outboundClientId the outboundClientId to set
297     */
298    public void setOutboundClientId(String outboundClientId) {
299        this.outboundClientId = outboundClientId;
300    }
301
302    /**
303     * @return the localClientId
304     */
305    public String getLocalClientId() {
306        return localClientId;
307    }
308
309    /**
310     * @param localClientId the localClientId to set
311     */
312    public void setLocalClientId(String localClientId) {
313        this.localClientId = localClientId;
314    }
315    
316    
317    protected void addInboundBridge(DestinationBridge bridge) {
318        inboundBridges.add(bridge);
319    }
320
321    protected void addOutboundBridge(DestinationBridge bridge) {
322        outboundBridges.add(bridge);
323    }
324
325    protected void removeInboundBridge(DestinationBridge bridge) {
326        inboundBridges.remove(bridge);
327    }
328
329    protected void removeOutboundBridge(DestinationBridge bridge) {
330        outboundBridges.remove(bridge);
331    }
332
333    public String getName() {
334        if (name == null) {
335            name = "Connector:" + getNextId();
336        }
337        return name;
338    }
339
340    private static synchronized int getNextId() {
341        return nextId++;
342    }
343
344    public void setName(String name) {
345        this.name = name;
346    }
347
348    public abstract void restartProducerConnection() throws NamingException, JMSException;
349}