如何配置Alpakka Slick以在Java中启用从我的数据库进行流式处理?

huangapple go评论79阅读模式
英文:

How can I configure Alpakka Slick to enable streaming from my database in Java?

问题

我正在尝试在Java应用程序中使用Akka Streams和Postgres使用Slick查询来流式传输结果:

Source mySource = Slick.source(
  slickSession,
  "SELECT * from message where started_instant is null",
  (this::createQueueItemFromSlickRow));

Slick文档中明确指出,为了在Postgres中进行流式传输,需要设置额外的参数。然而,我只能找到Scala示例,并且在Alpakka文档中没有关于如何配置这些参数的信息。

目前,一旦流已经开始,我无法处理数据库中新加入的对象。

英文:

I am trying to stream results from a query using Slick within a Java application, using Akka Streams and Postgres:

Source mySource = Slick.source(
  slickSession,
  "SELECT * from message where started_instant is null",
  (this::createQueueItemFromSlickRow));

In the Slick documentation it is clear that in order to stream with Postgres, additional parameters need to be set. However, I can only find Scala examples, and there is no information in the Alpakka documentation on how to configure this.

As it is, I can't process any new objects that come into the database after the stream has started.

答案1

得分: 1

正如您所述,Slick文档中指出:

> 注意:某些数据库系统可能需要以特定方式设置会话参数,以支持在客户端内存中不缓存所有数据的流式传输。例如,PostgreSQL需要同时使用.withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n)(其中n是所需的页面大小)和.transactionally以实现正确的流式传输。

Alpakka的Slick支持最近在其Java DSL中公开了PreparedStatement<sup>1</sup>,您可以使用它来设置上述参数,如果您不能使用Scala API的话。例如:

PreparedStatement statement =
  connection.prepareStatement(
    "MY SQL STATEMENT",
    ResultSet.TYPE_FORWARD_ONLY, // java.sql.ResultSet
    ResultSet.CONCUR_READ_ONLY);

查看SlickTest.java类和上述拉取请求以获取更多详细信息。

<sup>1</sup>这个更改是即将发布的Alpakka 2.0.2版本的一部分(在撰写本文时,版本2.0.1是当前版本)。

英文:

As you stated, the Slick documentation notes the following:

> Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.

Alpakka's Slick support recently exposed the PreparedStatement in its Java DSL<sup>1</sup>, which you can use to set the aforementioned parameters if you cannot use the Scala API. For example:

PreparedStatement statement =
  connection.prepareStatement(
    &quot;MY SQL STATEMENT&quot;,
    ResultSet.TYPE_FORWARD_ONLY, // java.sql.ResultSet
    ResultSet.CONCUR_READ_ONLY);

Check out the SlickTest.java class and the above pull request for more details.

<sup>1</sup>This change is part of the upcoming Alpakka 2.0.2 release. (Version 2.0.1 is the current release at the time of this writing.)

huangapple
  • 本文由 发表于 2020年8月8日 00:25:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/63305747.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定