英文:
Spring Kafka MessageListenerContainer Resume/Pause # spring-kafka
问题
由于原生的 KafkaConsumer 不是线程安全的,因此不建议从不同的线程调用 pause 和 resume 方法,而是应该使用 kafka-consumer 处理线程。
但是,Spring Kafka 提供了另一层 KafkaMessageListenerContainer,它在内部使用 kafka-consumer。所以我的问题是,我们可以使用 KafkaListenerEndpointRegistry 按 ID 获取监听容器,并从其他线程而不是消费者处理线程调用 resume 或 pause 方法吗?
kafkaListenerEndpointRegistry.getListenerContainer("id").pause();
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(() -> {
System.out.println("CurrentThread: " + Thread.currentThread().getId() + " " + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
});
英文:
As native KafkaConsumer is not thread safe, so it is discouraged to call pause and resume methods from different thread instead of kafka-consumer processing thread.
but as spring-kafka provides another layer KafkaMessageListenerContainer which internally use kafka-consumer. So my question is can we use KafkaListenerEndpointRegistry to get the listener container by id and call resume or pause method from other thread rather than consumer processing thread.
kafkaListenerEndpointRegistry.getListenerContainer("id").pause();
ExecutorService executorService = newFixedThreadPool(2);
executorService.submit(()->{
System.out.println("CurrentThread: {}" + Thread.currentThread().getId()+ " " + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
});
答案1
得分: 1
是的; container.pause()
设置一个标志,告诉Consumer
线程在下一次poll()
调用之前暂停。同样,resume()
重置该标志,以便在下一次轮询之前恢复Consumer
线程。
英文:
Yes; container.pause()
sets a flag to tell the Consumer
thread to pause before its next poll()
call. Similarly, resume()
resets the flag so the consumer thread will resume the Consumer
before the next poll.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论