Quarkus Panache Repository call silently fails when called from an EventBus @ConsumeEvent

Question

Quarkus 1.8.3.Final

Making a direct call to a method that accesses a PanacheRepository works as expected, but when calling the same method via the EventBus, the call reaches the method and executes every line until it reaches any repository call and then just silently fails/exits without any indication of what happened.

According to the logs the direct call is executed in the Quarkus Main Thread and the eventbus call is executed in the vert.x-eventloop-thread-2.


I also tried combinations of the following steps, with the same result:
- Wrapping the consumer side of the EventBus in a Mutiny Uni.
- Making the consumer return a Uni<Void>.
- Explicitly setting blocking=false on the consumer (the default).
- Using both the *io.vertx.core.eventbus.EventBus* and *io.vertx.mutiny.core.eventbus.EventBus* implementations.
- Placing the consumer in the same service and in a different service.
- Putting the @Transactional annotation on the called method.

```java
import io.quarkus.hibernate.orm.panache.PanacheRepository;
import io.quarkus.runtime.Startup;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.vertx.ConsumeEvent;
import io.vertx.mutiny.core.eventbus.EventBus; // the Mutiny variant provides sendAndForget
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.transaction.Transactional;
import lombok.extern.slf4j.Slf4j;

@ApplicationScoped
public class TestEntityRepository implements PanacheRepository<TestEntity> { }

@Slf4j
@Startup
@Transactional
@ApplicationScoped
public class TestService {

    @Inject
    TestEntityRepository repository;

    @Inject
    EventBus eventBus;

    public void startUp(@Observes StartupEvent event) {
        // Call via the event bus prints the log in listEntities() and silently stops when it reaches the repository call.
        eventBus.sendAndForget("test_topic", "eventbus call");
        // This prints the log in listEntities() and then lists all the entities in the repository.
        listEntities("direct call");
    }

    @ConsumeEvent("test_topic")
    public void listEntities(String testMessage) {
        log.info("Printing all entities via: " + testMessage);
        repository.findAll().stream().map(TestEntity::toString).forEach(log::info);
    }
}
```

Here is an excerpt from the EventBus part of the log:

2020-10-19 21:24:39,612 INFO  [org.acm.com.TestService] (vert.x-eventloop-thread-1) Printing all entities via: eventbus call
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.setRollbackOnly
2020-10-19 21:24:39,617 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::preventCommit( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.RUNNING)
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.getStatus: javax.transaction.Status.STATUS_MARKED_ROLLBACK
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) BaseTransaction.rollback
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.rollbackAndDisassociate
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::Abort() for action-id 0:ffff7f000101:a721:5f8de7f7:0
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1 result = true
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) TransactionReaper::remove ( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.ABORTED )
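
The trace above (setRollbackOnly, then rollback, then ABORTED, with no stack trace) is consistent with an exception escaping the transactional method and then being dropped by the caller. The following is a minimal plain-Java sketch of how such a sequence can arise. It uses no Quarkus, Vert.x, or Narayana classes; `FakeTransaction`, `runInTransaction`, and `fireAndForget` are hypothetical names made up for illustration, and the real interceptor is certainly more involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SilentRollbackSketch {

    // Hypothetical stand-in for a transaction; it only records what happens to it.
    static final class FakeTransaction {
        final List<String> trace = Collections.synchronizedList(new ArrayList<>());
        boolean rollbackOnly;

        void setRollbackOnly() { rollbackOnly = true; trace.add("setRollbackOnly"); }
        void commit() { trace.add("commit"); }
        void rollback() { trace.add("rollback"); }
    }

    // Simplified model of an interceptor wrapped around a transactional method.
    static void runInTransaction(FakeTransaction tx, Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            tx.setRollbackOnly(); // mark the transaction, then let the exception propagate
            throw e;
        } finally {
            if (tx.rollbackOnly) tx.rollback(); else tx.commit();
        }
    }

    // Fire-and-forget dispatch: the Future holding the failure is never inspected,
    // so the exception is neither logged nor rethrown.
    public static List<String> fireAndForget(Runnable body) {
        FakeTransaction tx = new FakeTransaction();
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        eventLoop.submit(() -> runInTransaction(tx, body));
        eventLoop.shutdown();
        try {
            eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return tx.trace;
    }

    public static void main(String[] args) {
        List<String> trace = fireAndForget(() -> {
            throw new IllegalStateException("blocking call on an event loop thread");
        });
        System.out.println(trace); // prints [setRollbackOnly, rollback]
    }
}
```

Nothing is written to the console about the exception itself, which matches the "silent" symptom: only the transaction bookkeeping remains visible.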

Update: a bug report has been created.

Answer 1

Score: 0

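It turned out that the EventBus was swallowing the exception raised for the blocking call. Returning a reactive type such as Uni<Void>, or even explicitly wrapping the database call in a Uni, still runs the call on the same event loop thread rather than on a worker thread as I had expected.

The solution was to set blocking=true on the event listener. The attribute does not describe how the consumer is expected to behave (as I had thought); it tells Quarkus to invoke the consumer on a worker thread, where blocking calls are allowed.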
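
As a rough analogy for the threading point, here is a plain-Java sketch using `CompletableFuture` instead of Mutiny (an assumption made so the example runs without any Quarkus dependency). `blockingCall`, `wrapOnly`, and `offload` are hypothetical names. It shows that wrapping a result in an asynchronous type does not by itself move the work to another thread; an executor has to be chosen explicitly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class ThreadOffloadSketch {

    // Stand-in for a blocking repository call; returns the name of the thread it ran on.
    public static String blockingCall() {
        return Thread.currentThread().getName();
    }

    // Only wraps the result in an async type: the work itself still runs on the caller's thread.
    public static CompletableFuture<String> wrapOnly(Supplier<String> work) {
        return CompletableFuture.completedFuture(work.get());
    }

    // Hands the work to a separate executor, which is what actually moves it off the caller.
    public static CompletableFuture<String> offload(Supplier<String> work, ExecutorService workers) {
        return CompletableFuture.supplyAsync(work, workers);
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-0"));
        try {
            System.out.println("caller:  " + Thread.currentThread().getName());
            System.out.println("wrapped: " + wrapOnly(ThreadOffloadSketch::blockingCall).join());
            System.out.println("offload: " + offload(ThreadOffloadSketch::blockingCall, workers).join());
        } finally {
            workers.shutdown();
        }
    }
}
```

The "wrapped" line reports the caller's own thread, while the "offload" line reports worker-0. In the Quarkus case the worker thread is selected by the blocking attribute on the consumer rather than by code like this.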

The working code is:

```java
import io.quarkus.runtime.Startup;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.transaction.Transactional;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Startup
@ApplicationScoped
public class TestService {

    @Inject
    TestEntityRepository repository; // same repository class as in the question

    @Inject
    EventBus eventBus;

    public void startUp(@Observes StartupEvent event) {
        eventBus.sendAndForget("test_topic", "eventbus call");
    }

    // blocking = true makes Quarkus invoke this consumer on a worker thread.
    @ConsumeEvent(value = "test_topic", blocking = true)
    @Transactional
    public Uni<Void> listEntities(String testMessage) {
        log.info("Printing all entities via: " + testMessage);
        try {
            // Map each entity to a String before logging, as in the question's code.
            repository.findAll().stream().map(TestEntity::toString).forEach(log::info);
            return Uni.createFrom().voidItem();
        } catch (Exception e) {
            return Uni.createFrom().failure(e);
        }
    }
}
```

huangapple
  • Posted on 2020-10-19 07:23:24
  • Link to this post: https://go.coder-hub.com/64419294.html