将CSV文件上传并根据两列数据分隔成Parquet文件。

huangapple go评论78阅读模式
英文:

Uploading a csv file to separate parquet files based on 2 column data

问题

我有一个类似下面显示的 CSV 文件。

我想要筛选出 ID 和组合的不同值,并将每个组合保存为一个 Parquet 文件。例如:12888_1.parquet,13368_1.parquet 等等。我有多个不同的 ID 和 6 个组合 [1,2,3,4,5,6]。

我知道复制活动可以将任何文件保存为 .parquet 文件并自定义名称,但我不确定如何对列进行分组并保存 Parquet 文件。任何帮助将不胜感激。

英文:

I have a csv file like shown below.

将CSV文件上传并根据两列数据分隔成Parquet文件。

I want to filter the distinct values of ID and Combinations and save each combo as a parquet file. example : 12888_1.parquet, 13368_1.parquet etc.... I have several different IDs and 6 combinations [1,2,3,4,5,6].

I am aware that copy activity can save any file as .parquet file with a custom name, But I am not sure how to grouping the column and saving the parquet files. Any help would be appreciated.

答案1

得分: 1

  • 你可以通过数据流和管道活动的组合来实现你的需求。使用数据流活动来使用聚合转换获取唯一的 id+combination 值,使用 sink 缓存,遍历这个结果,并将这些值传递到另一个数据流中,根据这些值进行过滤和写入。

  • 以下是我获取的示例文件数据:

id,combo,character
123,1,A
123,1,B
123,1,C
234,1,D
234,1,E
234,2,F
234,2,G
234,2,H
234,2,I
234,3,J
345,3,K
345,3,L
345,3,M
456,4,N
456,4,O
567,5,P
567,5,Q
567,6,R
678,6,S
  • 现在,我已将这个作为我的数据流源。我添加了一个聚合转换,并根据 id 和 combo 对数据进行分组。使用任何聚合转换(无论如何我们都会在映射中排除它)。

将CSV文件上传并根据两列数据分隔成Parquet文件。

  • 现在,只选择 id 和 combo 并使用 sink 缓存。以下是数据流活动的调试输出:

将CSV文件上传并根据两列数据分隔成Parquet文件。

  • 现在使用 for each 活动遍历这个数据,使用以下动态内容:
@activity('get unique id and combo').output.runStatus.output.sink1.value
  • 在每次迭代中将 id 和 combo 的值传递给新的数据流。以下是相同的图像:

将CSV文件上传并根据两列数据分隔成Parquet文件。

  • 在新的数据流中,以所需文件作为源。使用以下条件的筛选活动:
id==$id && combo==$combo

将CSV文件上传并根据两列数据分隔成Parquet文件。

  • 使用以下动态内容和配置为每次迭代命名文件:
concat($id,'_',$combo,'.parquet')

将CSV文件上传并根据两列数据分隔成Parquet文件。

  • 文件将以 parquet 文件的形式写入到您的存储账户中,如下所示:

将CSV文件上传并根据两列数据分隔成Parquet文件。

英文:
  • You can achieve your requirement using the combination of dataflows and pipeline activities. Use a dataflow activity to get the unique id+combination values using aggregate transformation, use sink cache, iterate through this result and pass the values to another dataflow to filter and write based on these values.

  • The following is a sample file data that I have taken:

id,combo,character
123,1,A
123,1,B
123,1,C
234,1,D
234,1,E
234,2,F
234,2,G
234,2,H
234,2,I
234,3,J
345,3,K
345,3,L
345,3,M
456,4,N
456,4,O
567,5,P
567,5,Q
567,6,R
678,6,S
  • Now, I have taken this as my dataflow source. I have added an aggregate transformation and grouped the data by id and combo. Use any aggregate transformation (we will exclude this in mapping anyway).

将CSV文件上传并根据两列数据分隔成Parquet文件。

  • Now, select only the id and combo and use sink cache. The following is how the debug output of the dataflow activity looks like:

将CSV文件上传并根据两列数据分隔成Parquet文件。

  • Now iterate through this using for each activity with the following dynamic content:
@activity('get unique id and combo').output.runStatus.output.sink1.value
  • Pass the values of id and combo in each iteration to the new dataflow. The following is an image of the same:

将CSV文件上传并根据两列数据分隔成Parquet文件。

  • In the new dataflow, take the required file as source. Use filter activity with the following condition:
id==$id && combo==$combo

将CSV文件上传并根据两列数据分隔成Parquet文件。

  • Use the following dynamic content and configuration to name the file for each iteration:
concat($id,'_',$combo,'.parquet')

将CSV文件上传并根据两列数据分隔成Parquet文件。

  • The following is how the files will be written to your storage account as parquet files:

将CSV文件上传并根据两列数据分隔成Parquet文件。

huangapple
  • 本文由 发表于 2023年6月16日 04:12:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/76485212.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定