Kafka Stream is not populated

Question


I have a very simple Java/Spring application to demonstrate KStream functionality, but unfortunately I cannot get the KStream to load any data. The idea is to create a KStream object and simply retrieve its contents with a controller GET method. Sample code:

@RestController
@RequestMapping("/resources/")
public class StreamController {

   private KafkaStreams streams;
   private KStream<String, ResourceMessage> resourceStream;

   StreamController() {

      // configure streams/consumer
      Properties props = new Properties();

      // make sure stream starts from the beginning
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
      props.put(StreamsConfig.STATE_DIR_CONFIG, Path.of(System.getProperty("java.io.tmpdir")).toAbsolutePath().toString());

      // create POJO serdes
      StreamsBuilder builder = new StreamsBuilder();
      Map<String, Object> serdeProps = new HashMap<>();
      Serializer<ResourceMessage> resourceSerializer = new JsonPOJOSerializer<>();
      serdeProps.put("JsonPOJOClass", ResourceMessage.class);
      resourceSerializer.configure(serdeProps, false);
      Deserializer<ResourceMessage> resourceDeserializer = new JsonPOJODeserializer<>();
      serdeProps.put("JsonPOJOClass", Resource.class);
      resourceDeserializer.configure(serdeProps, false);
      Serde<ResourceMessage> resourceSerde = Serdes.serdeFrom(resourceSerializer, resourceDeserializer);

      // create KStream with POJO serdes for value
      resourceStream = builder.stream("Resources", Consumed.with(Serdes.String(), resourceSerde));
      streams = new KafkaStreams(builder.build(), props);
      streams.start();
   }

   // GET method that enumerates KStream and returns contents
   @GetMapping(value = "/resource")
   public List<Resource> getResources() {

      List<ResourceMessage> messages = new LinkedList<ResourceMessage>();

      // problem is here - there are messages in the topic but KStream returns no values in foreach(...)
      resourceStream.foreach((key, value) -> messages.add(value));

      return messages.stream().map(m -> Resource.builder()
            .id(m.getResouceId()).resource(m.getResource()).build()).collect(Collectors.toList());
   }
}

Problem: there are messages in the topic, but enumerating the KStream with foreach(...) retrieves nothing. The KafkaStreams instance is in the "RUNNING" state and there are no errors in the logs.

Generating a random APPLICATION_ID and setting AUTO_OFFSET_RESET to "earliest" does not help. I can clearly see messages in the topic using Kafka Tool. Adding new messages while the controller is running does not help either. Is there something I'm missing or misunderstanding about Kafka Streams?

PS: I'm using the POJO serializer and deserializer examples from here.

Answer 1

Score: 1


Kafka Streams is a Kafka client library for real-time stream processing. In your case you don't need a Kafka Streams client (it won't work this way); you need a plain Kafka consumer that polls records from Kafka and returns them through the REST API. For example:

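import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.*;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/resources/")
public class StreamController {

   private static final String TOPIC = "Resources";

   // KafkaConsumer is not thread-safe, so it is only used from synchronized code
   private final Consumer<String, ResourceMessage> consumer;

   StreamController() {
      // configure consumer properties
      Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "resource-api"); // illustrative group id
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

      // JSON POJO deserializer from the question, configured for ResourceMessage
      Map<String, Object> serdeProps = new HashMap<>();
      serdeProps.put("JsonPOJOClass", ResourceMessage.class);
      Deserializer<ResourceMessage> resourceDeserializer = new JsonPOJODeserializer<>();
      resourceDeserializer.configure(serdeProps, false);

      // create the consumer with explicit key and value deserializers
      consumer = new KafkaConsumer<>(props, new StringDeserializer(), resourceDeserializer);
      consumer.subscribe(Collections.singletonList(TOPIC));
   }

   // GET method that polls the topic and returns the records received by this poll;
   // records already returned to this consumer group are not returned again
   @GetMapping(value = "/resource")
   public synchronized List<Resource> getResources() {
      ConsumerRecords<String, ResourceMessage> consumerRecords =
            consumer.poll(Duration.ofMillis(1000));

      // convert the records to your custom POJO
      List<Resource> resources = new ArrayList<>();
      for (ConsumerRecord<String, ResourceMessage> record : consumerRecords) {
         ResourceMessage m = record.value();
         resources.add(Resource.builder().id(m.getResouceId()).resource(m.getResource()).build());
      }
      return resources;
   }
}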

You can find a full example here: link.
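Why the Kafka Streams version in the question returns nothing: `KStream#foreach` is not an iterator over the topic. It adds a terminal processor to the topology and returns immediately; the callback is invoked later, on a Kafka Streams thread, as records are processed. In the question, `foreach` is called inside the GET handler, after `builder.build()` and `streams.start()` have already run in the constructor, so it never becomes part of the topology that the running `KafkaStreams` instance executes. The sketch below is a plain-Java model of that behaviour, not the Kafka Streams API: `ToyStreamsBuilder` and `ForeachDemo` are hypothetical names, and a simple loop stands in for the stream threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical plain-Java model of a streams builder; NOT the Kafka Streams API.
final class ToyStreamsBuilder<K, V> {
    private final List<BiConsumer<K, V>> processors = new ArrayList<>();

    // Models KStream#foreach: registers a callback and processes no records itself.
    void foreach(BiConsumer<K, V> action) {
        processors.add(action);
    }

    // Models StreamsBuilder#build: the "topology" is a snapshot of what was registered so far.
    List<BiConsumer<K, V>> build() {
        return List.copyOf(processors);
    }
}

final class ForeachDemo {
    // Stands in for the stream threads: pushes each record through the running topology.
    static void deliver(List<BiConsumer<String, String>> topology, List<String> records) {
        for (String value : records) {
            for (BiConsumer<String, String> processor : topology) {
                processor.accept("some-key", value);
            }
        }
    }

    // Mirrors the question: build/start in the constructor, call foreach later in the handler.
    static List<String> foreachAfterBuild(List<String> topicContents) {
        ToyStreamsBuilder<String, String> builder = new ToyStreamsBuilder<>();
        List<BiConsumer<String, String>> running = builder.build();
        List<String> collected = new ArrayList<>();
        builder.foreach((key, value) -> collected.add(value)); // not part of `running`
        deliver(running, topicContents);
        return collected;
    }

    // Registering before build puts the callback in the topology; it fires only on delivery.
    static List<String> foreachBeforeBuild(List<String> topicContents) {
        ToyStreamsBuilder<String, String> builder = new ToyStreamsBuilder<>();
        List<String> collected = new ArrayList<>();
        builder.foreach((key, value) -> collected.add(value));
        List<BiConsumer<String, String>> running = builder.build();
        deliver(running, topicContents);
        return collected;
    }
}
```

With `List.of("a", "b")` as the topic contents, `foreachAfterBuild` returns an empty list while `foreachBeforeBuild` returns both values. Even when `foreach` is registered before `build()`, its callback runs asynchronously as records arrive, so it cannot fill a list that a request handler creates and returns synchronously. Separately, the question's deserializer is configured with `Resource.class` while the Serde is declared for `ResourceMessage`; that is worth checking once records do flow.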

UPDATE

Also note that a @RestController is a singleton bean by default (it is not request scoped): its constructor runs once, and the KStream created there is a description of the processing topology, not a collection that a request handler can enumerate. So you end up with nothing in the API response. If you want to use Kafka Streams, you can launch it in the main method of your Spring Boot application.
See this example: link.
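If you keep Kafka Streams as the update suggests, a common way to serve a topic's contents over REST is to materialize the topic into a state store while building the topology (before `streams.start()`), for example with `builder.table("Resources", Consumed.with(Serdes.String(), resourceSerde), Materialized.as("resources-store"))`, and to read that store from the GET handler via interactive queries, `streams.store(StoreQueryParameters.fromNameAndType("resources-store", QueryableStoreTypes.keyValueStore()))`. The store name is an illustrative assumption, and `StoreQueryParameters` requires Kafka 2.5 or later. So that it runs without a broker, the sketch below models only the pattern in plain Java: a callback registered up front keeps the latest value per key in a thread-safe map, and the handler reads a snapshot. `ResourceView` and `ResourceViewDemo` are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical plain-Java model of "materialize the topic, then query it".
// In Kafka Streams a state store plays the role of this map; NOT the Kafka Streams API.
final class ResourceView {
    // Latest value per key, the way a KTable keeps one current value per key.
    private final Map<String, String> latestByKey = new ConcurrentHashMap<>();

    // Callback to register before the topology is built and started.
    // A null value is treated as a delete, mirroring KTable tombstone semantics.
    BiConsumer<String, String> updater() {
        return (key, value) -> {
            if (value == null) {
                latestByKey.remove(key);
            } else {
                latestByKey.put(key, value);
            }
        };
    }

    // What a GET handler would call: an immutable snapshot, safe from any request thread.
    Map<String, String> snapshot() {
        return Map.copyOf(latestByKey);
    }
}

final class ResourceViewDemo {
    // Simulates a stream thread delivering records into an already-registered view.
    static Map<String, String> run(List<Map.Entry<String, String>> records) {
        ResourceView view = new ResourceView();
        BiConsumer<String, String> callback = view.updater();
        Thread streamThread = new Thread(() -> {
            for (Map.Entry<String, String> record : records) {
                callback.accept(record.getKey(), record.getValue());
            }
        });
        streamThread.start();
        try {
            // Only so the demo is deterministic; a real handler reads whatever has arrived.
            streamThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the demo thread", e);
        }
        return view.snapshot();
    }
}
```

Copying into an immutable snapshot keeps each response consistent while the stream thread keeps updating the view. With a real `ReadOnlyKeyValueStore`, the iterator returned by `all()` should be closed after use.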

Posted by huangapple on 2020-09-04 16:14:26
Link to this article: https://go.coder-hub.com/63737332.html