Trying to load Kafka byte data into BigQuery

Question

I am trying to load a few records (100 at most) from a Kafka topic into BigQuery.

The data stored in the Kafka topic is in bytes, and I have the schema available in an .avsc file.

Steps I have taken:
1. Consume 100 messages from the Kafka topic using kafka-console-consumer and store them in a file.
2. Write code that creates an Avro file consisting of magic marker | schema | records.
3. Write a test utility to read this Avro data back.

I am seeing an invalid Avro exception in the last step. The code for creating and reading the Avro file is pasted below.

import java.io.ByteArrayOutputStream;
import java.io.File;
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;

public class AvroSerializer {
  public static final byte MAGIC_BYTE = 0x0;

  public void serialize() throws Exception {
    Schema schema;
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      schema = new Schema.Parser().parse(new File("${path to schema.avsc}"));
      byte[] kafkaTopicData =
          FileUtils.readFileToByteArray(
              new File("${path to kafka topic dump using kafka console consumer}"));
      // MAGIC_BYTE | schemaId-bytes | avro_payload
      out.write(MAGIC_BYTE);
      out.write(schema.toString().getBytes());
      out.write(kafkaTopicData);
      FileUtils.writeByteArrayToFile(new File("${output file}"), out.toByteArray());
    } catch (Exception ex) {
      throw new Exception(ex);
    }
  }
}

Trying to read the data:

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

public class AvroReader {
  public void decryptAvro() {
    Schema schema = null;
    try {
      schema = new Schema.Parser().parse(new File("${path to schema.avsc}"));
      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
      DataFileReader<GenericRecord> dataFileReader =
          new DataFileReader<>(
              new File("${path to output file created in earlier step}"), datumReader);
      GenericRecord hcpClaims = null;

      while (dataFileReader.hasNext()) {
        hcpClaims = dataFileReader.next(hcpClaims);
        System.out.println(hcpClaims);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

The error is pasted below:

org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
	at org.apache.avro.file.DataFileStream.validateMagic(DataFileStream.java:115)
	at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:123)
	at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:143)
	at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:113)
	at com.optum.clm.avroutils.AvroReader.decryptAvro(AvroReader.java:22)
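For context on the exception: per the Avro specification, an object container file begins with the four-byte magic "Obj" followed by the version byte 1, and `DataFileStream.validateMagic` rejects anything else. The hand-built file above starts with the custom `0x0` byte instead, so the check fails. A stdlib-only sketch of that check (the class and method names here are illustrative, not Avro's):

```java
import java.util.Arrays;

public class AvroMagicCheck {
  // Avro object container file magic: ASCII "Obj" plus the version byte 1.
  static final byte[] AVRO_MAGIC = {'O', 'b', 'j', 1};

  // Returns true only when the file content begins with the Avro magic,
  // mirroring what DataFileStream.validateMagic enforces.
  public static boolean looksLikeAvroDataFile(byte[] fileBytes) {
    return fileBytes.length >= AVRO_MAGIC.length
        && Arrays.equals(Arrays.copyOfRange(fileBytes, 0, AVRO_MAGIC.length), AVRO_MAGIC);
  }

  public static void main(String[] args) {
    // MAGIC_BYTE | schema json ... as written by the serializer above.
    byte[] handRolled = {0x0, '{', '"'};
    System.out.println(looksLikeAvroDataFile(handRolled));   // false
    // Header of a real Avro data file.
    byte[] realHeader = {'O', 'b', 'j', 1, 0x2};
    System.out.println(looksLikeAvroDataFile(realHeader));   // true
  }
}
```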

Answer 1

Score: 1

Avro files don't need a "magic byte" or a schema id per event. An Avro data file has one schema in its header, followed by many records that match that schema. Such a file cannot be produced from kafka-console-consumer output. To do this from Java, you would need to buffer the Avro object instances from a plain KafkaConsumer, then write them out with a DataFileWriter.

Besides, a BigQuery Kafka Sink connector already exists, so you don't need Avro files on disk at all.
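A minimal sketch of the DataFileWriter approach the answer describes, using a made-up one-field schema for illustration. In the real flow the records would come from deserializing a plain KafkaConsumer poll loop rather than being built by hand; DataFileWriter writes the container header (magic plus schema) itself, so the resulting file is readable by the same DataFileReader the question uses:

```java
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroFileSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical schema standing in for the real schema.avsc.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Claim\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

    File out = new File("claims.avro");
    // DataFileWriter emits the "Obj" magic and embeds the schema in the header.
    try (DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
      writer.create(schema, out);
      for (int i = 0; i < 3; i++) {
        GenericRecord rec = new GenericData.Record(schema);
        rec.put("id", "claim-" + i);
        writer.append(rec);  // buffered records from a KafkaConsumer would go here
      }
    }

    // Read the file back with DataFileReader, as in the question's reader code.
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(out, new GenericDatumReader<GenericRecord>(schema))) {
      while (reader.hasNext()) {
        System.out.println(reader.next());
      }
    }
  }
}
```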

huangapple
  • Posted on 2023-03-07 09:54:37
  • Please retain this link when reposting: https://go.coder-hub.com/75657391.html