repartition in memory vs file

Question

My understanding is that repartition() creates partitions in memory and is used as a read() operation, while partitionBy() creates partitions on disk and is used as a write operation.

  1. How can we confirm that there are multiple files in memory when using repartition()?
  2. If repartition only creates partitions in memory, why does articles.repartition(1).write.saveAsTable("articles_table", format = 'orc', mode = 'overwrite') create only one file? And how is this different from partitionBy()?

Answer 1

Score: 1

partitionBy indeed has an effect on how your files will look on disk, and indeed is used when writing a file (it is a method of the DataFrameWriter class).

That, however, does not mean that repartition has no effect on what is written to disk.

Let's take the following example:

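from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
  (1,2,3),
  (2,2,3),
  (3,20,300),
  (1,24,299),
  (5,26,312),
  (5,28,322),
  (5,9,2)
], ["colA", "colB", "colC"])

# One subdirectory per distinct colA value
df.write.partitionBy("colA").parquet("using_partitionBy.parquet")
# Four in-memory partitions, written as four flat files
df.repartition(4).write.parquet("using_repartition.parquet")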

Here, we create a simple DataFrame and write it out in two ways:

1) By using partitionBy

The output file structure on disk looks like this:

tree using_partitionBy.parquet/
using_partitionBy.parquet/
├── colA=1
│   ├── part-00000-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
│   └── part-00002-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
├── colA=2
│   └── part-00001-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
├── colA=3
│   └── part-00001-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
├── colA=5
│   ├── part-00002-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
│   └── part-00003-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
└── _SUCCESS

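We see that this created 6 "subfiles" in 4 "subdirectories". Information about the data values (like colA=1) is actually stored on disk, in the directory names. This can greatly speed up subsequent queries that read this data: if you only need the rows where colA=1, Spark can read the colA=1 subdirectory and skip the others entirely. Note also the part-0000N prefixes: they identify the in-memory partition (task) that wrote each file. Each task writes one file for every colA value it holds, which is why colA=5, for example, contains files from partitions 2 and 3. So the in-memory partitioning still shapes what partitionBy writes.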
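To see where the six files come from, here is a toy model in plain Python (not Spark's actual writer). It assumes the DataFrame was split into four in-memory partitions in the row order shown, which is consistent with the part-0000N prefixes in the tree above, and that each partition (task) writes one file per distinct colA value it holds. The helper names `partitioned_write` and `prune` are hypothetical.

```python
from collections import defaultdict

# The seven rows from the example, as (colA, colB, colC) tuples.
ROWS = [(1, 2, 3), (2, 2, 3), (3, 20, 300), (1, 24, 299),
        (5, 26, 312), (5, 28, 322), (5, 9, 2)]

# Assumption: an in-memory split that matches the file names in the tree above.
PARTITIONS = [ROWS[0:1], ROWS[1:3], ROWS[3:5], ROWS[5:7]]


def partitioned_write(partitions, col_name="colA", col_index=0):
    """Toy model of write.partitionBy(col_name): map each output path to its rows.

    Partition i writes one file named part-<i> into the directory of every
    distinct value it contains. Simplification: real Spark drops the partition
    column from the file contents; here the rows keep it.
    """
    files = defaultdict(list)
    for i, partition in enumerate(partitions):
        for row in partition:
            path = f"{col_name}={row[col_index]}/part-{i:05d}"
            files[path].append(row)
    return dict(files)


def prune(files, col_name, value):
    """Toy model of partition pruning: keep only files under the matching directory."""
    prefix = f"{col_name}={value}/"
    return {path: rows for path, rows in files.items() if path.startswith(prefix)}


files = partitioned_write(PARTITIONS)
for path in sorted(files):
    print(path)
```

The loop prints six paths spread over four directories, matching the tree. `prune(files, "colA", 1)` returns only the two colA=1 files, which is the "ignore the other subdirectories" shortcut described above.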

2) By using repartition(4)

The output file structure on disk looks like this:

tree using_repartition.parquet/
using_repartition.parquet/
├── part-00000-6dafa06d-cea6-4af0-ba0d-9c761de0fd50-c000.snappy.parquet
├── part-00001-6dafa06d-cea6-4af0-ba0d-9c761de0fd50-c000.snappy.parquet
├── part-00002-6dafa06d-cea6-4af0-ba0d-9c761de0fd50-c000.snappy.parquet
├── part-00003-6dafa06d-cea6-4af0-ba0d-9c761de0fd50-c000.snappy.parquet
└── _SUCCESS

We see that 4 "subfiles" were created and no "subdirectories". Each of these subfiles corresponds to one of the four in-memory partitions that repartition(4) created. Since you are typically working with very big data in Spark, all your data has to be partitioned in some way.
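As a rough illustration of why repartition(4) yields exactly four files, the sketch below deals rows round-robin into n partitions and writes one flat file per non-empty partition, with no directories. This is a simplification: Spark's repartition(n) also distributes rows round-robin, but each input partition starts at a pseudo-random position, so the exact row placement differs. The function names are hypothetical.

```python
# The seven example rows as (colA, colB, colC) tuples.
ROWS = [(1, 2, 3), (2, 2, 3), (3, 20, 300), (1, 24, 299),
        (5, 26, 312), (5, 28, 322), (5, 9, 2)]


def round_robin_repartition(rows, num_partitions):
    """Simplified model of repartition(n): deal rows out like cards."""
    partitions = [[] for _ in range(num_partitions)]
    for position, row in enumerate(rows):
        partitions[position % num_partitions].append(row)
    return partitions


def plain_write(partitions):
    """Toy model of a write without partitionBy: one flat file per partition.

    This toy model skips empty partitions.
    """
    return {f"part-{i:05d}": rows for i, rows in enumerate(partitions) if rows}


partitions = round_robin_repartition(ROWS, 4)
print([len(p) for p in partitions])      # [2, 2, 2, 1]
print(sorted(plain_write(partitions)))   # ['part-00000', 'part-00001', 'part-00002', 'part-00003']
```

The same model also answers the second question: `plain_write(round_robin_repartition(ROWS, 1))` contains a single file, because repartition(1) leaves only one partition to write.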

Each partition is processed by one task, which runs on one core of your cluster. Once a core has picked up a task and done all the necessary processing, it writes the output to disk as one of these "subfiles". When it has finished writing that subfile, the core is ready to process another partition.
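The task/core relationship can be mimicked with a thread pool, where each worker thread stands in for a core. This is only an analogy under stated assumptions (the partition contents and the `run_task`/`run_stage` helpers are hypothetical), but it shows that the number of output files follows the number of partitions, not the number of cores.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

# Hypothetical in-memory partitions (lists of values); contents are illustrative only.
PARTITIONS = [[1, 2], [3, 4], [5, 6], [7]]


def run_task(partition_index, rows):
    """One task: process one partition, then 'write' exactly one output file."""
    processed = [value * 10 for value in rows]  # stand-in for real processing
    worker = threading.current_thread().name    # the "core" that ran this task
    return f"part-{partition_index:05d}", processed, worker


def run_stage(partitions, num_cores):
    """Toy scheduler: num_cores workers pick up tasks until none are left."""
    with ThreadPoolExecutor(max_workers=num_cores) as pool:
        futures = [pool.submit(run_task, i, rows) for i, rows in enumerate(partitions)]
        return [f.result() for f in futures]


outputs = run_stage(PARTITIONS, num_cores=2)
print([name for name, _, _ in outputs])  # ['part-00000', 'part-00001', 'part-00002', 'part-00003']
```

With two "cores", the four tasks are shared between at most two workers and still produce four files. Merging everything into one partition and running `run_stage` with eight workers yields a single file, since there is only one task to hand out.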

When to use partitionBy and repartition

This is a bit opinionated and surely not exhaustive, but it might give you some insight into which one to use.

partitionBy and repartition can be used for different goals:

  • Use partitionBy if:
    • You are writing data to disk and want subsequent reads of it to be much faster. This is mostly useful when you filter heavily on a column whose cardinality is not too high.
  • Use repartition if:
    • You want to tune the size of your partitions to your cluster size to improve the performance of your jobs.
    • You want to write output files of a sensible size, but partitionBy on any column would have far too high a cardinality (imagine time-series data from sensors).
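To put rough numbers on the cardinality point, here is a toy count on made-up sensor data (three sensors, 1,000 timestamps each, split into four in-memory partitions). It uses the simplified rule that partitionBy writes one file per (in-memory partition, distinct value) pair; the data and helper names are hypothetical.

```python
# Hypothetical sensor readings: (sensor_id, timestamp_in_seconds, value).
READINGS = [(sensor, ts, 0.0) for sensor in ("s1", "s2", "s3") for ts in range(1000)]


def split_evenly(rows, num_partitions):
    """Hypothetical in-memory split: contiguous, roughly equal slices."""
    size = -(-len(rows) // num_partitions)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def partition_by_file_count(partitions, col_index):
    """Toy model of partitionBy: one file per (partition, distinct value) pair."""
    return sum(len({row[col_index] for row in part}) for part in partitions)


partitions = split_evenly(READINGS, 4)
print(len(partitions))                         # 4: one file each without partitionBy
print(partition_by_file_count(partitions, 0))  # 6: partitionBy on sensor_id
print(partition_by_file_count(partitions, 1))  # 3000: partitionBy on timestamp
```

Partitioning by the low-cardinality sensor column gives a handful of files, while partitioning by timestamp explodes into 1,000 directories holding 3,000 tiny files. In that situation, controlling the file count with repartition is the better fit.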

Answer 2

Score: 0

  1. If you mean how to confirm that, for example, .repartition(100) gives you 100 output files: I checked this in the Spark UI, where number of tasks = number of partitions = number of written files.

  2. With .repartition(1) you move the whole dataset into one partition, which is processed as one task by one core and written to one file. A single task cannot be processed in parallel, so Spark has no choice but to store everything in one file.
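Besides the Spark UI, you can check the in-memory partition count before writing with df.rdd.getNumPartitions(), and count the data files after writing. The sketch below does the latter in plain Python; the helper name `count_part_files` is hypothetical, and it assumes the output sits on the local filesystem (not HDFS or S3).

```python
from pathlib import Path
import tempfile


def count_part_files(output_dir):
    """Count data files (names starting with 'part-') anywhere under output_dir.

    rglob also descends into partitionBy subdirectories such as colA=1/.
    Marker files like _SUCCESS and hidden checksum files like .part-*.crc
    do not start with 'part-', so they are not counted.
    """
    return sum(1 for path in Path(output_dir).rglob("part-*") if path.is_file())


# Demo on a fake output directory that mimics a repartition(4) write.
with tempfile.TemporaryDirectory() as tmp:
    for i in range(4):
        (Path(tmp) / f"part-{i:05d}-example.snappy.parquet").touch()
    (Path(tmp) / "_SUCCESS").touch()
    print(count_part_files(tmp))  # 4
```

Pointing it at using_repartition.parquet or using_partitionBy.parquet from Answer 1 should report 4 and 6 respectively, if the files are laid out as in those trees.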

huangapple
  • Posted on July 13, 2023 14:24:39
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76676475.html