Trying to load Kafka byte data into BigQuery

Question


I am trying to load a few records (100 at most) from a Kafka topic into BigQuery.

The data stored in the Kafka topic is in bytes, and I have the schema available in an .avsc file.

The steps I have taken are:

1. Consume 100 messages from the Kafka topic using the kafka-console-consumer and store them in a file.
2. Write code that creates an Avro file, consisting of magic marker | schema | records.
3. Write a test utility to read this Avro data.

I am seeing an invalid Avro exception in the last step. The code for creating the Avro file and for reading it is pasted below.

    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.commons.io.FileUtils;

    public class AvroSerializer {
        public static final byte MAGIC_BYTE = 0x0;

        public void serialize() throws Exception {
            Schema schema;
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try {
                // Parse the schema from the .avsc file
                schema = new Schema.Parser().parse(new File("${path to schema.avsc}"));
                // Raw bytes dumped from the topic with kafka-console-consumer
                byte[] kafkaTopicData =
                    FileUtils.readFileToByteArray(
                        new File("${path to kafka topic dump using kafka console consumer}"));
                // MAGIC_BYTE | schemaId-bytes | avro_payload
                out.write(MAGIC_BYTE);
                out.write(schema.toString().getBytes());
                out.write(kafkaTopicData);
                FileUtils.writeByteArrayToFile(new File("${output file}"), out.toByteArray());
            } catch (Exception ex) {
                throw new Exception(ex);
            }
        }
    }

Trying to read the data

    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;

    public class AvroReader {
        public void decryptAvro() {
            Schema schema = null;
            try {
                schema = new Schema.Parser().parse(new File("${path to schema.avsc}"));
                DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
                // DataFileReader expects an Avro object container file
                DataFileReader<GenericRecord> dataFileReader =
                    new DataFileReader<>(
                        new File("${path to output file created in earlier step}"),
                        datumReader);
                GenericRecord hcpClaims = null;
                while (dataFileReader.hasNext()) {
                    hcpClaims = dataFileReader.next(hcpClaims);
                    System.out.println(hcpClaims);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

The error is pasted below:

    org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
        at org.apache.avro.file.DataFileStream.validateMagic(DataFileStream.java:115)
        at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:123)
        at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:143)
        at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:113)
        at com.optum.clm.avroutils.AvroReader.decryptAvro(AvroReader.java:22)

Answer 1

Score: 1


Avro files don't need a "magic byte", or a schema id per event. Avro files have one schema in a header, then many records within matching that schema. This is not possible to generate from kafka-console-consumer... To do this from Java, you would need to buffer Avro object instances from a plain KafkaConsumer, then create a DataFileWriter.

Besides, a BigQuery Kafka Sink connector already exists, so you don't need Avro files on disk at all; a rough configuration sketch is shown below.
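As a rough sketch only: the open-source WePay/Confluent BigQuery sink connector is typically registered with Kafka Connect using a JSON config along these lines. The connector class name is real, but property names such as project, defaultDataset, and keyfile have varied across connector versions, so treat them as assumptions and verify against the docs for your version.

    {
      "name": "bigquery-sink",
      "config": {
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "tasks.max": "1",
        "topics": "${topic name}",
        "project": "${gcp project id}",
        "defaultDataset": "${bigquery dataset}",
        "keyfile": "${path to service account credentials json}"
      }
    }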
