Transformation of CSV to JSON in Dataflow

Question


Below is my transformation part from CSV to JSON. The CSV has three columns.

class ConvertCsvToJson(beam.DoFn):
    def process(self, element):
        # Assuming the CSV has three columns: col1, col2, col3
        col1, col2, col3 = element.split(',')
        
        # Create a JSON object
        json_obj = {
            'col1': col1.strip(),
            'col2': col2.strip(),
            'col3': col3.strip()
        }
        
        yield json_obj

and the pipeline code is:

(pipeline
        | 'Read CSV' >> ReadFromText(input_file, skip_header_lines=1)  # Skip the first line
        | 'Convert to JSON' >> beam.ParDo(ConvertCsvToJson())
)

The problem is that the JSON output comes out as below, because I don't have a loop. I can't seem to figure out the fix:

{'col1': '100001', 'col2': 'Claude Potier (Earth-616)', 'col3': 'Secret'}
{'col1': '100002', 'col2': 'Elektra Natchios (Earth-616)', 'col3': 'Secret'}

It should come out as:

[
  {
    "col1": "100001",
    "col2": "Claude Potier (Earth-616)",
    "col3": "Secret"
  },
  {
    "col1": "100002",
    "col2": "Elektra Natchios (Earth-616)",
    "col3": "Secret"
  }
]

Also, is there a way I can generate col1, col2 based on the headers from the CSV itself?
And the pipeline takes almost 5 minutes to run; is there a way to speed up the process?

Answer 1

Score: 2

  1. Beam is for distributed computing, so no single worker sees the entire dataset and it cannot simply dump everything as one big JSON object. (A possible workaround is sketched further down, after the dataframe example.)
  2. You can grab the headers by just reading the first line. (A sketch of this is included at the end of this answer.)
  3. Maybe a better way to handle the JSON format is to use beam.dataframe:
import apache_beam as beam
import apache_beam.dataframe.io  # needed so beam.dataframe.io is available

pipeline = beam.Pipeline()

beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')
beam.dataframe.io.to_json(beam_df, 'solar_events.jsonl')

pipeline.run()

solar_events.csv is

timestamp,event
2021-03-20 09:37:00,March Equinox
2021-06-21 03:32:00,June Solstice
2021-09-22 19:21:00,September Equinox
2021-12-21 15:59:00,December Solstice

but the output json is

{"timestamp":{"0":"2021-03-20 09:37:00","1":"2021-06-21 03:32:00","2":"2021-09-22 19:21:00","3":"2021-12-21 15:59:00"},"event":{"0":"March Equinox","1":"June Solstice","2":"September Equinox","3":"December Solstice"}}

This allows the data to be stored in a distributed way.
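
As a side note beyond the dataframe approach above: if you do need the single bracketed JSON array shown in the question, one possible workaround is to collect the whole PCollection into one Python list and serialize it yourself. This is only a sketch under the question's three-column assumption; 'input.csv', 'output', and the row_to_dict helper are placeholders, and the collect step gives up parallelism, so it is only reasonable for small outputs.

import json

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText


def row_to_dict(line):
    # Split one CSV line into the three columns from the question.
    col1, col2, col3 = line.split(',')
    return {'col1': col1.strip(), 'col2': col2.strip(), 'col3': col3.strip()}


with beam.Pipeline() as pipeline:
    (pipeline
     | 'Read CSV' >> ReadFromText('input.csv', skip_header_lines=1)
     | 'To dict' >> beam.Map(row_to_dict)
     # ToList collects the entire PCollection into a single Python list,
     # so this step runs on one worker and should only be used for small data.
     | 'Collect' >> beam.combiners.ToList()
     | 'Dump JSON array' >> beam.Map(lambda rows: json.dumps(rows, indent=2))
     # num_shards=1 plus an empty shard template produce a single output.json file.
     | 'Write' >> WriteToText('output', file_name_suffix='.json',
                              num_shards=1, shard_name_template=''))
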
  4. Dataflow has overhead to set up the computing environment. It can take 5 minutes or even more if your Python code has many dependencies.
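
For point 2, here is a minimal sketch of deriving the column names from the CSV header itself, assuming the header is the first line of the file. The input_file path is a placeholder (FileSystems.open also accepts gs:// paths), and the dict keys then come straight from the header instead of being hard-coded.

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.filesystems import FileSystems

input_file = 'input.csv'  # placeholder; a gs:// URI works the same way

# Read only the header line, outside the pipeline, to get the column names.
with FileSystems.open(input_file) as f:
    header = f.readline().decode('utf-8').strip().split(',')

with beam.Pipeline() as pipeline:
    (pipeline
     | 'Read CSV' >> ReadFromText(input_file, skip_header_lines=1)
     # Pair each value with its header so keys are not hard-coded.
     | 'To dict' >> beam.Map(
         lambda line, cols: dict(zip(cols, (v.strip() for v in line.split(',')))),
         cols=header))
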

huangapple
  • Published on June 16, 2023, 04:15:36
  • When reposting, please keep the original link: https://go.coder-hub.com/76485223.html