Trying to load Kafka byte data into BigQuery
Question
I am trying to load a few records (100 at most) from a Kafka topic into BigQuery.
The data stored in the Kafka topic is in bytes, and I have the schema available in an .avsc file.
The steps I have taken are:
1. Consume 100 messages from the Kafka topic using the kafka-console-consumer and store them in a file.
2. Write code that creates an Avro file; the Avro file consists of magic marker | schema | records.
3. Create a test utility to read this Avro data.
I am seeing an invalid Avro exception in the last step. The code for creating the Avro file and reading it back is pasted below.
import java.io.ByteArrayOutputStream;
import java.io.File;

import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;

public class AvroSerializer {
  public static final byte MAGIC_BYTE = 0x0;

  public void serialize() throws Exception {
    Schema schema;
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      schema =
          new Schema.Parser()
              .parse(
                  new File(
                      "${path to schema.avsc}"));
      byte[] kafkaTopicData =
          FileUtils.readFileToByteArray(
              new File(
                  "${path to kafka topic dump using kafka console consumer}"));
      // MAGIC_BYTE | schemaId-bytes | avro_payload
      out.write(MAGIC_BYTE);
      out.write(schema.toString().getBytes());
      out.write(kafkaTopicData);
      FileUtils.writeByteArrayToFile(
          new File(
              "${output file}"),
          out.toByteArray());
    } catch (Exception ex) {
      throw new Exception(ex);
    }
  }
}
Trying to read the data:
 public void decryptAvro() {
    Schema schema = null;
    try {
      schema =
          new Schema.Parser()
              .parse(
                  new File(
                      "${path to schema.avsc}"));
      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
      DataFileReader<GenericRecord> dataFileReader =
          new DataFileReader<GenericRecord>(
              new File(
                  "${path to output file created in earlier step}"),
              datumReader);
      GenericRecord hcpClaims = null;
      while (dataFileReader.hasNext()) {
        hcpClaims = dataFileReader.next(hcpClaims);
        System.out.println(hcpClaims);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
The error is pasted below:
org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
	at org.apache.avro.file.DataFileStream.validateMagic(DataFileStream.java:115)
	at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:123)
	at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:143)
	at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:113)
	at com.optum.clm.avroutils.AvroReader.decryptAvro(AvroReader.java:22)
Answer 1
Score: 1
Avro files don't need a "magic byte" or a schema id per event. An Avro file has one schema in its header, followed by many records that match that schema. This is not something you can generate from kafka-console-consumer. To do this from Java, you would need to buffer Avro object instances from a plain KafkaConsumer, then write them out with a DataFileWriter.
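For illustration, here is a minimal sketch of that approach. It assumes the topic payloads are plain Avro binary-encoded records matching the .avsc schema; the bootstrap servers, group id, topic name, and file paths are placeholders, and a real job would poll in a loop rather than once:

import java.io.File;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicToAvroFile {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(new File("${path to schema.avsc}"));

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("group.id", "avro-file-dump");          // placeholder
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    props.put("auto.offset.reset", "earliest");

    GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
         DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<>(datumWriter)) {
      consumer.subscribe(Collections.singletonList("${topic name}"));

      // DataFileWriter writes the schema once into the file header;
      // no per-record magic byte or schema id is needed.
      fileWriter.create(schema, new File("${output .avro file}"));

      // A single poll for brevity; a real job would poll until enough records arrive.
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      for (ConsumerRecord<byte[], byte[]> record : records) {
        // Assumes the value is a plain Avro binary-encoded record. If the producer
        // used the Confluent wire format, the leading magic byte and 4-byte schema id
        // would have to be skipped before decoding.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
        GenericRecord value = datumReader.read(null, decoder);
        fileWriter.append(value);
      }
    }
  }
}

A file produced this way starts with Avro's own magic bytes, so it can be read back with the DataFileReader code from the question or loaded into BigQuery as an Avro source file.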
Besides, a BigQuery Kafka Sink connector already exists, so you don't need Avro files on disk at all.
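Purely as an illustration, a sink connector configuration might look roughly like the sketch below. The connector class and property names are based on the WePay/Confluent BigQuery Sink Connector and may differ between versions, so treat them as assumptions and verify against the connector documentation:

# Hypothetical Kafka Connect properties for a BigQuery sink connector.
# Property names vary between connector versions; check the docs before use.
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=${your topic}
project=${gcp project id}
defaultDataset=${bigquery dataset}
keyfile=${path to gcp service account key json}
autoCreateTables=true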