KStreams - How to handle delay of messages on one topic

Question

I have a Spring Boot based KStreams application where I am joining data across multiple topics. What are the best practices for handling a situation where there is a delay on one topic? I've read links such as https://stackoverflow.com/questions/41707854/how-to-manage-kafka-kstream-to-kstream-windowed-join and others.

Here is my sample code (Spring Boot app) that produces mock data to two topics, Employee and Finance. Code for the employee topic below:

private void sendEmpData() {
    IntStream.range(0, 1).forEach(index -> {
        EmployeeKey key = new EmployeeKey();
        key.setEmployeeId(1);

        Employee employee = new Employee();
        employee.setDepartmentId(1000);
        employee.setEmployeeFirstName("John");
        employee.setEmployeeId(1);
        employee.setEmployeeLastName("Doe");

        kafkaTemplateForEmp.send(EMP_TOPIC, key, employee);
    });
}

Likewise for the finance topic:

private void sendFinanceData() {
	IntStream.range(0, 1).forEach(index -> {
		FinanceKey key = new FinanceKey();
		key.setEmployeeId(1);
		key.setDepartmentId(1000);

		Finance finance = new Finance();
		finance.setDepartmentId(1000);
		finance.setEmployeeId(1);
		finance.setSalary(2000);

		kafkaTemplateForFinance.send(FINANCE_TOPIC, key, finance);
	});
}

The timestamp type associated with these records is TimestampType.CREATE_TIME, which I am assuming is the same as event time in Streams.

I have a simple KStreams app which rekeys the finance topic so that the finance stream key matches the employee stream key, and then does the join as below:

employeeKStream.join(reKeyedStream,
        (employee, finance) -> new EmployeeFinance(employee.getEmployeeId(),
                employee.getEmployeeFirstName(),
                employee.getEmployeeLastName(),
                employee.getDepartmentId(),
                finance.getSalary(),
                finance.getSalaryGrade()),
        JoinWindows.of(windowRetentionTimeMs), // 30 seconds
        Joined.with(
                employeeKeySerde,
                employeeSerde,
                financeSerde))
    .to(outputTopic, Produced.with(employeeKeySerde, employeeFinanceSerde));

If a record with a matching key arrives more than 30 seconds later on the finance topic, the join doesn't happen. Any insights on how to address this would be helpful. Thank you in advance.

P.S.: This data is a work of fiction. If it matches your department ID/salary, it's merely coincidental.

Answer 1

Score: 0

> If a record with matching key arrives more than 30 seconds later in finance topic, then the join doesn't happen.

By default, Kafka Streams uses a grace period of 24h, hence, even if there is lag or out-of-order data, your join should work. Note that lag in Kafka always refers to the read path!
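
For completeness, the window size and grace period can also be set explicitly instead of relying on the default retention. A minimal sketch, assuming the Kafka Streams 2.1+ API, where JoinWindows accepts a Duration and exposes grace(...):

import java.time.Duration;

import org.apache.kafka.streams.kstream.JoinWindows;

// Records join if their timestamps are within 30 seconds of each other.
// Out-of-order records are still accepted for up to 10 minutes (the grace
// period) before the window closes and truly late records are dropped.
JoinWindows joinWindows = JoinWindows
        .of(Duration.ofSeconds(30))
        .grace(Duration.ofMinutes(10));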

> arrives more than 30 seconds later in finance topic

However, I think you don't really mean that you have lag (in the sense that you fall behind reading), but that your upstream write is delayed -- in that case, the event time may just be assigned incorrectly:

Note that when writing to a Kafka topic, if you don't specify the timestamp explicitly, the producer will assign the timestamp when send() is called -- not when the ProducerRecord instance is created. If you want to assign a timestamp from when the ProducerRecord is created, you need to pass the timestamp you want to assign into the constructor manually (not sure if Spring Boot allows you to do this).
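
For illustration, a sketch of passing an explicit timestamp captured at record-creation time. This assumes spring-kafka 1.3+, which gives KafkaTemplate a send(topic, partition, timestamp, key, data) overload; the null partition leaves partition selection to the partitioner:

// Capture event time when the record object is built, not when it is sent.
long eventTime = System.currentTimeMillis();

EmployeeKey key = new EmployeeKey();
key.setEmployeeId(1);

Employee employee = new Employee();
employee.setEmployeeId(1);

// Without the explicit timestamp, the producer stamps the record at send()
// time, which can be much later than when the event actually occurred.
kafkaTemplateForEmp.send(EMP_TOPIC, null, eventTime, key, employee);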

As an alternative (in case you cannot set the record timestamp explicitly), you could embed the timestamp in the value (this of course requires that you change your Employee and Finance classes). When processing this data with Kafka Streams, you can use a custom TimestampExtractor to get your custom timestamp instead of the record timestamp.
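
A sketch of such an extractor, assuming a hypothetical getEventTime() accessor added to the value classes to carry the embedded timestamp (it does not exist in the original code):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class EmbeddedTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Object value = record.value();
        if (value instanceof Finance) {
            // getEventTime() is the hypothetical embedded event-time field
            return ((Finance) value).getEventTime();
        }
        // Fall back to the producer/broker record timestamp otherwise
        return record.timestamp();
    }
}

It can then be registered globally via default.timestamp.extractor (StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG), or per input topic via Consumed.with(...).withTimestampExtractor(...).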
