Spring Kafka MessageListenerContainer Resume/Pause # spring-kafka

huangapple go评论101阅读模式
英文:

Spring Kafka MessageListenerContainer Resume/Pause # spring-kafka

问题

由于原生的 KafkaConsumer 不是线程安全的,因此不建议从不同的线程调用 pause 和 resume 方法,而是应该使用 kafka-consumer 处理线程。

但是,Spring Kafka 提供了另一层 KafkaMessageListenerContainer,它在内部使用 kafka-consumer。所以我的问题是,我们可以使用 KafkaListenerEndpointRegistry 按 ID 获取监听容器,并从其他线程而不是消费者处理线程调用 resume 或 pause 方法吗?

kafkaListenerEndpointRegistry.getListenerContainer("id").pause();

ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(() -> {
    System.out.println("CurrentThread: " + Thread.currentThread().getId() + " " + Thread.currentThread().getName());
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
});
英文:

As native KafkaConsumer is not thread safe, so it is discouraged to call pause and resume methods from different thread instead of kafka-consumer processing thread.
but as spring-kafka provides another layer KafkaMessageListenerContainer which internally use kafka-consumer. So my question is can we use KafkaListenerEndpointRegistry to get the listener container by id and call resume or pause method from other thread rather than consumer processing thread.

kafkaListenerEndpointRegistry.getListenerContainer("id").pause();
       

    ExecutorService executorService  = newFixedThreadPool(2);
    executorService.submit(()->{

        System.out.println("CurrentThread: {}" + Thread.currentThread().getId()+ " " + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
    });

答案1

得分: 1

是的; container.pause() 设置一个标志,告诉Consumer线程在下一次poll()调用之前暂停。同样,resume()重置该标志,以便在下一次轮询之前恢复Consumer线程。

英文:

Yes; container.pause() sets a flag to tell the Consumer thread to pause before its next poll() call. Similarly, resume() resets the flag so the consumer thread will resume the Consumer before the next poll.

huangapple
  • 本文由 发表于 2020年7月22日 19:13:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/63032912.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定