Apache Flink – 如何加载包含日期数据的 CSV DataSet?

huangapple go评论73阅读模式
英文:

Apache Flink - How to load a csv DataSet with a date data in it?

问题

// 我的代码如下,返回一个错误:"不支持将类型 'java.util.Date' 用于CSV输入格式"。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSource<Tuple3<Date, Double, Integer>> csvInput = env.readCsvFile("/home/work/Desktop/Test2.csv")
    .ignoreFirstLine()
    .types(Date.class, Double.class, Integer.class);
英文:

//My code below returns an error "The type 'java.util.Date' is not supported for the CSV input format".

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSource&lt;org.apache.flink.api.java.tuple.Tuple3&lt;Date ,Double,Integer&gt;&gt; csvInput = env.readCsvFile(&quot;/home/work/Desktop/Test2.csv&quot;)
			.ignoreFirstLine()
			.types(Date.class, Double.class, Integer.class);

Apache Flink – 如何加载包含日期数据的 CSV DataSet?

答案1

得分: 0

public class TaxiRide implements Comparable<TaxiRide>, Serializable {

    private static final transient DateTimeFormatter timeFormatter =
        DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();
    public long rideId;
    public boolean isStart;
    public DateTime startTime;
    public DateTime endTime;
    public float startLon;
    public float startLat;
    public float endLon;
    public float endLat;
    public short passengerCnt;
    public long taxiId;
    public long driverId;

    public TaxiRide() {
        this.startTime = new DateTime();
        this.endTime = new DateTime();
    }

    public TaxiRide(long rideId, boolean isStart, DateTime startTime, DateTime endTime,
                    float startLon, float startLat, float endLon, float endLat,
                    short passengerCnt, long taxiId, long driverId) {

        this.rideId = rideId;
        this.isStart = isStart;
        this.startTime = startTime;
        this.endTime = endTime;
        this.startLon = startLon;
        this.startLat = startLat;
        this.endLon = endLon;
        this.endLat = endLat;
        this.passengerCnt = passengerCnt;
        this.taxiId = taxiId;
        this.driverId = driverId;
    }

    public static TaxiRide fromString(String line) {

        String[] tokens = line.split(",");
        if (tokens.length != 11) {
            throw new RuntimeException("Invalid record: " + line);
        }

        TaxiRide ride = new TaxiRide();

        try {
            ride.rideId = Long.parseLong(tokens[0]);

            switch (tokens[1]) {
                case "START":
                    ride.isStart = true;
                    ride.startTime = DateTime.parse(tokens[2], timeFormatter);
                    ride.endTime = DateTime.parse(tokens[3], timeFormatter);
                    break;
                case "END":
                    ride.isStart = false;
                    ride.endTime = DateTime.parse(tokens[2], timeFormatter);
                    ride.startTime = DateTime.parse(tokens[3], timeFormatter);
                    break;
                default:
                    throw new RuntimeException("Invalid record: " + line);
            }

            ride.startLon = tokens[4].length() > 0 ? Float.parseFloat(tokens[4]) : 0.0f;
            ride.startLat = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
            ride.endLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
            ride.endLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
            ride.passengerCnt = Short.parseShort(tokens[8]);
            ride.taxiId = Long.parseLong(tokens[9]);
            ride.driverId = Long.parseLong(tokens[10]);

        } catch (NumberFormatException nfe) {
            throw new RuntimeException("Invalid record: " + line, nfe);
        }

        return ride;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(rideId).append(",");
        sb.append(isStart ? "START" : "END").append(",");
        sb.append(startTime.toString(timeFormatter)).append(",");
        sb.append(endTime.toString(timeFormatter)).append(",");
        sb.append(startLon).append(",");
        sb.append(startLat).append(",");
        sb.append(endLon).append(",");
        sb.append(endLat).append(",");
        sb.append(passengerCnt).append(",");
        sb.append(taxiId).append(",");
        sb.append(driverId);

        return sb.toString();
    }

    public int compareTo(TaxiRide other) {
        if (other == null) {
            return 1;
        }
        int compareTimes = Long.compare(this.getEventTime(), other.getEventTime());
        if (compareTimes == 0) {
            if (this.isStart == other.isStart) {
                return 0;
            } else {
                if (this.isStart) {
                    return -1;
                } else {
                    return 1;
                }
            }
        } else {
            return compareTimes;
        }
    }

    public boolean equals(Object other) {
        return other instanceof TaxiRide &&
            this.rideId == ((TaxiRide) other).rideId;
    }

    public int hashCode() {
        return (int) this.rideId;
    }

    public long getEventTime() {
        if (isStart) {
            return startTime.getMillis();
        } else {
            return endTime.getMillis();
        }
    }

    public double getEuclideanDistance(double longitude, double latitude) {
        if (this.isStart) {
            return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.startLon, this.startLat);
        } else {
            return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.endLon, this.endLat);
        }
    }
}
英文:

create a source function that transform the string to date. Here is one example from the FLink training TaxiRideSource. and here is my example which is very similar:

public class TaxiRide implements Comparable&lt;TaxiRide&gt;, Serializable {
private static final transient DateTimeFormatter timeFormatter =
DateTimeFormat.forPattern(&quot;yyyy-MM-dd HH:mm:ss&quot;).withLocale(Locale.US).withZoneUTC();
public long rideId;
public boolean isStart;
public DateTime startTime;
public DateTime endTime;
public float startLon;
public float startLat;
public float endLon;
public float endLat;
public short passengerCnt;
public long taxiId;
public long driverId;
public TaxiRide() {
this.startTime = new DateTime();
this.endTime = new DateTime();
}
public TaxiRide(long rideId, boolean isStart, DateTime startTime, DateTime endTime,
float startLon, float startLat, float endLon, float endLat,
short passengerCnt, long taxiId, long driverId) {
this.rideId = rideId;
this.isStart = isStart;
this.startTime = startTime;
this.endTime = endTime;
this.startLon = startLon;
this.startLat = startLat;
this.endLon = endLon;
this.endLat = endLat;
this.passengerCnt = passengerCnt;
this.taxiId = taxiId;
this.driverId = driverId;
}
public static TaxiRide fromString(String line) {
String[] tokens = line.split(&quot;,&quot;);
if (tokens.length != 11) {
throw new RuntimeException(&quot;Invalid record: &quot; + line);
}
TaxiRide ride = new TaxiRide();
try {
ride.rideId = Long.parseLong(tokens[0]);
switch (tokens[1]) {
case &quot;START&quot;:
ride.isStart = true;
ride.startTime = DateTime.parse(tokens[2], timeFormatter);
ride.endTime = DateTime.parse(tokens[3], timeFormatter);
break;
case &quot;END&quot;:
ride.isStart = false;
ride.endTime = DateTime.parse(tokens[2], timeFormatter);
ride.startTime = DateTime.parse(tokens[3], timeFormatter);
break;
default:
throw new RuntimeException(&quot;Invalid record: &quot; + line);
}
ride.startLon = tokens[4].length() &gt; 0 ? Float.parseFloat(tokens[4]) : 0.0f;
ride.startLat = tokens[5].length() &gt; 0 ? Float.parseFloat(tokens[5]) : 0.0f;
ride.endLon = tokens[6].length() &gt; 0 ? Float.parseFloat(tokens[6]) : 0.0f;
ride.endLat = tokens[7].length() &gt; 0 ? Float.parseFloat(tokens[7]) : 0.0f;
ride.passengerCnt = Short.parseShort(tokens[8]);
ride.taxiId = Long.parseLong(tokens[9]);
ride.driverId = Long.parseLong(tokens[10]);
} catch (NumberFormatException nfe) {
throw new RuntimeException(&quot;Invalid record: &quot; + line, nfe);
}
return ride;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(rideId).append(&quot;,&quot;);
sb.append(isStart ? &quot;START&quot; : &quot;END&quot;).append(&quot;,&quot;);
sb.append(startTime.toString(timeFormatter)).append(&quot;,&quot;);
sb.append(endTime.toString(timeFormatter)).append(&quot;,&quot;);
sb.append(startLon).append(&quot;,&quot;);
sb.append(startLat).append(&quot;,&quot;);
sb.append(endLon).append(&quot;,&quot;);
sb.append(endLat).append(&quot;,&quot;);
sb.append(passengerCnt).append(&quot;,&quot;);
sb.append(taxiId).append(&quot;,&quot;);
sb.append(driverId);
return sb.toString();
}
// sort by timestamp,
// putting START events before END events if they have the same timestamp
public int compareTo(TaxiRide other) {
if (other == null) {
return 1;
}
int compareTimes = Long.compare(this.getEventTime(), other.getEventTime());
if (compareTimes == 0) {
if (this.isStart == other.isStart) {
return 0;
} else {
if (this.isStart) {
return -1;
} else {
return 1;
}
}
} else {
return compareTimes;
}
}
@Override
public boolean equals(Object other) {
return other instanceof TaxiRide &amp;&amp;
this.rideId == ((TaxiRide) other).rideId;
}
@Override
public int hashCode() {
return (int) this.rideId;
}
public long getEventTime() {
if (isStart) {
return startTime.getMillis();
} else {
return endTime.getMillis();
}
}
public double getEuclideanDistance(double longitude, double latitude) {
if (this.isStart) {
return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.startLon, this.startLat);
} else {
return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.endLon, this.endLat);
}
}
}

huangapple
  • 本文由 发表于 2020年6月29日 11:00:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/62630569.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定