Writing Parquet/Avro GenericRecord to JSON while maintaining LogicalTypes

Question

I am trying to write some Parquet records that contain LogicalTypes to JSON. I do this via AvroParquetReader, which gives me an Avro GenericRecord:

GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());

try (ParquetReader<GenericRecord> parquetReader =
    AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
        .withDataModel(GenericData.get())
        .build()) {
    GenericRecord record = parquetReader.read();
    record.toString();
}

record.toString() produces:

{"universe_member_id": 94639, "member_from_dt": 2001-08-31T00:00:00Z, "member_to_dt": 2200-01-01T00:00:00Z}

Notice that this is invalid JSON - the dates are correctly converted as per their LogicalType, but are not surrounded by quotes.

So instead I tried the JsonEncoder:

GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); //etc
OutputStream stringOutputStream = new StringOutputStream();

try (ParquetReader<GenericRecord> parquetReader =
    AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
        .withDataModel(GenericData.get())
        .build()) {
    GenericRecord record = parquetReader.read();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), stringOutputStream);
    writer.write(record, encoder);
    encoder.flush();
}

but this doesn't convert the date fields at all and bakes the datatype into every record:

{"universe_member_id":{"long":94639},"member_from_dt":{"long":999216000000000},"member_to_dt":{"long":7258118400000000}}

The output I'm looking for is:

{"universe_member_id": 94639, "member_from_dt": "2001-08-31T00:00:00Z", "member_to_dt": "2200-01-01T00:00:00Z"}
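Editorial aside: judging only by their magnitude, the raw long values in the JsonEncoder output look like microseconds since the Unix epoch rather than milliseconds. That is an inference, not something the schema shown here confirms. The sketch below uses only the JDK (no Avro) to show how such a value maps to the quoted ISO-8601 string in the desired output. The class and method names are made up for illustration.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

class EpochMicrosDemo {

  // Assumption: the raw long counts microseconds since 1970-01-01T00:00:00Z.
  static String microsToIso(long epochMicros) {
    Instant instant = Instant.EPOCH.plus(epochMicros, ChronoUnit.MICROS);
    // ISO_INSTANT renders an Instant in UTC, e.g. 2001-08-31T00:00:00Z.
    return DateTimeFormatter.ISO_INSTANT.format(instant);
  }

  public static void main(String[] args) {
    // The two raw values taken from the JsonEncoder output.
    System.out.println(microsToIso(999216000000000L));
    System.out.println(microsToIso(7258118400000000L));
  }
}
```

If the values really are microseconds, the matching Avro logical type would be timestamp-micros rather than timestamp-millis, which matters when choosing which conversion or converter to register.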

How can I correctly write a GenericRecord to JSON?

Answer 1

Score: 6

As you have indicated, the toString() method of GenericRecord gives you a nearly valid JSON representation.

As you can see in the source code of the GenericData class, the GenericData.Record toString method simply invokes the GenericData toString(Object) method in its implementation.

If you want a valid JSON representation of the record, you can take that code and, with minimal modifications, obtain the information that you need.
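To make the failure mode concrete, here is a minimal JDK-only sketch (it does not use Avro, and the class and method names are hypothetical). It mimics what happens when a value that is not recognised as a string, record, array, map or bytes is appended to the buffer as-is: its toString() text goes in without quotes. That is fine for numbers but not for a converted timestamp.

```java
import java.time.Instant;

class FallbackQuotingDemo {

  // Mimics an "append the object as-is" fallback: no quotes are added.
  static String appendRaw(Object datum) {
    return new StringBuilder().append(datum).toString();
  }

  // Wraps the value's text in double quotes, as JSON requires for strings.
  static String appendQuoted(Object datum) {
    return new StringBuilder().append('"').append(datum).append('"').toString();
  }

  public static void main(String[] args) {
    Instant instant = Instant.parse("2001-08-31T00:00:00Z");
    System.out.println(appendRaw(94639L));     // a bare number is valid JSON
    System.out.println(appendRaw(instant));    // a bare timestamp is not
    System.out.println(appendQuoted(instant)); // quoted, so valid JSON
  }
}
```

Converting the temporal value to a String before it is rendered is one way to make sure it takes the quoted path.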

For instance, we can define a utility class like the following:

package stackoverflow.parquetavro;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Function;

import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

public class GenericRecordJsonEncoder {

  Map<LogicalType, Function<Object, Object>> logicalTypesConverters = new HashMap<>();

  public void registerLogicalTypeConverter(LogicalType logicalType, Function<Object, Object> converter) {
    this.logicalTypesConverters.put(logicalType, converter);
  }

  public Function<Object, Object> getLogicalTypeConverter(Schema.Field field) {
    Schema fieldSchema = field.schema();
    LogicalType logicalType = fieldSchema.getLogicalType();
    return getLogicalTypeConverter(logicalType);
  }

  public Function<Object, Object> getLogicalTypeConverter(LogicalType logicalType) {
    if (logicalType == null) {
      return Function.identity();
    }

    return logicalTypesConverters.getOrDefault(logicalType, Function.identity());
  }

  public String serialize(GenericRecord value) {
    StringBuilder buffer = new StringBuilder();
    serialize(value, buffer, new IdentityHashMap<>(128));
    String result = buffer.toString();
    return result;
  }

  private static final String TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT =
      " \">>> CIRCULAR REFERENCE CANNOT BE PUT IN JSON STRING, ABORTING RECURSION <<<\" ";

  /** Renders a Java datum as <a href="http://www.json.org/">JSON</a>. */
  private void serialize(final Object datum, final StringBuilder buffer, final IdentityHashMap<Object, Object> seenObjects) {
    if (isRecord(datum)) {
      if (seenObjects.containsKey(datum)) {
        buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
        return;
      }
      seenObjects.put(datum, datum);
      buffer.append("{");
      int count = 0;
      Schema schema = getRecordSchema(datum);
      for (Schema.Field f : schema.getFields()) {
        serialize(f.name(), buffer, seenObjects);
        buffer.append(": ");
        Function<Object, Object> logicalTypeConverter = getLogicalTypeConverter(f);
        serialize(logicalTypeConverter.apply(getField(datum, f.name(), f.pos())), buffer, seenObjects);
        if (++count < schema.getFields().size())
          buffer.append(", ");
      }
      buffer.append("}");
      seenObjects.remove(datum);
    } else if (isArray(datum)) {
      if (seenObjects.containsKey(datum)) {
        buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
        return;
      }
      seenObjects.put(datum, datum);
      Collection<?> array = getArrayAsCollection(datum);
      buffer.append("[");
      long last = array.size()-1;
      int i = 0;
      for (Object element : array) {
        serialize(element, buffer, seenObjects);
        if (i++ < last)
          buffer.append(", ");
      }
      buffer.append("]");
      seenObjects.remove(datum);
    } else if (isMap(datum)) {
      if (seenObjects.containsKey(datum)) {
        buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
        return;
      }
      seenObjects.put(datum, datum);
      buffer.append("{");
      int count = 0;
      @SuppressWarnings(value="unchecked")
      Map<Object,Object> map = (Map<Object,Object>)datum;
      for (Map.Entry<Object,Object> entry : map.entrySet()) {
        serialize(entry.getKey(), buffer, seenObjects);
        buffer.append(": ");
        serialize(entry.getValue(), buffer, seenObjects);
        if (++count < map.size())
          buffer.append(", ");
      }
      buffer.append("}");
      seenObjects.remove(datum);
    } else if (isString(datum)|| isEnum(datum)) {
      buffer.append("\"");
      writeEscapedString(datum.toString(), buffer);
      buffer.append("\"");
    } else if (isBytes(datum)) {
      buffer.append("{\"bytes\": \"");
      ByteBuffer bytes = ((ByteBuffer) datum).duplicate();
      writeEscapedString(StandardCharsets.ISO_8859_1.decode(bytes), buffer);
      buffer.append("\"}");
    } else if (((datum instanceof Float) &&       // quote Nan & Infinity
        (((Float)datum).isInfinite() || ((Float)datum).isNaN()))
        || ((datum instanceof Double) &&
        (((Double)datum).isInfinite() || ((Double)datum).isNaN()))) {
      buffer.append("\"");
      buffer.append(datum);
      buffer.append("\"");
    } else if (datum instanceof GenericData) {
      if (seenObjects.containsKey(datum)) {
        buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
        return;
      }
      seenObjects.put(datum, datum);
      serialize(datum, buffer, seenObjects);
      seenObjects.remove(datum);
    } else {
      // This fallback is the reason why GenericRecord toString does not
      // generate a valid JSON representation
      buffer.append(datum);
    }
  }

  // All these methods are also copied from the GenericData class source

  private boolean isRecord(Object datum) {
    return datum instanceof IndexedRecord;
  }

  private Schema getRecordSchema(Object record) {
    return ((GenericContainer)record).getSchema();
  }

  private Object getField(Object record, String name, int position) {
    return ((IndexedRecord)record).get(position);
  }

  private boolean isArray(Object datum) {
    return datum instanceof Collection;
  }

  private Collection getArrayAsCollection(Object datum) {
    return (Collection)datum;
  }

  private boolean isEnum(Object datum) {
    return datum instanceof GenericEnumSymbol;
  }

  private boolean isMap(Object datum) {
    return datum instanceof Map;
  }

  private boolean isString(Object datum) {
    return datum instanceof CharSequence;
  }

  private boolean isBytes(Object datum) {
    return datum instanceof ByteBuffer;
  }

  private void writeEscapedString(CharSequence string, StringBuilder builder) {
    for(int i = 0; i < string.length(); i++){
      char ch = string.charAt(i);
      switch(ch){
        case '"':
          builder.append("\\\"");
          break;
        case '\\':
          builder.append("\\\\");
          break;
        case '\b':
          builder.append("\\b");
          break;
        case '\f':
          builder.append("\\f");
          break;
        case '\n':
          builder.append("\\n");
          break;
        case '\r':
          builder.append("\\r");
          break;
        case '\t':
          builder.append("\\t");
          break;
        default:
          // Reference: http://www.unicode.org/versions/Unicode5.1.0/
          if((ch>='\u0000' && ch<='\u001F') || (ch>='\u007F' && ch<='\u009F') || (ch>='\u2000' && ch<='\u20FF')){
            String hex = Integer.toHexString(ch);
            builder.append("\\u");
            for(int j = 0; j < 4 - hex.length(); j++)
              builder.append('0');
            builder.append(hex.toUpperCase());
          } else {
            builder.append(ch);
          }
      }
    }
  }
}

In this class you can register converters for the logical types that you need. Consider the following example:

// Needs java.time.Instant, java.time.format.DateTimeFormatter and org.apache.avro.LogicalTypes.
GenericRecordJsonEncoder encoder = new GenericRecordJsonEncoder();
// Register as many logical type converters as you need.
// This converter assumes the field value has already been converted to an Instant.
encoder.registerLogicalTypeConverter(LogicalTypes.timestampMillis(), o -> {
  final Instant instant = (Instant)o;
  final String result = DateTimeFormatter.ISO_INSTANT.format(instant);
  return result;
});

String json = encoder.serialize(genericRecord);
System.out.println(json);

This should give you the desired result.
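One thing to watch with converters like the one above is that DateTimeFormatter.ISO_INSTANT.format throws a NullPointerException when handed null. If a field with a registered logical type can be empty, a small wrapper keeps the converter from failing. The sketch below uses only the JDK, and the nullSafe helper and class name are hypothetical (they are not part of Avro or of the utility class).

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;

class NullSafeConverters {

  // Hypothetical helper: returns null for null input instead of calling the converter.
  static Function<Object, Object> nullSafe(Function<Object, Object> converter) {
    return value -> value == null ? null : converter.apply(value);
  }

  // Same conversion as in the timestamp example: Instant to ISO-8601 string.
  static final Function<Object, Object> INSTANT_TO_ISO =
      value -> DateTimeFormatter.ISO_INSTANT.format((Instant) value);

  public static void main(String[] args) {
    Function<Object, Object> converter = nullSafe(INSTANT_TO_ISO);
    System.out.println(converter.apply(Instant.parse("2001-08-31T00:00:00Z")));
    System.out.println(converter.apply(null));
  }
}
```

A wrapped converter can be passed to registerLogicalTypeConverter in place of the bare lambda. Separately, for nullable fields declared as a union, field.schema().getLogicalType() is likely to be null because the logical type sits on a branch of the union, so the converter lookup may need extra union handling that this sketch does not cover.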

huangapple
  • Posted on August 30, 2020 at 16:20:02
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63655421.html