001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.command;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.List;
022import java.util.StringTokenizer;
023
024import javax.management.MBeanServerConnection;
025import javax.management.ObjectInstance;
026import javax.management.ObjectName;
027import javax.management.openmbean.CompositeData;
028import javax.management.remote.JMXConnector;
029
030import org.apache.activemq.console.util.JmxMBeansUtil;
031
032public class PurgeCommand extends AbstractJmxCommand {
033
034    protected String[] helpFile = new String[] {
035        "Task Usage: Main purge [browse-options] <destinations>",
036        "Description: Delete selected destination's messages that matches the message selector.", 
037        "", 
038        "Purge Options:",
039        "    --msgsel <msgsel1,msglsel2>   Add to the search list messages matched by the query similar to",
040        "                                  the messages selector format.",
041        "    --jmxurl <url>                Set the JMX URL to connect to.",
042        "    --pid <pid>                   Set the pid to connect to (only on Sun JVM).",            
043        "    --jmxuser <user>              Set the JMX user used for authenticating.",
044        "    --jmxpassword <password>      Set the JMX password used for authenticating.",
045        "    --jmxlocal                    Use the local JMX server instead of a remote one.",
046        "    --version                     Display the version information.",
047        "    -h,-?,--help                  Display the browse broker help information.", 
048        "", 
049        "Examples:",
050        "    Main purge FOO.BAR", 
051        "        - Delete all the messages in queue FOO.BAR",
052
053        "    Main purge --msgsel JMSMessageID='*:10',JMSPriority>5 FOO.*", 
054        "        - Delete all the messages in the destinations that matches FOO.* and has a JMSMessageID in",
055        "          the header field that matches the wildcard *:10, and has a JMSPriority field > 5 in the",
056        "          queue FOO.BAR",
057        "        * To use wildcard queries, the field must be a string and the query enclosed in ''",
058        "",
059    };
060
061    private final List<String> queryAddObjects = new ArrayList<String>(10);
062    private final List<String> querySubObjects = new ArrayList<String>(10);
063
064    /**
065     * Execute the purge command, which allows you to purge the messages in a
066     * given JMS destination
067     * 
068     * @param tokens - command arguments
069     * @throws Exception
070     */
071    protected void runTask(List<String> tokens) throws Exception {
072        try {
073            // If there is no queue name specified, let's select all
074            if (tokens.isEmpty()) {
075                tokens.add("*");
076            }
077
078            // Iterate through the queue names
079            for (Iterator<String> i = tokens.iterator(); i.hasNext();) {
080                List queueList = JmxMBeansUtil.queryMBeans(createJmxConnection(), "Type=Queue,Destination=" + i.next() + ",*");
081
082                for (Iterator j = queueList.iterator(); j.hasNext();) {
083                    ObjectName queueName = ((ObjectInstance)j.next()).getObjectName();
084                    if (queryAddObjects.isEmpty()) {
085                        purgeQueue(queueName);
086                    } else {
087                        List messages = JmxMBeansUtil.createMessageQueryFilter(createJmxConnection(), queueName).query(queryAddObjects);
088                        purgeMessages(queueName, messages);
089                    }
090                }
091            }
092        } catch (Exception e) {
093            context.printException(new RuntimeException("Failed to execute purge task. Reason: " + e));
094            throw new Exception(e);
095        }
096    }
097
098    /**
099     * Purge all the messages in the queue
100     * 
101     * @param queue - ObjectName of the queue to purge
102     * @throws Exception
103     */
104    public void purgeQueue(ObjectName queue) throws Exception {
105        context.printInfo("Purging all messages in queue: " + queue.getKeyProperty("Destination"));
106        createJmxConnection().invoke(queue, "purge", new Object[] {}, new String[] {});
107    }
108
109    /**
110     * Purge selected messages in the queue
111     * 
112     * @param queue - ObjectName of the queue to purge the messages from
113     * @param messages - List of messages to purge
114     * @throws Exception
115     */
116    public void purgeMessages(ObjectName queue, List messages) throws Exception {
117        Object[] param = new Object[1];
118        for (Iterator i = messages.iterator(); i.hasNext();) {
119            CompositeData msg = (CompositeData)i.next();
120            param[0] = "" + msg.get("JMSMessageID");
121            context.printInfo("Removing message: " + param[0] + " from queue: " + queue.getKeyProperty("Destination"));
122            createJmxConnection().invoke(queue, "removeMessage", param, new String[] {
123                "java.lang.String"
124            });
125        }
126    }
127
128    /**
129     * Handle the --msgsel, --xmsgsel.
130     * 
131     * @param token - option token to handle
132     * @param tokens - succeeding command arguments
133     * @throws Exception
134     */
135    protected void handleOption(String token, List<String> tokens) throws Exception {
136        // If token is an additive message selector option
137        if (token.startsWith("--msgsel")) {
138
139            // If no message selector is specified, or next token is a new
140            // option
141            if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
142                context.printException(new IllegalArgumentException("Message selector not specified"));
143                return;
144            }
145
146            StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
147            while (queryTokens.hasMoreTokens()) {
148                queryAddObjects.add(queryTokens.nextToken());
149            }
150        } else if (token.startsWith("--xmsgsel")) {
151            // If token is a substractive message selector option
152
153            // If no message selector is specified, or next token is a new
154            // option
155            if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) {
156                context.printException(new IllegalArgumentException("Message selector not specified"));
157                return;
158            }
159
160            StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER);
161            while (queryTokens.hasMoreTokens()) {
162                querySubObjects.add(queryTokens.nextToken());
163            }
164
165        } else {
166            // Let super class handle unknown option
167            super.handleOption(token, tokens);
168        }
169    }
170
171    /**
172     * Print the help messages for the browse command
173     */
174    protected void printHelp() {
175        context.printHelp(helpFile);
176    }
177
178}