Spring Cloud Stream(Kafka)参数化指定的错误通道 {destination}.{group}.errors

huangapple go评论98阅读模式
英文:

Spring Cloud Stream (Kafka) parameterize specified error channel {destination}.{group}.errors

问题

I am trying to see if the error channel I am passing to @ServiceActivator can be bounded/parameterized referring the value specified in YAML instead of hardcoding actual destination and consumer group in the code itself.

  1. @ServiceActivator(
  2. // 我不想在这里硬编码目标和消费者组
  3. inputChannel = "stream-test-topic.my-consumer-group.errors"
  4. )
  5. public void handleError(ErrorMessage errorMessage) {
  6. // 获取异常对象
  7. Throwable errorMessagePayload = errorMessage.getPayload();
  8. log.error("exception occurred", errorMessagePayload);
  9. // 获取消息体
  10. Message<?> originalMessage = errorMessage.getOriginalMessage();
  11. if (originalMessage != null) {
  12. log.error("Message Body: {}", originalMessage.getPayload());
  13. } else {
  14. log.error("The message body is empty");
  15. }
  16. }
英文:

I am trying to see if the error channel I am passing to @ServiceActivator can be bounded/parameterized referring the value specified in YAML instead of hardcoding actual destination and consumer group in the code itself.

  1. @ServiceActivator(
  2. // I do not want to hardcode destination and consumer group here
  3. inputChannel = &quot;stream-test-topic.my-consumer-group.errors&quot;
  4. )
  5. public void handleError(ErrorMessage errorMessage) {
  6. // Getting exception objects
  7. Throwable errorMessagePayload = errorMessage.getPayload();
  8. log.error(&quot;exception occurred&quot;, errorMessagePayload);
  9. // Get message body
  10. Message&lt;?&gt; originalMessage = errorMessage.getOriginalMessage();
  11. if (originalMessage != null) {
  12. log.error(&quot;Message Body: {}&quot;, originalMessage.getPayload());
  13. } else {
  14. log.error(&quot;The message body is empty&quot;);
  15. }
  16. }

答案1

得分: 2

你无法使用@ServiceActivator来实现这一点;而应该使用Java DSL 代替:

  1. @Value("${error.channel}")
  2. String errors;
  3. @Bean
  4. public IntegrationFlow flow() {
  5. return IntegrationFlows.from(this.errors)
  6. .handle(msg -> {
  7. System.out.println(msg);
  8. })
  9. .get();
  10. }

并设置

  1. error:
  2. channel: stream-test-topic.my-consumer-group.errors
英文:

You can't do that with @ServiceActivator; use the Java DSL instead:

  1. @Value(&quot;${error.channel}&quot;)
  2. String errors;
  3. @Bean
  4. public IntegrationFlow flow() {
  5. return IntegrationFlows.from(this.errors)
  6. .handle(msg -&gt; {
  7. System.out.println(msg);
  8. })
  9. .get();
  10. }

And set

  1. error:
  2. channel: stream-test-topic.my-consumer-group.errors

huangapple
  • 本文由 发表于 2020年8月6日 01:45:16
  • 转载请务必保留本文链接:https://go.coder-hub.com/63270755.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定