ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG = false is not working

Question

SpringBoot2
SpringKafka

I'm currently facing an issue where ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG = false does not seem to work. I've set allow.auto.create.topics to false, but the topic is still auto-created at runtime.

Here are the code changes I've made.

EmbeddedKafkaIntegrationTest.java (Unit Test class)

package com.mbag.kafka;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertTrue;

@SpringBootTest(classes = {KafkaConsumerService.class, KafkaProducerService.class})
@Import(com.mbag.kafka.EmbeddedKafkaIntegrationTest.KafkaConfigurationTest.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {
        "listeners=PLAINTEXT://localhost:9092",
        "port=9092",
        "auto.create.topics.enable=true"
})
public class EmbeddedKafkaIntegrationTest {

    @Autowired
    private KafkaConsumerService consumer;

    @Autowired
    private KafkaProducerService producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception {
        String data = "Sending with our own simple KafkaProducer";

        producer.send(topic, data);

        boolean messageConsumed = consumer.getLatch().await(3, TimeUnit.SECONDS);

        assertTrue(messageConsumed);
        assertTrue(consumer.getPayload().contains(data));
    }

    @Configuration
    @EnableKafka
    static class KafkaConfigurationTest {

        @Bean
        public KafkaAdmin admin() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            return new KafkaAdmin(configs);
        }

        @Bean("kafkaListenerContainerFactory")
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Records for which this strategy returns true are discarded before reaching the listener
            factory.setRecordFilterStrategy(record -> record.value().contains("test"));
            return factory;
        }

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            // Keys are deserialized with StringDeserializer, so the key type is String
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // "earliest" or "latest"
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "localtest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }

        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

    }

}

KafkaConsumerService.java

package com.mbag.kafka;

import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

@Component
@Data
public class KafkaConsumerService {

    // Log under this service's own class rather than KafkaConsumer
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload;

    @KafkaListener(topics = "${test.topic}", groupId = "localtest", containerFactory = "kafkaListenerContainerFactory")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("Received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
        latch.countDown();
    }

    public void resetLatch() {
        latch = new CountDownLatch(1);
    }

}

KafkaProducerService.java

package com.mbag.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducerService {

    // Log under this service's own class rather than KafkaProducer
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }

}

application.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: localtest
      properties:
        allow.auto.create.topics: false
test:
  topic: embedded-test-topic
  topic-test: testing-only

Based on the code above, I ran some tests, toggling the flags on the broker and on the consumer.

  1. If I set auto.create.topics.enable=false at the broker level, the test case fails because it cannot subscribe to the topic. This is fine, since it is the expected behavior.
  2. If I set auto.create.topics.enable=true on the broker but allow.auto.create.topics=false on the consumer, the topic is still auto-created. This is not fine; it is unexpected behavior.

I've checked the console logs and stepped through in debug mode, and the property is indeed set to false. But things just don't work as expected.

The tests above use @EmbeddedKafka. I also tried TestContainers Kafka, and the result was the same: not working.

Can someone please advise whether I've made any incorrect changes here?

UPDATE:
After some more testing, it seems the topic is created by the producer.
I wrote a simple consume-only test case (no producer code), and the topic was not created.
I wrote another simple produce-only test case (no consumer code), and the topic was created automatically.

Does anyone know how to disable topic auto-creation on the producer? Or is this the intended behavior?

I understand that I can disable topic auto-creation at the broker level (auto.create.topics.enable=false). However, sometimes we are not allowed to modify properties at the infrastructure level.

Answer 1

Score: 1

> how do I disable the topic auto creation on producer?

You would disable auto creation on the broker.

Ideally, you'd also disallow it on the consumer. Otherwise, you'd be creating empty topics with no active producer (for example, if you mistype the topic name, you'd get no messages and no warning, since the topic would simply be created empty).
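
The interaction between the broker and client settings can be summarized as a small decision rule. The sketch below is a simplified model and not Kafka's actual code; the names TopicAutoCreateModel, Client and wouldAutoCreate are made up for illustration. It assumes the broker's auto.create.topics.enable always has the final say, that allow.auto.create.topics only affects consumers, and that the producer has no client-side equivalent, which is consistent with what the question's UPDATE observed.

```java
// Simplified model of when a missing topic gets auto-created.
// Illustration only: this is NOT Kafka's implementation, and all names are hypothetical.
final class TopicAutoCreateModel {

    enum Client { PRODUCER, CONSUMER }

    /**
     * @param brokerAutoCreate        broker setting auto.create.topics.enable
     * @param client                  which kind of client asks for the missing topic
     * @param consumerAllowAutoCreate consumer setting allow.auto.create.topics
     *                                (ignored for producers, assumed to have no such setting)
     */
    static boolean wouldAutoCreate(boolean brokerAutoCreate, Client client,
                                   boolean consumerAllowAutoCreate) {
        if (!brokerAutoCreate) {
            return false; // the broker setting overrides everything else
        }
        if (client == Client.CONSUMER) {
            return consumerAllowAutoCreate; // the consumer may opt out
        }
        return true; // producer: no client-side opt-out in this model
    }

    public static void main(String[] args) {
        // Scenario 2 from the question: broker enabled, consumer disabled, producer sends
        System.out.println("producer send: "
                + wouldAutoCreate(true, Client.PRODUCER, false));
        // Consume-only test from the UPDATE: broker enabled, consumer disabled
        System.out.println("consumer subscribe: "
                + wouldAutoCreate(true, Client.CONSUMER, false));
    }
}
```

Under these assumptions, the only setting that stops a producer from triggering creation is the broker-side one.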

Use AdminClient on your own to actually create topics before other tests run.
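
As a rough sketch of that last step, the snippet below creates the topic explicitly with the Kafka AdminClient. The class name TestTopicSetup and its helper methods are hypothetical; the bootstrap address and topic name are taken from the question's test configuration, and one partition with replication factor 1 is assumed to be enough for a single-broker test. It needs kafka-clients on the classpath and a reachable broker to run.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

// Hypothetical helper for creating test topics up front instead of relying on auto-creation.
final class TestTopicSetup {

    // One partition, replication factor 1: an assumption suited to a single-broker test.
    static NewTopic singlePartitionTopic(String name) {
        return new NewTopic(name, 1, (short) 1);
    }

    // Creates the topic and waits for the result; an already existing topic is not an error.
    static void createTopic(String bootstrapServers, String name)
            throws InterruptedException, ExecutionException {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient admin = AdminClient.create(config)) {
            admin.createTopics(Collections.singletonList(singlePartitionTopic(name)))
                 .all()
                 .get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e; // anything other than "topic already exists" is a real failure
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Values from the question's setup; adjust for a different broker or topic.
        createTopic("localhost:9092", "embedded-test-topic");
    }
}
```

Since the question's configuration already declares a KafkaAdmin bean, declaring the topic as a NewTopic bean in the same Spring context should have a similar effect, because KafkaAdmin creates such topics when the context starts.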

huangapple
  • Posted on June 13, 2023, 18:14:10
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76463848.html