How to send Avro schema to GCP BigQuery using Java?
Question
I'm trying to send an Avro schema to GCP BigQuery using Java 11 and Spring 2. I've researched a lot, but I haven't found an example of how to send an Avro schema file in a format like this:
```json
{
  "namespace": "example.gcp",
  "type": "record",
  "name": "Client",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "phone", "type": ["string", "null"]},
    {"name": "address", "type": ["string", "null"]}
  ]
}
```
I can send a binary `.avro` file using this snippet:
```java
@PostMapping("/uploadFileAvro")
public ModelAndView handleFileUpload(
        @RequestParam("file") MultipartFile file, @RequestParam("tableName") String tableName)
        throws IOException {
    // FormatOptions.avro() makes the load job parse the upload as an Avro data file
    ListenableFuture<Job> loadJob = this.bigQueryTemplate.writeDataToTable(
            tableName, file.getInputStream(), FormatOptions.avro());
    return getResponse(loadJob, tableName);
}
```
and a `.csv` file using code like this:
```java
@PostMapping("/uploadFileCSV")
public ModelAndView handleFileUploadCSV(
        @RequestParam("file") MultipartFile file, @RequestParam("tableName") String tableName)
        throws IOException {
    ListenableFuture<Job> loadJob = this.bigQueryTemplate.writeDataToTable(
            tableName, file.getInputStream(), FormatOptions.csv());
    return getResponse(loadJob, tableName);
}
```
But when I try to send the schema file (rather than a binary `.avro` data file), I get this error:
```
java.util.concurrent.ExecutionException: org.springframework.cloud.gcp.bigquery.core.BigQueryException: Error while reading data, error message: The Apache Avro library failed to parse the header with the following error: Invalid data file. Magic does not match: gs://bigquery-prod-upload-us/prod-scotty-fa6aadb4-b3d6-40db-a39f-2025f1a99019
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:119)
    at org.springframework.cloud.springbootbigqueryapp.controller.WebController.getResponse(WebController.java:107)
    at org.springframework.cloud.springbootbigqueryapp.controller.WebController.handleFileUpload(WebController.java:63)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.cloud.gcp.bigquery.core.BigQueryException: Error while reading data, error message: The Apache Avro library failed to parse the header with the following error: Invalid data file. Magic does not match: gs://bigquery-prod-upload-us/prod-scotty-fa6aadb4-b3d6-40db-a39f-2025f1a99019
    at org.springframework.cloud.gcp.bigquery.core.BigQueryTemplate.lambda$createJobFuture$0(BigQueryTemplate.java:170)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
```
Could someone explain how to send an Avro schema file to GCP BigQuery?
P.S. If I understood the video correctly, this can be done programmatically somehow.
Answer 1
Score: 1
BigQuery itself has no concept of a schema registry, so you will most likely want to use the schema either to create a table or to load data into BigQuery.
In the table-creation case, you need to convert the Avro schema into a BigQuery schema, because BigQuery doesn't use Avro schema representations directly. Here's a Java example of creating a table with an explicit schema: https://cloud.google.com/bigquery/docs/samples/bigquery-create-table#bigquery_create_table-java
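The linked sample builds the schema with the BigQuery Java client. As a dependency-free alternative (my substitution, not what the sample does), the same conversion can be expressed as a `CREATE TABLE` DDL statement that you then run as an ordinary query. This is only a sketch under several assumptions: the record is flat, fields use only Avro primitive types or `["T", "null"]` unions, and the fields arrive already parsed (in a real application you would probably parse the `.avsc` JSON with Apache Avro's `Schema.Parser`, which is omitted here because the JDK has no JSON parser). The class `AvroToBigQueryDdl`, its `AvroField` type, and the table name `mydataset.Client` are hypothetical. The type mapping follows BigQuery's Avro conversion rules as I understand them: a union with `null` becomes a nullable column, and a plain type becomes `NOT NULL` (REQUIRED mode).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Hypothetical helper (not part of any Google or Spring library): turns the
 * fields of a flat Avro record schema into a BigQuery CREATE TABLE statement.
 * Only primitive Avro types and ["T", "null"] unions are supported.
 */
public class AvroToBigQueryDdl {

    /** One Avro field: its name plus its type, or the branches of a union. */
    public static final class AvroField {
        final String name;
        final List<String> types;

        public AvroField(String name, String... types) {
            this.name = name;
            this.types = List.of(types);
        }
    }

    // Avro primitive type -> BigQuery GoogleSQL column type (assumed mapping)
    private static final Map<String, String> TYPE_MAP = Map.of(
            "boolean", "BOOL",
            "int", "INT64",
            "long", "INT64",
            "float", "FLOAT64",
            "double", "FLOAT64",
            "bytes", "BYTES",
            "string", "STRING");

    public static String createTableDdl(String table, List<AvroField> fields) {
        StringJoiner columns = new StringJoiner(", ");
        for (AvroField field : fields) {
            // Removing "null" from the union tells us whether the column is nullable.
            List<String> nonNull = new ArrayList<>(field.types);
            boolean nullable = nonNull.remove("null");
            if (nonNull.size() != 1) {
                throw new IllegalArgumentException("Unsupported union in field " + field.name);
            }
            String bqType = TYPE_MAP.get(nonNull.get(0));
            if (bqType == null) {
                throw new IllegalArgumentException("Unsupported Avro type in field " + field.name);
            }
            // Non-nullable Avro fields become REQUIRED columns via NOT NULL.
            columns.add(field.name + " " + bqType + (nullable ? "" : " NOT NULL"));
        }
        return "CREATE TABLE `" + table + "` (" + columns + ")";
    }

    public static void main(String[] args) {
        // The Client schema from the question, written out by hand.
        List<AvroField> client = List.of(
                new AvroField("id", "long"),
                new AvroField("name", "string"),
                new AvroField("phone", "string", "null"),
                new AvroField("address", "string", "null"));
        System.out.println(createTableDdl("mydataset.Client", client));
    }
}
```

For the `Client` schema from the question, `main` prints a statement declaring `id INT64 NOT NULL`, `name STRING NOT NULL`, `phone STRING` and `address STRING`. Generating DDL keeps the conversion testable without credentials; with the client library you would build the equivalent `Schema` from `Field` objects instead, as in the linked sample.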
In the load case, there's no need to send the schema separately from the data. An Avro Object Container File (OCF) carries the schema in its header, followed by a series of data blocks, so you only need to specify the URI of the Avro files: https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-gcs-avro#bigquery_load_table_gcs_avro-java
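The "Magic does not match" error in the question comes from the OCF format itself: per the Avro specification, a container file starts with the four bytes `O`, `b`, `j`, `0x01`, whereas a JSON schema file starts with `{`, so a load job using `FormatOptions.avro()` rejects it. A minimal sketch of a pre-upload check, assuming you want the controller to fail fast before starting a load job; `AvroMagicCheck` and `looksLikeAvroContainer` are hypothetical names, not part of Spring Cloud GCP or the BigQuery client.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Hypothetical pre-upload check: an Avro Object Container File must begin
 * with the 4-byte magic 'O', 'b', 'j', 1 (Avro specification).
 */
public final class AvroMagicCheck {

    private static final byte[] OCF_MAGIC = {'O', 'b', 'j', 1};

    /** Reads up to four bytes from the stream and compares them with the OCF magic. */
    public static boolean looksLikeAvroContainer(InputStream in) throws IOException {
        byte[] header = in.readNBytes(OCF_MAGIC.length); // Java 11+
        return Arrays.equals(header, OCF_MAGIC);
    }

    public static void main(String[] args) throws IOException {
        byte[] schemaJson = "{\"type\": \"record\", \"name\": \"Client\"}"
                .getBytes(StandardCharsets.UTF_8);
        byte[] ocfStart = {'O', 'b', 'j', 1, 0x04};
        System.out.println(looksLikeAvroContainer(new ByteArrayInputStream(schemaJson))); // false
        System.out.println(looksLikeAvroContainer(new ByteArrayInputStream(ocfStart)));   // true
    }
}
```

Because the check consumes the first bytes of the stream, in the controller you would run it on `new ByteArrayInputStream(file.getBytes())` and still pass a fresh stream to `writeDataToTable`. Passing the check only means the header looks right; BigQuery still validates the rest of the file.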