How to re-partition data using Spark with no ephemeral storage?
Question
I am running Spark on a Kubernetes cluster. While re-partitioning data with many partitions, forcing each partition to have only one file, my pods are getting evicted.
The error is as follows:
The node was low on resource: ephemeral-storage. Container sosreport-spark-cluster-opendatahub-w was using 56291400Ki, which exceeds its request of 0.
My Spark configs are:
def create_spark_config(spark_cluster, executor_memory='16g', executor_cores='4', max_cores='16'):
    print('Spark cluster is: {}'.format(spark_cluster))
    sc_conf = (
        pyspark.SparkConf().setMaster(spark_cluster)
        .set('spark.driver.host', HOSTNAME)
        .set('spark.driver.port', 42000)
        .set('spark.driver.bindAddress', '0.0.0.0')
        .set('spark.driver.blockManager.port', 42100)
        .set('spark.executor.memory', '1536M')
        .set('spark.executor.cores', '2')
        .set('spark.sql.parquet.enableVectorizedReader', True)
        .set('spark.kubernetes.memoryOverheadFactor', '0.20')
    )
    return sc_conf
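For reference, a config built this way would normally be handed to the session builder. The sketch below is my own addition, not part of the original question; the master URL is a placeholder, and HOSTNAME must be defined elsewhere as in the question's code.

import pyspark
from pyspark.sql import SparkSession

# Placeholder master URL (assumption); create_spark_config is the function above.
conf = create_spark_config('spark://spark-master:7077')
spark = SparkSession.builder.config(conf=conf).getOrCreate()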
This is how I am re-partitioning the data:
def save_repartitioned_dataframe(bucket_name, df):
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    print('Trying to save repartitioned data at: {}'.format(dest_path))
    df.repartition(1, "created_year", "created_month", "created_day").write.partitionBy(
        "created_year", "created_month", "created_day").mode("overwrite").parquet(dest_path)
    print('Data repartitioning complete; saved at the following location:')
    print(dest_path)
    _, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
    return count, distinct_count, num_partitions
Answer 1
Score: 2
Your problem likely isn't the fact that you're using ephemeral storage, but the fact that you're sending your entire data to a single worker.
.repartition(1, "created_year", "created_month", "created_day")
You're combining all of your data into a single Spark partition, which then writes to all of the table partitions.
What you probably want to do is a global sort by the partition key followed by a write.partitionBy. What this does is send most of the data for a single table partition to a limited number of Spark partitions (typically a single Spark partition, if your partitions are small).
This typically looks like:
(df.orderBy("partitionCol")
   .write
   .partitionBy("partitionCol")
   .insertInto("my_table"))
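Applied to the code in the question, the same idea might look like the sketch below. This is my adaptation, not the answerer's exact code: dest_path and the created_* columns are taken from the question, and writing parquet to a path replaces insertInto.

# Sketch: globally sort on the partition columns instead of repartition(1, ...),
# then let partitionBy split the output per table partition.
# dest_path comes from the question's save_repartitioned_dataframe function.
(df.orderBy("created_year", "created_month", "created_day")
   .write
   .partitionBy("created_year", "created_month", "created_day")
   .mode("overwrite")
   .parquet(dest_path))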