001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.util; 018 019import java.util.Set; 020import javax.annotation.PostConstruct; 021import org.apache.activemq.broker.BrokerPluginSupport; 022import org.apache.activemq.broker.Connection; 023import org.apache.activemq.broker.ConnectionContext; 024import org.apache.activemq.broker.ConsumerBrokerExchange; 025import org.apache.activemq.broker.ProducerBrokerExchange; 026import org.apache.activemq.broker.region.Destination; 027import org.apache.activemq.broker.region.MessageReference; 028import org.apache.activemq.broker.region.Subscription; 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.command.BrokerInfo; 031import org.apache.activemq.command.ConnectionInfo; 032import org.apache.activemq.command.ConsumerInfo; 033import org.apache.activemq.command.DestinationInfo; 034import org.apache.activemq.command.Message; 035import org.apache.activemq.command.MessageAck; 036import org.apache.activemq.command.MessageDispatch; 037import org.apache.activemq.command.MessageDispatchNotification; 038import org.apache.activemq.command.MessagePull; 039import org.apache.activemq.command.ProducerInfo; 040import org.apache.activemq.command.RemoveSubscriptionInfo; 041import org.apache.activemq.command.Response; 042import org.apache.activemq.command.SessionInfo; 043import org.apache.activemq.command.TransactionId; 044import org.apache.activemq.usage.Usage; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * A simple Broker intercepter which allows you to enable/disable logging. 050 * 051 * @org.apache.xbean.XBean 052 */ 053 054public class LoggingBrokerPlugin extends BrokerPluginSupport { 055 056 private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class); 057 058 private boolean logAll = false; 059 private boolean logMessageEvents = false; 060 private boolean logConnectionEvents = true; 061 private boolean logTransactionEvents = false; 062 private boolean logConsumerEvents = false; 063 private boolean logProducerEvents = false; 064 private boolean logInternalEvents = false; 065 066 /** 067 * 068 * @throws Exception 069 * @org.apache.xbean.InitMethod 070 */ 071 @PostConstruct 072 public void afterPropertiesSet() throws Exception { 073 LOG.info("Created LoggingBrokerPlugin: " + this.toString()); 074 } 075 076 public boolean isLogAll() { 077 return logAll; 078 } 079 080 /** 081 * Logger all Events that go through the Plugin 082 */ 083 public void setLogAll(boolean logAll) { 084 this.logAll = logAll; 085 } 086 087 public boolean isLogMessageEvents() { 088 return logMessageEvents; 089 } 090 091 /** 092 * Logger Events that are related to message processing 093 */ 094 public void setLogMessageEvents(boolean logMessageEvents) { 095 this.logMessageEvents = logMessageEvents; 096 } 097 098 public boolean isLogConnectionEvents() { 099 return logConnectionEvents; 100 } 101 102 /** 103 * Logger Events that are related to connections and sessions 104 */ 105 public void setLogConnectionEvents(boolean logConnectionEvents) { 106 this.logConnectionEvents = logConnectionEvents; 107 } 108 109 public boolean isLogTransactionEvents() { 110 return logTransactionEvents; 111 } 112 113 /** 114 * Logger Events that are related to transaction processing 115 */ 116 public void setLogTransactionEvents(boolean logTransactionEvents) { 117 this.logTransactionEvents = logTransactionEvents; 118 } 119 120 public boolean isLogConsumerEvents() { 121 return logConsumerEvents; 122 } 123 124 /** 125 * Logger Events that are related to Consumers 126 */ 127 public void setLogConsumerEvents(boolean logConsumerEvents) { 128 this.logConsumerEvents = logConsumerEvents; 129 } 130 131 public boolean isLogProducerEvents() { 132 return logProducerEvents; 133 } 134 135 /** 136 * Logger Events that are related to Producers 137 */ 138 public void setLogProducerEvents(boolean logProducerEvents) { 139 this.logProducerEvents = logProducerEvents; 140 } 141 142 public boolean isLogInternalEvents() { 143 return logInternalEvents; 144 } 145 146 /** 147 * Logger Events that are normally internal to the broker 148 */ 149 public void setLogInternalEvents(boolean logInternalEvents) { 150 this.logInternalEvents = logInternalEvents; 151 } 152 153 @Override 154 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 155 if (isLogAll() || isLogConsumerEvents()) { 156 LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId() 157 + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : "")); 158 if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) { 159 LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId() 160 + ", Last Message Id: " + ack.getLastMessageId()); 161 } 162 } 163 super.acknowledge(consumerExchange, ack); 164 } 165 166 @Override 167 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 168 if (isLogAll() || isLogConsumerEvents()) { 169 LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName()); 170 } 171 return super.messagePull(context, pull); 172 } 173 174 @Override 175 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 176 if (isLogAll() || isLogConnectionEvents()) { 177 LOG.info("Adding Connection : " + info); 178 } 179 super.addConnection(context, info); 180 } 181 182 @Override 183 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 184 if (isLogAll() || isLogConsumerEvents()) { 185 LOG.info("Adding Consumer : " + info); 186 } 187 return super.addConsumer(context, info); 188 } 189 190 @Override 191 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 192 if (isLogAll() || isLogProducerEvents()) { 193 LOG.info("Adding Producer :" + info); 194 } 195 super.addProducer(context, info); 196 } 197 198 @Override 199 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 200 if (isLogAll() || isLogTransactionEvents()) { 201 LOG.info("Commiting transaction : " + xid.getTransactionKey()); 202 } 203 super.commitTransaction(context, xid, onePhase); 204 } 205 206 @Override 207 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 208 if (isLogAll() || isLogConsumerEvents()) { 209 LOG.info("Removing subscription : " + info); 210 } 211 super.removeSubscription(context, info); 212 } 213 214 @Override 215 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 216 217 TransactionId[] result = super.getPreparedTransactions(context); 218 if ((isLogAll() || isLogTransactionEvents()) && result != null) { 219 StringBuffer tids = new StringBuffer(); 220 for (TransactionId tid : result) { 221 if (tids.length() > 0) { 222 tids.append(", "); 223 } 224 tids.append(tid.getTransactionKey()); 225 } 226 LOG.info("Prepared transactions : " + tids); 227 } 228 return result; 229 } 230 231 @Override 232 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 233 if (isLogAll() || isLogTransactionEvents()) { 234 LOG.info("Preparing transaction : " + xid.getTransactionKey()); 235 } 236 return super.prepareTransaction(context, xid); 237 } 238 239 @Override 240 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 241 if (isLogAll() || isLogConnectionEvents()) { 242 LOG.info("Removing Connection : " + info); 243 } 244 super.removeConnection(context, info, error); 245 } 246 247 @Override 248 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 249 if (isLogAll() || isLogConsumerEvents()) { 250 LOG.info("Removing Consumer : " + info); 251 } 252 super.removeConsumer(context, info); 253 } 254 255 @Override 256 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 257 if (isLogAll() || isLogProducerEvents()) { 258 LOG.info("Removing Producer : " + info); 259 } 260 super.removeProducer(context, info); 261 } 262 263 @Override 264 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 265 if (isLogAll() || isLogTransactionEvents()) { 266 LOG.info("Rolling back Transaction : " + xid.getTransactionKey()); 267 } 268 super.rollbackTransaction(context, xid); 269 } 270 271 @Override 272 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 273 if (isLogAll() || isLogProducerEvents()) { 274 LOG.info("Sending message : " + messageSend.copy()); 275 } 276 super.send(producerExchange, messageSend); 277 } 278 279 @Override 280 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 281 if (isLogAll() || isLogTransactionEvents()) { 282 LOG.info("Beginning transaction : " + xid.getTransactionKey()); 283 } 284 super.beginTransaction(context, xid); 285 } 286 287 @Override 288 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 289 if (isLogAll() || isLogTransactionEvents()) { 290 LOG.info("Forgetting transaction : " + transactionId.getTransactionKey()); 291 } 292 super.forgetTransaction(context, transactionId); 293 } 294 295 @Override 296 public Connection[] getClients() throws Exception { 297 Connection[] result = super.getClients(); 298 299 if (isLogAll() || isLogInternalEvents()) { 300 if (result == null) { 301 LOG.info("Get Clients returned empty list."); 302 } else { 303 StringBuffer cids = new StringBuffer(); 304 for (Connection c : result) { 305 cids.append(cids.length() > 0 ? ", " : ""); 306 cids.append(c.getConnectionId()); 307 } 308 LOG.info("Connected clients : " + cids); 309 } 310 } 311 return super.getClients(); 312 } 313 314 @Override 315 public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context, 316 ActiveMQDestination destination, boolean create) throws Exception { 317 if (isLogAll() || isLogInternalEvents()) { 318 LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":" 319 + destination.getPhysicalName()); 320 } 321 return super.addDestination(context, destination, create); 322 } 323 324 @Override 325 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) 326 throws Exception { 327 if (isLogAll() || isLogInternalEvents()) { 328 LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":" 329 + destination.getPhysicalName()); 330 } 331 super.removeDestination(context, destination, timeout); 332 } 333 334 @Override 335 public ActiveMQDestination[] getDestinations() throws Exception { 336 ActiveMQDestination[] result = super.getDestinations(); 337 if (isLogAll() || isLogInternalEvents()) { 338 if (result == null) { 339 LOG.info("Get Destinations returned empty list."); 340 } else { 341 StringBuffer destinations = new StringBuffer(); 342 for (ActiveMQDestination dest : result) { 343 destinations.append(destinations.length() > 0 ? ", " : ""); 344 destinations.append(dest.getPhysicalName()); 345 } 346 LOG.info("Get Destinations : " + destinations); 347 } 348 } 349 return result; 350 } 351 352 @Override 353 public void start() throws Exception { 354 if (isLogAll() || isLogInternalEvents()) { 355 LOG.info("Starting " + getBrokerName()); 356 } 357 super.start(); 358 } 359 360 @Override 361 public void stop() throws Exception { 362 if (isLogAll() || isLogInternalEvents()) { 363 LOG.info("Stopping " + getBrokerName()); 364 } 365 super.stop(); 366 } 367 368 @Override 369 public void addSession(ConnectionContext context, SessionInfo info) throws Exception { 370 if (isLogAll() || isLogConnectionEvents()) { 371 LOG.info("Adding Session : " + info); 372 } 373 super.addSession(context, info); 374 } 375 376 @Override 377 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { 378 if (isLogAll() || isLogConnectionEvents()) { 379 LOG.info("Removing Session : " + info); 380 } 381 super.removeSession(context, info); 382 } 383 384 @Override 385 public void addBroker(Connection connection, BrokerInfo info) { 386 if (isLogAll() || isLogInternalEvents()) { 387 LOG.info("Adding Broker " + info.getBrokerName()); 388 } 389 super.addBroker(connection, info); 390 } 391 392 @Override 393 public void removeBroker(Connection connection, BrokerInfo info) { 394 if (isLogAll() || isLogInternalEvents()) { 395 LOG.info("Removing Broker " + info.getBrokerName()); 396 } 397 super.removeBroker(connection, info); 398 } 399 400 @Override 401 public BrokerInfo[] getPeerBrokerInfos() { 402 BrokerInfo[] result = super.getPeerBrokerInfos(); 403 if (isLogAll() || isLogInternalEvents()) { 404 if (result == null) { 405 LOG.info("Get Peer Broker Infos returned empty list."); 406 } else { 407 StringBuffer peers = new StringBuffer(); 408 for (BrokerInfo bi : result) { 409 peers.append(peers.length() > 0 ? ", " : ""); 410 peers.append(bi.getBrokerName()); 411 } 412 LOG.info("Get Peer Broker Infos : " + peers); 413 } 414 } 415 return result; 416 } 417 418 @Override 419 public void preProcessDispatch(MessageDispatch messageDispatch) { 420 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 421 LOG.info("preProcessDispatch :" + messageDispatch); 422 } 423 super.preProcessDispatch(messageDispatch); 424 } 425 426 @Override 427 public void postProcessDispatch(MessageDispatch messageDispatch) { 428 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 429 LOG.info("postProcessDispatch :" + messageDispatch); 430 } 431 super.postProcessDispatch(messageDispatch); 432 } 433 434 @Override 435 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 436 if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { 437 LOG.info("ProcessDispatchNotification :" + messageDispatchNotification); 438 } 439 super.processDispatchNotification(messageDispatchNotification); 440 } 441 442 @Override 443 public Set<ActiveMQDestination> getDurableDestinations() { 444 Set<ActiveMQDestination> result = super.getDurableDestinations(); 445 if (isLogAll() || isLogInternalEvents()) { 446 if (result == null) { 447 LOG.info("Get Durable Destinations returned empty list."); 448 } else { 449 StringBuffer destinations = new StringBuffer(); 450 for (ActiveMQDestination dest : result) { 451 destinations.append(destinations.length() > 0 ? ", " : ""); 452 destinations.append(dest.getPhysicalName()); 453 } 454 LOG.info("Get Durable Destinations : " + destinations); 455 } 456 } 457 return result; 458 } 459 460 @Override 461 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 462 if (isLogAll() || isLogInternalEvents()) { 463 LOG.info("Adding destination info : " + info); 464 } 465 super.addDestinationInfo(context, info); 466 } 467 468 @Override 469 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 470 if (isLogAll() || isLogInternalEvents()) { 471 LOG.info("Removing destination info : " + info); 472 } 473 super.removeDestinationInfo(context, info); 474 } 475 476 @Override 477 public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { 478 if (isLogAll() || isLogInternalEvents()) { 479 String msg = "Unable to display message."; 480 481 msg = message.getMessage().toString(); 482 483 LOG.info("Message has expired : " + msg); 484 } 485 super.messageExpired(context, message, subscription); 486 } 487 488 @Override 489 public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, 490 Subscription subscription) { 491 if (isLogAll() || isLogInternalEvents()) { 492 String msg = "Unable to display message."; 493 494 msg = messageReference.getMessage().toString(); 495 496 LOG.info("Sending to DLQ : " + msg); 497 } 498 super.sendToDeadLetterQueue(context, messageReference, subscription); 499 } 500 501 @Override 502 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 503 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { 504 LOG.info("Fast Producer : " + producerInfo); 505 } 506 super.fastProducer(context, producerInfo); 507 } 508 509 @Override 510 public void isFull(ConnectionContext context, Destination destination, Usage usage) { 511 if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { 512 LOG.info("Destination is full : " + destination.getName()); 513 } 514 super.isFull(context, destination, usage); 515 } 516 517 @Override 518 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 519 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 520 String msg = "Unable to display message."; 521 522 msg = messageReference.getMessage().toString(); 523 524 LOG.info("Message consumed : " + msg); 525 } 526 super.messageConsumed(context, messageReference); 527 } 528 529 @Override 530 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 531 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 532 String msg = "Unable to display message."; 533 534 msg = messageReference.getMessage().toString(); 535 536 LOG.info("Message delivered : " + msg); 537 } 538 super.messageDelivered(context, messageReference); 539 } 540 541 @Override 542 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 543 if (isLogAll() || isLogInternalEvents()) { 544 String msg = "Unable to display message."; 545 546 msg = messageReference.getMessage().toString(); 547 548 LOG.info("Message discarded : " + msg); 549 } 550 super.messageDiscarded(context, sub, messageReference); 551 } 552 553 @Override 554 public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { 555 if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { 556 LOG.info("Detected slow consumer on " + destination.getName()); 557 StringBuffer buf = new StringBuffer("Connection("); 558 buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId()); 559 buf.append(") Session("); 560 buf.append(subs.getConsumerInfo().getConsumerId().getSessionId()); 561 buf.append(")"); 562 LOG.info(buf.toString()); 563 } 564 super.slowConsumer(context, destination, subs); 565 } 566 567 @Override 568 public void nowMasterBroker() { 569 if (isLogAll() || isLogInternalEvents()) { 570 LOG.info("Is now the master broker : " + getBrokerName()); 571 } 572 super.nowMasterBroker(); 573 } 574 575 @Override 576 public String toString() { 577 StringBuffer buf = new StringBuffer(); 578 buf.append("LoggingBrokerPlugin("); 579 buf.append("logAll="); 580 buf.append(isLogAll()); 581 buf.append(", logConnectionEvents="); 582 buf.append(isLogConnectionEvents()); 583 buf.append(", logConsumerEvents="); 584 buf.append(isLogConsumerEvents()); 585 buf.append(", logProducerEvents="); 586 buf.append(isLogProducerEvents()); 587 buf.append(", logMessageEvents="); 588 buf.append(isLogMessageEvents()); 589 buf.append(", logTransactionEvents="); 590 buf.append(isLogTransactionEvents()); 591 buf.append(", logInternalEvents="); 592 buf.append(isLogInternalEvents()); 593 buf.append(")"); 594 return buf.toString(); 595 } 596}