Clickhouse can't get messages from Kafka with TabSeparated formatting
Question
I send messages to Kafka from my Spring Boot application:
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("uniqTopic123", "testKey", "Test\tTest");
future.addCallback(
    (v) -> System.out.println("SUCCESS: " + v),
    (v) -> System.out.println("FAIL: " + v)
);
kafkaTemplate.flush();
application.properties:
spring.kafka.consumer.group-id=app.1
kafka.server=<kafka_host>:9092
kafka.producer.id=kafkaProducerId
Configuration:
@Value("${kafka.server}")
private String kafkaServer;

@Value("${kafka.producer.id}")
private String kafkaProducerId;

@Value("${spring.kafka.consumer.group-id}")
private String kafkaGroupId;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerId);
    return props;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<String, String>(
        new DefaultKafkaProducerFactory<>(producerConfigs()));
}

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
    return new DefaultKafkaConsumerFactory<>(props);
}
In logs I can see messages like:
SUCCESS: SendResult [producerRecord=ProducerRecord(topic=uniqTopic123, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=testKey, value=Test Test, timestamp=null), recordMetadata=uniqTopic123-0@1]
But my listener doesn't catch any messages:
@KafkaListener(topics = "uniqTopic123")
public void msgListener(ConsumerRecord<String, String> record) {
    System.out.println("test ======> " + record.value());
}
And the table in ClickHouse is empty. My ClickHouse tables:
CREATE TABLE IF NOT EXISTS test (key String, message String)
ENGINE = Kafka('<kafka_host>:9092', 'uniqTopic123', 'app.1', 'TabSeparated');
CREATE TABLE IF NOT EXISTS test_table (key String, message String)
ENGINE = MergeTree() ORDER BY key;
CREATE MATERIALIZED VIEW consumer TO test_table AS
SELECT key, message FROM test;
What is wrong in my code?
UPD: Kafka Tool shows that the messages are in Kafka.
UPD: The mistake was the missing linefeed at the end of the message; it should be "Test\tTest\n".
Answer 1
Score: 1
The mistake was the missing linefeed at the end of the message. The TabSeparated format expects every row to end with a newline, so the message should be "Test\tTest\n".
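To avoid forgetting the trailing linefeed, the row can be built by a small helper before it is handed to kafkaTemplate.send. The following is only a sketch: the class name TabSeparatedRow and its methods are hypothetical and not part of Spring Kafka or ClickHouse. It also assumes that backslash-escaping of tabs, newlines, and backslashes inside field values is enough for the data being sent.

```java
// Hypothetical helper (not from the original post): builds one TabSeparated row.
final class TabSeparatedRow {
    private TabSeparatedRow() {}

    // Escape the characters that would otherwise be read as field or row separators.
    // The backslash is replaced first so the escapes added afterwards are not doubled.
    static String escape(String field) {
        return field
            .replace("\\", "\\\\")
            .replace("\t", "\\t")
            .replace("\n", "\\n");
    }

    // Join the fields with tabs and terminate the row with the linefeed
    // that the original message was missing.
    static String of(String... fields) {
        StringBuilder row = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                row.append('\t');
            }
            row.append(escape(fields[i]));
        }
        return row.append('\n').toString();
    }
}
```

With this helper, the send call from the question would become kafkaTemplate.send("uniqTopic123", "testKey", TabSeparatedRow.of("Test", "Test")), which sends "Test\tTest\n".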