Spring Cloud Stream (Kafka): parameterizing the error channel {destination}.{group}.errors

Question

I would like to know whether the error channel passed to @ServiceActivator can be parameterized, that is, bound to a value specified in YAML, instead of hardcoding the actual destination and consumer group in the code.

@ServiceActivator(
    // I do not want to hardcode the destination and consumer group here
    inputChannel = "stream-test-topic.my-consumer-group.errors"
)
public void handleError(ErrorMessage errorMessage) {
    // Get the exception
    Throwable errorMessagePayload = errorMessage.getPayload();
    log.error("exception occurred", errorMessagePayload);

    // Get the body of the message that failed
    Message<?> originalMessage = errorMessage.getOriginalMessage();
    if (originalMessage != null) {
        log.error("Message Body: {}", originalMessage.getPayload());
    } else {
        log.error("The message body is empty");
    }
}

Answer 1

Score: 2

You can't do that with @ServiceActivator; use the Java DSL instead:

@Value("${error.channel}")
String errors;

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(this.errors)
            .handle(msg -> {
                System.out.println(msg);
            })
            .get();
}

Then set the property in your YAML configuration:

error:
  channel: stream-test-topic.my-consumer-group.errors
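
If the goal is to avoid repeating the destination and group at all, one option is to build the `error.channel` value from the binding properties with placeholders, since Spring Boot resolves `${...}` references inside property values. The following is only a sketch: the binding name `input` is an assumption (use whatever binding name your application actually defines), and it assumes that binding sets both `destination` and `group`.

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:                          # hypothetical binding name
          destination: stream-test-topic
          group: my-consumer-group

error:
  # Resolves to stream-test-topic.my-consumer-group.errors
  channel: ${spring.cloud.stream.bindings.input.destination}.${spring.cloud.stream.bindings.input.group}.errors
```

With this in place, the `@Value("${error.channel}")` field from the answer should pick up the composed name, so changing the destination or group in one place keeps the error flow pointed at the matching channel.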

huangapple
  • Posted on August 6, 2020, 01:45:16
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63270755.html