001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.Arrays;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.concurrent.ConcurrentHashMap;
025
026import org.apache.activemq.ActiveMQMessageAudit;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.command.ActiveMQTopic;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.command.MessageAck;
031import org.apache.activemq.command.MessageId;
032import org.apache.activemq.command.SubscriptionInfo;
033import org.apache.activemq.store.MessageRecoveryListener;
034import org.apache.activemq.store.TopicMessageStore;
035import org.apache.activemq.util.ByteSequence;
036import org.apache.activemq.util.IOExceptionSupport;
037import org.apache.activemq.wireformat.WireFormat;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * 
043 */
044public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
045
046    private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
047    private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
048
049    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
050        super(persistenceAdapter, adapter, wireFormat, topic, audit);
051    }
052
053    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
054        if (ack != null && ack.isUnmatchedAck()) {
055            if (LOG.isTraceEnabled()) {
056                LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
057            }
058            return;
059        }
060        TransactionContext c = persistenceAdapter.getTransactionContext(context);
061        try {
062            long[] res = adapter.getStoreSequenceId(c, destination, messageId);
063            if (this.isPrioritizedMessages()) {
064                adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]);
065            } else {
066                adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
067            }
068            if (LOG.isTraceEnabled()) {
069                LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
070            }
071        } catch (SQLException e) {
072            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
073            throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
074        } finally {
075            c.close();
076        }
077    }
078
079    /**
080     * @throws Exception
081     */
082    public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
083        TransactionContext c = persistenceAdapter.getTransactionContext();
084        try {
085            adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
086                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
087                    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
088                    msg.getMessageId().setBrokerSequenceId(sequenceId);
089                    return listener.recoverMessage(msg);
090                }
091
092                public boolean recoverMessageReference(String reference) throws Exception {
093                    return listener.recoverMessageReference(new MessageId(reference));
094                }
095
096            });
097        } catch (SQLException e) {
098            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
099            throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
100        } finally {
101            c.close();
102        }
103    }
104
105    private class LastRecovered implements Iterable<LastRecoveredEntry> {
106        LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
107        LastRecovered() {
108            for (int i=0; i<perPriority.length; i++) {
109                perPriority[i] = new LastRecoveredEntry(i);
110            }
111        }
112
113        public void updateStored(long sequence, int priority) {
114            perPriority[priority].stored = sequence;
115        }
116
117        public LastRecoveredEntry defaultPriority() {
118            return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
119        }
120
121        public String toString() {
122            return Arrays.deepToString(perPriority);
123        }
124
125        public Iterator<LastRecoveredEntry> iterator() {
126            return new PriorityIterator();
127        }
128
129        class PriorityIterator implements Iterator<LastRecoveredEntry> {
130            int current = 9;
131            public boolean hasNext() {
132                for (int i=current; i>=0; i--) {
133                    if (perPriority[i].hasMessages()) {
134                        current = i;
135                        return true;
136                    }
137                }
138                return false;
139            }
140
141            public LastRecoveredEntry next() {
142                return perPriority[current];
143            }
144
145            public void remove() {
146                throw new RuntimeException("not implemented");
147            }
148        }
149    }
150
151    private class LastRecoveredEntry {
152        final int priority;
153        long recovered = 0;
154        long stored = Integer.MAX_VALUE;
155
156        public LastRecoveredEntry(int priority) {
157            this.priority = priority;
158        }
159
160        public String toString() {
161            return priority + "-" + stored + ":" + recovered;
162        }
163
164        public void exhausted() {
165            stored = recovered;
166        }
167
168        public boolean hasMessages() {
169            return stored > recovered;
170        }
171    }
172
173    class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
174        final MessageRecoveryListener delegate;
175        final int maxMessages;
176        LastRecoveredEntry lastRecovered;
177        int recoveredCount;
178        int recoveredMarker;
179
180        public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
181            this.delegate = delegate;
182            this.maxMessages = maxMessages;
183        }
184
185        public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
186            if (delegate.hasSpace()) {
187                Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
188                msg.getMessageId().setBrokerSequenceId(sequenceId);
189                if (delegate.recoverMessage(msg)) {
190                    lastRecovered.recovered = sequenceId;
191                    recoveredCount++;
192                    return true;
193                }
194            }
195            return false;
196        }
197
198        public boolean recoverMessageReference(String reference) throws Exception {
199            return delegate.recoverMessageReference(new MessageId(reference));
200        }
201
202        public void setLastRecovered(LastRecoveredEntry lastRecovered) {
203            this.lastRecovered = lastRecovered;
204            recoveredMarker = recoveredCount;
205        }
206
207        public boolean complete() {
208            return  !delegate.hasSpace() || recoveredCount == maxMessages;
209        }
210
211        public boolean stalled() {
212            return recoveredMarker == recoveredCount;
213        }
214    }
215
216    public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
217            throws Exception {
218        //Duration duration = new Duration("recoverNextMessages");
219        TransactionContext c = persistenceAdapter.getTransactionContext();
220
221        String key = getSubscriptionKey(clientId, subscriptionName);
222        if (!subscriberLastRecoveredMap.containsKey(key)) {
223           subscriberLastRecoveredMap.put(key, new LastRecovered());
224        }
225        final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);        
226        LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
227        try {
228            if (LOG.isTraceEnabled()) {
229                LOG.trace(key + " existing last recovered: " + lastRecovered);
230            }
231            if (isPrioritizedMessages()) {
232                Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
233                for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
234                    LastRecoveredEntry entry = it.next();
235                    recoveredAwareListener.setLastRecovered(entry);
236                    //Duration microDuration = new Duration("recoverNextMessages:loop");
237                    adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
238                        entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
239                    //microDuration.end(entry);
240                    if (recoveredAwareListener.stalled()) {
241                        if (recoveredAwareListener.complete()) {
242                            break;
243                        } else {
244                            entry.exhausted();
245                        }
246                    }
247                }
248            } else {
249                LastRecoveredEntry last = lastRecovered.defaultPriority();
250                recoveredAwareListener.setLastRecovered(last);
251                adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
252                        last.recovered, 0, maxReturned, recoveredAwareListener);
253            }
254            if (LOG.isTraceEnabled()) {
255                LOG.trace(key + " last recovered: " + lastRecovered);
256            }
257            //duration.end();
258        } catch (SQLException e) {
259            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
260        } finally {
261            c.close();
262        }
263    }
264
265    public void resetBatching(String clientId, String subscriptionName) {
266        subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
267    }
268
269    protected void onAdd(long sequenceId, byte priority) {
270        // update last recovered state
271        for (LastRecovered last : subscriberLastRecoveredMap.values()) {
272            last.updateStored(sequenceId, priority);
273        }
274    }
275
276
277    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
278        TransactionContext c = persistenceAdapter.getTransactionContext();
279        try {
280            c = persistenceAdapter.getTransactionContext();
281            adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
282        } catch (SQLException e) {
283            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
284            throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
285        } finally {
286            c.close();
287        }
288    }
289
290    /**
291     * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
292     *      String)
293     */
294    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
295        TransactionContext c = persistenceAdapter.getTransactionContext();
296        try {
297            return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
298        } catch (SQLException e) {
299            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
300            throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
301        } finally {
302            c.close();
303        }
304    }
305
306    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
307        TransactionContext c = persistenceAdapter.getTransactionContext();
308        try {
309            adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
310        } catch (SQLException e) {
311            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
312            throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
313        } finally {
314            c.close();
315            resetBatching(clientId, subscriptionName);
316        }
317    }
318
319    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
320        TransactionContext c = persistenceAdapter.getTransactionContext();
321        try {
322            return adapter.doGetAllSubscriptions(c, destination);
323        } catch (SQLException e) {
324            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
325            throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
326        } finally {
327            c.close();
328        }
329    }
330
331    public int getMessageCount(String clientId, String subscriberName) throws IOException {
332        //Duration duration = new Duration("getMessageCount");
333        int result = 0;
334        TransactionContext c = persistenceAdapter.getTransactionContext();
335        try {
336            result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
337        } catch (SQLException e) {
338            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
339            throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
340        } finally {
341            c.close();
342        }
343        if (LOG.isTraceEnabled()) {
344            LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
345        }
346        //duration.end();
347        return result;
348    }
349
350    protected String getSubscriptionKey(String clientId, String subscriberName) {
351        String result = clientId + ":";
352        result += subscriberName != null ? subscriberName : "NOT_SET";
353        return result;
354    }
355
356}