001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.console.filter; 018 019import java.net.URI; 020import java.util.Collections; 021import java.util.Iterator; 022import java.util.List; 023 024import javax.jms.Connection; 025import javax.jms.Destination; 026import javax.jms.JMSException; 027import javax.jms.QueueBrowser; 028import javax.jms.Session; 029 030import org.apache.activemq.ActiveMQConnectionFactory; 031import org.apache.activemq.command.ActiveMQQueue; 032import org.apache.activemq.command.ActiveMQTopic; 033 034public class AmqMessagesQueryFilter extends AbstractQueryFilter { 035 036 private URI brokerUrl; 037 private Destination destination; 038 039 /** 040 * Create a JMS message query filter 041 * 042 * @param brokerUrl - broker url to connect to 043 * @param destination - JMS destination to query 044 */ 045 public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) { 046 super(null); 047 this.brokerUrl = brokerUrl; 048 this.destination = destination; 049 } 050 051 /** 052 * Queries the specified destination using the message selector format query 053 * 054 * @param queries - message selector queries 055 * @return list messages that matches the selector 056 * @throws Exception 057 */ 058 public List query(List queries) throws Exception { 059 String selector = ""; 060 061 // Convert to message selector 062 for (Iterator i = queries.iterator(); i.hasNext();) { 063 selector = selector + "(" + i.next().toString() + ") AND "; 064 } 065 066 // Remove last AND 067 if (!selector.equals("")) { 068 selector = selector.substring(0, selector.length() - 5); 069 } 070 071 if (destination instanceof ActiveMQQueue) { 072 return queryMessages((ActiveMQQueue)destination, selector); 073 } else { 074 return queryMessages((ActiveMQTopic)destination, selector); 075 } 076 } 077 078 /** 079 * Query the messages of a queue destination using a queue browser 080 * 081 * @param queue - queue destination 082 * @param selector - message selector 083 * @return list of messages that matches the selector 084 * @throws Exception 085 */ 086 protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception { 087 Connection conn = createConnection(getBrokerUrl()); 088 089 Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 090 QueueBrowser browser = sess.createBrowser(queue, selector); 091 092 List messages = Collections.list(browser.getEnumeration()); 093 094 conn.close(); 095 096 return messages; 097 } 098 099 /** 100 * Query the messages of a topic destination using a message consumer 101 * 102 * @param topic - topic destination 103 * @param selector - message selector 104 * @return list of messages that matches the selector 105 * @throws Exception 106 */ 107 protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception { 108 // TODO: should we use a durable subscriber or a retroactive non-durable 109 // subscriber? 110 // TODO: if a durable subscriber is used, how do we manage it? 111 // subscribe/unsubscribe tasks? 112 return null; 113 } 114 115 /** 116 * Create and start a JMS connection 117 * 118 * @param brokerUrl - broker url to connect to. 119 * @return JMS connection 120 * @throws JMSException 121 */ 122 protected Connection createConnection(URI brokerUrl) throws JMSException { 123 Connection conn = (new ActiveMQConnectionFactory(brokerUrl)).createConnection(); 124 conn.start(); 125 return conn; 126 } 127 128 /** 129 * Get the broker url being used. 130 * 131 * @return broker url 132 */ 133 public URI getBrokerUrl() { 134 return brokerUrl; 135 } 136 137 /** 138 * Set the broker url to use. 139 * 140 * @param brokerUrl - broker url 141 */ 142 public void setBrokerUrl(URI brokerUrl) { 143 this.brokerUrl = brokerUrl; 144 } 145 146 /** 147 * Get the destination being used. 148 * 149 * @return - JMS destination 150 */ 151 public Destination getDestination() { 152 return destination; 153 } 154 155 /** 156 * Set the destination to use. 157 * 158 * @param destination - JMS destination 159 */ 160 public void setDestination(Destination destination) { 161 this.destination = destination; 162 } 163 164}