001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.util;
018
019import java.util.Set;
020import javax.annotation.PostConstruct;
021import org.apache.activemq.broker.BrokerPluginSupport;
022import org.apache.activemq.broker.Connection;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.ConsumerBrokerExchange;
025import org.apache.activemq.broker.ProducerBrokerExchange;
026import org.apache.activemq.broker.region.Destination;
027import org.apache.activemq.broker.region.MessageReference;
028import org.apache.activemq.broker.region.Subscription;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.BrokerInfo;
031import org.apache.activemq.command.ConnectionInfo;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.DestinationInfo;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageDispatch;
037import org.apache.activemq.command.MessageDispatchNotification;
038import org.apache.activemq.command.MessagePull;
039import org.apache.activemq.command.ProducerInfo;
040import org.apache.activemq.command.RemoveSubscriptionInfo;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.command.SessionInfo;
043import org.apache.activemq.command.TransactionId;
044import org.apache.activemq.usage.Usage;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * A simple Broker intercepter which allows you to enable/disable logging.
050 * 
051 * @org.apache.xbean.XBean
052 */
053
054public class LoggingBrokerPlugin extends BrokerPluginSupport {
055
056    private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class);
057
058    private boolean logAll = false;
059    private boolean logMessageEvents = false;
060    private boolean logConnectionEvents = true;
061    private boolean logTransactionEvents = false;
062    private boolean logConsumerEvents = false;
063    private boolean logProducerEvents = false;
064    private boolean logInternalEvents = false;
065
066    /**
067     * 
068     * @throws Exception
069     * @org.apache.xbean.InitMethod
070     */
071    @PostConstruct
072    public void afterPropertiesSet() throws Exception {
073        LOG.info("Created LoggingBrokerPlugin: " + this.toString());
074    }
075
076    public boolean isLogAll() {
077        return logAll;
078    }
079
080    /**
081     * Logger all Events that go through the Plugin
082     */
083    public void setLogAll(boolean logAll) {
084        this.logAll = logAll;
085    }
086
087    public boolean isLogMessageEvents() {
088        return logMessageEvents;
089    }
090
091    /**
092     * Logger Events that are related to message processing
093     */
094    public void setLogMessageEvents(boolean logMessageEvents) {
095        this.logMessageEvents = logMessageEvents;
096    }
097
098    public boolean isLogConnectionEvents() {
099        return logConnectionEvents;
100    }
101
102    /**
103     * Logger Events that are related to connections and sessions
104     */
105    public void setLogConnectionEvents(boolean logConnectionEvents) {
106        this.logConnectionEvents = logConnectionEvents;
107    }
108
109    public boolean isLogTransactionEvents() {
110        return logTransactionEvents;
111    }
112
113    /**
114     * Logger Events that are related to transaction processing
115     */
116    public void setLogTransactionEvents(boolean logTransactionEvents) {
117        this.logTransactionEvents = logTransactionEvents;
118    }
119
120    public boolean isLogConsumerEvents() {
121        return logConsumerEvents;
122    }
123
124    /**
125     * Logger Events that are related to Consumers
126     */
127    public void setLogConsumerEvents(boolean logConsumerEvents) {
128        this.logConsumerEvents = logConsumerEvents;
129    }
130
131    public boolean isLogProducerEvents() {
132        return logProducerEvents;
133    }
134
135    /**
136     * Logger Events that are related to Producers
137     */
138    public void setLogProducerEvents(boolean logProducerEvents) {
139        this.logProducerEvents = logProducerEvents;
140    }
141
142    public boolean isLogInternalEvents() {
143        return logInternalEvents;
144    }
145
146    /**
147     * Logger Events that are normally internal to the broker
148     */
149    public void setLogInternalEvents(boolean logInternalEvents) {
150        this.logInternalEvents = logInternalEvents;
151    }
152
153    @Override
154    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
155        if (isLogAll() || isLogConsumerEvents()) {
156            LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId()
157                    + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
158            if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) {
159                LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId()
160                        + ", Last Message Id: " + ack.getLastMessageId());
161            }
162        }
163        super.acknowledge(consumerExchange, ack);
164    }
165
166    @Override
167    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
168        if (isLogAll() || isLogConsumerEvents()) {
169            LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName());
170        }
171        return super.messagePull(context, pull);
172    }
173
174    @Override
175    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
176        if (isLogAll() || isLogConnectionEvents()) {
177            LOG.info("Adding Connection : " + info);
178        }
179        super.addConnection(context, info);
180    }
181
182    @Override
183    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
184        if (isLogAll() || isLogConsumerEvents()) {
185            LOG.info("Adding Consumer : " + info);
186        }
187        return super.addConsumer(context, info);
188    }
189
190    @Override
191    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
192        if (isLogAll() || isLogProducerEvents()) {
193            LOG.info("Adding Producer :" + info);
194        }
195        super.addProducer(context, info);
196    }
197
198    @Override
199    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
200        if (isLogAll() || isLogTransactionEvents()) {
201            LOG.info("Commiting transaction : " + xid.getTransactionKey());
202        }
203        super.commitTransaction(context, xid, onePhase);
204    }
205
206    @Override
207    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
208        if (isLogAll() || isLogConsumerEvents()) {
209            LOG.info("Removing subscription : " + info);
210        }
211        super.removeSubscription(context, info);
212    }
213
214    @Override
215    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
216
217        TransactionId[] result = super.getPreparedTransactions(context);
218        if ((isLogAll() || isLogTransactionEvents()) && result != null) {
219            StringBuffer tids = new StringBuffer();
220            for (TransactionId tid : result) {
221                if (tids.length() > 0) {
222                    tids.append(", ");
223                }
224                tids.append(tid.getTransactionKey());
225            }
226            LOG.info("Prepared transactions : " + tids);
227        }
228        return result;
229    }
230
231    @Override
232    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
233        if (isLogAll() || isLogTransactionEvents()) {
234            LOG.info("Preparing transaction : " + xid.getTransactionKey());
235        }
236        return super.prepareTransaction(context, xid);
237    }
238
239    @Override
240    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
241        if (isLogAll() || isLogConnectionEvents()) {
242            LOG.info("Removing Connection : " + info);
243        }
244        super.removeConnection(context, info, error);
245    }
246
247    @Override
248    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
249        if (isLogAll() || isLogConsumerEvents()) {
250            LOG.info("Removing Consumer : " + info);
251        }
252        super.removeConsumer(context, info);
253    }
254
255    @Override
256    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
257        if (isLogAll() || isLogProducerEvents()) {
258            LOG.info("Removing Producer : " + info);
259        }
260        super.removeProducer(context, info);
261    }
262
263    @Override
264    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
265        if (isLogAll() || isLogTransactionEvents()) {
266            LOG.info("Rolling back Transaction : " + xid.getTransactionKey());
267        }
268        super.rollbackTransaction(context, xid);
269    }
270
271    @Override
272    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
273        if (isLogAll() || isLogProducerEvents()) {
274            LOG.info("Sending message : " + messageSend.copy());
275        }
276        super.send(producerExchange, messageSend);
277    }
278
279    @Override
280    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
281        if (isLogAll() || isLogTransactionEvents()) {
282            LOG.info("Beginning transaction : " + xid.getTransactionKey());
283        }
284        super.beginTransaction(context, xid);
285    }
286
287    @Override
288    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
289        if (isLogAll() || isLogTransactionEvents()) {
290            LOG.info("Forgetting transaction : " + transactionId.getTransactionKey());
291        }
292        super.forgetTransaction(context, transactionId);
293    }
294
295    @Override
296    public Connection[] getClients() throws Exception {
297        Connection[] result = super.getClients();
298
299        if (isLogAll() || isLogInternalEvents()) {
300            if (result == null) {
301                LOG.info("Get Clients returned empty list.");
302            } else {
303                StringBuffer cids = new StringBuffer();
304                for (Connection c : result) {
305                    cids.append(cids.length() > 0 ? ", " : "");
306                    cids.append(c.getConnectionId());
307                }
308                LOG.info("Connected clients : " + cids);
309            }
310        }
311        return super.getClients();
312    }
313
314    @Override
315    public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context,
316            ActiveMQDestination destination, boolean create) throws Exception {
317        if (isLogAll() || isLogInternalEvents()) {
318            LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":"
319                    + destination.getPhysicalName());
320        }
321        return super.addDestination(context, destination, create);
322    }
323
324    @Override
325    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
326            throws Exception {
327        if (isLogAll() || isLogInternalEvents()) {
328            LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":"
329                    + destination.getPhysicalName());
330        }
331        super.removeDestination(context, destination, timeout);
332    }
333
334    @Override
335    public ActiveMQDestination[] getDestinations() throws Exception {
336        ActiveMQDestination[] result = super.getDestinations();
337        if (isLogAll() || isLogInternalEvents()) {
338            if (result == null) {
339                LOG.info("Get Destinations returned empty list.");
340            } else {
341                StringBuffer destinations = new StringBuffer();
342                for (ActiveMQDestination dest : result) {
343                    destinations.append(destinations.length() > 0 ? ", " : "");
344                    destinations.append(dest.getPhysicalName());
345                }
346                LOG.info("Get Destinations : " + destinations);
347            }
348        }
349        return result;
350    }
351
352    @Override
353    public void start() throws Exception {
354        if (isLogAll() || isLogInternalEvents()) {
355            LOG.info("Starting " + getBrokerName());
356        }
357        super.start();
358    }
359
360    @Override
361    public void stop() throws Exception {
362        if (isLogAll() || isLogInternalEvents()) {
363            LOG.info("Stopping " + getBrokerName());
364        }
365        super.stop();
366    }
367
368    @Override
369    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
370        if (isLogAll() || isLogConnectionEvents()) {
371            LOG.info("Adding Session : " + info);
372        }
373        super.addSession(context, info);
374    }
375
376    @Override
377    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
378        if (isLogAll() || isLogConnectionEvents()) {
379            LOG.info("Removing Session : " + info);
380        }
381        super.removeSession(context, info);
382    }
383
384    @Override
385    public void addBroker(Connection connection, BrokerInfo info) {
386        if (isLogAll() || isLogInternalEvents()) {
387            LOG.info("Adding Broker " + info.getBrokerName());
388        }
389        super.addBroker(connection, info);
390    }
391
392    @Override
393    public void removeBroker(Connection connection, BrokerInfo info) {
394        if (isLogAll() || isLogInternalEvents()) {
395            LOG.info("Removing Broker " + info.getBrokerName());
396        }
397        super.removeBroker(connection, info);
398    }
399
400    @Override
401    public BrokerInfo[] getPeerBrokerInfos() {
402        BrokerInfo[] result = super.getPeerBrokerInfos();
403        if (isLogAll() || isLogInternalEvents()) {
404            if (result == null) {
405                LOG.info("Get Peer Broker Infos returned empty list.");
406            } else {
407                StringBuffer peers = new StringBuffer();
408                for (BrokerInfo bi : result) {
409                    peers.append(peers.length() > 0 ? ", " : "");
410                    peers.append(bi.getBrokerName());
411                }
412                LOG.info("Get Peer Broker Infos : " + peers);
413            }
414        }
415        return result;
416    }
417
418    @Override
419    public void preProcessDispatch(MessageDispatch messageDispatch) {
420        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
421            LOG.info("preProcessDispatch :" + messageDispatch);
422        }
423        super.preProcessDispatch(messageDispatch);
424    }
425
426    @Override
427    public void postProcessDispatch(MessageDispatch messageDispatch) {
428        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
429            LOG.info("postProcessDispatch :" + messageDispatch);
430        }
431        super.postProcessDispatch(messageDispatch);
432    }
433
434    @Override
435    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
436        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
437            LOG.info("ProcessDispatchNotification :" + messageDispatchNotification);
438        }
439        super.processDispatchNotification(messageDispatchNotification);
440    }
441
442    @Override
443    public Set<ActiveMQDestination> getDurableDestinations() {
444        Set<ActiveMQDestination> result = super.getDurableDestinations();
445        if (isLogAll() || isLogInternalEvents()) {
446            if (result == null) {
447                LOG.info("Get Durable Destinations returned empty list.");
448            } else {
449                StringBuffer destinations = new StringBuffer();
450                for (ActiveMQDestination dest : result) {
451                    destinations.append(destinations.length() > 0 ? ", " : "");
452                    destinations.append(dest.getPhysicalName());
453                }
454                LOG.info("Get Durable Destinations : " + destinations);
455            }
456        }
457        return result;
458    }
459
460    @Override
461    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
462        if (isLogAll() || isLogInternalEvents()) {
463            LOG.info("Adding destination info : " + info);
464        }
465        super.addDestinationInfo(context, info);
466    }
467
468    @Override
469    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
470        if (isLogAll() || isLogInternalEvents()) {
471            LOG.info("Removing destination info : " + info);
472        }
473        super.removeDestinationInfo(context, info);
474    }
475
476    @Override
477    public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
478        if (isLogAll() || isLogInternalEvents()) {
479            String msg = "Unable to display message.";
480
481            msg = message.getMessage().toString();
482
483            LOG.info("Message has expired : " + msg);
484        }
485        super.messageExpired(context, message, subscription);
486    }
487
488    @Override
489    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
490                                      Subscription subscription) {
491        if (isLogAll() || isLogInternalEvents()) {
492            String msg = "Unable to display message.";
493
494            msg = messageReference.getMessage().toString();
495
496            LOG.info("Sending to DLQ : " + msg);
497        }
498        super.sendToDeadLetterQueue(context, messageReference, subscription);
499    }
500
501    @Override
502    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
503        if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
504            LOG.info("Fast Producer : " + producerInfo);
505        }
506        super.fastProducer(context, producerInfo);
507    }
508
509    @Override
510    public void isFull(ConnectionContext context, Destination destination, Usage usage) {
511        if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
512            LOG.info("Destination is full : " + destination.getName());
513        }
514        super.isFull(context, destination, usage);
515    }
516
517    @Override
518    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
519        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
520            String msg = "Unable to display message.";
521
522            msg = messageReference.getMessage().toString();
523
524            LOG.info("Message consumed : " + msg);
525        }
526        super.messageConsumed(context, messageReference);
527    }
528
529    @Override
530    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
531        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
532            String msg = "Unable to display message.";
533
534            msg = messageReference.getMessage().toString();
535
536            LOG.info("Message delivered : " + msg);
537        }
538        super.messageDelivered(context, messageReference);
539    }
540
541    @Override
542    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
543        if (isLogAll() || isLogInternalEvents()) {
544            String msg = "Unable to display message.";
545
546            msg = messageReference.getMessage().toString();
547
548            LOG.info("Message discarded : " + msg);
549        }
550        super.messageDiscarded(context, sub, messageReference);
551    }
552
553    @Override
554    public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
555        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
556            LOG.info("Detected slow consumer on " + destination.getName());
557            StringBuffer buf = new StringBuffer("Connection(");
558            buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId());
559            buf.append(") Session(");
560            buf.append(subs.getConsumerInfo().getConsumerId().getSessionId());
561            buf.append(")");
562            LOG.info(buf.toString());
563        }
564        super.slowConsumer(context, destination, subs);
565    }
566
567    @Override
568    public void nowMasterBroker() {
569        if (isLogAll() || isLogInternalEvents()) {
570            LOG.info("Is now the master broker : " + getBrokerName());
571        }
572        super.nowMasterBroker();
573    }
574
575    @Override
576    public String toString() {
577        StringBuffer buf = new StringBuffer();
578        buf.append("LoggingBrokerPlugin(");
579        buf.append("logAll=");
580        buf.append(isLogAll());
581        buf.append(", logConnectionEvents=");
582        buf.append(isLogConnectionEvents());
583        buf.append(", logConsumerEvents=");
584        buf.append(isLogConsumerEvents());
585        buf.append(", logProducerEvents=");
586        buf.append(isLogProducerEvents());
587        buf.append(", logMessageEvents=");
588        buf.append(isLogMessageEvents());
589        buf.append(", logTransactionEvents=");
590        buf.append(isLogTransactionEvents());
591        buf.append(", logInternalEvents=");
592        buf.append(isLogInternalEvents());
593        buf.append(")");
594        return buf.toString();
595    }
596}