Consuming a database cursor using a Java Stream

Question

I would like to consume a database cursor using a Java Stream. I would prefer the stream to fetch and process the rows as they are needed, rather than loading all 5 million rows into memory first and processing them afterwards.

Is it possible to consume it without loading the whole table in RAM?

So far my code looks like:

Cursor<Product> products = DAO.selectCursor(...);

// 1. Initialize variables
long count = 0;
...
for (Iterator<Product> it = products.iterator(); it.hasNext();) {
  Product p = it.next();
  // 2. Processing each row
  ...
}
// 3. Concluding (processing totals, stats, etc.)
double avg = total / count;
...

It does work well, but it's a bit cumbersome and I would like to take advantage of the Stream API.

Answer 1

Score: 4

First, we must discuss how you are going to get the data from your database. If your intent is to go over a large number of records, and you don't want to load them all at once in memory, you have two options:

  1. Paginate the results.
  2. Make your driver paginate the results.

If you already have an iterator based on a Cursor that retrieves paginated data as needed, then you can use the Spliterators and StreamSupport utility classes from the JDK API to turn it into a Stream.

Stream<Product> products = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(cursor.iterator(),
                        Spliterator.NONNULL |
                                Spliterator.ORDERED |
                                Spliterator.IMMUTABLE), false);

Otherwise you will have to build something of your own.
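
As a rough illustration of what this buys you, the count/total/average bookkeeping from the question can be collapsed into a single pass over the stream. The sketch below is self-contained and uses an in-memory list as a stand-in for cursor.iterator(); the class name CursorStreamSketch and the helpers streamOf and priceStats are made up for this example and are not part of any library.

```java
import java.util.Iterator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class CursorStreamSketch {

    // Wraps any Iterator in a sequential, lazily evaluated Stream.
    static <T> Stream<T> streamOf(Iterator<T> rows) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(rows, Spliterator.ORDERED),
                false); // false = sequential, rows are pulled one by one
    }

    // Replaces the manual count/total/average variables of the for loop.
    static LongSummaryStatistics priceStats(Iterator<Long> prices) {
        return streamOf(prices).mapToLong(Long::longValue).summaryStatistics();
    }

    public static void main(String[] args) {
        // Stand-in for cursor.iterator(); a real cursor would fetch rows on demand.
        Iterator<Long> prices = List.of(10L, 20L, 60L).iterator();
        LongSummaryStatistics stats = priceStats(prices);
        System.out.println(stats.getCount() + " rows, average " + stats.getAverage());
    }
}
```

Because the stream is sequential and the statistics object only keeps running totals, memory use stays flat no matter how many rows the underlying iterator produces.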

Driver Pagination

If your JDBC driver supports the fetch size property you can do something like this:

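Connection con = ds.getConnection();
con.setAutoCommit(false);
// the result set type must be passed together with a concurrency mode;
// the two-argument (String, int) overload expects an auto-generated-keys flag instead
PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'",
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();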

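At this point, rs holds the first fetch of up to 1000 records, and it won't retrieve more from the database until you have read past that page. Keep in mind that the fetch size is only a hint to the driver, so check how your particular driver treats it.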

The tricky part in all this is that you cannot close any of the resources (i.e. connection, prepared statement and result set) until you are done reading all the records. Since the stream we want to build is lazy by default, this implies we have to keep all these resources open until we are done with the stream.

Perhaps the easiest way is to build an Iterator around this logic and close all the resources when the iterator actually reaches the end of the data (i.e. when rs.next() returns false). Another alternative is to do all the cleanup when the stream is closed (Stream.onClose()).
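
The first of these two options, closing as soon as the data runs out, could be sketched roughly as follows. AutoClosingIterator is a hypothetical helper written for this illustration (it is not a JDK class); it wraps any iterator plus an AutoCloseable and releases the resource exactly once when the delegate reports that nothing is left. The example uses an in-memory list in place of a ResultSet.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helper: delegates to another iterator and closes a resource
// exactly once, as soon as the delegate reports that no elements remain.
class AutoClosingIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> delegate;
    private final AutoCloseable resource;
    private boolean closed;

    AutoClosingIterator(Iterator<T> delegate, AutoCloseable resource) {
        this.delegate = delegate;
        this.resource = resource;
    }

    @Override
    public boolean hasNext() {
        if (closed) {
            return false;
        }
        boolean more = delegate.hasNext();
        if (!more) {
            close(); // end of data reached: release the resource right away
        }
        return more;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return delegate.next();
    }

    @Override
    public void close() {
        if (closed) {
            return; // closing twice is harmless
        }
        closed = true;
        try {
            resource.close();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close resource", e);
        }
    }

    public static void main(String[] args) {
        Iterator<String> rows = List.of("a", "b").iterator(); // stand-in for a ResultSet
        AutoClosingIterator<String> it =
                new AutoClosingIterator<>(rows, () -> System.out.println("closed"));
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

One limitation of this design: if the consumer stops early (for example with limit or findFirst on the resulting stream), the end of the data is never reached and nothing gets closed, so it is still worth registering the same close() with Stream.onClose() as a fallback.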

Once we have an iterator, it is very simple to build a stream out of it using the Spliterators and StreamSupport utility classes from the JDK API.

My rudimentary implementation would look somewhat like this. This is just for illustration purposes. You may want to adapt it further to your particular case.

public Stream<String> getUsers() {
    DataSource ds = jdbcTemplate.getDataSource();
    try {
        Connection conn = ds.getConnection();
        conn.setAutoCommit(false);
        //the result set type has to be passed together with a concurrency mode
        PreparedStatement stm = conn.prepareStatement("SELECT id FROM users",
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        //fetch size is a hint asking the driver to fetch about 1000 records at a time
        stm.setFetchSize(1000);
        ResultSet rs = stm.executeQuery();

        Iterator<String> sqlIter = new Iterator<>() {
            //true while rs is positioned on a row that next() has not returned yet
            private boolean rowReady;
            //true once rs.next() has returned false
            private boolean finished;

            @Override
            public boolean hasNext() {
                try {
                    //advance the cursor at most once per row, so that calling
                    //hasNext() repeatedly does not skip records
                    if (!rowReady && !finished) {
                        rowReady = rs.next();
                        finished = !rowReady;
                    }
                    return rowReady;
                } catch (SQLException e) {
                    closeResources(conn, stm, rs);
                    throw new RuntimeException("Failed to read record from ResultSet", e);
                }
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new java.util.NoSuchElementException();
                }
                try {
                    rowReady = false;
                    return rs.getString("id");
                } catch (SQLException e) {
                    closeResources(conn, stm, rs);
                    throw new RuntimeException("Failed to read record from ResultSet", e);
                }
            }
        };

        //turn iterator into a stream
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(sqlIter,
                        Spliterator.NONNULL |
                                Spliterator.ORDERED |
                                Spliterator.IMMUTABLE), false
        ).onClose(() -> {
            //make sure to close resources when done with the stream
            closeResources(conn, stm, rs);
        });


    } catch (SQLException e) {
        logger.error("Failed to process data", e);
        throw new RuntimeException(e);
    }
}

private void closeResources(Connection conn, PreparedStatement ps, ResultSet rs) {
    try (conn; ps; rs) {
        //nothing to do here: leaving this block closes rs, ps and conn, in that order
    } catch (SQLException e) {
        logger.warn("Failed to properly close database sources", e);
        return;
    }
    logger.info("Resources successfully closed");
}

The key point here is that the stream we return is supposed to run some onClose logic, so whoever uses the stream must call stream.close() when done with it to ensure that all the resources kept alive up to this point (i.e. conn, stm and rs) are closed.

Perhaps the best way is to use try-with-resources, so that the try statement takes care of closing the stream.

try (Stream<String> users = userRepo.getUsers()) {
    //print users to the main output retrieving 1K at a time
    users.forEach(System.out::println);
}
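
It is easy to assume that a terminal operation such as forEach closes the stream, but onClose handlers only run when close() is called. The small sketch below shows the difference with an in-memory stream; the class OnCloseSketch and its run method are invented for this illustration, and the "closed" event stands in for the call to closeResources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class OnCloseSketch {

    // Returns the events recorded while consuming a stream,
    // either inside try-with-resources or with a terminal operation only.
    static List<String> run(boolean useTryWithResources) {
        List<String> events = new ArrayList<>();
        Stream<String> users = Stream.of("ann", "bob")
                .onClose(() -> events.add("closed")); // stand-in for closeResources(...)
        if (useTryWithResources) {
            try (Stream<String> s = users) {
                s.forEach(events::add);
            }
        } else {
            users.forEach(events::add); // terminal operation only, close() is never called
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // no "closed" event is recorded
        System.out.println(run(true));  // "closed" is recorded after the rows
    }
}
```

In the JDBC version this means that a caller who forgets the try-with-resources block leaves the connection, statement and result set open.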

Manual Pagination

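An alternative is to paginate the results yourself. The syntax depends on the database, but with clauses like LIMIT and OFFSET you can request a specific page of records, process it and then retrieve the next one. OFFSET counts rows, not pages, so with pages of 1000 rows page n (counting from 0) starts at offset n * 1000, and the query needs a deterministic ORDER BY for the pages to be stable. For example, page 5:

select id from users ORDER BY id LIMIT 1000 OFFSET 5000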

In this case, your iterator would consume a whole page and, when done, request the next page, until no more records are found in the final page.

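The advantage of this approach is that resources can be controlled immediately in the iterator itself: each page query can open and close its own connection, statement and result set, so nothing has to stay open for the lifetime of the stream.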

I won't develop an example of this; I leave it for you to try.
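
For readers who want a starting point, a paging iterator along these lines might look like the sketch below. It is only one possible shape under stated assumptions: PagedIterator is a hypothetical class, and the page fetcher is modelled as a function from (offset, limit) to a list of rows, which in a real application would run the LIMIT/OFFSET query and close its JDBC resources before returning. Here an in-memory list plays the role of the table.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper: pulls one page at a time from a fetcher that receives
// (offset, limit) and returns at most `limit` rows.
class PagedIterator<T> implements Iterator<T> {
    private final BiFunction<Integer, Integer, List<T>> fetchPage;
    private final int pageSize;
    private Iterator<T> page = Collections.emptyIterator();
    private int offset;
    private boolean exhausted;

    PagedIterator(BiFunction<Integer, Integer, List<T>> fetchPage, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        this.fetchPage = fetchPage;
        this.pageSize = pageSize;
    }

    @Override
    public boolean hasNext() {
        // Fetch the next page only when the current one has been fully consumed.
        while (!page.hasNext() && !exhausted) {
            List<T> rows = fetchPage.apply(offset, pageSize);
            offset += rows.size();
            exhausted = rows.size() < pageSize; // a short page is the last page
            page = rows.iterator();
        }
        return page.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return page.next();
    }

    public static void main(String[] args) {
        // In-memory stand-in for the users table.
        List<Integer> table = IntStream.range(0, 25).boxed().collect(Collectors.toList());
        PagedIterator<Integer> ids = new PagedIterator<>(
                (off, lim) -> table.subList(Math.min(off, table.size()),
                                            Math.min(off + lim, table.size())),
                10);
        int count = 0;
        while (ids.hasNext()) {
            ids.next();
            count++;
        }
        System.out.println(count + " rows read");
    }
}
```

Such an iterator can be turned into a stream with the same StreamSupport call used earlier. Note that OFFSET-based paging gets slower on deep pages in many databases and can skip or repeat rows if the table changes between page queries.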

huangapple
  • Posted on 2020-10-16 21:28:30
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64390132.html