ClickHouse 无法从 Kafka 获取 TabSeparated 格式的消息。

huangapple go评论97阅读模式
英文:

Clickhouse can't get messages from Kafka with TabSeparated formatting

问题

I send messages to Kafka from my Spring Boot application:

  1. ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("uniqTopic123", "testKey", "Test\tTest");
  2. future.addCallback(
  3. (v) -> System.out.println("SUCCESS: " + v),
  4. (v) -> System.out.println("FAIL: " + v)
  5. );
  6. kafkaTemplate.flush();

application.properties:

  1. spring.kafka.consumer.group-id=app.1
  2. kafka.server=<kafka_host>:9092
  3. kafka.producer.id=kafkaProducerId

Configuration:

  1. @Value("${kafka.server}")
  2. private String kafkaServer;
  3. @Value("${kafka.producer.id}")
  4. private String kafkaProducerId;
  5. @Value("${spring.kafka.consumer.group-id}")
  6. private String kafkaGroupId;
  7. @Bean
  8. public Map<String, Object> producerConfigs() {
  9. Map<String, Object> props = new HashMap<>();
  10. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
  11. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  12. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  13. //props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerId);
  14. return props;
  15. }
  16. @Bean
  17. public KafkaTemplate<String, String> kafkaTemplate() {
  18. return new KafkaTemplate<String, String>(
  19. new DefaultKafkaProducerFactory<>(producerConfigs()));
  20. }
  21. @Bean
  22. public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
  23. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  24. factory.setConsumerFactory(consumerFactory());
  25. return factory;
  26. }
  27. @Bean
  28. public ConsumerFactory<String, String> consumerFactory() {
  29. Map<String, Object> props = new HashMap<>();
  30. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
  31. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  32. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  33. props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
  34. return new DefaultKafkaConsumerFactory<>(props);
  35. }

In logs I can see messages like:

  1. SUCCESS: SendResult [producerRecord=ProducerRecord(topic=uniqTopic123, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=testKey, value=Test Test, timestamp=null), recordMetadata=uniqTopic123-0@1]

But my listener doesn't catch any messages:

  1. @KafkaListener(topics="uniqTopic123")
  2. public void msgListener(ConsumerRecord<String, String> record){
  3. System.out.println("test ======> " + record.value());
  4. }

And the table in ClickHouse is empty. My ClickHouse tables:

  1. CREATE TABLE IF NOT EXISTS test (key String, message String)
  2. ENGINE = Kafka('<kafka_host>:9092', 'uniqTopic123', 'app.1', 'TabSeparated');
  3. CREATE TABLE IF NOT EXISTS test_table (key String, message String)
  4. ENGINE = MergeTree() ORDER BY key;
  5. CREATE MATERIALIZED VIEW consumer TO test_table AS
  6. SELECT key, message FROM test;

What is wrong in my code?

UPD.: Kafka Tool shows that messages are in Kafka

UPD: The mistake was in the absence of a linefeed at the end of the message "Test\tTest\n".

英文:

I send messages to Kafka from my Spring Boot application

  1. ListenableFuture&lt;SendResult&lt;String, String&gt;&gt; future = kafkaTemplate.send(&quot;uniqTopic123&quot;, &quot;testKey&quot;, &quot;Test\tTest&quot;);
  2. future.addCallback(
  3. (v) -&gt; System.out.println(&quot;SUCCESS: &quot; + v),
  4. (v) -&gt; System.out.println(&quot;FAIL: &quot; + v)
  5. );
  6. kafkaTemplate.flush();

application.properties

  1. spring.kafka.consumer.group-id=app.1
  2. kafka.server=&lt;kafka_host&gt;:9092
  3. kafka.producer.id=kafkaProducerId

Configuration

  1. @Value(&quot;${kafka.server}&quot;)
  2. private String kafkaServer;
  3. @Value(&quot;${kafka.producer.id}&quot;)
  4. private String kafkaProducerId;
  5. @Value(&quot;${spring.kafka.consumer.group-id}&quot;)
  6. private String kafkaGroupId;
  7. @Bean
  8. public Map&lt;String, Object&gt; producerConfigs() {
  9. Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
  10. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
  11. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  12. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  13. //props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerId);
  14. return props;
  15. }
  16. @Bean
  17. public KafkaTemplate&lt;String, String&gt; kafkaTemplate() {
  18. return new KafkaTemplate&lt;String, String&gt;(
  19. new DefaultKafkaProducerFactory&lt;&gt;(producerConfigs()));
  20. }
  21. @Bean
  22. public KafkaListenerContainerFactory&lt;?&gt; kafkaListenerContainerFactory() {
  23. ConcurrentKafkaListenerContainerFactory&lt;String, String&gt; factory = new ConcurrentKafkaListenerContainerFactory&lt;&gt;();
  24. factory.setConsumerFactory(consumerFactory());
  25. return factory;
  26. }
  27. @Bean
  28. public ConsumerFactory&lt;String, String&gt; consumerFactory() {
  29. Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
  30. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
  31. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  32. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  33. props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
  34. return new DefaultKafkaConsumerFactory&lt;&gt;(props);
  35. }

In logs I can see messages like:

SUCCESS: SendResult [producerRecord=ProducerRecord(topic=uniqTopic123, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=testKey, value=Test Test, timestamp=null), recordMetadata=uniqTopic123-0@1]

But my listener doesn't catch any messages

  1. @KafkaListener(topics=&quot;uniqTopic123&quot;)
  2. public void msgListener(ConsumerRecord&lt;String, String&gt; record){
  3. System.out.println(&quot;test ======&gt; &quot; + record.value());
  4. }

And the table in ClickHouse is empty. My ClickHouse tables

  1. CREATE TABLE IF NOT EXISTS test (key String, message String)
  2. ENGINE = Kafka(&#39;&lt;kafka_host&gt;:9092&#39;, &#39;uniqTopic123&#39;, &#39;app.1&#39;, &#39;TabSeparated&#39;);
  3. CREATE TABLE IF NOT EXISTS test_table (key String, message String)
  4. ENGINE = MergeTree() ORDER BY key;
  5. CREATE MATERIALIZED VIEW consumer TO test_table AS
  6. SELECT key, message FROM test;

What is wrong in my code?

UPD.: Kafka Tool shows that messages are in Kafka
ClickHouse 无法从 Kafka 获取 TabSeparated 格式的消息。

UPD:
The mistake was in absenсe of linefeed at the end of message "Test\tTest\n"

答案1

得分: 1

错误在于消息末尾没有换行符,消息为"Test\tTest\n"。

英文:

The mistake was in absenсe of linefeed at the end of message "Test\tTest\n"

huangapple
  • 本文由 发表于 2020年8月7日 19:27:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/63300872.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定