001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.concurrent.atomic.AtomicLong; 022 023import org.apache.activemq.ActiveMQMessageAudit; 024import org.apache.activemq.broker.ConnectionContext; 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.Message; 027import org.apache.activemq.command.MessageAck; 028import org.apache.activemq.command.MessageId; 029import org.apache.activemq.store.AbstractMessageStore; 030import org.apache.activemq.store.MessageRecoveryListener; 031import org.apache.activemq.util.ByteSequence; 032import org.apache.activemq.util.ByteSequenceData; 033import org.apache.activemq.util.IOExceptionSupport; 034import org.apache.activemq.wireformat.WireFormat; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * 040 */ 041public class JDBCMessageStore extends AbstractMessageStore { 042 043 class Duration { 044 static final int LIMIT = 100; 045 final long start = System.currentTimeMillis(); 046 final String name; 047 048 Duration(String name) { 049 this.name = name; 050 } 051 void end() { 052 end(null); 053 } 054 void end(Object o) { 055 long duration = System.currentTimeMillis() - start; 056 057 if (duration > LIMIT) { 058 System.err.println(name + " took a long time: " + duration + "ms " + o); 059 } 060 } 061 } 062 private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class); 063 protected final WireFormat wireFormat; 064 protected final JDBCAdapter adapter; 065 protected final JDBCPersistenceAdapter persistenceAdapter; 066 protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1); 067 protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1); 068 069 protected ActiveMQMessageAudit audit; 070 071 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) { 072 super(destination); 073 this.persistenceAdapter = persistenceAdapter; 074 this.adapter = adapter; 075 this.wireFormat = wireFormat; 076 this.audit = audit; 077 } 078 079 public void addMessage(ConnectionContext context, Message message) throws IOException { 080 MessageId messageId = message.getMessageId(); 081 if (audit != null && audit.isDuplicate(message)) { 082 if (LOG.isDebugEnabled()) { 083 LOG.debug(destination.getPhysicalName() 084 + " ignoring duplicated (add) message, already stored: " 085 + messageId); 086 } 087 return; 088 } 089 090 long sequenceId = persistenceAdapter.getNextSequenceId(); 091 092 // Serialize the Message.. 093 byte data[]; 094 try { 095 ByteSequence packet = wireFormat.marshal(message); 096 data = ByteSequenceData.toByteArray(packet); 097 } catch (IOException e) { 098 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 099 } 100 101 // Get a connection and insert the message into the DB. 102 TransactionContext c = persistenceAdapter.getTransactionContext(context); 103 try { 104 adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(), message.getPriority()); 105 } catch (SQLException e) { 106 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 107 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 108 } finally { 109 c.close(); 110 } 111 onAdd(sequenceId, message.getPriority()); 112 } 113 114 protected void onAdd(long sequenceId, byte priority) { 115 } 116 117 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { 118 // Get a connection and insert the message into the DB. 119 TransactionContext c = persistenceAdapter.getTransactionContext(context); 120 try { 121 adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef); 122 } catch (SQLException e) { 123 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 124 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 125 } finally { 126 c.close(); 127 } 128 } 129 130 public Message getMessage(MessageId messageId) throws IOException { 131 // Get a connection and pull the message out of the DB 132 TransactionContext c = persistenceAdapter.getTransactionContext(); 133 try { 134 byte data[] = adapter.doGetMessage(c, messageId); 135 if (data == null) { 136 return null; 137 } 138 139 Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data)); 140 return answer; 141 } catch (IOException e) { 142 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 143 } catch (SQLException e) { 144 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 145 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 146 } finally { 147 c.close(); 148 } 149 } 150 151 public String getMessageReference(MessageId messageId) throws IOException { 152 long id = messageId.getBrokerSequenceId(); 153 154 // Get a connection and pull the message out of the DB 155 TransactionContext c = persistenceAdapter.getTransactionContext(); 156 try { 157 return adapter.doGetMessageReference(c, id); 158 } catch (IOException e) { 159 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 160 } catch (SQLException e) { 161 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 162 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 163 } finally { 164 c.close(); 165 } 166 } 167 168 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 169 170 long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0]; 171 172 // Get a connection and remove the message from the DB 173 TransactionContext c = persistenceAdapter.getTransactionContext(context); 174 try { 175 adapter.doRemoveMessage(c, seq); 176 } catch (SQLException e) { 177 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 178 throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e); 179 } finally { 180 c.close(); 181 } 182 } 183 184 public void recover(final MessageRecoveryListener listener) throws Exception { 185 186 // Get all the Message ids out of the database. 187 TransactionContext c = persistenceAdapter.getTransactionContext(); 188 try { 189 c = persistenceAdapter.getTransactionContext(); 190 adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() { 191 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 192 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 193 msg.getMessageId().setBrokerSequenceId(sequenceId); 194 return listener.recoverMessage(msg); 195 } 196 197 public boolean recoverMessageReference(String reference) throws Exception { 198 return listener.recoverMessageReference(new MessageId(reference)); 199 } 200 }); 201 } catch (SQLException e) { 202 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 203 throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e); 204 } finally { 205 c.close(); 206 } 207 } 208 209 /** 210 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) 211 */ 212 public void removeAllMessages(ConnectionContext context) throws IOException { 213 // Get a connection and remove the message from the DB 214 TransactionContext c = persistenceAdapter.getTransactionContext(context); 215 try { 216 adapter.doRemoveAllMessages(c, destination); 217 } catch (SQLException e) { 218 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 219 throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e); 220 } finally { 221 c.close(); 222 } 223 } 224 225 public int getMessageCount() throws IOException { 226 int result = 0; 227 TransactionContext c = persistenceAdapter.getTransactionContext(); 228 try { 229 230 result = adapter.doGetMessageCount(c, destination); 231 232 } catch (SQLException e) { 233 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 234 throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e); 235 } finally { 236 c.close(); 237 } 238 return result; 239 } 240 241 /** 242 * @param maxReturned 243 * @param listener 244 * @throws Exception 245 * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, 246 * org.apache.activemq.store.MessageRecoveryListener) 247 */ 248 public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception { 249 TransactionContext c = persistenceAdapter.getTransactionContext(); 250 try { 251 adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), 252 maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { 253 254 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 255 if (listener.hasSpace()) { 256 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 257 msg.getMessageId().setBrokerSequenceId(sequenceId); 258 listener.recoverMessage(msg); 259 lastRecoveredSequenceId.set(sequenceId); 260 lastRecoveredPriority.set(msg.getPriority()); 261 return true; 262 } 263 return false; 264 } 265 266 public boolean recoverMessageReference(String reference) throws Exception { 267 if (listener.hasSpace()) { 268 listener.recoverMessageReference(new MessageId(reference)); 269 return true; 270 } 271 return false; 272 } 273 274 }); 275 } catch (SQLException e) { 276 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 277 } finally { 278 c.close(); 279 } 280 281 } 282 283 /** 284 * @see org.apache.activemq.store.MessageStore#resetBatching() 285 */ 286 public void resetBatching() { 287 if (LOG.isTraceEnabled()) { 288 LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get()); 289 } 290 lastRecoveredSequenceId.set(-1); 291 lastRecoveredPriority.set(Byte.MAX_VALUE - 1); 292 293 } 294 295 @Override 296 public void setBatch(MessageId messageId) { 297 try { 298 long[] storedValues = getStoreSequenceIdForMessageId(messageId); 299 lastRecoveredSequenceId.set(storedValues[0]); 300 lastRecoveredPriority.set(storedValues[1]); 301 } catch (IOException ignoredAsAlreadyLogged) { 302 lastRecoveredSequenceId.set(-1); 303 lastRecoveredPriority.set(Byte.MAX_VALUE -1); 304 } 305 if (LOG.isTraceEnabled()) { 306 LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get() 307 + ", priority: " + lastRecoveredPriority.get()); 308 } 309 } 310 311 private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException { 312 long[] result = new long[]{-1, Byte.MAX_VALUE -1}; 313 TransactionContext c = persistenceAdapter.getTransactionContext(); 314 try { 315 result = adapter.getStoreSequenceId(c, destination, messageId); 316 } catch (SQLException e) { 317 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 318 throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e); 319 } finally { 320 c.close(); 321 } 322 return result; 323 } 324 325 public void setPrioritizedMessages(boolean prioritizedMessages) { 326 super.setPrioritizedMessages(prioritizedMessages); 327 } 328}