How to get a Connection from a Reactive app

Question

In a non-reactive app, you can simply get a Connection from the DataSource.

In a reactive app, I found ConnectionFactory, but its create() method returns an org.reactivestreams.Publisher. It seems the only thing you can do with that is subscribe() an org.reactivestreams.Subscriber.

Answer 1

Score: 1

When you get a Publisher, you need to wrap it with either Mono::from or Flux::from so that you can subscribe to it and chain operators on it. For a connection, wrap it with Mono::from and cache it, like so:

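private final Mono<? extends Connection> connection;

public SomeEntityDao() {
    // DRIVER, PASSWORD and USER are static imports from ConnectionFactoryOptions;
    // H2_DRIVER and URL presumably come from r2dbc-h2's H2ConnectionFactoryProvider.
    ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
            .option(DRIVER, H2_DRIVER)
            .option(PASSWORD, "")
            .option(URL, "mem:test;DB_CLOSE_DELAY=-1")
            .option(USER, "sa")
            .build());
    // cache() replays the same Connection to every subscriber instead of opening a new one.
    connection = Mono.from(connectionFactory.create()).cache();
}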
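
To see what Mono::from saves you from writing, here is a minimal sketch of consuming a single-value publisher by hand. So that it runs without any dependencies, it uses the JDK's java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams ones method for method. The class FirstValue and the helpers first and single are hypothetical names made up for this illustration, and single is only a stand-in for something like connectionFactory.create().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

class FirstValue {

    // Subscribes by hand and completes the future with the first item emitted.
    static <T> CompletableFuture<T> first(Flow.Publisher<? extends T> publisher) {
        CompletableFuture<T> future = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // nothing is emitted until demand is signalled
            }

            @Override
            public void onNext(T item) {
                subscription.cancel(); // only the first item is needed
                future.complete(item);
            }

            @Override
            public void onError(Throwable t) {
                future.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                future.complete(null); // no effect if a value already arrived
            }
        });
        return future;
    }

    // Hypothetical stand-in publisher: emits one value when first requested.
    static <T> Flow.Publisher<T> single(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean emitted;
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (emitted || cancelled || n <= 0) {
                    return;
                }
                emitted = true;
                subscriber.onNext(value);
                if (!cancelled) {
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(first(single("connection")).join()); // prints "connection"
    }
}
```

With Reactor on the classpath, all of this bookkeeping collapses into Mono.from(publisher), which is why the answer wraps the publisher rather than implementing a Subscriber.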

When you want to use it, Mono::flatMap the connection to create and execute a statement. Because that flatMap yields a Mono of the Result, you can Mono::flatMap again on the result to continue the flow.

public Mono<SomeEntity> findById(Long id) {
    // execute() returns a Publisher of Result, so it is wrapped in a Mono as well.
    return connection.flatMap(con -> Mono.from(con.createStatement("select * from some_entity where id = $1")
                    .bind("$1", id)
                    .execute()))
            // result.map(mapper) publishes the mapped rows; Mono.from takes the first one.
            .flatMap(result -> Mono.from(result.map(mapper)));
}
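
If you are coming from non-reactive code, the chaining above may be easier to follow next to its CompletableFuture counterpart, where thenCompose plays roughly the role of Mono::flatMap. The sketch below is only an analogy under stated assumptions: FakeConnection, query and ComposeSketch are hypothetical stand-ins rather than R2DBC types, and unlike a Mono, a CompletableFuture starts running eagerly instead of waiting for a subscriber.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class ComposeSketch {

    // Hypothetical stand-in for a connection that answers queries asynchronously.
    static final class FakeConnection {
        CompletableFuture<String> query(long id) {
            return CompletableFuture.completedFuture("row-" + id);
        }
    }

    // Counts how many times a connection was opened.
    final AtomicInteger opened = new AtomicInteger();

    // Opened once and shared by every call, similar in spirit to Mono.from(...).cache().
    final CompletableFuture<FakeConnection> connection;

    ComposeSketch() {
        connection = CompletableFuture.supplyAsync(() -> {
            opened.incrementAndGet();
            return new FakeConnection();
        });
    }

    CompletableFuture<String> findById(long id) {
        return connection
                .thenCompose(con -> con.query(id)) // asynchronous step, like flatMap
                .thenApply(String::toUpperCase);   // synchronous mapping of the result
    }

    public static void main(String[] args) {
        ComposeSketch dao = new ComposeSketch();
        System.out.println(dao.findById(1).join()); // prints "ROW-1"
        System.out.println(dao.findById(2).join()); // prints "ROW-2"
        System.out.println(dao.opened.get());       // prints 1
    }
}
```

Both lookups reuse the single connection, which is the effect cache() has on the Mono in the DAO.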

FYI: my mapper is a BiFunction that turns a Row and its RowMetadata into a SomeEntity.

private final BiFunction<Row, RowMetadata, SomeEntity> mapper = (row, rowMetadata) -> {
    SomeEntity someEntity = new SomeEntity();
    someEntity.setId(row.get("id", Long.class));
    someEntity.setName(row.get("name", String.class));
    return someEntity;
};

If you are returning more than one entity, use Mono::flatMapMany instead:

public Flux<SomeEntity> findAll() {
    return connection.flatMap(con -> Mono.from(con.createStatement("select * from some_entity")
                    .execute()))
            // flatMapMany turns the Mono of Result into a Flux of mapped rows.
            .flatMapMany(result -> result.map(mapper));
}

Ultimately, though, if you are using R2DBC you should also use the reactive connection pool (r2dbc-pool) rather than a single cached connection. An example is given at WebFluxR2dbc.

huangapple
  • Posted on June 19, 2023 at 21:55:44
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76507323.html