Kafka Springboot - send topicName to producer on the fly
Question
I'm writing an application in which I want to send a message to a given topic if it exists and, if it does not, create the topic and then send my message.
The problem I'm having is that I want the new topic's name to be based on the data I'm sending to that topic. So I want to be able to pass the topicName to the producer from my service class (below) on the fly.
My config class looks like this:
@EnableKafka
@Configuration
class TimeOnTaskConfig {

    @Bean
    public NewTopic TimeOnTaskQueue() {
        return TopicBuilder.name("testing123")
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public Producer<TaskEvent> timeOnTaskProducer(final KafkaTemplate<String, TaskEvent> template) {
        return new Producer<>(
                Producer.Config.builder()
                        .topicName("testing123")
                        .build(),
                template);
    }

    @Bean
    public KafkaTemplate<String, TaskEvent> kafkaTemplate(
            final ProducerFactory<String, TaskEvent> pf) {
        return new KafkaTemplate<>(pf);
    }
}
Instead of passing in "testing123", I'd like to be able to pass a topicName parameter. However, when I add a String parameter to the timeOnTaskProducer definition, I get the error message "Could not autowire. No beans of 'String' type found."
How should I go about sending the topicName as a parameter from my service?
My service for reference:
@Service
@Slf4j
public class TimeOnTaskService {

    private final Producer<TaskEvent> taskProducer;

    public TimeOnTaskService(final Producer<TaskEvent> taskProducer) {
        this.taskProducer = taskProducer;
    }

    public void enqueue(final TaskEvent event) {
        try {
            // I'd like to be able to do something here like: taskProducer.setTopicName("section" + event.getID());
            taskProducer.send(event);
        } catch (Exception e) {
            log.error(String.format("enqueue method failed in TimeOnTaskService. %s",
                    e.getMessage()));
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
and my producer:
public class Producer<Req> {

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final KafkaTemplate<String, Req> template;
    private final Config config;

    public Producer(Config config, KafkaTemplate<String, Req> template) {
        this.config = config;
        this.template = template;
    }

    public void send(final TaskEvent requestStub) throws ExecutionException, InterruptedException, TimeoutException {
        logger.info("{}::send()", getClass().getCanonicalName());
        final ProducerRecord<String, TaskEvent> record = new ProducerRecord<>(config.topicName, requestStub);
        ListenableFuture future = template.send((ProducerRecord<String, Req>) record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Req>>() {
            @Override
            public void onSuccess(SendResult<String, Req> result) {
                logger.info("Sent message=[" + requestStub +
                        "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("Unable to send message=["
                        + requestStub + "] due to : " + ex.getMessage());
            }
        });
    }

    @Getter
    @Builder
    @ToString
    public static class Config {
        private final String topicName;
        private final int sendTimeout;
    }
}
Answer 1
Score: 2
There is no need to create a new producer for every topic you want to publish to. You can simply pass the topic on each call, either through KafkaTemplate's send(String topic, T data) or by building the ProducerRecord with the topic yourself:
// In Producer<Req>: take the topic as a per-call argument instead of reading it from Config.
public void send(final String topic, final Req requestStub) throws ExecutionException, InterruptedException, TimeoutException {
    logger.info("{}::send()", getClass().getCanonicalName());
    // Typing the record with Req avoids the unchecked cast to ProducerRecord<String, Req>.
    final ProducerRecord<String, Req> record = new ProducerRecord<>(topic, requestStub);
    // KafkaTemplate.send returns a ListenableFuture in Spring Kafka 2.x.
    final ListenableFuture<SendResult<String, Req>> future = template.send(record);
    future.addCallback(new ListenableFutureCallback<SendResult<String, Req>>() {
        @Override
        public void onSuccess(SendResult<String, Req> result) {
            logger.info("Sent message=[" + requestStub +
                    "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }

        @Override
        public void onFailure(Throwable ex) {
            logger.error("Unable to send message=["
                    + requestStub + "] due to : " + ex.getMessage());
        }
    });
}

// In TimeOnTaskService: derive the topic name from the event and pass it to the producer.
public void enqueue(final TaskEvent event) {
    try {
        String topicName = "section" + event.getID();
        taskProducer.send(topicName, event);
    } catch (Exception e) {
        log.error(String.format("enqueue method failed in TimeOnTaskService. %s",
                e.getMessage()));
        throw new RuntimeException(e.getMessage(), e);
    }
}
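Kafka will automatically create the topic on the first send if the broker setting auto.create.topics.enable is true (which is the default). Auto-created topics get the broker's default partition count and replication factor, though, so it is strongly recommended to create topics ahead of time so that you can control their configuration explicitly.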
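Because the topic name is now derived from event data, it may be worth checking it before it reaches the broker. The following is a minimal sketch, not part of the original answer: the TopicNames class and its forSection method are hypothetical names, and the check assumes Kafka's usual rule that topic names consist of ASCII letters, digits, '.', '_' and '-' and are at most 249 characters long.

```java
import java.util.Objects;
import java.util.regex.Pattern;

/** Hypothetical helper (not from the original post) for building per-section topic names. */
final class TopicNames {

    // Kafka topic names: ASCII letters, digits, '.', '_' and '-', at most 249 characters.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]{1,249}");

    private TopicNames() {
    }

    /** Builds "section" + id and rejects names that Kafka would refuse. */
    static String forSection(Object sectionId) {
        Objects.requireNonNull(sectionId, "sectionId");
        String name = "section" + sectionId;
        if (!LEGAL.matcher(name).matches()) {
            throw new IllegalArgumentException("Illegal Kafka topic name: " + name);
        }
        return name;
    }
}
```

In the service, the line that builds topicName could then call TopicNames.forSection(event.getID()), so a malformed ID fails fast in the application rather than at send time.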
Answer 2
Score: 0
NewTopic beans are only declared during context initialization. To create topics dynamically, you would need to configure and use an AdminClient.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics
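If topics are created at runtime, the creation call should normally run only once per topic name rather than before every send. Here is a minimal sketch of that bookkeeping, using only the JDK. TopicRegistry and ensureExists are hypothetical names, and the actual creation step is passed in as a callback. In a real application that callback would presumably wrap AdminClient.createTopics(...) or, depending on your Spring Kafka version, KafkaAdmin.createOrModifyTopics(...), and it would need to tolerate a topic that already exists on the broker.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical helper: runs the topic-creation callback at most once per topic name. */
final class TopicRegistry {

    private final ConcurrentHashMap<String, Boolean> known = new ConcurrentHashMap<>();
    private final Consumer<String> creator;

    /** @param creator assumed to create the named topic, e.g. via an AdminClient */
    TopicRegistry(Consumer<String> creator) {
        this.creator = creator;
    }

    /**
     * Ensures the creator has completed for this topic before returning.
     * computeIfAbsent makes concurrent callers for the same name wait for the first one,
     * and records nothing if the creator throws, so a failed creation is retried next time.
     */
    void ensureExists(String topic) {
        known.computeIfAbsent(topic, name -> {
            creator.accept(name);
            return Boolean.TRUE;
        });
    }
}
```

The service could call registry.ensureExists(topicName) just before taskProducer.send(topicName, event). Note that the cache is per application instance, so each instance would still issue one creation attempt per topic.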