001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc.adapter; 018 019import java.io.IOException; 020import java.sql.PreparedStatement; 021import java.sql.ResultSet; 022import java.sql.SQLException; 023import java.sql.Statement; 024import java.util.ArrayList; 025import java.util.HashSet; 026import java.util.LinkedList; 027import java.util.Set; 028import java.util.concurrent.locks.ReadWriteLock; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import org.apache.activemq.command.ActiveMQDestination; 032import org.apache.activemq.command.MessageId; 033import org.apache.activemq.command.ProducerId; 034import org.apache.activemq.command.SubscriptionInfo; 035import org.apache.activemq.store.jdbc.JDBCAdapter; 036import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; 037import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; 038import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 039import org.apache.activemq.store.jdbc.Statements; 040import org.apache.activemq.store.jdbc.TransactionContext; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is 046 * encouraged to override the default implementation of methods to account for differences in JDBC Driver 047 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/> 048 * The databases/JDBC drivers that use this adapter are: 049 * <ul> 050 * <li></li> 051 * </ul> 052 * 053 * @org.apache.xbean.XBean element="defaultJDBCAdapter" 054 * 055 * 056 */ 057public class DefaultJDBCAdapter implements JDBCAdapter { 058 private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class); 059 public static final int MAX_ROWS = 10000; 060 protected Statements statements; 061 protected boolean batchStatments = true; 062 protected boolean prioritizedMessages; 063 protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock(); 064 // needs to be min twice the prefetch for a durable sub and large enough for selector range 065 protected int maxRows = MAX_ROWS; 066 067 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { 068 s.setBytes(index, data); 069 } 070 071 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { 072 return rs.getBytes(index); 073 } 074 075 public void doCreateTables(TransactionContext c) throws SQLException, IOException { 076 Statement s = null; 077 cleanupExclusiveLock.writeLock().lock(); 078 try { 079 // Check to see if the table already exists. If it does, then don't 080 // log warnings during startup. 081 // Need to run the scripts anyways since they may contain ALTER 082 // statements that upgrade a previous version 083 // of the table 084 boolean alreadyExists = false; 085 ResultSet rs = null; 086 try { 087 rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), 088 new String[] { "TABLE" }); 089 alreadyExists = rs.next(); 090 } catch (Throwable ignore) { 091 } finally { 092 close(rs); 093 } 094 s = c.getConnection().createStatement(); 095 String[] createStatments = this.statements.getCreateSchemaStatements(); 096 for (int i = 0; i < createStatments.length; i++) { 097 // This will fail usually since the tables will be 098 // created already. 099 try { 100 LOG.debug("Executing SQL: " + createStatments[i]); 101 s.execute(createStatments[i]); 102 } catch (SQLException e) { 103 if (alreadyExists) { 104 LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: " 105 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() 106 + " Vendor code: " + e.getErrorCode()); 107 } else { 108 LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: " 109 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() 110 + " Vendor code: " + e.getErrorCode()); 111 JDBCPersistenceAdapter.log("Failure details: ", e); 112 } 113 } 114 } 115 c.getConnection().commit(); 116 } finally { 117 cleanupExclusiveLock.writeLock().unlock(); 118 try { 119 s.close(); 120 } catch (Throwable e) { 121 } 122 } 123 } 124 125 public void doDropTables(TransactionContext c) throws SQLException, IOException { 126 Statement s = null; 127 cleanupExclusiveLock.writeLock().lock(); 128 try { 129 s = c.getConnection().createStatement(); 130 String[] dropStatments = this.statements.getDropSchemaStatements(); 131 for (int i = 0; i < dropStatments.length; i++) { 132 // This will fail usually since the tables will be 133 // created already. 134 try { 135 LOG.debug("Executing SQL: " + dropStatments[i]); 136 s.execute(dropStatments[i]); 137 } catch (SQLException e) { 138 LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i] 139 + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: " 140 + e.getErrorCode()); 141 JDBCPersistenceAdapter.log("Failure details: ", e); 142 } 143 } 144 c.getConnection().commit(); 145 } finally { 146 cleanupExclusiveLock.writeLock().unlock(); 147 try { 148 s.close(); 149 } catch (Throwable e) { 150 } 151 } 152 } 153 154 public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException { 155 PreparedStatement s = null; 156 ResultSet rs = null; 157 cleanupExclusiveLock.readLock().lock(); 158 try { 159 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 160 rs = s.executeQuery(); 161 long seq1 = 0; 162 if (rs.next()) { 163 seq1 = rs.getLong(1); 164 } 165 rs.close(); 166 s.close(); 167 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement()); 168 rs = s.executeQuery(); 169 long seq2 = 0; 170 if (rs.next()) { 171 seq2 = rs.getLong(1); 172 } 173 long seq = Math.max(seq1, seq2); 174 return seq; 175 } finally { 176 cleanupExclusiveLock.readLock().unlock(); 177 close(rs); 178 close(s); 179 } 180 } 181 182 public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException { 183 PreparedStatement s = null; 184 ResultSet rs = null; 185 cleanupExclusiveLock.readLock().lock(); 186 try { 187 s = c.getConnection().prepareStatement( 188 this.statements.getFindMessageByIdStatement()); 189 s.setLong(1, storeSequenceId); 190 rs = s.executeQuery(); 191 if (!rs.next()) { 192 return null; 193 } 194 return getBinaryData(rs, 1); 195 } finally { 196 cleanupExclusiveLock.readLock().unlock(); 197 close(rs); 198 close(s); 199 } 200 } 201 202 203 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, 204 long expiration, byte priority) throws SQLException, IOException { 205 PreparedStatement s = c.getAddMessageStatement(); 206 cleanupExclusiveLock.readLock().lock(); 207 try { 208 if (s == null) { 209 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 210 if (this.batchStatments) { 211 c.setAddMessageStatement(s); 212 } 213 } 214 s.setLong(1, sequence); 215 s.setString(2, messageID.getProducerId().toString()); 216 s.setLong(3, messageID.getProducerSequenceId()); 217 s.setString(4, destination.getQualifiedName()); 218 s.setLong(5, expiration); 219 s.setLong(6, priority); 220 setBinaryData(s, 7, data); 221 if (this.batchStatments) { 222 s.addBatch(); 223 } else if (s.executeUpdate() != 1) { 224 throw new SQLException("Failed add a message"); 225 } 226 } finally { 227 cleanupExclusiveLock.readLock().unlock(); 228 if (!this.batchStatments) { 229 if (s != null) { 230 s.close(); 231 } 232 } 233 } 234 } 235 236 public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, 237 long expirationTime, String messageRef) throws SQLException, IOException { 238 PreparedStatement s = c.getAddMessageStatement(); 239 cleanupExclusiveLock.readLock().lock(); 240 try { 241 if (s == null) { 242 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 243 if (this.batchStatments) { 244 c.setAddMessageStatement(s); 245 } 246 } 247 s.setLong(1, messageID.getBrokerSequenceId()); 248 s.setString(2, messageID.getProducerId().toString()); 249 s.setLong(3, messageID.getProducerSequenceId()); 250 s.setString(4, destination.getQualifiedName()); 251 s.setLong(5, expirationTime); 252 s.setString(6, messageRef); 253 if (this.batchStatments) { 254 s.addBatch(); 255 } else if (s.executeUpdate() != 1) { 256 throw new SQLException("Failed add a message"); 257 } 258 } finally { 259 cleanupExclusiveLock.readLock().unlock(); 260 if (!this.batchStatments) { 261 s.close(); 262 } 263 } 264 } 265 266 public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException { 267 PreparedStatement s = null; 268 ResultSet rs = null; 269 cleanupExclusiveLock.readLock().lock(); 270 try { 271 s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement()); 272 s.setString(1, messageID.getProducerId().toString()); 273 s.setLong(2, messageID.getProducerSequenceId()); 274 s.setString(3, destination.getQualifiedName()); 275 rs = s.executeQuery(); 276 if (!rs.next()) { 277 return new long[]{0,0}; 278 } 279 return new long[]{rs.getLong(1), rs.getLong(2)}; 280 } finally { 281 cleanupExclusiveLock.readLock().unlock(); 282 close(rs); 283 close(s); 284 } 285 } 286 287 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { 288 PreparedStatement s = null; 289 ResultSet rs = null; 290 cleanupExclusiveLock.readLock().lock(); 291 try { 292 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 293 s.setString(1, id.getProducerId().toString()); 294 s.setLong(2, id.getProducerSequenceId()); 295 rs = s.executeQuery(); 296 if (!rs.next()) { 297 return null; 298 } 299 return getBinaryData(rs, 1); 300 } finally { 301 cleanupExclusiveLock.readLock().unlock(); 302 close(rs); 303 close(s); 304 } 305 } 306 307 public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException { 308 PreparedStatement s = null; 309 ResultSet rs = null; 310 cleanupExclusiveLock.readLock().lock(); 311 try { 312 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 313 s.setLong(1, seq); 314 rs = s.executeQuery(); 315 if (!rs.next()) { 316 return null; 317 } 318 return rs.getString(1); 319 } finally { 320 cleanupExclusiveLock.readLock().unlock(); 321 close(rs); 322 close(s); 323 } 324 } 325 326 public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException { 327 PreparedStatement s = c.getRemovedMessageStatement(); 328 cleanupExclusiveLock.readLock().lock(); 329 try { 330 if (s == null) { 331 s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement()); 332 if (this.batchStatments) { 333 c.setRemovedMessageStatement(s); 334 } 335 } 336 s.setLong(1, seq); 337 if (this.batchStatments) { 338 s.addBatch(); 339 } else if (s.executeUpdate() != 1) { 340 throw new SQLException("Failed to remove message"); 341 } 342 } finally { 343 cleanupExclusiveLock.readLock().unlock(); 344 if (!this.batchStatments && s != null) { 345 s.close(); 346 } 347 } 348 } 349 350 public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) 351 throws Exception { 352 PreparedStatement s = null; 353 ResultSet rs = null; 354 cleanupExclusiveLock.readLock().lock(); 355 try { 356 s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement()); 357 s.setString(1, destination.getQualifiedName()); 358 rs = s.executeQuery(); 359 if (this.statements.isUseExternalMessageReferences()) { 360 while (rs.next()) { 361 if (!listener.recoverMessageReference(rs.getString(2))) { 362 break; 363 } 364 } 365 } else { 366 while (rs.next()) { 367 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 368 break; 369 } 370 } 371 } 372 } finally { 373 cleanupExclusiveLock.readLock().unlock(); 374 close(rs); 375 close(s); 376 } 377 } 378 379 public void doMessageIdScan(TransactionContext c, int limit, 380 JDBCMessageIdScanListener listener) throws SQLException, IOException { 381 PreparedStatement s = null; 382 ResultSet rs = null; 383 cleanupExclusiveLock.readLock().lock(); 384 try { 385 s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement()); 386 s.setMaxRows(limit); 387 rs = s.executeQuery(); 388 // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid 389 LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>(); 390 while (rs.next()) { 391 reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3))); 392 } 393 if (LOG.isDebugEnabled()) { 394 LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids"); 395 } 396 for (MessageId id : reverseOrderIds) { 397 listener.messageId(id); 398 } 399 } finally { 400 cleanupExclusiveLock.readLock().unlock(); 401 close(rs); 402 close(s); 403 } 404 } 405 406 public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, 407 String subscriptionName, long seq, long prio) throws SQLException, IOException { 408 PreparedStatement s = c.getUpdateLastAckStatement(); 409 cleanupExclusiveLock.readLock().lock(); 410 try { 411 if (s == null) { 412 s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement()); 413 if (this.batchStatments) { 414 c.setUpdateLastAckStatement(s); 415 } 416 } 417 s.setLong(1, seq); 418 s.setString(2, destination.getQualifiedName()); 419 s.setString(3, clientId); 420 s.setString(4, subscriptionName); 421 s.setLong(5, prio); 422 if (this.batchStatments) { 423 s.addBatch(); 424 } else if (s.executeUpdate() != 1) { 425 throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName); 426 } 427 } finally { 428 cleanupExclusiveLock.readLock().unlock(); 429 if (!this.batchStatments) { 430 close(s); 431 } 432 } 433 } 434 435 436 public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, 437 String subscriptionName, long seq, long priority) throws SQLException, IOException { 438 PreparedStatement s = c.getUpdateLastAckStatement(); 439 cleanupExclusiveLock.readLock().lock(); 440 try { 441 if (s == null) { 442 s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement()); 443 if (this.batchStatments) { 444 c.setUpdateLastAckStatement(s); 445 } 446 } 447 s.setLong(1, seq); 448 s.setString(2, destination.getQualifiedName()); 449 s.setString(3, clientId); 450 s.setString(4, subscriptionName); 451 452 if (this.batchStatments) { 453 s.addBatch(); 454 } else if (s.executeUpdate() != 1) { 455 throw new IOException("Could not update last ack seq : " 456 + seq + ", for sub: " + subscriptionName); 457 } 458 } finally { 459 cleanupExclusiveLock.readLock().unlock(); 460 if (!this.batchStatments) { 461 close(s); 462 } 463 } 464 } 465 466 public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 467 String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception { 468 // dumpTables(c, 469 // destination.getQualifiedName(),clientId,subscriptionName); 470 PreparedStatement s = null; 471 ResultSet rs = null; 472 cleanupExclusiveLock.readLock().lock(); 473 try { 474 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement()); 475 s.setString(1, destination.getQualifiedName()); 476 s.setString(2, clientId); 477 s.setString(3, subscriptionName); 478 rs = s.executeQuery(); 479 if (this.statements.isUseExternalMessageReferences()) { 480 while (rs.next()) { 481 if (!listener.recoverMessageReference(rs.getString(2))) { 482 break; 483 } 484 } 485 } else { 486 while (rs.next()) { 487 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 488 break; 489 } 490 } 491 } 492 } finally { 493 cleanupExclusiveLock.readLock().unlock(); 494 close(rs); 495 close(s); 496 } 497 } 498 499 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, 500 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 501 502 PreparedStatement s = null; 503 ResultSet rs = null; 504 cleanupExclusiveLock.readLock().lock(); 505 try { 506 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement()); 507 s.setMaxRows(Math.max(maxReturned * 2, maxRows)); 508 s.setString(1, destination.getQualifiedName()); 509 s.setString(2, clientId); 510 s.setString(3, subscriptionName); 511 s.setLong(4, seq); 512 rs = s.executeQuery(); 513 int count = 0; 514 if (this.statements.isUseExternalMessageReferences()) { 515 while (rs.next() && count < maxReturned) { 516 if (listener.recoverMessageReference(rs.getString(1))) { 517 count++; 518 } 519 } 520 } else { 521 while (rs.next() && count < maxReturned) { 522 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 523 count++; 524 } 525 } 526 } 527 } finally { 528 cleanupExclusiveLock.readLock().unlock(); 529 close(rs); 530 close(s); 531 } 532 } 533 534 public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, 535 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 536 537 PreparedStatement s = null; 538 ResultSet rs = null; 539 cleanupExclusiveLock.readLock().lock(); 540 try { 541 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement()); 542 s.setMaxRows(maxRows); 543 s.setString(1, destination.getQualifiedName()); 544 s.setString(2, clientId); 545 s.setString(3, subscriptionName); 546 s.setLong(4, seq); 547 s.setLong(5, priority); 548 rs = s.executeQuery(); 549 int count = 0; 550 if (this.statements.isUseExternalMessageReferences()) { 551 while (rs.next() && count < maxReturned) { 552 if (listener.recoverMessageReference(rs.getString(1))) { 553 count++; 554 } 555 } 556 } else { 557 while (rs.next() && count < maxReturned) { 558 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 559 count++; 560 } 561 } 562 } 563 } finally { 564 cleanupExclusiveLock.readLock().unlock(); 565 close(rs); 566 close(s); 567 } 568 } 569 570 public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, 571 String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException { 572 PreparedStatement s = null; 573 ResultSet rs = null; 574 int result = 0; 575 cleanupExclusiveLock.readLock().lock(); 576 try { 577 if (isPrioritizedMessages) { 578 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority()); 579 } else { 580 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement()); 581 } 582 s.setString(1, destination.getQualifiedName()); 583 s.setString(2, clientId); 584 s.setString(3, subscriptionName); 585 rs = s.executeQuery(); 586 if (rs.next()) { 587 result = rs.getInt(1); 588 } 589 } finally { 590 cleanupExclusiveLock.readLock().unlock(); 591 close(rs); 592 close(s); 593 } 594 return result; 595 } 596 597 /** 598 * @param c 599 * @param info 600 * @param retroactive 601 * @throws SQLException 602 * @throws IOException 603 */ 604 public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages) 605 throws SQLException, IOException { 606 // dumpTables(c, destination.getQualifiedName(), clientId, 607 // subscriptionName); 608 PreparedStatement s = null; 609 cleanupExclusiveLock.readLock().lock(); 610 try { 611 long lastMessageId = -1; 612 if (!retroactive) { 613 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 614 ResultSet rs = null; 615 try { 616 rs = s.executeQuery(); 617 if (rs.next()) { 618 lastMessageId = rs.getLong(1); 619 } 620 } finally { 621 close(rs); 622 close(s); 623 } 624 } 625 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); 626 int maxPriority = 1; 627 if (isPrioritizedMessages) { 628 maxPriority = 10; 629 } 630 631 for (int priority = 0; priority < maxPriority; priority++) { 632 s.setString(1, info.getDestination().getQualifiedName()); 633 s.setString(2, info.getClientId()); 634 s.setString(3, info.getSubscriptionName()); 635 s.setString(4, info.getSelector()); 636 s.setLong(5, lastMessageId); 637 s.setString(6, info.getSubscribedDestination().getQualifiedName()); 638 s.setLong(7, priority); 639 640 if (s.executeUpdate() != 1) { 641 throw new IOException("Could not create durable subscription for: " + info.getClientId()); 642 } 643 } 644 645 } finally { 646 cleanupExclusiveLock.readLock().unlock(); 647 close(s); 648 } 649 } 650 651 public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, 652 String clientId, String subscriptionName) throws SQLException, IOException { 653 PreparedStatement s = null; 654 ResultSet rs = null; 655 cleanupExclusiveLock.readLock().lock(); 656 try { 657 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement()); 658 s.setString(1, destination.getQualifiedName()); 659 s.setString(2, clientId); 660 s.setString(3, subscriptionName); 661 rs = s.executeQuery(); 662 if (!rs.next()) { 663 return null; 664 } 665 SubscriptionInfo subscription = new SubscriptionInfo(); 666 subscription.setDestination(destination); 667 subscription.setClientId(clientId); 668 subscription.setSubscriptionName(subscriptionName); 669 subscription.setSelector(rs.getString(1)); 670 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), 671 ActiveMQDestination.QUEUE_TYPE)); 672 return subscription; 673 } finally { 674 cleanupExclusiveLock.readLock().unlock(); 675 close(rs); 676 close(s); 677 } 678 } 679 680 public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) 681 throws SQLException, IOException { 682 PreparedStatement s = null; 683 ResultSet rs = null; 684 cleanupExclusiveLock.readLock().lock(); 685 try { 686 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement()); 687 s.setString(1, destination.getQualifiedName()); 688 rs = s.executeQuery(); 689 ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>(); 690 while (rs.next()) { 691 SubscriptionInfo subscription = new SubscriptionInfo(); 692 subscription.setDestination(destination); 693 subscription.setSelector(rs.getString(1)); 694 subscription.setSubscriptionName(rs.getString(2)); 695 subscription.setClientId(rs.getString(3)); 696 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4), 697 ActiveMQDestination.QUEUE_TYPE)); 698 rc.add(subscription); 699 } 700 return rc.toArray(new SubscriptionInfo[rc.size()]); 701 } finally { 702 cleanupExclusiveLock.readLock().unlock(); 703 close(rs); 704 close(s); 705 } 706 } 707 708 public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, 709 IOException { 710 PreparedStatement s = null; 711 cleanupExclusiveLock.readLock().lock(); 712 try { 713 s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement()); 714 s.setString(1, destinationName.getQualifiedName()); 715 s.executeUpdate(); 716 s.close(); 717 s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement()); 718 s.setString(1, destinationName.getQualifiedName()); 719 s.executeUpdate(); 720 } finally { 721 cleanupExclusiveLock.readLock().unlock(); 722 close(s); 723 } 724 } 725 726 public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 727 String subscriptionName) throws SQLException, IOException { 728 PreparedStatement s = null; 729 cleanupExclusiveLock.readLock().lock(); 730 try { 731 s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement()); 732 s.setString(1, destination.getQualifiedName()); 733 s.setString(2, clientId); 734 s.setString(3, subscriptionName); 735 s.executeUpdate(); 736 } finally { 737 cleanupExclusiveLock.readLock().unlock(); 738 close(s); 739 } 740 } 741 742 public void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException { 743 PreparedStatement s = null; 744 cleanupExclusiveLock.writeLock().lock(); 745 try { 746 if (isPrioritizedMessages) { 747 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority()); 748 s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority()); 749 } else { 750 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement()); 751 s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement()); 752 } 753 s.setLong(1, System.currentTimeMillis()); 754 int i = s.executeUpdate(); 755 LOG.debug("Deleted " + i + " old message(s)."); 756 } finally { 757 cleanupExclusiveLock.writeLock().unlock(); 758 close(s); 759 } 760 } 761 762 public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, 763 String clientId, String subscriberName) throws SQLException, IOException { 764 PreparedStatement s = null; 765 ResultSet rs = null; 766 long result = -1; 767 cleanupExclusiveLock.readLock().lock(); 768 try { 769 s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement()); 770 s.setString(1, destination.getQualifiedName()); 771 s.setString(2, clientId); 772 s.setString(3, subscriberName); 773 rs = s.executeQuery(); 774 if (rs.next()) { 775 result = rs.getLong(1); 776 } 777 } finally { 778 cleanupExclusiveLock.readLock().unlock(); 779 close(rs); 780 close(s); 781 } 782 return result; 783 } 784 785 private static void close(PreparedStatement s) { 786 try { 787 s.close(); 788 } catch (Throwable e) { 789 } 790 } 791 792 private static void close(ResultSet rs) { 793 try { 794 rs.close(); 795 } catch (Throwable e) { 796 } 797 } 798 799 public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException { 800 HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 801 PreparedStatement s = null; 802 ResultSet rs = null; 803 cleanupExclusiveLock.readLock().lock(); 804 try { 805 s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement()); 806 rs = s.executeQuery(); 807 while (rs.next()) { 808 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE)); 809 } 810 } finally { 811 cleanupExclusiveLock.readLock().unlock(); 812 close(rs); 813 close(s); 814 } 815 return rc; 816 } 817 818 /** 819 * @return true if batchStements 820 */ 821 public boolean isBatchStatments() { 822 return this.batchStatments; 823 } 824 825 /** 826 * @param batchStatments 827 */ 828 public void setBatchStatments(boolean batchStatments) { 829 this.batchStatments = batchStatments; 830 } 831 832 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 833 this.statements.setUseExternalMessageReferences(useExternalMessageReferences); 834 } 835 836 /** 837 * @return the statements 838 */ 839 public Statements getStatements() { 840 return this.statements; 841 } 842 843 public void setStatements(Statements statements) { 844 this.statements = statements; 845 } 846 847 public int getMaxRows() { 848 return maxRows; 849 } 850 851 public void setMaxRows(int maxRows) { 852 this.maxRows = maxRows; 853 } 854 855 /** 856 * @param c 857 * @param destination 858 * @param clientId 859 * @param subscriberName 860 * @return 861 * @throws SQLException 862 * @throws IOException 863 */ 864 public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination, 865 String clientId, String subscriberName) throws SQLException, IOException { 866 PreparedStatement s = null; 867 ResultSet rs = null; 868 cleanupExclusiveLock.readLock().lock(); 869 try { 870 s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement()); 871 s.setString(1, destination.getQualifiedName()); 872 s.setString(2, clientId); 873 s.setString(3, subscriberName); 874 rs = s.executeQuery(); 875 if (!rs.next()) { 876 return null; 877 } 878 return getBinaryData(rs, 1); 879 } finally { 880 close(rs); 881 cleanupExclusiveLock.readLock().unlock(); 882 close(s); 883 } 884 } 885 886 public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, 887 IOException { 888 PreparedStatement s = null; 889 ResultSet rs = null; 890 int result = 0; 891 cleanupExclusiveLock.readLock().lock(); 892 try { 893 s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement()); 894 s.setString(1, destination.getQualifiedName()); 895 rs = s.executeQuery(); 896 if (rs.next()) { 897 result = rs.getInt(1); 898 } 899 } finally { 900 cleanupExclusiveLock.readLock().unlock(); 901 close(rs); 902 close(s); 903 } 904 return result; 905 } 906 907 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, 908 long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception { 909 PreparedStatement s = null; 910 ResultSet rs = null; 911 cleanupExclusiveLock.readLock().lock(); 912 try { 913 if (isPrioritizedMessages) { 914 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement()); 915 } else { 916 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement()); 917 } 918 s.setMaxRows(Math.max(maxReturned * 2, maxRows)); 919 s.setString(1, destination.getQualifiedName()); 920 s.setLong(2, nextSeq); 921 if (isPrioritizedMessages) { 922 s.setLong(3, priority); 923 s.setLong(4, priority); 924 } 925 rs = s.executeQuery(); 926 int count = 0; 927 if (this.statements.isUseExternalMessageReferences()) { 928 while (rs.next() && count < maxReturned) { 929 if (listener.recoverMessageReference(rs.getString(1))) { 930 count++; 931 } else { 932 LOG.debug("Stopped recover next messages"); 933 break; 934 } 935 } 936 } else { 937 while (rs.next() && count < maxReturned) { 938 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 939 count++; 940 } else { 941 LOG.debug("Stopped recover next messages"); 942 break; 943 } 944 } 945 } 946 } catch (Exception e) { 947 e.printStackTrace(); 948 } finally { 949 cleanupExclusiveLock.readLock().unlock(); 950 close(rs); 951 close(s); 952 } 953 } 954 955/* public void dumpTables(Connection c, String destinationName, String clientId, String 956 subscriptionName) throws SQLException { 957 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 958 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 959 PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 960 + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 961 + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 962 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 963 + " ORDER BY M.ID"); 964 s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName); 965 printQuery(s,System.out); } 966 967 public void dumpTables(Connection c) throws SQLException { 968 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 969 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 970 } 971 972 private void printQuery(Connection c, String query, PrintStream out) 973 throws SQLException { 974 printQuery(c.prepareStatement(query), out); 975 } 976 977 private void printQuery(PreparedStatement s, PrintStream out) 978 throws SQLException { 979 980 ResultSet set = null; 981 try { 982 set = s.executeQuery(); 983 ResultSetMetaData metaData = set.getMetaData(); 984 for (int i = 1; i <= metaData.getColumnCount(); i++) { 985 if (i == 1) 986 out.print("||"); 987 out.print(metaData.getColumnName(i) + "||"); 988 } 989 out.println(); 990 while (set.next()) { 991 for (int i = 1; i <= metaData.getColumnCount(); i++) { 992 if (i == 1) 993 out.print("|"); 994 out.print(set.getString(i) + "|"); 995 } 996 out.println(); 997 } 998 } finally { 999 try { 1000 set.close(); 1001 } catch (Throwable ignore) { 1002 } 1003 try { 1004 s.close(); 1005 } catch (Throwable ignore) { 1006 } 1007 } 1008 } */ 1009 1010 public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) 1011 throws SQLException, IOException { 1012 PreparedStatement s = null; 1013 ResultSet rs = null; 1014 cleanupExclusiveLock.readLock().lock(); 1015 try { 1016 s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement()); 1017 s.setString(1, id.toString()); 1018 rs = s.executeQuery(); 1019 long seq = -1; 1020 if (rs.next()) { 1021 seq = rs.getLong(1); 1022 } 1023 return seq; 1024 } finally { 1025 cleanupExclusiveLock.readLock().unlock(); 1026 close(rs); 1027 close(s); 1028 } 1029 } 1030 1031}