NullPointerException when writing to BigQuery in AWS Glue

Question

I'm setting up an ETL pipeline from AWS Aurora to BigQuery and am using Glue to do so. The goal is to be able to use arbitrary SQL as input, so I'm dumping to S3 as Parquet, crawling that with a Glue Crawler and using the Glue Catalog as a source for the ETL job. These parts work, and I can print out the data and schema of the source table inside the ETL job.

However, when writing to BigQuery, I get a NullPointerException:

2023-06-15 16:28:59,300 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/simple.py", line 25, in <module>
    target = glueContext.write_dynamic_frame.from_options(frame = source, connection_type = "marketplace.spark", connection_options = 
{
    "connectionName": "bigquery-bigquery-main",
    "dataset": "datawarehouse",
    "parentProject": "datawarehouse-123123",
    "table": "simple",
    "temporaryGcsBucket": "bucket-name"
}
, transformation_ctx = "target")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 802, in from_options
    format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 331, in write_dynamic_frame_from_options
    format, format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 354, in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 39, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 32, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o102.pyWriteDynamicFrame.
: java.lang.RuntimeException: Failed to write to BigQuery
	at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:69)
	at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.insert(BigQueryInsertableRelation.scala:43)
	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:111)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at com.amazonaws.services.glue.marketplace.connector.SparkCustomDataSink.writeDynamicFrame(CustomDataSink.scala:45)
	at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:71)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException
	at com.google.cloud.bigquery.connector.common.BigQueryClient.loadDataIntoTable(BigQueryClient.java:532)
	at com.google.cloud.spark.bigquery.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.scala:87)
	at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:66)
	... 42 more

Searching for this exception turns up very few results, and the error itself is unspecific. I tried tracking it down, but I don't know Java, so that was a futile endeavour. The connector is only available as version 0.24.2, so I can't try newer versions, and I couldn't find out how to activate or use the 0.22.0-2 connector.

The source table is already as simple as can be: for testing, I created a dump with a single column (id = bigint, not nullable) and 1000 records and crawled it as a data source. There are no partition keys or anything else special about the table, so source data issues are pretty much ruled out. I also tried with no table in BigQuery, with an empty table, and with a table defined as id = integer. The result was the same every time.

What's odd is that I had an error in my credentials.json earlier and still got the exact same error. (It's correct now, verified with gcloud auth activate-service-account.) So it can't have anything to do with the target or the actual write, either.

The credentials.json is present in the container with the correct content, the log and temp paths in S3 get written to (content not useful as far as I can tell) and there are no audit errors, so an IAM issue is unlikely.

I found this guide: https://www.m3tech.blog/entry/load-to-bq-by-aws-glue and followed it along, which was incredibly helpful because it's the only thing I found that lays out the complete process. (It does contain some errors, though, e.g. specifying the table without the project ID doesn't work.) Everything else I found is scattered, incomplete and contradictory. Official "documentation" is here: https://aws.amazon.com/marketplace/pp/prodview-sqnd4gn5fykx6#pdp-usage

I tried adding the credentials.json to the Spark context both in the Python code and in the --conf parameter (by setting the value to key1=val --conf key2=val... as suggested elsewhere on SO), same results for either.

I also don't know anything about PySpark, Spark or BigQuery - I'm just an AWS admin who was given this godforsaken task. Can anyone suggest methods of trying to debug this, or maybe ideas what the reason could be?

Answer 1

Score: 1

Solved. Finally got feedback from the Google Cloud team: the issue was that the GCS bucket and the BigQuery dataset were in different regions. Goes to show that the exception can really mean "anything is wrong", given that the same error occurred in a variety of different contexts, most of which probably didn't actually work.
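
For anyone wanting to rule out this mismatch before running the job, here is a minimal sketch of a location check. It assumes the google-cloud-storage and google-cloud-bigquery client libraries are installed and that credentials are configured; the helper names (normalize_location, locations_match, fetch_locations) are made up for illustration. Comparing the two strings for equality is a simplification, since BigQuery's colocation rules for multi-region datasets are more nuanced, so treat a mismatch as a hint to investigate rather than a verdict.

```python
from typing import Tuple


def normalize_location(location: str) -> str:
    """Lower-case and trim a location name so 'EU' and 'eu' compare equal."""
    return location.strip().lower()


def locations_match(bucket_location: str, dataset_location: str) -> bool:
    """Return True when both services report the same location name."""
    return normalize_location(bucket_location) == normalize_location(dataset_location)


def fetch_locations(project: str, dataset: str, bucket: str) -> Tuple[str, str]:
    """Look up the bucket and dataset locations via the Google Cloud clients.

    Assumption: both client libraries are installed and credentials are set up.
    The import is done lazily so the pure helpers above work without them.
    """
    from google.cloud import bigquery, storage

    bucket_location = storage.Client(project=project).get_bucket(bucket).location
    dataset_location = (
        bigquery.Client(project=project).get_dataset(f"{project}.{dataset}").location
    )
    return bucket_location, dataset_location


# Example with the names from the question (requires network access and credentials):
# bucket_loc, dataset_loc = fetch_locations("datawarehouse-123123", "datawarehouse", "bucket-name")
# if not locations_match(bucket_loc, dataset_loc):
#     print(f"Location mismatch: bucket={bucket_loc}, dataset={dataset_loc}")
```

The case normalization is there because the two services may not report location names with the same capitalization.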

And as an addendum to the guide linked above: there is no need for templating the source code; the --conf = key=val --conf key2=val... approach does work. This means you can use the glue_script datasource for simple setups like this.
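
To illustrate the chaining trick, here is a small sketch that builds the single value for the --conf job parameter from several settings. The function name build_conf_argument and the default_arguments dictionary are hypothetical, and key1/key2 are placeholders following the pattern above, not real Spark settings.

```python
def build_conf_argument(settings: dict) -> str:
    """Join settings into one value for the Glue '--conf' job parameter.

    Glue accepts a single '--conf' key, so further settings are chained
    inside its value, each one preceded by a literal ' --conf '.
    """
    if not settings:
        raise ValueError("at least one setting is required")
    pairs = [f"{key}={value}" for key, value in settings.items()]
    return " --conf ".join(pairs)


# Placeholder keys and values, mirroring the key1=val pattern from the post.
default_arguments = {"--conf": build_conf_argument({"key1": "val", "key2": "val"})}
print(default_arguments)  # {'--conf': 'key1=val --conf key2=val'}
```

The resulting dictionary has the shape one would pass as the job's default arguments, however the job happens to be defined.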
