001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.Iterator; 023import java.util.Map; 024import java.util.StringTokenizer; 025import java.util.concurrent.CopyOnWriteArrayList; 026import java.util.regex.Pattern; 027import javax.management.ObjectName; 028import org.apache.activemq.broker.jmx.ManagedTransportConnector; 029import org.apache.activemq.broker.jmx.ManagementContext; 030import org.apache.activemq.broker.region.ConnectorStatistics; 031import org.apache.activemq.command.BrokerInfo; 032import org.apache.activemq.command.ConnectionControl; 033import org.apache.activemq.security.MessageAuthorizationPolicy; 034import org.apache.activemq.thread.DefaultThreadPools; 035import org.apache.activemq.thread.TaskRunnerFactory; 036import org.apache.activemq.transport.Transport; 037import org.apache.activemq.transport.TransportAcceptListener; 038import org.apache.activemq.transport.TransportFactory; 039import org.apache.activemq.transport.TransportServer; 040import org.apache.activemq.transport.discovery.DiscoveryAgent; 041import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 042import org.apache.activemq.util.MDCHelper; 043import org.apache.activemq.util.ServiceStopper; 044import org.apache.activemq.util.ServiceSupport; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * @org.apache.xbean.XBean 050 * 051 */ 052public class TransportConnector implements Connector, BrokerServiceAware { 053 054 final Logger LOG = LoggerFactory.getLogger(TransportConnector.class); 055 056 protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>(); 057 protected TransportStatusDetector statusDector; 058 private BrokerService brokerService; 059 private TransportServer server; 060 private URI uri; 061 private BrokerInfo brokerInfo = new BrokerInfo(); 062 private TaskRunnerFactory taskRunnerFactory; 063 private MessageAuthorizationPolicy messageAuthorizationPolicy; 064 private DiscoveryAgent discoveryAgent; 065 private final ConnectorStatistics statistics = new ConnectorStatistics(); 066 private URI discoveryUri; 067 private URI connectUri; 068 private String name; 069 private boolean disableAsyncDispatch; 070 private boolean enableStatusMonitor = false; 071 private Broker broker; 072 private boolean updateClusterClients = false; 073 private boolean rebalanceClusterClients; 074 private boolean updateClusterClientsOnRemove = false; 075 private String updateClusterFilter; 076 077 public TransportConnector() { 078 } 079 080 public TransportConnector(TransportServer server) { 081 this(); 082 setServer(server); 083 if (server != null && server.getConnectURI() != null) { 084 URI uri = server.getConnectURI(); 085 if (uri != null && uri.getScheme().equals("vm")) { 086 setEnableStatusMonitor(false); 087 } 088 } 089 090 } 091 092 /** 093 * @return Returns the connections. 094 */ 095 public CopyOnWriteArrayList<TransportConnection> getConnections() { 096 return connections; 097 } 098 099 /** 100 * Factory method to create a JMX managed version of this transport 101 * connector 102 */ 103 public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) 104 throws IOException, URISyntaxException { 105 ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer()); 106 rc.setBrokerInfo(getBrokerInfo()); 107 rc.setConnectUri(getConnectUri()); 108 rc.setDisableAsyncDispatch(isDisableAsyncDispatch()); 109 rc.setDiscoveryAgent(getDiscoveryAgent()); 110 rc.setDiscoveryUri(getDiscoveryUri()); 111 rc.setEnableStatusMonitor(isEnableStatusMonitor()); 112 rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 113 rc.setName(getName()); 114 rc.setTaskRunnerFactory(getTaskRunnerFactory()); 115 rc.setUri(getUri()); 116 rc.setBrokerService(brokerService); 117 rc.setUpdateClusterClients(isUpdateClusterClients()); 118 rc.setRebalanceClusterClients(isRebalanceClusterClients()); 119 rc.setUpdateClusterFilter(getUpdateClusterFilter()); 120 rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove()); 121 return rc; 122 } 123 124 public BrokerInfo getBrokerInfo() { 125 return brokerInfo; 126 } 127 128 public void setBrokerInfo(BrokerInfo brokerInfo) { 129 this.brokerInfo = brokerInfo; 130 } 131 132 /** 133 * 134 * @deprecated use the {@link #setBrokerService(BrokerService)} method 135 * instead. 136 */ 137 @Deprecated 138 public void setBrokerName(String name) { 139 if (this.brokerInfo == null) { 140 this.brokerInfo = new BrokerInfo(); 141 } 142 this.brokerInfo.setBrokerName(name); 143 } 144 145 public TransportServer getServer() throws IOException, URISyntaxException { 146 if (server == null) { 147 setServer(createTransportServer()); 148 } 149 return server; 150 } 151 152 public void setServer(TransportServer server) { 153 this.server = server; 154 } 155 156 public URI getUri() { 157 if (uri == null) { 158 try { 159 uri = getConnectUri(); 160 } catch (Throwable e) { 161 } 162 } 163 return uri; 164 } 165 166 /** 167 * Sets the server transport URI to use if there is not a 168 * {@link TransportServer} configured via the 169 * {@link #setServer(TransportServer)} method. This value is used to lazy 170 * create a {@link TransportServer} instance 171 * 172 * @param uri 173 */ 174 public void setUri(URI uri) { 175 this.uri = uri; 176 } 177 178 public TaskRunnerFactory getTaskRunnerFactory() { 179 return taskRunnerFactory; 180 } 181 182 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 183 this.taskRunnerFactory = taskRunnerFactory; 184 } 185 186 /** 187 * @return the statistics for this connector 188 */ 189 public ConnectorStatistics getStatistics() { 190 return statistics; 191 } 192 193 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 194 return messageAuthorizationPolicy; 195 } 196 197 /** 198 * Sets the policy used to decide if the current connection is authorized to 199 * consume a given message 200 */ 201 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 202 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 203 } 204 205 public void start() throws Exception { 206 broker = brokerService.getBroker(); 207 brokerInfo.setBrokerName(broker.getBrokerName()); 208 brokerInfo.setBrokerId(broker.getBrokerId()); 209 brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); 210 brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); 211 brokerInfo.setBrokerURL(getServer().getConnectURI().toString()); 212 final Map context = MDCHelper.getCopyOfContextMap(); 213 getServer().setAcceptListener(new TransportAcceptListener() { 214 public void onAccept(final Transport transport) { 215 try { 216 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { 217 public void run() { 218 MDCHelper.setContextMap(context); 219 try { 220 Connection connection = createConnection(transport); 221 connection.start(); 222 } catch (Exception e) { 223 ServiceSupport.dispose(transport); 224 onAcceptError(e); 225 } 226 } 227 }); 228 } catch (Exception e) { 229 String remoteHost = transport.getRemoteAddress(); 230 ServiceSupport.dispose(transport); 231 onAcceptError(e, remoteHost); 232 } 233 } 234 235 public void onAcceptError(Exception error) { 236 onAcceptError(error, null); 237 } 238 239 private void onAcceptError(Exception error, String remoteHost) { 240 LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " 241 + error); 242 LOG.debug("Reason: " + error, error); 243 } 244 }); 245 getServer().setBrokerInfo(brokerInfo); 246 getServer().start(); 247 248 DiscoveryAgent da = getDiscoveryAgent(); 249 if (da != null) { 250 da.registerService(getPublishableConnectString()); 251 da.start(); 252 } 253 if (enableStatusMonitor) { 254 this.statusDector = new TransportStatusDetector(this); 255 this.statusDector.start(); 256 } 257 258 LOG.info("Connector " + getName() + " Started"); 259 } 260 261 public String getPublishableConnectString() throws Exception { 262 String publishableConnectString = null; 263 URI theConnectURI = getConnectUri(); 264 if (theConnectURI != null) { 265 publishableConnectString = theConnectURI.toString(); 266 // strip off server side query parameters which may not be compatible to 267 // clients 268 if (theConnectURI.getRawQuery() != null) { 269 publishableConnectString = publishableConnectString.substring(0, publishableConnectString 270 .indexOf(theConnectURI.getRawQuery()) - 1); 271 } 272 } 273 if (LOG.isDebugEnabled()) { 274 LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI); 275 } 276 return publishableConnectString; 277 } 278 279 public void stop() throws Exception { 280 ServiceStopper ss = new ServiceStopper(); 281 if (discoveryAgent != null) { 282 ss.stop(discoveryAgent); 283 } 284 if (server != null) { 285 ss.stop(server); 286 server = null; 287 } 288 if (this.statusDector != null) { 289 this.statusDector.stop(); 290 } 291 292 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) { 293 TransportConnection c = iter.next(); 294 ss.stop(c); 295 } 296 ss.throwFirstException(); 297 LOG.info("Connector " + getName() + " Stopped"); 298 } 299 300 // Implementation methods 301 // ------------------------------------------------------------------------- 302 protected Connection createConnection(Transport transport) throws IOException { 303 TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null 304 : taskRunnerFactory); 305 boolean statEnabled = this.getStatistics().isEnabled(); 306 answer.getStatistics().setEnabled(statEnabled); 307 answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy); 308 return answer; 309 } 310 311 protected TransportServer createTransportServer() throws IOException, URISyntaxException { 312 if (uri == null) { 313 throw new IllegalArgumentException("You must specify either a server or uri property"); 314 } 315 if (brokerService == null) { 316 throw new IllegalArgumentException( 317 "You must specify the brokerService property. Maybe this connector should be added to a broker?"); 318 } 319 return TransportFactory.bind(brokerService, uri); 320 } 321 322 public DiscoveryAgent getDiscoveryAgent() throws IOException { 323 if (discoveryAgent == null) { 324 discoveryAgent = createDiscoveryAgent(); 325 } 326 return discoveryAgent; 327 } 328 329 protected DiscoveryAgent createDiscoveryAgent() throws IOException { 330 if (discoveryUri != null) { 331 return DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri); 332 } 333 return null; 334 } 335 336 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 337 this.discoveryAgent = discoveryAgent; 338 } 339 340 public URI getDiscoveryUri() { 341 return discoveryUri; 342 } 343 344 public void setDiscoveryUri(URI discoveryUri) { 345 this.discoveryUri = discoveryUri; 346 } 347 348 public URI getConnectUri() throws IOException, URISyntaxException { 349 if (connectUri == null) { 350 if (server != null) { 351 connectUri = server.getConnectURI(); 352 } 353 } 354 return connectUri; 355 } 356 357 public void setConnectUri(URI transportUri) { 358 this.connectUri = transportUri; 359 } 360 361 public void onStarted(TransportConnection connection) { 362 connections.add(connection); 363 } 364 365 public void onStopped(TransportConnection connection) { 366 connections.remove(connection); 367 } 368 369 public String getName() { 370 if (name == null) { 371 uri = getUri(); 372 if (uri != null) { 373 name = uri.toString(); 374 } 375 } 376 return name; 377 } 378 379 public void setName(String name) { 380 this.name = name; 381 } 382 383 @Override 384 public String toString() { 385 String rc = getName(); 386 if (rc == null) { 387 rc = super.toString(); 388 } 389 return rc; 390 } 391 392 protected ConnectionControl getConnectionControl() { 393 boolean rebalance = isRebalanceClusterClients(); 394 String connectedBrokers = ""; 395 String self = ""; 396 397 if (isUpdateClusterClients()) { 398 if (brokerService.getDefaultSocketURIString() != null) { 399 self += brokerService.getDefaultSocketURIString(); 400 self += ","; 401 } 402 if (rebalance == false) { 403 connectedBrokers += self; 404 } 405 if (this.broker.getPeerBrokerInfos() != null) { 406 for (BrokerInfo info : this.broker.getPeerBrokerInfos()) { 407 if (isMatchesClusterFilter(info.getBrokerName())) { 408 connectedBrokers += info.getBrokerURL(); 409 connectedBrokers += ","; 410 } 411 } 412 } 413 if (rebalance) { 414 connectedBrokers += self; 415 } 416 } 417 418 ConnectionControl control = new ConnectionControl(); 419 control.setConnectedBrokers(connectedBrokers); 420 control.setRebalanceConnection(rebalance); 421 return control; 422 423 } 424 425 public void updateClientClusterInfo() { 426 if (isRebalanceClusterClients() || isUpdateClusterClients()) { 427 ConnectionControl control = getConnectionControl(); 428 for (Connection c : this.connections) { 429 c.updateClient(control); 430 } 431 } 432 } 433 434 private boolean isMatchesClusterFilter(String brokerName) { 435 boolean result = true; 436 String filter = getUpdateClusterFilter(); 437 if (filter != null) { 438 filter = filter.trim(); 439 if (filter.length() > 0) { 440 StringTokenizer tokenizer = new StringTokenizer(filter, ","); 441 while (result && tokenizer.hasMoreTokens()) { 442 String token = tokenizer.nextToken(); 443 result = isMatchesClusterFilter(brokerName, token); 444 } 445 } 446 } 447 return result; 448 } 449 450 private boolean isMatchesClusterFilter(String brokerName, String match) { 451 boolean result = true; 452 if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) { 453 result = Pattern.matches(match, brokerName); 454 } 455 return result; 456 } 457 458 public boolean isDisableAsyncDispatch() { 459 return disableAsyncDispatch; 460 } 461 462 public void setDisableAsyncDispatch(boolean disableAsyncDispatch) { 463 this.disableAsyncDispatch = disableAsyncDispatch; 464 } 465 466 /** 467 * @return the enableStatusMonitor 468 */ 469 public boolean isEnableStatusMonitor() { 470 return enableStatusMonitor; 471 } 472 473 /** 474 * @param enableStatusMonitor 475 * the enableStatusMonitor to set 476 */ 477 public void setEnableStatusMonitor(boolean enableStatusMonitor) { 478 this.enableStatusMonitor = enableStatusMonitor; 479 } 480 481 /** 482 * This is called by the BrokerService right before it starts the transport. 483 */ 484 public void setBrokerService(BrokerService brokerService) { 485 this.brokerService = brokerService; 486 } 487 488 public Broker getBroker() { 489 return broker; 490 } 491 492 public BrokerService getBrokerService() { 493 return brokerService; 494 } 495 496 /** 497 * @return the updateClusterClients 498 */ 499 public boolean isUpdateClusterClients() { 500 return this.updateClusterClients; 501 } 502 503 /** 504 * @param updateClusterClients 505 * the updateClusterClients to set 506 */ 507 public void setUpdateClusterClients(boolean updateClusterClients) { 508 this.updateClusterClients = updateClusterClients; 509 } 510 511 /** 512 * @return the rebalanceClusterClients 513 */ 514 public boolean isRebalanceClusterClients() { 515 return this.rebalanceClusterClients; 516 } 517 518 /** 519 * @param rebalanceClusterClients 520 * the rebalanceClusterClients to set 521 */ 522 public void setRebalanceClusterClients(boolean rebalanceClusterClients) { 523 this.rebalanceClusterClients = rebalanceClusterClients; 524 } 525 526 /** 527 * @return the updateClusterClientsOnRemove 528 */ 529 public boolean isUpdateClusterClientsOnRemove() { 530 return this.updateClusterClientsOnRemove; 531 } 532 533 /** 534 * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set 535 */ 536 public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) { 537 this.updateClusterClientsOnRemove = updateClusterClientsOnRemove; 538 } 539 540 /** 541 * @return the updateClusterFilter 542 */ 543 public String getUpdateClusterFilter() { 544 return this.updateClusterFilter; 545 } 546 547 /** 548 * @param updateClusterFilter 549 * the updateClusterFilter to set 550 */ 551 public void setUpdateClusterFilter(String updateClusterFilter) { 552 this.updateClusterFilter = updateClusterFilter; 553 } 554 555}