001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.File; 020import java.io.IOException; 021import java.sql.Connection; 022import java.sql.SQLException; 023import java.util.Collections; 024import java.util.Set; 025import java.util.concurrent.ScheduledFuture; 026import java.util.concurrent.ScheduledThreadPoolExecutor; 027import java.util.concurrent.ThreadFactory; 028import java.util.concurrent.TimeUnit; 029 030import javax.sql.DataSource; 031 032import org.apache.activemq.ActiveMQMessageAudit; 033import org.apache.activemq.broker.BrokerService; 034import org.apache.activemq.broker.BrokerServiceAware; 035import org.apache.activemq.broker.ConnectionContext; 036import org.apache.activemq.command.ActiveMQDestination; 037import org.apache.activemq.command.ActiveMQQueue; 038import org.apache.activemq.command.ActiveMQTopic; 039import org.apache.activemq.command.Message; 040import org.apache.activemq.command.MessageId; 041import org.apache.activemq.command.ProducerId; 042import org.apache.activemq.openwire.OpenWireFormat; 043import org.apache.activemq.store.MessageStore; 044import org.apache.activemq.store.PersistenceAdapter; 045import org.apache.activemq.store.TopicMessageStore; 046import org.apache.activemq.store.TransactionStore; 047import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; 048import org.apache.activemq.store.memory.MemoryTransactionStore; 049import org.apache.activemq.usage.SystemUsage; 050import org.apache.activemq.util.ByteSequence; 051import org.apache.activemq.util.FactoryFinder; 052import org.apache.activemq.util.IOExceptionSupport; 053import org.apache.activemq.util.LongSequenceGenerator; 054import org.apache.activemq.wireformat.WireFormat; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058/** 059 * A {@link PersistenceAdapter} implementation using JDBC for persistence 060 * storage. 061 * 062 * This persistence adapter will correctly remember prepared XA transactions, 063 * but it will not keep track of local transaction commits so that operations 064 * performed against the Message store are done as a single uow. 065 * 066 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter" 067 * 068 * 069 */ 070public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter, 071 BrokerServiceAware { 072 073 private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class); 074 private static FactoryFinder adapterFactoryFinder = new FactoryFinder( 075 "META-INF/services/org/apache/activemq/store/jdbc/"); 076 private static FactoryFinder lockFactoryFinder = new FactoryFinder( 077 "META-INF/services/org/apache/activemq/store/jdbc/lock/"); 078 079 private WireFormat wireFormat = new OpenWireFormat(); 080 private BrokerService brokerService; 081 private Statements statements; 082 private JDBCAdapter adapter; 083 private MemoryTransactionStore transactionStore; 084 private ScheduledThreadPoolExecutor clockDaemon; 085 private ScheduledFuture<?> cleanupTicket, keepAliveTicket; 086 private int cleanupPeriod = 1000 * 60 * 5; 087 private boolean useExternalMessageReferences; 088 private boolean useDatabaseLock = true; 089 private long lockKeepAlivePeriod = 1000*30; 090 private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL; 091 private DatabaseLocker databaseLocker; 092 private boolean createTablesOnStartup = true; 093 private DataSource lockDataSource; 094 private int transactionIsolation; 095 096 protected int maxProducersToAudit=1024; 097 protected int maxAuditDepth=1000; 098 protected boolean enableAudit=false; 099 protected int auditRecoveryDepth = 1024; 100 protected ActiveMQMessageAudit audit; 101 102 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 103 protected int maxRows = DefaultJDBCAdapter.MAX_ROWS; 104 105 public JDBCPersistenceAdapter() { 106 } 107 108 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) { 109 super(ds); 110 this.wireFormat = wireFormat; 111 } 112 113 public Set<ActiveMQDestination> getDestinations() { 114 // Get a connection and insert the message into the DB. 115 TransactionContext c = null; 116 try { 117 c = getTransactionContext(); 118 return getAdapter().doGetDestinations(c); 119 } catch (IOException e) { 120 return emptyDestinationSet(); 121 } catch (SQLException e) { 122 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 123 return emptyDestinationSet(); 124 } finally { 125 if (c != null) { 126 try { 127 c.close(); 128 } catch (Throwable e) { 129 } 130 } 131 } 132 } 133 134 @SuppressWarnings("unchecked") 135 private Set<ActiveMQDestination> emptyDestinationSet() { 136 return Collections.EMPTY_SET; 137 } 138 139 protected void createMessageAudit() { 140 if (enableAudit && audit == null) { 141 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 142 TransactionContext c = null; 143 144 try { 145 c = getTransactionContext(); 146 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 147 public void messageId(MessageId id) { 148 audit.isDuplicate(id); 149 } 150 }); 151 } catch (Exception e) { 152 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 153 } finally { 154 if (c != null) { 155 try { 156 c.close(); 157 } catch (Throwable e) { 158 } 159 } 160 } 161 } 162 } 163 164 public void initSequenceIdGenerator() { 165 TransactionContext c = null; 166 try { 167 c = getTransactionContext(); 168 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 169 public void messageId(MessageId id) { 170 audit.isDuplicate(id); 171 } 172 }); 173 } catch (Exception e) { 174 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 175 } finally { 176 if (c != null) { 177 try { 178 c.close(); 179 } catch (Throwable e) { 180 } 181 } 182 } 183 184 } 185 186 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 187 MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit); 188 if (transactionStore != null) { 189 rc = transactionStore.proxy(rc); 190 } 191 return rc; 192 } 193 194 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 195 TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit); 196 if (transactionStore != null) { 197 rc = transactionStore.proxy(rc); 198 } 199 return rc; 200 } 201 202 /** 203 * Cleanup method to remove any state associated with the given destination 204 * No state retained.... nothing to do 205 * 206 * @param destination Destination to forget 207 */ 208 public void removeQueueMessageStore(ActiveMQQueue destination) { 209 } 210 211 /** 212 * Cleanup method to remove any state associated with the given destination 213 * No state retained.... nothing to do 214 * 215 * @param destination Destination to forget 216 */ 217 public void removeTopicMessageStore(ActiveMQTopic destination) { 218 } 219 220 public TransactionStore createTransactionStore() throws IOException { 221 if (transactionStore == null) { 222 transactionStore = new MemoryTransactionStore(this); 223 } 224 return this.transactionStore; 225 } 226 227 public long getLastMessageBrokerSequenceId() throws IOException { 228 TransactionContext c = getTransactionContext(); 229 try { 230 long seq = getAdapter().doGetLastMessageStoreSequenceId(c); 231 sequenceGenerator.setLastSequenceId(seq); 232 long brokerSeq = 0; 233 if (seq != 0) { 234 byte[] msg = getAdapter().doGetMessageById(c, seq); 235 if (msg != null) { 236 Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg)); 237 brokerSeq = last.getMessageId().getBrokerSequenceId(); 238 } else { 239 LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!"); 240 } 241 } 242 return brokerSeq; 243 } catch (SQLException e) { 244 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 245 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 246 } finally { 247 c.close(); 248 } 249 } 250 251 public long getLastProducerSequenceId(ProducerId id) throws IOException { 252 TransactionContext c = getTransactionContext(); 253 try { 254 return getAdapter().doGetLastProducerSequenceId(c, id); 255 } catch (SQLException e) { 256 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 257 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 258 } finally { 259 c.close(); 260 } 261 } 262 263 264 public void start() throws Exception { 265 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 266 267 if (isCreateTablesOnStartup()) { 268 TransactionContext transactionContext = getTransactionContext(); 269 transactionContext.begin(); 270 try { 271 try { 272 getAdapter().doCreateTables(transactionContext); 273 } catch (SQLException e) { 274 LOG.warn("Cannot create tables due to: " + e); 275 JDBCPersistenceAdapter.log("Failure Details: ", e); 276 } 277 } finally { 278 transactionContext.commit(); 279 } 280 } 281 282 if (isUseDatabaseLock()) { 283 DatabaseLocker service = getDatabaseLocker(); 284 if (service == null) { 285 LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter"); 286 } else { 287 service.start(); 288 if (lockKeepAlivePeriod > 0) { 289 keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() { 290 public void run() { 291 databaseLockKeepAlive(); 292 } 293 }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS); 294 } 295 if (brokerService != null) { 296 brokerService.getBroker().nowMasterBroker(); 297 } 298 } 299 } 300 301 cleanup(); 302 303 // Cleanup the db periodically. 304 if (cleanupPeriod > 0) { 305 cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() { 306 public void run() { 307 cleanup(); 308 } 309 }, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS); 310 } 311 312 createMessageAudit(); 313 } 314 315 public synchronized void stop() throws Exception { 316 if (cleanupTicket != null) { 317 cleanupTicket.cancel(true); 318 cleanupTicket = null; 319 } 320 if (keepAliveTicket != null) { 321 keepAliveTicket.cancel(false); 322 keepAliveTicket = null; 323 } 324 325 // do not shutdown clockDaemon as it may kill the thread initiating shutdown 326 DatabaseLocker service = getDatabaseLocker(); 327 if (service != null) { 328 service.stop(); 329 } 330 } 331 332 public void cleanup() { 333 TransactionContext c = null; 334 try { 335 LOG.debug("Cleaning up old messages."); 336 c = getTransactionContext(); 337 getAdapter().doDeleteOldMessages(c, false); 338 getAdapter().doDeleteOldMessages(c, true); 339 } catch (IOException e) { 340 LOG.warn("Old message cleanup failed due to: " + e, e); 341 } catch (SQLException e) { 342 LOG.warn("Old message cleanup failed due to: " + e); 343 JDBCPersistenceAdapter.log("Failure Details: ", e); 344 } finally { 345 if (c != null) { 346 try { 347 c.close(); 348 } catch (Throwable e) { 349 } 350 } 351 LOG.debug("Cleanup done."); 352 } 353 } 354 355 public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) { 356 this.clockDaemon = clockDaemon; 357 } 358 359 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() { 360 if (clockDaemon == null) { 361 clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { 362 public Thread newThread(Runnable runnable) { 363 Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer"); 364 thread.setDaemon(true); 365 return thread; 366 } 367 }); 368 } 369 return clockDaemon; 370 } 371 372 public JDBCAdapter getAdapter() throws IOException { 373 if (adapter == null) { 374 setAdapter(createAdapter()); 375 } 376 return adapter; 377 } 378 379 public DatabaseLocker getDatabaseLocker() throws IOException { 380 if (databaseLocker == null && isUseDatabaseLock()) { 381 setDatabaseLocker(loadDataBaseLocker()); 382 } 383 return databaseLocker; 384 } 385 386 /** 387 * Sets the database locker strategy to use to lock the database on startup 388 * @throws IOException 389 */ 390 public void setDatabaseLocker(DatabaseLocker locker) throws IOException { 391 databaseLocker = locker; 392 databaseLocker.setPersistenceAdapter(this); 393 databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval()); 394 } 395 396 public DataSource getLockDataSource() throws IOException { 397 if (lockDataSource == null) { 398 lockDataSource = getDataSource(); 399 if (lockDataSource == null) { 400 throw new IllegalArgumentException( 401 "No dataSource property has been configured"); 402 } 403 } else { 404 LOG.info("Using a separate dataSource for locking: " 405 + lockDataSource); 406 } 407 return lockDataSource; 408 } 409 410 public void setLockDataSource(DataSource dataSource) { 411 this.lockDataSource = dataSource; 412 } 413 414 public BrokerService getBrokerService() { 415 return brokerService; 416 } 417 418 public void setBrokerService(BrokerService brokerService) { 419 this.brokerService = brokerService; 420 } 421 422 /** 423 * @throws IOException 424 */ 425 protected JDBCAdapter createAdapter() throws IOException { 426 427 adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter"); 428 429 // Use the default JDBC adapter if the 430 // Database type is not recognized. 431 if (adapter == null) { 432 adapter = new DefaultJDBCAdapter(); 433 LOG.debug("Using default JDBC Adapter: " + adapter); 434 } 435 return adapter; 436 } 437 438 private Object loadAdapter(FactoryFinder finder, String kind) throws IOException { 439 Object adapter = null; 440 TransactionContext c = getTransactionContext(); 441 try { 442 try { 443 // Make the filename file system safe. 444 String dirverName = c.getConnection().getMetaData().getDriverName(); 445 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(); 446 447 try { 448 adapter = finder.newInstance(dirverName); 449 LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass()); 450 } catch (Throwable e) { 451 LOG.info("Database " + kind + " driver override not found for : [" + dirverName 452 + "]. Will use default implementation."); 453 } 454 } catch (SQLException e) { 455 LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: " 456 + e.getMessage()); 457 JDBCPersistenceAdapter.log("Failure Details: ", e); 458 } 459 } finally { 460 c.close(); 461 } 462 return adapter; 463 } 464 465 public void setAdapter(JDBCAdapter adapter) { 466 this.adapter = adapter; 467 this.adapter.setStatements(getStatements()); 468 this.adapter.setMaxRows(getMaxRows()); 469 } 470 471 public WireFormat getWireFormat() { 472 return wireFormat; 473 } 474 475 public void setWireFormat(WireFormat wireFormat) { 476 this.wireFormat = wireFormat; 477 } 478 479 public TransactionContext getTransactionContext(ConnectionContext context) throws IOException { 480 if (context == null) { 481 return getTransactionContext(); 482 } else { 483 TransactionContext answer = (TransactionContext)context.getLongTermStoreContext(); 484 if (answer == null) { 485 answer = getTransactionContext(); 486 context.setLongTermStoreContext(answer); 487 } 488 return answer; 489 } 490 } 491 492 public TransactionContext getTransactionContext() throws IOException { 493 TransactionContext answer = new TransactionContext(this); 494 if (transactionIsolation > 0) { 495 answer.setTransactionIsolation(transactionIsolation); 496 } 497 return answer; 498 } 499 500 public void beginTransaction(ConnectionContext context) throws IOException { 501 TransactionContext transactionContext = getTransactionContext(context); 502 transactionContext.begin(); 503 } 504 505 public void commitTransaction(ConnectionContext context) throws IOException { 506 TransactionContext transactionContext = getTransactionContext(context); 507 transactionContext.commit(); 508 } 509 510 public void rollbackTransaction(ConnectionContext context) throws IOException { 511 TransactionContext transactionContext = getTransactionContext(context); 512 transactionContext.rollback(); 513 } 514 515 public int getCleanupPeriod() { 516 return cleanupPeriod; 517 } 518 519 /** 520 * Sets the number of milliseconds until the database is attempted to be 521 * cleaned up for durable topics 522 */ 523 public void setCleanupPeriod(int cleanupPeriod) { 524 this.cleanupPeriod = cleanupPeriod; 525 } 526 527 public void deleteAllMessages() throws IOException { 528 TransactionContext c = getTransactionContext(); 529 try { 530 getAdapter().doDropTables(c); 531 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 532 getAdapter().doCreateTables(c); 533 LOG.info("Persistence store purged."); 534 } catch (SQLException e) { 535 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 536 throw IOExceptionSupport.create(e); 537 } finally { 538 c.close(); 539 } 540 } 541 542 public boolean isUseExternalMessageReferences() { 543 return useExternalMessageReferences; 544 } 545 546 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 547 this.useExternalMessageReferences = useExternalMessageReferences; 548 } 549 550 public boolean isCreateTablesOnStartup() { 551 return createTablesOnStartup; 552 } 553 554 /** 555 * Sets whether or not tables are created on startup 556 */ 557 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 558 this.createTablesOnStartup = createTablesOnStartup; 559 } 560 561 public boolean isUseDatabaseLock() { 562 return useDatabaseLock; 563 } 564 565 /** 566 * Sets whether or not an exclusive database lock should be used to enable 567 * JDBC Master/Slave. Enabled by default. 568 */ 569 public void setUseDatabaseLock(boolean useDatabaseLock) { 570 this.useDatabaseLock = useDatabaseLock; 571 } 572 573 public static void log(String msg, SQLException e) { 574 String s = msg + e.getMessage(); 575 while (e.getNextException() != null) { 576 e = e.getNextException(); 577 s += ", due to: " + e.getMessage(); 578 } 579 LOG.warn(s, e); 580 } 581 582 public Statements getStatements() { 583 if (statements == null) { 584 statements = new Statements(); 585 } 586 return statements; 587 } 588 589 public void setStatements(Statements statements) { 590 this.statements = statements; 591 } 592 593 /** 594 * @param usageManager The UsageManager that is controlling the 595 * destination's memory usage. 596 */ 597 public void setUsageManager(SystemUsage usageManager) { 598 } 599 600 protected void databaseLockKeepAlive() { 601 boolean stop = false; 602 try { 603 DatabaseLocker locker = getDatabaseLocker(); 604 if (locker != null) { 605 if (!locker.keepAlive()) { 606 stop = true; 607 } 608 } 609 } catch (IOException e) { 610 LOG.error("Failed to get database when trying keepalive: " + e, e); 611 } 612 if (stop) { 613 stopBroker(); 614 } 615 } 616 617 protected void stopBroker() { 618 // we can no longer keep the lock so lets fail 619 LOG.info("No longer able to keep the exclusive lock so giving up being a master"); 620 try { 621 brokerService.stop(); 622 } catch (Exception e) { 623 LOG.warn("Failure occurred while stopping broker"); 624 } 625 } 626 627 protected DatabaseLocker loadDataBaseLocker() throws IOException { 628 DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock"); 629 if (locker == null) { 630 locker = new DefaultDatabaseLocker(); 631 LOG.debug("Using default JDBC Locker: " + locker); 632 } 633 return locker; 634 } 635 636 public void setBrokerName(String brokerName) { 637 } 638 639 public String toString() { 640 return "JDBCPersistenceAdapter(" + super.toString() + ")"; 641 } 642 643 public void setDirectory(File dir) { 644 } 645 646 // interesting bit here is proof that DB is ok 647 public void checkpoint(boolean sync) throws IOException { 648 // by pass TransactionContext to avoid IO Exception handler 649 Connection connection = null; 650 try { 651 connection = getDataSource().getConnection(); 652 } catch (SQLException e) { 653 LOG.debug("Could not get JDBC connection for checkpoint: " + e); 654 throw IOExceptionSupport.create(e); 655 } finally { 656 if (connection != null) { 657 try { 658 connection.close(); 659 } catch (Throwable ignored) { 660 } 661 } 662 } 663 } 664 665 public long size(){ 666 return 0; 667 } 668 669 public long getLockKeepAlivePeriod() { 670 return lockKeepAlivePeriod; 671 } 672 673 public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) { 674 this.lockKeepAlivePeriod = lockKeepAlivePeriod; 675 } 676 677 public long getLockAcquireSleepInterval() { 678 return lockAcquireSleepInterval; 679 } 680 681 /** 682 * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker 683 * not applied if DataBaseLocker is injected. 684 */ 685 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) { 686 this.lockAcquireSleepInterval = lockAcquireSleepInterval; 687 } 688 689 /** 690 * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED 691 * This allowable dirty isolation level may not be achievable in clustered DB environments 692 * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ 693 * see isolation level constants in {@link java.sql.Connection} 694 * @param transactionIsolation the isolation level to use 695 */ 696 public void setTransactionIsolation(int transactionIsolation) { 697 this.transactionIsolation = transactionIsolation; 698 } 699 700 public int getMaxProducersToAudit() { 701 return maxProducersToAudit; 702 } 703 704 public void setMaxProducersToAudit(int maxProducersToAudit) { 705 this.maxProducersToAudit = maxProducersToAudit; 706 } 707 708 public int getMaxAuditDepth() { 709 return maxAuditDepth; 710 } 711 712 public void setMaxAuditDepth(int maxAuditDepth) { 713 this.maxAuditDepth = maxAuditDepth; 714 } 715 716 public boolean isEnableAudit() { 717 return enableAudit; 718 } 719 720 public void setEnableAudit(boolean enableAudit) { 721 this.enableAudit = enableAudit; 722 } 723 724 public int getAuditRecoveryDepth() { 725 return auditRecoveryDepth; 726 } 727 728 public void setAuditRecoveryDepth(int auditRecoveryDepth) { 729 this.auditRecoveryDepth = auditRecoveryDepth; 730 } 731 732 public long getNextSequenceId() { 733 synchronized(sequenceGenerator) { 734 return sequenceGenerator.getNextSequenceId(); 735 } 736 } 737 738 public int getMaxRows() { 739 return maxRows; 740 } 741 742 /* 743 * the max rows return from queries, with sparse selectors this may need to be increased 744 */ 745 public void setMaxRows(int maxRows) { 746 this.maxRows = maxRows; 747 } 748}