如何在Spring Integration Kafka消息中设置ID标头?

huangapple go评论52阅读模式
英文:

How to set ID header in Spring Integration Kafka Message?

问题

我有一个演示的Spring Integration项目,它接收Kafka消息,聚合它们,然后释放它们。我正在尝试向项目中添加 JdbcMessageStore。问题是它报错:

由于: java.lang.IllegalArgumentException: 不能存储没有ID头的消息
   在 org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.15.RELEASE.jar:5.2.15.RELEASE]
   在 org.springframework.integration.jdbc.store.JdbcMessageStore.addMessage(JdbcMessageStore.java:314) ~[spring-integration-jdbc-5.3.8.RELEASE.jar:5.3.8.RELEASE]

调试后我发现它要求在这个消息中有 UUID 头 id。但问题是我无法手动设置Kafka头部id - 这是被禁止的(与 timestamp 头部一样) - 我尝试在不同项目的Kafka生产者中做过这个尝试。

如果我使用名为 Big Data Tools 的IDEA插件发送消息,我可以设置 id 头,但它被我的项目接收为字节数组,导致以下错误:

IllegalArgumentException 为头 'id' 指定了不正确的类型。期望 [UUID] 但实际类型为 [B]

我找不到解决此问题的方法。我需要想办法设置这个 id 头,以便能够将消息存储到数据库中。
谢谢。

英文:

I have a demo Spring Integration project which is receiving Kafka messages, aggregating them, and then releasing them. I'm trying to add JdbcMessageStore to the project. The problem is that it failing with error:

Caused by: java.lang.IllegalArgumentException: Cannot store messages without an ID header
	at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.15.RELEASE.jar:5.2.15.RELEASE]
	at org.springframework.integration.jdbc.store.JdbcMessageStore.addMessage(JdbcMessageStore.java:314) ~[spring-integration-jdbc-5.3.8.RELEASE.jar:5.3.8.RELEASE]

After debugging I found that it requires the UUID header id in this message. But the problem is that I can't manually set the Kafka header id - it is forbidden (the same as timestamp header) - I tried to do this in Kafka producer in different project.

If I'm using IDEA plugin named Big Data Tools and send a message from there I'm able to set id header but it is received by my project as an array of bytes and it is failing with error

IllegalArgumentException Incorrect type specified for header 'id'. Expected [UUID] but actual type is [B]

I can't find any solution on how to resolve this issue. I need to set somehow this id header to be able to store messages in the database.
Thanks in advance

答案1

得分: 2

KafkaMessageDrivenChannelAdapter具有一个选项:

/**
 * 设置用于记录型消费者的消息转换器。
 * @param messageConverter 转换器。
 */
public void setRecordMessageConverter(RecordMessageConverter messageConverter) {

在这里,您可以使用以下方式设置MessagingMessageConverter

/**
 * 为生成的消息生成{@link Message} {@code ids}。如果设置为{@code false},将尝试使用默认值。默认设置为{@code false}。
 * @param generateMessageId 如果要生成消息ID,则为true
 */
public void setGenerateMessageId(boolean generateMessageId) {
    this.generateMessageId = generateMessageId;
}

/**
 * 为生成的消息生成{@code timestamp}。如果设置为{@code false},将使用-1。默认设置为{@code false}。
 * @param generateTimestamp 如果要生成时间戳,则为true
 */
public void setGenerateTimestamp(boolean generateTimestamp) {
    this.generateTimestamp = generateTimestamp;
}

将其设置为true

这样,从ConsumerRecord创建的Message将具有相应的idtimestamp标头。

您还可以简单地使用“虚拟”转换器返回传入的负载,框架将创建一个新的Message,其中生成了这些标头。

英文:

The KafkaMessageDrivenChannelAdapter has an option:

/**
 * Set the message converter to use with a record-based consumer.
 * @param messageConverter the converter.
 */
public void setRecordMessageConverter(RecordMessageConverter messageConverter) {

Where you can set a MessagingMessageConverter with:

/**
 * Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
 * will try to use a default value. By default set to {@code false}.
 * @param generateMessageId true if a message id should be generated
 */
public void setGenerateMessageId(boolean generateMessageId) {
	this.generateMessageId = generateMessageId;
}

/**
 * Generate {@code timestamp} for produced messages. If set to {@code false}, -1 is
 * used instead. By default set to {@code false}.
 * @param generateTimestamp true if a timestamp should be generated
 */
public void setGenerateTimestamp(boolean generateTimestamp) {
	this.generateTimestamp = generateTimestamp;
}

set to true.

This way the Message created from a ConsumerRecord will have respective id and timestamp headers.

You also simply can have a "dummy" transformer to return incoming payload and the framework will create a new Message where those headers are generated.

huangapple
  • 本文由 发表于 2023年6月2日 00:28:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/76383957.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定