001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.net.URI;
020import java.net.URISyntaxException;
021import java.util.Collection;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.CopyOnWriteArrayList;
029
030import javax.management.MalformedObjectNameException;
031import javax.management.ObjectName;
032
033import org.apache.activemq.Service;
034import org.apache.activemq.broker.BrokerService;
035import org.apache.activemq.broker.jmx.AnnotatedMBean;
036import org.apache.activemq.broker.jmx.NetworkBridgeView;
037import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
038import org.apache.activemq.command.ActiveMQDestination;
039import org.apache.activemq.command.ConsumerId;
040import org.apache.activemq.transport.Transport;
041import org.apache.activemq.transport.TransportFactory;
042import org.apache.activemq.util.JMXSupport;
043import org.apache.activemq.util.ServiceStopper;
044import org.apache.activemq.util.ServiceSupport;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * 
050 */
051public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service {
052
053    private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class);
054    protected URI localURI;
055    protected ConnectionFilter connectionFilter;
056    protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
057    
058    protected ServiceSupport serviceSupport = new ServiceSupport() {
059
060        protected void doStart() throws Exception {
061            handleStart();
062        }
063
064        protected void doStop(ServiceStopper stopper) throws Exception {
065            handleStop(stopper);
066        }
067    };
068
069    private Set<ActiveMQDestination> durableDestinations;
070    private List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
071    private List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
072    private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
073    private BrokerService brokerService;
074    private ObjectName objectName;
075    
076    public NetworkConnector() {
077    }
078
079    public NetworkConnector(URI localURI) {
080        this.localURI = localURI;
081    }
082
083    public URI getLocalUri() throws URISyntaxException {
084        return localURI;
085    }
086
087    public void setLocalUri(URI localURI) {
088        this.localURI = localURI;
089    }
090
091    /**
092     * @return Returns the durableDestinations.
093     */
094    public Set getDurableDestinations() {
095        return durableDestinations;
096    }
097
098    /**
099     * @param durableDestinations The durableDestinations to set.
100     */
101    public void setDurableDestinations(Set<ActiveMQDestination> durableDestinations) {
102        this.durableDestinations = durableDestinations;
103    }
104
105    /**
106     * @return Returns the excludedDestinations.
107     */
108    public List<ActiveMQDestination> getExcludedDestinations() {
109        return excludedDestinations;
110    }
111
112    /**
113     * @param excludedDestinations The excludedDestinations to set.
114     */
115    public void setExcludedDestinations(List<ActiveMQDestination> excludedDestinations) {
116        this.excludedDestinations = excludedDestinations;
117    }
118
119    public void addExcludedDestination(ActiveMQDestination destiantion) {
120        this.excludedDestinations.add(destiantion);
121    }
122
123    /**
124     * @return Returns the staticallyIncludedDestinations.
125     */
126    public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
127        return staticallyIncludedDestinations;
128    }
129
130    /**
131     * @param staticallyIncludedDestinations The staticallyIncludedDestinations
132     *                to set.
133     */
134    public void setStaticallyIncludedDestinations(List<ActiveMQDestination> staticallyIncludedDestinations) {
135        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
136    }
137
138    public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
139        this.staticallyIncludedDestinations.add(destiantion);
140    }
141
142    /**
143     * @return Returns the dynamicallyIncludedDestinations.
144     */
145    public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
146        return dynamicallyIncludedDestinations;
147    }
148
149    /**
150     * @param dynamicallyIncludedDestinations The
151     *                dynamicallyIncludedDestinations to set.
152     */
153    public void setDynamicallyIncludedDestinations(List<ActiveMQDestination> dynamicallyIncludedDestinations) {
154        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
155    }
156
157    public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
158        this.dynamicallyIncludedDestinations.add(destiantion);
159    }
160
161    public ConnectionFilter getConnectionFilter() {
162        return connectionFilter;
163    }
164
165    public void setConnectionFilter(ConnectionFilter connectionFilter) {
166        this.connectionFilter = connectionFilter;
167    }
168
169    // Implementation methods
170    // -------------------------------------------------------------------------
171    protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) {
172        List<ActiveMQDestination> destsList = getDynamicallyIncludedDestinations();
173        ActiveMQDestination dests[] = destsList.toArray(new ActiveMQDestination[destsList.size()]);
174        result.setDynamicallyIncludedDestinations(dests);
175        destsList = getExcludedDestinations();
176        dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
177        result.setExcludedDestinations(dests);
178        destsList = getStaticallyIncludedDestinations();
179        dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
180        result.setStaticallyIncludedDestinations(dests);
181        if (durableDestinations != null) {
182            
183            HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>();
184            for (ActiveMQDestination d : durableDestinations) {
185                if( d.isTopic() ) {
186                    topics.add(d);
187                }
188            }
189            
190            ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()];
191            dest = (ActiveMQDestination[])topics.toArray(dest);
192            result.setDurableDestinations(dest);
193        }
194        return result;
195    }
196
197    protected Transport createLocalTransport() throws Exception {
198        return TransportFactory.connect(localURI);
199    }
200
201    public void start() throws Exception {
202        serviceSupport.start();
203    }
204
205    public void stop() throws Exception {
206        serviceSupport.stop();
207    }
208
209    protected void handleStart() throws Exception {
210        if (localURI == null) {
211            throw new IllegalStateException("You must configure the 'localURI' property");
212        }
213        LOG.info("Network Connector " + this + " Started");
214    }
215
216    protected void handleStop(ServiceStopper stopper) throws Exception {
217        LOG.info("Network Connector " + this + " Stopped");
218    }
219
220    public ObjectName getObjectName() {
221        return objectName;
222    }
223
224    public void setObjectName(ObjectName objectName) {
225        this.objectName = objectName;
226    }
227
228    public BrokerService getBrokerService() {
229        return brokerService;
230    }
231
232    public void setBrokerService(BrokerService brokerService) {
233        this.brokerService = brokerService;
234    }
235
236    protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
237        if (!getBrokerService().isUseJmx()) {
238            return;
239        }
240        NetworkBridgeViewMBean view = new NetworkBridgeView(bridge);
241        try {
242            ObjectName objectName = createNetworkBridgeObjectName(bridge);
243            AnnotatedMBean.registerMBean(getBrokerService().getManagementContext(), view, objectName);
244        } catch (Throwable e) {
245            LOG.debug("Network bridge could not be registered in JMX: " + e.getMessage(), e);
246        }
247    }
248
249    protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
250        if (!getBrokerService().isUseJmx()) {
251            return;
252        }
253        try {
254            ObjectName objectName = createNetworkBridgeObjectName(bridge);
255            getBrokerService().getManagementContext().unregisterMBean(objectName);
256        } catch (Throwable e) {
257            LOG.debug("Network bridge could not be unregistered in JMX: " + e.getMessage(), e);
258        }
259    }
260    
261
262    @SuppressWarnings("unchecked")
263    protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
264        ObjectName connectorName = getObjectName();
265        Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList());
266        return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName")) + "," + "Type=NetworkBridge,"
267                              + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName")) + "," + "Name="
268                              + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress())));
269    }
270
271    // ask all the bridges as we can't know to which this consumer is tied
272    public boolean removeDemandSubscription(ConsumerId consumerId) {
273        boolean removeSucceeded = false;
274        for (NetworkBridge bridge : bridges.values()) {
275            if (bridge instanceof DemandForwardingBridgeSupport) {
276                DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge;
277                if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) {
278                    removeSucceeded = true;
279                    break;
280                }
281            }
282        }
283        return removeSucceeded;
284    }
285    
286    public Collection<NetworkBridge> activeBridges() {
287        return bridges.values();
288    }
289
290}