001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.Serializable;
020
021import javax.jms.JMSException;
022import javax.jms.Message;
023
024import org.apache.activemq.broker.region.MessageReference;
025import org.apache.activemq.command.MessageId;
026import org.apache.activemq.command.ProducerId;
027import org.apache.activemq.util.BitArrayBin;
028import org.apache.activemq.util.IdGenerator;
029import org.apache.activemq.util.LRUCache;
030
031/**
032 * Provides basic audit functions for Messages without sync
033 * 
034 * 
035 */
036public class ActiveMQMessageAuditNoSync implements Serializable {
037
038    private static final long serialVersionUID = 1L;
039
040    public static final int DEFAULT_WINDOW_SIZE = 2048;
041    public static final int MAXIMUM_PRODUCER_COUNT = 64;
042    private int auditDepth;
043    private int maximumNumberOfProducersToTrack;
044    private LRUCache<Object, BitArrayBin> map;
045
046    /**
047     * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
048     * 64
049     */
050    public ActiveMQMessageAuditNoSync() {
051        this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
052    }
053
054    /**
055     * Construct a MessageAudit
056     * 
057     * @param auditDepth range of ids to track
058     * @param maximumNumberOfProducersToTrack number of producers expected in
059     *                the system
060     */
061    public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
062        this.auditDepth = auditDepth;
063        this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
064        this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
065    }
066    
067    /**
068     * @return the auditDepth
069     */
070    public int getAuditDepth() {
071        return auditDepth;
072    }
073
074    /**
075     * @param auditDepth the auditDepth to set
076     */
077    public void setAuditDepth(int auditDepth) {
078        this.auditDepth = auditDepth;
079    }
080
081    /**
082     * @return the maximumNumberOfProducersToTrack
083     */
084    public int getMaximumNumberOfProducersToTrack() {
085        return maximumNumberOfProducersToTrack;
086    }
087
088    /**
089     * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
090     */
091    public void setMaximumNumberOfProducersToTrack(
092            int maximumNumberOfProducersToTrack) {
093        this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
094        this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
095    }
096
097    /**
098     * Checks if this message has been seen before
099     * 
100     * @param message
101     * @return true if the message is a duplicate
102     * @throws JMSException
103     */
104    public boolean isDuplicate(Message message) throws JMSException {
105        return isDuplicate(message.getJMSMessageID());
106    }
107
108    /**
109     * checks whether this messageId has been seen before and adds this
110     * messageId to the list
111     * 
112     * @param id
113     * @return true if the message is a duplicate
114     */
115    public boolean isDuplicate(String id) {
116        boolean answer = false;
117        String seed = IdGenerator.getSeedFromId(id);
118        if (seed != null) {
119            BitArrayBin bab = map.get(seed);
120            if (bab == null) {
121                bab = new BitArrayBin(auditDepth);
122                map.put(seed, bab);
123            }
124            long index = IdGenerator.getSequenceFromId(id);
125            if (index >= 0) {
126                answer = bab.setBit(index, true);
127            }
128        }
129        return answer;
130    }
131
132    /**
133     * Checks if this message has been seen before
134     * 
135     * @param message
136     * @return true if the message is a duplicate
137     */
138    public boolean isDuplicate(final MessageReference message) {
139        MessageId id = message.getMessageId();
140        return isDuplicate(id);
141    }
142    
143    /**
144     * Checks if this messageId has been seen before
145     * 
146     * @param id
147     * @return true if the message is a duplicate
148     */
149    public boolean isDuplicate(final MessageId id) {
150        boolean answer = false;
151        
152        if (id != null) {
153            ProducerId pid = id.getProducerId();
154            if (pid != null) {
155                BitArrayBin bab = map.get(pid);
156                if (bab == null) {
157                    bab = new BitArrayBin(auditDepth);
158                    map.put(pid, bab);
159                }
160                answer = bab.setBit(id.getProducerSequenceId(), true);
161            }
162        }
163        return answer;
164    }
165
166    /**
167     * mark this message as being received
168     * 
169     * @param message
170     */
171    public void rollback(final MessageReference message) {
172        MessageId id = message.getMessageId();
173        rollback(id);
174    }
175    
176    /**
177     * mark this message as being received
178     * 
179     * @param id
180     */
181    public void rollback(final  MessageId id) {
182        if (id != null) {
183            ProducerId pid = id.getProducerId();
184            if (pid != null) {
185                BitArrayBin bab = map.get(pid);
186                if (bab != null) {
187                    bab.setBit(id.getProducerSequenceId(), false);
188                }
189            }
190        }
191    }
192    
193    /**
194     * Check the message is in order
195     * @param msg
196     * @return
197     * @throws JMSException
198     */
199    public boolean isInOrder(Message msg) throws JMSException {
200        return isInOrder(msg.getJMSMessageID());
201    }
202    
203    /**
204     * Check the message id is in order
205     * @param id
206     * @return
207     */
208    public boolean isInOrder(final String id) {
209        boolean answer = true;
210        
211        if (id != null) {
212            String seed = IdGenerator.getSeedFromId(id);
213            if (seed != null) {
214                BitArrayBin bab = map.get(seed);
215                if (bab != null) {
216                    long index = IdGenerator.getSequenceFromId(id);
217                    answer = bab.isInOrder(index);
218                }
219               
220            }
221        }
222        return answer;
223    }
224    
225    /**
226     * Check the MessageId is in order
227     * @param message 
228     * @return
229     */
230    public boolean isInOrder(final MessageReference message) {
231        return isInOrder(message.getMessageId());
232    }
233    
234    /**
235     * Check the MessageId is in order
236     * @param id
237     * @return
238     */
239    public boolean isInOrder(final MessageId id) {
240        boolean answer = false;
241
242        if (id != null) {
243            ProducerId pid = id.getProducerId();
244            if (pid != null) {
245                BitArrayBin bab = map.get(pid);
246                if (bab == null) {
247                    bab = new BitArrayBin(auditDepth);
248                    map.put(pid, bab);
249                }
250                answer = bab.isInOrder(id.getProducerSequenceId());
251
252            }
253        }
254        return answer;
255    }
256
257    public long getLastSeqId(ProducerId id) {
258        long result = -1;
259        BitArrayBin bab = map.get(id.toString() + ":");
260        if (bab != null) {
261            result = bab.getLastSetIndex();
262        }
263        return result;
264    }
265
266    public void clear() {
267        map.clear();
268    }
269}