Trying to load Kafka byte data into BigQuery
I am trying to load a few records (100 at most) from a Kafka topic into BigQuery.
The data stored in the Kafka topic is in bytes, and I have the schema available in an .avsc file.
The steps I have taken are:
1. Consumed 100 messages from the Kafka topic using the kafka-console-consumer and stored them in a file.
2. Wrote code to create an Avro file; the file consists of magic marker | schema | records.
3. Created a test utility to read this Avro data.
I am seeing an InvalidAvroMagicException in the last step. The code for creating the Avro file and reading it back is pasted below.
import java.io.ByteArrayOutputStream;
import java.io.File;

import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;

public class AvroSerializer {

    public static final byte MAGIC_BYTE = 0x0;

    public void serialize() throws Exception {
        Schema schema;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            // Parse the Avro schema from the .avsc file
            schema = new Schema.Parser().parse(new File("${path to schema.avsc}"));
            // Raw bytes dumped from the topic via kafka-console-consumer
            byte[] kafkaTopicData =
                FileUtils.readFileToByteArray(
                    new File("${path to kafka topic dump using kafka console consumer}"));
            // Intended layout: MAGIC_BYTE | schema JSON bytes | raw topic payload
            out.write(MAGIC_BYTE);
            out.write(schema.toString().getBytes());
            out.write(kafkaTopicData);
            FileUtils.writeByteArrayToFile(new File("${output file}"), out.toByteArray());
        } catch (Exception ex) {
            throw new Exception(ex);
        }
    }
}
Trying to read the data back:
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

public class AvroReader {

    public void decryptAvro() {
        try {
            Schema schema = new Schema.Parser().parse(new File("${path to schema.avsc}"));
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
            // DataFileReader expects a standard Avro container file and
            // validates its magic header -- this is where the exception is thrown
            try (DataFileReader<GenericRecord> dataFileReader =
                    new DataFileReader<>(
                        new File("${path to output file created in earlier step}"),
                        datumReader)) {
                GenericRecord hcpClaims = null;
                while (dataFileReader.hasNext()) {
                    hcpClaims = dataFileReader.next(hcpClaims);
                    System.out.println(hcpClaims);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The error is pasted below:
org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
at org.apache.avro.file.DataFileStream.validateMagic(DataFileStream.java:115)
at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:123)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:143)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:113)
at com.optum.clm.avroutils.AvroReader.decryptAvro(AvroReader.java:22)
Answer 1 (score: 1)
Avro files don't need a "magic byte" or a schema id per event. Avro files have one schema in a header, then many records that match that schema. This is not possible to generate from kafka-console-consumer... To do this from Java, you would need to buffer Avro object instances from a plain KafkaConsumer, then create a DataFileWriter.

Besides, there already exists a BigQuery Kafka Sink connector, so you don't need Avro files on disk.
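A minimal sketch of that approach, assuming the topic contains Confluent-wire-format messages (1 magic byte + 4-byte schema id + Avro binary payload). The broker address, group id, topic name, and the class name TopicToAvroFile are placeholders, not from the original post:

import java.io.File;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicToAvroFile {

    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(new File("${path to schema.avsc}"));

        Properties props = new Properties();
        props.put("bootstrap.servers", "${broker:9092}");   // placeholder
        props.put("group.id", "avro-dump");                 // hypothetical group id
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");

        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
             DataFileWriter<GenericRecord> fileWriter =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {

            // DataFileWriter writes the standard Avro container header
            // (magic bytes, embedded schema, sync markers), so DataFileReader
            // can read the result back.
            fileWriter.create(schema, new File("${output file}"));

            consumer.subscribe(Collections.singletonList("${topic}"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));

            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Assumes Confluent wire format: magic byte (1) + schema id (4) + payload.
                ByteBuffer buf = ByteBuffer.wrap(record.value());
                buf.get();     // skip magic byte
                buf.getInt();  // skip schema id
                BinaryDecoder decoder = DecoderFactory.get()
                    .binaryDecoder(buf.array(), buf.position(), buf.remaining(), null);
                GenericRecord rec = datumReader.read(null, decoder);
                fileWriter.append(rec);
            }
        }
    }
}

DataFileWriter emits a genuine Avro container header, which is exactly what validateMagic was failing to find in the hand-assembled file. If the topic actually holds plain Avro binary rather than Confluent-framed messages, drop the two skip calls and decode from offset zero. And as noted above, the BigQuery Sink connector can stream the topic into BigQuery directly, with no intermediate file at all.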