001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.console.command; 018 019import java.util.ArrayList; 020import java.util.Iterator; 021import java.util.List; 022import java.util.StringTokenizer; 023 024import javax.management.MBeanServerConnection; 025import javax.management.ObjectInstance; 026import javax.management.ObjectName; 027import javax.management.openmbean.CompositeData; 028import javax.management.remote.JMXConnector; 029 030import org.apache.activemq.console.util.JmxMBeansUtil; 031 032public class PurgeCommand extends AbstractJmxCommand { 033 034 protected String[] helpFile = new String[] { 035 "Task Usage: Main purge [browse-options] <destinations>", 036 "Description: Delete selected destination's messages that matches the message selector.", 037 "", 038 "Purge Options:", 039 " --msgsel <msgsel1,msglsel2> Add to the search list messages matched by the query similar to", 040 " the messages selector format.", 041 " --jmxurl <url> Set the JMX URL to connect to.", 042 " --pid <pid> Set the pid to connect to (only on Sun JVM).", 043 " --jmxuser <user> Set the JMX user used for authenticating.", 044 " --jmxpassword <password> Set the JMX password used for authenticating.", 045 " --jmxlocal Use the local JMX server instead of a remote one.", 046 " --version Display the version information.", 047 " -h,-?,--help Display the browse broker help information.", 048 "", 049 "Examples:", 050 " Main purge FOO.BAR", 051 " - Delete all the messages in queue FOO.BAR", 052 053 " Main purge --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.*", 054 " - Delete all the messages in the destinations that matches FOO.* and has a JMSMessageID in", 055 " the header field that matches the wildcard *:10, and has a JMSPriority field > 5 in the", 056 " queue FOO.BAR", 057 " * To use wildcard queries, the field must be a string and the query enclosed in ''", 058 "", 059 }; 060 061 private final List<String> queryAddObjects = new ArrayList<String>(10); 062 private final List<String> querySubObjects = new ArrayList<String>(10); 063 064 /** 065 * Execute the purge command, which allows you to purge the messages in a 066 * given JMS destination 067 * 068 * @param tokens - command arguments 069 * @throws Exception 070 */ 071 protected void runTask(List<String> tokens) throws Exception { 072 try { 073 // If there is no queue name specified, let's select all 074 if (tokens.isEmpty()) { 075 tokens.add("*"); 076 } 077 078 // Iterate through the queue names 079 for (Iterator<String> i = tokens.iterator(); i.hasNext();) { 080 List queueList = JmxMBeansUtil.queryMBeans(createJmxConnection(), "Type=Queue,Destination=" + i.next() + ",*"); 081 082 for (Iterator j = queueList.iterator(); j.hasNext();) { 083 ObjectName queueName = ((ObjectInstance)j.next()).getObjectName(); 084 if (queryAddObjects.isEmpty()) { 085 purgeQueue(queueName); 086 } else { 087 List messages = JmxMBeansUtil.createMessageQueryFilter(createJmxConnection(), queueName).query(queryAddObjects); 088 purgeMessages(queueName, messages); 089 } 090 } 091 } 092 } catch (Exception e) { 093 context.printException(new RuntimeException("Failed to execute purge task. Reason: " + e)); 094 throw new Exception(e); 095 } 096 } 097 098 /** 099 * Purge all the messages in the queue 100 * 101 * @param queue - ObjectName of the queue to purge 102 * @throws Exception 103 */ 104 public void purgeQueue(ObjectName queue) throws Exception { 105 context.printInfo("Purging all messages in queue: " + queue.getKeyProperty("Destination")); 106 createJmxConnection().invoke(queue, "purge", new Object[] {}, new String[] {}); 107 } 108 109 /** 110 * Purge selected messages in the queue 111 * 112 * @param queue - ObjectName of the queue to purge the messages from 113 * @param messages - List of messages to purge 114 * @throws Exception 115 */ 116 public void purgeMessages(ObjectName queue, List messages) throws Exception { 117 Object[] param = new Object[1]; 118 for (Iterator i = messages.iterator(); i.hasNext();) { 119 CompositeData msg = (CompositeData)i.next(); 120 param[0] = "" + msg.get("JMSMessageID"); 121 context.printInfo("Removing message: " + param[0] + " from queue: " + queue.getKeyProperty("Destination")); 122 createJmxConnection().invoke(queue, "removeMessage", param, new String[] { 123 "java.lang.String" 124 }); 125 } 126 } 127 128 /** 129 * Handle the --msgsel, --xmsgsel. 130 * 131 * @param token - option token to handle 132 * @param tokens - succeeding command arguments 133 * @throws Exception 134 */ 135 protected void handleOption(String token, List<String> tokens) throws Exception { 136 // If token is an additive message selector option 137 if (token.startsWith("--msgsel")) { 138 139 // If no message selector is specified, or next token is a new 140 // option 141 if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) { 142 context.printException(new IllegalArgumentException("Message selector not specified")); 143 return; 144 } 145 146 StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER); 147 while (queryTokens.hasMoreTokens()) { 148 queryAddObjects.add(queryTokens.nextToken()); 149 } 150 } else if (token.startsWith("--xmsgsel")) { 151 // If token is a substractive message selector option 152 153 // If no message selector is specified, or next token is a new 154 // option 155 if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) { 156 context.printException(new IllegalArgumentException("Message selector not specified")); 157 return; 158 } 159 160 StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER); 161 while (queryTokens.hasMoreTokens()) { 162 querySubObjects.add(queryTokens.nextToken()); 163 } 164 165 } else { 166 // Let super class handle unknown option 167 super.handleOption(token, tokens); 168 } 169 } 170 171 /** 172 * Print the help messages for the browse command 173 */ 174 protected void printHelp() { 175 context.printHelp(helpFile); 176 } 177 178}