Kafka Stream-GlobalKTable join on a specific field
Question
So I have a KStream that gets deserialized into a POJO like so:
public class FinancialMessage {
    public String user_id;
    public String stock_symbol;
    public String exchange_id;
}
And here's what the GlobalKTable record looks like:
public class CompanySectors {
    public String company_id;
    public String company_name;
    public String tckr;
    public String sector_cd;
}
I want to join the KStream's stock_symbol field with the GlobalKTable's tckr field. Is this possible? I want to create a new EnrichedMessage object before I stream it to another topic. I have the code below, but I am getting a NullPointerException.
Exception in thread "trade-enrichment-stream-0c7e7782-4217-4450-8086-21871b4ebc45-StreamThread-1" java.lang.NullPointerException
at com.domain.EnrichedMessage.<init>(EnrichedMessage.java:51)
at com.domain.TradeEnrichmentTopology.lambda$3(TradeEnrichmentTopology.java:73)
at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:79)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:101)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Here's what the code snippet looks like.
KStream<String, FinancialMessage> financialMessageStream =
    builder.stream(
        INCOMING_TOPIC,
        Consumed.with(Serdes.String(), financialMessageSerde)
    );
GlobalKTable<String, CompanySectors> companySectorsStore =
    builder.globalTable(
        KTABLE_TOPIC,
        Consumed.with(Serdes.String(), companySectorsSerde)
    );
KStream<String, EnrichedMessage> enrichedStream = financialMessageStream.leftJoin(
    companySectorsStore,
    (financialMessageKey, financialMessageValue) -> financialMessageValue.stock_symbol,
    (financialMessageValue, companySectorsValue) -> new EnrichedMessage(financialMessageValue, companySectorsValue)
);
enrichedStream.to(
    OUTGOING_TOPIC,
    Produced.with(Serdes.String(), enrichedMessageSerde));
I imagine that there might be some error in my leftJoin logic.
Answer 1
Score: 4
When doing a left join, you can assume that the left stream's record is not null. However, you cannot assume that the GlobalKTable has a record matching the given key, so the right-hand value passed to your joiner can be null. In your case, when you instantiate new EnrichedMessage(financialMessageValue, companySectorsValue), are you sure that companySectorsValue isn't null? If it is null, are you handling it properly? Your NPE is thrown inside the EnrichedMessage constructor, so make sure that constructor tolerates a null companySectorsValue.
Also, ensure your GlobalKTable is prepopulated before any join logic occurs.
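To illustrate the null handling described above, here is a minimal sketch with no Kafka dependency. The fields of EnrichedMessage are an assumption (the question does not show that class), and a plain BiFunction stands in for the ValueJoiner lambda passed to leftJoin; the class name NullSafeJoinSketch is made up for this example.

```java
import java.util.function.BiFunction;

public class NullSafeJoinSketch {

    static class FinancialMessage {
        public String user_id;
        public String stock_symbol;
        public String exchange_id;
    }

    static class CompanySectors {
        public String company_id;
        public String company_name;
        public String tckr;
        public String sector_cd;
    }

    // Hypothetical shape: the original EnrichedMessage is not shown in the question.
    static class EnrichedMessage {
        public final String user_id;
        public final String stock_symbol;
        public final String exchange_id;
        public final String company_name; // null when the table had no match
        public final String sector_cd;    // null when the table had no match

        EnrichedMessage(FinancialMessage msg, CompanySectors sectors) {
            this.user_id = msg.user_id;
            this.stock_symbol = msg.stock_symbol;
            this.exchange_id = msg.exchange_id;
            // In a left join the right-hand value may be null, so guard every access.
            this.company_name = (sectors == null) ? null : sectors.company_name;
            this.sector_cd = (sectors == null) ? null : sectors.sector_cd;
        }
    }

    // Stand-in for the ValueJoiner lambda used in leftJoin (assumption for this sketch).
    static final BiFunction<FinancialMessage, CompanySectors, EnrichedMessage> JOINER =
            EnrichedMessage::new;

    public static void main(String[] args) {
        FinancialMessage msg = new FinancialMessage();
        msg.user_id = "u1";
        msg.stock_symbol = "ACME";
        msg.exchange_id = "X1";

        // Simulates a stream record whose symbol has no entry in the table.
        EnrichedMessage unmatched = JOINER.apply(msg, null);
        System.out.println(unmatched.stock_symbol + " -> " + unmatched.sector_cd);
    }
}
```

If unmatched records should not be emitted at all, using join instead of leftJoin is another option, since the inner join only calls the joiner when the lookup finds a record. Note also that the key returned by the KeyValueMapper is looked up against the GlobalKTable's record key, so joining on stock_symbol only finds matches if the records in KTABLE_TOPIC are keyed by the ticker.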