Avro的logicalType String Date转换为EPOCH时间戳(毫秒)

huangapple go评论69阅读模式
英文:

Avro logicalType String Date conversion to EPOCH timestamp-millis

问题

我有以下模式

    {"name": "timestampstring", "type": [{"type":"string","logicalType":"timestamp-millis"}, "null"]},

我打算向它提供日期,并进行转换将日期转换为毫秒时间戳。

// Fragment from the question: schema1, schema2, file1 and datumReader are
// declared outside this excerpt.

// Build a record and store the date as a plain string in the field.
GenericRecord user2 = new GenericData.Record(schema1);
user2.put("timestampstring", "2019-01-26T12:00:40.931");

// Register the custom conversion on a GenericData instance and hand that
// instance to the reader (schema2 is passed for both schema arguments).
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new MyTimestampConversion());
datumReader = new GenericDatumReader<GenericRecord>(schema2, schema2, genericData);

// Read every record from file1 and print it; the previous record object is
// passed back into next().
GenericRecord user = null;
try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) {
    while (dataFileReader.hasNext()) {
        user = dataFileReader.next(user);

        System.out.println(user);
    }
}

//Conversion code

/**
 * Conversion from the question: declared for the "timestamp-millis" logical
 * type with {@code Long} as the converted Java type.
 */
public static class MyTimestampConversion extends Conversion<Long> {

    /** Creates the conversion; there is no state to initialise. */
    public MyTimestampConversion() {
    }

    /** @return the logical type name this conversion is declared for */
    public String getLogicalTypeName() {
        return "timestamp-millis";
    }

    /** @return {@code Long.class}, the Java type this conversion produces */
    public Class<Long> getConvertedType() {
        return Long.class;
    }

    /**
     * Placeholder conversion from character data.
     *
     * @param value  the character data to convert (ignored here)
     * @param schema the schema of the value (ignored here)
     * @param type   the logical type of the value (ignored here)
     * @return always {@code 123L}; hard-coded so it is obvious whether this method ran
     */
    public Long fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
        return 123L;
    }
}

但是这段代码不起作用... 我原以为它会转换为毫秒级时间戳(在上面的示例中我硬编码了123L)。

有什么帮助吗?
英文:

I have below schema

    {"name": "timestampstring", "type": [{"type":"string","logicalType":"timestamp-millis"}, "null"]},

I intend to supply a date string to it and have a conversion turn that date into epoch milliseconds.

    // Fragment from the question: schema1, schema2, file1 and datumReader are
    // declared outside this excerpt.

    // Build a record and store the date as a plain string in the field.
    GenericRecord user2 = new GenericData.Record(schema1);
    user2.put("timestampstring", "2019-01-26T12:00:40.931");

    // Register the custom conversion and hand the GenericData instance to the
    // reader (schema2 is passed for both schema arguments).
    final GenericData genericData = new GenericData();
    genericData.addLogicalTypeConversion(new MyTimestampConversion());
    datumReader = new GenericDatumReader<GenericRecord>(schema2, schema2, genericData);

    // Read every record from file1 and print it.
    GenericRecord user = null;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) {
        while (dataFileReader.hasNext()) {
            user = dataFileReader.next(user);
            System.out.println(user);
        }
    }

//Conversion code

public static class MyTimestampConversion extends Conversion&lt;Long&gt; {
public MyTimestampConversion() {
}
public Class&lt;Long&gt; getConvertedType() {
return Long.class;
}
public String getLogicalTypeName() {
return &quot;timestamp-millis&quot;;
}
public Long fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
return 123L;
}
}

But this code doesn't work... I was expecting it to convert the value to timestamp-millis (I hard-coded 123L in the sample above).

Any help?

答案1

得分: 0

Referring back to https://stackoverflow.com/questions/49034266/how-to-define-a-logicaltype-in-avro-java, I managed to solve this by creating my own logical type. It seems like doing this with "timestamp-millis" logicalType won't work. So I created my own logicalType...

package example;

import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.*;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.ResolvingDecoder;
import org.joda.time.DateTime;

import java.io.File;
import java.io.IOException;

/**
 * Example class for the answer. Its body is elided in this excerpt; the
 * complete listing is given in the English version of the answer further down.
 */
public class AvroWriteDateUtcToEpochMili {
    // ... (rest of the code)
}

LogicalType

/**
 * Custom Avro logical type named "utc-to-epoch-millis". It may only annotate
 * schemas whose underlying type is {@code long}.
 */
public class UtcDateTimeToTimestampMilisLogicalType extends LogicalType {
    /** Name used in the schema's "logicalType" attribute and when registering the type. */
    public static final String CONVERT_LONG_TYPE_NAME = "utc-to-epoch-millis";

    public UtcDateTimeToTimestampMilisLogicalType() {
        super(CONVERT_LONG_TYPE_NAME);
    }

    /**
     * Checks that the annotated schema is a {@code long}.
     *
     * @param schema the schema carrying this logical type
     * @throws IllegalArgumentException if the schema's type is not {@code LONG}
     */
    @Override
    public void validate(Schema schema) {
        super.validate(schema);
        if (schema.getType() != Schema.Type.LONG) {
            // The message previously said "bytes", contradicting the LONG check above.
            throw new IllegalArgumentException("Logical type 'utc-to-epoch-millis' must be backed by long");
        }
    }
}

Schema

{
    "namespace": "example.avro.modified.string",
    "type": "record",
    "name": "UserDate",
    "fields": [
        {
            "name": "timestamplong",
            "type": {
                "type": "long",
                "logicalType": "utc-to-epoch-millis"
            }
        },
        {
            "name": "timestampstring",
            "type": "string"
        }
    ]
}

Result

{ "type": "record", "name": "UserDate", "namespace": "example.avro.modified.string", "fields": [ { "name": "timestamplong", "type": { "type": "long", "logicalType": "utc-to-epoch-millis" } }, { "name": "timestampstring", "type": "string" } ] }
{ "timestamplong": 1562646705281, "timestampstring": "2019-07-09T04:31:45.281Z" }
{ "timestamplong": 2, "timestampstring": "1970-01-01T07:30:00.002+07:30" }
{ "timestamplong": 1601616694713, "timestampstring": "2020-10-02T13:31:34.713+08:00" }
//AFTER
{ "timestamplong": "2019-07-09T12:31:45.281+08:00", "timestampstring": "2019-07-09T04:31:45.281Z" }
{ "timestamplong": "1970-01-01T07:30:00.002+07:30", "timestampstring": "1970-01-01T07:30:00.002+07:30" }
{ "timestamplong": "2020-10-02T13:31:34.713+08:00", "timestampstring": "2020-10-02T13:31:34.713+08:00" }

Process finished with exit code 0
英文:

Referring back to https://stackoverflow.com/questions/49034266/how-to-define-a-logicaltype-in-avro-java, I managed to solve this by creating my own logical type. It seems that doing this with the "timestamp-millis" logicalType won't work, so I created my own logicalType...

package example;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.*;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.ResolvingDecoder;
import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
public class AvroWriteDateUtcToEpochMili {
public static void main(String[] args) throws IOException {
Boolean isRegisterNewLogicalType = true;
Boolean isWrite = true;
if(isRegisterNewLogicalType) {
LogicalTypes.register(UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME, new LogicalTypes.LogicalTypeFactory() {
private final LogicalType convertLongLogicalType = new UtcDateTimeToTimestampMilisLogicalType();
@Override
public LogicalType fromSchema(Schema schema) {
return convertLongLogicalType;
}
});
}
Schema schema1 = new Parser().parse(new File(&quot;./userdate_modified_string.avsc&quot;));
// Serialize user1 and user2 to disk
File file1 = new File(&quot;users.avro&quot;);
if(isWrite) {
GenericRecord user1 = new GenericData.Record(schema1);
user1.put(&quot;timestamplong&quot;, &quot;2019-07-09T04:31:45.281Z&quot;);
//user1.put(&quot;timestamplong&quot;, 1L);
user1.put(&quot;timestampstring&quot;, &quot;2019-07-09T04:31:45.281Z&quot;);
GenericRecord user2 = new GenericData.Record(schema1);
//user2.put(&quot;timestamplong&quot;, &quot;2018-07-09T04:30:45.781Z&quot;);
user2.put(&quot;timestamplong&quot;, 2L);
user2.put(&quot;timestampstring&quot;, (new DateTime(2L)).toString());
//user2.put(&quot;timestampstring&quot;, new Timestamp(new Date(&quot;2018-01-26&quot;).getTime()));
var currentDateTime = DateTime.now();
GenericRecord user3 = new GenericData.Record(schema1);
user3.put(&quot;timestamplong&quot;, currentDateTime.toString());
//user3.put(&quot;timestamplong&quot;, 3L);
user3.put(&quot;timestampstring&quot;, currentDateTime.toString());
final GenericData genericData2 = new GenericData();
genericData2.addLogicalTypeConversion(new MyStringTimestampConversion());
DatumWriter&lt;GenericRecord&gt; datumWriter = new GenericDatumWriter&lt;GenericRecord&gt;(schema1, genericData2);
DataFileWriter&lt;GenericRecord&gt; dataFileWriter = new DataFileWriter&lt;GenericRecord&gt;(datumWriter);
dataFileWriter.create(schema1, file1);
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();
}
// Deserialize users from disk
Boolean once = true;
DatumReader&lt;GenericRecord&gt; datumReader = new GenericDatumReader&lt;GenericRecord&gt;(schema1);
GenericRecord user = null;
try (DataFileReader&lt;GenericRecord&gt; dataFileReader = new DataFileReader&lt;GenericRecord&gt;(file1, datumReader)) {
while (dataFileReader.hasNext()) {
user = dataFileReader.next(user);
if(once) {
System.out.println(user.getSchema());
once = false;
}
//System.out.println(LogicalTypes.fromSchema(user.getSchema()));
System.out.println(user);
}
}
// Deserialize users from disk
System.out.println(&quot;//AFTER&quot;);
Schema schema2 = new Parser().parse(new File(&quot;./userdate_modified_string.avsc&quot;));
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new MyStringTimestampConversion());
datumReader = new MyReader&lt;GenericRecord&gt;(schema2, schema2, genericData);
user = null;
try (DataFileReader&lt;GenericRecord&gt; dataFileReader = new DataFileReader&lt;GenericRecord&gt;(file1, datumReader)) {
while (dataFileReader.hasNext()) {
user = dataFileReader.next(user);
System.out.println(user);
}
}
}
public static class MyReader&lt;G extends IndexedRecord&gt; extends GenericDatumReader {
public MyReader() {
super();
}
public MyReader(Schema writer, Schema reader, GenericData data) {
super(writer, reader, data);
}
@Override
protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException {
Object datum = this.readWithoutConversion(old, expected, in);
LogicalType logicalType = expected.getLogicalType();
if (logicalType != null) {
Conversion&lt;?&gt; conversion = this.getData().getConversionFor(logicalType);
if (conversion != null) {
return this.convert(datum, expected, logicalType, conversion);
}
}
return datum;
}
}
public static class MyStringTimestampConversion extends Conversion&lt;String&gt; {
public MyStringTimestampConversion() {
super();
}
@Override
public Class&lt;String&gt; getConvertedType() {
return String.class;
}
@Override
public String getLogicalTypeName() {
// &quot;timestamp-millis&quot;;
return UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME;
}
@Override
public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
return (new DateTime(millisFromEpoch)).toString();
//return &quot;123456L&quot;;
}
@Override
public Long toLong(String value, Schema schema, LogicalType type) { //https://stackoverflow.com/questions/22681348/joda-datetime-to-unix-datetime
//DateTimeFormatter dtf = DateTimeFormatter.ofPattern(&quot;uuuu-MM-dd&#39;T&#39;HH:mm:ss.SSSSSS&#39;Z&#39;&quot;);//https://stackoverflow.com/questions/8405087/what-is-this-date-format-2011-08-12t201746-384z
DateTime dateTime = DateTime.parse(value);
long epochMilli = dateTime.toDate().toInstant().toEpochMilli();
return epochMilli;
}
}
}

LogicalType

public class UtcDateTimeToTimestampMilisLogicalType extends LogicalType {
//The key to use as a reference to the type
public static final String CONVERT_LONG_TYPE_NAME = &quot;utc-to-epoch-millis&quot;;
public UtcDateTimeToTimestampMilisLogicalType() {
super(CONVERT_LONG_TYPE_NAME);
}
@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.LONG) {
throw new IllegalArgumentException(
&quot;Logical type &#39;utc-to-epoch-millis&#39; must be backed by bytes&quot;);
}
}
}

Schema

{
    "namespace": "example.avro.modified.string",
    "type": "record",
    "name": "UserDate",
    "fields": [
        {
            "name": "timestamplong",
            "type": {
                "type": "long",
                "logicalType": "utc-to-epoch-millis"
            }
        },
        {
            "name": "timestampstring",
            "type": "string"
        }
    ]
}

Result

{"type":"record","name":"UserDate","namespace":"example.avro.modified.string","fields":[{"name":"timestamplong","type":{"type":"long","logicalType":"utc-to-epoch-millis"}},{"name":"timestampstring","type":"string"}]}
{"timestamplong": 1562646705281, "timestampstring": "2019-07-09T04:31:45.281Z"}
{"timestamplong": 2, "timestampstring": "1970-01-01T07:30:00.002+07:30"}
{"timestamplong": 1601616694713, "timestampstring": "2020-10-02T13:31:34.713+08:00"}
//AFTER
{"timestamplong": "2019-07-09T12:31:45.281+08:00", "timestampstring": "2019-07-09T04:31:45.281Z"}
{"timestamplong": "1970-01-01T07:30:00.002+07:30", "timestampstring": "1970-01-01T07:30:00.002+07:30"}
{"timestamplong": "2020-10-02T13:31:34.713+08:00", "timestampstring": "2020-10-02T13:31:34.713+08:00"}
Process finished with exit code 0

huangapple
  • 本文由 发表于 2020年10月1日 16:04:43
  • 转载请务必保留本文链接:https://go.coder-hub.com/64151310.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定