关于Parquet行组大小的实际含义是什么?

huangapple go评论53阅读模式
英文:

What is actually meant when referring to parquet row-group size?

问题

我正在开始使用Parquet文件格式。
Apache官方网站建议使用512MB到1GB的大行组(参见这里)。
一些在线资源(例如这个)建议默认的行组大小为128MB。

我有大量的Parquet文件,稍后将在AWS Glue上使用PySpark进行下游处理。这些文件具有非常小的行组。我无法控制我要开始处理的文件,但希望合并行组以在下游处理之前拥有“更高效”的文件(为什么?这些文件将上传到S3并使用Spark进行处理;我的理解是Spark会一次读取一个行组,因此更多的较小行组会导致增加的IO操作,这是低效的;如果这个假设是无效的,请教育我)。

让我们只考虑这个问题中的一个文件。它经过压缩(使用snappy压缩)后,在磁盘上占用85MB。当我使用pqrs工具检查其模式时,它报告文件中有55,733条记录,分为1,115个行组,每个行组似乎约为500KB,具体如下:

第7行组:
--------------------------------------------------------------------------------
总字节大小:424752
行数:50

如果我简单地取(1115个行组 * 500KB/行组),那大约是500MB;而磁盘上的文件大小是85MB。当然,一些行组比500KB小,但我大致估计了其中大约有100个(一半在顶部,一半在底部),它们的大小都在这个范围内。

子问题1: 500MB计算出的差异与实际的85MB之间的差异是因为pqrs报告的行组大小实际上代表了未压缩大小,也许是行组的内存中大小(假设这个大小会比磁盘上的压缩序列化大小大)?换句话说,我不能简单地进行1115 * 500的计算,而必须应用某种压缩因子吗?

子问题2: 当我看到推荐的批处理大小是128MB时,这究竟指的是什么?未压缩的内存中大小?磁盘上的序列化紧凑大小?还是其他什么?它与pqrs报告的内容有什么关系?

我的(简化的)用于压缩这些行组的代码如下:

import pyarrow.dataset as ds
import pyarrow.parquet as pq

def compact_parquet_in_batches(infile, outfile, batchsize):
    parquet_file = pq.ParquetFile(infile)
    ds.write_dataset(
        parquet_file.iter_batches(batch_size=batchsize), 
        outfile,
        schema=RSCHEMA,
        format='parquet'
    ) 

主问题:batchsize应该是多少?

iter_batches接受batch_size作为记录数,而不是字节大小。我可以根据总记录数和所需批次数计算它,但我不清楚这里应该计算什么。

我尝试过以下计算:

  • 所需的批次数 = 磁盘上的文件大小(MB)/ 128 = 85/128 = 1(四舍五入)
  • 批次大小 = 记录数 / 所需的批次数 = 55,733 / 1 = 60000(四舍五入到最接近的10,000)

当我使用批次大小为60,000运行我的代码时:

  • 我得到两个记录组(很好,1,115减少到2;但为什么不是1?)
  • 第一个记录组的报告字节大小约为250MB。因此,尽管它最终创建了我预期的两倍的行组数量,但它们的大小实际上是我预期的两倍。
第0行组:
--------------------------------------------------------------------------------
总字节大小:262,055,359
行数:32,768

我认为我的一些假设或关于Parquet文件格式、pqrs工具或pyarrow库的理解可能是错误的。有人能帮我解开迷雾吗?

英文:

I am starting to work with the parquet file format.
The official Apache site recommends large row groups of 512MB to 1GB (here).
Several online source (e.g. this one) suggest that the default row group size is 128MB.

I have a large number of parquet files which I will later process downstream with PySpark on AWS Glue. These files have very small row-groups. I cannot control the files I'm starting with, but want to combine row-groups so as to have "more efficient" files prior to downstream processing (why? these files will be uploaded to S3 and processed with Spark; my understanding is that Spark will read one row-group at a time, so many more smaller row-groups results in increased IO operations which is inefficient; if this assumption is invalid please educate me).

Let's consider just one of these files for this question. It's compressed (with snappy compression) and 85MB on disk. When I inspect its schema using the pqrs tool it reports that the file has 55,733 records in 1,115 row groups, and each row group seems to be around 500 kB - specifically, something like this:

row group 7:
--------------------------------------------------------------------------------
total byte size: 424752
num of rows: 50

If I simply take (1115 row-groups * 500 kB/row-group) that's around 500MB; whereas the file on disk is 85MB. Granted, some of the row-groups are smaller than 500kB but I eyeballed around 100 of them (half at top, half at bottom) and they're in that general ballpark.

Sub-question 1: is the difference (500MB calculated vs 85MB actual) because the row-group size reported by pqrs actually represents the uncompressed size, maybe what would be the in-memory size of the row-group (which presumably would be larger than the compressed serialized size on disk)? In other words I can't do a simplistic 1115 * 500 but have to apply some sort of compression factor?

Sub-question 2: when I see that the recommended batch size is 128MB, what exactly does that refer to? The uncompressed in-memory size? The serialized, compacted size on disk? Something else? How does it relate to what's reported by pqrs?

My (simplified) code to compact these row-groups is:

import pyarrow.dataset as ds
import pyarrow.parquet as pq

def compact_parquet_in_batches(infile, outfile, batchsize):
    parquet_file = pq.ParquetFile(infile)
    ds.write_dataset(
        parquet_file.iter_batches(batch_size=batchsize), 
        outfile,
        schema=RSCHEMA,
        format='parquet'
    ) 

Main question: What should batchsize be?

iter_batches takes batch_size as a number of records rather than a byte size. I could calculate it from total records and desired # of batches, but I'm unclear what I should be calculating for here.

I tried this:

  • required # batches = file size on disk in MB / 128 = 85/128 = 1 (rounded up)
  • batch size = # records / required # batches = 55,733 / 1 = 60000 (rounded up to next 10k)

When I run my code with batch size of 60k:

  • I get two record groups (great, 1,115 is down to 2; but why not to 1?)
  • the reported byte size of the first record group is around 250MB. So even though it ended up creating twice the number of row-groups I expected, instead of each being half the size I expected they are actually double the size I expected.
row group 0:
--------------------------------------------------------------------------------
total byte size: 262055359
num of rows: 32768

I figure some of my assumptions - or understanding about the parquet file format, the pqrs tool or the pyarrow library - are off. Can someone please demystify me?

答案1

得分: 3

以下是翻译好的部分:

  1. 差异(500MB计算与85MB实际)是否是因为pqrs报告的行组大小实际上代表了未压缩大小?

    • 是的,工具通常在这方面不太清楚。
  2. 当我看到推荐的批处理大小为128MB时,确切指的是什么?内存中未压缩的大小?磁盘上序列化的压缩大小?还是其他什么?

    • 主要问题:批处理大小应该是多少?
    • 答案通常取决于确保你的I/O请求对存储系统是理想的。然而,如果你的行组非常小(例如100、1k、10k行),那么可能无论你的存储是什么都不重要。这些非常小的大小几乎总是性能普遍较差的。
    • 如果你使用HDFS,规则可能稍有不同。我对HDFS的经验不多。在所有其他情况下,通常希望行组足够大,以便你的I/O请求足够大,以满足存储系统的需求。
    • 例如,当从HDD读取时,如果你执行大量随机的64字节读取,性能将比执行大量顺序的64字节读取差。然而,如果你执行大量随机的4MiB读取,那么你应该会得到与大量顺序的4MiB读取大致相同的性能。当然,这将取决于硬盘,但我发现4MiB对于HDD来说是一个不错的数字。另一方面,如果你从S3读取,他们的指南建议8-16MiB。
    • 将这个概念转化为行组大小有点棘手,将取决于你的查询习惯。如果你通常从文件中读取所有列,那么你希望你的行组是8-16MiB。另一方面,如果你通常只从文件中读取“一些”列,那么你希望每列都是8-16MiB。
    • 现在事情变得棘手,因为我们必须考虑压缩和编码。例如,布尔列几乎永远不是8MiB。由于压缩,你可能需要至少64Mi行,甚至更多。float32列稍微容易理解一些。你应该能够在2Mi行中获得8MiB的读取,并且在许多情况下,你不会从中获得太多压缩。
    • 以上都是理论。实际上,我已经进行了相当多的基准测试,既在本地磁盘上,又在S3上,我发现1Mi行通常是一个不错的行组大小。可能有情况下更大的行组大小是个不错的选择,而且你仍然可以在较小的行组中获得相当不错的性能。你最终需要根据自己的个人用例进行基准测试。不过,1Mi是一个容易记住的整数。如果你需要以未压缩字节数的方式表示行组大小,那么这将取决于你有多少列。再次作为一个经验法则,我们可以假设列占用4字节,因此可以使用以下计算:
    字节数 = 1Mi * 列数 * 4
    

    换句话说,如果你有10列,那么就朝着至少40MiB的行组大小努力。

  3. 如果我使行组太大会怎么样?

    • 鉴于上述情况,似乎简单地使行组非常大就可以了。这将确保你有理想的I/O请求。在一个完美的世界中,如果所有的parquet读取器都是相等的,那么我会说这是正确的(每个文件一个行组是理想的)。
    • 然而,许多parquet读取器将使用行组作为以下之一:
      • 并行性单位 - 在这种情况下,如果你的文件中只有一个行组,你将不会获得足够的并行性。
      • 读取的最小大小 - 在这种情况下,即使你正在进行流处理,你的读取器的RAM消耗也会很高。
    • 因此,出于这些原因,通常希望避免过大的行组大小。
  4. 关于推送过滤/统计/等等。

    • 通常这是保持行组小的另一个原因。行组统计是最容易使用的推送过滤工具,有些读取器完全依赖于它。这意味着推送只能根据过滤器完全排除整个行组。因此,较小的行组意味着你更有机会完全消除I/O。
    • 幸运的是,parquet读取工具已经逐渐开始使用页面级别的统计信息(或页面级别的布隆过滤器)来执行此过滤。页面非常小(~1MiB)并且在过滤方面提供非常好的分辨率(尽管在一些罕见的情况下,这种分辨率太高,因为它需要更多的元数据处理)。如果你的parquet读取器能够利用页面级别的统计信息进行推送,那么行组大小对推送没有影响。
    • 任何形式的跳过或加载单行与行组大小无关。par
英文:

TL;DR - 1 Mi rows

Your understanding is roughly correct. Different tools have different recommendations and some tools (e.g. pyarrow) will use # of rows to determine row group size and other tools (e.g. parquet-mr, the java parquet implementation used by spark) will use # of bytes.

> is the difference (500MB calculated vs 85MB actual) because the row-group size reported by pqrs actually represents the uncompressed size

Yes. Tools are often not very clear on this. I find the parquet thrift definition to be a good source of ground truth when dealing with parquet metadata fields.

struct RowGroup {
  /** Metadata for each column chunk in this row group.
   * This list must have the same order as the SchemaElement list in FileMetaData.
   **/
  1: required list<ColumnChunk> columns

  /** Total byte size of all the uncompressed column data in this row group **/
  2: required i64 total_byte_size

> when I see that the recommended batch size is 128MB, what exactly does that refer to? The uncompressed in-memory size? The serialized, compacted size on disk? Something else? How does it relate to what's reported by pqrs?

> Main question: What should batchsize be?

The answer usually comes down to ensuring that you are making I/O requests that are ideal for your storage system. However, if your row groups are very small (e.g. 100, 1k, 10k rows) then it probably doesn't matter what your storage is (both because row groups introduce extra compute and because row groups affect the metadata / data ratio). These very small sizes are almost always universally bad for performance.

If you are using HDFS I believe the rules may be slightly different. I don't have much experience with HDFS. In all other cases you generally want row groups to be large enough that your I/O requests are big enough to satisfy your storage system.

For example, when reading from a HDD, if you do a bunch of random 64 byte reads you will get worse performance than a bunch of sequential 64 byte reads. However, if you do a bunch of random 4MiB reads then you should get roughly the same performance as a bunch of sequential 4MiB reads. This will depend on the hard drive of course but I've found 4MiB to be a good number for HDD. On the other hand, if you are reading from S3, then their guidelines recommend 8-16MiB.

Translating this to row group size is a bit tricky and will depend on your query habits. If you normally read all columns from a file then you will want your row group to be 8-16MiB. On the other hand, if you normally only read "some" columns from a file then you want each column to be 8-16MiB.

Now things get tricky because we have to think about compression and encoding. For example, a boolean column is almost never 8MiB. You would need at least 64Mi rows and possibly quite a bit more due to compression. A float32 column is a little easier to reason with. You should get an 8MiB read with 2Mi rows and, in many cases, you don't get much compression from this.

All of the above is the theory. In practice I have done a fair amount of benchmarking, both on local disks, and on S3, and I have found that 1Mi rows is generally a good size for a row group. There are probably cases where larger row groups are a good idea and you can still get pretty good performance with smaller row groups. You'll want to eventually benchmark for your own personal use case. However, 1Mi is a nice round number that is easy to remember. If you need to express your row group size in # of uncompressed bytes then it depends on how many columns you have. Again, as a rule of thumb, we can assume columns are 4 bytes, and so you can use the calculation:

# of bytes = 1Mi * # of columns * 4

In other words, if you have 10 columns, then aim for row groups with at least 40MiB.

What if I make row groups too large?

Given the above, it may seem simple to just make the row groups massive. This will ensure that you have ideal I/O requests. In a perfect world, were all parquet readers created equal, then I would say this is correct (1 row group per file is ideal).

However, many parquet readers will use row groups either as:

  • The unit of parallelism - In this case you won't get enough parallelism if you only have one row group in your file.
  • The minimum size of a read - In this case the RAM consumption of your reader will be very high, even if you are doing streaming processing.

For these reasons you will typically want to avoid overly large row group sizes.

What about pushdown filtering / statistics / etc.

This is typically another reason to keep row groups small. Row group statistics are the easiest pushdown filtering tool to use and some readers rely exclusively on this. This means that the pushdown is only capable of filtering out entire row groups based on the filter. So smaller row groups means you have a better chance of eliminating I/O entirely.

Fortunately, parquet reading tools have been slowly moving towards using page-level statistics (or page level bloom filters) to do this filtering. Pages are quite small (~1MiB) and can offer a very good resolution for filtering (though in some rare cases it is too fine of a resolution as it require more metadata processing). If your parquet reader is able to utilize page level statistics for pushdown then the row group size should have no affect on pushdown.

Any kind of skipping or loading of single rows will be unrelated to row groups sizes. Parquet readers should be capable of applying skips at page-level resolution.

Disclaimer: I work on arrow-c++/pyarrow. The pyarrow datasets reader's performance is very dependent on row group size (I'm slowly trying to fix this) for some of the reasons I describe above.

huangapple
  • 本文由 发表于 2023年7月28日 01:06:07
  • 转载请务必保留本文链接:https://go.coder-hub.com/76782018.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定