001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.File;
020import java.io.IOException;
021import java.sql.Connection;
022import java.sql.SQLException;
023import java.util.Collections;
024import java.util.Set;
025import java.util.concurrent.ScheduledFuture;
026import java.util.concurrent.ScheduledThreadPoolExecutor;
027import java.util.concurrent.ThreadFactory;
028import java.util.concurrent.TimeUnit;
029
030import javax.sql.DataSource;
031
032import org.apache.activemq.ActiveMQMessageAudit;
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.command.ActiveMQDestination;
037import org.apache.activemq.command.ActiveMQQueue;
038import org.apache.activemq.command.ActiveMQTopic;
039import org.apache.activemq.command.Message;
040import org.apache.activemq.command.MessageId;
041import org.apache.activemq.command.ProducerId;
042import org.apache.activemq.openwire.OpenWireFormat;
043import org.apache.activemq.store.MessageStore;
044import org.apache.activemq.store.PersistenceAdapter;
045import org.apache.activemq.store.TopicMessageStore;
046import org.apache.activemq.store.TransactionStore;
047import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
048import org.apache.activemq.store.memory.MemoryTransactionStore;
049import org.apache.activemq.usage.SystemUsage;
050import org.apache.activemq.util.ByteSequence;
051import org.apache.activemq.util.FactoryFinder;
052import org.apache.activemq.util.IOExceptionSupport;
053import org.apache.activemq.util.LongSequenceGenerator;
054import org.apache.activemq.wireformat.WireFormat;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058/**
059 * A {@link PersistenceAdapter} implementation using JDBC for persistence
060 * storage.
061 * 
062 * This persistence adapter will correctly remember prepared XA transactions,
063 * but it will not keep track of local transaction commits so that operations
064 * performed against the Message store are done as a single uow.
065 * 
066 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
067 * 
068 * 
069 */
070public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
071    BrokerServiceAware {
072
073    private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
074    private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
075                                                                   "META-INF/services/org/apache/activemq/store/jdbc/");
076    private static FactoryFinder lockFactoryFinder = new FactoryFinder(
077                                                                    "META-INF/services/org/apache/activemq/store/jdbc/lock/");
078
079    private WireFormat wireFormat = new OpenWireFormat();
080    private BrokerService brokerService;
081    private Statements statements;
082    private JDBCAdapter adapter;
083    private MemoryTransactionStore transactionStore;
084    private ScheduledThreadPoolExecutor clockDaemon;
085    private ScheduledFuture<?> cleanupTicket, keepAliveTicket;
086    private int cleanupPeriod = 1000 * 60 * 5;
087    private boolean useExternalMessageReferences;
088    private boolean useDatabaseLock = true;
089    private long lockKeepAlivePeriod = 1000*30;
090    private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
091    private DatabaseLocker databaseLocker;
092    private boolean createTablesOnStartup = true;
093    private DataSource lockDataSource;
094    private int transactionIsolation;
095    
096    protected int maxProducersToAudit=1024;
097    protected int maxAuditDepth=1000;
098    protected boolean enableAudit=false;
099    protected int auditRecoveryDepth = 1024;
100    protected ActiveMQMessageAudit audit;
101    
102    protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
103    protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
104
105    public JDBCPersistenceAdapter() {
106    }
107
108    public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
109        super(ds);
110        this.wireFormat = wireFormat;
111    }
112
113    public Set<ActiveMQDestination> getDestinations() {
114        // Get a connection and insert the message into the DB.
115        TransactionContext c = null;
116        try {
117            c = getTransactionContext();
118            return getAdapter().doGetDestinations(c);
119        } catch (IOException e) {
120            return emptyDestinationSet();
121        } catch (SQLException e) {
122            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
123            return emptyDestinationSet();
124        } finally {
125            if (c != null) {
126                try {
127                    c.close();
128                } catch (Throwable e) {
129                }
130            }
131        }
132    }
133
134    @SuppressWarnings("unchecked")
135    private Set<ActiveMQDestination> emptyDestinationSet() {
136        return Collections.EMPTY_SET;
137    }
138    
139    protected void createMessageAudit() {
140        if (enableAudit && audit == null) {
141            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
142            TransactionContext c = null;
143            
144            try {
145                c = getTransactionContext();
146                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
147                    public void messageId(MessageId id) {
148                        audit.isDuplicate(id);
149                    }
150                });
151            } catch (Exception e) {
152                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
153            } finally {
154                if (c != null) {
155                    try {
156                        c.close();
157                    } catch (Throwable e) {
158                    }
159                }
160            }
161        }
162    }
163    
164    public void initSequenceIdGenerator() {
165        TransactionContext c = null;
166        try {
167            c = getTransactionContext();
168            getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
169                public void messageId(MessageId id) {
170                    audit.isDuplicate(id);
171                }
172            });
173        } catch (Exception e) {
174            LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
175        } finally {
176            if (c != null) {
177                try {
178                    c.close();
179                } catch (Throwable e) {
180                }
181            }
182        }
183        
184    }
185
186    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
187        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
188        if (transactionStore != null) {
189            rc = transactionStore.proxy(rc);
190        }
191        return rc;
192    }
193
194    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
195        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
196        if (transactionStore != null) {
197            rc = transactionStore.proxy(rc);
198        }
199        return rc;
200    }
201
202    /**
203     * Cleanup method to remove any state associated with the given destination
204     * No state retained.... nothing to do
205     *
206     * @param destination Destination to forget
207     */
208    public void removeQueueMessageStore(ActiveMQQueue destination) {
209    }
210
211    /**
212     * Cleanup method to remove any state associated with the given destination
213     * No state retained.... nothing to do
214     *
215     * @param destination Destination to forget
216     */
217    public void removeTopicMessageStore(ActiveMQTopic destination) {
218    }
219
220    public TransactionStore createTransactionStore() throws IOException {
221        if (transactionStore == null) {
222            transactionStore = new MemoryTransactionStore(this);
223        }
224        return this.transactionStore;
225    }
226
227    public long getLastMessageBrokerSequenceId() throws IOException {
228        TransactionContext c = getTransactionContext();
229        try {
230            long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
231            sequenceGenerator.setLastSequenceId(seq);
232            long brokerSeq = 0;
233            if (seq != 0) {
234                byte[] msg = getAdapter().doGetMessageById(c, seq);
235                if (msg != null) {
236                    Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
237                    brokerSeq = last.getMessageId().getBrokerSequenceId();
238                } else {
239                   LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
240                }
241            }
242            return brokerSeq;
243        } catch (SQLException e) {
244            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
245            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
246        } finally {
247            c.close();
248        }
249    }
250    
251    public long getLastProducerSequenceId(ProducerId id) throws IOException {
252        TransactionContext c = getTransactionContext();
253        try {
254            return getAdapter().doGetLastProducerSequenceId(c, id);
255        } catch (SQLException e) {
256            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
257            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
258        } finally {
259            c.close();
260        }
261    }
262
263
264    public void start() throws Exception {
265        getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
266
267        if (isCreateTablesOnStartup()) {
268            TransactionContext transactionContext = getTransactionContext();
269            transactionContext.begin();
270            try {
271                try {
272                    getAdapter().doCreateTables(transactionContext);
273                } catch (SQLException e) {
274                    LOG.warn("Cannot create tables due to: " + e);
275                    JDBCPersistenceAdapter.log("Failure Details: ", e);
276                }
277            } finally {
278                transactionContext.commit();
279            }
280        }
281
282        if (isUseDatabaseLock()) {
283            DatabaseLocker service = getDatabaseLocker();
284            if (service == null) {
285                LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
286            } else {
287                service.start();
288                if (lockKeepAlivePeriod > 0) {
289                    keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
290                        public void run() {
291                            databaseLockKeepAlive();
292                        }
293                    }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
294                }
295                if (brokerService != null) {
296                    brokerService.getBroker().nowMasterBroker();
297                }
298            }
299        }
300
301        cleanup();
302
303        // Cleanup the db periodically.
304        if (cleanupPeriod > 0) {
305            cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
306                public void run() {
307                    cleanup();
308                }
309            }, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS);
310        }
311        
312        createMessageAudit();
313    }
314
315    public synchronized void stop() throws Exception {
316        if (cleanupTicket != null) {
317            cleanupTicket.cancel(true);
318            cleanupTicket = null;
319        }
320        if (keepAliveTicket != null) {
321            keepAliveTicket.cancel(false);
322            keepAliveTicket = null;
323        }
324        
325        // do not shutdown clockDaemon as it may kill the thread initiating shutdown
326        DatabaseLocker service = getDatabaseLocker();
327        if (service != null) {
328            service.stop();
329        }
330    }
331
332    public void cleanup() {
333        TransactionContext c = null;
334        try {
335            LOG.debug("Cleaning up old messages.");
336            c = getTransactionContext();
337            getAdapter().doDeleteOldMessages(c, false);
338            getAdapter().doDeleteOldMessages(c, true);
339        } catch (IOException e) {
340            LOG.warn("Old message cleanup failed due to: " + e, e);
341        } catch (SQLException e) {
342            LOG.warn("Old message cleanup failed due to: " + e);
343            JDBCPersistenceAdapter.log("Failure Details: ", e);
344        } finally {
345            if (c != null) {
346                try {
347                    c.close();
348                } catch (Throwable e) {
349                }
350            }
351            LOG.debug("Cleanup done.");
352        }
353    }
354
355    public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
356        this.clockDaemon = clockDaemon;
357    }
358
359    public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
360        if (clockDaemon == null) {
361            clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
362                public Thread newThread(Runnable runnable) {
363                    Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
364                    thread.setDaemon(true);
365                    return thread;
366                }
367            });
368        }
369        return clockDaemon;
370    }
371
372    public JDBCAdapter getAdapter() throws IOException {
373        if (adapter == null) {
374            setAdapter(createAdapter());
375        }
376        return adapter;
377    }
378
379    public DatabaseLocker getDatabaseLocker() throws IOException {
380        if (databaseLocker == null && isUseDatabaseLock()) {
381            setDatabaseLocker(loadDataBaseLocker());
382        }
383        return databaseLocker;
384    }
385
386    /**
387     * Sets the database locker strategy to use to lock the database on startup
388     * @throws IOException 
389     */
390    public void setDatabaseLocker(DatabaseLocker locker) throws IOException {
391        databaseLocker = locker;
392        databaseLocker.setPersistenceAdapter(this);
393        databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
394    }
395
396    public DataSource getLockDataSource() throws IOException {
397        if (lockDataSource == null) {
398            lockDataSource = getDataSource();
399            if (lockDataSource == null) {
400                throw new IllegalArgumentException(
401                        "No dataSource property has been configured");
402            }
403        } else {
404            LOG.info("Using a separate dataSource for locking: "
405                    + lockDataSource);
406        }
407        return lockDataSource;
408    }
409    
410    public void setLockDataSource(DataSource dataSource) {
411        this.lockDataSource = dataSource;
412    }
413
414    public BrokerService getBrokerService() {
415        return brokerService;
416    }
417
418    public void setBrokerService(BrokerService brokerService) {
419        this.brokerService = brokerService;
420    }
421
422    /**
423     * @throws IOException
424     */
425    protected JDBCAdapter createAdapter() throws IOException {
426       
427        adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
428       
429        // Use the default JDBC adapter if the
430        // Database type is not recognized.
431        if (adapter == null) {
432            adapter = new DefaultJDBCAdapter();
433            LOG.debug("Using default JDBC Adapter: " + adapter);
434        }
435        return adapter;
436    }
437
438    private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
439        Object adapter = null;
440        TransactionContext c = getTransactionContext();
441        try {
442            try {
443                // Make the filename file system safe.
444                String dirverName = c.getConnection().getMetaData().getDriverName();
445                dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
446
447                try {
448                    adapter = finder.newInstance(dirverName);
449                    LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
450                } catch (Throwable e) {
451                    LOG.info("Database " + kind + " driver override not found for : [" + dirverName
452                             + "].  Will use default implementation.");
453                }
454            } catch (SQLException e) {
455                LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
456                          + e.getMessage());
457                JDBCPersistenceAdapter.log("Failure Details: ", e);
458            }
459        } finally {
460            c.close();
461        }
462        return adapter;
463    }
464
465    public void setAdapter(JDBCAdapter adapter) {
466        this.adapter = adapter;
467        this.adapter.setStatements(getStatements());
468        this.adapter.setMaxRows(getMaxRows());
469    }
470
471    public WireFormat getWireFormat() {
472        return wireFormat;
473    }
474
475    public void setWireFormat(WireFormat wireFormat) {
476        this.wireFormat = wireFormat;
477    }
478
479    public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
480        if (context == null) {
481            return getTransactionContext();
482        } else {
483            TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
484            if (answer == null) {
485                answer = getTransactionContext();
486                context.setLongTermStoreContext(answer);
487            }
488            return answer;
489        }
490    }
491
492    public TransactionContext getTransactionContext() throws IOException {
493        TransactionContext answer = new TransactionContext(this);
494        if (transactionIsolation > 0) {
495            answer.setTransactionIsolation(transactionIsolation);
496        }
497        return answer;
498    }
499
500    public void beginTransaction(ConnectionContext context) throws IOException {
501        TransactionContext transactionContext = getTransactionContext(context);
502        transactionContext.begin();
503    }
504
505    public void commitTransaction(ConnectionContext context) throws IOException {
506        TransactionContext transactionContext = getTransactionContext(context);
507        transactionContext.commit();
508    }
509
510    public void rollbackTransaction(ConnectionContext context) throws IOException {
511        TransactionContext transactionContext = getTransactionContext(context);
512        transactionContext.rollback();
513    }
514
515    public int getCleanupPeriod() {
516        return cleanupPeriod;
517    }
518
519    /**
520     * Sets the number of milliseconds until the database is attempted to be
521     * cleaned up for durable topics
522     */
523    public void setCleanupPeriod(int cleanupPeriod) {
524        this.cleanupPeriod = cleanupPeriod;
525    }
526
527    public void deleteAllMessages() throws IOException {
528        TransactionContext c = getTransactionContext();
529        try {
530            getAdapter().doDropTables(c);
531            getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
532            getAdapter().doCreateTables(c);
533            LOG.info("Persistence store purged.");
534        } catch (SQLException e) {
535            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
536            throw IOExceptionSupport.create(e);
537        } finally {
538            c.close();
539        }
540    }
541
542    public boolean isUseExternalMessageReferences() {
543        return useExternalMessageReferences;
544    }
545
546    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
547        this.useExternalMessageReferences = useExternalMessageReferences;
548    }
549
550    public boolean isCreateTablesOnStartup() {
551        return createTablesOnStartup;
552    }
553
554    /**
555     * Sets whether or not tables are created on startup
556     */
557    public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
558        this.createTablesOnStartup = createTablesOnStartup;
559    }
560
561    public boolean isUseDatabaseLock() {
562        return useDatabaseLock;
563    }
564
565    /**
566     * Sets whether or not an exclusive database lock should be used to enable
567     * JDBC Master/Slave. Enabled by default.
568     */
569    public void setUseDatabaseLock(boolean useDatabaseLock) {
570        this.useDatabaseLock = useDatabaseLock;
571    }
572
573    public static void log(String msg, SQLException e) {
574        String s = msg + e.getMessage();
575        while (e.getNextException() != null) {
576            e = e.getNextException();
577            s += ", due to: " + e.getMessage();
578        }
579        LOG.warn(s, e);
580    }
581
582    public Statements getStatements() {
583        if (statements == null) {
584            statements = new Statements();
585        }
586        return statements;
587    }
588
589    public void setStatements(Statements statements) {
590        this.statements = statements;
591    }
592
593    /**
594     * @param usageManager The UsageManager that is controlling the
595     *                destination's memory usage.
596     */
597    public void setUsageManager(SystemUsage usageManager) {
598    }
599
600    protected void databaseLockKeepAlive() {
601        boolean stop = false;
602        try {
603            DatabaseLocker locker = getDatabaseLocker();
604            if (locker != null) {
605                if (!locker.keepAlive()) {
606                    stop = true;
607                }
608            }
609        } catch (IOException e) {
610            LOG.error("Failed to get database when trying keepalive: " + e, e);
611        }
612        if (stop) {
613            stopBroker();
614        }
615    }
616
617    protected void stopBroker() {
618        // we can no longer keep the lock so lets fail
619        LOG.info("No longer able to keep the exclusive lock so giving up being a master");
620        try {
621            brokerService.stop();
622        } catch (Exception e) {
623            LOG.warn("Failure occurred while stopping broker");
624        }
625    }
626
627    protected DatabaseLocker loadDataBaseLocker() throws IOException {
628        DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");       
629        if (locker == null) {
630            locker = new DefaultDatabaseLocker();
631            LOG.debug("Using default JDBC Locker: " + locker);
632        }
633        return locker;
634    }
635
636    public void setBrokerName(String brokerName) {
637    }
638
639    public String toString() {
640        return "JDBCPersistenceAdapter(" + super.toString() + ")";
641    }
642
643    public void setDirectory(File dir) {
644    }
645
646    // interesting bit here is proof that DB is ok
647    public void checkpoint(boolean sync) throws IOException {
648        // by pass TransactionContext to avoid IO Exception handler
649        Connection connection = null;
650        try {
651            connection = getDataSource().getConnection();
652        } catch (SQLException e) {
653            LOG.debug("Could not get JDBC connection for checkpoint: " + e);
654            throw IOExceptionSupport.create(e);
655        } finally {
656            if (connection != null) {
657                try {
658                    connection.close();
659                } catch (Throwable ignored) {
660                }
661            }
662        }
663    }
664
665    public long size(){
666        return 0;
667    }
668
669    public long getLockKeepAlivePeriod() {
670        return lockKeepAlivePeriod;
671    }
672
673    public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
674        this.lockKeepAlivePeriod = lockKeepAlivePeriod;
675    }
676
677    public long getLockAcquireSleepInterval() {
678        return lockAcquireSleepInterval;
679    }
680
681    /**
682     * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
683     * not applied if DataBaseLocker is injected.
684     */
685    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
686        this.lockAcquireSleepInterval = lockAcquireSleepInterval;
687    }
688    
689    /**
690     * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
691     * This allowable dirty isolation level may not be achievable in clustered DB environments
692     * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
693     * see isolation level constants in {@link java.sql.Connection}
694     * @param transactionIsolation the isolation level to use
695     */
696    public void setTransactionIsolation(int transactionIsolation) {
697        this.transactionIsolation = transactionIsolation;
698    }
699
700        public int getMaxProducersToAudit() {
701                return maxProducersToAudit;
702        }
703
704        public void setMaxProducersToAudit(int maxProducersToAudit) {
705                this.maxProducersToAudit = maxProducersToAudit;
706        }
707
708        public int getMaxAuditDepth() {
709                return maxAuditDepth;
710        }
711
712        public void setMaxAuditDepth(int maxAuditDepth) {
713                this.maxAuditDepth = maxAuditDepth;
714        }
715
716        public boolean isEnableAudit() {
717                return enableAudit;
718        }
719
720        public void setEnableAudit(boolean enableAudit) {
721                this.enableAudit = enableAudit;
722        }
723
724    public int getAuditRecoveryDepth() {
725        return auditRecoveryDepth;
726    }
727
728    public void setAuditRecoveryDepth(int auditRecoveryDepth) {
729        this.auditRecoveryDepth = auditRecoveryDepth;
730    }
731
732    public long getNextSequenceId() {
733        synchronized(sequenceGenerator) {
734            return sequenceGenerator.getNextSequenceId();
735        }
736    }
737
738    public int getMaxRows() {
739        return maxRows;
740    }
741
742    /*
743     * the max rows return from queries, with sparse selectors this may need to be increased
744     */
745    public void setMaxRows(int maxRows) {
746        this.maxRows = maxRows;
747    }
748}