查看原文
其他

JDBC查询各种姿势(普通、流式、游标)

蚊子squirrel SpringForAll社区 2021-05-27

点击上方☝SpringForAll社区 轻松关注!

及时获取有趣有料的技术文章

本文来源:http://r6d.cn/FSzY

问题

通过JDBC对MySQL进行数据查询时,有个很容易踩的坑,以下面代码为例:

    public static void selectNormal() throws SQLException{
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test""root""123456");
        PreparedStatement statement = connection.prepareStatement("select * from test",ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);
        //statement.setFetchSize(100);
        ResultSet resultSet = statement.executeQuery();
        
        while(resultSet.next()){
            System.out.println(resultSet.getString(1));
        }
        resultSet.close();
        statement.close();
        connection.close();
    }

这段代码在查询结果数据条数较大时则会出现内存溢出OOM问题:

为了更容易模拟错误,可将jvm内存设置较小,增加jvm参数 -Xms16m -Xmx16m

Exception in thread "mainjava.lang.OutOfMemoryErrorJava heap space
   at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2213)
   at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1992)
   at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3413)
   at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:471)
   at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3115)
   at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2344)
   at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2739)
   at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
   at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
   at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
   at com.cmbc.dap.dao.test.MysqlBatchTest.selectNormal(MysqlBatchTest.java:46)
   at com.cmbc.dap.dao.test.MysqlBatchTest.main(MysqlBatchTest.java:13)

你可能会说设置fetchSize即可,但不幸的是,将上述代码设置fetchSize代码注释打开,依然会报出同样错误,fetchSize并没有生效,MySQL仍然一股脑将所有数据加载到内存,直到撑爆。对于大数据量下查询,如果才能保证应用程序正确运行呢?寻根溯源,我们还是通过查看MySQL驱动源码来找答案。

MySQL驱动 查询实现原理

com.mysql.jdbc.PreparedStatement

public java.sql.ResultSet executeQuery() throws SQLException {
        synchronized (checkClosed().getConnectionMutex()) {

            MySQLConnection locallyScopedConn = this.connection;

            checkForDml(this.originalSql, this.firstCharOfStmt);

            this.batchedGeneratedKeys = null;

            resetCancelledState();

            implicitlyCloseAllOpenResults();

            clearWarnings();

            if (this.doPingInstead) {
                doPingInstead();

                return this.results;
            }

            setupStreamingTimeout(locallyScopedConn);

            Buffer sendPacket = fillSendPacket();

            String oldCatalog = null;

            if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {
                oldCatalog = locallyScopedConn.getCatalog();
                locallyScopedConn.setCatalog(this.currentCatalog);
            }

            //
            // Check if we have cached metadata for this query...
            //
            CachedResultSetMetaData cachedMetadata = null;
            if (locallyScopedConn.getCacheResultSetMetadata()) {
                cachedMetadata = locallyScopedConn.getCachedMetaData(this.originalSql);
            }

            Field[] metadataFromCache = null;

            if (cachedMetadata != null) {
                metadataFromCache = cachedMetadata.fields;
            }

            locallyScopedConn.setSessionMaxRows(this.maxRows);

            this.results = executeInternal(this.maxRows, sendPacket, createStreamingResultSet(), true, metadataFromCache, false);

            if (oldCatalog != null) {
                locallyScopedConn.setCatalog(oldCatalog);
            }

            if (cachedMetadata != null) {
                locallyScopedConn.initializeResultsMetadataFromCache(this.originalSql, cachedMetadata, this.results);
            } else {
                if (locallyScopedConn.getCacheResultSetMetadata()) {
                    locallyScopedConn.initializeResultsMetadataFromCache(this.originalSql, null /* will be created */this.results);
                }
            }
            this.lastInsertId = this.results.getUpdateID();

            return this.results;
        }
    }

上面代码中我们特别注意createStreamingResultSet方法,此方法返回是否创建流式结果集,即采用流式查询。流式查询与普通查询不同之处在于并不是一次性将所有数据加载到内存,在调用next()方法时,MySQL驱动只从网络数据流获取到1条数据,然后返回应用,这样就避免了内存溢出问题。我们看下该方法的实现:

 /**
     * We only stream result sets when they are forward-only, read-only, and the
     * fetch size has been set to Integer.MIN_VALUE
     * 
     * @return true if this result set should be streamed row at-a-time, rather
     *         than read all at once.
     */

    protected boolean createStreamingResultSet() {
        return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY) && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY)
                && (this.fetchSize == Integer.MIN_VALUE));
    }

可以看到满足这三个条件即会采用流式查询,前面两个其实就是MySQL创建Statement的默认的游标类型,在PreparedStatement类我们可以看到

    private static final int DEFAULT_RESULT_SET_TYPE = ResultSet.TYPE_FORWARD_ONLY;
    private static final int DEFAULT_RESULT_SET_CONCURRENCY = ResultSet.CONCUR_READ_ONLY;
    public java.sql.PreparedStatement prepareStatement(String sql)
            throws SQLException 
{
        return prepareStatement(sql, DEFAULT_RESULT_SET_TYPE,
                DEFAULT_RESULT_SET_CONCURRENCY);
    }

因此创建statement,不指定后面两个参数默认也是满足流式查询的条件的。

PreparedStatement statement = connection.prepareStatement("select * from test");

而第三个条件却很奇怪,fetchSize必须为Integer.MIN_VALUE即-2147483648,而这样一个负数是MySQL自定义的的特殊含义值,在JDBC接口规范并无此说明。至此我们就知道了如何使用流式查询了,修改代码如下:

public static void selectStream() throws SQLException{
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test""root""123456");
        PreparedStatement statement = connection.prepareStatement("select * from test",ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);
        statement.setFetchSize(Integer.MIN_VALUE);
        
        long begin = System.currentTimeMillis();
        ResultSet resultSet = statement.executeQuery();
        
        while(resultSet.next()){
            //System.out.println(resultSet.getString(1));
        }
        long end = System.currentTimeMillis();
        System.out.println("selectStream span time="+(end-begin) + "ms");
        
        resultSet.close();
        statement.close();
        connection.close();
    }

运行,果然解决了OOM问题,无论数据量多大,都可以正常查询了。

在StatementImpl中有enableStreamingResults()方法,该方法其实就是设置这三个条件的,网上很多文章介绍此种方式开启流式查询,但笔者不太推荐这种方式,因为需要强制转换为MySQL驱动中的StatementImpl类,这其实已经并非JDBC的标准接口。

     public void enableStreamingResults() throws SQLException {
        synchronized (checkClosed().getConnectionMutex()) {
            this.originalResultSetType = this.resultSetType;
            this.originalFetchSize = this.fetchSize;

            setFetchSize(Integer.MIN_VALUE);
            setResultSetType(ResultSet.TYPE_FORWARD_ONLY);
        }
    }

至此,我们已经知道如何使用流式查询解决大数据查询时的OOM问题,但流式查询的实现原理我们还不清楚,因此我们继续看源代码一探究竟,为了更方便展示方法调用层次,我画了一个调用序列图:

我们直接看com.mysql.jdbc.MysqlIO中的getResultSet方法:

/**
     * Build a result set. Delegates to buildResultSetWithRows() to build a
     * JDBC-version-specific ResultSet, given rows as byte data, and field
     * information.
     *
     * @param callingStatement DOCUMENT ME!
     * @param columnCount the number of columns in the result set
     * @param maxRows the maximum number of rows to read (-1 means all rows)
     * @param resultSetType (TYPE_FORWARD_ONLY, TYPE_SCROLL_????)
     * @param resultSetConcurrency the type of result set (CONCUR_UPDATABLE or
     *        READ_ONLY)
     * @param streamResults should the result set be read all at once, or
     *        streamed?
     * @param catalog the database name in use when the result set was created
     * @param isBinaryEncoded is this result set in native encoding?
     * @param unpackFieldInfo should we read MYSQL_FIELD info (if available)?
     *
     * @return a result set
     *
     * @throws SQLException if a database access error occurs
     */

    protected ResultSetImpl getResultSet(StatementImpl callingStatement,
        long columnCount, int maxRows, int resultSetType,
        int resultSetConcurrency, boolean streamResults, String catalog,
        boolean isBinaryEncoded, Field[] metadataFromCache)
        throws SQLException 
{
        Buffer packet; // The packet from the server
        Field[] fields = null;

        // Read in the column information

        if (metadataFromCache == null /* we want the metadata from the server */) {
            fields = new Field[(int) columnCount];

            for (int i = 0; i < columnCount; i++) {
                Buffer fieldPacket = null;

                fieldPacket = readPacket();
                fields[i] = unpackField(fieldPacket, false);
            }
        } else {
            for (int i = 0; i < columnCount; i++) {
                skipPacket();
            }
        }

        packet = reuseAndReadPacket(this.reusablePacket);
        
        readServerStatusForResultSets(packet);

        //
        // Handle cursor-based fetch first
        //

        if (this.connection.versionMeetsMinimum(502)
                && this.connection.getUseCursorFetch()
                && isBinaryEncoded
                && callingStatement != null
                && callingStatement.getFetchSize() != 0
                && callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) {
            ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;

            boolean usingCursor = true;

            //
            // Server versions 5.0.5 or newer will only open
            // a cursor and set this flag if they can, otherwise
            // they punt and go back to mysql_store_results() behavior
            //

            if (this.connection.versionMeetsMinimum(505)) {
                usingCursor = (this.serverStatus &
                        SERVER_STATUS_CURSOR_EXISTS) != 0;
            }

            if (usingCursor) {
                RowData rows = new RowDataCursor(
                    this,
                    prepStmt,
                    fields);

                ResultSetImpl rs = buildResultSetWithRows(
                    callingStatement,
                    catalog,
                    fields,
                    rows, resultSetType, resultSetConcurrency, isBinaryEncoded);

                if (usingCursor) {
                    rs.setFetchSize(callingStatement.getFetchSize());
                }

                return rs;
            }
        }

        RowData rowData = null;

        if (!streamResults) {
            rowData = readSingleRowSet(columnCount, maxRows,
                    resultSetConcurrency, isBinaryEncoded,
                    (metadataFromCache == null) ? fields : metadataFromCache);
        } else {
            rowData = new RowDataDynamic(this, (int) columnCount,
                    (metadataFromCache == null) ? fields : metadataFromCache,
                    isBinaryEncoded);
            this.streamingData = rowData;
        }

        ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog,
                (metadataFromCache == null) ? fields : metadataFromCache,
            rowData, resultSetType, resultSetConcurrency, isBinaryEncoded);



        return rs;
    }

三种查询方式

上代码可以看到,MySQL驱动会根据不同的参数设置选择对应的ResultSet实现类,分别对应三种查询方式:

  • 1. RowDataStatic 静态结果集,默认的查询方式,普通查询
  • 2. RowDataDynamic 动态结果集,流式查询
  • 3. RowDataCursor 游标结果集,服务器端基于游标查询

简单看下这几个类的实现代码:

方式1 普通查询
 private RowData readSingleRowSet(long columnCount, int maxRows, int resultSetConcurrency, boolean isBinaryEncoded, Field[] fields) throws SQLException {
        RowData rowData;
        ArrayList<ResultSetRow> rows = new ArrayList<ResultSetRow>();

        boolean useBufferRowExplicit = useBufferRowExplicit(fields);

        // Now read the data
        ResultSetRow row = nextRow(fields, (int) columnCount, isBinaryEncoded, resultSetConcurrency, false, useBufferRowExplicit, falsenull);

        int rowCount = 0;

        if (row != null) {
            rows.add(row);
            rowCount = 1;
        }

        while (row != null) {
            row = nextRow(fields, (int) columnCount, isBinaryEncoded, resultSetConcurrency, false, useBufferRowExplicit, falsenull);

            if (row != null) {
                if ((maxRows == -1) || (rowCount < maxRows)) {
                    rows.add(row);
                    rowCount++;
                }
            }
        }

        rowData = new RowDataStatic(rows);

        return rowData;
    }

可以看出,此种方式其实就是一次性把查询的所有结果集都保存在本地数组中,所以如果数据量太大,超过jvm内存,则会报文中篇头所示的OOM错误。

方式2 流式查询

每次只获取一条结果集,待应用处理完再次调用next()时,继续获取下一条数据,由代码可以看出流式查询获取数据的方法与普通查询其实是一样的( this.io.nextRow),不同之处在与普通查询时先获取所有数据,然后交给应用处理(next方法其实都是从内存数组遍历),而流式查询时逐条获取,待应用处理完再去拿下一条数据。com.mysql.jdbc.RowDataDynamic

private void nextRecord() throws SQLException {

        try {
            if (!this.noMoreRows) {
                this.nextRow = this.io.nextRow(this.metadata, this.columnCount, this.isBinaryEncoded, java.sql.ResultSet.CONCUR_READ_ONLY, true,
                        this.useBufferRowExplicit, truenull);

                if (this.nextRow == null) {
                    this.noMoreRows = true;
                    this.isAfterEnd = true;
                    this.moreResultsExisted = this.io.tackOnMoreStreamingResults(this.owner);

                    if (this.index == -1) {
                        this.wasEmpty = true;
                    }
                }
            } else {
                this.nextRow = null;
                this.isAfterEnd = true;
            }
        } catch (SQLException sqlEx) {
            if (sqlEx instanceof StreamingNotifiable) {
                ((StreamingNotifiable) sqlEx).setWasStreamingResults();
            }

            // There won't be any more rows
            this.noMoreRows = true;

            // don't wrap SQLExceptions
            throw sqlEx;
        } catch (Exception ex) {
            String exceptionType = ex.getClass().getName();
            String exceptionMessage = ex.getMessage();

            exceptionMessage += Messages.getString("RowDataDynamic.7");
            exceptionMessage += Util.stackTraceToString(ex);

            SQLException sqlEx = SQLError.createSQLException(
                    Messages.getString("RowDataDynamic.8") + exceptionType + Messages.getString("RowDataDynamic.9") + exceptionMessage,
                    SQLError.SQL_STATE_GENERAL_ERROR, this.exceptionInterceptor);
            sqlEx.initCause(ex);

            throw sqlEx;
        }
    }
方式3  RowDataCursor 基于游标

从代码我们惊喜的发现,MySQL其实是支持游标查询的,这种方式下MySQL服务器端一次只发送fetchSize条数据,MySQL驱动会获取完fetchSize条数据后返回给应用,应用处理完继续调用next()时,继续发送fetch命令,继续获取下一批次fetchSize条数据。

 protected List<ResultSetRow> fetchRowsViaCursor(List<ResultSetRow> fetchedRows, long statementId, Field[] columnTypes, int fetchSize,
            boolean useBufferRowExplicit) throws SQLException 
{

        if (fetchedRows == null) {
            fetchedRows = new ArrayList<ResultSetRow>(fetchSize);
        } else {
            fetchedRows.clear();
        }

        this.sharedSendPacket.clear();

        this.sharedSendPacket.writeByte((byte) MysqlDefs.COM_FETCH);
        this.sharedSendPacket.writeLong(statementId);
        this.sharedSendPacket.writeLong(fetchSize);

        sendCommand(MysqlDefs.COM_FETCH, nullthis.sharedSendPacket, truenull0);

        ResultSetRow row = null;

        while ((row = nextRow(columnTypes, columnTypes.length, true, ResultSet.CONCUR_READ_ONLY, false, useBufferRowExplicit, falsenull)) != null) {
            fetchedRows.add(row);
        }

        return fetchedRows;
    }

我们看下基于游标的查询测试代码:(设置useCursorFetch=true,指定fetchSize)

public static void selectStreamWithUseCursorFetch() throws SQLException{
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useCursorFetch=true""root""123456");
        PreparedStatement statement = connection.prepareStatement("select * from test");
        statement.setFetchSize(10);
        
        long begin = System.currentTimeMillis();
        ResultSet resultSet = statement.executeQuery();
        
        
        while(resultSet.next()){
            //System.out.println(resultSet.getString(1));
        }
        
        long end = System.currentTimeMillis();
        System.out.println("selectStreamWithUseCursorFetch span time="+(end-begin) + "ms");
        resultSet.close();
        statement.close();
        connection.close();
    }

运行发现大数据量时这种方式也可正常运行。应用指定每次查询获取的条数fetchSize,MySQL服务器每次只查询指定条数的数据,因此单次查询相比与前面两种方式占用MySQL时间较短。但由于MySQL方不知道客户端什么时候将数据消费完,MySQL需要建立一个临时空间来存放每次查询出的数据,大数据量时MySQL服务器IOPS、磁盘占用都会飙升,而且需要与服务器进行更多次的网络通讯,因此最终查询效率是不如流式查询的。

本地测试查询100w数据,方式2与方式3执行时间对比:

selectStreamWithUseCursorFetch span time=507ms selectStream span time=155ms

从结果上看,由于基于游标方式,服务器端需要更多额外处理,查询性能更低些,对于大数据量一般情况下推荐基于动态结果集的流式查询。

总结:

本文通过对MySQL驱动中查询模块的源码进行剖析,可知MySQL支持三种不同的查询方式,分别适用不同的场景,了解其各自优缺点后,才能在实际项目中正确使用。一、普通查询

  • 优点:应用代码简单,数据量较小时操作速度快。
  • 缺点:数据量大时会出现OOM问题。

二、流式查询

  • 优点:大数据量时不会有OOM问题。
  • 缺点:占用数据库时间更长,导致网络拥塞的可能性较大。

三、游标查询

  • 优点:大数据量时不会有OOM问题,相比流式查询对数据库单次占用时间较短。
  • 缺点:相比流式查询,对服务端资源消耗更大,响应时间更长。


墙裂推荐

【深度】互联网技术人的社群,点击了解!



● Spring事务的坑都给你总结好了!!!

● API网关正在经历身份危机

● 后端生成Token架构与设计详解

● Hadoop 框架学习笔记之整体认知



关注公众号,回复“spring”有惊喜!!!

如果资源对你有帮助的话


❤️给个「在看」,是最大的支持❤️

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存