001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.concurrent.atomic.AtomicLong;
022
023import org.apache.activemq.ActiveMQMessageAudit;
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.Message;
027import org.apache.activemq.command.MessageAck;
028import org.apache.activemq.command.MessageId;
029import org.apache.activemq.store.AbstractMessageStore;
030import org.apache.activemq.store.MessageRecoveryListener;
031import org.apache.activemq.util.ByteSequence;
032import org.apache.activemq.util.ByteSequenceData;
033import org.apache.activemq.util.IOExceptionSupport;
034import org.apache.activemq.wireformat.WireFormat;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * 
040 */
041public class JDBCMessageStore extends AbstractMessageStore {
042
043    class Duration {
044        static final int LIMIT = 100;
045        final long start = System.currentTimeMillis();
046        final String name;
047
048        Duration(String name) {
049            this.name = name;
050        }
051        void end() {
052            end(null);
053        }
054        void end(Object o) {
055            long duration = System.currentTimeMillis() - start;
056
057            if (duration > LIMIT) {
058                System.err.println(name + " took a long time: " + duration + "ms " + o);
059            }
060        }
061    }
062    private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
063    protected final WireFormat wireFormat;
064    protected final JDBCAdapter adapter;
065    protected final JDBCPersistenceAdapter persistenceAdapter;
066    protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
067    protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
068
069    protected ActiveMQMessageAudit audit;
070    
071    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
072        super(destination);
073        this.persistenceAdapter = persistenceAdapter;
074        this.adapter = adapter;
075        this.wireFormat = wireFormat;
076        this.audit = audit;
077    }
078    
079    public void addMessage(ConnectionContext context, Message message) throws IOException {
080        MessageId messageId = message.getMessageId();
081        if (audit != null && audit.isDuplicate(message)) {
082            if (LOG.isDebugEnabled()) {
083                LOG.debug(destination.getPhysicalName()
084                    + " ignoring duplicated (add) message, already stored: "
085                    + messageId);
086            }
087            return;
088        }
089        
090        long sequenceId = persistenceAdapter.getNextSequenceId();
091        
092        // Serialize the Message..
093        byte data[];
094        try {
095            ByteSequence packet = wireFormat.marshal(message);
096            data = ByteSequenceData.toByteArray(packet);
097        } catch (IOException e) {
098            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
099        }
100
101        // Get a connection and insert the message into the DB.
102        TransactionContext c = persistenceAdapter.getTransactionContext(context);
103        try {      
104            adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(), message.getPriority());
105        } catch (SQLException e) {
106            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
107            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
108        } finally {
109            c.close();
110        }
111        onAdd(sequenceId, message.getPriority());
112    }
113
114    protected void onAdd(long sequenceId, byte priority) {
115    }
116
117    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
118        // Get a connection and insert the message into the DB.
119        TransactionContext c = persistenceAdapter.getTransactionContext(context);
120        try {
121            adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
122        } catch (SQLException e) {
123            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
124            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
125        } finally {
126            c.close();
127        }
128    }
129
130    public Message getMessage(MessageId messageId) throws IOException {
131        // Get a connection and pull the message out of the DB
132        TransactionContext c = persistenceAdapter.getTransactionContext();
133        try {
134            byte data[] = adapter.doGetMessage(c, messageId);
135            if (data == null) {
136                return null;
137            }
138
139            Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
140            return answer;
141        } catch (IOException e) {
142            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
143        } catch (SQLException e) {
144            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
145            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
146        } finally {
147            c.close();
148        }
149    }
150
151    public String getMessageReference(MessageId messageId) throws IOException {
152        long id = messageId.getBrokerSequenceId();
153
154        // Get a connection and pull the message out of the DB
155        TransactionContext c = persistenceAdapter.getTransactionContext();
156        try {
157            return adapter.doGetMessageReference(c, id);
158        } catch (IOException e) {
159            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
160        } catch (SQLException e) {
161            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
162            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
163        } finally {
164            c.close();
165        }
166    }
167
168    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
169        
170        long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0];
171
172        // Get a connection and remove the message from the DB
173        TransactionContext c = persistenceAdapter.getTransactionContext(context);
174        try {
175            adapter.doRemoveMessage(c, seq);
176        } catch (SQLException e) {
177            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
178            throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
179        } finally {
180            c.close();
181        }
182    }
183
184    public void recover(final MessageRecoveryListener listener) throws Exception {
185
186        // Get all the Message ids out of the database.
187        TransactionContext c = persistenceAdapter.getTransactionContext();
188        try {
189            c = persistenceAdapter.getTransactionContext();
190            adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
191                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
192                    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
193                    msg.getMessageId().setBrokerSequenceId(sequenceId);
194                    return listener.recoverMessage(msg);
195                }
196
197                public boolean recoverMessageReference(String reference) throws Exception {
198                    return listener.recoverMessageReference(new MessageId(reference));
199                }
200            });
201        } catch (SQLException e) {
202            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
203            throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
204        } finally {
205            c.close();
206        }
207    }
208
209    /**
210     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
211     */
212    public void removeAllMessages(ConnectionContext context) throws IOException {
213        // Get a connection and remove the message from the DB
214        TransactionContext c = persistenceAdapter.getTransactionContext(context);
215        try {
216            adapter.doRemoveAllMessages(c, destination);
217        } catch (SQLException e) {
218            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
219            throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
220        } finally {
221            c.close();
222        }
223    }
224
225    public int getMessageCount() throws IOException {
226        int result = 0;
227        TransactionContext c = persistenceAdapter.getTransactionContext();
228        try {
229
230            result = adapter.doGetMessageCount(c, destination);
231
232        } catch (SQLException e) {
233            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
234            throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
235        } finally {
236            c.close();
237        }
238        return result;
239    }
240
241    /**
242     * @param maxReturned
243     * @param listener
244     * @throws Exception
245     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
246     *      org.apache.activemq.store.MessageRecoveryListener)
247     */
248    public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
249        TransactionContext c = persistenceAdapter.getTransactionContext();
250        try {
251            adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
252                    maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
253
254                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
255                    if (listener.hasSpace()) {
256                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
257                        msg.getMessageId().setBrokerSequenceId(sequenceId);
258                        listener.recoverMessage(msg);
259                        lastRecoveredSequenceId.set(sequenceId);
260                        lastRecoveredPriority.set(msg.getPriority());
261                        return true;
262                    }
263                    return false;
264                }
265
266                public boolean recoverMessageReference(String reference) throws Exception {
267                    if (listener.hasSpace()) {
268                        listener.recoverMessageReference(new MessageId(reference));
269                        return true;
270                    }
271                    return false;
272                }
273
274            });
275        } catch (SQLException e) {
276            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
277        } finally {
278            c.close();
279        }
280
281    }
282
283    /**
284     * @see org.apache.activemq.store.MessageStore#resetBatching()
285     */
286    public void resetBatching() {
287        if (LOG.isTraceEnabled()) {
288            LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
289        }
290        lastRecoveredSequenceId.set(-1);
291        lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
292
293    }
294
295    @Override
296    public void setBatch(MessageId messageId) {
297        try {
298            long[] storedValues = getStoreSequenceIdForMessageId(messageId);
299            lastRecoveredSequenceId.set(storedValues[0]);
300            lastRecoveredPriority.set(storedValues[1]);
301        } catch (IOException ignoredAsAlreadyLogged) {
302            lastRecoveredSequenceId.set(-1);
303            lastRecoveredPriority.set(Byte.MAX_VALUE -1);
304        }
305        if (LOG.isTraceEnabled()) {
306            LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
307                    + ", priority: " + lastRecoveredPriority.get());
308        }
309    }
310
311    private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
312        long[] result = new long[]{-1, Byte.MAX_VALUE -1};
313        TransactionContext c = persistenceAdapter.getTransactionContext();
314        try {
315            result = adapter.getStoreSequenceId(c, destination, messageId);
316        } catch (SQLException e) {
317            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
318            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
319        } finally {
320            c.close();
321        }
322        return result;
323    }
324    
325    public void setPrioritizedMessages(boolean prioritizedMessages) {
326        super.setPrioritizedMessages(prioritizedMessages);
327    }   
328}