Apache Beam + Dataflow too slow for only 18k rows

Question

We need to run heavy calculations on simple but numerous data.

The input data are rows in a BigQuery table with two columns: ID (INTEGER) and DATA (STRING). The DATA values are of the form "1#2#3#4#..." and contain 36 values.

The output data have the same form, but DATA holds the values produced by transforming the input with an algorithm. It is a one-for-one transformation: each input row yields exactly one output row.
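
Concretely, the per-row work amounts to parsing the 36 "#"-separated values, running the algorithm on them, and serializing the result back to a string. A minimal standalone sketch of that round trip follows; the doubling is only a placeholder, since the real algorithm lives in myService.calculate and is not shown in this post:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RowTransformSketch {
    public static void main(String[] args) {
        String inputData = "1#2#3#4";                  // shortened; real rows carry 36 values
        List<Double> values = Arrays.stream(inputData.split("#"))
                .map(Double::valueOf)
                .collect(Collectors.toList());
        List<Double> transformed = values.stream()
                .map(v -> v * 2)                       // placeholder for the real algorithm
                .collect(Collectors.toList());
        String outputData = transformed.stream()
                .map(Object::toString)
                .collect(Collectors.joining(","));     // the pipeline code below also joins with commas
        System.out.println(outputData);                // prints 2.0,4.0,6.0,8.0
    }
}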

We have tried Apache Beam with Google Cloud Dataflow, but errors appear as soon as several workers are instantiated. For our POC we used only 18,000 input rows; the target is about 1 million.

Here is a simplified version of the class (I've removed the write part; the behaviour stays the same):

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// MyService and MyDto are our own classes and are not shown here.
public class MyClass {
    static MyService myService = new MyService();

    static class ExtractDataFn extends DoFn<TableRow, KV<Long, String>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Long id = Long.parseLong((String) c.element().get("ID"));
            String data = (String) c.element().get("DATA");
            c.output(KV.of(id, data));
        }
    }

    public interface Options extends PipelineOptions {
        String getInput();
        void setInput(String value);

        @Default.Enum("EXPORT")
        TypedRead.Method getReadMethod();
        void setReadMethod(TypedRead.Method value);

        @Validation.Required
        String getOutput();
        void setOutput(String value);
    }

    static void run(Options options) {
        Pipeline p = Pipeline.create(options);

        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("ID").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("DATA").setType("STRING"));
        TableSchema schema = new TableSchema().setFields(fields);

        PCollection<TableRow> rowsFromBigQuery = p.apply(
                BigQueryIO.readTableRows().from(options.getInput()).withMethod(options.getReadMethod())
        );

        PCollection<KV<Long, String>> inputdata = rowsFromBigQuery.apply(ParDo.of(new ExtractDataFn()));
        PCollection<KV<Long, String>> outputData = applyTransform(inputdata);
        // Here goes the part where the data are written to a BigQuery table
        p.run().waitUntilFinish();
    }

    static PCollection<KV<Long, String>> applyTransform(PCollection<KV<Long, String>> inputData) {
        PCollection<KV<Long, String>> forecasts = inputData.apply(ParDo.of(new DoFn<KV<Long, String>, KV<Long, String>>() {
            @ProcessElement
            public void processElement(@Element KV<Long, String> element, OutputReceiver<KV<Long, String>> receiver, ProcessContext c) {
                MyDto dto = new MyDto();
                List<Double> inputData = Arrays.asList(element.getValue().split("#")).stream().map(Double::valueOf).collect(Collectors.toList());
                dto.setInputData(inputData);
                dto = myService.calculate(dto); // this is the time-consuming operation
                String modifiedData = dto.getModifiedData().stream().map(Object::toString).collect(Collectors.joining(","));
                receiver.output(KV.of(element.getKey(), modifiedData));
            }
        }));

        return forecasts;
    }

    public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        run(options);
    }
}
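
The write step that was removed from the snippet above is what produces the BigQueryIO.Write/BatchLoads steps visible in the logs below. For context only, a batch-load write back to BigQuery would look roughly like the following sketch; this is not the job's actual code, and the dispositions and the reuse of options.getOutput() and the schema built in run() are assumptions:

// Hypothetical sketch of the omitted write step, as a method that could sit next to run() in MyClass.
static void writeToBigQuery(PCollection<KV<Long, String>> outputData, Options options, TableSchema schema) {
    outputData
        .apply("FormatOutput", ParDo.of(new DoFn<KV<Long, String>, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // Convert each key/value pair back into a BigQuery row.
                c.output(new TableRow()
                        .set("ID", c.element().getKey())
                        .set("DATA", c.element().getValue()));
            }
        }))
        .apply(BigQueryIO.writeTableRows()
                .to(options.getOutput())   // destination table, e.g. "project:dataset.table"
                .withSchema(schema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
}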

In the GCP Logs console we can see the number of workers climb to 10 for about 5 minutes, then drop to 3 or 4, and then we get hundreds of messages like the following while CPU usage sits at about 0%:

Proposing dynamic split of work unit myproject;2020-10-06_06_18_27-12689839210406435299;1231063355075246317 at {"fractionConsumed":0.5,"position":{"shufflePosition":"f_8A_wD_AAAB"}}

and

Operation ongoing in step BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Read for at least 05m00s without outputting or completing in state read-shuffle at app//org.apache.beam.runners.dataflow.worker.ApplianceShuffleReader.readIncludingPosition(Native Method)

If we let it keep running, it eventually fails with an error like this:

Error message from worker: java.lang.RuntimeException: unexpected org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:77)

If I modify the myService.calculate method to be faster, all the data is processed by a single worker with no problem. The problem seems to occur only when the processing is parallelized.
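
One way to test that hypothesis is to pin the job to a single worker and check whether the slow calculate() then completes: if it does, the failure is tied to cross-worker shuffle traffic rather than to the computation itself. A sketch using the standard Dataflow worker-pool options (not part of the original code):

// In main(), before calling run(options); requires org.apache.beam.runners.dataflow.options.DataflowPipelineOptions.
// Keeping the whole job on one worker means the GroupByKey shuffle never has to cross machines.
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setNumWorkers(1);
dataflowOptions.setMaxNumWorkers(1);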

Thank you for your help.

Answer 1

Score: 1

The solution was to configure the firewall by adding a rule that allows communication between the Dataflow workers. Per the linked guide, this means allowing ingress TCP traffic on ports 12345 and 12346 between the worker VMs (which carry the "dataflow" network tag), so that they can exchange shuffle data.

https://cloud.google.com/dataflow/docs/guides/routes-firewall
