Reading a GPKG format file with Apache Beam

Question

I have a requirement to parse a GPKG (GeoPackage) extension file and load it into a BigQuery table through Apache Beam (Dataflow runner). I can see that Beam has a feature called geobeam, but I couldn't find a reference for loading GPKG files.

Q1: Which Beam library can help me load a GeoPackage file?
Q2: As an alternative, I am trying to read the GeoPackage file as a binary file, transform it with a ParDo, and load it. How can we read a binary file in Apache Beam?

Does anyone have experience with this that they can share?
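For Q2, one common pattern is to match the files and open them inside the pipeline with Beam's fileio transforms. A minimal sketch of that approach, not from the original post; the gs:// pattern and the parse_gpkg helper are placeholders:

# A minimal sketch of reading raw binary files with Beam's fileio transforms;
# the gs:// pattern and the parse_gpkg helper below are placeholders.
import apache_beam as beam
from apache_beam.io import fileio

def parse_gpkg(readable_file):
    # readable_file is a fileio.ReadableFile; read() returns the raw bytes.
    raw_bytes = readable_file.read()
    yield (readable_file.metadata.path, len(raw_bytes))

with beam.Pipeline() as p:
    _ = (
        p
        | 'MatchFiles' >> fileio.MatchFiles('gs://bucket/*.gpkg')
        | 'ReadMatches' >> fileio.ReadMatches()
        | 'ParseBytes' >> beam.FlatMap(parse_gpkg)
        | 'Print' >> beam.Map(print)
    )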

Update: alternate solution
I have a requirement to read a binary-coded file through Python Apache Beam (with Dataflow as the runner).

I am trying to replicate the "Reading from a new Source" example in my code to read binary files.

My code is given below; can you help me figure out where it's going wrong?

#------------Import Lib-----------------------#
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import apache_beam as beam, os, sys, argparse
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import iobase

#------------Set up BQ parameters-----------------------#
project = 'proj-dev'
dataset_id = 'sandbox'
table_schema_Audit = ('id:STRING,name:STRING')
input = 'gs://bucket/beers.csv'
BUCKET = 'bucket'

#-------------Splitting Of Records----------------------#
class Transaction(iobase.BoundedSource):
    def process(self):
        # Open the Shapefile
        import fiona
        with fiona.open('gs://bucket/2022_data.gpkg', 'r') as input_file:
            parsed_data = [{"id": json.loads(json.dumps(feature['properties']))['Id'],
                            "name": json.loads(json.dumps(feature['properties']))['Name']} for feature in input_file]
        return parsed_data

def run(argv=None, save_main_session=True):
    pipeline_args = [
      '--project={0}'.format(project),
      '--job_name=loadstructu',
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--region=us-yyyy1',
      '--runner=DataflowRunner',
      '--subnetwork=https://www.googleapis.com/compute/v1/projects/proj-dev/regions/us-yyyy1/subnetworks/xxxxxx-dev-subnet'
    ]

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)

    data_loading = (
        p1
        | 'ReadData' >> beam.io.ReadFromText(Transaction())   
        )

#---------------------Type = load----------------------------------------------------------------------------------------------------------------------
    result = (
    data_loading
        | 'Write-Audit' >> beam.io.WriteToBigQuery(
                                                    table='structdata',
                                                    dataset=dataset_id,
                                                    project=project,
                                                    schema=table_schema_Audit,
                                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
                                                    ))

    result = p1.run()
    result.wait_until_finish()

if __name__ == '__main__':
  run()

It throws the following error:

~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, validate, skip_header_lines, delimiter, escapechar, **kwargs)
772         skip_header_lines=skip_header_lines,
773         delimiter=delimiter,
---> 774         escapechar=escapechar)
775 
776   def expand(self, pvalue):
~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, buffer_size, validate, skip_header_lines, header_processor_fns, delimiter, escapechar)
133         min_bundle_size,
134         compression_type=compression_type,
---> 135         validate=validate)
136 
137     self._strip_trailing_newlines = strip_trailing_newlines
~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/filebasedsource.py in __init__(self, file_pattern, min_bundle_size, compression_type, splittable, validate)
110           '%s: file_pattern must be of type string'
111           ' or ValueProvider; got %r instead' %
--> 112           (self.__class__.__name__, file_pattern))
113 
114     if isinstance(file_pattern, str):
TypeError: _TextSource: file_pattern must be of type string or ValueProvider; got <__main__.Transaction object at 0x7fcc79ffc250> instead
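Note that, per the traceback, ReadFromText only accepts a file pattern string or ValueProvider. A class derived from iobase.BoundedSource would normally be consumed with beam.io.Read instead, and it has to implement the source interface (estimate_size, split, get_range_tracker, read) rather than a process method. A minimal sketch of that wiring, where GpkgSource is a placeholder name and the actual reading logic is left out:

# A minimal sketch, not the original code: a BoundedSource subclass is wired
# into the pipeline with beam.io.Read rather than beam.io.ReadFromText.
import apache_beam as beam
from apache_beam.io import iobase

class GpkgSource(iobase.BoundedSource):
    # estimate_size, split, get_range_tracker and read need real
    # implementations; only a stub is shown here.
    def read(self, range_tracker):
        raise NotImplementedError

# data_loading = p1 | 'ReadData' >> beam.io.Read(GpkgSource())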

Answer 1

Score: 1

The solution that worked for me was to add the following two additional functions before running the Beam pipeline:

#-------------convert binary file to a txt file----------------------#
def file_create():
    # Open the GeoPackage with Fiona and write one "Id;Name;geometry" row per feature
    import fiona, json
    from shapely.geometry import shape  # shape() is used below but was not imported in the original snippet
    f = open("data.txt", "a")
    with fiona.open('gs://bucket/2022_data.gpkg', 'r') as input_file:
        for feature in input_file:
            row = json.loads(json.dumps(feature['properties']))['Id'] + ';' + json.loads(json.dumps(feature['properties']))['Name'] + ';' + str(shape(json.loads(json.dumps(feature['geometry'])))) + '\n'
            f.write(row)
    f.close()

#---------------Upload newly created file to GCS-------------------------
def upload():
    from google.cloud import storage  # storage.Client() is used below but was not imported in the original snippet
    client = storage.Client()
    bucket = client.bucket('bucket')
    blob = bucket.blob('data.txt')
    blob.upload_from_filename('data.txt')
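
The pipeline's 'ReadData' step can then point at the uploaded text file instead of the Transaction object. A minimal sketch of that step, assuming data.txt lands at gs://bucket/data.txt and each line is "Id;Name;geometry" as written by file_create() above:

# A minimal sketch, assuming gs://bucket/data.txt holds "Id;Name;geometry" lines.
import apache_beam as beam

def to_bq_row(line):
    # Split the semicolon-delimited record back into the BigQuery schema fields.
    id_, name, _geometry = line.split(';', 2)
    return {'id': id_, 'name': name}

data_loading = (
    p1
    | 'ReadData' >> beam.io.ReadFromText('gs://bucket/data.txt')
    | 'ToBQRow' >> beam.Map(to_bq_row)
)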
