001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.Arrays; 022import java.util.Iterator; 023import java.util.Map; 024import java.util.concurrent.ConcurrentHashMap; 025 026import org.apache.activemq.ActiveMQMessageAudit; 027import org.apache.activemq.broker.ConnectionContext; 028import org.apache.activemq.command.ActiveMQTopic; 029import org.apache.activemq.command.Message; 030import org.apache.activemq.command.MessageAck; 031import org.apache.activemq.command.MessageId; 032import org.apache.activemq.command.SubscriptionInfo; 033import org.apache.activemq.store.MessageRecoveryListener; 034import org.apache.activemq.store.TopicMessageStore; 035import org.apache.activemq.util.ByteSequence; 036import org.apache.activemq.util.IOExceptionSupport; 037import org.apache.activemq.wireformat.WireFormat; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * 043 */ 044public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore { 045 046 private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class); 047 private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>(); 048 049 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) { 050 super(persistenceAdapter, adapter, wireFormat, topic, audit); 051 } 052 053 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { 054 if (ack != null && ack.isUnmatchedAck()) { 055 if (LOG.isTraceEnabled()) { 056 LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks."); 057 } 058 return; 059 } 060 TransactionContext c = persistenceAdapter.getTransactionContext(context); 061 try { 062 long[] res = adapter.getStoreSequenceId(c, destination, messageId); 063 if (this.isPrioritizedMessages()) { 064 adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]); 065 } else { 066 adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]); 067 } 068 if (LOG.isTraceEnabled()) { 069 LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId); 070 } 071 } catch (SQLException e) { 072 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 073 throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e); 074 } finally { 075 c.close(); 076 } 077 } 078 079 /** 080 * @throws Exception 081 */ 082 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { 083 TransactionContext c = persistenceAdapter.getTransactionContext(); 084 try { 085 adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() { 086 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 087 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 088 msg.getMessageId().setBrokerSequenceId(sequenceId); 089 return listener.recoverMessage(msg); 090 } 091 092 public boolean recoverMessageReference(String reference) throws Exception { 093 return listener.recoverMessageReference(new MessageId(reference)); 094 } 095 096 }); 097 } catch (SQLException e) { 098 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 099 throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e); 100 } finally { 101 c.close(); 102 } 103 } 104 105 private class LastRecovered implements Iterable<LastRecoveredEntry> { 106 LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10]; 107 LastRecovered() { 108 for (int i=0; i<perPriority.length; i++) { 109 perPriority[i] = new LastRecoveredEntry(i); 110 } 111 } 112 113 public void updateStored(long sequence, int priority) { 114 perPriority[priority].stored = sequence; 115 } 116 117 public LastRecoveredEntry defaultPriority() { 118 return perPriority[javax.jms.Message.DEFAULT_PRIORITY]; 119 } 120 121 public String toString() { 122 return Arrays.deepToString(perPriority); 123 } 124 125 public Iterator<LastRecoveredEntry> iterator() { 126 return new PriorityIterator(); 127 } 128 129 class PriorityIterator implements Iterator<LastRecoveredEntry> { 130 int current = 9; 131 public boolean hasNext() { 132 for (int i=current; i>=0; i--) { 133 if (perPriority[i].hasMessages()) { 134 current = i; 135 return true; 136 } 137 } 138 return false; 139 } 140 141 public LastRecoveredEntry next() { 142 return perPriority[current]; 143 } 144 145 public void remove() { 146 throw new RuntimeException("not implemented"); 147 } 148 } 149 } 150 151 private class LastRecoveredEntry { 152 final int priority; 153 long recovered = 0; 154 long stored = Integer.MAX_VALUE; 155 156 public LastRecoveredEntry(int priority) { 157 this.priority = priority; 158 } 159 160 public String toString() { 161 return priority + "-" + stored + ":" + recovered; 162 } 163 164 public void exhausted() { 165 stored = recovered; 166 } 167 168 public boolean hasMessages() { 169 return stored > recovered; 170 } 171 } 172 173 class LastRecoveredAwareListener implements JDBCMessageRecoveryListener { 174 final MessageRecoveryListener delegate; 175 final int maxMessages; 176 LastRecoveredEntry lastRecovered; 177 int recoveredCount; 178 int recoveredMarker; 179 180 public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) { 181 this.delegate = delegate; 182 this.maxMessages = maxMessages; 183 } 184 185 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 186 if (delegate.hasSpace()) { 187 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); 188 msg.getMessageId().setBrokerSequenceId(sequenceId); 189 if (delegate.recoverMessage(msg)) { 190 lastRecovered.recovered = sequenceId; 191 recoveredCount++; 192 return true; 193 } 194 } 195 return false; 196 } 197 198 public boolean recoverMessageReference(String reference) throws Exception { 199 return delegate.recoverMessageReference(new MessageId(reference)); 200 } 201 202 public void setLastRecovered(LastRecoveredEntry lastRecovered) { 203 this.lastRecovered = lastRecovered; 204 recoveredMarker = recoveredCount; 205 } 206 207 public boolean complete() { 208 return !delegate.hasSpace() || recoveredCount == maxMessages; 209 } 210 211 public boolean stalled() { 212 return recoveredMarker == recoveredCount; 213 } 214 } 215 216 public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) 217 throws Exception { 218 //Duration duration = new Duration("recoverNextMessages"); 219 TransactionContext c = persistenceAdapter.getTransactionContext(); 220 221 String key = getSubscriptionKey(clientId, subscriptionName); 222 if (!subscriberLastRecoveredMap.containsKey(key)) { 223 subscriberLastRecoveredMap.put(key, new LastRecovered()); 224 } 225 final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key); 226 LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned); 227 try { 228 if (LOG.isTraceEnabled()) { 229 LOG.trace(key + " existing last recovered: " + lastRecovered); 230 } 231 if (isPrioritizedMessages()) { 232 Iterator<LastRecoveredEntry> it = lastRecovered.iterator(); 233 for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) { 234 LastRecoveredEntry entry = it.next(); 235 recoveredAwareListener.setLastRecovered(entry); 236 //Duration microDuration = new Duration("recoverNextMessages:loop"); 237 adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName, 238 entry.recovered, entry.priority, maxReturned, recoveredAwareListener); 239 //microDuration.end(entry); 240 if (recoveredAwareListener.stalled()) { 241 if (recoveredAwareListener.complete()) { 242 break; 243 } else { 244 entry.exhausted(); 245 } 246 } 247 } 248 } else { 249 LastRecoveredEntry last = lastRecovered.defaultPriority(); 250 recoveredAwareListener.setLastRecovered(last); 251 adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, 252 last.recovered, 0, maxReturned, recoveredAwareListener); 253 } 254 if (LOG.isTraceEnabled()) { 255 LOG.trace(key + " last recovered: " + lastRecovered); 256 } 257 //duration.end(); 258 } catch (SQLException e) { 259 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 260 } finally { 261 c.close(); 262 } 263 } 264 265 public void resetBatching(String clientId, String subscriptionName) { 266 subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName)); 267 } 268 269 protected void onAdd(long sequenceId, byte priority) { 270 // update last recovered state 271 for (LastRecovered last : subscriberLastRecoveredMap.values()) { 272 last.updateStored(sequenceId, priority); 273 } 274 } 275 276 277 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 278 TransactionContext c = persistenceAdapter.getTransactionContext(); 279 try { 280 c = persistenceAdapter.getTransactionContext(); 281 adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages()); 282 } catch (SQLException e) { 283 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 284 throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e); 285 } finally { 286 c.close(); 287 } 288 } 289 290 /** 291 * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String, 292 * String) 293 */ 294 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 295 TransactionContext c = persistenceAdapter.getTransactionContext(); 296 try { 297 return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName); 298 } catch (SQLException e) { 299 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 300 throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e); 301 } finally { 302 c.close(); 303 } 304 } 305 306 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 307 TransactionContext c = persistenceAdapter.getTransactionContext(); 308 try { 309 adapter.doDeleteSubscription(c, destination, clientId, subscriptionName); 310 } catch (SQLException e) { 311 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 312 throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e); 313 } finally { 314 c.close(); 315 resetBatching(clientId, subscriptionName); 316 } 317 } 318 319 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 320 TransactionContext c = persistenceAdapter.getTransactionContext(); 321 try { 322 return adapter.doGetAllSubscriptions(c, destination); 323 } catch (SQLException e) { 324 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 325 throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e); 326 } finally { 327 c.close(); 328 } 329 } 330 331 public int getMessageCount(String clientId, String subscriberName) throws IOException { 332 //Duration duration = new Duration("getMessageCount"); 333 int result = 0; 334 TransactionContext c = persistenceAdapter.getTransactionContext(); 335 try { 336 result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages()); 337 } catch (SQLException e) { 338 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 339 throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e); 340 } finally { 341 c.close(); 342 } 343 if (LOG.isTraceEnabled()) { 344 LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result); 345 } 346 //duration.end(); 347 return result; 348 } 349 350 protected String getSubscriptionKey(String clientId, String subscriberName) { 351 String result = clientId + ":"; 352 result += subscriberName != null ? subscriberName : "NOT_SET"; 353 return result; 354 } 355 356}