Reading a GPKG format file with Apache Beam


Question

I have a requirement to parse a .gpkg (GeoPackage) file and load it into a BigQuery table through Apache Beam (Dataflow runner). I can see that Beam has a related project called Geobeam, but I couldn't find a reference for loading .gpkg files with it.

Q1: Which Beam library can help me load a GeoPackage file?
Q2: As an alternative, I am trying to read the GeoPackage file as a binary file and then transform and load it via a ParDo. How can we read a binary file in Apache Beam? (A generic sketch follows below.)

Does anyone have experience with this that they can share?
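
Regarding Q2, a minimal generic sketch of reading whole binary files in Beam is shown below. It relies on the `fileio` module's `MatchFiles`/`ReadMatches` pattern; the `gs://bucket/*.gpkg` glob and the `ParseBinary` DoFn are placeholders, and nothing GeoPackage-specific is parsed here.

```python
import apache_beam as beam
from apache_beam.io import fileio


class ParseBinary(beam.DoFn):
    def process(self, readable_file):
        # readable_file is a fileio.ReadableFile; read() returns the raw bytes
        payload = readable_file.read()
        yield {'path': readable_file.metadata.path, 'size': len(payload)}


with beam.Pipeline() as p:
    _ = (
        p
        | 'MatchFiles' >> fileio.MatchFiles('gs://bucket/*.gpkg')
        | 'ReadMatches' >> fileio.ReadMatches()
        | 'ParseBinary' >> beam.ParDo(ParseBinary())
    )
```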

Update: Alternative solution

I have a requirement to read a binary-encoded file through Python Apache Beam (Dataflow as the runner).

I am trying to replicate the "Reading from a new Source" example in my code in order to read binary files.

My code is given below; can you help me find where it is going wrong?

```python
#------------Import Lib-----------------------#
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import apache_beam as beam, os, sys, argparse
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import iobase

#------------Set up BQ parameters-----------------------#
project = 'proj-dev'
dataset_id = 'sandbox'
table_schema_Audit = ('id:STRING,name:STRING')
input = 'gs://bucket/beers.csv'
BUCKET = 'bucket'

#-------------Splitting Of Records----------------------#
class Transaction(iobase.BoundedSource):
    def process(self):
        # Open the Shapefile
        import fiona
        with fiona.open('gs://bucket/2022_data.gpkg', 'r') as input_file:
            parsed_data = [{"id": json.loads(json.dumps(feature['properties']))['Id'],
                            "name": json.loads(json.dumps(feature['properties']))['Name']}
                           for feature in input_file]
        return parsed_data

def run(argv=None, save_main_session=True):
    pipeline_args = [
        '--project={0}'.format(project),
        '--job_name=loadstructu',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--region=us-yyyy1',
        '--runner=DataflowRunner',
        '--subnetwork=https://www.googleapis.com/compute/v1/projects/proj-dev/regions/us-yyyy1/subnetworks/xxxxxx-dev-subnet'
    ]
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    p1 = beam.Pipeline(options=pipeline_options)
    data_loading = (
        p1
        | 'ReadData' >> beam.io.ReadFromText(Transaction())
    )

    #---------------------Type = load----------------------#
    result = (
        data_loading
        | 'Write-Audit' >> beam.io.WriteToBigQuery(
            table='structdata',
            dataset=dataset_id,
            project=project,
            schema=table_schema_Audit,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        ))

    result = p1.run()
    result.wait_until_finish()

if __name__ == '__main__':
    run()
```

It throws the following error:

```
~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, validate, skip_header_lines, delimiter, escapechar, **kwargs)
    772       skip_header_lines=skip_header_lines,
    773       delimiter=delimiter,
--> 774       escapechar=escapechar)
    775
    776   def expand(self, pvalue):

~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, buffer_size, validate, skip_header_lines, header_processor_fns, delimiter, escapechar)
    133       min_bundle_size,
    134       compression_type=compression_type,
--> 135       validate=validate)
    136
    137     self._strip_trailing_newlines = strip_trailing_newlines

~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/filebasedsource.py in __init__(self, file_pattern, min_bundle_size, compression_type, splittable, validate)
    110           '%s: file_pattern must be of type string'
    111           ' or ValueProvider; got %r instead' %
--> 112           (self.__class__.__name__, file_pattern))
    113
    114     if isinstance(file_pattern, str):

TypeError: _TextSource: file_pattern must be of type string or ValueProvider; got <__main__.Transaction object at 0x7fcc79ffc250> instead
```
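
For context, the traceback states the problem directly: `ReadFromText` expects a file-pattern string (or `ValueProvider`) as its first argument, not a source object. A custom `BoundedSource`, as in the "Reading from a new Source" example, is consumed with `beam.io.Read(...)` and implements `estimate_size()`/`split()`/`get_range_tracker()`/`read()` rather than `process()` (which here also uses `json` without importing it). Below is a minimal, hypothetical sketch of that interface, treating the file as a single unsplittable bundle; whether fiona can open a `gs://` path directly depends on the underlying GDAL build.

```python
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker


class GpkgSource(iobase.BoundedSource):
    """Hypothetical bounded source that reads one GeoPackage file with fiona."""

    def __init__(self, path):
        self._path = path

    def estimate_size(self):
        return 1  # treat the whole file as one unit of work

    def get_range_tracker(self, start_position, stop_position):
        return OffsetRangeTracker(start_position or 0, stop_position or 1)

    def split(self, desired_bundle_size, start_position=None, stop_position=None):
        # One unsplittable bundle covering the whole file
        yield iobase.SourceBundle(1, self, 0, 1)

    def read(self, range_tracker):
        if not range_tracker.try_claim(0):
            return
        import fiona
        with fiona.open(self._path, 'r') as input_file:
            for feature in input_file:
                props = feature['properties']
                yield {'id': props['Id'], 'name': props['Name']}


# Usage inside run():
#   data_loading = p1 | 'ReadData' >> beam.io.Read(GpkgSource('gs://bucket/2022_data.gpkg'))
```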

Answer 1

Score: 1

The solution that worked for me was to add the following two functions and run them before the Beam pipeline:

```python
#-------------Convert the binary file to a txt file----------------------#
def file_create():
    # Open the GeoPackage and write one ';'-delimited row per feature
    import fiona, json
    from shapely.geometry import shape  # converts the GeoJSON geometry dict to a WKT string
    f = open("data.txt", "a")
    with fiona.open('gs://bucket/2022_data.gpkg', 'r') as input_file:
        for feature in input_file:
            row = (json.loads(json.dumps(feature['properties']))['Id'] + ';'
                   + json.loads(json.dumps(feature['properties']))['Name'] + ';'
                   + str(shape(json.loads(json.dumps(feature['geometry'])))) + '\n')
            f.write(row)
    f.close()

#---------------Upload the newly created file to GCS-------------------------
def upload():
    from google.cloud import storage  # Cloud Storage client used for the upload
    client = storage.Client()
    bucket = client.bucket('bucket')
    blob = bucket.blob('data.txt')
    blob.upload_from_filename('data.txt')
```
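
For completeness, here is a sketch of how these helpers could be wired in front of the pipeline from the question. The `to_row` function, its ';'-splitting, and the final `gs://bucket/data.txt` path are assumptions, not part of the original answer.

```python
import apache_beam as beam


def to_row(line):
    # Each data.txt row looks like "Id;Name;WKT geometry"
    id_, name, _geometry = line.split(';', 2)
    return {'id': id_, 'name': name}


# Run the helpers first, then read the ';'-delimited text file they produced.
file_create()
upload()

with beam.Pipeline() as p:
    rows = (
        p
        | 'ReadData' >> beam.io.ReadFromText('gs://bucket/data.txt')
        | 'ToRow' >> beam.Map(to_row)
    )
    # rows can then go to beam.io.WriteToBigQuery(...) as in the question.
```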
