Kafka 生产者回调性能

huangapple go评论81阅读模式
英文:

Kafka Producer Callback performance

问题

我有一个Kafka生产者,它将消息发送到Kafka。并且我在成功和失败时都使用存储过程将消息记录在数据库中。如代码所示,我正在使用异步方式:

  1. 我应该将存储库中的callStoredProcedure方法标记为synchronized以避免死锁吗?我认为不需要使用synchronized,因为回调将在单个线程中顺序执行。

  2. 来自以下链接:

    https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

    请注意,回调通常会在生产者的I/O线程中执行,因此应该尽可能快,否则它们将延迟其他线程发送消息。如果您想要执行阻塞或计算密集型的回调,建议在回调体中使用自己的Executor来并行处理。

我应该在其他线程中执行回调吗?您能分享一下如何在其他线程中执行回调的代码片段吗?例如,将回调在3个线程中并行处理。

我的代码片段如下:

@Autowired
private Myrepository myrepository;

public void sendMessageToKafka(List<String> message) {

    for (String s : message) {

        future = kafkaTemplate.send(topicName, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {

                System.out.println("Message Sent " + result.getRecordMetadata().timestamp());

                myrepository.callStoredProcedure(result, "SUCCESS");

            }

            @Override
            public void onFailure(Throwable ex) {

                System.out.println(" sending failed ");
                myrepository.callStoredProcedure(result, "FAILED");

            }
        });

    }
}

如有需要,请执行回调的翻译。

英文:

I have Kafka Produce which sends the message to kafka .And i log the message in database in the both onsucess and onFailure with the help stored procedure . As shown in the code i am using asynchronous

  1. should i mark my callStoredProcedure method in the repository as synchronised to avoid deadlocks? i believe synchronised is not needed as callback will be executed sequentially in a single thread.

  2. from the below link

    https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

> Note that callbacks will generally execute in the I/O thread of the
> producer and so should be reasonably fast or they will delay the
> sending of messages from other threads. If you want to execute
> blocking or computationally expensive callbacks it is recommended to
> use your own Executor in the callback body to parallelize processing.

Should i execute callbacks in other thread ?
And can u share the code snippet how to excute callback in other thread. like parallelise callback in 3 threads

My code snippet

@Autowired
private Myrepository  myrepository;

public void sendMessageToKafka(List&lt;String&gt; message) {
    
    		for (String s : message) {
    
    			future = kafkaTemplate.send(topicName, message);
    
    			future.addCallback(new ListenableFutureCallback&lt;SendResult&lt;String, String&gt;&gt;() {
    
    				@Override
    				public void onSuccess(SendResult&lt;String, String&gt; result) {
    
    					System.out.println(&quot;Message Sent  &quot; + result.getRecordMetadata().timestamp());
    					
    					myrepository.callStoredProcedure(result,&quot;SUCCESS&quot;);
    
    				}
    
    				@Override
    				public void onFailure(Throwable ex) {
    
    					System.out.println(&quot; sending failed  &quot;);
    					myrepository.callStoredProcedure(result,&quot;FAILED&quot;);
    
    				}
    			});
    
    		}

答案1

得分: 3


private final ExecutorService exec = Executors.newSingleThreadExecutor();

...

this.exec.submit(() -> myrepository.callStoredProcedure(result, "SUCCESS"));

这些任务仍然会在单个线程上运行(但不是Kafka IO线程)。

如果它跟不上您的发布速率,您可能需要使用不同的执行器,例如缓存的线程池执行器或Spring的 ThreadPoolTaskExecutor

英文:

private final ExecutorService exec = Executors.newSingleThreadExecutor();


...

this.exec.submit(() -&gt; myrepository.callStoredProcedure(result,&quot;SUCCESS&quot;));

The tasks will still be run on a single thread (but not the Kafka IO thread).

If it can't keep up with your publishing rate, you might need to use a different executor such as a cached thread pool executor or Spring's ThreadPoolTaskExecutor.

huangapple
  • 本文由 发表于 2020年8月29日 05:08:07
  • 转载请务必保留本文链接:https://go.coder-hub.com/63640880.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定