如何在Spring Cloud Stream中获取关联ID

huangapple go评论60阅读模式
英文:

How to get correlation id in Spring Cloud Stream

问题

以下是翻译好的部分:

Spring团队,

下面的生产者可以成功地将值发送到Kafka主题。

@Bean
Supplier<Flux<Integer>> someProducer(){
    return () -> Flux.range(1, 10);
}

但是...我们如何获取像使用ReactiveKafkaSender一样获取消息产生的关联ID?由于Flux是由Spring内部订阅的,是否有任何方法可以获取?

英文:

Spring Team,

The below producer is able to send the values to a kafka topic successfully.

@Bean
Supplier&lt;Flux&lt;Integer&gt;&gt; someProducer(){
	return () -&gt; Flux.range(1, 10);
}

But..how do we get the correlation id of the message produced as we get using ReactiveKafkaSender? Since the flux is subscribed by Spring internally, Is there any way to get?

答案1

得分: 1

Currently, the binder does not support getting the complete SenderResult, only the RecordMetadata of a successful send.

Please open a bug on GitHub (spring-cloud-stream) and reference this question.

To get the RecordMetadata you can use Supplier&lt;Flux&lt;Message&lt;Integer&gt;&gt;&gt; and set the senderResult header in the message to an AtomicInteger&lt;Mono&lt;RecordMetadata&gt;&gt;; it will be populated with a Mono<RecordMeta> which you can subscribe to.

There's a test here: https://github.com/spring-cloud/spring-cloud-stream/blob/29c3cd7cddf9b853c57fca2b2118f1b64e5dde30/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java#L315-L323

However, I can see that this is not much use without the correlation metadata.

英文:

Currently, the binder does not support getting the complete SenderResult, only the RecordMetadata of a successful send.

Please open a bug on GitHub (spring-cloud-stream) and reference this question.

To get the RecordMetadata you can use Supplier&lt;Flux&lt;Message&lt;Integer&gt;&gt;&gt; and set the senderResult header in the message to an AtomicInteger&lt;Mono&lt;RecordMetadata&gt;&gt;; it will be populated with a Mono<RecordMeta> which you can subscribe to.

There's a test here: https://github.com/spring-cloud/spring-cloud-stream/blob/29c3cd7cddf9b853c57fca2b2118f1b64e5dde30/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java#L315-L323

However, I can see that this is not much use without the correlation metadata.

huangapple
  • 本文由 发表于 2023年5月11日 03:05:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/76221809.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定