001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc.adapter;
018
019import java.io.IOException;
020import java.sql.PreparedStatement;
021import java.sql.ResultSet;
022import java.sql.SQLException;
023import java.sql.Statement;
024import java.util.ArrayList;
025import java.util.HashSet;
026import java.util.LinkedList;
027import java.util.Set;
028import java.util.concurrent.locks.ReadWriteLock;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.MessageId;
033import org.apache.activemq.command.ProducerId;
034import org.apache.activemq.command.SubscriptionInfo;
035import org.apache.activemq.store.jdbc.JDBCAdapter;
036import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
037import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
038import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
039import org.apache.activemq.store.jdbc.Statements;
040import org.apache.activemq.store.jdbc.TransactionContext;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> Sub-classing is
 * encouraged: override the default implementation of a method to account for differences between JDBC driver
 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
048 * The databases/JDBC drivers that use this adapter are:
049 * <ul>
050 * <li></li>
051 * </ul>
052 * 
053 * @org.apache.xbean.XBean element="defaultJDBCAdapter"
054 * 
055 * 
056 */
057public class DefaultJDBCAdapter implements JDBCAdapter {
058    private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
059    public static final int MAX_ROWS = 10000;
060    protected Statements statements;
061    protected boolean batchStatments = true;
062    protected boolean prioritizedMessages;
063    protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
    // Must be at least twice the prefetch of a durable subscription, and large enough for the selector range.
065    protected int maxRows = MAX_ROWS;
066
067    protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
068        s.setBytes(index, data);
069    }
070
071    protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
072        return rs.getBytes(index);
073    }
074
075    public void doCreateTables(TransactionContext c) throws SQLException, IOException {
076        Statement s = null;
077        cleanupExclusiveLock.writeLock().lock();
078        try {
079            // Check to see if the table already exists. If it does, then don't
080            // log warnings during startup.
081            // Need to run the scripts anyways since they may contain ALTER
082            // statements that upgrade a previous version
083            // of the table
084            boolean alreadyExists = false;
085            ResultSet rs = null;
086            try {
087                rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
088                        new String[] { "TABLE" });
089                alreadyExists = rs.next();
090            } catch (Throwable ignore) {
091            } finally {
092                close(rs);
093            }
094            s = c.getConnection().createStatement();
095            String[] createStatments = this.statements.getCreateSchemaStatements();
096            for (int i = 0; i < createStatments.length; i++) {
097                // This will fail usually since the tables will be
098                // created already.
099                try {
100                    LOG.debug("Executing SQL: " + createStatments[i]);
101                    s.execute(createStatments[i]);
102                } catch (SQLException e) {
103                    if (alreadyExists) {
104                        LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
105                                + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
106                                + " Vendor code: " + e.getErrorCode());
107                    } else {
108                        LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
109                                + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
110                                + " Vendor code: " + e.getErrorCode());
111                        JDBCPersistenceAdapter.log("Failure details: ", e);
112                    }
113                }
114            }
115            c.getConnection().commit();
116        } finally {
117            cleanupExclusiveLock.writeLock().unlock();
118            try {
119                s.close();
120            } catch (Throwable e) {
121            }
122        }
123    }
124
125    public void doDropTables(TransactionContext c) throws SQLException, IOException {
126        Statement s = null;
127        cleanupExclusiveLock.writeLock().lock();
128        try {
129            s = c.getConnection().createStatement();
130            String[] dropStatments = this.statements.getDropSchemaStatements();
131            for (int i = 0; i < dropStatments.length; i++) {
132                // This will fail usually since the tables will be
133                // created already.
134                try {
135                    LOG.debug("Executing SQL: " + dropStatments[i]);
136                    s.execute(dropStatments[i]);
137                } catch (SQLException e) {
138                    LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
139                            + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
140                            + e.getErrorCode());
141                    JDBCPersistenceAdapter.log("Failure details: ", e);
142                }
143            }
144            c.getConnection().commit();
145        } finally {
146            cleanupExclusiveLock.writeLock().unlock();
147            try {
148                s.close();
149            } catch (Throwable e) {
150            }
151        }
152    }
153
154    public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
155        PreparedStatement s = null;
156        ResultSet rs = null;
157        cleanupExclusiveLock.readLock().lock();
158        try {
159            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
160            rs = s.executeQuery();
161            long seq1 = 0;
162            if (rs.next()) {
163                seq1 = rs.getLong(1);
164            }
165            rs.close();
166            s.close();
167            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
168            rs = s.executeQuery();
169            long seq2 = 0;
170            if (rs.next()) {
171                seq2 = rs.getLong(1);
172            }
173            long seq = Math.max(seq1, seq2);
174            return seq;
175        } finally {
176            cleanupExclusiveLock.readLock().unlock();
177            close(rs);
178            close(s);
179        }
180    }
181    
182    public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
183        PreparedStatement s = null;
184        ResultSet rs = null;
185        cleanupExclusiveLock.readLock().lock();
186        try {
187            s = c.getConnection().prepareStatement(
188                    this.statements.getFindMessageByIdStatement());
189            s.setLong(1, storeSequenceId);
190            rs = s.executeQuery();
191            if (!rs.next()) {
192                return null;
193            }
194            return getBinaryData(rs, 1);
195        } finally {
196            cleanupExclusiveLock.readLock().unlock();
197            close(rs);
198            close(s);
199        }
200    }
201    
202
203    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
204            long expiration, byte priority) throws SQLException, IOException {
205        PreparedStatement s = c.getAddMessageStatement();
206        cleanupExclusiveLock.readLock().lock();
207        try {
208            if (s == null) {
209                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
210                if (this.batchStatments) {
211                    c.setAddMessageStatement(s);
212                }
213            }
214            s.setLong(1, sequence);
215            s.setString(2, messageID.getProducerId().toString());
216            s.setLong(3, messageID.getProducerSequenceId());
217            s.setString(4, destination.getQualifiedName());
218            s.setLong(5, expiration);
219            s.setLong(6, priority);
220            setBinaryData(s, 7, data);
221            if (this.batchStatments) {
222                s.addBatch();
223            } else if (s.executeUpdate() != 1) {
224                throw new SQLException("Failed add a message");
225            }
226        } finally {
227            cleanupExclusiveLock.readLock().unlock();
228            if (!this.batchStatments) {
229                if (s != null) {
230                    s.close();
231                }
232            }
233        }
234    }
235
236    public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
237            long expirationTime, String messageRef) throws SQLException, IOException {
238        PreparedStatement s = c.getAddMessageStatement();
239        cleanupExclusiveLock.readLock().lock();
240        try {
241            if (s == null) {
242                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
243                if (this.batchStatments) {
244                    c.setAddMessageStatement(s);
245                }
246            }
247            s.setLong(1, messageID.getBrokerSequenceId());
248            s.setString(2, messageID.getProducerId().toString());
249            s.setLong(3, messageID.getProducerSequenceId());
250            s.setString(4, destination.getQualifiedName());
251            s.setLong(5, expirationTime);
252            s.setString(6, messageRef);
253            if (this.batchStatments) {
254                s.addBatch();
255            } else if (s.executeUpdate() != 1) {
256                throw new SQLException("Failed add a message");
257            }
258        } finally {
259            cleanupExclusiveLock.readLock().unlock();
260            if (!this.batchStatments) {
261                s.close();
262            }
263        }
264    }
265
266    public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
267        PreparedStatement s = null;
268        ResultSet rs = null;
269        cleanupExclusiveLock.readLock().lock();
270        try {
271            s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
272            s.setString(1, messageID.getProducerId().toString());
273            s.setLong(2, messageID.getProducerSequenceId());
274            s.setString(3, destination.getQualifiedName());
275            rs = s.executeQuery();
276            if (!rs.next()) {
277                return new long[]{0,0};
278            }
279            return new long[]{rs.getLong(1), rs.getLong(2)};
280        } finally {
281            cleanupExclusiveLock.readLock().unlock();
282            close(rs);
283            close(s);
284        }
285    }
286
287    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
288        PreparedStatement s = null;
289        ResultSet rs = null;
290        cleanupExclusiveLock.readLock().lock();
291        try {
292            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
293            s.setString(1, id.getProducerId().toString());
294            s.setLong(2, id.getProducerSequenceId());
295            rs = s.executeQuery();
296            if (!rs.next()) {
297                return null;
298            }
299            return getBinaryData(rs, 1);
300        } finally {
301            cleanupExclusiveLock.readLock().unlock();
302            close(rs);
303            close(s);
304        }
305    }
306
307    public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
308        PreparedStatement s = null;
309        ResultSet rs = null;
310        cleanupExclusiveLock.readLock().lock();
311        try {
312            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
313            s.setLong(1, seq);
314            rs = s.executeQuery();
315            if (!rs.next()) {
316                return null;
317            }
318            return rs.getString(1);
319        } finally {
320            cleanupExclusiveLock.readLock().unlock();
321            close(rs);
322            close(s);
323        }
324    }
325
326    public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
327        PreparedStatement s = c.getRemovedMessageStatement();
328        cleanupExclusiveLock.readLock().lock();
329        try {
330            if (s == null) {
331                s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement());
332                if (this.batchStatments) {
333                    c.setRemovedMessageStatement(s);
334                }
335            }
336            s.setLong(1, seq);
337            if (this.batchStatments) {
338                s.addBatch();
339            } else if (s.executeUpdate() != 1) {
340                throw new SQLException("Failed to remove message");
341            }
342        } finally {
343            cleanupExclusiveLock.readLock().unlock();
344            if (!this.batchStatments && s != null) {
345                s.close();
346            }
347        }
348    }
349
350    public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
351            throws Exception {
352        PreparedStatement s = null;
353        ResultSet rs = null;
354        cleanupExclusiveLock.readLock().lock();
355        try {
356            s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
357            s.setString(1, destination.getQualifiedName());
358            rs = s.executeQuery();
359            if (this.statements.isUseExternalMessageReferences()) {
360                while (rs.next()) {
361                    if (!listener.recoverMessageReference(rs.getString(2))) {
362                        break;
363                    }
364                }
365            } else {
366                while (rs.next()) {
367                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
368                        break;
369                    }
370                }
371            }
372        } finally {
373            cleanupExclusiveLock.readLock().unlock();
374            close(rs);
375            close(s);
376        }
377    }
378
379    public void doMessageIdScan(TransactionContext c, int limit, 
380            JDBCMessageIdScanListener listener) throws SQLException, IOException {
381        PreparedStatement s = null;
382        ResultSet rs = null;
383        cleanupExclusiveLock.readLock().lock();
384        try {
385            s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
386            s.setMaxRows(limit);
387            rs = s.executeQuery();
388            // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
389            LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
390            while (rs.next()) {
391                reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
392            }
393            if (LOG.isDebugEnabled()) {
394                LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
395            }
396            for (MessageId id : reverseOrderIds) {
397                listener.messageId(id);
398            }
399        } finally {
400            cleanupExclusiveLock.readLock().unlock();
401            close(rs);
402            close(s);
403        }
404    }
405    
406    public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
407            String subscriptionName, long seq, long prio) throws SQLException, IOException {
408        PreparedStatement s = c.getUpdateLastAckStatement();
409        cleanupExclusiveLock.readLock().lock();
410        try {
411            if (s == null) {
412                s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement());
413                if (this.batchStatments) {
414                    c.setUpdateLastAckStatement(s);
415                }
416            }
417            s.setLong(1, seq);
418            s.setString(2, destination.getQualifiedName());
419            s.setString(3, clientId);
420            s.setString(4, subscriptionName);
421            s.setLong(5, prio);
422            if (this.batchStatments) {
423                s.addBatch();
424            } else if (s.executeUpdate() != 1) {
425                throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName);
426            }
427        } finally {
428            cleanupExclusiveLock.readLock().unlock();
429            if (!this.batchStatments) {
430                close(s);
431            }
432        }
433    }
434
435
436    public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
437                                        String subscriptionName, long seq, long priority) throws SQLException, IOException {
438        PreparedStatement s = c.getUpdateLastAckStatement();
439        cleanupExclusiveLock.readLock().lock();
440        try {
441            if (s == null) {
442                s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
443                if (this.batchStatments) {
444                    c.setUpdateLastAckStatement(s);
445                }
446            }
447            s.setLong(1, seq);
448            s.setString(2, destination.getQualifiedName());
449            s.setString(3, clientId);
450            s.setString(4, subscriptionName);
451
452            if (this.batchStatments) {
453                s.addBatch();
454            } else if (s.executeUpdate() != 1) {
455                throw new IOException("Could not update last ack seq : "
456                            + seq + ", for sub: " + subscriptionName);
457            }
458        } finally {
459            cleanupExclusiveLock.readLock().unlock();
460            if (!this.batchStatments) {
461                close(s);
462            }            
463        }
464    }
465
466    public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
467            String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
468        // dumpTables(c,
469        // destination.getQualifiedName(),clientId,subscriptionName);
470        PreparedStatement s = null;
471        ResultSet rs = null;
472        cleanupExclusiveLock.readLock().lock();
473        try {
474            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
475            s.setString(1, destination.getQualifiedName());
476            s.setString(2, clientId);
477            s.setString(3, subscriptionName);
478            rs = s.executeQuery();
479            if (this.statements.isUseExternalMessageReferences()) {
480                while (rs.next()) {
481                    if (!listener.recoverMessageReference(rs.getString(2))) {
482                        break;
483                    }
484                }
485            } else {
486                while (rs.next()) {
487                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
488                        break;
489                    }
490                }
491            }
492        } finally {
493            cleanupExclusiveLock.readLock().unlock();
494            close(rs);
495            close(s);
496        }
497    }
498
499    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
500            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
501        
502        PreparedStatement s = null;
503        ResultSet rs = null;
504        cleanupExclusiveLock.readLock().lock();
505        try {
506            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
507            s.setMaxRows(Math.max(maxReturned * 2, maxRows));
508            s.setString(1, destination.getQualifiedName());
509            s.setString(2, clientId);
510            s.setString(3, subscriptionName);
511            s.setLong(4, seq);
512            rs = s.executeQuery();
513            int count = 0;
514            if (this.statements.isUseExternalMessageReferences()) {
515                while (rs.next() && count < maxReturned) {
516                    if (listener.recoverMessageReference(rs.getString(1))) {
517                        count++;
518                    }
519                }
520            } else {
521                while (rs.next() && count < maxReturned) {
522                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
523                        count++;
524                    }
525                }
526            }
527        } finally {
528            cleanupExclusiveLock.readLock().unlock();
529            close(rs);
530            close(s);
531        }
532    }
533
534    public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
535            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
536
537        PreparedStatement s = null;
538        ResultSet rs = null;
539        cleanupExclusiveLock.readLock().lock();
540        try {
541            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
542            s.setMaxRows(maxRows);
543            s.setString(1, destination.getQualifiedName());
544            s.setString(2, clientId);
545            s.setString(3, subscriptionName);
546            s.setLong(4, seq);
547            s.setLong(5, priority);
548            rs = s.executeQuery();
549            int count = 0;
550            if (this.statements.isUseExternalMessageReferences()) {
551                while (rs.next() && count < maxReturned) {
552                    if (listener.recoverMessageReference(rs.getString(1))) {
553                        count++;
554                    }
555                }
556            } else {
557                while (rs.next() && count < maxReturned) {
558                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
559                        count++;
560                    }
561                }
562            }
563        } finally {
564            cleanupExclusiveLock.readLock().unlock();
565            close(rs);
566            close(s);
567        }
568    }
569
570    public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
571            String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
572        PreparedStatement s = null;
573        ResultSet rs = null;
574        int result = 0;
575        cleanupExclusiveLock.readLock().lock();
576        try {
577            if (isPrioritizedMessages) {
578                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
579            } else {
580                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());    
581            }
582            s.setString(1, destination.getQualifiedName());
583            s.setString(2, clientId);
584            s.setString(3, subscriptionName);
585            rs = s.executeQuery();
586            if (rs.next()) {
587                result = rs.getInt(1);
588            }
589        } finally {
590            cleanupExclusiveLock.readLock().unlock();
591            close(rs);
592            close(s);
593        }
594        return result;
595    }
596
597    /**
598     * @param c 
599     * @param info 
600     * @param retroactive 
601     * @throws SQLException 
602     * @throws IOException 
603     */
604    public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
605            throws SQLException, IOException {
606        // dumpTables(c, destination.getQualifiedName(), clientId,
607        // subscriptionName);
608        PreparedStatement s = null;
609        cleanupExclusiveLock.readLock().lock();
610        try {
611            long lastMessageId = -1;
612            if (!retroactive) {
613                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
614                ResultSet rs = null;
615                try {
616                    rs = s.executeQuery();
617                    if (rs.next()) {
618                        lastMessageId = rs.getLong(1);
619                    }
620                } finally {
621                    close(rs);
622                    close(s);
623                }
624            }
625            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
626            int maxPriority = 1;
627            if (isPrioritizedMessages) {
628                maxPriority = 10;
629            }
630
631            for (int priority = 0; priority < maxPriority; priority++) {
632                s.setString(1, info.getDestination().getQualifiedName());
633                s.setString(2, info.getClientId());
634                s.setString(3, info.getSubscriptionName());
635                s.setString(4, info.getSelector());
636                s.setLong(5, lastMessageId);
637                s.setString(6, info.getSubscribedDestination().getQualifiedName());
638                s.setLong(7, priority);
639
640                if (s.executeUpdate() != 1) {
641                    throw new IOException("Could not create durable subscription for: " + info.getClientId());
642                }
643            }
644
645        } finally {
646            cleanupExclusiveLock.readLock().unlock();
647            close(s);
648        }
649    }
650
651    public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
652            String clientId, String subscriptionName) throws SQLException, IOException {
653        PreparedStatement s = null;
654        ResultSet rs = null;
655        cleanupExclusiveLock.readLock().lock();
656        try {
657            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
658            s.setString(1, destination.getQualifiedName());
659            s.setString(2, clientId);
660            s.setString(3, subscriptionName);
661            rs = s.executeQuery();
662            if (!rs.next()) {
663                return null;
664            }
665            SubscriptionInfo subscription = new SubscriptionInfo();
666            subscription.setDestination(destination);
667            subscription.setClientId(clientId);
668            subscription.setSubscriptionName(subscriptionName);
669            subscription.setSelector(rs.getString(1));
670            subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
671                    ActiveMQDestination.QUEUE_TYPE));
672            return subscription;
673        } finally {
674            cleanupExclusiveLock.readLock().unlock();
675            close(rs);
676            close(s);
677        }
678    }
679
680    public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
681            throws SQLException, IOException {
682        PreparedStatement s = null;
683        ResultSet rs = null;
684        cleanupExclusiveLock.readLock().lock();
685        try {
686            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
687            s.setString(1, destination.getQualifiedName());
688            rs = s.executeQuery();
689            ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
690            while (rs.next()) {
691                SubscriptionInfo subscription = new SubscriptionInfo();
692                subscription.setDestination(destination);
693                subscription.setSelector(rs.getString(1));
694                subscription.setSubscriptionName(rs.getString(2));
695                subscription.setClientId(rs.getString(3));
696                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
697                        ActiveMQDestination.QUEUE_TYPE));
698                rc.add(subscription);
699            }
700            return rc.toArray(new SubscriptionInfo[rc.size()]);
701        } finally {
702            cleanupExclusiveLock.readLock().unlock();
703            close(rs);
704            close(s);
705        }
706    }
707
708    public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
709            IOException {
710        PreparedStatement s = null;
711        cleanupExclusiveLock.readLock().lock();
712        try {
713            s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
714            s.setString(1, destinationName.getQualifiedName());
715            s.executeUpdate();
716            s.close();
717            s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
718            s.setString(1, destinationName.getQualifiedName());
719            s.executeUpdate();
720        } finally {
721            cleanupExclusiveLock.readLock().unlock();
722            close(s);
723        }
724    }
725
726    public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
727            String subscriptionName) throws SQLException, IOException {
728        PreparedStatement s = null;
729        cleanupExclusiveLock.readLock().lock();
730        try {
731            s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
732            s.setString(1, destination.getQualifiedName());
733            s.setString(2, clientId);
734            s.setString(3, subscriptionName);
735            s.executeUpdate();
736        } finally {
737            cleanupExclusiveLock.readLock().unlock();
738            close(s);
739        }
740    }
741
742    public void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException {
743        PreparedStatement s = null;
744        cleanupExclusiveLock.writeLock().lock();
745        try {
746            if (isPrioritizedMessages) {
747                LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
748                s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
749            } else {
750                LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
751                s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
752            }
753            s.setLong(1, System.currentTimeMillis());
754            int i = s.executeUpdate();
755            LOG.debug("Deleted " + i + " old message(s).");
756        } finally {
757            cleanupExclusiveLock.writeLock().unlock();
758            close(s);
759        }
760    }
761
762    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
763            String clientId, String subscriberName) throws SQLException, IOException {
764        PreparedStatement s = null;
765        ResultSet rs = null;
766        long result = -1;
767        cleanupExclusiveLock.readLock().lock();
768        try {
769            s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
770            s.setString(1, destination.getQualifiedName());
771            s.setString(2, clientId);
772            s.setString(3, subscriberName);
773            rs = s.executeQuery();
774            if (rs.next()) {
775                result = rs.getLong(1);
776            }
777        } finally {
778            cleanupExclusiveLock.readLock().unlock();
779            close(rs);
780            close(s);
781        }
782        return result;
783    }
784
785    private static void close(PreparedStatement s) {
786        try {
787            s.close();
788        } catch (Throwable e) {
789        }
790    }
791
792    private static void close(ResultSet rs) {
793        try {
794            rs.close();
795        } catch (Throwable e) {
796        }
797    }
798
799    public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
800        HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
801        PreparedStatement s = null;
802        ResultSet rs = null;
803        cleanupExclusiveLock.readLock().lock();
804        try {
805            s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
806            rs = s.executeQuery();
807            while (rs.next()) {
808                rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
809            }
810        } finally {
811            cleanupExclusiveLock.readLock().unlock();
812            close(rs);
813            close(s);
814        }
815        return rc;
816    }
817
818    /**
819     * @return true if batchStements
820     */
821    public boolean isBatchStatments() {
822        return this.batchStatments;
823    }
824
825    /**
826     * @param batchStatments
827     */
828    public void setBatchStatments(boolean batchStatments) {
829        this.batchStatments = batchStatments;
830    }
831
832    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
833        this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
834    }
835
836    /**
837     * @return the statements
838     */
839    public Statements getStatements() {
840        return this.statements;
841    }
842
843    public void setStatements(Statements statements) {
844        this.statements = statements;
845    }
846
847    public int getMaxRows() {
848        return maxRows;
849    }
850
851    public void setMaxRows(int maxRows) {
852        this.maxRows = maxRows;
853    }    
854
855    /**
856     * @param c
857     * @param destination
858     * @param clientId
859     * @param subscriberName
860     * @return
861     * @throws SQLException
862     * @throws IOException
863     */
864    public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
865            String clientId, String subscriberName) throws SQLException, IOException {
866        PreparedStatement s = null;
867        ResultSet rs = null;
868        cleanupExclusiveLock.readLock().lock();
869        try {
870            s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
871            s.setString(1, destination.getQualifiedName());
872            s.setString(2, clientId);
873            s.setString(3, subscriberName);
874            rs = s.executeQuery();
875            if (!rs.next()) {
876                return null;
877            }
878            return getBinaryData(rs, 1);
879        } finally {
880            close(rs);
881            cleanupExclusiveLock.readLock().unlock();
882            close(s);
883        }
884    }
885
886    public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
887            IOException {
888        PreparedStatement s = null;
889        ResultSet rs = null;
890        int result = 0;
891        cleanupExclusiveLock.readLock().lock();
892        try {
893            s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
894            s.setString(1, destination.getQualifiedName());
895            rs = s.executeQuery();
896            if (rs.next()) {
897                result = rs.getInt(1);
898            }
899        } finally {
900            cleanupExclusiveLock.readLock().unlock();
901            close(rs);
902            close(s);
903        }
904        return result;
905    }
906
907    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
908            long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
909        PreparedStatement s = null;
910        ResultSet rs = null;
911        cleanupExclusiveLock.readLock().lock();
912        try {
913            if (isPrioritizedMessages) {
914                s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
915            } else {
916                s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
917            }
918            s.setMaxRows(Math.max(maxReturned * 2, maxRows));
919            s.setString(1, destination.getQualifiedName());
920            s.setLong(2, nextSeq);
921            if (isPrioritizedMessages) {
922                s.setLong(3, priority);
923                s.setLong(4, priority);
924            }
925            rs = s.executeQuery();
926            int count = 0;
927            if (this.statements.isUseExternalMessageReferences()) {
928                while (rs.next() && count < maxReturned) {
929                    if (listener.recoverMessageReference(rs.getString(1))) {
930                        count++;
931                    } else {
932                        LOG.debug("Stopped recover next messages");
933                        break;
934                    }
935                }
936            } else {
937                while (rs.next() && count < maxReturned) {
938                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
939                        count++;
940                    } else {
941                        LOG.debug("Stopped recover next messages");
942                        break;
943                    }
944                }
945            }
946        } catch (Exception e) {
947            e.printStackTrace();
948        } finally {
949            cleanupExclusiveLock.readLock().unlock();
950            close(rs);
951            close(s);
952        }
953    }
954    
955/*    public void dumpTables(Connection c, String destinationName, String clientId, String
956      subscriptionName) throws SQLException { 
957        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
958        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 
959        PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 
960                + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 
961                + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 
962                + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 
963                + " ORDER BY M.ID");
964      s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
965      printQuery(s,System.out); }
966
967    public void dumpTables(Connection c) throws SQLException {
968        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
969        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
970    }
971
972    private void printQuery(Connection c, String query, PrintStream out)
973            throws SQLException {
974        printQuery(c.prepareStatement(query), out);
975    }
976
977    private void printQuery(PreparedStatement s, PrintStream out)
978            throws SQLException {
979
980        ResultSet set = null;
981        try {
982            set = s.executeQuery();
983            ResultSetMetaData metaData = set.getMetaData();
984            for (int i = 1; i <= metaData.getColumnCount(); i++) {
985                if (i == 1)
986                    out.print("||");
987                out.print(metaData.getColumnName(i) + "||");
988            }
989            out.println();
990            while (set.next()) {
991                for (int i = 1; i <= metaData.getColumnCount(); i++) {
992                    if (i == 1)
993                        out.print("|");
994                    out.print(set.getString(i) + "|");
995                }
996                out.println();
997            }
998        } finally {
999            try {
1000                set.close();
1001            } catch (Throwable ignore) {
1002            }
1003            try {
1004                s.close();
1005            } catch (Throwable ignore) {
1006            }
1007        }
1008    }  */  
1009
1010    public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1011            throws SQLException, IOException {
1012        PreparedStatement s = null;
1013        ResultSet rs = null;
1014        cleanupExclusiveLock.readLock().lock();
1015        try {
1016            s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1017            s.setString(1, id.toString());
1018            rs = s.executeQuery();
1019            long seq = -1;
1020            if (rs.next()) {
1021                seq = rs.getLong(1);
1022            }
1023            return seq;
1024        } finally {
1025            cleanupExclusiveLock.readLock().unlock();
1026            close(rs);
1027            close(s);
1028        }
1029    }
1030
1031}