Send a message to a specific partition of a topic
Question
Is there any way for a producer to send messages to a specific partition of a topic on the broker?
Right now I can send to a topic that has 2 partitions, but I have no control over which partition a message goes to.
Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {

    private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;

    private final Source source;

    public RsvpsKafkaProducer(Source source) {
        this.source = source;
    }

    public void sendRsvpMessage(WebSocketMessage<?> message) {
        System.out.println("sendRsvpMessage");
        source.output()
                .send(MessageBuilder.withPayload(message.getPayload())
                                .build(),
                        SENDING_MESSAGE_TIMEOUT_MS);
    }
}
application.properties
spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9093
spring.cloud.stream.bindings.output.destination=meetupTopic
spring.cloud.stream.bindings.output.producer.partitionCount=2
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.producer.headerMode=raw
Is there any way I could achieve this using Spring Cloud Stream? I want some messages to go to partition P1 and some to partition P2 within meetupTopic.
Answer 1
Score: 3
MessageBuilder.withPayload(message.getPayload())
        .setHeader(KafkaHeaders.PARTITION_ID, 23)
        .build()
Answer 2
Score: 0
I haven't tried this, but from the docs it looks like it might work:
spring.cloud.stream.bindings.output.producer.partitionSelectorExpression=headers['partitionKey']
Then add a header named partitionKey when you send the message.
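To see how a value such as the partitionKey header could end up choosing a partition, here is a minimal sketch in plain Java with no Spring dependency. It assumes the commonly documented default of picking the partition as hashCode(key) % partitionCount. The class PartitionSketch and the method selectPartition are hypothetical names used only for illustration; they are not part of Spring Cloud Stream, and the binder's own handling of negative hash codes may differ.

```java
public class PartitionSketch {

    // Hypothetical helper, not a Spring Cloud Stream API: maps a key to a
    // partition index in [0, partitionCount) based on the key's hash code.
    static int selectPartition(Object key, int partitionCount) {
        // Math.floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitionCount = 2; // same value as producer.partitionCount in the question
        for (Object key : new Object[] {0, 1, "a", "b"}) {
            System.out.println(key + " -> partition " + selectPartition(key, partitionCount));
        }
    }
}
```

Kafka partitions are numbered from 0, so the "P1" and "P2" in the question correspond to partitions 0 and 1. Under the assumption above, integer keys 0 and 1 map directly onto those two partitions, which is one simple way to steer a message to a chosen partition.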