Webflux的’then’运算符为什么不执行?

huangapple go评论45阅读模式
英文:

Why Webflux 'then' operator not execute?

问题

The second code snippet appears to be a more concise and functional way of achieving the desired behavior. In the first code snippet, the issue you mentioned where the code after then(reqMono) is not executing is because it's not properly chained to the previous operations.

In the second code snippet, you correctly chain the operations together using flatMap, and you handle the case where findByAccount doesn't find a match with switchIfEmpty. This ensures that the subsequent operations execute as expected.

The first code snippet doesn't work as intended because then(reqMono) doesn't properly integrate with the reactive flow. In the second snippet, by chaining the operations correctly, you ensure that each step is executed in sequence and handles the reactive nature of the Mono correctly.

So, the second code snippet is a more appropriate and functional way to save a user to the database with the desired behavior.

英文:
public Mono<Integer> saveAccount(Mono<AccountSaveRequest> requestMono) {
    
    var reqMono = requestMono.map(AccountSaveRequest::convertEntity);
    
    return reqMono
       .flatMap(req -> this.accountRepository.findByAccount(req.account())
                 .doOnNext(model -> {
                     if (req.id() == null || !model.id().equals(req.id())) {
                         throw new BusinessException("account exists");
                     }
                  })
        )
       .then(reqMono)
       .flatMap(req -> this.accountRepository.findByMobile(req.mobile())
                 .doOnNext(model -> {
                     if (req.id() == null || !model.id().equals(req.id())) {
                         throw new BusinessException("mobile exists");
                     }
                 })
        )
       .then(Mono.defer(() -> reqMono.flatMap(this.accountRepository::save)))
       .map(AccountEntity::id);
}

See code, easy to understand, this method is used to save a user to database.

First receive a AccountSaveRequest model from request and mapped to AccountEntity, then check if
'account' field is already exists in database.

If not exists, I use 'then(reqMono)' operator to switch to another Mono, the following Mono is used to check the field 'mobile' is already exists in database if mobile not blank.

If mobile still not exists, I finally save the account to database and return id to consumer.

The method will return an id if everything is ok. Howerver the code after 'then(reqMono)' not execute.

I tried this in another way,see code

public Mono<Integer> saveAccount(Mono<AccountSaveRequest> requestMono) {
    return requestMono.map(AccountSaveRequest::convertEntity)
       .flatMap(req -> this.accountRepository.findByAccount(req.account())
                  .doOnNext(model -> {
                      if (req.id() == null || !model.id().equals(req.id())) {
                          throw new BusinessException("account exists");
                      }
                  })
                  .switchIfEmpty(
                       Mono.defer(() -> this.accountRepository.findByMobile(req.mobile())
                           .doOnNext(model -> {
                                if (req.id() == null || !model.id().equals(req.id())) {
                                    throw new BusinessException("mobile exists");
                                }
                           })
                       )
                  )
                  .then(Mono.defer(() -> this.accountRepository.save(req)))
                  .map(AccountEntity::id)
        );
}

I wrapped the the code after 'then(reqMono)' into the first flatMap call, and it works correctly

Can find the problem? THKS

答案1

得分: 1

In the first case, it doesn't work because you're trying to consume reqMono twice. I.e., you subscribe to it (flatMap) twice. This doesn't work because for the second subscription, it has already produced the result and completed.

你第一个示例不起作用,因为你尝试两次消费 reqMono。也就是说,你两次订阅它(使用 flatMap)。这不起作用,因为在第二次订阅时,它已经产生了结果并完成。

You can do reqMono.cache() before the subscriptions, and then the first way would work. Though that is not a very reactive way.

你可以在订阅之前使用 reqMono.cache(),然后第一个方法将起作用。尽管这不是一种非常响应式的方式。

The second example is how it's supposed to be done with Reactor. You process an item once and then produce results based on that data.

第二个示例是使用 Reactor 应该完成的方式。你处理一个项目一次,然后根据该数据生成结果。

PS. The other things that don't look good are:

附注:看起来不太好的其他地方是:

  1. You use doOnNext for side effects. That's also not how it's supposed to work. Use flatMap if you want to produce a different outcome based on the input.

你使用 doOnNext 来进行副作用操作。这也不是它应该工作的方式。如果你想根据输入产生不同的结果,应该使用 flatMap

  1. You throw exceptions. Use Mono.error instead.

你使用 throw 来抛出异常。应该使用 Mono.error

  1. Your Mono.defer doesn't make sense. Just call findByAccount, findByMobile, and save directly, since they already return a Producer.

你的 Mono.defer 没有意义。直接调用 findByAccountfindByMobilesave,因为它们已经返回了一个 Producer

So I think your code should look like this:

所以我认为你的代码应该像这样:

public Mono<Integer> saveAccount(Mono<AccountSaveRequest> requestMono) {
    return requestMono
        .map(AccountSaveRequest::convertEntity)
        .flatMap(req ->
            this.accountRepository.findByAccount(req.account())
                .switchIfEmpty(this.accountRepository.findByMobile(req.mobile()))
                .flatMap(model ->
                    req.id() == null || !model.id().equals(req.id()) ?
                        Mono.error(new BusinessException("exists")) :
                        Mono.empty()
                )
                .switchIfEmpty(this.accountRepository.save(req))
                .map(AccountEntity::id)
        );
}
英文:

In the first case it doesn't work because you're trying to consume reqMono twice. I.e., you subscribe on it (flatMap) twice. Which doesn't work because for the second subscription it's already produced the result and complete.

You can do reqMono.cache() before the subscriptions and then the first way would work. Thought that is not very reactive way.

The second example is how it supposed be with Reactor. You process an item once and then produce results based on that data.

PS. The other things that don't look good are:

  1. You use doOnNext for side effects. That's also is not how it supposed to work. Use flatMap if you want to produce different outcome based on the input.
  2. You throw exceptions. Use Mono.error instead.
  3. Your Mono.defer make no sense. Just call findByAccount, findByMobile and save directly, since they already give a Producer

So I think your code should look like this:

public Mono&lt;Integer&gt; saveAccount(Mono&lt;AccountSaveRequest&gt; requestMono) {
    return requestMono
       .map(AccountSaveRequest::convertEntity)
       .flatMap(req -&gt; 
           this.accountRepository.findByAccount(req.account())
              .switchIfEmpty(this.accountRepository.findByMobile(req.mobile()))
              .flatMap(model -&gt;
                 if (req.id() == null || !model.id().equals(req.id())) {
                     Mono.error(new BusinessException(&quot;exists&quot;))
                 } else {
                     Mono.empty()
                 }
              )
              .switchIfEmpty(this.accountRepository.save(req))
              .map(AccountEntity::id)
       )            
 }

huangapple
  • 本文由 发表于 2023年4月17日 18:18:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/76034076.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定