Avro logicalType String Date conversion to EPOCH timestamp-millis

Question


I have the following schema field:

    {"name": "timestampstring", "type": [{"type":"string","logicalType":"timestamp-millis"}, "null"]},

I intend to supply a date string to it and have a conversion turn that date into epoch milliseconds.
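For reference, the intended date-to-epoch step can be sketched on its own with `java.time`, independent of Avro. This is only a minimal sketch: the class and method names are hypothetical, and because the sample string carries no offset, it assumes the value should be read as UTC.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class IsoToEpochMillis {
    // Assumption: an ISO-8601 local date-time without an offset is interpreted as UTC.
    static long toEpochMillis(String isoLocalDateTime) {
        return LocalDateTime.parse(isoLocalDateTime)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        // Prints 1548504040931 for the sample value used in this question.
        System.out.println(toEpochMillis("2019-01-26T12:00:40.931"));
    }
}
```

If the string should instead be read in another zone, the offset passed to `toInstant` would have to change accordingly.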

    GenericRecord user2 = new GenericData.Record(schema1);
    user2.put("timestampstring", "2019-01-26T12:00:40.931");

    final GenericData genericData = new GenericData();
    genericData.addLogicalTypeConversion(new MyTimestampConversion());
    datumReader = new GenericDatumReader<GenericRecord>(schema2, schema2, genericData);
    GenericRecord user = null;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) {
        while (dataFileReader.hasNext()) {
            user = dataFileReader.next(user);
            System.out.println(user);
        }
    }

    // Conversion code
    public static class MyTimestampConversion extends Conversion<Long> {
        public MyTimestampConversion() {
        }

        public Class<Long> getConvertedType() {
            return Long.class;
        }

        public String getLogicalTypeName() {
            return "timestamp-millis";
        }

        public Long fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
            return 123L;
        }
    }

But this code doesn't work. I was expecting the conversion to produce the timestamp in milliseconds (I hardcoded 123L in the sample above).

Any help?

Answer 1

Score: 0


Referring back to https://stackoverflow.com/questions/49034266/how-to-define-a-logicaltype-in-avro-java, I managed to solve this by creating my own logical type. It seems that doing this with the "timestamp-millis" logicalType won't work, so I created my own logicalType:

    package example;

    import org.apache.avro.Conversion;
    import org.apache.avro.LogicalType;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.Schema.Parser;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.*;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.io.ResolvingDecoder;
    import org.joda.time.DateTime;

    import java.io.File;
    import java.io.IOException;
    public class AvroWriteDateUtcToEpochMili {
        public static void main(String[] args) throws IOException {
            boolean isRegisterNewLogicalType = true;
            boolean isWrite = true;
            if (isRegisterNewLogicalType) {
                // Register the custom logical type under its name before the schema is parsed.
                LogicalTypes.register(UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME, new LogicalTypes.LogicalTypeFactory() {
                    private final LogicalType convertLongLogicalType = new UtcDateTimeToTimestampMilisLogicalType();

                    @Override
                    public LogicalType fromSchema(Schema schema) {
                        return convertLongLogicalType;
                    }
                });
            }
            Schema schema1 = new Parser().parse(new File("./userdate_modified_string.avsc"));

            // Serialize user1, user2 and user3 to disk
            File file1 = new File("users.avro");
            if (isWrite) {
                GenericRecord user1 = new GenericData.Record(schema1);
                user1.put("timestamplong", "2019-07-09T04:31:45.281Z");
                //user1.put("timestamplong", 1L);
                user1.put("timestampstring", "2019-07-09T04:31:45.281Z");

                GenericRecord user2 = new GenericData.Record(schema1);
                //user2.put("timestamplong", "2018-07-09T04:30:45.781Z");
                user2.put("timestamplong", 2L);
                user2.put("timestampstring", (new DateTime(2L)).toString());
                //user2.put("timestampstring", new Timestamp(new Date("2018-01-26").getTime()));

                var currentDateTime = DateTime.now();
                GenericRecord user3 = new GenericData.Record(schema1);
                user3.put("timestamplong", currentDateTime.toString());
                //user3.put("timestamplong", 3L);
                user3.put("timestampstring", currentDateTime.toString());

                // The writer uses the conversion to turn date strings into longs.
                final GenericData genericData2 = new GenericData();
                genericData2.addLogicalTypeConversion(new MyStringTimestampConversion());
                DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema1, genericData2);
                DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
                dataFileWriter.create(schema1, file1);
                dataFileWriter.append(user1);
                dataFileWriter.append(user2);
                dataFileWriter.append(user3);
                dataFileWriter.close();
            }

            // Deserialize users from disk without any conversion (raw longs)
            boolean once = true;
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema1);
            GenericRecord user = null;
            try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) {
                while (dataFileReader.hasNext()) {
                    user = dataFileReader.next(user);
                    if (once) {
                        System.out.println(user.getSchema());
                        once = false;
                    }
                    //System.out.println(LogicalTypes.fromSchema(user.getSchema()));
                    System.out.println(user);
                }
            }

            // Deserialize users from disk again, this time applying the conversion
            System.out.println("//AFTER");
            Schema schema2 = new Parser().parse(new File("./userdate_modified_string.avsc"));
            final GenericData genericData = new GenericData();
            genericData.addLogicalTypeConversion(new MyStringTimestampConversion());
            datumReader = new MyReader<GenericRecord>(schema2, schema2, genericData);
            user = null;
            try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) {
                while (dataFileReader.hasNext()) {
                    user = dataFileReader.next(user);
                    System.out.println(user);
                }
            }
        }
        // Reader that applies the registered conversion to every value carrying a logical type.
        public static class MyReader<G extends IndexedRecord> extends GenericDatumReader<G> {
            public MyReader() {
                super();
            }

            public MyReader(Schema writer, Schema reader, GenericData data) {
                super(writer, reader, data);
            }

            @Override
            protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException {
                Object datum = this.readWithoutConversion(old, expected, in);
                LogicalType logicalType = expected.getLogicalType();
                if (logicalType != null) {
                    Conversion<?> conversion = this.getData().getConversionFor(logicalType);
                    if (conversion != null) {
                        return this.convert(datum, expected, logicalType, conversion);
                    }
                }
                return datum;
            }
        }
        // Converts between ISO-8601 date strings and epoch milliseconds stored as long.
        public static class MyStringTimestampConversion extends Conversion<String> {
            public MyStringTimestampConversion() {
                super();
            }

            @Override
            public Class<String> getConvertedType() {
                return String.class;
            }

            @Override
            public String getLogicalTypeName() {
                // "timestamp-millis";
                return UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME;
            }

            @Override
            public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
                return (new DateTime(millisFromEpoch)).toString();
                //return "123456L";
            }

            @Override
            public Long toLong(String value, Schema schema, LogicalType type) {
                // https://stackoverflow.com/questions/22681348/joda-datetime-to-unix-datetime
                //DateTimeFormatter dtf = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSSSS'Z'");
                // https://stackoverflow.com/questions/8405087/what-is-this-date-format-2011-08-12t201746-384z
                DateTime dateTime = DateTime.parse(value);
                long epochMilli = dateTime.toDate().toInstant().toEpochMilli();
                return epochMilli;
            }
        }
    }
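The two conversion directions above rely on Joda-Time. As a hedged alternative, the same string-to-long and long-to-string steps can be sketched with `java.time` alone. The class name below is hypothetical, the method names merely mirror `toLong` and `fromLong`, and this is a swapped-in technique rather than what the answer's code uses. Note that `OffsetDateTime.parse` requires the input to carry an offset or `Z`.

```java
import java.time.Instant;
import java.time.OffsetDateTime;

final class JavaTimeConversionSketch {
    // Counterpart of toLong: ISO-8601 string with an offset (or 'Z') -> epoch milliseconds.
    static long toLong(String value) {
        return OffsetDateTime.parse(value).toInstant().toEpochMilli();
    }

    // Counterpart of fromLong: epoch milliseconds -> ISO-8601 string, always rendered in UTC.
    static String fromLong(long millisFromEpoch) {
        return Instant.ofEpochMilli(millisFromEpoch).toString();
    }

    public static void main(String[] args) {
        long millis = toLong("2019-07-09T04:31:45.281Z");
        System.out.println(millis);           // 1562646705281
        System.out.println(fromLong(millis)); // 2019-07-09T04:31:45.281Z
    }
}
```

Rendering in UTC makes the output independent of the machine's default time zone, which is a design choice of this sketch and not something the original code does.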

LogicalType

    public class UtcDateTimeToTimestampMilisLogicalType extends LogicalType {
        // The key to use as a reference to the type
        public static final String CONVERT_LONG_TYPE_NAME = "utc-to-epoch-millis";

        public UtcDateTimeToTimestampMilisLogicalType() {
            super(CONVERT_LONG_TYPE_NAME);
        }

        @Override
        public void validate(Schema schema) {
            super.validate(schema);
            // The underlying Avro type must be long, since the value is stored as epoch milliseconds.
            if (schema.getType() != Schema.Type.LONG) {
                throw new IllegalArgumentException(
                        "Logical type 'utc-to-epoch-millis' must be backed by long");
            }
        }
    }

Schema

    {
      "namespace": "example.avro.modified.string",
      "type": "record",
      "name": "UserDate",
      "fields": [
        {
          "name": "timestamplong",
          "type": {
            "type": "long",
            "logicalType": "utc-to-epoch-millis"
          }
        },
        {
          "name": "timestampstring",
          "type": "string"
        }
      ]
    }

Result

    {"type":"record","name":"UserDate","namespace":"example.avro.modified.string","fields":[{"name":"timestamplong","type":{"type":"long","logicalType":"utc-to-epoch-millis"}},{"name":"timestampstring","type":"string"}]}
    {"timestamplong": 1562646705281, "timestampstring": "2019-07-09T04:31:45.281Z"}
    {"timestamplong": 2, "timestampstring": "1970-01-01T07:30:00.002+07:30"}
    {"timestamplong": 1601616694713, "timestampstring": "2020-10-02T13:31:34.713+08:00"}
    //AFTER
    {"timestamplong": "2019-07-09T12:31:45.281+08:00", "timestampstring": "2019-07-09T04:31:45.281Z"}
    {"timestamplong": "1970-01-01T07:30:00.002+07:30", "timestampstring": "1970-01-01T07:30:00.002+07:30"}
    {"timestamplong": "2020-10-02T13:31:34.713+08:00", "timestampstring": "2020-10-02T13:31:34.713+08:00"}
    Process finished with exit code 0
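In the first //AFTER row, the value written as 2019-07-09T04:31:45.281Z comes back as 2019-07-09T12:31:45.281+08:00. Both strings denote the same instant; the difference is presumably because Joda-Time's `new DateTime(long)` renders in the JVM's default time zone. The sketch below illustrates this with `java.time`; the class and method names are hypothetical, and the +08:00 offset is assumed from the output above.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

final class SameInstantDifferentOffset {
    // Renders the given epoch milliseconds as an ISO-8601 string at the given offset.
    static String render(long epochMillis, ZoneOffset offset) {
        return Instant.ofEpochMilli(epochMillis).atOffset(offset).toString();
    }

    public static void main(String[] args) {
        long stored = 1562646705281L; // value of "timestamplong" in the first record
        String utc = render(stored, ZoneOffset.UTC);
        String plus8 = render(stored, ZoneOffset.ofHours(8)); // assumed local offset
        System.out.println(utc);   // 2019-07-09T04:31:45.281Z
        System.out.println(plus8); // 2019-07-09T12:31:45.281+08:00
        // Parsing either string yields the same instant on the timeline.
        System.out.println(OffsetDateTime.parse(utc).toInstant()
                .equals(OffsetDateTime.parse(plus8).toInstant()));
    }
}
```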

  • Posted on 2020-10-01 16:04:43