How to re-partition data using Spark with no ephemeral storage?

Question

I am running Spark on a Kubernetes cluster. While re-partitioning data with many partitions, forcing it to produce only one file per partition, my pods are getting evicted.

The error is as follows:

The node was low on resource: ephemeral-storage. Container sosreport-spark-cluster-opendatahub-w was using 56291400Ki, which exceeds its request of 0.

My Spark configs are:

def create_spark_config(spark_cluster, executor_memory='16g', executor_cores='4', max_cores='16'):
    # Note: the executor_memory / executor_cores / max_cores arguments are not used below;
    # the executor resources are hardcoded in the conf.
    print('Spark cluster is: {}'.format(spark_cluster))
    sc_conf = (
        pyspark.SparkConf().setMaster(spark_cluster)
        .set('spark.driver.host', HOSTNAME)
        .set('spark.driver.port', 42000)
        .set('spark.driver.bindAddress', '0.0.0.0')
        .set('spark.driver.blockManager.port', 42100)
        .set('spark.executor.memory', '1536M')
        .set('spark.executor.cores', '2')
        .set('spark.sql.parquet.enableVectorizedReader', True)
        .set('spark.kubernetes.memoryOverheadFactor', '0.20')
    )
    return sc_conf
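
For context, the helper above only builds a SparkConf; a minimal sketch of how it would typically be consumed looks like this (the master URL is a placeholder, not taken from the original setup):

from pyspark.sql import SparkSession

# Hypothetical usage of create_spark_config() above; the cluster URL is a placeholder.
conf = create_spark_config('spark://spark-cluster:7077')
spark = SparkSession.builder.config(conf=conf).getOrCreate()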

This is how I am re-partitioning the data:

def save_repartitioned_dataframe(bucket_name, df):
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    print('Trying to save repartitioned data at: {}'.format(dest_path))
    df.repartition(1, "created_year", "created_month", "created_day").write.partitionBy(
        "created_year", "created_month", "created_day").mode("overwrite").parquet(dest_path)
    print('Data repartitioning complete; saved at the following location:')
    print(dest_path)
    _, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
    return count, distinct_count, num_partitions

Answer 1

Score: 2

Your problem likely isn't the ephemeral storage itself but the fact that you're sending your entire dataset to a single worker:

".repartition(1, "created_year", "created_month", "created_day")"

You're combining all of your data into a single Spark partition, which then has to write out every table partition.

What you probably want to do is a global sort by the partition key followed by a write.partitionBy. This sends the data for each table partition to a limited number of Spark partitions (typically a single Spark partition if your table partitions are small).

This typically looks like:

    df.orderBy("partitionCol")
      .write
      .partitionBy("partitionCol")
      .insertInto("my_table")

huangapple
  • Posted on 2020-08-07 04:18:20
  • Please keep the original link when reposting: https://go.coder-hub.com/63291172.html