001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.command.store.amq.reader;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.HashSet;
022import java.util.Iterator;
023import java.util.Set;
024
025import javax.jms.InvalidSelectorException;
026import javax.jms.Message;
027
028import org.apache.activemq.command.DataStructure;
029import org.apache.activemq.filter.BooleanExpression;
030import org.apache.activemq.kaha.impl.async.AsyncDataManager;
031import org.apache.activemq.kaha.impl.async.Location;
032import org.apache.activemq.openwire.OpenWireFormat;
033import org.apache.activemq.selector.SelectorParser;
034import org.apache.activemq.util.ByteSequence;
035import org.apache.activemq.wireformat.WireFormat;
036
037/**
038 * Reads and iterates through data log files for the AMQMessage Store
039 * 
040 */
041public class AMQReader implements Iterable<Message> {
042
043    private AsyncDataManager dataManager;
044    private WireFormat wireFormat = new OpenWireFormat();
045    private File file;
046    private BooleanExpression expression;
047
048    /**
049     * List all the data files in a directory
050     * @param directory
051     * @return
052     * @throws IOException
053     */
054    public static Set<File> listDataFiles(File directory) throws IOException{
055        Set<File>result = new HashSet<File>();
056        if (directory == null || !directory.exists() || !directory.isDirectory()) {
057            throw new IOException("Invalid Directory " + directory);
058        }
059        AsyncDataManager dataManager = new AsyncDataManager();
060        dataManager.setDirectory(directory);
061        dataManager.start();
062        Set<File> set = dataManager.getFiles();
063        if (set != null) {
064            result.addAll(set);
065        }
066        dataManager.close();
067        return result;
068    }
069    /**
070     * Create the AMQReader to read a directory of amq data logs - or an
071     * individual data log file
072     * 
073     * @param file the directory - or file
074     * @throws IOException 
075     * @throws InvalidSelectorException 
076     * @throws IOException
077     * @throws InvalidSelectorException 
078     */
079    public AMQReader(File file) throws InvalidSelectorException, IOException {
080        this(file,null);
081    }
082    
083    /**
084     * Create the AMQReader to read a directory of amq data logs - or an
085     * individual data log file
086     * 
087     * @param file the directory - or file
088     * @param selector the JMS selector or null to select all
089     * @throws IOException
090     * @throws InvalidSelectorException 
091     */
092    public AMQReader(File file, String selector) throws IOException, InvalidSelectorException {
093        String str = selector != null ? selector.trim() : null;
094        if (str != null && str.length() > 0) {
095            this.expression=SelectorParser.parse(str);
096        }
097        dataManager = new AsyncDataManager();
098        dataManager.setArchiveDataLogs(false);
099        if (file.isDirectory()) {
100            dataManager.setDirectory(file);
101        } else {
102            dataManager.setDirectory(file.getParentFile());
103            dataManager.setDirectoryArchive(file);
104            this.file = file;
105        }
106        dataManager.start();
107    }
108
109    public Iterator<Message> iterator() {
110        return new AMQIterator(this,this.expression);
111    }
112
113    
114    protected MessageLocation getNextMessage(MessageLocation lastLocation)
115            throws IllegalStateException, IOException {
116        if (this.file != null) {
117            return getInternalNextMessage(this.file, lastLocation);
118        }
119        return getInternalNextMessage(lastLocation);
120    }
121
122    private MessageLocation getInternalNextMessage(MessageLocation lastLocation)
123            throws IllegalStateException, IOException {
124        return getInternalNextMessage(null, lastLocation);
125    }
126
127    private MessageLocation getInternalNextMessage(File file,
128            MessageLocation lastLocation) throws IllegalStateException,
129            IOException {
130        MessageLocation result = lastLocation;
131        if (result != null) {
132            result.setMessage(null);
133        }
134        Message message = null;
135        Location pos = lastLocation != null ? lastLocation.getLocation() : null;
136        while ((pos = getNextLocation(file, pos)) != null) {
137            message = getMessage(pos);
138            if (message != null) {
139                if (result == null) {
140                    result = new MessageLocation();
141                }
142                result.setMessage(message);
143                break;
144            }
145        }
146        result.setLocation(pos);
147        if (pos == null && message == null) {
148            result = null;
149        } else {
150            result.setLocation(pos);
151        }
152        return result;
153    }
154
155    private Location getNextLocation(File file, Location last)
156            throws IllegalStateException, IOException {
157        if (file != null) {
158            return dataManager.getNextLocation(file, last, true);
159        }
160        return dataManager.getNextLocation(last);
161    }
162
163    private Message getMessage(Location location) throws IOException {
164        ByteSequence data = dataManager.read(location);
165        DataStructure c = (DataStructure) wireFormat.unmarshal(data);
166        if (c instanceof Message) {
167            return (Message) c;
168        }
169        return null;
170
171    }
172}