001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.Iterator;
022import java.util.LinkedHashMap;
023import java.util.Map;
024import java.util.Map.Entry;
025
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.Message;
029import org.apache.activemq.command.MessageAck;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.store.MessageRecoveryListener;
032import org.apache.activemq.store.MessageStore;
033import org.apache.activemq.store.AbstractMessageStore;
034import org.apache.activemq.usage.MemoryUsage;
035import org.apache.activemq.usage.SystemUsage;
036
037/**
038 * An implementation of {@link org.apache.activemq.store.MessageStore} which
039 * uses a
040 * 
041 * 
042 */
043public class MemoryMessageStore extends AbstractMessageStore {
044
045    protected final Map<MessageId, Message> messageTable;
046    protected MessageId lastBatchId;
047
048    public MemoryMessageStore(ActiveMQDestination destination) {
049        this(destination, new LinkedHashMap<MessageId, Message>());
050    }
051
052    public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) {
053        super(destination);
054        this.messageTable = Collections.synchronizedMap(messageTable);
055    }
056
057    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
058        synchronized (messageTable) {
059            messageTable.put(message.getMessageId(), message);
060        }
061        message.incrementReferenceCount();
062    }
063
064    // public void addMessageReference(ConnectionContext context,MessageId
065    // messageId,long expirationTime,String messageRef)
066    // throws IOException{
067    // synchronized(messageTable){
068    // messageTable.put(messageId,messageRef);
069    // }
070    // }
071
072    public Message getMessage(MessageId identity) throws IOException {
073        return messageTable.get(identity);
074    }
075
076    // public String getMessageReference(MessageId identity) throws IOException{
077    // return (String)messageTable.get(identity);
078    // }
079
080    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
081        removeMessage(ack.getLastMessageId());
082    }
083
084    public void removeMessage(MessageId msgId) throws IOException {
085        synchronized (messageTable) {
086            Message removed = messageTable.remove(msgId);
087            if( removed !=null ) {
088                removed.decrementReferenceCount();
089            }
090            if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
091                lastBatchId = null;
092            }
093        }
094    }
095
096    public void recover(MessageRecoveryListener listener) throws Exception {
097        // the message table is a synchronizedMap - so just have to synchronize
098        // here
099        synchronized (messageTable) {
100            for (Iterator<Message> iter = messageTable.values().iterator(); iter.hasNext();) {
101                Object msg = iter.next();
102                if (msg.getClass() == MessageId.class) {
103                    listener.recoverMessageReference((MessageId)msg);
104                } else {
105                    listener.recoverMessage((Message)msg);
106                }
107            }
108        }
109    }
110
111    public void removeAllMessages(ConnectionContext context) throws IOException {
112        synchronized (messageTable) {
113            messageTable.clear();
114        }
115    }
116
117    public void delete() {
118        synchronized (messageTable) {
119            messageTable.clear();
120        }
121    }
122
123    
124    public int getMessageCount() {
125        return messageTable.size();
126    }
127
128    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
129        synchronized (messageTable) {
130            boolean pastLackBatch = lastBatchId == null;
131            int count = 0;
132            for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) {
133                Map.Entry entry = (Entry)iter.next();
134                if (pastLackBatch) {
135                    count++;
136                    Object msg = entry.getValue();
137                    lastBatchId = (MessageId)entry.getKey();
138                    if (msg.getClass() == MessageId.class) {
139                        listener.recoverMessageReference((MessageId)msg);
140                    } else {
141                        listener.recoverMessage((Message)msg);
142                    }
143                } else {
144                    pastLackBatch = entry.getKey().equals(lastBatchId);
145                }
146            }
147        }
148    }
149
150    public void resetBatching() {
151        lastBatchId = null;
152    }
153
154    @Override
155    public void setBatch(MessageId messageId) {
156        lastBatchId = messageId;
157    }
158    
159}