After publishing N events and restarting the client, the previously published events are handled again in sequence

Question


Axon Framework version: 4.3.3

Axon Server version: 4.3.5

Publishing the event:

    @PostMapping(value = "createPost")
    public void createPost(@RequestBody CreatePostDto createPostDto) {
        eventGateway.publish(new PostCreateEvent(UUID.randomUUID().toString(), createPostDto.getTitle(),
                createPostDto.getContent()));
    }

Event handler:

    @EventHandler
    public void handle(PostCreateEvent event) {
        System.out.println("in event handler");
    }

Question:
After publishing N events, I restart the client. The previously published events are then handled again, in sequence.

Logs:

2020-07-21 20:01:43.825  INFO 11676 --- [           main] o.s.b.a.h2.H2ConsoleAutoConfiguration    : H2 console available at '/h2-console'. Database available at 'jdbc:h2:mem:testdb'
2020-07-21 20:01:43.935  INFO 11676 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-07-21 20:01:43.980  INFO 11676 --- [         task-1] o.hibernate.jpa.internal.util.LogHelper  : HHH000204: Processing PersistenceUnitInfo [name: default]
2020-07-21 20:01:44.023  INFO 11676 --- [         task-1] org.hibernate.Version                    : HHH000412: Hibernate ORM core version 5.4.17.Final
2020-07-21 20:01:44.137  INFO 11676 --- [         task-1] o.hibernate.annotations.common.Version   : HCANN000001: Hibernate Commons Annotations {5.1.0.Final}
2020-07-21 20:01:44.153  INFO 11676 --- [           main] o.a.serialization.ChainingConverter      : ContentTypeConverter of type [class org.axonframework.serialization.xml.XomToStringConverter] is ignored. It seems to rely on a class that is not available in the class loader: nu/xom/Document
2020-07-21 20:01:44.163  INFO 11676 --- [           main] o.a.serialization.ChainingConverter      : ContentTypeConverter of type [class org.axonframework.serialization.xml.InputStreamToXomConverter] is ignored. It seems to rely on a class that is not available in the class loader: nu/xom/ParsingException
2020-07-21 20:01:44.232  INFO 11676 --- [         task-1] org.hibernate.dialect.Dialect            : HHH000400: Using dialect: org.hibernate.dialect.H2Dialect
2020-07-21 20:01:44.258  WARN 11676 --- [           main] JpaBaseConfiguration$JpaWebConfiguration : spring.jpa.open-in-view is enabled by default. Therefore, database queries may be performed during view rendering. Explicitly configure spring.jpa.open-in-view to disable this warning
2020-07-21 20:01:44.521  INFO 11676 --- [           main] o.a.a.c.AxonServerConnectionManager      : Connecting using unencrypted connection...
2020-07-21 20:01:44.871  INFO 11676 --- [           main] o.a.a.c.AxonServerConnectionManager      : Requesting connection details from localhost:8124
2020-07-21 20:01:44.885  INFO 11676 --- [         task-1] o.h.e.t.j.p.i.JtaPlatformInitiator       : HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform]
2020-07-21 20:01:44.900  INFO 11676 --- [         task-1] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2020-07-21 20:01:45.205  INFO 11676 --- [           main] o.a.a.c.AxonServerConnectionManager      : Reusing existing channel
2020-07-21 20:01:45.211  INFO 11676 --- [           main] o.a.a.c.AxonServerConnectionManager      : Re-subscribing commands and queries
2020-07-21 20:01:45.228  INFO 11676 --- [           main] o.a.a.c.query.AxonServerQueryBus         : Creating new query stream subscriber
2020-07-21 20:01:45.272  INFO 11676 --- [           main] o.a.a.c.command.AxonServerCommandBus     : Creating new command stream subscriber
2020-07-21 20:01:45.297  INFO 11676 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2020-07-21 20:01:45.298  INFO 11676 --- [           main] DeferredRepositoryInitializationListener : Triggering deferred initialization of Spring Data repositories…
2020-07-21 20:01:45.400  INFO 11676 --- [           main] DeferredRepositoryInitializationListener : Spring Data repositories initialized!
2020-07-21 20:01:45.408  INFO 11676 --- [           main] c.s.s.SpringBootAxonApplication          : Started SpringBootAxonApplication in 3.394 seconds (JVM running for 3.674)
2020-07-21 20:01:45.487  INFO 11676 --- [axon.service]-0] o.a.e.TrackingEventProcessor             : Worker assigned to segment Segment[0/0] for processing
2020-07-21 20:01:45.489  INFO 11676 --- [axon.service]-0] o.a.e.TrackingEventProcessor             : Using current Thread for last segment worker: TrackingSegmentWorker{processor=cn.sailing.springbootaxon.service, segment=Segment[0/0]}
2020-07-21 20:01:45.492  INFO 11676 --- [axon.service]-0] o.a.e.TrackingEventProcessor             : Fetched token: null for segment: Segment[0/0]
2020-07-21 20:01:45.496  INFO 11676 --- [axon.service]-0] o.a.a.c.event.axon.AxonServerEventStore  : open stream: 0
in event handler
in event handler
in event handler
in event handler

Answer 1

Score: 3


Sorry, but I can't quite understand your question. Could you describe it more clearly?

In any case, your log shows that your client application starts with an in-memory database (jdbc:h2:mem:testdb). This means that every time you restart the client, its data is lost, including the stored tracking token (note "Fetched token: null" in the log), so all events in the event store are handled again.

To verify this, you can configure your application to keep its database in a file:

    spring.datasource.url=jdbc:h2:./runtime/client/testdb;AUTO_SERVER=TRUE

Axon Framework then takes care of storing the last processed token, which prevents your application from handling all the events in the event store again.
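The token mechanism described above can be sketched in plain Java. This is only an illustration of the idea, not Axon's API: `TokenStore`, `startClient`, and the processor name `"service"` are hypothetical names, and a real token store would live in a database rather than a map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical names throughout; a plain-Java illustration, not Axon's API.
class TokenResumeSketch {

    /** Remembers, per processor name, the index of the next event to handle. */
    static class TokenStore {
        private final Map<String, Integer> tokens = new HashMap<>();

        Integer fetch(String processor) {
            return tokens.get(processor);
        }

        void store(String processor, int nextPosition) {
            tokens.put(processor, nextPosition);
        }
    }

    /** Simulates one client start: handles every event after the stored token. */
    static List<String> startClient(List<String> eventStore, TokenStore tokenStore) {
        Integer token = tokenStore.fetch("service");
        // A null token means nothing was recorded, so start from the first event.
        int position = (token == null) ? 0 : token;
        List<String> handled = new ArrayList<>();
        while (position < eventStore.size()) {
            handled.add(eventStore.get(position));
            position++;
            // Record progress after each handled event.
            tokenStore.store("service", position);
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> eventStore = Arrays.asList("post-1", "post-2", "post-3", "post-4");

        // In-memory database: every restart begins with an empty token store.
        System.out.println(startClient(eventStore, new TokenStore()));
        System.out.println(startClient(eventStore, new TokenStore()));

        // File-based database: the same token store survives the restart.
        TokenStore persistent = new TokenStore();
        System.out.println(startClient(eventStore, persistent));
        System.out.println(startClient(eventStore, persistent));
    }
}
```

With a fresh store on each start, all four events are handled every time, which matches the four "in event handler" lines in the log. With the surviving store, the second start handles nothing.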

You can find further information in the reference guide:
https://docs.axoniq.io/reference-guide/configuring-infrastructure-components/event-processing/event-processors#token-store

Best regards.

Answer 2

Score: 0


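Final solution: configure the tracking event processors to start from the head of the event stream when no token has been stored.

    import org.axonframework.config.EventProcessingConfigurer;
    import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
    import org.axonframework.messaging.StreamableMessageSource;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class AxonConfig {

        @Autowired
        public void configureProcessors(EventProcessingConfigurer eventProcessingConfigurer) {
            // With no stored token, start at the head (newest end) of the stream instead of the first event.
            TrackingEventProcessorConfiguration tepConfig = TrackingEventProcessorConfiguration
                    .forSingleThreadedProcessing()
                    .andInitialTrackingToken(StreamableMessageSource::createHeadToken);
            // Use this configuration as the default for all tracking event processors.
            eventProcessingConfigurer.registerTrackingEventProcessorConfiguration(config -> tepConfig);
        }
    }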
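To see what the choice of initial token changes, here is a small plain-Java sketch. It is not Axon code: `InitialToken` and `handledAfterStart` are hypothetical names, and the event stream is modelled as a simple list. The tail is the oldest end of the stream and the head is the position just past the newest stored event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical names; a plain-Java illustration, not Axon's API.
class InitialTokenSketch {

    enum InitialToken { TAIL, HEAD }

    /**
     * Returns the events handled by a processor that starts without a stored token.
     * `existing` holds events already in the store at startup, `publishedLater`
     * holds events published while the processor is running.
     */
    static List<String> handledAfterStart(List<String> existing,
                                          List<String> publishedLater,
                                          InitialToken initial) {
        // TAIL starts at the oldest event, HEAD starts just past the newest existing one.
        int position = (initial == InitialToken.TAIL) ? 0 : existing.size();
        List<String> stream = new ArrayList<>(existing);
        stream.addAll(publishedLater);
        return new ArrayList<>(stream.subList(position, stream.size()));
    }

    public static void main(String[] args) {
        List<String> existing = Arrays.asList("post-1", "post-2");
        List<String> later = Arrays.asList("post-3");

        System.out.println(handledAfterStart(existing, later, InitialToken.TAIL));
        System.out.println(handledAfterStart(existing, later, InitialToken.HEAD));
    }
}
```

Starting at the tail replays the stored events before the new one, while starting at the head handles only the new one. One consequence to keep in mind: if the token store is still in memory, every restart begins at the head again, so events published while the client was down are skipped rather than replayed.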

huangapple
  • Posted on July 21, 2020 at 20:27:59
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63014565.html