Kafka Stream-GlobalKTable join on a specific field

Question


So I have a KStream that gets deserialized into a POJO like so:

public class FinancialMessage {
public String user_id;
public String stock_symbol;
public String exchange_id;
}

And here's what the GlobalKTable record looks like:

public class CompanySectors {
public String company_id;
public String company_name;
public String tckr;
public String sector_cd;
}

I want to be able to join the KStream's stock_symbol field with the GlobalKTable's tckr field. Is this possible? I want to create a new EnrichedMessage object before I stream it into another topic. I have the code below, but I am getting a NullPointerException.

Exception in thread "trade-enrichment-stream-0c7e7782-4217-4450-8086-21871b4ebc45-StreamThread-1" java.lang.NullPointerException
at com.domain.EnrichedMessage.<init>(EnrichedMessage.java:51)
at com.domain.TradeEnrichmentTopology.lambda$3(TradeEnrichmentTopology.java:73)
at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:79)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:101)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

Here's what the code snippet looks like.

KStream<String, FinancialMessage> financialMessageStream =
    builder.stream(
        INCOMING_TOPIC,
        Consumed.with(Serdes.String(), financialMessageSerde)
    );

GlobalKTable<String, CompanySectors> companySectorsStore =
    builder.globalTable(
        KTABLE_TOPIC,
        Consumed.with(Serdes.String(), companySectorsSerde)
    );

KStream<String, EnrichedMessage> enrichedStream = financialMessageStream.leftJoin(
    companySectorsStore,
    (financialMessageKey, financialMessageValue) -> financialMessageValue.stock_symbol,
    (financialMessageValue, companySectorsValue) -> new EnrichedMessage(financialMessageValue, companySectorsValue)
);

enrichedStream.to(
    OUTGOING_TOPIC,
    Produced.with(Serdes.String(), enrichedMessageSerde));

I imagine that there might be some error in my leftJoin logic.

Answer 1

Score: 4

When doing a left join, you can assume that the left stream's record is not null. You cannot assume, however, that the GlobalKTable on the right has a record for the given key, so the table-side value passed to your joiner can be null. In your case, when you call new EnrichedMessage(financialMessageValue, companySectorsValue), are you sure that companySectorsValue isn't null? If it is null, are you handling it properly? The stack trace shows the NPE being thrown inside the EnrichedMessage constructor (EnrichedMessage.java:51), so make sure that constructor tolerates a null companySectorsValue.
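A minimal sketch of a constructor that tolerates the missing table row might look like the following. The fields of EnrichedMessage are an assumption made for illustration, since the question does not show that class; only FinancialMessage and CompanySectors are taken from the question. The demo class is plain Java and does not need Kafka on the classpath.

```java
import java.util.Objects;

// Input POJOs as declared in the question.
class FinancialMessage {
    public String user_id;
    public String stock_symbol;
    public String exchange_id;
}

class CompanySectors {
    public String company_id;
    public String company_name;
    public String tckr;
    public String sector_cd;
}

// Hypothetical shape: the question does not show EnrichedMessage's fields.
class EnrichedMessage {
    public String user_id;
    public String stock_symbol;
    public String exchange_id;
    public String company_name; // stays null when no table row matched
    public String sector_cd;    // stays null when no table row matched

    public EnrichedMessage(FinancialMessage message, CompanySectors sectors) {
        // The stream-side value is expected to be present; fail loudly if not.
        Objects.requireNonNull(message, "message");
        this.user_id = message.user_id;
        this.stock_symbol = message.stock_symbol;
        this.exchange_id = message.exchange_id;

        // leftJoin hands over null here when the table has no row for the key.
        if (sectors != null) {
            this.company_name = sectors.company_name;
            this.sector_cd = sectors.sector_cd;
        }
    }
}

class NullSafeJoinDemo {
    public static void main(String[] args) {
        FinancialMessage trade = new FinancialMessage();
        trade.stock_symbol = "ACME";

        // Simulates an unmatched key: the table-side value is null.
        EnrichedMessage enriched = new EnrichedMessage(trade, null);
        System.out.println(enriched.stock_symbol + " sector=" + enriched.sector_cd);
    }
}
```

With a constructor like this, the joiner lambda from the question can stay unchanged. If unmatched records should be dropped rather than passed through with empty sector fields, using join instead of leftJoin is the other option.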

Also, ensure your GlobalKTable's topic is populated before any join logic occurs, and that its records are keyed by the value your key mapper returns (here the ticker).
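The join looks up the mapped key (here stock_symbol) against the GlobalKTable's record key, not against a field inside its value. The plain-Java sketch below uses a HashMap as a stand-in for the table to show the effect. SectorRow is a trimmed, hypothetical stand-in for CompanySectors, and the sample rows are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Trimmed, hypothetical stand-in for CompanySectors.
class SectorRow {
    final String company_id;
    final String tckr;
    final String sector_cd;

    SectorRow(String company_id, String tckr, String sector_cd) {
        this.company_id = company_id;
        this.tckr = tckr;
        this.sector_cd = sector_cd;
    }
}

class TickerLookupDemo {
    // Index rows by ticker, the way the table's topic would need to be keyed.
    static Map<String, SectorRow> indexByTicker(List<SectorRow> rows) {
        Map<String, SectorRow> byTicker = new HashMap<>();
        for (SectorRow row : rows) {
            byTicker.put(row.tckr, row);
        }
        return byTicker;
    }

    public static void main(String[] args) {
        List<SectorRow> rows = Arrays.asList(
            new SectorRow("c-1", "ACME", "TECH"),
            new SectorRow("c-2", "GLOBX", "ENERGY"));

        // A table keyed by company_id never matches a ticker lookup.
        Map<String, SectorRow> byCompanyId = new HashMap<>();
        for (SectorRow row : rows) {
            byCompanyId.put(row.company_id, row);
        }

        String stockSymbol = "ACME"; // what the key mapper would return
        System.out.println(byCompanyId.get(stockSymbol));                  // no match
        System.out.println(indexByTicker(rows).get(stockSymbol).sector_cd); // match
    }
}
```

If the topic behind the table turns out to be keyed by company_id, one option is to re-key it upstream (for example with selectKey on a stream of that topic, written to a new topic) and build the GlobalKTable from the re-keyed topic.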


huangapple
  • Posted on August 3, 2020 at 21:42:26
  • Please keep this link when reposting: https://go.coder-hub.com/63230590.html