Reading GPKG format file over Apache Beam
Question
I have a requirement to parse a gpkg extension file and load it into a BigQuery table through Apache Beam (Dataflow runner). I can see that Beam has a related project called GeoBeam, but I couldn't find any reference for loading gpkg files.
Q1: Which Beam library can help me load a geopackage file?
Q2: As an alternative solution, I am trying to read the geopackage file as a binary file, transform it with a ParDo, and load it. How can we read a binary file in Apache Beam?
Has anyone had experience with this and can share it?
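(For reference, the usual way to read arbitrary binary files in Beam without writing a custom source is the apache_beam.io.fileio module. The following is only a minimal sketch, assuming a hypothetical gs://bucket path and that each whole file fits in a worker's memory:)

import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
    file_bytes = (
        p
        | 'MatchFiles' >> fileio.MatchFiles('gs://bucket/*.gpkg')  # glob of input files (hypothetical path)
        | 'ReadMatches' >> fileio.ReadMatches()                    # one ReadableFile per matched file
        | 'ReadBytes' >> beam.Map(lambda f: f.read())              # full file content as bytes
    )

Each downstream element is then the raw bytes of one matched file, which a ParDo can parse however it needs.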
Update: Alternate solution
I have a requirement to read a binary-coded file through Python Apache Beam (Dataflow as the runner).
I am trying to replicate the following example, Reading from a new Source, in my code to read binary files.
My code is given below; can you help me see where it is going wrong?
#------------Import Lib-----------------------#
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import apache_beam as beam, os, sys, argparse
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import iobase

#------------Set up BQ parameters-----------------------#
project = 'proj-dev'
dataset_id = 'sandbox'
table_schema_Audit = ('id:STRING,name:STRING')
input = 'gs://bucket/beers.csv'
BUCKET = 'bucket'

#-------------Splitting Of Records----------------------#
class Transaction(iobase.BoundedSource):
    def process(self):
        # Open the Shapefile
        import fiona
        with fiona.open('gs://bucket/2022_data.gpkg', 'r') as input_file:
            parsed_data = [[{"id": json.loads(json.dumps(feature['properties']))['Id'],
                             "name": json.loads(json.dumps(feature['properties']))['Name']}] for feature in input_file]
        return parsed_data

def run(argv=None, save_main_session=True):
    pipeline_args = [
        '--project={0}'.format(project),
        '--job_name=loadstructu',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--region=us-yyyy1',
        '--runner=DataflowRunner',
        '--subnetwork=https://www.googleapis.com/compute/v1/projects/proj-dev/regions/us-yyyy1/subnetworks/xxxxxx-dev-subnet'
    ]
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    p1 = beam.Pipeline(options=pipeline_options)

    data_loading = (
        p1
        | 'ReadData' >> beam.io.ReadFromText(Transaction())
    )

    #---------------------Type = load----------------------#
    result = (
        data_loading
        | 'Write-Audit' >> beam.io.WriteToBigQuery(
            table='structdata',
            dataset=dataset_id,
            project=project,
            schema=table_schema_Audit,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        ))

    result = p1.run()
    result.wait_until_finish()

if __name__ == '__main__':
    run()
It pops the following error:
~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, validate, skip_header_lines, delimiter, escapechar, **kwargs)
772 skip_header_lines=skip_header_lines,
773 delimiter=delimiter,
--> 774 escapechar=escapechar)
775
776 def expand(self, pvalue):
~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, buffer_size, validate, skip_header_lines, header_processor_fns, delimiter, escapechar)
133 min_bundle_size,
134 compression_type=compression_type,
--> 135 validate=validate)
136
137 self._strip_trailing_newlines = strip_trailing_newlines
~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/filebasedsource.py in __init__(self, file_pattern, min_bundle_size, compression_type, splittable, validate)
110 '%s: file_pattern must be of type string'
111 ' or ValueProvider; got %r instead' %
--> 112 (self.__class__.__name__, file_pattern))
113
114 if isinstance(file_pattern, str):
TypeError: _TextSource: file_pattern must be of type string or ValueProvider; got <__main__.Transaction object at 0x7fcc79ffc250> instead
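(The last line of the traceback is the key: ReadFromText expects a file pattern, i.e. a string or ValueProvider, not a source object, so passing Transaction() to it cannot work. A class derived from iobase.BoundedSource would also need to implement read(), split(), get_range_tracker() and estimate_size() and be consumed with beam.io.Read rather than ReadFromText. A simpler pattern, sketched below only as an illustration and assuming one worker can parse the whole layer, with the path and property names taken from the code above, is to wrap the fiona parsing in a DoFn:)

import apache_beam as beam

class ParseGpkg(beam.DoFn):
    def process(self, path):
        import fiona  # imported here so the dependency is resolved on the Dataflow workers
        with fiona.open(path, 'r') as layer:
            for feature in layer:
                props = feature['properties']
                yield {'id': props['Id'], 'name': props['Name']}

# ...and inside run(), the 'ReadData' step would become:
# data_loading = (
#     p1
#     | 'FilePath' >> beam.Create(['gs://bucket/2022_data.gpkg'])
#     | 'ParseGpkg' >> beam.ParDo(ParseGpkg())
# )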
Answer 1
Score: 1
The solution that worked for me was to add the following two functions before running the Beam pipeline:
#-------------convert binary file to a txt file----------------------#
def file_create():
    # Open the GeoPackage layer with fiona and write one 'Id;Name;WKT' line per feature
    import fiona, json
    from shapely.geometry import shape  # needed for shape(...) below
    f = open("data.txt", "a")
    with fiona.open('gs://bucket/2022_data.gpkg', 'r') as input_file:
        for feature in input_file:
            row = json.loads(json.dumps(feature['properties']))['Id'] + ';' + \
                  json.loads(json.dumps(feature['properties']))['Name'] + ';' + \
                  str(shape(json.loads(json.dumps(feature['geometry'])))) + '\n'
            f.write(row)
    f.close()

#---------------Upload newly created file to GCS-------------------------
def upload():
    # Upload the local data.txt to the GCS bucket so the pipeline can read it
    from google.cloud import storage  # needed for storage.Client() below
    client = storage.Client()
    bucket = client.bucket('bucket')
    blob = bucket.blob('data.txt')
    blob.upload_from_filename('data.txt')
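With those two helpers run first, the pipeline itself can then read the uploaded text file with the stock text source. A minimal sketch follows; the file name and the ';' separator come from the functions above, everything else is illustrative only:

def parse_line(line):
    # file_create() writes each feature as 'Id;Name;WKT'
    id_, name, _geometry = line.split(';', 2)
    return {'id': id_, 'name': name}

data_loading = (
    p1
    | 'ReadData' >> beam.io.ReadFromText('gs://bucket/data.txt')
    | 'ParseLine' >> beam.Map(parse_line)
)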