
Loading data in a dataframe - PySpark

Question


I'm trying to load data into a dataframe from a partitionned structure.
I have a specific period selected in my example and then loop to have my data range.
But This is not very performant!

import pandas as pd

start_date = inputdate
end_date = inputend

df_union = None

# Read one day's partition at a time and accumulate the running union.
for single_date in pd.date_range(start_date, end_date, freq='D'):
    yy = single_date.strftime("%y")
    mm = single_date.strftime("%m")
    dd = single_date.strftime("%d")

    df = spark.read.parquet(f"abfs://XXXXX.dfs.core.windows.net/coredb/user_action/{yy}/{mm}/{dd}/user_action.parquet") \
                   .select("user_action_id", "account_id", "inserted", "partner_id", "status", "service_id")

    if df_union is None:
        df_union = df
    else:
        df_union = df_union.union(df)  # unionAll is a deprecated alias of union

Any help?
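As an aside, even before changing the read strategy, the chained unions in the loop above are usually collapsed with functools.reduce. A minimal sketch of that cleanup, keeping the question's variable names and placeholder path:

from functools import reduce
from pyspark.sql import DataFrame
import pandas as pd

# Collect one DataFrame per day, then union them all in a single pass.
# Assumes the date range contains at least one day.
dfs = []
for single_date in pd.date_range(start_date, end_date, freq='D'):
    yy = single_date.strftime("%y")
    mm = single_date.strftime("%m")
    dd = single_date.strftime("%d")
    dfs.append(
        spark.read.parquet(f"abfs://XXXXX.dfs.core.windows.net/coredb/user_action/{yy}/{mm}/{dd}/user_action.parquet")
             .select("user_action_id", "account_id", "inserted", "partner_id", "status", "service_id"))

df_union = reduce(DataFrame.union, dfs)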

Answer 1

Score: 1


You can use wildcard paths to read all files (with the same schema) directly.

  • I have a file called sample1.csv (3 rows) in my source directories, structured as yyyy/MM/dd. There are 3 date folder structures, i.e., 2023/02/25, 2023/02/26 and 2023/02/27. Using the following code I am able to read the data successfully:
df = spark.read.option("header", True).csv('abfss://data@datalk2702.dfs.core.windows.net/2023/*/*/sample1.csv')

display(df.limit(10))


  • But this reads every file named sample1.csv from anywhere inside the 2023 folder. If you want to read only files between startdate and enddate, use the following code (a sketch adapting both snippets to the question's parquet layout follows the code):
import pandas as pd

start_date = '2023-02-25'
end_date = '2023-02-27'

base_path = 'abfss://data@datalk2702.dfs.core.windows.net/'
paths = []

# Build one explicit path per day in the requested range.
for single_date in pd.date_range(start_date, end_date, freq='D'):
    paths.append(base_path + f'{single_date.strftime("%Y")}/{single_date.strftime("%m")}/{single_date.strftime("%d")}/sample1.csv')

# print(paths)

df = spark.read.option("header", True).csv(paths)
display(df.limit(10))
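The same two ideas carry over to the question's parquet layout. The sketch below is untested and keeps the question's placeholder storage account (XXXXX) and two-digit %y year folders; spark.read.parquet accepts both glob patterns and multiple explicit paths:

import pandas as pd

base = "abfs://XXXXX.dfs.core.windows.net/coredb/user_action"
cols = ["user_action_id", "account_id", "inserted", "partner_id", "status", "service_id"]

# Option 1: wildcard over every yy/mm/dd partition (reads the whole dataset).
df_all = spark.read.parquet(f"{base}/*/*/*/user_action.parquet").select(*cols)

# Option 2: explicit per-day paths restricted to the requested date range,
# so Spark plans a single read instead of one read plus one union per day.
start_date = '2023-02-25'   # stand-ins for the question's inputdate / inputend
end_date = '2023-02-27'
paths = [f"{base}/{d.strftime('%y')}/{d.strftime('%m')}/{d.strftime('%d')}/user_action.parquet"
         for d in pd.date_range(start_date, end_date, freq='D')]
df_range = spark.read.parquet(*paths).select(*cols)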

