初始令牌位置在 axon kafka 扩展中

huangapple go评论42阅读模式
英文:

Initial token position in axon kafka extension

问题

I'm using Axon kafka extension (4.5.4) to receive events from multiple sources (MultiStreamableMessageSource). How can I set initial token position (head) for StreamableMessageSource?

使用Axon Kafka扩展(4.5.4)从多个来源(MultiStreamableMessageSource)接收事件。如何设置StreamableMessageSource的初始标记位置(头部)?

With this configuration

使用此配置

val config = TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
        .andInitialTrackingToken { it.createHeadToken() }

i get UnsupportedOperationException.

我得到UnsupportedOperationException

As far as I understand, I need to write a custom implementation of creating a KafkaTrackingToken in the andInitialTrackingToken lambda. Is there an example? What if I don't know the specific partition offsets? Also, I have multiple MessageSource and it would look ugly.

据我所了解,我需要编写一个自定义实现,在andInitialTrackingToken lambda中创建KafkaTrackingToken。有示例吗?如果我不知道特定的分区偏移量怎么办?而且我有多个MessageSource,看起来会很丑陋。

PS. I know I could use SubscribableMessageSource, but that doesn't work for me, because I need to combine several MessageSources into one (like MultiStreamableMessageSource does)

附注:我知道我可以使用SubscribableMessageSource,但对我来说不适用,因为我需要将多个MessageSources组合成一个(就像MultiStreamableMessageSource一样)。

英文:

I'm using Axon kafka extension (4.5.4) to receive events from multiple sources (MultiStreamableMessageSource). How can I set initial token position (head) for StreamableMessageSource?

With this configuration

val config = TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
        .andInitialTrackingToken { it.createHeadToken() }

i get UnsupportedOperationException.

As far as I understand, I need to write a custom implementation of creating a KafkaTrackingToken in the andInitialTrackingToken lambda. Is there an example? What if I don't know the specific partition offsets? Also, I have multiple MessageSource and it would look ugly.

PS. I know I could use SubscribableMessageSource, but that doesn't work for me, because I need to combine several MessageSources into one (like MultiStreamableMessageSource does)

答案1

得分: 2

I will provide the translation for the code-related text you've provided:

"It's not supported, not even with the latest, 4.7.0 version. In theory it should be possible, at least head, tail, and by timestamp. The issue to support those is now merged. So this will be supported from the 4.8.0 release, which should happen in a few months.

Currently, it will always default to offset 0, if there is no known offset. It's the ConsumerSeekUtil that does this. With the pr, other tokens should be supported, which can move to a timestamp, or the end, when no offset is yet there."

英文:

It's not supported, not even with the latest, 4.7.0 version. In theory it should be possible, at least head, tail, and by timestamp. The issue to support those is now merged. So this will be supported from the 4.8.0 release, which should happen in a few months.

Currently, it will always default to offset 0, if there is no known offset. It's the ConsumerSeekUtil that does this. With the pr, other tokens should be supported, which can move to a timestamp, or the end, when no offset is yet there.

huangapple
  • 本文由 发表于 2023年4月11日 05:45:04
  • 转载请务必保留本文链接:https://go.coder-hub.com/75980970.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定