001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.StringTokenizer;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.regex.Pattern;
027import javax.management.ObjectName;
028import org.apache.activemq.broker.jmx.ManagedTransportConnector;
029import org.apache.activemq.broker.jmx.ManagementContext;
030import org.apache.activemq.broker.region.ConnectorStatistics;
031import org.apache.activemq.command.BrokerInfo;
032import org.apache.activemq.command.ConnectionControl;
033import org.apache.activemq.security.MessageAuthorizationPolicy;
034import org.apache.activemq.thread.DefaultThreadPools;
035import org.apache.activemq.thread.TaskRunnerFactory;
036import org.apache.activemq.transport.Transport;
037import org.apache.activemq.transport.TransportAcceptListener;
038import org.apache.activemq.transport.TransportFactory;
039import org.apache.activemq.transport.TransportServer;
040import org.apache.activemq.transport.discovery.DiscoveryAgent;
041import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
042import org.apache.activemq.util.MDCHelper;
043import org.apache.activemq.util.ServiceStopper;
044import org.apache.activemq.util.ServiceSupport;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * @org.apache.xbean.XBean
050 * 
051 */
052public class TransportConnector implements Connector, BrokerServiceAware {
053
054    final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
055
056    protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
057    protected TransportStatusDetector statusDector;
058    private BrokerService brokerService;
059    private TransportServer server;
060    private URI uri;
061    private BrokerInfo brokerInfo = new BrokerInfo();
062    private TaskRunnerFactory taskRunnerFactory;
063    private MessageAuthorizationPolicy messageAuthorizationPolicy;
064    private DiscoveryAgent discoveryAgent;
065    private final ConnectorStatistics statistics = new ConnectorStatistics();
066    private URI discoveryUri;
067    private URI connectUri;
068    private String name;
069    private boolean disableAsyncDispatch;
070    private boolean enableStatusMonitor = false;
071    private Broker broker;
072    private boolean updateClusterClients = false;
073    private boolean rebalanceClusterClients;
074    private boolean updateClusterClientsOnRemove = false;
075    private String updateClusterFilter;
076
077    public TransportConnector() {
078    }
079
080    public TransportConnector(TransportServer server) {
081        this();
082        setServer(server);
083        if (server != null && server.getConnectURI() != null) {
084            URI uri = server.getConnectURI();
085            if (uri != null && uri.getScheme().equals("vm")) {
086                setEnableStatusMonitor(false);
087            }
088        }
089
090    }
091
092    /**
093     * @return Returns the connections.
094     */
095    public CopyOnWriteArrayList<TransportConnection> getConnections() {
096        return connections;
097    }
098
099    /**
100     * Factory method to create a JMX managed version of this transport
101     * connector
102     */
103    public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName)
104            throws IOException, URISyntaxException {
105        ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
106        rc.setBrokerInfo(getBrokerInfo());
107        rc.setConnectUri(getConnectUri());
108        rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
109        rc.setDiscoveryAgent(getDiscoveryAgent());
110        rc.setDiscoveryUri(getDiscoveryUri());
111        rc.setEnableStatusMonitor(isEnableStatusMonitor());
112        rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
113        rc.setName(getName());
114        rc.setTaskRunnerFactory(getTaskRunnerFactory());
115        rc.setUri(getUri());
116        rc.setBrokerService(brokerService);
117        rc.setUpdateClusterClients(isUpdateClusterClients());
118        rc.setRebalanceClusterClients(isRebalanceClusterClients());
119        rc.setUpdateClusterFilter(getUpdateClusterFilter());
120        rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
121        return rc;
122    }
123
124    public BrokerInfo getBrokerInfo() {
125        return brokerInfo;
126    }
127
128    public void setBrokerInfo(BrokerInfo brokerInfo) {
129        this.brokerInfo = brokerInfo;
130    }
131
132    /**
133     * 
134     * @deprecated use the {@link #setBrokerService(BrokerService)} method
135     *             instead.
136     */
137    @Deprecated
138    public void setBrokerName(String name) {
139        if (this.brokerInfo == null) {
140            this.brokerInfo = new BrokerInfo();
141        }
142        this.brokerInfo.setBrokerName(name);
143    }
144
145    public TransportServer getServer() throws IOException, URISyntaxException {
146        if (server == null) {
147            setServer(createTransportServer());
148        }
149        return server;
150    }
151
152    public void setServer(TransportServer server) {
153        this.server = server;
154    }
155
156    public URI getUri() {
157        if (uri == null) {
158            try {
159                uri = getConnectUri();
160            } catch (Throwable e) {
161            }
162        }
163        return uri;
164    }
165
166    /**
167     * Sets the server transport URI to use if there is not a
168     * {@link TransportServer} configured via the
169     * {@link #setServer(TransportServer)} method. This value is used to lazy
170     * create a {@link TransportServer} instance
171     * 
172     * @param uri
173     */
174    public void setUri(URI uri) {
175        this.uri = uri;
176    }
177
178    public TaskRunnerFactory getTaskRunnerFactory() {
179        return taskRunnerFactory;
180    }
181
182    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
183        this.taskRunnerFactory = taskRunnerFactory;
184    }
185
186    /**
187     * @return the statistics for this connector
188     */
189    public ConnectorStatistics getStatistics() {
190        return statistics;
191    }
192
193    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
194        return messageAuthorizationPolicy;
195    }
196
197    /**
198     * Sets the policy used to decide if the current connection is authorized to
199     * consume a given message
200     */
201    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
202        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
203    }
204
205    public void start() throws Exception {
206        broker = brokerService.getBroker();
207        brokerInfo.setBrokerName(broker.getBrokerName());
208        brokerInfo.setBrokerId(broker.getBrokerId());
209        brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
210        brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
211        brokerInfo.setBrokerURL(getServer().getConnectURI().toString());
212        final Map context = MDCHelper.getCopyOfContextMap();
213        getServer().setAcceptListener(new TransportAcceptListener() {
214            public void onAccept(final Transport transport) {
215                try {
216                    DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
217                        public void run() {
218                            MDCHelper.setContextMap(context);
219                            try {
220                                Connection connection = createConnection(transport);
221                                connection.start();
222                            } catch (Exception e) {
223                                ServiceSupport.dispose(transport);
224                                onAcceptError(e);
225                            }
226                        }
227                    });
228                } catch (Exception e) {
229                    String remoteHost = transport.getRemoteAddress();
230                    ServiceSupport.dispose(transport);
231                    onAcceptError(e, remoteHost);
232                }
233            }
234
235            public void onAcceptError(Exception error) {
236                onAcceptError(error, null);
237            }
238
239            private void onAcceptError(Exception error, String remoteHost) {
240                LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
241                        + error);
242                LOG.debug("Reason: " + error, error);
243            }
244        });
245        getServer().setBrokerInfo(brokerInfo);
246        getServer().start();
247
248        DiscoveryAgent da = getDiscoveryAgent();
249        if (da != null) {
250            da.registerService(getPublishableConnectString());
251            da.start();
252        }
253        if (enableStatusMonitor) {
254            this.statusDector = new TransportStatusDetector(this);
255            this.statusDector.start();
256        }
257
258        LOG.info("Connector " + getName() + " Started");
259    }
260
261    public String getPublishableConnectString() throws Exception {
262        String publishableConnectString = null;
263        URI theConnectURI = getConnectUri();
264        if (theConnectURI != null) {
265            publishableConnectString = theConnectURI.toString();
266            // strip off server side query parameters which may not be compatible to
267            // clients
268            if (theConnectURI.getRawQuery() != null) {
269                publishableConnectString = publishableConnectString.substring(0, publishableConnectString
270                        .indexOf(theConnectURI.getRawQuery()) - 1);
271            }
272        }
273        if (LOG.isDebugEnabled()) {
274            LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI);
275        }
276        return publishableConnectString;
277    }
278
279    public void stop() throws Exception {
280        ServiceStopper ss = new ServiceStopper();
281        if (discoveryAgent != null) {
282            ss.stop(discoveryAgent);
283        }
284        if (server != null) {
285            ss.stop(server);
286            server = null;
287        }
288        if (this.statusDector != null) {
289            this.statusDector.stop();
290        }
291
292        for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
293            TransportConnection c = iter.next();
294            ss.stop(c);
295        }
296        ss.throwFirstException();
297        LOG.info("Connector " + getName() + " Stopped");
298    }
299
300    // Implementation methods
301    // -------------------------------------------------------------------------
302    protected Connection createConnection(Transport transport) throws IOException {
303        TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
304                : taskRunnerFactory);
305        boolean statEnabled = this.getStatistics().isEnabled();
306        answer.getStatistics().setEnabled(statEnabled);
307        answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
308        return answer;
309    }
310
311    protected TransportServer createTransportServer() throws IOException, URISyntaxException {
312        if (uri == null) {
313            throw new IllegalArgumentException("You must specify either a server or uri property");
314        }
315        if (brokerService == null) {
316            throw new IllegalArgumentException(
317                    "You must specify the brokerService property. Maybe this connector should be added to a broker?");
318        }
319        return TransportFactory.bind(brokerService, uri);
320    }
321
322    public DiscoveryAgent getDiscoveryAgent() throws IOException {
323        if (discoveryAgent == null) {
324            discoveryAgent = createDiscoveryAgent();
325        }
326        return discoveryAgent;
327    }
328
329    protected DiscoveryAgent createDiscoveryAgent() throws IOException {
330        if (discoveryUri != null) {
331            return DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
332        }
333        return null;
334    }
335
336    public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
337        this.discoveryAgent = discoveryAgent;
338    }
339
340    public URI getDiscoveryUri() {
341        return discoveryUri;
342    }
343
344    public void setDiscoveryUri(URI discoveryUri) {
345        this.discoveryUri = discoveryUri;
346    }
347
348    public URI getConnectUri() throws IOException, URISyntaxException {
349        if (connectUri == null) {
350            if (server != null) {
351                connectUri = server.getConnectURI();
352            }
353        }
354        return connectUri;
355    }
356
357    public void setConnectUri(URI transportUri) {
358        this.connectUri = transportUri;
359    }
360
361    public void onStarted(TransportConnection connection) {
362        connections.add(connection);
363    }
364
365    public void onStopped(TransportConnection connection) {
366        connections.remove(connection);
367    }
368
369    public String getName() {
370        if (name == null) {
371            uri = getUri();
372            if (uri != null) {
373                name = uri.toString();
374            }
375        }
376        return name;
377    }
378
379    public void setName(String name) {
380        this.name = name;
381    }
382
383    @Override
384    public String toString() {
385        String rc = getName();
386        if (rc == null) {
387            rc = super.toString();
388        }
389        return rc;
390    }
391
392    protected ConnectionControl getConnectionControl() {
393        boolean rebalance = isRebalanceClusterClients();
394        String connectedBrokers = "";
395        String self = "";
396
397        if (isUpdateClusterClients()) {
398            if (brokerService.getDefaultSocketURIString() != null) {
399                self += brokerService.getDefaultSocketURIString();
400                self += ",";
401            }
402            if (rebalance == false) {
403                connectedBrokers += self;
404            }
405            if (this.broker.getPeerBrokerInfos() != null) {
406                for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
407                    if (isMatchesClusterFilter(info.getBrokerName())) {
408                        connectedBrokers += info.getBrokerURL();
409                        connectedBrokers += ",";
410                    }
411                }
412            }
413            if (rebalance) {
414                connectedBrokers += self;
415            }
416        }
417
418        ConnectionControl control = new ConnectionControl();
419        control.setConnectedBrokers(connectedBrokers);
420        control.setRebalanceConnection(rebalance);
421        return control;
422
423    }
424
425    public void updateClientClusterInfo() {
426        if (isRebalanceClusterClients() || isUpdateClusterClients()) {
427            ConnectionControl control = getConnectionControl();
428            for (Connection c : this.connections) {
429                c.updateClient(control);
430            }
431        }
432    }
433
434    private boolean isMatchesClusterFilter(String brokerName) {
435        boolean result = true;
436        String filter = getUpdateClusterFilter();
437        if (filter != null) {
438            filter = filter.trim();
439            if (filter.length() > 0) {
440                StringTokenizer tokenizer = new StringTokenizer(filter, ",");
441                while (result && tokenizer.hasMoreTokens()) {
442                    String token = tokenizer.nextToken();
443                    result = isMatchesClusterFilter(brokerName, token);
444                }
445            }
446        }
447        return result;
448    }
449
450    private boolean isMatchesClusterFilter(String brokerName, String match) {
451        boolean result = true;
452        if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
453            result = Pattern.matches(match, brokerName);
454        }
455        return result;
456    }
457
458    public boolean isDisableAsyncDispatch() {
459        return disableAsyncDispatch;
460    }
461
462    public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
463        this.disableAsyncDispatch = disableAsyncDispatch;
464    }
465
466    /**
467     * @return the enableStatusMonitor
468     */
469    public boolean isEnableStatusMonitor() {
470        return enableStatusMonitor;
471    }
472
473    /**
474     * @param enableStatusMonitor
475     *            the enableStatusMonitor to set
476     */
477    public void setEnableStatusMonitor(boolean enableStatusMonitor) {
478        this.enableStatusMonitor = enableStatusMonitor;
479    }
480
481    /**
482     * This is called by the BrokerService right before it starts the transport.
483     */
484    public void setBrokerService(BrokerService brokerService) {
485        this.brokerService = brokerService;
486    }
487
488    public Broker getBroker() {
489        return broker;
490    }
491
492    public BrokerService getBrokerService() {
493        return brokerService;
494    }
495
496    /**
497     * @return the updateClusterClients
498     */
499    public boolean isUpdateClusterClients() {
500        return this.updateClusterClients;
501    }
502
503    /**
504     * @param updateClusterClients
505     *            the updateClusterClients to set
506     */
507    public void setUpdateClusterClients(boolean updateClusterClients) {
508        this.updateClusterClients = updateClusterClients;
509    }
510
511    /**
512     * @return the rebalanceClusterClients
513     */
514    public boolean isRebalanceClusterClients() {
515        return this.rebalanceClusterClients;
516    }
517
518    /**
519     * @param rebalanceClusterClients
520     *            the rebalanceClusterClients to set
521     */
522    public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
523        this.rebalanceClusterClients = rebalanceClusterClients;
524    }
525 
526    /**
527     * @return the updateClusterClientsOnRemove
528     */
529    public boolean isUpdateClusterClientsOnRemove() {
530        return this.updateClusterClientsOnRemove;
531    }
532
533    /**
534     * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
535     */
536    public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
537        this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
538    }
539    
540    /**
541     * @return the updateClusterFilter
542     */
543    public String getUpdateClusterFilter() {
544        return this.updateClusterFilter;
545    }
546
547    /**
548     * @param updateClusterFilter
549     *            the updateClusterFilter to set
550     */
551    public void setUpdateClusterFilter(String updateClusterFilter) {
552        this.updateClusterFilter = updateClusterFilter;
553    }
554
555}