英文:
Avro logicalType String Date conversion to EPOCH timestamp-millis
问题
我有以下模式
{"name": "timestampstring", "type": [{"type":"string","logicalType":"timestamp-millis"}, "null"]},
我打算向它提供日期,并进行转换将日期转换为毫秒时间戳。
// Build a record whose string field holds a date-time with no zone offset.
// NOTE(review): writing user2 to file1 is not shown in this excerpt.
GenericRecord user2 = new GenericData.Record(schema1);
user2.put("timestampstring", "2019-01-26T12:00:40.931");

// Reader-side data model with the custom conversion registered. The reader can only apply it
// to fields whose schema actually carries the "timestamp-millis" logical type.
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new MyTimestampConversion());
datumReader = new GenericDatumReader<GenericRecord>(schema2, schema2, genericData);

GenericRecord user = null;
try (DataFileReader<GenericRecord> dataFileReader =
        new DataFileReader<GenericRecord>(file1, datumReader)) {
    // next(reuse) lets Avro recycle the previously returned record instance.
    while (dataFileReader.hasNext()) {
        user = dataFileReader.next(user);
        System.out.println(user);
    }
}
//Conversion code
public static class MyTimestampConversion extends Conversion<Long> {
public MyTimestampConversion() {
}
public Class<Long> getConvertedType() {
return Long.class;
}
public String getLogicalTypeName() {
return "timestamp-millis";
}
public Long fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
return 123L;
}
}
但是这段代码不起作用... 我原以为它会转换为毫秒级时间戳(在上面的示例中我硬编码了123L)。
有什么帮助吗?
英文:
I have the following schema:
{"name": "timestampstring", "type": [{"type":"string","logicalType":"timestamp-millis"}, "null"]},
I intend to supply a date string to it and have a conversion turn that date into epoch milliseconds.
// Build a record whose string field holds a date-time with no zone offset.
// NOTE(review): writing user2 to file1 is not shown in this excerpt.
GenericRecord user2 = new GenericData.Record(schema1);
user2.put("timestampstring", "2019-01-26T12:00:40.931");

// Reader-side data model with the custom conversion registered. The reader can only apply it
// to fields whose schema actually carries the "timestamp-millis" logical type.
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new MyTimestampConversion());
datumReader = new GenericDatumReader<GenericRecord>(schema2, schema2, genericData);

GenericRecord user = null;
try (DataFileReader<GenericRecord> dataFileReader =
        new DataFileReader<GenericRecord>(file1, datumReader)) {
    // next(reuse) lets Avro recycle the previously returned record instance.
    while (dataFileReader.hasNext()) {
        user = dataFileReader.next(user);
        System.out.println(user);
    }
}
//Conversion code
public static class MyTimestampConversion extends Conversion<Long> {
public MyTimestampConversion() {
}
public Class<Long> getConvertedType() {
return Long.class;
}
public String getLogicalTypeName() {
return "timestamp-millis";
}
public Long fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
return 123L;
}
}
But this code doesn't work... I was expecting it to convert to timestamp millis (I hard-coded 123L in the sample above).
Any help?
答案1
得分: 0
Referring back to https://stackoverflow.com/questions/49034266/how-to-define-a-logicaltype-in-avro-java, I managed to solve this by creating my own logical type. It seems like doing this with the "timestamp-millis" logicalType won't work: the Avro specification defines timestamp-millis only on a long, so on a string schema it is not applied. So I created my own logicalType...
package example;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.*;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.ResolvingDecoder;
import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
/**
 * Writes {@code UserDate} records that use the custom {@code utc-to-epoch-millis} logical type,
 * then reads them back with and without a string conversion applied.
 *
 * <p>The class body is elided in this excerpt; the complete listing appears in the English copy
 * of the answer.
 */
public class AvroWriteDateUtcToEpochMili {
// ... (rest of the code)
}
LogicalType
/**
 * Custom Avro logical type {@code utc-to-epoch-millis}. It annotates a {@code long} schema whose
 * value is converted to and from a date-time string by a matching {@code Conversion}.
 */
public class UtcDateTimeToTimestampMilisLogicalType extends LogicalType {
    /** Name used in schemas ({@code "logicalType": "utc-to-epoch-millis"}) and for registration. */
    public static final String CONVERT_LONG_TYPE_NAME = "utc-to-epoch-millis";

    public UtcDateTimeToTimestampMilisLogicalType() {
        super(CONVERT_LONG_TYPE_NAME);
    }

    /**
     * Checks that this logical type annotates a {@code long} schema.
     *
     * @param schema the schema carrying this logical type
     * @throws IllegalArgumentException if the schema's type is not {@code long}
     */
    @Override
    public void validate(Schema schema) {
        super.validate(schema);
        if (schema.getType() != Schema.Type.LONG) {
            // The check is for LONG, so the message must say long (it previously said "bytes").
            throw new IllegalArgumentException(
                "Logical type '" + CONVERT_LONG_TYPE_NAME + "' must be backed by long");
        }
    }
}
Schema
{
"namespace": "example.avro.modified.string",
"type": "record",
"name": "UserDate",
"fields": [
{
"name": "timestamplong",
"type": {
"type": "long",
"logicalType": "utc-to-epoch-millis"
}
},
{
"name": "timestampstring",
"type": "string"
}
]
}
Result
{ "type": "record", "name": "UserDate", "namespace": "example.avro.modified.string", "fields": [ { "name": "timestamplong", "type": { "type": "long", "logicalType": "utc-to-epoch-millis" } }, { "name": "timestampstring", "type": "string" } ] }
{ "timestamplong": 1562646705281, "timestampstring": "2019-07-09T04:31:45.281Z" }
{ "timestamplong": 2, "timestampstring": "1970-01-01T07:30:00.002+07:30" }
{ "timestamplong": 1601616694713, "timestampstring": "2020-10-02T13:31:34.713+08:00" }
//AFTER
{ "timestamplong": "2019-07-09T12:31:45.281+08:00", "timestampstring": "2019-07-09T04:31:45.281Z" }
{ "timestamplong": "1970-01-01T07:30:00.002+07:30", "timestampstring": "1970-01-01T07:30:00.002+07:30" }
{ "timestamplong": "2020-10-02T13:31:34.713+08:00", "timestampstring": "2020-10-02T13:31:34.713+08:00" }
Process finished with exit code 0
英文:
Referring back to https://stackoverflow.com/questions/49034266/how-to-define-a-logicaltype-in-avro-java, I managed to solve this by creating my own logical type. It seems like doing this with the "timestamp-millis" logicalType won't work: the Avro specification defines timestamp-millis only on a long, so on a string schema it is not applied. So I created my own logicalType...
package example;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.*;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.ResolvingDecoder;
import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
/**
 * Demonstrates the custom Avro logical type {@code utc-to-epoch-millis}. It stores an ISO-8601
 * date-time string as epoch milliseconds in a {@code long} field.
 *
 * <p>The program:
 * <ol>
 *   <li>optionally registers the logical type;</li>
 *   <li>writes three {@code UserDate} records to {@code users.avro}, with string timestamps
 *       converted to longs on write;</li>
 *   <li>reads the file without conversions, so raw longs are printed;</li>
 *   <li>reads it again with {@link MyStringTimestampConversion} applied, so the longs are printed
 *       as date-time strings.</li>
 * </ol>
 */
public class AvroWriteDateUtcToEpochMili {

    public static void main(String[] args) throws IOException {
        // Demo toggles; primitives avoid needless boxing.
        boolean isRegisterNewLogicalType = true;
        boolean isWrite = true;

        if (isRegisterNewLogicalType) {
            // Register before parsing the schema so the parser can attach the custom logical
            // type. NOTE(review): Avro appears to ignore unknown logical-type names otherwise.
            LogicalTypes.register(UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME,
                new LogicalTypes.LogicalTypeFactory() {
                    private final LogicalType convertLongLogicalType =
                        new UtcDateTimeToTimestampMilisLogicalType();

                    @Override
                    public LogicalType fromSchema(Schema schema) {
                        return convertLongLogicalType;
                    }
                });
        }
        Schema schema1 = new Parser().parse(new File("./userdate_modified_string.avsc"));
        File file1 = new File("users.avro");

        if (isWrite) {
            // A String in the long field is accepted because the writer's GenericData below has
            // MyStringTimestampConversion registered, which turns it into epoch millis.
            GenericRecord user1 = new GenericData.Record(schema1);
            user1.put("timestamplong", "2019-07-09T04:31:45.281Z");
            user1.put("timestampstring", "2019-07-09T04:31:45.281Z");

            // A Long value is written directly.
            GenericRecord user2 = new GenericData.Record(schema1);
            user2.put("timestamplong", 2L);
            user2.put("timestampstring", (new DateTime(2L)).toString());

            var currentDateTime = DateTime.now();
            GenericRecord user3 = new GenericData.Record(schema1);
            user3.put("timestamplong", currentDateTime.toString());
            user3.put("timestampstring", currentDateTime.toString());

            final GenericData genericData2 = new GenericData();
            genericData2.addLogicalTypeConversion(new MyStringTimestampConversion());
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema1, genericData2);
            // try-with-resources closes the writer even if create() or an append() throws.
            try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
                dataFileWriter.create(schema1, file1);
                dataFileWriter.append(user1);
                dataFileWriter.append(user2);
                dataFileWriter.append(user3);
            }
        }

        // Read back without conversions: the long field prints as epoch millis.
        boolean once = true;
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema1);
        GenericRecord user = null;
        try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file1, datumReader)) {
            while (dataFileReader.hasNext()) {
                user = dataFileReader.next(user);
                if (once) {
                    System.out.println(user.getSchema());
                    once = false;
                }
                System.out.println(user);
            }
        }

        // Read back with the conversion registered: the long field prints as a date-time string.
        System.out.println("//AFTER");
        Schema schema2 = new Parser().parse(new File("./userdate_modified_string.avsc"));
        final GenericData genericData = new GenericData();
        genericData.addLogicalTypeConversion(new MyStringTimestampConversion());
        datumReader = new MyReader<>(schema2, schema2, genericData);
        user = null;
        try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file1, datumReader)) {
            while (dataFileReader.hasNext()) {
                user = dataFileReader.next(user);
                System.out.println(user);
            }
        }
    }

    /**
     * Datum reader that applies any conversion registered for a value's logical type after
     * reading the raw value.
     *
     * <p>NOTE(review): this mirrors the logic of {@code GenericDatumReader#read} in recent Avro
     * releases and may be redundant. Confirm against the Avro version in use.
     *
     * @param <G> record type produced by this reader
     */
    public static class MyReader<G extends IndexedRecord> extends GenericDatumReader<G> {
        public MyReader() {
            super();
        }

        public MyReader(Schema writer, Schema reader, GenericData data) {
            super(writer, reader, data);
        }

        @Override
        protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException {
            Object datum = this.readWithoutConversion(old, expected, in);
            LogicalType logicalType = expected.getLogicalType();
            if (logicalType != null) {
                Conversion<?> conversion = this.getData().getConversionFor(logicalType);
                if (conversion != null) {
                    return this.convert(datum, expected, logicalType, conversion);
                }
            }
            return datum;
        }
    }

    /**
     * Converts between the {@code utc-to-epoch-millis} long representation and an ISO-8601
     * date-time string.
     */
    public static class MyStringTimestampConversion extends Conversion<String> {
        public MyStringTimestampConversion() {
            super();
        }

        @Override
        public Class<String> getConvertedType() {
            return String.class;
        }

        @Override
        public String getLogicalTypeName() {
            return UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME;
        }

        /**
         * Renders epoch millis with Joda-Time's {@code DateTime.toString()}. The sample output
         * shows the JVM's local offset (e.g. +08:00).
         */
        @Override
        public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
            return (new DateTime(millisFromEpoch)).toString();
        }

        /**
         * Parses an ISO-8601 date-time string and returns its instant as epoch milliseconds.
         * See https://stackoverflow.com/questions/22681348/joda-datetime-to-unix-datetime
         */
        @Override
        public Long toLong(String value, Schema schema, LogicalType type) {
            // getMillis() already is the epoch-millis instant; no java.util.Date round trip needed.
            return DateTime.parse(value).getMillis();
        }
    }
}
LogicalType
/**
 * Custom Avro logical type {@code utc-to-epoch-millis}. It annotates a {@code long} schema whose
 * value is converted to and from a date-time string by a matching {@code Conversion}.
 */
public class UtcDateTimeToTimestampMilisLogicalType extends LogicalType {
    /** The key used to reference this type in schemas and when registering it. */
    public static final String CONVERT_LONG_TYPE_NAME = "utc-to-epoch-millis";

    public UtcDateTimeToTimestampMilisLogicalType() {
        super(CONVERT_LONG_TYPE_NAME);
    }

    /**
     * Checks that this logical type annotates a {@code long} schema.
     *
     * @param schema the schema carrying this logical type
     * @throws IllegalArgumentException if the schema's type is not {@code long}
     */
    @Override
    public void validate(Schema schema) {
        super.validate(schema);
        if (schema.getType() != Schema.Type.LONG) {
            // The check is for LONG, so the message must say long (it previously said "bytes").
            throw new IllegalArgumentException(
                "Logical type '" + CONVERT_LONG_TYPE_NAME + "' must be backed by long");
        }
    }
}
Schema
{
"namespace": "example.avro.modified.string",
"type": "record",
"name": "UserDate",
"fields": [
{
"name": "timestamplong",
"type":
{
"type": "long",
"logicalType": "utc-to-epoch-millis"
}
},
{
"name": "timestampstring",
"type": "string"
}
]
}
Result
{"type":"record","name":"UserDate","namespace":"example.avro.modified.string","fields":[{"name":"timestamplong","type":{"type":"long","logicalType":"utc-to-epoch-millis"}},{"name":"timestampstring","type":"string"}]}
{"timestamplong": 1562646705281, "timestampstring": "2019-07-09T04:31:45.281Z"}
{"timestamplong": 2, "timestampstring": "1970-01-01T07:30:00.002+07:30"}
{"timestamplong": 1601616694713, "timestampstring": "2020-10-02T13:31:34.713+08:00"}
//AFTER
{"timestamplong": "2019-07-09T12:31:45.281+08:00", "timestampstring": "2019-07-09T04:31:45.281Z"}
{"timestamplong": "1970-01-01T07:30:00.002+07:30", "timestampstring": "1970-01-01T07:30:00.002+07:30"}
{"timestamplong": "2020-10-02T13:31:34.713+08:00", "timestampstring": "2020-10-02T13:31:34.713+08:00"}
Process finished with exit code 0
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论