How to efficiently read .pq files - Python

Question

I have a list of files with the .pq extension, whose names are stored in a Python list. My intention is to read these files, filter them with pandas, and then merge them into a single pandas DataFrame.

Since there are thousands of files, the code currently runs very inefficiently. The biggest bottleneck is where I read the .pq file; during the experiments I commented out the filtering part. I've tried the three different ways shown below, however it takes about 1.5 seconds to read each file, which is quite slow. Are there alternative ways that I can perform these operations?

from tqdm import tqdm
from fastparquet import ParquetFile
import pandas as pd 
import pyarrow.parquet as pq

files = [.....]

#First way
for file in tqdm(files):
    temp = pd.read_parquet(file)
    #filter temp and append 

#Second way
for file in tqdm(files):
    temp = ParquetFile(file).to_pandas()
    # filter temp and append

#Third way
for file in tqdm(files):
    temp = pq.read_table(source=file).to_pandas()
    # filter temp and append
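
For reference, here is a minimal sketch of the elided filter-and-append step, using the imports and files list above; the actual condition does not matter for the timing question, so the column name and threshold are just placeholders:

frames = []
for file in tqdm(files):
    temp = pd.read_parquet(file)
    temp = temp[temp["col_0"] > 0.5]  # placeholder filter condition
    frames.append(temp)

df = pd.concat(frames, ignore_index=True)  # merge into a single DataFrame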

Each read of a file inside the for loop takes quite a long time. For 24 files, I spend 28 seconds:

 24/24 [00:28<00:00,  1.19s/it]

 24/24 [00:25<00:00,  1.08s/it]

One sample file is on average 90 MB, which corresponds to 667,858 rows and 48 columns. The data types are all numerical (i.e. float64). The number of rows may vary, but the number of columns remains the same.

Answer 1

Score: 1

Read multiple Parquet files (partitions) at once into a pyarrow.parquet.ParquetDataset, which accepts a directory name, a single file name, or a list of file names, and conveniently allows filtering of the scanned data:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset(your_files,
                            use_legacy_dataset=False,
                            filters=[('columnName', 'in', filterList)])
df = dataset.read(use_threads=True).to_pandas()
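
For reference, a roughly equivalent sketch using the pyarrow.dataset API, which recent pyarrow versions build ParquetDataset on; your_files, columnName and filterList are the same placeholders as in the snippet above:

import pyarrow.dataset as ds
import pyarrow.compute as pc

dataset = ds.dataset(your_files, format="parquet")
table = dataset.to_table(
    filter=pc.field("columnName").isin(filterList),  # filter pushed down into the scan
    use_threads=True,
)
df = table.to_pandas()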

Answer 2

Score: 1

The original post omits a reprex. I attempted to reproduce the reported symptoms with the enclosed code, but was unable to. The test system is a MacBook Air that mounts APFS SSD storage; the Intel Core i7 is clocked at 2.2 GHz.


#! /usr/bin/env python

# $ python -m cProfile -s tottime geo/ski/bench_parquet.py
from pathlib import Path
from time import time

from tqdm import tqdm
import numpy as np
import pandas as pd
import pyarrow.parquet as pq

K = 24
PQ_DIR = Path("/tmp/parquet.d")


def gen_dfs(k=K, dst_dir=PQ_DIR, shape=(667_858, 48)):
    dst_dir.mkdir(exist_ok=True)
    rng = np.random.default_rng()
    for i in range(k):
        df = pd.DataFrame(
            rng.integers(66_000, size=shape) / 1_000,
            columns=[f"col_{j}" for j in range(shape[1])],
        )
        print(i)
        df.to_parquet(dst_dir / f"{i}.parquet")


def read_dfs(src_dir=PQ_DIR):
    for file in src_dir.glob("*.parquet"):
        yield pd.read_parquet(file)


def main():
    gen_dfs()
    t0 = time()

    for df in tqdm(read_dfs()):
        assert len(df) > 0

    files = list(PQ_DIR.glob("*.parquet"))
    dataset = pq.ParquetDataset(files)
    assert dataset
    # df = dataset.read(use_threads=True).to_pandas()

    elapsed = time() - t0
    print(f"{elapsed:.3f} seconds elapsed, {elapsed / K:.3f} per file")


if __name__ == "__main__":
    main()

When reading two dozen files of FP data I observe timings like this:

7.879 seconds elapsed, 0.328 per file

Given that this is 2 M rows / second, each row having more than a hundred bytes of data that needs to be decompressed, it seems like reasonable throughput to me.
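
Here is the back-of-the-envelope arithmetic behind that claim, using the file shape quoted in the question (667,858 rows and roughly 90 MB per file):

rows_total = 667_858 * 24          # ~16.0 M rows across the 24 files
elapsed = 7.879                    # seconds, from the run above
print(rows_total / elapsed / 1e6)  # ~2.0 M rows per second
print(90e6 / 667_858)              # ~135 compressed bytes per row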

Enabling the ParquetDataset .read() call shows about the same throughput,
with one big caveat. Reading a handful of files works fine. If you want to
read the full 2 GiB of compressed data, you'd better have plenty
of free RAM to store it, or throughput will plummet by a factor of 10x.
Often it's a win to operate on conveniently sized chunks,
allocating and freeing as you go.
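
As a rough sketch of that chunked approach, each file can be streamed in record batches via pyarrow instead of materialized as a whole table, reusing pq and PQ_DIR from the script above (the batch size is an arbitrary placeholder):

for file in PQ_DIR.glob("*.parquet"):
    parquet_file = pq.ParquetFile(file)
    for batch in parquet_file.iter_batches(batch_size=100_000):  # placeholder batch size
        chunk = batch.to_pandas()
        # filter `chunk` here and keep only the (much smaller) result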


Specifying ... , compression=None on writes does not improve the read timings.

On the positive side, binary parquet format is demonstrably a big win.
Reading the same data formatted as a 208 MiB plain text .CSV takes ~ 4 seconds -- an order of magnitude slower.
Gzip'ing it yields the same size as parquet, at a cost of ~ 5 seconds
to read it.


cProfile reveals that we spend ~ 20% of the time on disk I/O and ~ 80% of the time decompressing and marshaling the bits into array format, which sounds about right. I don't notice any terrible inefficiencies in this.

tl;dr: You're doing it correctly already.

huangapple
  • Posted on 2023-02-24 03:42:07
  • Please retain the link to this article when reposting: https://go.coder-hub.com/75549599.html