英文:
Initial token position in axon kafka extension
问题
I'm using Axon kafka extension (4.5.4) to receive events from multiple sources (MultiStreamableMessageSource). How can I set initial token position (head) for StreamableMessageSource?
使用Axon Kafka扩展(4.5.4)从多个来源(MultiStreamableMessageSource)接收事件。如何设置StreamableMessageSource的初始标记位置(头部)?
With this configuration
使用此配置
val config = TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
.andInitialTrackingToken { it.createHeadToken() }
i get UnsupportedOperationException.
我得到UnsupportedOperationException。
As far as I understand, I need to write a custom implementation of creating a KafkaTrackingToken in the andInitialTrackingToken lambda. Is there an example? What if I don't know the specific partition offsets? Also, I have multiple MessageSource and it would look ugly.
据我所了解,我需要编写一个自定义实现,在andInitialTrackingToken lambda中创建KafkaTrackingToken。有示例吗?如果我不知道特定的分区偏移量怎么办?而且我有多个MessageSource,看起来会很丑陋。
PS. I know I could use SubscribableMessageSource, but that doesn't work for me, because I need to combine several MessageSources into one (like MultiStreamableMessageSource does)
附注:我知道我可以使用SubscribableMessageSource,但对我来说不适用,因为我需要将多个MessageSources组合成一个(就像MultiStreamableMessageSource一样)。
英文:
I'm using Axon kafka extension (4.5.4) to receive events from multiple sources (MultiStreamableMessageSource). How can I set initial token position (head) for StreamableMessageSource?
With this configuration
val config = TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
.andInitialTrackingToken { it.createHeadToken() }
i get UnsupportedOperationException.
As far as I understand, I need to write a custom implementation of creating a KafkaTrackingToken in the andInitialTrackingToken lambda. Is there an example? What if I don't know the specific partition offsets? Also, I have multiple MessageSource and it would look ugly.
PS. I know I could use SubscribableMessageSource, but that doesn't work for me, because I need to combine several MessageSources into one (like MultiStreamableMessageSource does)
答案1
得分: 2
I will provide the translation for the code-related text you've provided:
"It's not supported, not even with the latest, 4.7.0 version. In theory it should be possible, at least head, tail, and by timestamp. The issue to support those is now merged. So this will be supported from the 4.8.0 release, which should happen in a few months.
Currently, it will always default to offset 0, if there is no known offset. It's the ConsumerSeekUtil that does this. With the pr, other tokens should be supported, which can move to a timestamp, or the end, when no offset is yet there."
英文:
It's not supported, not even with the latest, 4.7.0 version. In theory it should be possible, at least head, tail, and by timestamp. The issue to support those is now merged. So this will be supported from the 4.8.0 release, which should happen in a few months.
Currently, it will always default to offset 0, if there is no known offset. It's the ConsumerSeekUtil that does this. With the pr, other tokens should be supported, which can move to a timestamp, or the end, when no offset is yet there.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论