英文:
How to set ID header in Spring Integration Kafka Message?
问题
我有一个演示的Spring Integration项目,它接收Kafka消息,聚合它们,然后释放它们。我正在尝试向项目中添加 JdbcMessageStore
。问题是它报错:
由于: java.lang.IllegalArgumentException: 不能存储没有ID头的消息
在 org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.15.RELEASE.jar:5.2.15.RELEASE]
在 org.springframework.integration.jdbc.store.JdbcMessageStore.addMessage(JdbcMessageStore.java:314) ~[spring-integration-jdbc-5.3.8.RELEASE.jar:5.3.8.RELEASE]
调试后我发现它要求在这个消息中有 UUID 头 id
。但问题是我无法手动设置Kafka头部id - 这是被禁止的(与 timestamp
头部一样) - 我尝试在不同项目的Kafka生产者中做过这个尝试。
如果我使用名为 Big Data Tools
的IDEA插件发送消息,我可以设置 id
头,但它被我的项目接收为字节数组,导致以下错误:
IllegalArgumentException 为头 'id' 指定了不正确的类型。期望 [UUID] 但实际类型为 [B]
我找不到解决此问题的方法。我需要想办法设置这个 id
头,以便能够将消息存储到数据库中。
谢谢。
英文:
I have a demo Spring Integration project which is receiving Kafka messages, aggregating them, and then releasing them. I'm trying to add JdbcMessageStore
to the project. The problem is that it failing with error:
Caused by: java.lang.IllegalArgumentException: Cannot store messages without an ID header
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.15.RELEASE.jar:5.2.15.RELEASE]
at org.springframework.integration.jdbc.store.JdbcMessageStore.addMessage(JdbcMessageStore.java:314) ~[spring-integration-jdbc-5.3.8.RELEASE.jar:5.3.8.RELEASE]
After debugging I found that it requires the UUID header id
in this message. But the problem is that I can't manually set the Kafka header id - it is forbidden (the same as timestamp
header) - I tried to do this in Kafka producer in different project.
If I'm using IDEA plugin named Big Data Tools
and send a message from there I'm able to set id
header but it is received by my project as an array of bytes and it is failing with error
IllegalArgumentException Incorrect type specified for header 'id'. Expected [UUID] but actual type is [B]
I can't find any solution on how to resolve this issue. I need to set somehow this id
header to be able to store messages in the database.
Thanks in advance
答案1
得分: 2
KafkaMessageDrivenChannelAdapter
具有一个选项:
/**
* 设置用于记录型消费者的消息转换器。
* @param messageConverter 转换器。
*/
public void setRecordMessageConverter(RecordMessageConverter messageConverter) {
在这里,您可以使用以下方式设置MessagingMessageConverter
:
/**
* 为生成的消息生成{@link Message} {@code ids}。如果设置为{@code false},将尝试使用默认值。默认设置为{@code false}。
* @param generateMessageId 如果要生成消息ID,则为true
*/
public void setGenerateMessageId(boolean generateMessageId) {
this.generateMessageId = generateMessageId;
}
/**
* 为生成的消息生成{@code timestamp}。如果设置为{@code false},将使用-1。默认设置为{@code false}。
* @param generateTimestamp 如果要生成时间戳,则为true
*/
public void setGenerateTimestamp(boolean generateTimestamp) {
this.generateTimestamp = generateTimestamp;
}
将其设置为true
。
这样,从ConsumerRecord
创建的Message
将具有相应的id
和timestamp
标头。
您还可以简单地使用“虚拟”转换器返回传入的负载,框架将创建一个新的Message
,其中生成了这些标头。
英文:
The KafkaMessageDrivenChannelAdapter
has an option:
/**
* Set the message converter to use with a record-based consumer.
* @param messageConverter the converter.
*/
public void setRecordMessageConverter(RecordMessageConverter messageConverter) {
Where you can set a MessagingMessageConverter
with:
/**
* Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
* will try to use a default value. By default set to {@code false}.
* @param generateMessageId true if a message id should be generated
*/
public void setGenerateMessageId(boolean generateMessageId) {
this.generateMessageId = generateMessageId;
}
/**
* Generate {@code timestamp} for produced messages. If set to {@code false}, -1 is
* used instead. By default set to {@code false}.
* @param generateTimestamp true if a timestamp should be generated
*/
public void setGenerateTimestamp(boolean generateTimestamp) {
this.generateTimestamp = generateTimestamp;
}
set to true
.
This way the Message
created from a ConsumerRecord
will have respective id
and timestamp
headers.
You also simply can have a "dummy" transformer to return incoming payload and the framework will create a new Message
where those headers are generated.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论