Transformation of csv to json in Dataflow
Question
Below is my transformation part from CSV to JSON. The CSV has three columns.
class ConvertCsvToJson(beam.DoFn):
    def process(self, element):
        # Assuming the CSV has three columns: col1, col2, col3
        col1, col2, col3 = element.split(',')
        # Create a JSON object
        json_obj = {
            'col1': col1.strip(),
            'col2': col2.strip(),
            'col3': col3.strip()
        }
        yield json_obj
and the pipeline code is
(pipeline
 | 'Read CSV' >> ReadFromText(input_file, skip_header_lines=1)  # Skip the first line
 | 'Convert to JSON' >> beam.ParDo(ConvertCsvToJson())
)
The problem is that the JSON is coming out as below because I don't have a loop, and I can't seem to figure out the fix.
{'col1': '100001', 'col2': 'Claude Potier (Earth-616)', 'col3': 'Secret'}
{'col1': '100002', 'col2': 'Elektra Natchios (Earth-616)', 'col3': 'Secret'}
It should be coming out as
[
{
"col1": "100001",
"col2": "Claude Potier (Earth-616)",
"col3": "Secret"
},
{
"col1": "100002",
"col2": "Elektra Natchios (Earth-616)",
"col3": "Secret"
}
]
Also, is there a way I can generate col1, col2 based on the headers from the CSV itself?
Also, the pipeline takes almost 5 minutes to run; is there a way to speed up the process?
Answer 1
Score: 2
1. Beam is built for distributed computing, so it never sees the entire dataset at once and will not dump everything as one big JSON object on its own (a workaround for results that fit in memory is shown in the first sketch after this list).
2. You can grab the headers by just reading the first line (see the second sketch after this list).
3. Maybe a better way to handle the JSON format is to use beam.dataframe:
import apache_beam as beam
import apache_beam.dataframe.io  # the dataframe IO helpers need this explicit import

pipeline = beam.Pipeline()
beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')
beam.dataframe.io.to_json(beam_df, 'solar_events.jsonl')
pipeline.run()
where solar_events.csv is
timestamp,event
2021-03-20 09:37:00,March Equinox
2021-06-21 03:32:00,June Solstice
2021-09-22 19:21:00,September Equinox
2021-12-21 15:59:00,December Solstice
but the output JSON is
{"timestamp":{"0":"2021-03-20 09:37:00","1":"2021-06-21 03:32:00","2":"2021-09-22 19:21:00","3":"2021-12-21 15:59:00"},"event":{"0":"March Equinox","1":"June Solstice","2":"September Equinox","3":"December Solstice"}}
This allows the data to be stored in a distributed way.
4. Dataflow has overhead for setting up the computing environment. It can take 5 minutes or even more if your Python code has many dependencies.
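A minimal sketch for point 1, assuming the whole result fits in a single worker's memory: collect every record with beam.combiners.ToList and serialize the resulting list as one JSON array. The file paths are placeholders, and the inline split-on-comma Map stands in for the question's ConvertCsvToJson DoFn.

import json

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

with beam.Pipeline() as pipeline:
    (pipeline
     | 'Read CSV' >> ReadFromText('input.csv', skip_header_lines=1)  # placeholder path
     | 'Convert to JSON' >> beam.Map(
         lambda line: dict(zip(('col1', 'col2', 'col3'),
                               (v.strip() for v in line.split(',')))))
     | 'Create JSON Array' >> beam.combiners.ToList()       # one list holding all records
     | 'Serialize' >> beam.Map(json.dumps, indent=2)        # a single JSON array string
     | 'Write' >> WriteToText('output', file_name_suffix='.json',
                              num_shards=1, shard_name_template=''))  # force one output file

Because ToList collapses everything onto one worker, this only makes sense for datasets small enough to hold in memory.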
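And a minimal sketch for point 2, reading the header row at pipeline-construction time and zipping it with each data row so the keys come from the CSV itself. The path is a placeholder, and the plain open() assumes a local file; for a file on GCS something like apache_beam.io.filesystems.FileSystems.open would be used instead.

import apache_beam as beam
from apache_beam.io import ReadFromText

input_file = 'input.csv'  # placeholder path

# Grab just the header line before building the pipeline.
with open(input_file) as f:
    columns = [name.strip() for name in f.readline().split(',')]

def to_record(line, columns):
    # Pair the real column names with the values of one row.
    return dict(zip(columns, (v.strip() for v in line.split(','))))

with beam.Pipeline() as pipeline:
    (pipeline
     | 'Read CSV' >> ReadFromText(input_file, skip_header_lines=1)
     | 'To Dict' >> beam.Map(to_record, columns))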
Comments