Kafka Stream-GlobalKTable join on a specific field
Question
I have a KStream that gets deserialized into a POJO like so:
public class FinancialMessage {
    public String user_id;
    public String stock_symbol;
    public String exchange_id;
}
And here's what the GlobalKTable record looks like:
public class CompanySectors {
    public String company_id;
    public String company_name;
    public String tckr;
    public String sector_cd;
}
I want to be able to join the KStream's stock_symbol field with the GlobalKTable's tckr field. Is this possible? I want to create a new EnrichedMessage object before I stream it into another topic. I had code like below, but I seem to be getting some null pointer exceptions.
Exception in thread "trade-enrichment-stream-0c7e7782-4217-4450-8086-21871b4ebc45-StreamThread-1" java.lang.NullPointerException
at com.domain.EnrichedMessage.<init>(EnrichedMessage.java:51)
at com.domain.TradeEnrichmentTopology.lambda$3(TradeEnrichmentTopology.java:73)
at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:79)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:101)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Here's what the code snippet looks like.
KStream<String, FinancialMessage> financialMessageStream =
    builder.stream(
        INCOMING_TOPIC,
        Consumed.with(Serdes.String(), financialMessageSerde)
    );
GlobalKTable<String, CompanySectors> companySectorsStore =
    builder.globalTable(
        KTABLE_TOPIC,
        Consumed.with(Serdes.String(), companySectorsSerde)
    );
KStream<String, EnrichedMessage> enrichedStream = financialMessageStream.leftJoin(
    companySectorsStore,
    (financialMessageKey, financialMessageValue) -> financialMessageValue.stock_symbol,
    (financialMessageValue, companySectorsValue) -> new EnrichedMessage(financialMessageValue, companySectorsValue)
);
enrichedStream.to(
    OUTGOING_TOPIC,
    Produced.with(Serdes.String(), enrichedMessageSerde));
I imagine that there might be some error in my leftJoin logic.
Answer 1
Score: 4
When doing a left join, you can assume that the left stream's record is not null. However, you cannot assume that the GlobalKTable on the right will have a record matching the given key, so the right-hand value passed to your joiner can be null. In your case, when you instantiate new EnrichedMessage(financialMessageValue, companySectorsValue), are you sure that companySectorsValue isn't null? If it is null, are you handling it properly? Your NPE is thrown inside the EnrichedMessage constructor (EnrichedMessage.java:51), so make sure the constructor accounts for companySectorsValue being null.
Also, ensure your GlobalKTable is prepopulated before any join logic occurs.
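As a rough illustration of the null handling described above, here is a minimal sketch with no Kafka dependency. The EnrichedMessage fields are an assumption, since the original class is not shown in the question, and the BiFunction named JOINER is only a hypothetical stand-in for the ValueJoiner lambda passed to leftJoin. Note also that the value returned by the key mapper (stock_symbol) is looked up against the GlobalKTable's key, so a match can only occur if the records in KTABLE_TOPIC are keyed by the ticker.

```java
import java.util.function.BiFunction;

class NullSafeJoinSketch {

    // Stream-side value, as in the question.
    static class FinancialMessage {
        public String user_id;
        public String stock_symbol;
        public String exchange_id;
    }

    // Table-side value, as in the question.
    static class CompanySectors {
        public String company_id;
        public String company_name;
        public String tckr;
        public String sector_cd;
    }

    // Hypothetical shape: the real EnrichedMessage is not shown in the question.
    static class EnrichedMessage {
        public String user_id;
        public String stock_symbol;
        public String exchange_id;
        public String company_name; // stays null when the table has no match
        public String sector_cd;    // stays null when the table has no match

        EnrichedMessage(FinancialMessage message, CompanySectors sectors) {
            this.user_id = message.user_id;
            this.stock_symbol = message.stock_symbol;
            this.exchange_id = message.exchange_id;
            // In a left join the table-side value is null when the key has no match,
            // so only dereference it after a null check.
            if (sectors != null) {
                this.company_name = sectors.company_name;
                this.sector_cd = sectors.sector_cd;
            }
        }
    }

    // Stand-in for the ValueJoiner lambda given to leftJoin (assumption for this sketch).
    static final BiFunction<FinancialMessage, CompanySectors, EnrichedMessage> JOINER =
        (message, sectors) -> new EnrichedMessage(message, sectors);

    public static void main(String[] args) {
        FinancialMessage message = new FinancialMessage();
        message.user_id = "u1";
        message.stock_symbol = "ACME";
        message.exchange_id = "X1";

        // Simulates a stream record whose ticker is missing from the table.
        EnrichedMessage unmatched = JOINER.apply(message, null);
        System.out.println("sector for unmatched record: " + unmatched.sector_cd);
    }
}
```

If unmatched records should be dropped rather than forwarded with empty sector fields, using join instead of leftJoin on the stream is an alternative, since an inner join only calls the joiner when the table has a matching key.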