How to trigger compaction in a Kafka topic?

Question

I create a topic with these settings:

    final AdminClient localKafkaAdmin = AdminClient.create(consumerProps);
    final NewTopic topic = new NewTopic("test-v1", 1, (short) 1);

    Map<String, String> topicConfigMap = new HashMap<>();
    topicConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
    topicConfigMap.put(TopicConfig.SEGMENT_MS_CONFIG, "2");
    topic.configs(topicConfigMap);

    localKafkaAdmin.createTopics(Arrays.asList(topic));

After that, I send 3 messages with the same key via KafkaProducer. When I get the ConsumerRecords, I see that compaction wasn't applied: there are still 3 records. What am I doing wrong? I tried the segment.bytes setting as well but got the same result. How can I trigger compaction immediately?

Answer 1

Score: 1

Compaction is controlled by these parameters (broker-level name first, per-topic override in parentheses):

  • log.cleanup.policy=compact (topic: cleanup.policy)
  • log.cleaner.min.compaction.lag.ms (topic: min.compaction.lag.ms)
  • log.cleaner.max.compaction.lag.ms (topic: max.compaction.lag.ms)
  • log.cleaner.min.cleanable.ratio (topic: min.cleanable.dirty.ratio)

To force compaction in your case, you probably want to set a low value for one of the last two, for example log.cleaner.max.compaction.lag.ms = 2. Remember to check both the per-topic and the per-broker configuration, and wait until compaction has actually finished: these parameters only determine when records become eligible for compaction, not when the log cleaner actually runs.
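As a rough illustration, the per-topic names of the settings listed above could be collected into a config map like the one in the question. This is only a sketch: the class and method names are hypothetical, the values are illustrative rather than recommended, and plain string keys are used so the snippet compiles without the Kafka client library. In real code the map would be passed to `NewTopic.configs(...)` as in the question. Note also that the log cleaner does not compact the active segment, so the newest records for a key can remain visible until that segment rolls.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactionTopicConfigSketch {

    // Hypothetical helper: builds per-topic overrides that make records
    // eligible for compaction quickly. All values are illustrative assumptions.
    static Map<String, String> aggressiveCompactionConfig() {
        Map<String, String> configs = new LinkedHashMap<>();
        // Enable log compaction for the topic.
        configs.put("cleanup.policy", "compact");
        // Roll segments often; only closed (non-active) segments are cleaned.
        configs.put("segment.ms", "100");
        // Allow cleaning even when only a small fraction of the log is dirty.
        configs.put("min.cleanable.dirty.ratio", "0.01");
        // No minimum wait before a record may be compacted.
        configs.put("min.compaction.lag.ms", "0");
        // Upper bound on how long a record may stay ineligible for compaction.
        configs.put("max.compaction.lag.ms", "100");
        return configs;
    }

    public static void main(String[] args) {
        // Print the overrides in insertion order, one key=value per line.
        aggressiveCompactionConfig()
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Even with settings like these, compaction happens in a background cleaner thread on the broker, so a consumer that reads right after producing may still see every record.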

huangapple
  • Posted on August 4, 2023, 22:28:18
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76836847.html