001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import org.apache.activemq.advisory.AdvisorySupport;
020import org.apache.activemq.broker.Broker;
021import org.apache.activemq.broker.BrokerFilter;
022import org.apache.activemq.broker.BrokerService;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.ProducerBrokerExchange;
025import org.apache.activemq.broker.region.Destination;
026import org.apache.activemq.broker.region.DestinationStatistics;
027import org.apache.activemq.broker.region.RegionBroker;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.ActiveMQMapMessage;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.command.MessageId;
032import org.apache.activemq.command.ProducerId;
033import org.apache.activemq.command.ProducerInfo;
034import org.apache.activemq.state.ProducerState;
035import org.apache.activemq.usage.SystemUsage;
036import org.apache.activemq.util.IdGenerator;
037import org.apache.activemq.util.LongSequenceGenerator;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import java.io.File;
041import java.net.URI;
042import java.util.Set;
043/**
044 * A StatisticsBroker You can retrieve a Map Message for a Destination - or
045 * Broker containing statistics as key-value pairs The message must contain a
046 * replyTo Destination - else its ignored
047 * 
048 */
049public class StatisticsBroker extends BrokerFilter {
050    private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
051    static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
052    static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
053    private static final IdGenerator ID_GENERATOR = new IdGenerator();
054    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
055    protected final ProducerId advisoryProducerId = new ProducerId();
056
057    /**
058     * 
059     * Constructor
060     * 
061     * @param next
062     */
063    public StatisticsBroker(Broker next) {
064        super(next);
065        this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
066    }
067
068    /**
069     * Sets the persistence mode
070     * 
071     * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
072     *      org.apache.activemq.command.Message)
073     */
074    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
075        ActiveMQDestination msgDest = messageSend.getDestination();
076        ActiveMQDestination replyTo = messageSend.getReplyTo();
077        if (replyTo != null) {
078            String physicalName = msgDest.getPhysicalName();
079            boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0,
080                    STATS_DESTINATION_PREFIX.length());
081            boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
082                    .length());
083            if (destStats) {
084                String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
085                ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType());
086                Set<Destination> set = getDestinations(queryDest);
087                for (Destination dest : set) {
088                    DestinationStatistics stats = dest.getDestinationStatistics();
089                    if (stats != null) {
090                        ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
091                        statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
092                        statsMessage.setLong("size", stats.getMessages().getCount());
093                        statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
094                        statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
095                        statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
096                        statsMessage.setLong("expiredCount", stats.getExpired().getCount());
097                        statsMessage.setLong("inflightCount", stats.getInflight().getCount());
098                        statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
099                        statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
100                        statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
101                        statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
102                        statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
103                        statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
104                        statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
105                        statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
106                        statsMessage.setLong("producerCount", stats.getProducers().getCount());
107                        statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
108                        sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
109                    }
110                }
111            } else if (brokerStats) {
112                ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
113                BrokerService brokerService = getBrokerService();
114                RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
115                SystemUsage systemUsage = brokerService.getSystemUsage();
116                DestinationStatistics stats = regionBroker.getDestinationStatistics();
117                statsMessage.setString("brokerName", regionBroker.getBrokerName());
118                statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
119                statsMessage.setLong("size", stats.getMessages().getCount());
120                statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
121                statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
122                statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
123                statsMessage.setLong("expiredCount", stats.getExpired().getCount());
124                statsMessage.setLong("inflightCount", stats.getInflight().getCount());
125                statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
126                statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
127                statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
128                statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
129                statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
130                statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
131                statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
132                statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
133                statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
134                statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
135                statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
136                statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
137                statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
138                statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
139                statsMessage.setLong("producerCount", stats.getProducers().getCount());
140                String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
141                answer = answer != null ? answer : "";
142                statsMessage.setString("openwire", answer);
143                answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
144                answer = answer != null ? answer : "";
145                statsMessage.setString("stomp", answer);
146                answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
147                answer = answer != null ? answer : "";
148                statsMessage.setString("ssl", answer);
149                answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
150                answer = answer != null ? answer : "";
151                statsMessage.setString("stomp+ssl", answer);
152                URI uri = brokerService.getVmConnectorURI();
153                answer = uri != null ? uri.toString() : "";
154                statsMessage.setString("vm", answer);
155                File file = brokerService.getDataDirectoryFile();
156                answer = file != null ? file.getCanonicalPath() : "";
157                statsMessage.setString("dataDirectory", answer);
158                statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
159                sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
160            } else {
161                super.send(producerExchange, messageSend);
162            }
163        } else {
164            super.send(producerExchange, messageSend);
165        }
166    }
167
168    public void start() throws Exception {
169        super.start();
170        LOG.info("Starting StatisticsBroker");
171    }
172
173    public void stop() throws Exception {
174        super.stop();
175    }
176
177    protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
178            throws Exception {
179        msg.setPersistent(false);
180        msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
181        msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId()));
182        msg.setDestination(replyTo);
183        msg.setResponseRequired(false);
184        msg.setProducerId(this.advisoryProducerId);
185        boolean originalFlowControl = context.isProducerFlowControl();
186        final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
187        producerExchange.setConnectionContext(context);
188        producerExchange.setMutable(true);
189        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
190        try {
191            context.setProducerFlowControl(false);
192            this.next.send(producerExchange, msg);
193        } finally {
194            context.setProducerFlowControl(originalFlowControl);
195        }
196    }
197}