英文:
Kafka Producer Callback performance
问题
我有一个Kafka生产者,它将消息发送到Kafka。并且我在成功和失败时都使用存储过程将消息记录在数据库中。如代码所示,我正在使用异步方式:
-
我应该将存储库中的
callStoredProcedure
方法标记为synchronized
以避免死锁吗?我认为不需要使用synchronized
,因为回调将在单个线程中顺序执行。 -
来自以下链接:
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
请注意,回调通常会在生产者的I/O线程中执行,因此应该尽可能快,否则它们将延迟其他线程发送消息。如果您想要执行阻塞或计算密集型的回调,建议在回调体中使用自己的Executor来并行处理。
我应该在其他线程中执行回调吗?您能分享一下如何在其他线程中执行回调的代码片段吗?例如,将回调在3个线程中并行处理。
我的代码片段如下:
@Autowired
private Myrepository myrepository;
public void sendMessageToKafka(List<String> message) {
for (String s : message) {
future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Message Sent " + result.getRecordMetadata().timestamp());
myrepository.callStoredProcedure(result, "SUCCESS");
}
@Override
public void onFailure(Throwable ex) {
System.out.println(" sending failed ");
myrepository.callStoredProcedure(result, "FAILED");
}
});
}
}
如有需要,请执行回调的翻译。
英文:
I have Kafka Produce which sends the message to kafka .And i log the message in database in the both onsucess and onFailure with the help stored procedure . As shown in the code i am using asynchronous
-
should i mark my callStoredProcedure method in the repository as synchronised to avoid deadlocks? i believe synchronised is not needed as callback will be executed sequentially in a single thread.
-
from the below link
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> Note that callbacks will generally execute in the I/O thread of the
> producer and so should be reasonably fast or they will delay the
> sending of messages from other threads. If you want to execute
> blocking or computationally expensive callbacks it is recommended to
> use your own Executor in the callback body to parallelize processing.
Should i execute callbacks in other thread ?
And can u share the code snippet how to excute callback in other thread. like parallelise callback in 3 threads
My code snippet
@Autowired
private Myrepository myrepository;
public void sendMessageToKafka(List<String> message) {
for (String s : message) {
future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Message Sent " + result.getRecordMetadata().timestamp());
myrepository.callStoredProcedure(result,"SUCCESS");
}
@Override
public void onFailure(Throwable ex) {
System.out.println(" sending failed ");
myrepository.callStoredProcedure(result,"FAILED");
}
});
}
答案1
得分: 3
private final ExecutorService exec = Executors.newSingleThreadExecutor();
...
this.exec.submit(() -> myrepository.callStoredProcedure(result, "SUCCESS"));
这些任务仍然会在单个线程上运行(但不是Kafka IO线程)。
如果它跟不上您的发布速率,您可能需要使用不同的执行器,例如缓存的线程池执行器或Spring的 ThreadPoolTaskExecutor
。
英文:
private final ExecutorService exec = Executors.newSingleThreadExecutor();
...
this.exec.submit(() -> myrepository.callStoredProcedure(result,"SUCCESS"));
The tasks will still be run on a single thread (but not the Kafka IO thread).
If it can't keep up with your publishing rate, you might need to use a different executor such as a cached thread pool executor or Spring's ThreadPoolTaskExecutor
.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论