英文:
How to get a Connection from a Reactive app
问题
在一个非响应式的应用程序中,你可以简单地从DataSource
获取连接。
在响应式中,我找到了ConnectionFactory
,但它返回一个org.reactivestreams.Publisher
。看起来你只能对org.reactivestreams.Subscriber
进行subscribe()
。
英文:
In a non-reactive app, you can simply get the connection from the DataSource
.
In Reactive, I found ConnectionFactory
, but it returns an org.reactivestreams.Publisher
. With that, it seems you can only subscribe()
an org.reactivestreams.Subscriber
.
答案1
得分: 1
当你获取到一个发布者(publisher)时,你需要将其包装成Mono::from
或者Flux::from
,以便能够订阅它。对于一个连接,你应该将其包装成Mono::from
并像这样进行缓存。
private final Mono<? extends Connection> connection;
public SomeEntityDao() {
ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(DRIVER, H2_DRIVER)
.option(PASSWORD, "")
.option(URL, "mem:test;DB_CLOSE_DELAY=-1")
.option(USER, "sa")
.build());
connection = Mono.from(connectionFactory.create()).cache();
}
当你想要使用它时,你可以使用Mono::flatMap
来处理连接并创建语句。由于你在语句上使用了Mono::flatMap
,所以你可以在结果上再次使用Mono::flatMap
来继续流程。
public Mono<SomeEntity> findById(Long id) {
return connection.flatMap(con -> Mono.from(con.createStatement("select * from some_entity where id = $1")
.bind("$1", id)
.execute()))
.flatMap(result -> Mono.from(result.map(mapper)));
}
提醒:我的mapper
是一个函数。
private final BiFunction<Row, RowMetadata, SomeEntity> mapper = (row, rowMetadata) -> {
SomeEntity someEntity = new SomeEntity();
someEntity.setId(row.get("id", Long.class));
someEntity.setName(row.get("name", String.class));
return someEntity;
};
如果你要返回多个实体,那么可以使用Mono::flatMapMany
。
public Flux<SomeEntity> findAll() {
return connection.flatMap(con -> Mono.from(con.createStatement("select * from some_entity")
.execute()))
.flatMapMany(result -> result.map(mapper));
}
但总的来说,如果你使用R2DBC,那么你还应该包括响应式连接池。示例可以在WebFluxR2dbc中找到。
英文:
When you get a publisher then you need to wrap it in either a Mono::from
or a Flux::from
in order to be able to subscribe to it. For a connection you should wrap it in a Mono::from
and cache it like so.
private final Mono<? extends Connection> connection;
public SomeEntityDao() {
ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(DRIVER, H2_DRIVER)
.option(PASSWORD, "")
.option(URL, "mem:test;DB_CLOSE_DELAY=-1")
.option(USER, "sa")
.build());
connection = Mono.from(connectionFactory.create()).cache();
}
When you want to use it then you can Mono::flatMap
the connection and create a statement. Since you did a Mono::flatMap
on the statement then you can do a Mono::flatMap
on the result to continue the flow.
public Mono<SomeEntity> findById(Long id) {
return connection.flatMap(con -> Mono.from(con.createStatement("select * from some_entity where id = $1")
.bind("$1", id)
.execute()))
.flatMap(result -> Mono.from(result.map(mapper)));
}
FYI: My mapper
is a function.
private final BiFunction<Row, RowMetadata, SomeEntity> mapper = (row, rowMetadata) -> {
SomeEntity someEntity = new SomeEntity();
someEntity.setId(row.get("id", Long.class));
someEntity.setName(row.get("name", String.class));
return someEntity;
};
If you are returning more than one Entity, then use Mono::flatMapMany
public Flux<SomeEntity> findAll() {
return connection.flatMap(con -> Mono.from(con.createStatement("select * from some_entity")
.execute()))
.flatMapMany(result -> result.map(mapper));
}
Ultimately though, if you are using R2DBC, then you should also include the reactive connection pool. Example given at WebFluxR2dbc.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论