001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025import javax.jms.Connection;
026import javax.jms.InvalidSelectorException;
027import javax.jms.MessageProducer;
028import javax.jms.Session;
029import javax.management.MalformedObjectNameException;
030import javax.management.ObjectName;
031import javax.management.openmbean.CompositeData;
032import javax.management.openmbean.CompositeDataSupport;
033import javax.management.openmbean.CompositeType;
034import javax.management.openmbean.OpenDataException;
035import javax.management.openmbean.TabularData;
036import javax.management.openmbean.TabularDataSupport;
037import javax.management.openmbean.TabularType;
038import org.apache.activemq.ActiveMQConnectionFactory;
039import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
040import org.apache.activemq.broker.region.Destination;
041import org.apache.activemq.broker.region.Subscription;
042import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
043import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
044import org.apache.activemq.command.ActiveMQDestination;
045import org.apache.activemq.command.ActiveMQMessage;
046import org.apache.activemq.command.ActiveMQTextMessage;
047import org.apache.activemq.command.Message;
048import org.apache.activemq.filter.BooleanExpression;
049import org.apache.activemq.filter.MessageEvaluationContext;
050import org.apache.activemq.selector.SelectorParser;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054public class DestinationView implements DestinationViewMBean {
055    private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class);
056    protected final Destination destination;
057    protected final ManagedRegionBroker broker;
058
059    public DestinationView(ManagedRegionBroker broker, Destination destination) {
060        this.broker = broker;
061        this.destination = destination;
062    }
063
064    public void gc() {
065        destination.gc();
066    }
067
068    public String getName() {
069        return destination.getName();
070    }
071
072    public void resetStatistics() {
073        destination.getDestinationStatistics().reset();
074    }
075
076    public long getEnqueueCount() {
077        return destination.getDestinationStatistics().getEnqueues().getCount();
078    }
079
080    public long getDequeueCount() {
081        return destination.getDestinationStatistics().getDequeues().getCount();
082    }
083
084    public long getDispatchCount() {
085        return destination.getDestinationStatistics().getDispatched().getCount();
086    }
087
088    public long getInFlightCount() {
089        return destination.getDestinationStatistics().getInflight().getCount();
090    }
091
092    public long getExpiredCount() {
093        return destination.getDestinationStatistics().getExpired().getCount();
094    }
095
096    public long getConsumerCount() {
097        return destination.getDestinationStatistics().getConsumers().getCount();
098    }
099
100    public long getQueueSize() {
101        return destination.getDestinationStatistics().getMessages().getCount();
102    }
103
104    public long getMessagesCached() {
105        return destination.getDestinationStatistics().getMessagesCached().getCount();
106    }
107
108    public int getMemoryPercentUsage() {
109        return destination.getMemoryUsage().getPercentUsage();
110    }
111
112    public long getMemoryLimit() {
113        return destination.getMemoryUsage().getLimit();
114    }
115
116    public void setMemoryLimit(long limit) {
117        destination.getMemoryUsage().setLimit(limit);
118    }
119
120    public double getAverageEnqueueTime() {
121        return destination.getDestinationStatistics().getProcessTime().getAverageTime();
122    }
123
124    public long getMaxEnqueueTime() {
125        return destination.getDestinationStatistics().getProcessTime().getMaxTime();
126    }
127
128    public long getMinEnqueueTime() {
129        return destination.getDestinationStatistics().getProcessTime().getMinTime();
130    }
131    
132    public boolean isPrioritizedMessages() {
133        return destination.isPrioritizedMessages();
134    }
135
136    public CompositeData[] browse() throws OpenDataException {
137        try {
138            return browse(null);
139        } catch (InvalidSelectorException e) {
140            // should not happen.
141            throw new RuntimeException(e);
142        }
143    }
144
145    public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException {
146        Message[] messages = destination.browse();
147        ArrayList<CompositeData> c = new ArrayList<CompositeData>();
148
149        MessageEvaluationContext ctx = new MessageEvaluationContext();
150        ctx.setDestination(destination.getActiveMQDestination());
151        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
152
153        for (int i = 0; i < messages.length; i++) {
154            try {
155
156                if (selectorExpression == null) {
157                    c.add(OpenTypeSupport.convert(messages[i]));
158                } else {
159                    ctx.setMessageReference(messages[i]);
160                    if (selectorExpression.matches(ctx)) {
161                        c.add(OpenTypeSupport.convert(messages[i]));
162                    }
163                }
164
165            } catch (Throwable e) {
166                // TODO DELETE ME
167                System.out.println(e);
168                e.printStackTrace();
169                // TODO DELETE ME
170                LOG.warn("exception browsing destination", e);
171            }
172        }
173
174        CompositeData rc[] = new CompositeData[c.size()];
175        c.toArray(rc);
176        return rc;
177    }
178
179    /**
180     * Browses the current destination returning a list of messages
181     */
182    public List<Object> browseMessages() throws InvalidSelectorException {
183        return browseMessages(null);
184    }
185
186    /**
187     * Browses the current destination with the given selector returning a list
188     * of messages
189     */
190    public List<Object> browseMessages(String selector) throws InvalidSelectorException {
191        Message[] messages = destination.browse();
192        ArrayList<Object> answer = new ArrayList<Object>();
193
194        MessageEvaluationContext ctx = new MessageEvaluationContext();
195        ctx.setDestination(destination.getActiveMQDestination());
196        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
197
198        for (int i = 0; i < messages.length; i++) {
199            try {
200                Message message = messages[i];
201                if (selectorExpression == null) {
202                    answer.add(OpenTypeSupport.convert(message));
203                } else {
204                    ctx.setMessageReference(message);
205                    if (selectorExpression.matches(ctx)) {
206                        answer.add(message);
207                    }
208                }
209
210            } catch (Throwable e) {
211                LOG.warn("exception browsing destination", e);
212            }
213        }
214        return answer;
215    }
216
217    public TabularData browseAsTable() throws OpenDataException {
218        try {
219            return browseAsTable(null);
220        } catch (InvalidSelectorException e) {
221            throw new RuntimeException(e);
222        }
223    }
224
225    public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException {
226        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
227        Message[] messages = destination.browse();
228        CompositeType ct = factory.getCompositeType();
229        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
230        TabularDataSupport rc = new TabularDataSupport(tt);
231
232        MessageEvaluationContext ctx = new MessageEvaluationContext();
233        ctx.setDestination(destination.getActiveMQDestination());
234        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
235
236        for (int i = 0; i < messages.length; i++) {
237            try {
238                if (selectorExpression == null) {
239                    rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
240                } else {
241                    ctx.setMessageReference(messages[i]);
242                    if (selectorExpression.matches(ctx)) {
243                        rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
244                    }
245                }
246            } catch (Throwable e) {
247                LOG.warn("exception browsing destination", e);
248            }
249        }
250
251        return rc;
252    }
253
254    public String sendTextMessage(String body) throws Exception {
255        return sendTextMessage(Collections.EMPTY_MAP, body);
256    }
257
258    public String sendTextMessage(Map headers, String body) throws Exception {
259        return sendTextMessage(headers, body, null, null);
260    }
261
262    public String sendTextMessage(String body, String user, String password) throws Exception {
263        return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
264    }
265
266    public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception {
267
268        String brokerUrl = "vm://" + broker.getBrokerName();
269        ActiveMQDestination dest = destination.getActiveMQDestination();
270
271        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
272        Connection connection = null;
273        try {
274
275            connection = cf.createConnection(userName, password);
276            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
277            MessageProducer producer = session.createProducer(dest);
278            ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
279
280            for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
281                Map.Entry entry = (Map.Entry) iter.next();
282                msg.setObjectProperty((String) entry.getKey(), entry.getValue());
283            }
284
285            producer.setDeliveryMode(msg.getJMSDeliveryMode());
286            producer.setPriority(msg.getPriority());
287            long ttl = msg.getExpiration() - System.currentTimeMillis();
288            producer.setTimeToLive(ttl > 0 ? ttl : 0);
289            producer.send(msg);
290
291            return msg.getJMSMessageID();
292
293        } finally {
294            connection.close();
295        }
296
297    }
298
299    public int getMaxAuditDepth() {
300        return destination.getMaxAuditDepth();
301    }
302
303    public int getMaxProducersToAudit() {
304        return destination.getMaxProducersToAudit();
305    }
306
307    public boolean isEnableAudit() {
308        return destination.isEnableAudit();
309    }
310
311    public void setEnableAudit(boolean enableAudit) {
312        destination.setEnableAudit(enableAudit);
313    }
314
315    public void setMaxAuditDepth(int maxAuditDepth) {
316        destination.setMaxAuditDepth(maxAuditDepth);
317    }
318
319    public void setMaxProducersToAudit(int maxProducersToAudit) {
320        destination.setMaxProducersToAudit(maxProducersToAudit);
321    }
322
323    public float getMemoryUsagePortion() {
324        return destination.getMemoryUsage().getUsagePortion();
325    }
326
327    public long getProducerCount() {
328        return destination.getDestinationStatistics().getProducers().getCount();
329    }
330
331    public boolean isProducerFlowControl() {
332        return destination.isProducerFlowControl();
333    }
334
335    public void setMemoryUsagePortion(float value) {
336        destination.getMemoryUsage().setUsagePortion(value);
337    }
338
339    public void setProducerFlowControl(boolean producerFlowControl) {
340        destination.setProducerFlowControl(producerFlowControl);
341    }
342
343    /**
344     * Set's the interval at which warnings about producers being blocked by
345     * resource usage will be triggered. Values of 0 or less will disable
346     * warnings
347     * 
348     * @param blockedProducerWarningInterval the interval at which warning about
349     *            blocked producers will be triggered.
350     */
351    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
352        destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
353    }
354
355    /**
356     * 
357     * @return the interval at which warning about blocked producers will be
358     *         triggered.
359     */
360    public long getBlockedProducerWarningInterval() {
361        return destination.getBlockedProducerWarningInterval();
362    }
363
364    public int getMaxPageSize() {
365        return destination.getMaxPageSize();
366    }
367
368    public void setMaxPageSize(int pageSize) {
369        destination.setMaxPageSize(pageSize);
370    }
371
372    public boolean isUseCache() {
373        return destination.isUseCache();
374    }
375
376    public void setUseCache(boolean value) {
377        destination.setUseCache(value);
378    }
379
380    public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
381        List<Subscription> subscriptions = destination.getConsumers();
382        ObjectName[] answer = new ObjectName[subscriptions.size()];
383        ObjectName objectName = broker.getBrokerService().getBrokerObjectName();
384        int index = 0;
385        for (Subscription subscription : subscriptions) {
386            String connectionClientId = subscription.getContext().getClientId();
387            String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription, connectionClientId, objectName);
388            answer[index++] = new ObjectName(objectNameStr);
389        }
390        return answer;
391    }
392
393    public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
394        ObjectName result = null;
395        SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
396        if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) {
397            result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy);
398        }
399        return result;
400    }
401
402}