Spring Cloud Stream (Kafka): parameterize the error channel name {destination}.{group}.errors
Question
I would like to know whether the error channel name I pass to @ServiceActivator can be parameterized with a value from my YAML configuration, instead of hardcoding the actual destination and consumer group in the code.
@ServiceActivator(
    // I do not want to hardcode the destination and consumer group here
    inputChannel = "stream-test-topic.my-consumer-group.errors"
)
public void handleError(ErrorMessage errorMessage) {
    // Get the exception that caused the failure
    Throwable errorMessagePayload = errorMessage.getPayload();
    log.error("exception occurred", errorMessagePayload);
    // Get the message that was being processed when the failure occurred
    Message<?> originalMessage = errorMessage.getOriginalMessage();
    if (originalMessage != null) {
        log.error("Message Body: {}", originalMessage.getPayload());
    } else {
        log.error("The message body is empty");
    }
}
Answer 1
Score: 2
You can't do that with @ServiceActivator; use the Java DSL instead:
// Channel name resolved from the error.channel property
@Value("${error.channel}")
String errors;

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(this.errors)
            .handle(msg -> {
                System.out.println(msg);
            })
            .get();
}
And set the property in your YAML:
error:
  channel: stream-test-topic.my-consumer-group.errors
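If you would rather not keep a separate property in sync with the binding, one option is to build the channel name from the destination and group values yourself. Here is a minimal sketch, assuming a hypothetical helper class named ErrorChannelNames (it is not part of Spring Cloud Stream or Spring Integration) and assuming the name always follows the {destination}.{group}.errors pattern from the question:

```java
import java.util.Objects;

/** Hypothetical helper for illustration only; not a Spring API. */
public final class ErrorChannelNames {

    private ErrorChannelNames() {
        // static helper, no instances
    }

    /** Builds "<destination>.<group>.errors" from the two configured values. */
    public static String errorChannel(String destination, String group) {
        Objects.requireNonNull(destination, "destination");
        Objects.requireNonNull(group, "group");
        String d = destination.trim();
        String g = group.trim();
        if (d.isEmpty() || g.isEmpty()) {
            throw new IllegalArgumentException("destination and group must not be blank");
        }
        return d + "." + g + ".errors";
    }

    public static void main(String[] args) {
        // Values taken from the example in the question
        System.out.println(errorChannel("stream-test-topic", "my-consumer-group"));
    }
}
```

In the flow bean you could then inject the destination and group with @Value (the property keys depend on your binding name) and pass the result to IntegrationFlows.from(...) instead of a single error.channel property.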