为什么toPandas不好?

huangapple go评论54阅读模式
英文:

Why toPandas is bad?

问题

根据文档,PySpark的toPandas方法并不适用于大型数据集,即使启用了Arrow。

只有在预期生成的 Pandas pandas.DataFrame 很小的情况下,才应使用此方法,因为所有数据都加载到驱动程序的内存中。

尽管如此,许多人仍然使用它来传输相对较大的数据框(我个人曾经传输过5GB的数据)。

  1. 为什么toPandas对于大型数据实际上是不好的?

  2. 超过什么阈值可以认为数据框足够小,可以通过toPandas进行传输?

  3. 将数据框传输到本地 Python 的最合适方法是什么?df.repartition(1).write.csv,然后跟随 hdfs dfs -get

英文:

According to the documentation, PySpark's toPandas method is not designed to be used with large datasets. Even with Arrow enabled.

> This method should only be used if the resulting Pandas pandas.DataFrame is expected to be small, as all the data is loaded into the driver’s memory.

Nevertheless, many of us use it to transfer relatively large dataframes (I personally transferred 5Gb once).

  1. Why toPandas is actually bad for large data?

  2. After what threshold can it be considered that a dataframe is small enough to be transferred via toPandas

  3. What is the most suitable approach to transfer a dataframe to the local python? df.repartition(1).write.csv followed by hdfs dfs -get?

Upd. I consider datasets that can comfortly fit to driver's node memory

答案1

得分: 2

  1. 因为它将占用你的RAM中的所有数据。如果RAM接近满了,数据将被转移到硬盘上的交换内存,这将显著减慢操作速度。特别是如果CPU需要访问先前移至交换的数据,因为它需要将其放回RAM,同时将其他数据移到交换内存中。如果RAM和交换限制都超过了,你将面临内存不足的错误,操作将取消。

  2. 阈值实际上取决于主机机器上可用的RAM,但通常建议远低于可能的限制,以确保在服务器负载高时操作速度快。

  3. 通常最好对大数据进行流式处理以避免此问题。通常,你无法知道主机机器上将有多少RAM和交换可用。对于大数据,流式处理更安全、更快速。这篇文章提供了四种优秀的策略,我想引用其中两种:

    抽样:最简单的选项是对数据集进行抽样。这种方法在探索阶段尤其强大:数据是什么样子?我可以创建哪些特征?换句话说,什么有效,什么无效。通常,对这样一个大数据集进行的10%的随机抽样就已经包含了大量信息。这引出了第一个问题,你实际上是否需要处理整个数据集来训练一个足够好的模型?

    import pandas
    import random
    
    filename = "data.csv" 
    n = sum(1 for line in open(filename))-1  # 计算文件中的行数
    s = n//10  # 10%的样本大小
    skip = sorted(random.sample(range(1, n+1), n-s))  # n+1以补偿标题 
    df = pandas.read_csv(filename, skiprows=skip)
    

    分块处理:如果确实需要处理所有数据,可以选择将数据分割为若干块(每块都适合内存),并在每个单独的块上执行数据清理和特征工程。此外,根据要使用的模型类型,有两种选择:

    • 如果所选模型允许部分拟合,可以在每个块的数据上逐步训练模型;
    • 在每个单独的块上训练一个模型。然后,对新的未见数据进行评分,使用每个模型进行预测,并将平均值或多数投票作为最终预测。
    import pandas
    from sklearn.linear_model import LogisticRegression
    datafile = "data.csv"
    chunksize = 100000
    models = []
    for chunk in pd.read_csv(datafile, chunksize=chunksize):
        chunk = pre_process_and_feature_engineer(chunk) 
        # 用于清理数据并创建特征的函数
        model = LogisticRegression()
        model.fit(chunk[features], chunk['label'])
        models.append(model)
    df = pd.read_csv("data_to_score.csv")
    df = pre_process_and_feature_engineer(df)
    predictions = mean([model.predict(df[features]) for model in models], axis=0)
    
英文:
  1. Because it will occupy the entire data in your RAM. If RAM gets nearly full, the data will be off-loaded to the swap memory on the hard drive, which will slow down the operation significantly. This is especially the case, if the CPU need to access data that was moved to the swap previously, because it needs to put it back in the RAM moving something else into the swap memory. If RAM and swap limit will both be exceeded, you will face an out of memory error and the operation will cancel.

  2. The Threshold really depends on the available RAM on the host machine, but it is generally recommend to stay way below possible limits to ensure a fast operation when the server is in high load.

  3. It's generally better to stream large data to avoid this issue. Often, you can't know how much RAM and swap will be available on the host machine. Streaming is safer and faster for large data. This article has four excellent strategies and I'd like to quote two of them here:

> Sampling: The most simple option is sampling your dataset. This approach can be especially powerful during the exploration phase: how does the data look like? What features can I create? In other words, what works and what does not. Often a random sample of 10% of such a large dataset will already contain a lot of information. That raises the first question, do you actually need to process your entire dataset to train an adequate model?
>
> py
> import pandas
> import random
>
> filename = "data.csv"
> n = sum(1 for line in open(filename))-1 # Calculate number of rows > in file
> s = n//10 # sample size of 10%
> skip = sorted(random.sample(range(1, n+1), n-s)) # n+1 to compensate for header
> df = pandas.read_csv(filename, skiprows=skip)
>

>
> Chunking: If you do need to process all data, you can choose to split the data into a number of chunks (which in itself do fit in memory) and perform your data cleaning and feature engineering on each individual chunk. Moreover, depending on the type of model you want to use, you have two options:
>
> - If the model of your choosing allows for partial fitting, you can incrementally train a model on the data of each chunk;
> - Train a model on each individual chunk. Subsequently, to score new unseen data, make a prediction with each model and take the average or majority vote as the final prediction.
>
> py
> import pandas
> from sklearn.linear_model import LogisticRegression
> datafile = "data.csv"
> chunksize = 100000
> models = []
> for chunk in pd.read_csv(datafile, chunksize=chunksize):
> chunk = pre_process_and_feature_engineer(chunk)
> # A function to clean my data and create my features
> model = LogisticRegression()
> model.fit(chunk[features], chunk['label'])
> models.append(model)
> df = pd.read_csv("data_to_score.csv")
> df = pre_process_and_feature_engineer(df)
> predictions = mean([model.predict(df[features]) for model in models], axis=0)
>

huangapple
  • 本文由 发表于 2023年2月27日 07:06:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/75575544.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定