英文:
Delay the start of message consumption
问题
我有一个非常简单的Reactor Kafka消费者。
然而,为了正确处理消息,必须先执行一些@Postconstruct。
@Postconstruct将在内存中初始化一些对象,准备数据等等...
如果消息开始被消耗,但@Postconstruct没有完成,消息处理肯定会失败。
@Postconstruct需要一些时间来完成,大约3秒左右。
我尝试过:
因此,目前在我的代码中,我只会处理消息,并在最初的3秒左右,所有消息处理都会失败,然后我会将失败的消息放入另一个主题。
在一段时间后,大约3秒后,@Postconstruct完成,消息会正常处理,一切正常。
然而,我发现这非常多余,我有另一个主题和另一个工作,只是在这里重新处理在这3秒内失败的消息。
有没有更简单的方法,告诉Reactor Kafka只有在@Postconstruct完成后才开始消费(也许通过在应用程序内发送一些信号),或者甚至只等待3秒左右?
到目前为止,我尝试了这个:
public Flux<String> myConsumer() {
return KafkaReceiver
.create(receiverOptions)
.receive()
.delayUntil(a -> Mono.delay(Duration.ofSeconds(4)))
.flatMap((oneMessage) ->
Mono.deferContextual(contextView -> {
var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
try (scope) {
return consume(oneMessage);
}
}), 500)
.name("greeting.call") //1
.tag("latency", "low") //2
.tap(Micrometer.observation(observationRegistry));
然而,它没有做我期望的事情,它只是延迟。
英文:
I have a very simple Reactor Kafka consumer.
However, in order for the message to be handled properly, couple of @Postconstruct should be executed beforehand.
The @Postconstruct will initialize some objects in memory, prepare data, etc...
If the messages start being consumed, but the @Postconstruct is not complete, the message handling will fail for sure.
The @Postconstruct needs some time to complete, roughly 3ish seconds.
What I tried:
Therefore, currently, in my code, I would just handle the messages, and for the first 3ish seconds, all messages processing will fail, and I would put the failed messages in another topic.
At some point, after the 3ish seconds, the @Postconstruct is complete, the messages are being processed normally, happy.
However, I found this very redundant, I have another topic and another job which existence is just here to re process the messages failed within those 3 seconds.
Is there a simpler way, to tell Reactor Kafka to only start consuming, only once the @Postconstruct is complete (maybe by sending some signal within the app), or maybe even just to wait 3ish seconds?
So far, I tried this:
public Flux<String> myConsumer() {
return KafkaReceiver
.create(receiverOptions)
.receive()
.delayUntil(a -> Mono.delay(Duration.ofSeconds(4)))
.flatMap((oneMessage) ->
Mono.deferContextual(contextView -> {
var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
try (scope) {
return consume(oneMessage);
}
}), 500)
.name("greeting.call") //1
.tag("latency", "low") //2
.tap(Micrometer.observation(observationRegistry));
However, it is not doing what I expect, it is just delaying.
答案1
得分: 1
不要在后置构造方法中使用 subscribe()
订阅 Flux。而是实现 SmartLifecycle
并在 start()
中订阅。
或者使用事件监听器,在上下文刷新后才订阅。
这样,在开始接收记录之前,所有的 Bean 都会被完全连接起来。
英文:
You should not subscribe()
to the flux in a post construct method. Instead, implement SmartLifecycle
and subscribe in start()
.
Or use an event listener and only subscribe after the context is refreshed.
That way, all beans will be fully wired up before you start to receive the records.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论