Union 60 dataframes in a Palantir Foundry directory using PySpark
Question
I have a directory with 60+ Foundry datasets in it. I just need to read all the datasets and union them into a single dataframe.
Path = input("MySpend new files P2P/2020/")
Dataset1
Dataset2
Dataset3
...
Dataset60
Output("MySpend new files P2P/2020/UnionAll")
Answer 1
Score: 1
More technically:
- You can leverage union_many in your transform (more verbose example here) and list its inputs manually. Note that you can use Data Lineage to quickly copy-paste the paths of all the datasets (select the datasets > top right > "view histogram" icon > "copy paths"). Basic usage:
from transforms.api import transform_df, Input, Output
from transforms.verbs import dataframes as D


@transform_df(
    Output("/path/to/dataset/unioned"),
    source_df_1=Input("/path/to/dataset/one"),
    source_df_2=Input("/path/to/dataset/two"),
    source_df_3=Input("/path/to/dataset/three"),
)
def compute(source_df_1, source_df_2, source_df_3):
    # Stack the input dataframes into a single output dataframe
    return D.union_many(
        source_df_1,
        source_df_2,
        source_df_3,
    )
- In the same way, but easier to copy-paste, you can parameterize your transform to take an array of paths as input:
from transforms.verbs import dataframes as D
from transforms.api import transform_df, Input, Output

# List the paths of the datasets to union
list_datasets_paths = [
    "/path/to/dataset/one",
    "/path/to/dataset/two",
    "/path/to/dataset/three",
]

# Convert the list of paths into a dict of Input(), keyed by the
# last segment of each path
input_dict = {}
for dataset_path in list_datasets_paths:
    input_dict[dataset_path.split("/")[-1]] = Input(dataset_path)


# Provide the dict of Input() to the transform
@transform_df(
    Output("/path/to/dataset/unioned"),
    **input_dict
)
def compute_2(**inputs_dataframes):
    # Create a list of dataframes from the input dict
    dataframes_list = inputs_dataframes.values()
    # Union the list of dataframes
    return D.union_many(*dataframes_list)
- If the set of datasets evolves over time, you can use Logic Flows. It essentially lists the rids (resource identifiers) of the resources in a given input folder and opens a new pull request on the repository with a file containing those rids/paths. Note that this is a beta product.
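The file-generation step is also easy to approximate by hand. A minimal sketch, not the Logic Flows product itself: plain Python that turns pasted paths (paths_text is a hypothetical blob copied via Data Lineage's "copy paths") into the list_datasets_paths literal used by the parameterized transform above.

paths_text = """
/path/to/dataset/one
/path/to/dataset/two
/path/to/dataset/three
"""

# Keep the non-empty lines and render them as a Python list literal
paths = [line.strip() for line in paths_text.splitlines() if line.strip()]
entries = "".join('    "{}",\n'.format(path) for path in paths)
print("list_datasets_paths = [\n" + entries + "]")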
Note: You also have other tools to build pipelines, and hence union datasets, such as Pipeline Builder (docs).
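If transforms.verbs is not available in your repository, the union itself can also be reproduced in plain PySpark by folding unionByName over the list of dataframes. A minimal sketch, assuming Spark 3.1+ for allowMissingColumns (union_all is an illustrative helper, not part of the transforms API):

from functools import reduce

def union_all(dataframes):
    # Fold the dataframes pairwise; unionByName aligns columns by name,
    # and allowMissingColumns=True null-fills columns absent from one side
    return reduce(
        lambda left, right: left.unionByName(right, allowMissingColumns=True),
        dataframes,
    )

Inside compute_2 above, you would then return union_all(list(inputs_dataframes.values())) instead of D.union_many(*dataframes_list).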