Kafka Spring Boot - send topicName to producer on the fly

Question

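I'm writing an application in which I want to send a message to a given topic if it exists; if it does not, I want to create a new topic and then send the message.

The problem is that I want the new topic's name to be based on the data I'm sending to it, so I need to pass the topicName to the producer from my service class (below) on the fly.

My config class looks like this: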

@EnableKafka
@Configuration
class TimeOnTaskConfig {

    @Bean
    public NewTopic TimeOnTaskQueue() {
        return TopicBuilder.name("testing123")
            .partitions(1)
            .replicas(1)
            .build();
    }

    @Bean
    public Producer<TaskEvent> timeOnTaskProducer(final KafkaTemplate<String, TaskEvent> template) {
        return new Producer<>(
            Producer.Config.builder()
                .topicName("testing123")
                .build(),
            template);
    }

   
    @Bean
    public KafkaTemplate<String, TaskEvent> kafkaTemplate(
        final ProducerFactory<String, TaskEvent> pf) {
        return new KafkaTemplate<>(pf);
    }
}

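Instead of hardcoding "testing123", I'd like to pass in a topicName parameter. However, when I add a String parameter to the timeOnTaskProducer definition, I get the error "Could not autowire. No beans of 'String' type found."

How should I pass the topicName as a parameter from my service?

My service, for reference: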

@Service
@Slf4j
public class TimeOnTaskService {

    private final Producer<TaskEvent> taskProducer;

    public TimeOnTaskService(final Producer<TaskEvent> taskProducer) {
        this.taskProducer = taskProducer;
    }

    public void enqueue(final TaskEvent event) {
        try {
            // I'd like to be able to do something like this here: taskProducer.setTopicName("section" + event.getID());
            taskProducer.send(event);
        } catch (Exception e) {
            log.error(String.format("enqueue method failed in TimeOnTaskService. %s",
                e.getMessage()));
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}

And my producer:

public class Producer<Req> {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final KafkaTemplate<String, Req> template;
    private final Config config;

    public Producer(Config config, KafkaTemplate<String, Req> template) {
        this.config = config;
        this.template = template;
    }

    public void send(final TaskEvent requestStub) throws ExecutionException, InterruptedException, TimeoutException {
        logger.info("{}::send()", getClass().getCanonicalName());

        final ProducerRecord<String, TaskEvent> record = new ProducerRecord<>(config.topicName, requestStub);
        ListenableFuture future = template.send((ProducerRecord<String, Req>) record);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Req>>() {

            @Override
            public void onSuccess(SendResult<String, Req> result) {
                logger.info("Sent message=[" + requestStub +
                    "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("Unable to send message=[" +
                    requestStub + "] due to : " + ex.getMessage());
            }
        });
    }

    @Getter
    @Builder
    @ToString
    public static class Config {
        private final String topicName;
        private final int sendTimeout;
    }
}

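Answer 1

Score: 2

There is no need to create a new producer for every topic you want to publish to. You can simply use the KafkaTemplate and specify the topic on each call, for example with send(String topic, V data). The first method below replaces Producer.send, and the second is the updated TimeOnTaskService.enqueue: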

public void send(final String topic, final TaskEvent requestStub) throws ExecutionException, InterruptedException, TimeoutException {
    logger.info("{}::send()", getClass().getCanonicalName());

    final ProducerRecord<String, TaskEvent> record = new ProducerRecord<>(topic, requestStub);
    ListenableFuture future = template.send((ProducerRecord<String, Req>) record);

    future.addCallback(new ListenableFutureCallback<SendResult<String, Req>>() {

        @Override
        public void onSuccess(SendResult<String, Req> result) {
            logger.info("Sent message=[" + requestStub +
                "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }

        @Override
        public void onFailure(Throwable ex) {
            logger.error("Unable to send message=[" +
                requestStub + "] due to : " + ex.getMessage());
        }
    });
}

public void enqueue(final TaskEvent event) {
    try {
        String topicName = "section" + event.getID();
        taskProducer.send(topicName, event);
    } catch (Exception e) {
        log.error(String.format("enqueue method failed in TimeOnTaskService. %s",
            e.getMessage()));
        throw new RuntimeException(e.getMessage(), e);
    }
}

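Kafka will create a new topic automatically if the broker has auto.create.topics.enable=true (the default), but it is strongly recommended to create topics ahead of time so that you can control their configuration explicitly.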
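
Because the topic name here is built from event data, it may be worth validating it before sending. Kafka only accepts ASCII letters, digits, '.', '_' and '-' in topic names, with a maximum length of 249 characters. The following is a minimal sketch; the TopicNames class and its forSection method are hypothetical names that do not appear in the original post.

```java
import java.util.Objects;
import java.util.regex.Pattern;

/** Hypothetical helper (not from the original post) for deriving topic names from event data. */
public final class TopicNames {
    // Kafka accepts only ASCII letters, digits, '.', '_' and '-' in topic names.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    // Kafka's maximum topic name length.
    private static final int MAX_LENGTH = 249;

    private TopicNames() {
    }

    /** Builds "section" + id and rejects results that Kafka would refuse. */
    public static String forSection(Object id) {
        Objects.requireNonNull(id, "id");
        String name = "section" + id;
        if (name.length() > MAX_LENGTH || !LEGAL.matcher(name).matches()) {
            throw new IllegalArgumentException("Illegal Kafka topic name: " + name);
        }
        return name;
    }
}
```

In enqueue, the topic could then be obtained with TopicNames.forSection(event.getID()) instead of plain string concatenation, so that a bad ID fails fast in the service rather than inside the producer.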

Answer 2

Score: 0

NewTopic beans are only processed during context initialization. To create topics dynamically, you need to configure and use an AdminClient.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics
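
If topics are created on demand through an AdminClient, one possible design is to remember which topics have already been ensured, so the admin call runs once per topic name rather than on every send. The sketch below is plain Java and does not talk to Kafka itself. TopicEnsurer is a hypothetical class, and the createIfMissing callback is assumed to wrap an AdminClient call such as createTopics, treating a TopicExistsException as success.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: runs the "create topic if missing" action at most once
 * per topic name that has been ensured successfully.
 */
public final class TopicEnsurer {
    private final ConcurrentHashMap<String, Boolean> ensured = new ConcurrentHashMap<>();
    private final Consumer<String> createIfMissing;

    /** @param createIfMissing assumed to wrap an AdminClient-based creation call */
    public TopicEnsurer(Consumer<String> createIfMissing) {
        this.createIfMissing = createIfMissing;
    }

    /**
     * Ensures the topic before a send. computeIfAbsent makes concurrent callers
     * for the same topic wait until the first creation attempt has finished.
     * If the callback throws, nothing is recorded, so a later call retries.
     */
    public void ensure(String topic) {
        ensured.computeIfAbsent(topic, t -> {
            createIfMissing.accept(t);
            return Boolean.TRUE;
        });
    }
}
```

A service would call ensure(topicName) immediately before taskProducer.send(topicName, event). The cache only reflects what this application instance has seen, so a topic deleted externally would not be recreated until a restart.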

Posted on June 27, 2023 at 17:40:41
Original link: https://go.coder-hub.com/76563556.html