Apache Beam: How to solve "ParDo requires a deterministic key coder in order to use state and timers" while using Deduplication function

huangapple go评论108阅读模式
英文:

Apache Beam: How to solve "ParDo requires a deterministic key coder in order to use state and timers" while using Deduplication function

问题

I'm trying to deduplicate input messages from Google Cloud Pubsub using deduplication function of Apache beam. However, I run into an error after creating KV<String, MyModel> pair and passing it to Deduplicate transform.

Error:

ParDo requires a deterministic key coder in order to use state and timers

Code:

PCollection<KV<String, MyModel>> deduplicatedEvents =
    messages
        .apply(
            "CreateKVPairs",
            ParDo.of(
                new DoFn<MyModel, KV<String, MyModel>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(KV.of(c.element().getUniqueKey(),c.element()));
                  }
                }))
        .apply(
            "Deduplicate",
            Deduplicate.<KV<String, MyModel>>values());

How should I create deterministic coder which can encode/decode string as key, to make this work?

Any input would be really helpful.

英文:

I'm trying to deduplicate input messages from Google Cloud Pubsub using deduplication function of Apache beam. However, I run into an error after creating KV&lt;String, MyModel&gt; pair and passing it to Deduplicate transform.

Error:

ParDo requires a deterministic key coder in order to use state and timers

Code:

PCollection&lt;KV&lt;String, MyModel&gt;&gt; deduplicatedEvents =
    messages
        .apply(
            &quot;CreateKVPairs&quot;,
            ParDo.of(
                new DoFn&lt;MyModel, KV&lt;String, MyModel&gt;&gt;() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(KV.of(c.element().getUniqueKey(),c.element()));
                  }
                }))
        .apply(
            &quot;Deduplicate&quot;,
            Deduplicate.&lt;KV&lt;String, MyModel&gt;&gt;values());

How should I create deterministic coder which can encode/decode string as key, to make this work?

Any input would be really helpful.

答案1

得分: 4

The Deduplicate transform works by putting the whole element into the key and then doing a key grouping operation (in this case a stateful ParDo). Because Beam is language-independent, grouping by key is done using the encoded form of elements. Two elements that encode to the same bytes are "equal" while two elements that encode to different bytes are "unequal".

A deterministic coder is a concept about how equality in a language (like Java) relates to Beam equality. It means that if two Java objects are equal according to Java equals() then they must have the same encoded bytes. For simple data like strings, numbers, arrays, this is easy. It is helpful to think about what makes a coder non-deterministic. For example, when encoding two Map instances, they may be equals() at the Java level but the key-value pairs are encoded in a different order making them unequal for Beam.

If you have a nondeterministic coder for MyModel, then Deduplicate will not work right and you will end up with duplicates because Beam considers the differently encoded objects to be unequal.

Probably the easiest way to automatically get a high-quality deterministic coder is to leverage Beam's schema inference: https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types. You will need to ensure that all the fields can also be encoded deterministically.

英文:

The Deduplicate transform works by putting the whole element into the key and then doing a key grouping operation (in this case a stateful ParDo). Because Beam is language-independent, grouping by key is done using the encoded form of elements. Two elements that encode to the same bytes are "equal" while two elements that encode to different bytes are "unequal".

A deterministic coder is a concept about how equality in a language (like Java) relates to Beam equality. It means that if two Java objects are equal according to Java equals() then they must have the same encoded bytes. For simple data like strings, numbers, arrays, this is easy. It is helpful to think about what makes a coder non-deterministic. For example, when encoding two Map instances, they may be equals() at the Java level but the key-value pairs are encoded in a different order making them unequal for Beam.

If you have a nondeterministic coder for MyModel, then Deduplicate will not work right and you will end up with duplicates because Beam considers the differently encoded objects to be unequal.

Probably the easiest way to automatically get a high quality deterministic coder is to leverage Beam's schema inference: https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types. You will need to ensure that all the fields can also be encoded deterministically.

huangapple
  • 本文由 发表于 2020年8月3日 01:28:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/63219092.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定