Transformation of CSV to JSON in Dataflow


Question



Below is my transformation from CSV to JSON. The CSV has three columns.

```python
class ConvertCsvToJson(beam.DoFn):
    def process(self, element):
        # Assuming the CSV has three columns: col1, col2, col3
        col1, col2, col3 = element.split(',')
        # Create a JSON object
        json_obj = {
            'col1': col1.strip(),
            'col2': col2.strip(),
            'col3': col3.strip()
        }
        yield json_obj
```
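(A side note on the parsing itself: `element.split(',')` will misparse any field that contains an embedded comma. If that can ever happen in the data, the standard-library `csv` module is a safer way to split a line. A minimal sketch, not part of the original question; `parse_row` and the quoted sample row are illustrative:)

```python
import csv

def parse_row(element):
    # csv.reader respects quoting, so "Potier, Claude" stays one field
    col1, col2, col3 = next(csv.reader([element]))
    return {'col1': col1.strip(), 'col2': col2.strip(), 'col3': col3.strip()}

print(parse_row('100001,"Potier, Claude (Earth-616)",Secret'))
# → {'col1': '100001', 'col2': 'Potier, Claude (Earth-616)', 'col3': 'Secret'}
```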

and the pipeline code is

```python
(pipeline
 | 'Read CSV' >> ReadFromText(input_file, skip_header_lines=1)  # Skip the first line
 | 'Convert to JSON' >> beam.ParDo(ConvertCsvToJson()))
```

The problem is that the JSON output comes out as below, because I don't have a loop; I can't seem to figure out the fix:

```
{'col1': '100001', 'col2': 'Claude Potier (Earth-616)', 'col3': 'Secret'}
{'col1': '100002', 'col2': 'Elektra Natchios (Earth-616)', 'col3': 'Secret'}
```

It should come out as:

```json
[
  {
    "col1": "100001",
    "col2": "Claude Potier (Earth-616)",
    "col3": "Secret"
  },
  {
    "col1": "100002",
    "col2": "Elektra Natchios (Earth-616)",
    "col3": "Secret"
  }
]
```

Also, is there a way I can generate `col1`, `col2` based on the headers from the CSV itself?
And the pipeline takes almost 5 minutes to run; is there a way to speed up the process?

Answer 1

Score: 2

1. Beam is designed for distributed computing, so it never sees the entire dataset at once and cannot simply dump everything as one big JSON object.
2. You can grab the headers by reading just the first line.
3. A better way to handle the JSON format may be to use `beam.dataframe`:
```python
import apache_beam as beam

pipeline = beam.Pipeline()
beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')
beam.dataframe.io.to_json(beam_df, 'solar_events.jsonl')
pipeline.run()
```

solar_events.csv is

```
timestamp,event
2021-03-20 09:37:00,March Equinox
2021-06-21 03:32:00,June Solstice
2021-09-22 19:21:00,September Equinox
2021-12-21 15:59:00,December Solstice
```

but the output JSON is

```
{"timestamp":{"0":"2021-03-20 09:37:00","1":"2021-06-21 03:32:00","2":"2021-09-22 19:21:00","3":"2021-12-21 15:59:00"},"event":{"0":"March Equinox","1":"June Solstice","2":"September Equinox","3":"December Solstice"}}
```

This allows the data to be stored in a distributed way.
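Coming back to point 2: once the header line has been read, the per-row dicts can be keyed by the real column names instead of the hard-coded `col1`/`col2`/`col3`. The mapping itself is a plain zip of header names against cell values; in Beam, the header would typically be passed into the `DoFn` as a side input. A minimal sketch, where `row_to_record` and the sample header `id,name,identity` are illustrative, not from the original question:

```python
def row_to_record(header, element):
    # Pair each header name with the corresponding cell value
    names = [h.strip() for h in header.split(',')]
    values = [v.strip() for v in element.split(',')]
    return dict(zip(names, values))

header = 'id,name,identity'
print(row_to_record(header, '100001, Claude Potier (Earth-616), Secret'))
# → {'id': '100001', 'name': 'Claude Potier (Earth-616)', 'identity': 'Secret'}
```

This assumes the header line is comma-split cleanly; if headers can contain quoted commas, the `csv` module applies here as well.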
4. Dataflow has overhead for setting up the computing environment. It could take 5 minutes or even more if your Python code has many dependencies.
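As for the bracketed-array output the question asks for: if the whole result fits in memory on one worker, the per-element dicts can be collected with `beam.combiners.ToList()` and then serialized once with `json.dumps`. The collect-and-dump step is plain Python; a sketch, where `convert_row` mirrors the question's `ConvertCsvToJson.process` and the sample rows are illustrative:

```python
import json

def convert_row(element):
    # Same logic as ConvertCsvToJson.process in the question
    col1, col2, col3 = element.split(',')
    return {'col1': col1.strip(), 'col2': col2.strip(), 'col3': col3.strip()}

rows = [
    '100001, Claude Potier (Earth-616), Secret',
    '100002, Elektra Natchios (Earth-616), Secret',
]
# In the pipeline this would be roughly:
#   ... | beam.Map(convert_row)
#       | beam.combiners.ToList()
#       | beam.Map(lambda recs: json.dumps(recs, indent=2))
records = [convert_row(r) for r in rows]
print(json.dumps(records, indent=2))  # prints the bracketed JSON array
```

Note that this funnels everything through a single worker, which defeats Beam's parallelism; for large data, the one-object-per-line (JSON Lines) output the pipeline already produces is the distribution-friendly format.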

huangapple
  • Published on 2023-06-16 04:15:36
  • Please keep this link when reposting: https://go.coder-hub.com/76485223.html