延迟消息消费的开始

huangapple go评论67阅读模式
英文:

Delay the start of message consumption

问题

我有一个非常简单的Reactor Kafka消费者。

然而,为了正确处理消息,必须先执行一些@Postconstruct。

@Postconstruct将在内存中初始化一些对象,准备数据等等...

如果消息开始被消耗,但@Postconstruct没有完成,消息处理肯定会失败。

@Postconstruct需要一些时间来完成,大约3秒左右。

我尝试过:

因此,目前在我的代码中,我只会处理消息,并在最初的3秒左右,所有消息处理都会失败,然后我会将失败的消息放入另一个主题。

在一段时间后,大约3秒后,@Postconstruct完成,消息会正常处理,一切正常。

然而,我发现这非常多余,我有另一个主题和另一个工作,只是在这里重新处理在这3秒内失败的消息。

有没有更简单的方法,告诉Reactor Kafka只有在@Postconstruct完成后才开始消费(也许通过在应用程序内发送一些信号),或者甚至只等待3秒左右?

到目前为止,我尝试了这个:

public Flux<String> myConsumer() {
        return KafkaReceiver
                .create(receiverOptions)
                .receive()
                .delayUntil(a -> Mono.delay(Duration.ofSeconds(4)))
                .flatMap((oneMessage) ->
                        Mono.deferContextual(contextView -> {
                            var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
                            try (scope) {
                                return consume(oneMessage);
                            }
                        }), 500)
                .name("greeting.call")		//1
                .tag("latency", "low")	//2
                .tap(Micrometer.observation(observationRegistry));

然而,它没有做我期望的事情,它只是延迟。

英文:

I have a very simple Reactor Kafka consumer.

However, in order for the message to be handled properly, couple of @Postconstruct should be executed beforehand.

The @Postconstruct will initialize some objects in memory, prepare data, etc...

If the messages start being consumed, but the @Postconstruct is not complete, the message handling will fail for sure.

The @Postconstruct needs some time to complete, roughly 3ish seconds.
What I tried:
Therefore, currently, in my code, I would just handle the messages, and for the first 3ish seconds, all messages processing will fail, and I would put the failed messages in another topic.

At some point, after the 3ish seconds, the @Postconstruct is complete, the messages are being processed normally, happy.

However, I found this very redundant, I have another topic and another job which existence is just here to re process the messages failed within those 3 seconds.

Is there a simpler way, to tell Reactor Kafka to only start consuming, only once the @Postconstruct is complete (maybe by sending some signal within the app), or maybe even just to wait 3ish seconds?

So far, I tried this:

  public Flux&lt;String&gt; myConsumer() {
        return KafkaReceiver
                .create(receiverOptions)
                .receive()
                .delayUntil(a -&gt; Mono.delay(Duration.ofSeconds(4)))
                .flatMap((oneMessage) -&gt;
                        Mono.deferContextual(contextView -&gt; {
                            var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
                            try (scope) {
                                return consume(oneMessage);
                            }
                        }), 500)
                .name(&quot;greeting.call&quot;)		//1
                .tag(&quot;latency&quot;, &quot;low&quot;)	//2
                .tap(Micrometer.observation(observationRegistry));

However, it is not doing what I expect, it is just delaying.

答案1

得分: 1

不要在后置构造方法中使用 subscribe() 订阅 Flux。而是实现 SmartLifecycle 并在 start() 中订阅。

或者使用事件监听器,在上下文刷新后才订阅。

这样,在开始接收记录之前,所有的 Bean 都会被完全连接起来。

英文:

You should not subscribe() to the flux in a post construct method. Instead, implement SmartLifecycle and subscribe in start().

Or use an event listener and only subscribe after the context is refreshed.

That way, all beans will be fully wired up before you start to receive the records.

huangapple
  • 本文由 发表于 2023年2月8日 14:30:21
  • 转载请务必保留本文链接:https://go.coder-hub.com/75382084.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定