Send a message to a specific partition of a topic

Question

Is there any way a producer can send messages to a specific partition of a topic on the broker?

Right now I can send to a topic that has 2 partitions, but I have no control over which partition a message goes to.

Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {
  
    private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;

    private final Source source;

    public RsvpsKafkaProducer(Source source) {
        this.source = source;
    }

    public void sendRsvpMessage(WebSocketMessage<?> message) {

        System.out.println("sendRsvpMessage");
        source.output()
                .send(MessageBuilder.withPayload(message.getPayload())
                        .build(),
                        SENDING_MESSAGE_TIMEOUT_MS);
    }
}

application.properties

spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9093

spring.cloud.stream.bindings.output.destination=meetupTopic
spring.cloud.stream.bindings.output.producer.partitionCount=2

spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.producer.headerMode=raw

Can I achieve this with Spring Cloud Stream? I want some messages to go to partition P1 and others to partition P2 within meetupTopic.

Answer 1

Score: 3

MessageBuilder.withPayload(message.getPayload())
                        .setHeader(KafkaHeaders.PARTITION_ID, 23)
                        .build()
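
One thing worth noting about the snippet above: Kafka partition indexes are zero-based, so for a topic with partitionCount=2 the only valid values are 0 and 1, and the 23 would have to be replaced. Below is a minimal plain-Java sketch of validating the index before it goes into the header. The class and method names (PartitionHeaderSketch, headersFor) are hypothetical, and the literal "kafka_partitionId" is what I believe KafkaHeaders.PARTITION_ID resolves to in spring-kafka 2.x, so treat it as an assumption and check it against your version.

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionHeaderSketch {

    // Assumption: the string value behind KafkaHeaders.PARTITION_ID (spring-kafka 2.x).
    static final String PARTITION_HEADER = "kafka_partitionId";

    // Builds the header map for a message that must land on the given partition.
    // Rejects indexes outside 0..partitionCount-1 instead of letting the send fail later.
    static Map<String, Object> headersFor(int partition, int partitionCount) {
        if (partition < 0 || partition >= partitionCount) {
            throw new IllegalArgumentException(
                    "partition " + partition + " is outside 0.." + (partitionCount - 1));
        }
        Map<String, Object> headers = new HashMap<>();
        headers.put(PARTITION_HEADER, partition);
        return headers;
    }

    public static void main(String[] args) {
        // Two partitions, as in the question's partitionCount=2.
        System.out.println(headersFor(0, 2));
        System.out.println(headersFor(1, 2));
    }
}
```

In the producer from the question, a map like this could probably be applied with MessageBuilder's copyHeaders(...) before build(), or the index could simply be passed to setHeader(KafkaHeaders.PARTITION_ID, ...) as in the answer.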

Answer 2

Score: 0

I haven't tried this, but from the docs it looks like it might work.

spring.cloud.stream.bindings.output.producer.partitionSelectorExpression=headers['partitionKey']

Then add a partitionKey header when you send the message.
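
A caveat, as far as I can tell from the Spring Cloud Stream reference: partitioning is switched on by partitionKeyExpression (or a key extractor), and partitionSelectorExpression is evaluated against the extracted key rather than the message, so the property above may need to be partitionKeyExpression=headers['partitionKey'] instead. When no selector is configured, the documented default is key.hashCode() % partitionCount. Here is a plain-Java sketch of that default, with hypothetical names, to see which key would land on which of the two partitions:

```java
public class DefaultPartitionSketch {

    // Mirrors the documented default: hash of the key modulo the partition count.
    // floorMod keeps the result in 0..partitionCount-1 even for negative hashes;
    // the framework's own handling of negative hashes may differ in detail.
    static int partitionFor(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitionCount = 2; // as in the question's configuration
        for (Object key : new Object[] {0, 1, "rsvp-a", "rsvp-b"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitionCount));
        }
    }
}
```

Because Integer.hashCode() is the integer itself, setting the partitionKey header to the integers 0 or 1 would give direct control over the target partition under this rule; string keys are spread by their hash instead.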

Posted on 2020-10-22 14:28:36
Source: https://go.coder-hub.com/64476492.html