001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Map; 025import javax.jms.Connection; 026import javax.jms.InvalidSelectorException; 027import javax.jms.MessageProducer; 028import javax.jms.Session; 029import javax.management.MalformedObjectNameException; 030import javax.management.ObjectName; 031import javax.management.openmbean.CompositeData; 032import javax.management.openmbean.CompositeDataSupport; 033import javax.management.openmbean.CompositeType; 034import javax.management.openmbean.OpenDataException; 035import javax.management.openmbean.TabularData; 036import javax.management.openmbean.TabularDataSupport; 037import javax.management.openmbean.TabularType; 038import org.apache.activemq.ActiveMQConnectionFactory; 039import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 040import org.apache.activemq.broker.region.Destination; 041import org.apache.activemq.broker.region.Subscription; 042import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; 043import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 044import org.apache.activemq.command.ActiveMQDestination; 045import org.apache.activemq.command.ActiveMQMessage; 046import org.apache.activemq.command.ActiveMQTextMessage; 047import org.apache.activemq.command.Message; 048import org.apache.activemq.filter.BooleanExpression; 049import org.apache.activemq.filter.MessageEvaluationContext; 050import org.apache.activemq.selector.SelectorParser; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054public class DestinationView implements DestinationViewMBean { 055 private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class); 056 protected final Destination destination; 057 protected final ManagedRegionBroker broker; 058 059 public DestinationView(ManagedRegionBroker broker, Destination destination) { 060 this.broker = broker; 061 this.destination = destination; 062 } 063 064 public void gc() { 065 destination.gc(); 066 } 067 068 public String getName() { 069 return destination.getName(); 070 } 071 072 public void resetStatistics() { 073 destination.getDestinationStatistics().reset(); 074 } 075 076 public long getEnqueueCount() { 077 return destination.getDestinationStatistics().getEnqueues().getCount(); 078 } 079 080 public long getDequeueCount() { 081 return destination.getDestinationStatistics().getDequeues().getCount(); 082 } 083 084 public long getDispatchCount() { 085 return destination.getDestinationStatistics().getDispatched().getCount(); 086 } 087 088 public long getInFlightCount() { 089 return destination.getDestinationStatistics().getInflight().getCount(); 090 } 091 092 public long getExpiredCount() { 093 return destination.getDestinationStatistics().getExpired().getCount(); 094 } 095 096 public long getConsumerCount() { 097 return destination.getDestinationStatistics().getConsumers().getCount(); 098 } 099 100 public long getQueueSize() { 101 return destination.getDestinationStatistics().getMessages().getCount(); 102 } 103 104 public long getMessagesCached() { 105 return destination.getDestinationStatistics().getMessagesCached().getCount(); 106 } 107 108 public int getMemoryPercentUsage() { 109 return destination.getMemoryUsage().getPercentUsage(); 110 } 111 112 public long getMemoryLimit() { 113 return destination.getMemoryUsage().getLimit(); 114 } 115 116 public void setMemoryLimit(long limit) { 117 destination.getMemoryUsage().setLimit(limit); 118 } 119 120 public double getAverageEnqueueTime() { 121 return destination.getDestinationStatistics().getProcessTime().getAverageTime(); 122 } 123 124 public long getMaxEnqueueTime() { 125 return destination.getDestinationStatistics().getProcessTime().getMaxTime(); 126 } 127 128 public long getMinEnqueueTime() { 129 return destination.getDestinationStatistics().getProcessTime().getMinTime(); 130 } 131 132 public boolean isPrioritizedMessages() { 133 return destination.isPrioritizedMessages(); 134 } 135 136 public CompositeData[] browse() throws OpenDataException { 137 try { 138 return browse(null); 139 } catch (InvalidSelectorException e) { 140 // should not happen. 141 throw new RuntimeException(e); 142 } 143 } 144 145 public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException { 146 Message[] messages = destination.browse(); 147 ArrayList<CompositeData> c = new ArrayList<CompositeData>(); 148 149 MessageEvaluationContext ctx = new MessageEvaluationContext(); 150 ctx.setDestination(destination.getActiveMQDestination()); 151 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 152 153 for (int i = 0; i < messages.length; i++) { 154 try { 155 156 if (selectorExpression == null) { 157 c.add(OpenTypeSupport.convert(messages[i])); 158 } else { 159 ctx.setMessageReference(messages[i]); 160 if (selectorExpression.matches(ctx)) { 161 c.add(OpenTypeSupport.convert(messages[i])); 162 } 163 } 164 165 } catch (Throwable e) { 166 // TODO DELETE ME 167 System.out.println(e); 168 e.printStackTrace(); 169 // TODO DELETE ME 170 LOG.warn("exception browsing destination", e); 171 } 172 } 173 174 CompositeData rc[] = new CompositeData[c.size()]; 175 c.toArray(rc); 176 return rc; 177 } 178 179 /** 180 * Browses the current destination returning a list of messages 181 */ 182 public List<Object> browseMessages() throws InvalidSelectorException { 183 return browseMessages(null); 184 } 185 186 /** 187 * Browses the current destination with the given selector returning a list 188 * of messages 189 */ 190 public List<Object> browseMessages(String selector) throws InvalidSelectorException { 191 Message[] messages = destination.browse(); 192 ArrayList<Object> answer = new ArrayList<Object>(); 193 194 MessageEvaluationContext ctx = new MessageEvaluationContext(); 195 ctx.setDestination(destination.getActiveMQDestination()); 196 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 197 198 for (int i = 0; i < messages.length; i++) { 199 try { 200 Message message = messages[i]; 201 if (selectorExpression == null) { 202 answer.add(OpenTypeSupport.convert(message)); 203 } else { 204 ctx.setMessageReference(message); 205 if (selectorExpression.matches(ctx)) { 206 answer.add(message); 207 } 208 } 209 210 } catch (Throwable e) { 211 LOG.warn("exception browsing destination", e); 212 } 213 } 214 return answer; 215 } 216 217 public TabularData browseAsTable() throws OpenDataException { 218 try { 219 return browseAsTable(null); 220 } catch (InvalidSelectorException e) { 221 throw new RuntimeException(e); 222 } 223 } 224 225 public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException { 226 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); 227 Message[] messages = destination.browse(); 228 CompositeType ct = factory.getCompositeType(); 229 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" }); 230 TabularDataSupport rc = new TabularDataSupport(tt); 231 232 MessageEvaluationContext ctx = new MessageEvaluationContext(); 233 ctx.setDestination(destination.getActiveMQDestination()); 234 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 235 236 for (int i = 0; i < messages.length; i++) { 237 try { 238 if (selectorExpression == null) { 239 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); 240 } else { 241 ctx.setMessageReference(messages[i]); 242 if (selectorExpression.matches(ctx)) { 243 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); 244 } 245 } 246 } catch (Throwable e) { 247 LOG.warn("exception browsing destination", e); 248 } 249 } 250 251 return rc; 252 } 253 254 public String sendTextMessage(String body) throws Exception { 255 return sendTextMessage(Collections.EMPTY_MAP, body); 256 } 257 258 public String sendTextMessage(Map headers, String body) throws Exception { 259 return sendTextMessage(headers, body, null, null); 260 } 261 262 public String sendTextMessage(String body, String user, String password) throws Exception { 263 return sendTextMessage(Collections.EMPTY_MAP, body, user, password); 264 } 265 266 public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception { 267 268 String brokerUrl = "vm://" + broker.getBrokerName(); 269 ActiveMQDestination dest = destination.getActiveMQDestination(); 270 271 ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl); 272 Connection connection = null; 273 try { 274 275 connection = cf.createConnection(userName, password); 276 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 277 MessageProducer producer = session.createProducer(dest); 278 ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body); 279 280 for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) { 281 Map.Entry entry = (Map.Entry) iter.next(); 282 msg.setObjectProperty((String) entry.getKey(), entry.getValue()); 283 } 284 285 producer.setDeliveryMode(msg.getJMSDeliveryMode()); 286 producer.setPriority(msg.getPriority()); 287 long ttl = msg.getExpiration() - System.currentTimeMillis(); 288 producer.setTimeToLive(ttl > 0 ? ttl : 0); 289 producer.send(msg); 290 291 return msg.getJMSMessageID(); 292 293 } finally { 294 connection.close(); 295 } 296 297 } 298 299 public int getMaxAuditDepth() { 300 return destination.getMaxAuditDepth(); 301 } 302 303 public int getMaxProducersToAudit() { 304 return destination.getMaxProducersToAudit(); 305 } 306 307 public boolean isEnableAudit() { 308 return destination.isEnableAudit(); 309 } 310 311 public void setEnableAudit(boolean enableAudit) { 312 destination.setEnableAudit(enableAudit); 313 } 314 315 public void setMaxAuditDepth(int maxAuditDepth) { 316 destination.setMaxAuditDepth(maxAuditDepth); 317 } 318 319 public void setMaxProducersToAudit(int maxProducersToAudit) { 320 destination.setMaxProducersToAudit(maxProducersToAudit); 321 } 322 323 public float getMemoryUsagePortion() { 324 return destination.getMemoryUsage().getUsagePortion(); 325 } 326 327 public long getProducerCount() { 328 return destination.getDestinationStatistics().getProducers().getCount(); 329 } 330 331 public boolean isProducerFlowControl() { 332 return destination.isProducerFlowControl(); 333 } 334 335 public void setMemoryUsagePortion(float value) { 336 destination.getMemoryUsage().setUsagePortion(value); 337 } 338 339 public void setProducerFlowControl(boolean producerFlowControl) { 340 destination.setProducerFlowControl(producerFlowControl); 341 } 342 343 /** 344 * Set's the interval at which warnings about producers being blocked by 345 * resource usage will be triggered. Values of 0 or less will disable 346 * warnings 347 * 348 * @param blockedProducerWarningInterval the interval at which warning about 349 * blocked producers will be triggered. 350 */ 351 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 352 destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval); 353 } 354 355 /** 356 * 357 * @return the interval at which warning about blocked producers will be 358 * triggered. 359 */ 360 public long getBlockedProducerWarningInterval() { 361 return destination.getBlockedProducerWarningInterval(); 362 } 363 364 public int getMaxPageSize() { 365 return destination.getMaxPageSize(); 366 } 367 368 public void setMaxPageSize(int pageSize) { 369 destination.setMaxPageSize(pageSize); 370 } 371 372 public boolean isUseCache() { 373 return destination.isUseCache(); 374 } 375 376 public void setUseCache(boolean value) { 377 destination.setUseCache(value); 378 } 379 380 public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException { 381 List<Subscription> subscriptions = destination.getConsumers(); 382 ObjectName[] answer = new ObjectName[subscriptions.size()]; 383 ObjectName objectName = broker.getBrokerService().getBrokerObjectName(); 384 int index = 0; 385 for (Subscription subscription : subscriptions) { 386 String connectionClientId = subscription.getContext().getClientId(); 387 String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription, connectionClientId, objectName); 388 answer[index++] = new ObjectName(objectNameStr); 389 } 390 return answer; 391 } 392 393 public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException { 394 ObjectName result = null; 395 SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy(); 396 if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) { 397 result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy); 398 } 399 return result; 400 } 401 402}