英文:
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG = false is not working
问题
SpringBoot2
SpringKafka
我目前面临的问题是ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG = false未正常工作。我已将allow.auto.create.topics设置为false,但主题仍在运行时自动创建。
以下是我所做的代码更改。
EmbeddedKafkaIntegrationTest.java(单元测试类)
// 你的代码
KafkaConsumerService.java
// 你的代码
KafkaProducerService.java
// 你的代码
application.yml
// 你的代码
根据以上代码更改,我通过在代理和消费者之间更改标志进行了一些测试。
- 如果我在代理级别将auto.create.topics.enable=false更改为true,则测试用例将因无法订阅主题而失败。这是可以接受的,因为这是预期的行为。
- 如果我启用了auto.create.topics.enable=true,但我禁用了allow.auto.create.topics=false,则主题仍然会自动创建。这不是可以接受的,因为这是意外的行为。
我已经检查了控制台日志和调试模式,属性确实设置为false。但事情就是不按预期工作。
以上测试是使用@EmbeddedKafka进行的。我尝试使用TestContainers Kafka,结果仍然相同。不起作用。
请问有人可以指导我是否做出了不正确的更改吗?
更新:
经过一些测试,似乎主题是在Producer部分创建的。
我创建了一个简单的消耗测试用例(没有生成代码),主题没有自动创建。
我创建了另一个简单的生成测试用例(没有消耗代码),我看到主题被自动创建。
有人知道如何在生产者上禁用主题自动创建吗?或者这是预期的功能吗?
我了解我可以在代理级别禁用主题自动创建(auto.create.topics.enable=false)。但是,有时我们可能无法在基础设施级别修改属性。
英文:
SpringBoot2
SpringKafka
I'm currently facing an issue where the ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG = false is not working properly. I've set the allow.auto.create.topics to false, but the Topic still auto created during the runtime.
Here is the code changes that I've made.
EmbeddedKafkaIntegrationTest.java (Unit Test class)
package com.mbag.kafka;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.util.StringUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
@SpringBootTest(classes = {KafkaConsumerService.class, KafkaProducerService.class})
@Import(com.mbag.kafka.EmbeddedKafkaIntegrationTest.KafkaConfigurationTest.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {
"listeners=PLAINTEXT://localhost:9092",
"port=9092",
"auto.create.topics.enable=true"
})
public class EmbeddedKafkaIntegrationTest {
@Autowired
private KafkaConsumerService consumer;
@Autowired
private KafkaProducerService producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception {
String data = "Sending with our own simple KafkaProducer";
producer.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(3, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertTrue(consumer.getPayload().contains(data));
}
@Configuration
@EnableKafka
static class KafkaConfigurationTest {
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean("kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(record -> record.value().contains("test"));
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest latest
props.put(ConsumerConfig.GROUP_ID_CONFIG, "localtest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
}
KafkaConsumerService.java
package com.mbag.kafka;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
@Component
@Data
public class KafkaConsumerService {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload;
@KafkaListener(topics = "${test.topic}", groupId = "localtest", containerFactory = "kafkaListenerContainerFactory")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("Received payload='{}'", consumerRecord.toString());
payload = consumerRecord.toString();
latch.countDown();
}
public void resetLatch() {
latch = new CountDownLatch(1);
}
}
KafkaProducerService.java
package com.mbag.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducerService {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
application.yml
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: localtest
properties:
allow.auto.create.topics: false
test:
topic: embedded-test-topic
topic-test: testing-only
Based on above code changes, I did some testing by changing the flag between broker and consumer.
- If I changed the auto.create.topics.enable=false on broker level, the test case will failed due to unable to subscribe the topic. This is fine since expected the behavior.
- If I enable the auto.create.topics.enable=true, but I disabled the allow.auto.create.topics=false, the topic still auto created. This is not fine as unexpected behavior.
I've checked the logs in console and debug mode, the property is indeed set to false. But things just don't work as expected.
Above testing is using @EmbeddedKafka. I did tried to use TestContainers Kafka, and the result still the same. Not working.
Can someone please advise if I make any incorrect changes here?
UPDATE:
After some testing, it seems like the topic was created during the Producer part.
I created a simple test cases for consume (without produce code), the topic didn't get created.
I created another simple test cases for produce (without consume code), I saw the topic being created automatically.
Does anyone know how do I disable the topic auto creation on producer? Or if this is intended feature?
I'm understand I can disable the topic auto creation on broker level (auto.create.topics.enable=false). However, sometimes we may not allow to modify the properties on infra level.
答案1
得分: 1
您会在代理上禁用自动创建。
理想情况下,您还会禁止消费者进行此操作。否则,您将创建空主题,而没有活动的生产者(例如,您误输入主题名称,但由于主题已被创建为空,您不会收到任何消息或警告)。
在其他测试运行之前,您需要自己使用 AdminClient 实际创建主题。
英文:
> how do I disable the topic auto creation on producer?
You would disable auto creation on the broker.
Ideally, you'd also disallow from the consumer. Otherwise, you'd be making empty topics with no active producer (for example, you mistype the topic name, and you'd get no messages or warning since the topic would be created empty).
Use AdminClient on your own to actually create topics before other tests run.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论