Writing Parquet/Avro GenericRecord to JSON while maintaining LogicalTypes
Question
I am trying to write some Parquet records that contain LogicalTypes to JSON. I do this via AvroParquetReader, which gives me an Avro GenericRecord:
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
try (ParquetReader<GenericRecord> parquetReader =
        AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
            .withDataModel(GenericData.get())
            .build()) {
    GenericRecord record = parquetReader.read();
    record.toString();
}
record.toString() produces:
{"universe_member_id": 94639, "member_from_dt": 2001-08-31T00:00:00Z, "member_to_dt": 2200-01-01T00:00:00Z}
Notice that this is invalid JSON: the dates are correctly converted as per their LogicalType, but are not surrounded by quotes.
So instead I tried the JsonEncoder:
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); //etc
OutputStream stringOutputStream = new StringOutputStream();
try (ParquetReader<GenericRecord> parquetReader =
        AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
            .withDataModel(GenericData.get())
            .build()) {
    GenericRecord record = parquetReader.read();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), stringOutputStream);
    writer.write(record, encoder);
    encoder.flush();
}
but this doesn't convert the date fields at all and bakes the datatype into every record:
{"universe_member_id":{"long":94639},"member_from_dt":{"long":999216000000000},"member_to_dt":{"long":7258118400000000}}
The output I'm looking for is:
{"universe_member_id": 94639, "member_from_dt": "2001-08-31T00:00:00Z", "member_to_dt": "2200-01-01T00:00:00Z"}
How can I correctly write a GenericRecord to JSON?
Answer 1
Score: 6
As you have indicated, the toString() method of GenericRecord will give you a nearly valid JSON representation.
As you can see in the source code of the GenericData class, the GenericData.Record toString method just invokes the GenericData toString(Object) method in its implementation.
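To make the "nearly valid" part concrete: when a value falls through to a plain StringBuilder.append(Object), its toString() output is written as-is, so a java.time.Instant ends up unquoted. The following is only a minimal sketch of that effect, not Avro's actual code; the class and method names (FallbackDemo, appendRaw, appendQuoted) are made up for illustration.

```java
import java.time.Instant;

public class FallbackDemo {
    // Mimics an unquoted fallback: the value's toString() is appended verbatim.
    public static String appendRaw(Object datum) {
        StringBuilder buffer = new StringBuilder();
        buffer.append("{\"member_from_dt\": ");
        buffer.append(datum);
        buffer.append("}");
        return buffer.toString();
    }

    // Same field, but the value is rendered as a quoted JSON string.
    public static String appendQuoted(Object datum) {
        StringBuilder buffer = new StringBuilder();
        buffer.append("{\"member_from_dt\": ");
        buffer.append('"').append(datum.toString()).append('"');
        buffer.append("}");
        return buffer.toString();
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2001-08-31T00:00:00Z");
        System.out.println(appendRaw(instant));    // not valid JSON: bare timestamp
        System.out.println(appendQuoted(instant)); // valid JSON: timestamp is a string
    }
}
```

Turning the value into a String before it reaches the serializer is what lets it take the quoted path instead of the raw one.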
If you want a valid JSON representation of the record, you can take that code and, with minimal modifications, obtain the information that you need.
For instance, we can define a utility class like the following:
package stackoverflow.parquetavro;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

public class GenericRecordJsonEncoder {

    Map<LogicalType, Function<Object, Object>> logicalTypesConverters = new HashMap<>();

    public void registerLogicalTypeConverter(LogicalType logicalType, Function<Object, Object> converter) {
        this.logicalTypesConverters.put(logicalType, converter);
    }

    public Function<Object, Object> getLogicalTypeConverter(Schema.Field field) {
        Schema fieldSchema = field.schema();
        LogicalType logicalType = fieldSchema.getLogicalType();
        return getLogicalTypeConverter(logicalType);
    }

    public Function<Object, Object> getLogicalTypeConverter(LogicalType logicalType) {
        if (logicalType == null) {
            return Function.identity();
        }
        return logicalTypesConverters.getOrDefault(logicalType, Function.identity());
    }

    public String serialize(GenericRecord value) {
        StringBuilder buffer = new StringBuilder();
        serialize(value, buffer, new IdentityHashMap<>(128));
        String result = buffer.toString();
        return result;
    }

    private static final String TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT =
        " \">>> CIRCULAR REFERENCE CANNOT BE PUT IN JSON STRING, ABORTING RECURSION <<<\" ";

    /** Renders a Java datum as <a href="http://www.json.org/">JSON</a>. */
    private void serialize(final Object datum, final StringBuilder buffer, final IdentityHashMap<Object, Object> seenObjects) {
        if (isRecord(datum)) {
            if (seenObjects.containsKey(datum)) {
                buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
                return;
            }
            seenObjects.put(datum, datum);
            buffer.append("{");
            int count = 0;
            Schema schema = getRecordSchema(datum);
            for (Schema.Field f : schema.getFields()) {
                serialize(f.name(), buffer, seenObjects);
                buffer.append(": ");
                Function<Object, Object> logicalTypeConverter = getLogicalTypeConverter(f);
                serialize(logicalTypeConverter.apply(getField(datum, f.name(), f.pos())), buffer, seenObjects);
                if (++count < schema.getFields().size())
                    buffer.append(", ");
            }
            buffer.append("}");
            seenObjects.remove(datum);
        } else if (isArray(datum)) {
            if (seenObjects.containsKey(datum)) {
                buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
                return;
            }
            seenObjects.put(datum, datum);
            Collection<?> array = getArrayAsCollection(datum);
            buffer.append("[");
            long last = array.size() - 1;
            int i = 0;
            for (Object element : array) {
                serialize(element, buffer, seenObjects);
                if (i++ < last)
                    buffer.append(", ");
            }
            buffer.append("]");
            seenObjects.remove(datum);
        } else if (isMap(datum)) {
            if (seenObjects.containsKey(datum)) {
                buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
                return;
            }
            seenObjects.put(datum, datum);
            buffer.append("{");
            int count = 0;
            @SuppressWarnings(value = "unchecked")
            Map<Object, Object> map = (Map<Object, Object>) datum;
            for (Map.Entry<Object, Object> entry : map.entrySet()) {
                serialize(entry.getKey(), buffer, seenObjects);
                buffer.append(": ");
                serialize(entry.getValue(), buffer, seenObjects);
                if (++count < map.size())
                    buffer.append(", ");
            }
            buffer.append("}");
            seenObjects.remove(datum);
        } else if (isString(datum) || isEnum(datum)) {
            buffer.append("\"");
            writeEscapedString(datum.toString(), buffer);
            buffer.append("\"");
        } else if (isBytes(datum)) {
            buffer.append("{\"bytes\": \"");
            ByteBuffer bytes = ((ByteBuffer) datum).duplicate();
            writeEscapedString(StandardCharsets.ISO_8859_1.decode(bytes), buffer);
            buffer.append("\"}");
        } else if (((datum instanceof Float) && // quote Nan & Infinity
                (((Float) datum).isInfinite() || ((Float) datum).isNaN()))
                || ((datum instanceof Double) &&
                (((Double) datum).isInfinite() || ((Double) datum).isNaN()))) {
            buffer.append("\"");
            buffer.append(datum);
            buffer.append("\"");
        } else if (datum instanceof GenericData) {
            if (seenObjects.containsKey(datum)) {
                buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
                return;
            }
            seenObjects.put(datum, datum);
            serialize(datum, buffer, seenObjects);
            seenObjects.remove(datum);
        } else {
            // This fallback is the reason why GenericRecord toString does not
            // generate a valid JSON representation
            buffer.append(datum);
        }
    }

    // All these methods are also copied from the GenericData class source

    private boolean isRecord(Object datum) {
        return datum instanceof IndexedRecord;
    }

    private Schema getRecordSchema(Object record) {
        return ((GenericContainer) record).getSchema();
    }

    private Object getField(Object record, String name, int position) {
        return ((IndexedRecord) record).get(position);
    }

    private boolean isArray(Object datum) {
        return datum instanceof Collection;
    }

    private Collection<?> getArrayAsCollection(Object datum) {
        return (Collection<?>) datum;
    }

    private boolean isEnum(Object datum) {
        return datum instanceof GenericEnumSymbol;
    }

    private boolean isMap(Object datum) {
        return datum instanceof Map;
    }

    private boolean isString(Object datum) {
        return datum instanceof CharSequence;
    }

    private boolean isBytes(Object datum) {
        return datum instanceof ByteBuffer;
    }

    private void writeEscapedString(CharSequence string, StringBuilder builder) {
        for (int i = 0; i < string.length(); i++) {
            char ch = string.charAt(i);
            switch (ch) {
                case '"':
                    builder.append("\\\"");
                    break;
                case '\\':
                    builder.append("\\\\");
                    break;
                case '\b':
                    builder.append("\\b");
                    break;
                case '\f':
                    builder.append("\\f");
                    break;
                case '\n':
                    builder.append("\\n");
                    break;
                case '\r':
                    builder.append("\\r");
                    break;
                case '\t':
                    builder.append("\\t");
                    break;
                default:
                    // Reference: http://www.unicode.org/versions/Unicode5.1.0/
                    if ((ch >= '\u0000' && ch <= '\u001F') || (ch >= '\u007F' && ch <= '\u009F') || (ch >= '\u2000' && ch <= '\u20FF')) {
                        String hex = Integer.toHexString(ch);
                        builder.append("\\u");
                        for (int j = 0; j < 4 - hex.length(); j++)
                            builder.append('0');
                        builder.append(hex.toUpperCase());
                    } else {
                        builder.append(ch);
                    }
            }
        }
    }
}
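The converter lookup in the class above boils down to a map with an identity fallback: a registered converter is applied when one matches, and the value passes through untouched otherwise. Here is a dependency-free sketch of just that pattern. It uses plain String keys as a stand-in for Avro's LogicalType, and the class and method names (ConverterRegistryDemo, register, convert) are hypothetical.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ConverterRegistryDemo {
    // Keyed by a logical type name here; the class above keys by LogicalType.
    private final Map<String, Function<Object, Object>> converters = new HashMap<>();

    public void register(String logicalTypeName, Function<Object, Object> converter) {
        converters.put(logicalTypeName, converter);
    }

    // Applies the registered converter, or returns the value unchanged
    // when the field has no logical type or no converter is registered.
    public Object convert(String logicalTypeName, Object value) {
        if (logicalTypeName == null) {
            return value;
        }
        return converters.getOrDefault(logicalTypeName, Function.identity()).apply(value);
    }

    public static void main(String[] args) {
        ConverterRegistryDemo registry = new ConverterRegistryDemo();
        registry.register("timestamp-millis",
            o -> DateTimeFormatter.ISO_INSTANT.format((Instant) o));

        // Converted to a String, so a serializer can quote it.
        System.out.println(registry.convert("timestamp-millis", Instant.parse("2001-08-31T00:00:00Z")));
        // No logical type: passed through unchanged.
        System.out.println(registry.convert(null, 94639L));
    }
}
```

Because the fallback is Function.identity(), fields without a registered converter behave exactly as they do in the unmodified toString code.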
In this class you can register converters for the logical types that you need. Consider the following example:
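// Imports needed by this snippet:
// import java.time.Instant;
// import java.time.format.DateTimeFormatter;
// import org.apache.avro.LogicalTypes;
GenericRecordJsonEncoder encoder = new GenericRecordJsonEncoder();
// Register as many logical type converters as you need, one per logical type in
// your schema (for example LogicalTypes.timestampMicros() for microsecond columns)
encoder.registerLogicalTypeConverter(LogicalTypes.timestampMillis(), o -> {
    // The cast assumes the reader already converted the value to java.time.Instant
    final Instant instant = (Instant) o;
    return DateTimeFormatter.ISO_INSTANT.format(instant);
});
// genericRecord is a record obtained from parquetReader.read()
String json = encoder.serialize(genericRecord);
System.out.println(json);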
This should give you the desired result.
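As a sanity check on the values involved, the raw longs in the question's JsonEncoder output (999216000000000 and 7258118400000000) are consistent with microseconds since the epoch. The sketch below uses only java.time to show how such a value maps to the ISO-8601 strings in the desired output; the class and method names (EpochMicrosDemo, fromEpochMicros, toIsoString) are made up for illustration, and the microsecond interpretation is an assumption based on the magnitude of the numbers.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

public class EpochMicrosDemo {
    // Splits epoch microseconds into whole seconds plus a nanosecond remainder.
    public static Instant fromEpochMicros(long micros) {
        long seconds = Math.floorDiv(micros, 1_000_000L);
        long nanos = Math.floorMod(micros, 1_000_000L) * 1_000L;
        return Instant.ofEpochSecond(seconds, nanos);
    }

    // Formats the instant the same way the converter in the answer does.
    public static String toIsoString(long micros) {
        return DateTimeFormatter.ISO_INSTANT.format(fromEpochMicros(micros));
    }

    public static void main(String[] args) {
        System.out.println(toIsoString(999216000000000L));  // 2001-08-31T00:00:00Z
        System.out.println(toIsoString(7258118400000000L)); // 2200-01-01T00:00:00Z
    }
}
```

If your column really is stored as microseconds, the converter would need to be registered for the logical type that actually appears in the record's schema rather than for timestampMillis.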