001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.LinkedHashMap;
022import java.util.LinkedList;
023import java.util.Map;
024import java.util.Map.Entry;
025
026import javax.jms.JMSException;
027
028import org.apache.activemq.command.ActiveMQBytesMessage;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ActiveMQMessage;
031import org.apache.activemq.command.ConsumerInfo;
032import org.apache.activemq.command.MessageAck;
033import org.apache.activemq.command.MessageDispatch;
034import org.apache.activemq.command.MessageId;
035import org.apache.activemq.command.TransactionId;
036
037/**
038 * Keeps track of the STOMP subscription so that acking is correctly done.
039 *
040 * @author <a href="http://hiramchirino.com">chirino</a>
041 */
042public class StompSubscription {
043
044    public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
045    public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
046    public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
047
048    private final ProtocolConverter protocolConverter;
049    private final String subscriptionId;
050    private final ConsumerInfo consumerInfo;
051
052    private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
053    private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
054
055    private String ackMode = AUTO_ACK;
056    private ActiveMQDestination destination;
057    private String transformation;
058
059
060    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
061        this.protocolConverter = stompTransport;
062        this.subscriptionId = subscriptionId;
063        this.consumerInfo = consumerInfo;
064        this.transformation = transformation;
065    }
066
067    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
068        ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
069        if (ackMode == CLIENT_ACK) {
070            synchronized (this) {
071                dispatchedMessage.put(message.getMessageId(), md);
072            }
073        } else if (ackMode == INDIVIDUAL_ACK) {
074            synchronized (this) {
075                dispatchedMessage.put(message.getMessageId(), md);
076            }
077        } else if (ackMode == AUTO_ACK) {
078            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
079            protocolConverter.getStompTransport().sendToActiveMQ(ack);
080        }
081
082        boolean ignoreTransformation = false;
083
084        if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
085                message.setReadOnlyProperties(false);
086                message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
087        } else {
088                if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
089                        ignoreTransformation = true;
090                }
091        }
092
093        StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
094
095        command.setAction(Stomp.Responses.MESSAGE);
096        if (subscriptionId != null) {
097            command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
098        }
099
100        protocolConverter.getStompTransport().sendToStomp(command);
101    }
102
103    synchronized void onStompAbort(TransactionId transactionId) {
104        unconsumedMessage.clear();
105    }
106
107    synchronized void onStompCommit(TransactionId transactionId) {
108        for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
109            Map.Entry entry = (Entry)iter.next();
110            MessageId id = (MessageId)entry.getKey();
111            MessageDispatch msg = (MessageDispatch)entry.getValue();
112            if (unconsumedMessage.contains(msg)) {
113                iter.remove();
114            }
115        }
116        unconsumedMessage.clear();
117    }
118
119    synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
120
121        MessageId msgId = new MessageId(messageId);
122
123        if (!dispatchedMessage.containsKey(msgId)) {
124            return null;
125        }
126
127        MessageAck ack = new MessageAck();
128        ack.setDestination(consumerInfo.getDestination());
129        ack.setConsumerId(consumerInfo.getConsumerId());
130
131        if (ackMode == CLIENT_ACK) {
132                ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
133            int count = 0;
134            for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
135
136                Map.Entry entry = (Entry)iter.next();
137                MessageId id = (MessageId)entry.getKey();
138                MessageDispatch msg = (MessageDispatch)entry.getValue();
139
140                if (ack.getFirstMessageId() == null) {
141                    ack.setFirstMessageId(id);
142                }
143
144                if (transactionId != null) {
145                        if (!unconsumedMessage.contains(msg)) {
146                                unconsumedMessage.add(msg);
147                        }
148                } else {
149                        iter.remove();
150                }
151
152
153                count++;
154
155                if (id.equals(msgId)) {
156                    ack.setLastMessageId(id);
157                    break;
158                }
159
160            }
161            ack.setMessageCount(count);
162            if (transactionId != null) {
163                ack.setTransactionId(transactionId);
164            }
165        }
166        else if (ackMode == INDIVIDUAL_ACK) {
167            ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
168            ack.setMessageID(msgId);
169            if (transactionId != null) {
170                unconsumedMessage.add(dispatchedMessage.get(msgId));
171                ack.setTransactionId(transactionId);
172            }
173            dispatchedMessage.remove(msgId);
174        }
175        return ack;
176    }
177
178    public String getAckMode() {
179        return ackMode;
180    }
181
182    public void setAckMode(String ackMode) {
183        this.ackMode = ackMode;
184    }
185
186    public String getSubscriptionId() {
187        return subscriptionId;
188    }
189
190    public void setDestination(ActiveMQDestination destination) {
191        this.destination = destination;
192    }
193
194    public ActiveMQDestination getDestination() {
195        return destination;
196    }
197
198    public ConsumerInfo getConsumerInfo() {
199        return consumerInfo;
200    }
201
202}