001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.filter;
018
019import java.net.URI;
020import java.util.Collections;
021import java.util.Iterator;
022import java.util.List;
023
024import javax.jms.Connection;
025import javax.jms.Destination;
026import javax.jms.JMSException;
027import javax.jms.QueueBrowser;
028import javax.jms.Session;
029
030import org.apache.activemq.ActiveMQConnectionFactory;
031import org.apache.activemq.command.ActiveMQQueue;
032import org.apache.activemq.command.ActiveMQTopic;
033
034public class AmqMessagesQueryFilter extends AbstractQueryFilter {
035
036    private URI brokerUrl;
037    private Destination destination;
038
039    /**
040     * Create a JMS message query filter
041     * 
042     * @param brokerUrl - broker url to connect to
043     * @param destination - JMS destination to query
044     */
045    public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) {
046        super(null);
047        this.brokerUrl = brokerUrl;
048        this.destination = destination;
049    }
050
051    /**
052     * Queries the specified destination using the message selector format query
053     * 
054     * @param queries - message selector queries
055     * @return list messages that matches the selector
056     * @throws Exception
057     */
058    public List query(List queries) throws Exception {
059        String selector = "";
060
061        // Convert to message selector
062        for (Iterator i = queries.iterator(); i.hasNext();) {
063            selector = selector + "(" + i.next().toString() + ") AND ";
064        }
065
066        // Remove last AND
067        if (!selector.equals("")) {
068            selector = selector.substring(0, selector.length() - 5);
069        }
070
071        if (destination instanceof ActiveMQQueue) {
072            return queryMessages((ActiveMQQueue)destination, selector);
073        } else {
074            return queryMessages((ActiveMQTopic)destination, selector);
075        }
076    }
077
078    /**
079     * Query the messages of a queue destination using a queue browser
080     * 
081     * @param queue - queue destination
082     * @param selector - message selector
083     * @return list of messages that matches the selector
084     * @throws Exception
085     */
086    protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception {
087        Connection conn = createConnection(getBrokerUrl());
088
089        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
090        QueueBrowser browser = sess.createBrowser(queue, selector);
091
092        List messages = Collections.list(browser.getEnumeration());
093
094        conn.close();
095
096        return messages;
097    }
098
099    /**
100     * Query the messages of a topic destination using a message consumer
101     * 
102     * @param topic - topic destination
103     * @param selector - message selector
104     * @return list of messages that matches the selector
105     * @throws Exception
106     */
107    protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception {
108        // TODO: should we use a durable subscriber or a retroactive non-durable
109        // subscriber?
110        // TODO: if a durable subscriber is used, how do we manage it?
111        // subscribe/unsubscribe tasks?
112        return null;
113    }
114
115    /**
116     * Create and start a JMS connection
117     * 
118     * @param brokerUrl - broker url to connect to.
119     * @return JMS connection
120     * @throws JMSException
121     */
122    protected Connection createConnection(URI brokerUrl) throws JMSException {
123        Connection conn = (new ActiveMQConnectionFactory(brokerUrl)).createConnection();
124        conn.start();
125        return conn;
126    }
127
128    /**
129     * Get the broker url being used.
130     * 
131     * @return broker url
132     */
133    public URI getBrokerUrl() {
134        return brokerUrl;
135    }
136
137    /**
138     * Set the broker url to use.
139     * 
140     * @param brokerUrl - broker url
141     */
142    public void setBrokerUrl(URI brokerUrl) {
143        this.brokerUrl = brokerUrl;
144    }
145
146    /**
147     * Get the destination being used.
148     * 
149     * @return - JMS destination
150     */
151    public Destination getDestination() {
152        return destination;
153    }
154
155    /**
156     * Set the destination to use.
157     * 
158     * @param destination - JMS destination
159     */
160    public void setDestination(Destination destination) {
161        this.destination = destination;
162    }
163
164}