Spring Cloud Stream (Kafka) parameterize specified error channel {destination}.{group}.errors
Question
I am trying to find out whether the error channel I pass to @ServiceActivator can be parameterized from a value specified in YAML, instead of hardcoding the actual destination and consumer group in the code itself.
@ServiceActivator(
    // I do not want to hardcode the destination and consumer group here
    inputChannel = "stream-test-topic.my-consumer-group.errors"
)
public void handleError(ErrorMessage errorMessage) {
    // Get the exception
    Throwable errorMessagePayload = errorMessage.getPayload();
    log.error("exception occurred", errorMessagePayload);
    // Get the message body
    Message<?> originalMessage = errorMessage.getOriginalMessage();
    if (originalMessage != null) {
        log.error("Message Body: {}", originalMessage.getPayload());
    } else {
        log.error("The message body is empty");
    }
}
Answer 1
Score: 2
You can't do that with @ServiceActivator; use the Java DSL instead:
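// Error channel name, resolved from the error.channel property in YAML
@Value("${error.channel}")
String errors;

// Subscribes to the configured error channel in place of the hardcoded @ServiceActivator
@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(this.errors)
            .handle(msg -> {
                System.out.println(msg);
            })
            .get();
}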
And set
error:
  channel: stream-test-topic.my-consumer-group.errors
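Since the error channel name follows the {destination}.{group}.errors pattern, the error.channel value could probably be composed from the binding properties themselves with Spring property placeholders, so the destination and group are written only once. This is a minimal sketch under assumptions: the binding name input is hypothetical (it is not given in the question) and should be replaced with the actual binding name.

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:                            # hypothetical binding name (assumption)
          destination: stream-test-topic
          group: my-consumer-group
error:
  # Composed from the two binding properties above via ${...} placeholders
  channel: ${spring.cloud.stream.bindings.input.destination}.${spring.cloud.stream.bindings.input.group}.errors
```

With this in place, @Value("${error.channel}") should resolve to stream-test-topic.my-consumer-group.errors, and changing the destination or group updates the error channel name along with it.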