英文:
Why Webflux 'then' operator not execute?
问题
The second code snippet appears to be a more concise and functional way of achieving the desired behavior. In the first code snippet, the issue you mentioned where the code after then(reqMono)
is not executing is because it's not properly chained to the previous operations.
In the second code snippet, you correctly chain the operations together using flatMap
, and you handle the case where findByAccount
doesn't find a match with switchIfEmpty
. This ensures that the subsequent operations execute as expected.
The first code snippet doesn't work as intended because then(reqMono)
doesn't properly integrate with the reactive flow. In the second snippet, by chaining the operations correctly, you ensure that each step is executed in sequence and handles the reactive nature of the Mono
correctly.
So, the second code snippet is a more appropriate and functional way to save a user to the database with the desired behavior.
英文:
public Mono<Integer> saveAccount(Mono<AccountSaveRequest> requestMono) {
var reqMono = requestMono.map(AccountSaveRequest::convertEntity);
return reqMono
.flatMap(req -> this.accountRepository.findByAccount(req.account())
.doOnNext(model -> {
if (req.id() == null || !model.id().equals(req.id())) {
throw new BusinessException("account exists");
}
})
)
.then(reqMono)
.flatMap(req -> this.accountRepository.findByMobile(req.mobile())
.doOnNext(model -> {
if (req.id() == null || !model.id().equals(req.id())) {
throw new BusinessException("mobile exists");
}
})
)
.then(Mono.defer(() -> reqMono.flatMap(this.accountRepository::save)))
.map(AccountEntity::id);
}
See code, easy to understand, this method is used to save a user to database.
First receive a AccountSaveRequest model from request and mapped to AccountEntity, then check if
'account' field is already exists in database.
If not exists, I use 'then(reqMono)' operator to switch to another Mono, the following Mono is used to check the field 'mobile' is already exists in database if mobile not blank.
If mobile still not exists, I finally save the account to database and return id to consumer.
The method will return an id if everything is ok. Howerver the code after 'then(reqMono)' not execute.
I tried this in another way,see code
public Mono<Integer> saveAccount(Mono<AccountSaveRequest> requestMono) {
return requestMono.map(AccountSaveRequest::convertEntity)
.flatMap(req -> this.accountRepository.findByAccount(req.account())
.doOnNext(model -> {
if (req.id() == null || !model.id().equals(req.id())) {
throw new BusinessException("account exists");
}
})
.switchIfEmpty(
Mono.defer(() -> this.accountRepository.findByMobile(req.mobile())
.doOnNext(model -> {
if (req.id() == null || !model.id().equals(req.id())) {
throw new BusinessException("mobile exists");
}
})
)
)
.then(Mono.defer(() -> this.accountRepository.save(req)))
.map(AccountEntity::id)
);
}
I wrapped the the code after 'then(reqMono)' into the first flatMap call, and it works correctly
Can find the problem? THKS
答案1
得分: 1
In the first case, it doesn't work because you're trying to consume reqMono
twice. I.e., you subscribe to it (flatMap
) twice. This doesn't work because for the second subscription, it has already produced the result and completed.
你第一个示例不起作用,因为你尝试两次消费 reqMono
。也就是说,你两次订阅它(使用 flatMap
)。这不起作用,因为在第二次订阅时,它已经产生了结果并完成。
You can do reqMono.cache()
before the subscriptions, and then the first way would work. Though that is not a very reactive way.
你可以在订阅之前使用 reqMono.cache()
,然后第一个方法将起作用。尽管这不是一种非常响应式的方式。
The second example is how it's supposed to be done with Reactor. You process an item once and then produce results based on that data.
第二个示例是使用 Reactor 应该完成的方式。你处理一个项目一次,然后根据该数据生成结果。
PS. The other things that don't look good are:
附注:看起来不太好的其他地方是:
- You use
doOnNext
for side effects. That's also not how it's supposed to work. UseflatMap
if you want to produce a different outcome based on the input.
你使用 doOnNext
来进行副作用操作。这也不是它应该工作的方式。如果你想根据输入产生不同的结果,应该使用 flatMap
。
- You
throw
exceptions. UseMono.error
instead.
你使用 throw
来抛出异常。应该使用 Mono.error
。
- Your
Mono.defer
doesn't make sense. Just callfindByAccount
,findByMobile
, andsave
directly, since they already return aProducer
.
你的 Mono.defer
没有意义。直接调用 findByAccount
、findByMobile
和 save
,因为它们已经返回了一个 Producer
。
So I think your code should look like this:
所以我认为你的代码应该像这样:
public Mono<Integer> saveAccount(Mono<AccountSaveRequest> requestMono) {
return requestMono
.map(AccountSaveRequest::convertEntity)
.flatMap(req ->
this.accountRepository.findByAccount(req.account())
.switchIfEmpty(this.accountRepository.findByMobile(req.mobile()))
.flatMap(model ->
req.id() == null || !model.id().equals(req.id()) ?
Mono.error(new BusinessException("exists")) :
Mono.empty()
)
.switchIfEmpty(this.accountRepository.save(req))
.map(AccountEntity::id)
);
}
英文:
In the first case it doesn't work because you're trying to consume reqMono
twice. I.e., you subscribe on it (flatMap
) twice. Which doesn't work because for the second subscription it's already produced the result and complete.
You can do reqMono.cache()
before the subscriptions and then the first way would work. Thought that is not very reactive way.
The second example is how it supposed be with Reactor. You process an item once and then produce results based on that data.
PS. The other things that don't look good are:
- You use
doOnNext
for side effects. That's also is not how it supposed to work. UseflatMap
if you want to produce different outcome based on the input. - You
throw
exceptions. UseMono.error
instead. - Your
Mono.defer
make no sense. Just callfindByAccount
,findByMobile
andsave
directly, since they already give aProducer
So I think your code should look like this:
public Mono<Integer> saveAccount(Mono<AccountSaveRequest> requestMono) {
return requestMono
.map(AccountSaveRequest::convertEntity)
.flatMap(req ->
this.accountRepository.findByAccount(req.account())
.switchIfEmpty(this.accountRepository.findByMobile(req.mobile()))
.flatMap(model ->
if (req.id() == null || !model.id().equals(req.id())) {
Mono.error(new BusinessException("exists"))
} else {
Mono.empty()
}
)
.switchIfEmpty(this.accountRepository.save(req))
.map(AccountEntity::id)
)
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论