001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.Iterator;
022import java.util.List;
023import java.util.Map;
024import org.apache.activemq.broker.region.MessageReference;
025import org.apache.activemq.command.MessageId;
026
027public class PrioritizedPendingList implements PendingList {
028    static final Integer MAX_PRIORITY = 10;
029    private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
030    final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
031
032    public PrioritizedPendingList() {
033        for (int i = 0; i < MAX_PRIORITY; i++) {
034            this.lists[i] = new OrderedPendingList();
035        }
036    }
037    public PendingNode addMessageFirst(MessageReference message) {
038        PendingNode node = getList(message).addMessageFirst(message);
039        this.map.put(message.getMessageId(), node);
040        return node;
041    }
042
043    public PendingNode addMessageLast(MessageReference message) {
044        PendingNode node = getList(message).addMessageLast(message);
045        this.map.put(message.getMessageId(), node);
046        return node;
047    }
048
049    public void clear() {
050        for (int i = 0; i < MAX_PRIORITY; i++) {
051            this.lists[i].clear();
052        }
053        this.map.clear();
054    }
055
056    public boolean isEmpty() {
057        return this.map.isEmpty();
058    }
059
060    public Iterator<MessageReference> iterator() {
061        return new PrioritizedPendingListIterator();
062    }
063
064    public void remove(MessageReference message) {
065        if (message != null) {
066            PendingNode node = this.map.remove(message.getMessageId());
067            if (node != null) {
068                node.getList().removeNode(node);
069            }
070        }
071    }
072
073    public int size() {
074        return this.map.size();
075    }
076
077    @Override
078    public String toString() {
079        return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
080    }
081
082    protected int getPriority(MessageReference message) {
083        int priority = javax.jms.Message.DEFAULT_PRIORITY;
084        if (message.getMessageId() != null) {
085            priority = Math.max(message.getMessage().getPriority(), 0);
086            priority = Math.min(priority, 9);
087        }
088        return priority;
089    }
090
091    protected OrderedPendingList getList(MessageReference msg) {
092        return lists[getPriority(msg)];
093    }
094
095    private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
096        private int index = 0;
097        private int currentIndex = 0;
098        List<PendingNode> list = new ArrayList<PendingNode>(size());
099
100        PrioritizedPendingListIterator() {
101            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
102                OrderedPendingList orderedPendingList = lists[i];
103                if (!orderedPendingList.isEmpty()) {
104                    list.addAll(orderedPendingList.getAsList());
105                }
106            }
107        }
108        public boolean hasNext() {
109            return list.size() > index;
110        }
111
112        public MessageReference next() {
113            PendingNode node = list.get(this.index);
114            this.currentIndex = this.index;
115            this.index++;
116            return node.getMessage();
117        }
118
119        public void remove() {
120            PendingNode node = list.get(this.currentIndex);
121            if (node != null) {
122                map.remove(node.getMessage().getMessageId());
123                node.getList().removeNode(node);
124            }
125
126        }
127
128    }
129
130}