Spark Job takes one hour to process 10MB file

Question

I'm new to Spark and I'm running a Spark job on an EMR cluster. The job takes about one hour to finish.

In the job there is code that reads a file from S3 (the file is 10 MB) and writes its contents to DynamoDB.

I don't understand why this task takes so long; I expected it to finish much faster.

Here is the code for the Spark job:

public class PopulateCovid19Citations {

    private static final String APP_NAME = "PopulateCovid19Citations";
    private static Logger LOGGER = LogManager.getLogger(PopulateCovid19Citations.class);
    static Logger log = Logger.getLogger(PopulateCovid19Citations.class.getName());

    public static void main(String[] args) throws Exception {

        Logger.getLogger("org").setLevel(Level.ERROR);

        // Building the Spark session
        JavaSparkContext javaSparkContext = SparkConfiguration.buildSparkContext(APP_NAME);
        SparkSession sparkSession = SparkSession.builder()
            .sparkContext(javaSparkContext.sc()).getOrCreate();

        String fileLocationTest = "s3a://mybucket/WHOCovid19CitationsDatabase.csv";
        Dataset<Row> fullCitations =
            sparkSession.read().format("com.databricks.spark.csv").option("inferSchema", "true")
                .option("header", "true").load(fileLocationTest);
        fullCitations.show(10);

        // Selecting only the relevant columns for our exercise
        Dataset<Row> citations = fullCitations.select(
            col("Title"),
            col("Authors"),
            col("Abstract"),
            col("Published Year"),
            col("Journal"),
            col("Study"),
            col("Tags"));
        citations.show(10);


        // Removing citations with null title
        Dataset<Row> filteredCitations = citations.filter(col("Title").isNotNull());

        // Building a RDD composed of DynamoDB writable items that matches the Covid19Citation table
        JavaPairRDD<Text, DynamoDBItemWritable> dynamoCitations = filteredCitations.javaRDD().mapToPair(citation -> {
            Map<String, AttributeValue> attributes = new HashMap<>();
            putStringAttribute(attributes, "citationCode", UUID.randomUUID().toString());
            putStringAttribute(attributes, "title", citation.getAs("Title"));
            putStringAttribute(attributes, "authors", citation.getAs("Authors"));
            putStringAttribute(attributes, "abstract", citation.getAs("Abstract"));
            putNumberAttribute(attributes, "publishedYear", citation.getAs("Published Year"));
            putStringAttribute(attributes, "journal", citation.getAs("Journal"));
            putStringAttribute(attributes, "study", citation.getAs("Study"));
            putStringAttribute(attributes, "tags", citation.getAs("Tags"));

            DynamoDBItemWritable dynamoDBItemWritable = new DynamoDBItemWritable();
            dynamoDBItemWritable.setItem(attributes);
            return new Tuple2<>(new Text(""), dynamoDBItemWritable);
        });

        // Writing data to the DynamoDB table
        JobConf jobConf = SparkConfiguration.buildJobConf(javaSparkContext, Boolean.FALSE);
        dynamoCitations.saveAsHadoopDataset(jobConf);
        sparkSession.stop();
    }

    private static void putStringAttribute(Map<String, AttributeValue> attributes, String key, Object fieldValue) {
        if (fieldValue != null) {
            attributes.put(key, new AttributeValue(fieldValue.toString()));
        }
    }

    private static void putNumberAttribute(Map<String, AttributeValue> attributes, String key, Object fieldValue) {
        if (fieldValue != null) {
            try {
                Integer integerFieldValue = Integer.parseInt(fieldValue.toString());
                AttributeValue attributeValue = new AttributeValue();
                attributeValue.setN(integerFieldValue.toString());
                attributes.put(key, attributeValue);
            } catch (Exception e) {
                LOGGER.info("cannot convert " + fieldValue + " to integer");
            }
        }
    }
}

And here is the configuration for building the Spark context:

public static JavaSparkContext buildSparkContext(String application) throws ClassNotFoundException {
    SparkConf conf = new SparkConf()
            .setAppName(application)
            .set("spark.executor.cores", "5")
            .set("spark.executor.memory", "42G")
            .set("spark.executor.memoryOverhead", "4G")
            .set("spark.driver.memory", "42G")
            .set("spark.driver.cores", "5")
            .set("spark.executor.instances", "25")
            .set("spark.default.parallelism", "250")
            .set("spark.dynamicAllocation.enabled", "false")
            .registerKryoClasses(new Class<?>[]{
                    Class.forName("org.apache.hadoop.io.Text"),
                    Class.forName("org.apache.hadoop.dynamodb.DynamoDBItemWritable")
            });
    return new JavaSparkContext(conf);
}

I'm using an EMR cluster with m6a.16xlarge EC2 instances (64 vCores, 259 GB of memory).

Can somebody please help me with this? Thanks in advance.


Answer 1

Score: 2

I finally found the problem. In the YARN logs I saw lines like this:

23/06/14 11:26:28 WARN DynamoDBFibonacciRetryer: Retry: 1 Exception: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: TM68B1L448VTTOIVEJ2G77Q4TJVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
23/06/14 11:26:29 WARN DynamoDBFibonacciRetryer: Retry: 1 Exception: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: 2HRBEQ17BV6KHT7P4FDM4TGL3JVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
23/06/14 11:26:29 WARN DynamoDBFibonacciRetryer: Retry: 2 Exception: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: D5FESCOIUVRQ764RL80IM8FEANVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
23/06/14 11:26:29 WARN DynamoDBFibonacciRetryer: Retry: 2 Exception: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: 9TFNUKUGKM12B1VTIQSGRD46C7VV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)

The conclusion is that the database was the bottleneck in my case. It seems we exceeded the provisioned throughput for the DynamoDB table. To fix this I needed to change the provisioning configuration for DynamoDB.
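For reference, a minimal sketch of what "changing the provisioning configuration" can look like with the same AWS SDK the job already uses, via the UpdateTable API that the exception message points to. The table name follows the comment in the question; the capacity values are placeholders that must be sized for your own write volume, and switching the table to on-demand capacity is another option:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.UpdateTableRequest;

public class RaiseCitationTableThroughput {

    public static void main(String[] args) {
        // Placeholder capacity values -- adjust to your table and workload.
        String tableName = "Covid19Citation";
        long readCapacityUnits = 10L;
        long writeCapacityUnits = 1000L;

        AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();

        // Raise the provisioned throughput so the bulk write from Spark is not throttled.
        dynamoDB.updateTable(new UpdateTableRequest()
                .withTableName(tableName)
                .withProvisionedThroughput(
                        new ProvisionedThroughput(readCapacityUnits, writeCapacityUnits)));
    }
}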


Answer 2

Score: 1

CSV schema inference is implemented as "read through the entire file and work out the type of each column". There is some sampling, but it amounts to the same thing: one or more HTTP GET requests stream through the entire file before any computation begins.

Ideally, move off CSV to a modern file format such as Avro, which carries a well-defined schema. If you can't do that, explicitly declare the schema for the CSV file in your application.

This may not be the sole cause of your problem, but it is the first bottleneck in the processing.
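As an illustration, here is a minimal sketch of an explicit schema for this read, which would slot into the question's main() in place of the inferSchema read. The column names are taken from the question; the types, and the assumption that these are the only columns in the file (a user-supplied CSV schema must list every column in order), are mine:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Declare the schema up front so Spark does not stream the file once just to infer types.
StructType citationSchema = new StructType()
        .add("Title", DataTypes.StringType)
        .add("Authors", DataTypes.StringType)
        .add("Abstract", DataTypes.StringType)
        .add("Published Year", DataTypes.IntegerType)
        .add("Journal", DataTypes.StringType)
        .add("Study", DataTypes.StringType)
        .add("Tags", DataTypes.StringType);

Dataset<Row> fullCitations = sparkSession.read()
        .format("csv")                  // built-in CSV source; "com.databricks.spark.csv" is the legacy alias
        .option("header", "true")
        .schema(citationSchema)         // no inferSchema, so no extra pass over the S3 object
        .load(fileLocationTest);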


Answer 3

Score: 1

If you perform iterative operations on the same data, you should use the persist() method to avoid redundant recomputation.

I would recommend persisting at least the fullCitations dataset after it is created.

See the documentation for more about persistence: https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
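A minimal sketch of the suggested persist() call, applied to the question's fullCitations dataset (the storage level chosen here is an assumption):

import org.apache.spark.storage.StorageLevel;

// Cache the parsed CSV so that show(), the projection/filter, and the DynamoDB
// write reuse the same data instead of re-reading and re-parsing it from S3.
fullCitations.persist(StorageLevel.MEMORY_AND_DISK());

// ... show(), select(), filter(), saveAsHadoopDataset(jobConf) as in the question ...

// Release the cached data once the write has finished.
fullCitations.unpersist();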

I would also review the Spark configuration. You have a cluster with 64 vCores and 259 GB of RAM, but you declared 25 executors with 5 cores and 42 GB of RAM each. I am not sure you need that much per executor, and performance can suffer if the cluster does not have enough resources to start all of the executors.

