英文:
KStream-KStream Join not trigger left-join
问题
我使用kafka streams和spring-cloud-stream-binder-kafka-streams来构建一个流,该流从一个主题接收有关在Kubernetes集群中运行的Pod的信息,并根据此信息构建容器映像与关联Pod的映射。
根据这个需求,我尝试加入一个包含扫描操作的kstream。这个连接操作在正确的时机被触发,但是一旦我通过向正在运行的Pod主题中生产新条目,左连接操作就再也不会被触发,即使我期望一个新的Pod事件和一个新的容器映射应该创建一个左连接,其中扫描操作为空。
我添加了一些peek操作来查看发生了什么,看起来新的条目在左连接之前就“消失”了。所以peek container-map:
在代码中,我看到了一些配置,如下所示:
@Bean
public BiFunction<KStream<String, PodEventDto>, KStream<String, ScanAction>, KStream<String, ScanAction>> events() {
return (podEventStream, scanActionKStream) -> {
return podEventStream
.process(PodEventStreamProcessor::new, "container-pod-map")
.peek((key, value) -> {
System.out.println("container-map: " + key + " - " + value.getImage());
})
.map((key, value) -> KeyValue.pair(key, value))
.leftJoin(scanActionKStream,
(key, containerPodMap, scanAction) -> {
// 在初始启动后永远不会到达这里
},
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(SCAN_INTERVAL_IN_MINUTES)),
StreamJoined.with(Serdes.String(), new JsonSerde<>(ContainerPodMap.class), new JsonSerde<>(ScanAction.class)))
///...
}
public static class PodEventStreamProcessor extends ContextualProcessor<String, PodEventDto, String, ContainerPodMap> {
private KeyValueStore<String, ContainerPodMap> stateStore;
@Override
public void init(ProcessorContext<String, ContainerPodMap> context) {
super.init(context);
stateStore = context.getStateStore("container-pod-map");
}
@Override
public void process(Record<String, PodEventDto> podEventDtoRecord) {
String podId = podEventDtoRecord.value().getId();
for (PodContainerEventDto podContainer : podEventDtoRecord.value().getContainer()) {
// 在示例中简化以减少复杂性
context().forward(stringContainerPodMapRecord);
}
}
}
另外,还有一些Spring Cloud Stream的配置:
spring.cloud.stream.bindings.events-in-0.destination=pod-event
spring.cloud.stream.bindings.events-in-1.destination=scan-tasks
spring.cloud.stream.bindings.events-out-0.destination=scan-tasks
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
希望这些信息对你有帮助!如果你有任何其他问题,请随时问我。
英文:
I utilize kafka streams together with spring-cloud-stream-binder-kafka-streams to build a stream that receives from a topic information about pods running in a kubernetes cluster and build out of it a mapping of container image with associated pods.
public class ContainerPodMap {
private String image;
private List<String> pods;
}
Based on this I try to join a kstream that contains scan actions. This join gets triggered once properly, but as soon as I add new entries via producing into the running pods topic the leftjoin action gets never triggered again even though I would expect that a new pod event with a new containerpod-map should create a leftjoin where the scanAction is empty.
I implemented some peeks to see what is happening and it looks like the new entries simply "dies" infront of the leftjoin. So the peek container-map: <key> - <value> works as expected.
@Bean
public BiFunction<KStream<String, PodEventDto>, KStream<String, ScanAction>, KStream<String, ScanAction>> events() {
return (podEventStream, scanActionKStream) -> {
return podEventStream
.process(PodEventStreamProcessor::new, "container-pod-map")
.peek((key, value) -> {
System.out.println("container-map: " + key + " - " + value.getImage());
})
.map((key, value) -> KeyValue.pair(key, value))
.leftJoin(scanActionKStream,
(key, containerPodMap, scanAction) -> {
/// never arrive here after the initial startup
},
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(SCAN_INTERVAL_IN_MINUTES)),
StreamJoined.with(Serdes.String(), new JsonSerde<>(ContainerPodMap.class), new JsonSerde<>(ScanAction.class)))
///...
}
public static class PodEventStreamProcessor extends ContextualProcessor<String, PodEventDto, String, ContainerPodMap> {
private KeyValueStore<String, ContainerPodMap> stateStore;
@Override
public void init(ProcessorContext<String, ContainerPodMap> context) {
super.init(context);
stateStore = context.getStateStore("container-pod-map");
}
@Override
public void process(Record<String, PodEventDto> podEventDtoRecord) {
String podId = podEventDtoRecord.value().getId();
for (PodContainerEventDto podContainer : podEventDtoRecord.value().getContainer()) {
// simplyfied to reduce complexity in example
context().forward(stringContainerPodMapRecord);
}
spring.cloud.stream.bindings.events-in-0.destination=pod-event
spring.cloud.stream.bindings.events-in-1.destination=scan-tasks
spring.cloud.stream.bindings.events-out-0.destination=scan-tasks
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
答案1
得分: 1
左连接结果只在连接窗口关闭后才会被发出。只要连接窗口是打开的,就不清楚左侧输入记录是否会产生内连接结果。只有当左侧输入记录不产生内连接结果时,它才会作为左连接结果被发出。
如果只发送一条记录,流时间(仅根据记录时间戳推进)不会推进,因此连接窗口不会关闭。只有当处理另一条具有足够大时间戳的记录时,连接窗口关闭,才会发出左连接结果。
参考链接:https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
英文:
Left join results are only emitted after the join window was closed. As long as the join window is open, it's not clear if the left input record will produce an inner join result or not. Only if the left input record does not produce an inner join result, it would be emitted as left join result.
If you only send a single record, stream time (which is only advanced based on record timestamps) does not advance, and thus the join window is not closed. Only if another record with a large enough timestamp is processed, such that the join window is close, a left-join result would be emitted.
Cf https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论