akka.pattern.AskTimeoutException is thrown while persisting an event
Question
I just started with Lagom and Akka. I am following the design described in Domain Modelling with Akka Persistence Typed.
I am trying to create a brand new instance of an entity (EntityState). But the event is not getting persisted, and I am getting the following error:
00:54:27.862 [error] com.example.impl.entity.EntityClass [persistencePhase=running-cmd, akkaAddress=akka://XXX@127.0.0.1:60685, akkaSource=akka://XXX/system/sharding/StateClass/186/ID1, sourceActorSystem=XXX, persistenceId=StateClass|ID1] - Supervisor StopSupervisor saw failure: null
java.lang.NullPointerException: null
at akka.persistence.typed.javadsl.EventSourcedBehavior.$anonfun$apply$4(EventSourcedBehavior.scala:195)
at akka.persistence.typed.internal.Running$RunningState.applyEvent(Running.scala:78)
at akka.persistence.typed.internal.Running$HandlingCommands.applyEffects(Running.scala:153)
at akka.persistence.typed.internal.Running$HandlingCommands.onCommand(Running.scala:123)
at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:105)
at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:100)
at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:83)
I have a Create command, which invokes onCreate() and eventually attempts to persist an EntityCreated event.
Service implementation method:
@Override
public ServiceCall<CreateMessage, StateView> createState() {
    return message ->
        entityRef(message.getName())
            .<EntityClass.Accepted>ask(replyTo -> new EntityClass.Create(message, replyTo), askTimeout)
            .thenApply(accepted -> toStateView(accepted.getSummary()));
}
Command handler:
private ReplyEffect<Event, StateClass> onCreate(StateClass state, Create cmd) {
    return Effect()
        .persist(new EntityCreated(cmd.getDetails().getName(), Instant.now()))
        .thenReply(cmd.replyTo, e -> new Accepted(EntityClass.toSummary(e)));
}
I am able to confirm the following:
- the exception is thrown during persist()
- the event is not present in Cassandra
Your help is appreciated. Thank you in advance!
Answer 1
Score: 0
It seems that the real cause of the exception was that I had not added the logic for handling the new event.
In helloEvents(), I needed to add logic similar to the following:
if (eventAndOffset.first() instanceof HelloEvent.GreetingMessageChanged) {
    HelloEvent.GreetingMessageChanged messageChanged = (HelloEvent.GreetingMessageChanged) eventAndOffset.first();
    eventToPublish = new GreetingMessageChanged(messageChanged.getName(), messageChanged.getMessage());
}
In addition, in the aggregate's eventHandler(), I needed to add logic similar to the following:
builder.forAnyState()
    .onEvent(GreetingMessageChanged.class, (state, evt) ->
        // We simply update the current state to use the greeting message from
        // the event.
        state.withMessage(evt.message)
    );
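The stack trace in the question goes through applyEffects and applyEvent, which suggests the event handler runs as part of handling the persist effect. That would explain why a missing handler shows up "during persist()" and why nothing reaches Cassandra. Below is a minimal plain-Java sketch of that ordering. It does not use the Akka API, and every name in it (EventHandlerSketch, Handlers, EntityRenamed, the in-memory journal) is hypothetical and only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class EventHandlerSketch {
    public interface Event {}

    public static final class EntityCreated implements Event {
        public final String name;
        public EntityCreated(String name) { this.name = name; }
    }

    // Hypothetical second event type that has no handler registered.
    public static final class EntityRenamed implements Event {
        public final String name;
        public EntityRenamed(String name) { this.name = name; }
    }

    public static final class State {
        public final String name;
        public State(String name) { this.name = name; }
    }

    // Simplified stand-in for an event handler builder: one handler per event class.
    public static final class Handlers {
        private final Map<Class<?>, BiFunction<State, Event, State>> byType = new HashMap<>();

        public <E extends Event> Handlers onEvent(Class<E> type, BiFunction<State, E, State> handler) {
            byType.put(type, (state, event) -> handler.apply(state, type.cast(event)));
            return this;
        }

        public State apply(State state, Event event) {
            BiFunction<State, Event, State> handler = byType.get(event.getClass());
            if (handler == null) {
                throw new IllegalStateException(
                    "No event handler for " + event.getClass().getSimpleName());
            }
            return handler.apply(state, event);
        }
    }

    // Assumed ordering: apply the event to the state first, then write to the journal.
    public static State persist(Handlers handlers, State state, Event event, List<Event> journal) {
        State next = handlers.apply(state, event); // throws if no handler is registered
        journal.add(event);                        // never reached for an unhandled event
        return next;
    }

    public static void main(String[] args) {
        List<Event> journal = new ArrayList<>();
        Handlers handlers = new Handlers()
            .onEvent(EntityCreated.class, (state, evt) -> new State(evt.name));

        State state = persist(handlers, new State(""), new EntityCreated("ID1"), journal);
        System.out.println("state=" + state.name + ", journal size=" + journal.size());

        try {
            persist(handlers, state, new EntityRenamed("ID2"), journal);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage() + ", journal size=" + journal.size());
        }
    }
}
```

In this sketch, registering a handler for EntityRenamed makes the second persist call succeed. This mirrors the fix above, where an onEvent entry is added for every event type the command handlers can persist.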