Spring Cloud Stream Kafka Binder 无法通过函数式编程发布和消费消息。

huangapple go评论68阅读模式
英文:

spring cloud stream kafka binder can not publish and consume message by functional programming

问题

I try to use functional programming way to produce and consume message.
这是我的尝试,使用函数式编程方式来生产和消费消息。

the spring-cloud-stream version is 3.2.7
Spring Cloud Stream的版本是3.2.7。

this is my application,yml
这是我的应用程序配置(application.yml):

spring:
cloud:
function:
definition: numberProducer,numberConsumer
stream:
bindings:
numberProducer-out-0:
destination: first-topic
numberConsumer-in-0:
group: group
destination: first-topic
kafka:
binder:
brokers: localhost:9092
bindings:
input:
consumer:
enableDlq: false
dlqName: dlq
dlq-partitions: 1
这是我的应用程序配置文件,YAML格式。

this is producer and consumer bean
这是生产者和消费者的Bean定义:

@Configuration
@Slf4j
public class KafkaConsumer {

@Bean
public Supplier numberProducer() {
return () -> new SecureRandom().nextInt(1, 100);
}

@Bean
public Consumer numberConsumer() {
return incomingNumber -> log.info("Incoming Number : {}", incomingNumber);
}
}
这是生产者和消费者的Bean定义,使用了Spring的@Configuration注解。

then I start application, but can not see message produced to topic
然后我启动应用程序,但看不到消息被发送到主题。

is there anything wrong in my code?
我的代码有什么问题吗?

after I debug, can see the bean has been created
在调试之后,可以看到这些Bean已经被创建。

by the way, I also try to use streambridge to send message, but if I don't add @EnableBinding annotation, I got the error that can not find the bean of StreamBridge
顺便说一下,我还尝试使用StreamBridge来发送消息,但如果我不添加@EnableBinding注解,我会得到找不到StreamBridge Bean的错误。

英文:

I try to use functional programming way to produce and consume message

the spring-cloud-stream version is 3.2.7

this is my application,yml

spring:
  cloud:
    function:
      definition: numberProducer,numberConsumer
    stream:
      bindings:
        numberProducer-out-0:
          destination: first-topic
        numberConsumer-in-0:
          group: group
          destination: first-topic
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          input:
            consumer:
              enableDlq: false
              dlqName: dlq
              dlq-partitions: 1

this is producer and consumer bean

@Configuration
@Slf4j
public class KafkaConsumer {

  @Bean
  public Supplier<Integer> numberProducer() {
    return () -> new SecureRandom().nextInt(1, 100);
  }

  @Bean
  public Consumer<Integer> numberConsumer() {
    return incomingNumber -> log.info("Incoming Number : {}", incomingNumber);
  }
}

then I start application, but can not see message produced to topic
is there anything wrong in my code?

after I debug, can see the bean has been created
Spring Cloud Stream Kafka Binder 无法通过函数式编程发布和消费消息。

by the way, I also try to use streambridge to send message, but if I don't add @EnableBinding annotation, I got the error that can not find the bean of StreamBridge

答案1

得分: 0

根本原因是我们需要使用最新的Spring Boot版本。

将Spring Boot版本升级到3.0.1,将Spring Cloud版本升级到2022.0.1。

英文:

I find the root cause
is that we need use the latest spring boot version

spring boot version to 3.0.1
spring cloud version to 2022.0.1

huangapple
  • 本文由 发表于 2023年3月9日 21:00:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/75684980.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定