001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.command.store.amq;
018
019import java.io.File;
020import java.io.InputStream;
021import java.io.PrintWriter;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Scanner;
030
031import org.apache.activemq.command.ActiveMQBlobMessage;
032import org.apache.activemq.command.ActiveMQBytesMessage;
033import org.apache.activemq.command.ActiveMQMapMessage;
034import org.apache.activemq.command.ActiveMQMessage;
035import org.apache.activemq.command.ActiveMQObjectMessage;
036import org.apache.activemq.command.ActiveMQStreamMessage;
037import org.apache.activemq.command.ActiveMQTextMessage;
038import org.apache.activemq.command.DataStructure;
039import org.apache.activemq.command.JournalQueueAck;
040import org.apache.activemq.command.JournalTopicAck;
041import org.apache.activemq.command.JournalTrace;
042import org.apache.activemq.command.JournalTransaction;
043import org.apache.activemq.kaha.impl.async.Location;
044import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
045import org.apache.activemq.openwire.OpenWireFormat;
046import org.apache.activemq.util.ByteSequence;
047import org.apache.activemq.wireformat.WireFormat;
048import org.apache.velocity.Template;
049import org.apache.velocity.VelocityContext;
050import org.apache.velocity.app.Velocity;
051import org.apache.velocity.app.VelocityEngine;
052import org.josql.Query;
053
054/**
055 * Allows you to view the contents of a Journal.
056 * 
057 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
058 */
059public class AMQJournalTool {
060
061        private final ArrayList<File> dirs = new ArrayList<File>();
062        private final WireFormat wireFormat = new OpenWireFormat();
063        private final HashMap<String, String> resources = new HashMap<String, String>();
064
065        private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
066        private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
067        private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
068        private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
069        private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
070        private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
071        private String where;
072        private VelocityContext context;
073        private VelocityEngine velocity;
074        private boolean help;
075
076        public static void main(String[] args) throws Exception {
077                AMQJournalTool consumerTool = new AMQJournalTool();
078                String[] directories = CommandLineSupport
079                                .setOptions(consumerTool, args);
080                if (directories.length < 1) {
081                        System.out
082                                        .println("Please specify the directories with journal data to scan");
083                        return;
084                }
085                for (int i = 0; i < directories.length; i++) {
086                        consumerTool.getDirs().add(new File(directories[i]));
087                }
088                consumerTool.execute();
089        }
090
091        public void execute() throws Exception {
092
093                if( help ) {
094                        showHelp();
095                        return;
096                }
097                
098                if (getDirs().size() < 1) {
099                        System.out.println("");
100                        System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
101                        System.out.println("");
102                        showHelp();
103                        return;
104                }
105
106                for (File dir : getDirs()) {
107                        if( !dir.exists() ) {
108                                System.out.println("");
109                                System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
110                                System.out.println("");
111                                showHelp();
112                                return;
113                        }
114                        if( !dir.isDirectory() ) {
115                                System.out.println("");
116                                System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
117                                System.out.println("");
118                                showHelp();
119                                return;
120                        }
121                }
122                
123                
124                context = new VelocityContext();
125                List keys = Arrays.asList(context.getKeys());
126
127                for (Iterator iterator = System.getProperties().entrySet()
128                                .iterator(); iterator.hasNext();) {
129                        Map.Entry kv = (Map.Entry) iterator.next();
130                        String name = (String) kv.getKey();
131                        String value = (String) kv.getValue();
132
133                        if (!keys.contains(name)) {
134                                context.put(name, value);
135                        }
136                }
137                
138                velocity = new VelocityEngine();
139                velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
140                velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
141                velocity.init();
142
143
144                resources.put("message", messageFormat);
145                resources.put("topicAck", topicAckFormat);
146                resources.put("queueAck", queueAckFormat);
147                resources.put("transaction", transactionFormat);
148                resources.put("trace", traceFormat);
149                resources.put("unknown", unknownFormat);
150
151                Query query = null;
152                if (where != null) {
153                        query = new Query();
154                        query.parse("select * from "+Entry.class.getName()+" where "+where);
155
156                }
157
158                ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
159                manager.start();
160                try {
161                        Location curr = manager.getFirstLocation();
162                        while (curr != null) {
163
164                                ByteSequence data = manager.read(curr);
165                                DataStructure c = (DataStructure) wireFormat.unmarshal(data);
166
167                                Entry entry = new Entry();
168                                entry.setLocation(curr);
169                                entry.setRecord(c);
170                                entry.setData(data);
171                                entry.setQuery(query);
172                                process(entry);
173
174                                curr = manager.getNextLocation(curr);
175                        }
176                } finally {
177                        manager.close();
178                }
179        }
180
181        private void showHelp() {
182                InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
183                Scanner scanner = new Scanner(is);
184                while (scanner.hasNextLine()) {
185                        String line = scanner.nextLine();
186                        System.out.println(line);
187                }
188                scanner.close();        }
189
190        private void process(Entry entry) throws Exception {
191
192                Location location = entry.getLocation();
193                DataStructure record = entry.getRecord();
194
195                switch (record.getDataStructureType()) {
196                case ActiveMQMessage.DATA_STRUCTURE_TYPE:
197                        entry.setType("ActiveMQMessage");
198                        entry.setFormater("message");
199                        display(entry);
200                        break;
201                case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
202                        entry.setType("ActiveMQBytesMessage");
203                        entry.setFormater("message");
204                        display(entry);
205                        break;
206                case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
207                        entry.setType("ActiveMQBlobMessage");
208                        entry.setFormater("message");
209                        display(entry);
210                        break;
211                case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
212                        entry.setType("ActiveMQMapMessage");
213                        entry.setFormater("message");
214                        display(entry);
215                        break;
216                case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
217                        entry.setType("ActiveMQObjectMessage");
218                        entry.setFormater("message");
219                        display(entry);
220                        break;
221                case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
222                        entry.setType("ActiveMQStreamMessage");
223                        entry.setFormater("message");
224                        display(entry);
225                        break;
226                case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
227                        entry.setType("ActiveMQTextMessage");
228                        entry.setFormater("message");
229                        display(entry);
230                        break;
231                case JournalQueueAck.DATA_STRUCTURE_TYPE:
232                        entry.setType("Queue Ack");
233                        entry.setFormater("queueAck");
234                        display(entry);
235                        break;
236                case JournalTopicAck.DATA_STRUCTURE_TYPE:
237                        entry.setType("Topic Ack");
238                        entry.setFormater("topicAck");
239                        display(entry);
240                        break;
241                case JournalTransaction.DATA_STRUCTURE_TYPE:
242                        entry.setType(getType((JournalTransaction) record));
243                        entry.setFormater("transaction");
244                        display(entry);
245                        break;
246                case JournalTrace.DATA_STRUCTURE_TYPE:
247                        entry.setType("Trace");
248                        entry.setFormater("trace");
249                        display(entry);
250                        break;
251                default:
252                        entry.setType("Unknown");
253                        entry.setFormater("unknown");
254                        display(entry);
255                        break;
256                }
257        }
258
259        private String getType(JournalTransaction record) {
260                switch (record.getType()) {
261                case JournalTransaction.XA_PREPARE:
262                        return "XA Prepare";
263                case JournalTransaction.XA_COMMIT:
264                        return "XA Commit";
265                case JournalTransaction.XA_ROLLBACK:
266                        return "XA Rollback";
267                case JournalTransaction.LOCAL_COMMIT:
268                        return "Commit";
269                case JournalTransaction.LOCAL_ROLLBACK:
270                        return "Rollback";
271                }
272                return "Unknown Transaction";
273        }
274
275        private void display(Entry entry) throws Exception {
276
277                if (entry.getQuery() != null) {
278                        List list = Collections.singletonList(entry);
279                        List results = entry.getQuery().execute(list).getResults();
280                        if (results.isEmpty()) {
281                                return;
282                        }
283                }
284
285                CustomResourceLoader.setResources(resources);
286                try {
287
288                        context.put("location", entry.getLocation());
289                        context.put("record", entry.getRecord());
290                        context.put("type", entry.getType());
291                        if (entry.getRecord() instanceof ActiveMQMessage) {
292                                context.put("body", new MessageBodyFormatter(
293                                                (ActiveMQMessage) entry.getRecord()));
294                        }
295
296                        Template template = velocity.getTemplate(entry.getFormater());
297                        PrintWriter writer = new PrintWriter(System.out);
298                        template.merge(context, writer);
299                        writer.println();
300                        writer.flush();
301                } finally {
302                        CustomResourceLoader.setResources(null);
303                }
304        }
305
306        public void setMessageFormat(String messageFormat) {
307                this.messageFormat = messageFormat;
308        }
309
310        public void setTopicAckFormat(String ackFormat) {
311                this.topicAckFormat = ackFormat;
312        }
313
314        public void setTransactionFormat(String transactionFormat) {
315                this.transactionFormat = transactionFormat;
316        }
317
318        public void setTraceFormat(String traceFormat) {
319                this.traceFormat = traceFormat;
320        }
321
322        public void setUnknownFormat(String unknownFormat) {
323                this.unknownFormat = unknownFormat;
324        }
325
326        public void setQueueAckFormat(String queueAckFormat) {
327                this.queueAckFormat = queueAckFormat;
328        }
329
330        public String getQuery() {
331                return where;
332        }
333
334        public void setWhere(String query) {
335                this.where = query;
336        }
337
338        public boolean isHelp() {
339                return help;
340        }
341
342        public void setHelp(boolean help) {
343                this.help = help;
344        }
345
346        /**
347         * @return the dirs
348         */
349        public ArrayList<File> getDirs() {
350                return dirs;
351        }
352
353}