英文:
How to trigger compaction in Kafka topic?
问题
我使用以下设置创建主题:
final AdminClient localKafkaAdmin = AdminClient.create(consumerProps);
final NewTopic topic = new NewTopic("test-v1", 1, (short) 1);
Map<String, String> topicConfigMap = new HashMap<>();
topicConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
topicConfigMap.put(TopicConfig.SEGMENT_MS_CONFIG, "2");
topic.configs(topicConfigMap);
localKafkaAdmin.createTopics(Arrays.asList(topic));
之后,通过KafkaProducer发送具有相同键的3条消息。当我获取ConsumerRecords时,我发现压缩未应用,有3条记录。我做错了什么?我尝试了segment.bytes设置,但结果相同。如何立即触发压缩?
英文:
I create topic with this settings:
final AdminClient localKafkaAdmin = AdminClient.create(consumerProps);
final NewTopic topic = new NewTopic("test-v1", 1, (short) 1);
Map<String, String> topicConfigMap = new HashMap<>();
topicConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
topicConfigMap.put(TopicConfig.SEGMENT_MS_CONFIG, "2");
topic.configs(topicConfigMap);
localKafkaAdmin.createTopics(Arrays.asList(topic));
after that send 3 messages with the same key via KafkaProducer. when I get ConsumerRecords I see that compaction wasn't applied, there are 3 records. What am I doing wrong? tried with segment.bytes setting but got the same result. how to trigger compaction immediately?
答案1
得分: 1
Compaction is configured by those parameters:
- log.cleanup.policy=compact
- log.cleaner.min.compaction.lag.ms
- log.cleaner.max.compaction.lag.ms
- min.cleanable.dirty.ratio
In your case to force compaction you probably want to set low value for one of the last 2. For example, log.cleaner.max.compaction.lag.ms = 2. Do not forget to check both per-topic and per-broker configuration, and wait until compaction will actually be ended, since parameters only sets whether records are eligible for compaction.
英文:
Compaction is configured by those parameters:
- log.cleanup.policy=compact
- log.cleaner.min.compaction.lag.ms
- log.cleaner.max.compaction.lag.ms
- min.cleanable.dirty.ratio
In your case to force compaction you probably want to set low value for one of the last 2. For example, log.cleaner.max.compaction.lag.ms = 2. Do not forget to check both per-topic and per-broker configuration, and wait until compaction will actually be ended, since parameters only sets whether records is eligible for compaction.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论