Apache Beam: How to solve "ParDo requires a deterministic key coder in order to use state and timers" while using Deduplication function
Question
I'm trying to deduplicate input messages from Google Cloud Pub/Sub using the Deduplicate transform in Apache Beam. However, I run into an error after creating KV<String, MyModel> pairs and passing them to the Deduplicate transform.
Error:
ParDo requires a deterministic key coder in order to use state and timers
Code:
PCollection<KV<String, MyModel>> deduplicatedEvents =
    messages
        .apply(
            "CreateKVPairs",
            ParDo.of(
                new DoFn<MyModel, KV<String, MyModel>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(KV.of(c.element().getUniqueKey(), c.element()));
                  }
                }))
        .apply(
            "Deduplicate",
            Deduplicate.<KV<String, MyModel>>values());
How should I create a deterministic coder that can encode/decode a String key to make this work?
Any input would be really helpful.
Answer 1
Score: 4
The Deduplicate transform works by putting the whole element into the key and then performing a keyed grouping operation (in this case, a stateful ParDo). Because Beam is language-independent, grouping by key uses the encoded form of elements: two elements that encode to the same bytes are "equal", while two elements that encode to different bytes are "unequal".
A deterministic coder relates equality in a language (like Java) to Beam equality: if two Java objects are equal according to Java equals(), they must encode to the same bytes. For simple data like strings, numbers, and arrays, this is easy. It helps to think about what makes a coder non-deterministic. For example, two Map instances may be equals() at the Java level, yet their key-value pairs may be encoded in a different order, making them unequal for Beam.
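To make the Map case concrete, here is a minimal sketch that uses only the Java standard library, not Beam's Coder API. The class and method names (CoderDeterminismDemo, encodeInIterationOrder, encodeSorted) are made up for illustration. It encodes two maps that are equals() but were filled in a different order, once in iteration order and once in sorted key order.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo class; it only mimics what a coder does (object -> bytes).
public class CoderDeterminismDemo {

    // Writes entries in the map's own iteration order. Maps that are equals()
    // but iterate differently produce different bytes: non-deterministic.
    static byte[] encodeInIterationOrder(Map<String, Integer> map) {
        return encodeEntries(map);
    }

    // Copies into a TreeMap first, so entries are always written in sorted
    // key order and equal maps always produce the same bytes.
    static byte[] encodeSorted(Map<String, Integer> map) {
        return encodeEntries(new TreeMap<>(map));
    }

    private static byte[] encodeEntries(Map<String, Integer> map) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(map.size());
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeInt(entry.getValue());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Same contents, different insertion (and therefore iteration) order.
        Map<String, Integer> a = new LinkedHashMap<>();
        a.put("x", 1);
        a.put("y", 2);
        Map<String, Integer> b = new LinkedHashMap<>();
        b.put("y", 2);
        b.put("x", 1);

        System.out.println(a.equals(b)); // true
        System.out.println(
            Arrays.equals(encodeInIterationOrder(a), encodeInIterationOrder(b))); // false
        System.out.println(Arrays.equals(encodeSorted(a), encodeSorted(b))); // true
    }
}
```

Under byte-wise comparison, the first encoding would treat a and b as two different keys, while the second would treat them as the same key.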
If the coder for MyModel is non-deterministic, Deduplicate will not work correctly: you will end up with duplicates because Beam considers the differently encoded objects to be unequal.
Probably the easiest way to get a high-quality deterministic coder automatically is to leverage Beam's schema inference: https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types. You will need to ensure that every field can also be encoded deterministically.
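As a rough sketch of what schema inference could look like for this question, the class below annotates MyModel with Beam's @DefaultSchema(JavaFieldSchema.class). The fields are assumptions, since the original MyModel is not shown; only getUniqueKey() comes from the question. It needs the Beam Java SDK on the classpath.

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Hypothetical shape of MyModel; the real class in the question is not shown.
// JavaFieldSchema asks Beam to infer a schema from the public fields.
@DefaultSchema(JavaFieldSchema.class)
public class MyModel {
    // Assumed fields: simple types such as String and long encode deterministically.
    public String uniqueKey;
    public long eventTimeMillis;
    public String payload;

    // Accessor used by the pipeline code in the question.
    public String getUniqueKey() {
        return uniqueKey;
    }
}
```

Whether the inferred coder counts as deterministic still depends on the field types, so a Map-typed field would likely bring the original error back.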