akka.pattern.AskTimeoutException is thrown while persisting an event

Question

I just started with Lagom and Akka, and I am following the design described in Domain Modelling with Akka Persistence Typed.

I am trying to create a brand new instance of an entity (EntityState). However, the event is not persisted, and I get the following error:

00:54:27.862 [error] com.example.impl.entity.EntityClass [persistencePhase=running-cmd, akkaAddress=akka://XXX@127.0.0.1:60685, akkaSource=akka://XXX/system/sharding/StateClass/186/ID1, sourceActorSystem=XXX, persistenceId=StateClass|ID1] - Supervisor StopSupervisor saw failure: null
java.lang.NullPointerException: null
	at akka.persistence.typed.javadsl.EventSourcedBehavior.$anonfun$apply$4(EventSourcedBehavior.scala:195)
	at akka.persistence.typed.internal.Running$RunningState.applyEvent(Running.scala:78)
	at akka.persistence.typed.internal.Running$HandlingCommands.applyEffects(Running.scala:153)
	at akka.persistence.typed.internal.Running$HandlingCommands.onCommand(Running.scala:123)
	at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:105)
	at akka.persistence.typed.internal.Running$HandlingCommands.onMessage(Running.scala:100)
	at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:83) 

I have a Create command, which invokes onCreate(), and eventually attempts to persist an EntityCreated event.

Service implementation method:

@Override
public ServiceCall<CreateMessage, StateView> createState(){
    return message ->
            entityRef(message.getName())
                    .<EntityClass.Accepted>ask(replyTo -> new EntityClass.Create(message, replyTo), askTimeout)
                    .thenApply(accepted -> toStateView(accepted.getSummary()));
}

Command handler:

private ReplyEffect<Event, StateClass> onCreate(StateClass state, Create cmd) {
    return Effect()
            .persist(new EntityCreated(cmd.getDetails().getName(), Instant.now()))
            .thenReply(cmd.replyTo, e -> new Accepted(EntityClass.toSummary(e)));
}

I am able to confirm the following:

  • The exception is thrown during persist().
  • The event is not present in Cassandra.

Your help is appreciated. Thank you in advance!

Answer 1

Score: 0

It seems that the real cause of the exception was that I had not added logic for handling the new event.
In helloEvents(), I needed to add logic similar to the following:

if (eventAndOffset.first() instanceof HelloEvent.GreetingMessageChanged) {
    HelloEvent.GreetingMessageChanged messageChanged = (HelloEvent.GreetingMessageChanged) eventAndOffset.first();
    eventToPublish = new GreetingMessageChanged(messageChanged.getName(), messageChanged.getMessage());
}

In addition, in the aggregate's eventHandler(), I needed to add logic similar to the following:

builder.forAnyState()
    .onEvent(GreetingMessageChanged.class, (state, evt) ->
        // We simply update the current state to use the greeting message from
        // the event.
        state.withMessage(evt.message)
);
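
To illustrate why the event handler matters: in an event-sourced entity, a persisted event is applied to the current state through the event handler, so an event type with no registered handler cannot be applied. The following is a minimal plain-Java sketch of that dispatch idea only. It does not use Akka, and `Handlers`, its `onEvent` and `apply` methods, and the simplified event classes are hypothetical stand-ins. It throws an IllegalStateException purely for illustration; the actual failure inside Akka may look different, as the NullPointerException in the question suggests.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class EventHandlerSketch {

    /** Hypothetical stand-in for an event handler builder; this is NOT Akka's API. */
    static final class Handlers<S> {
        private final Map<Class<?>, BiFunction<S, Object, S>> byType = new LinkedHashMap<>();

        /** Registers a (state, event) -> new state function for one event class. */
        <E> Handlers<S> onEvent(Class<E> type, BiFunction<S, E, S> handler) {
            byType.put(type, (state, event) -> handler.apply(state, type.cast(event)));
            return this;
        }

        /** Applies an event to the state, failing if no handler was registered for its class. */
        S apply(S state, Object event) {
            BiFunction<S, Object, S> handler = byType.get(event.getClass());
            if (handler == null) {
                throw new IllegalStateException(
                        "No event handler registered for " + event.getClass().getSimpleName());
            }
            return handler.apply(state, event);
        }
    }

    /** Simplified, assumed shape of the event from the answer. */
    static final class GreetingMessageChanged {
        final String message;
        GreetingMessageChanged(String message) { this.message = message; }
    }

    /** Simplified, assumed shape of the event from the question. */
    static final class EntityCreated {
        final String name;
        EntityCreated(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        // Only GreetingMessageChanged has a handler; the state here is just the message string.
        Handlers<String> handlers = new Handlers<String>()
                .onEvent(GreetingMessageChanged.class, (state, evt) -> evt.message);

        // Handled event: the state is replaced by the message carried in the event.
        System.out.println(handlers.apply("Hello", new GreetingMessageChanged("Hi")));

        // Unhandled event: applying it fails because nothing was registered for EntityCreated.
        try {
            handlers.apply("Hello", new EntityCreated("ID1"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch, registering a handler for EntityCreated alongside the existing one would make the second call succeed, which mirrors the fix described above of adding an onEvent(...) entry for every event the command handlers persist.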

huangapple
  • Posted on August 28, 2020, 02:30:03
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63622186.html