英文:
Quarkus Panache Repository call silently fails when called from an EventBus @ConsumeEvent
问题
Quarkus 1.8.3.Final
对一个访问 PanacheRepository 的方法进行直接调用能够如预期般工作,但是当通过 EventBus 调用相同的方法时,调用会到达该方法并执行每一行,直到达到任何 repository 调用,然后会在没有任何指示的情况下静默地失败/退出。
根据日志,直接调用在 Quarkus 主线程中执行,而 eventbus 调用在 vert.x-eventloop-thread-2 中执行。
还尝试了以下步骤的组合,但结果相同:
- 将 EventBus 的消费方包装为 Mutiny Uni。
- 使消费方返回 Uni<Void>。
- 显式将消费方的 blocking 设置为 false(默认值)。
- 尝试使用 *io.vertx.core.eventbus.EventBus* 和 *io.vertx.mutiny.core.eventbus.EventBus* 实现。
- 将消费方放置在相同的服务和不同的服务中。
- 在被调用的方法上使用 @Transactional 注解。
```java
@ApplicationScoped
public class TestEntityRepository implements PanacheRepository<TestEntity> { }
@Slf4j
@Startup
@Transactional
@ApplicationScoped
public class TestService {
@Inject
TestEntityRepository repository;
@Inject
EventBus eventBus;
public void startUp(@Observes StartupEvent event) {
// 通过 EventBus 的调用在 listEntities() 中打印日志,当到达 repository 调用时悄然停止。
eventBus.sendAndForget("test_topic", "eventbus call");
// 这会在 listEntities() 中打印日志,然后列出 repository 中的所有实体。
listEntities("direct call");
}
@ConsumeEvent("test_topic")
public void listEntities(String testMessage) {
log.info("通过: " + testMessage + " 打印所有实体");
repository.findAll().stream().map(TestEntity::toString).forEach(log::info);
}
}
以下是日志中 EventBus 部分的摘录:
2020-10-19 21:24:39,612 INFO [org.acm.com.TestService] (vert.x-eventloop-thread-1) 通过 eventbus call 打印所有实体
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.setRollbackOnly
2020-10-19 21:24:39,617 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::preventCommit( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.RUNNING)
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.getStatus: javax.transaction.Status.STATUS_MARKED_ROLLBACK
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) BaseTransaction.rollback
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.rollbackAndDisassociate
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::Abort() for action-id 0:ffff7f000101:a721:5f8de7f7:0
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1 result = true
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) TransactionReaper::remove ( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.ABORTED )
更新: 创建了错误报告
<details>
<summary>英文:</summary>
Quarkus 1.8.3.Final
Making a direct call to a method that accesses a PanacheRepository works as expected, but when calling the same method via the EventBus, the call reaches the method and executes every line until it reaches any repository call and then just silently fails/exits without any indication of what happened.
According to the logs the direct call is executed in the Quarkus Main Thread and the eventbus call is executed in the vert.x-eventloop-thread-2.
Also tried combinations of the below steps with the same result:
- Wrapped the consumer side of the EventBus to a Mutiny Uni.
- Made the consumer return a Uni<Void>.
- Made the consumer explicitly blocking=false (default).
- Trying both *io.vertx.core.eventbus.EventBus* and *io.vertx.mutiny.core.eventbus.EventBus* implementations.
- Placed the consumer to the same and to a different service.
- With the @Transactional annotation on the called method.
```java
@ApplicationScoped
public class TestEntityRepository implements PanacheRepository<TestEntity> { }
@Slf4j
@Startup
@Transactional
@ApplicationScoped
public class TestService {
@Inject
TestEntityRepository repository;
@Inject
EventBus eventBus;
public void startUp(@Observes StartupEvent event) {
// Call via the event bus prints the log in listEntities() and silently stops when it reaches the repository call.
eventBus.sendAndForget("test_topic", "eventbus call");
// This prints the log in listEntities() and then lists all the entities in the repository.
listEntities("direct call");
}
@ConsumeEvent("test_topic")
public void listEntities(String testMessage) {
log.info("Printing all entities via: " + testMessage);
repository.findAll().stream().map(TestEntity::toString).forEach(log::info);
}
Here is an excerpt from the EventBus part of the log:
2020-10-19 21:24:39,612 INFO [org.acm.com.TestService] (vert.x-eventloop-thread-1) Printing all entities via: eventbus call
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.setRollbackOnly
2020-10-19 21:24:39,617 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::preventCommit( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.RUNNING)
2020-10-19 21:24:39,617 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.getStatus: javax.transaction.Status.STATUS_MARKED_ROLLBACK
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) BaseTransaction.rollback
2020-10-19 21:24:39,618 TRACE [com.arj.ats.jta] (vert.x-eventloop-thread-1) TransactionImple.rollbackAndDisassociate
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::Abort() for action-id 0:ffff7f000101:a721:5f8de7f7:0
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) BasicAction::removeChildThread () action 0:ffff7f000101:a721:5f8de7f7:0 removing 1 result = true
2020-10-19 21:24:39,618 TRACE [com.arj.ats.arjuna] (vert.x-eventloop-thread-1) TransactionReaper::remove ( BasicAction: 0:ffff7f000101:a721:5f8de7f7:0 status: ActionStatus.ABORTED )
Update: bugreport created
答案1
得分: 0
事实证明,EventBus 正在吞噬有关阻塞调用的异常,并且返回类似于 Uni<Void> 这样的响应,甚至明确将数据库调用包装到 Uni 中,仍然会使用与预期相同的线程,而不是期望的工作线程。
解决方案是在事件侦听器上使用 blocking=true。因此,它不会设置预期的行为(就像我之前认为的那样),而是为阻塞调用准备事件循环。
工作代码如下:
```java
@Slf4j
@Startup
@ApplicationScoped
public class TestService {
@Inject
TestEntityRepository repository;
@Inject
EventBus eventBus;
public void startUp(@Observes StartupEvent event) {
eventBus.sendAndForget("test_topic", "eventbus call");
}
@ConsumeEvent(value = "test_topic", blocking = true)
@Transactional
public Uni<Void> listEntities(String testMessage) {
log.info("通过以下方式打印所有实体:" + testMessage);
try {
repository.findAll().forEach(log::info);
return Uni.createFrom().voidItem();
} catch (Exception e) {
return Uni.createFrom().failure(e);
}
}
}
英文:
It turned out that the EventBus was swallowing the exception about the blocking call and that returning a reactive response like an Uni<Void> or even explicitly wrapping the database call into a Uni will still use the same thread, instead of a worker thread as expected.
The solution was to use blocking=true on the event listener. So it does not set the expected behaviour (as I thought) but prepares the event loop for a blocking call..
The working code is:
@Slf4j
@Startup
@ApplicationScoped
public class TestService {
@Inject
TestEntityRepository repository;
@Inject
EventBus eventBus;
public void startUp(@Observes StartupEvent event) {
eventBus.sendAndForget("test_topic", "eventbus call");
}
@ConsumeEvent(value = "test_topic", blocking = true)
@Transactional
public Uni<Void> listEntities(String testMessage) {
log.info("Printing all entities via: " + testMessage);
try {
repository.findAll().forEach(log::info);
return Uni.createFrom().voidItem();
} catch (Exception e) {
return Uni.createFrom().failure(e);
}
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论