英文:
Apache Flink - How to load a CSV DataSet with date data in it?
问题
// 我的代码如下,返回一个错误:"不支持将类型 'java.util.Date' 用于CSV输入格式"。
// Builds a CSV source over Test2.csv, ignoring the first line, with columns typed as (Date, Double, Integer).
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple3<Date, Double, Integer>> csvInput = env.readCsvFile("/home/work/Desktop/Test2.csv")
.ignoreFirstLine()
// Date.class is the column type that the error message quoted in the question complains about.
.types(Date.class, Double.class, Integer.class);
英文:
// The code below fails with the error "The type 'java.util.Date' is not supported for the CSV input format".
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<org.apache.flink.api.java.tuple.Tuple3<Date ,Double,Integer>> csvInput = env.readCsvFile("/home/work/Desktop/Test2.csv")
.ignoreFirstLine()
// Date.class is the column type that the quoted error message complains about.
.types(Date.class, Double.class, Integer.class);
答案1
得分: 0
/**
 * A single taxi ride event, either the START or the END of a ride, that can be parsed from
 * and rendered to one comma-separated text record.
 *
 * <p>{@link #equals(Object)} and {@link #hashCode()} look at {@code rideId} only, whereas
 * {@link #compareTo(TaxiRide)} orders by event time, so the ordering is not consistent with
 * equals.
 */
public class TaxiRide implements Comparable<TaxiRide>, Serializable {

    // Static fields are never part of the serialized form, so "transient" is not needed here.
    private static final DateTimeFormatter timeFormatter =
            DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();

    public long rideId;
    public boolean isStart;
    public DateTime startTime;
    public DateTime endTime;
    public float startLon;
    public float startLat;
    public float endLon;
    public float endLat;
    public short passengerCnt;
    public long taxiId;
    public long driverId;

    /** Creates an empty ride whose start and end times are both set to {@code new DateTime()}. */
    public TaxiRide() {
        this.startTime = new DateTime();
        this.endTime = new DateTime();
    }

    /** Creates a ride with every field supplied by the caller; no validation is performed. */
    public TaxiRide(long rideId, boolean isStart, DateTime startTime, DateTime endTime,
                    float startLon, float startLat, float endLon, float endLat,
                    short passengerCnt, long taxiId, long driverId) {
        this.rideId = rideId;
        this.isStart = isStart;
        this.startTime = startTime;
        this.endTime = endTime;
        this.startLon = startLon;
        this.startLat = startLat;
        this.endLon = endLon;
        this.endLat = endLat;
        this.passengerCnt = passengerCnt;
        this.taxiId = taxiId;
        this.driverId = driverId;
    }

    /**
     * Parses one comma-separated record into a {@code TaxiRide}.
     *
     * <p>The record must have exactly 11 fields: rideId, {@code START} or {@code END}, two
     * timestamps in {@code yyyy-MM-dd HH:mm:ss} format, startLon, startLat, endLon, endLat,
     * passengerCnt, taxiId, driverId. For a START record the first timestamp is the start
     * time and the second the end time; for an END record the order is reversed. Empty
     * coordinate fields are read as {@code 0.0f}.
     *
     * @param line the text record to parse
     * @return the parsed ride
     * @throws RuntimeException with message {@code "Invalid record: " + line} if the field
     *     count is wrong, the event type is unknown, or a number or timestamp cannot be parsed
     */
    public static TaxiRide fromString(String line) {
        String[] tokens = line.split(",");
        if (tokens.length != 11) {
            throw new RuntimeException("Invalid record: " + line);
        }

        TaxiRide ride = new TaxiRide();
        try {
            ride.rideId = Long.parseLong(tokens[0]);

            switch (tokens[1]) {
                case "START":
                    ride.isStart = true;
                    ride.startTime = DateTime.parse(tokens[2], timeFormatter);
                    ride.endTime = DateTime.parse(tokens[3], timeFormatter);
                    break;
                case "END":
                    ride.isStart = false;
                    ride.endTime = DateTime.parse(tokens[2], timeFormatter);
                    ride.startTime = DateTime.parse(tokens[3], timeFormatter);
                    break;
                default:
                    throw new RuntimeException("Invalid record: " + line);
            }

            ride.startLon = tokens[4].length() > 0 ? Float.parseFloat(tokens[4]) : 0.0f;
            ride.startLat = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
            ride.endLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
            ride.endLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
            ride.passengerCnt = Short.parseShort(tokens[8]);
            ride.taxiId = Long.parseLong(tokens[9]);
            ride.driverId = Long.parseLong(tokens[10]);
        } catch (IllegalArgumentException iae) {
            // NumberFormatException (numeric fields) is a subclass of IllegalArgumentException,
            // which is also what Joda-Time throws for a timestamp that does not match the
            // pattern. Catching the parent type reports both as an invalid record.
            throw new RuntimeException("Invalid record: " + line, iae);
        }

        return ride;
    }

    /** Renders the ride as a comma-separated record in the field order read by {@link #fromString}. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(rideId).append(",");
        sb.append(isStart ? "START" : "END").append(",");
        sb.append(startTime.toString(timeFormatter)).append(",");
        sb.append(endTime.toString(timeFormatter)).append(",");
        sb.append(startLon).append(",");
        sb.append(startLat).append(",");
        sb.append(endLon).append(",");
        sb.append(endLat).append(",");
        sb.append(passengerCnt).append(",");
        sb.append(taxiId).append(",");
        sb.append(driverId);
        return sb.toString();
    }

    /**
     * Orders rides by event time; when two events share a timestamp, a START event sorts
     * before an END event. A {@code null} argument sorts before this ride.
     */
    @Override
    public int compareTo(TaxiRide other) {
        if (other == null) {
            return 1;
        }
        int compareTimes = Long.compare(this.getEventTime(), other.getEventTime());
        if (compareTimes != 0) {
            return compareTimes;
        }
        if (this.isStart == other.isStart) {
            return 0;
        }
        return this.isStart ? -1 : 1;
    }

    /** Two rides are equal when they have the same {@code rideId}; other fields are ignored. */
    @Override
    public boolean equals(Object other) {
        return other instanceof TaxiRide
                && this.rideId == ((TaxiRide) other).rideId;
    }

    /** Hash derived from {@code rideId} only, matching {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return (int) this.rideId;
    }

    /** Returns the start time in milliseconds for a START event, otherwise the end time. */
    public long getEventTime() {
        if (isStart) {
            return startTime.getMillis();
        } else {
            return endTime.getMillis();
        }
    }

    /**
     * Returns the result of {@code GeoUtils.getEuclideanDistance} between the given point and
     * this ride's start location (START event) or end location (END event). The arguments are
     * narrowed to {@code float} before the call.
     */
    public double getEuclideanDistance(double longitude, double latitude) {
        if (this.isStart) {
            return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.startLon, this.startLat);
        } else {
            return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.endLon, this.endLat);
        }
    }
}
英文:
Create a source function that transforms the string into a date. Here is one example from the Flink training, TaxiRideSource, and here is my example, which is very similar:
/**
 * A single taxi ride event, either the START or the END of a ride, that can be parsed from
 * and rendered to one comma-separated text record.
 *
 * <p>{@link #equals(Object)} and {@link #hashCode()} look at {@code rideId} only, whereas
 * {@link #compareTo(TaxiRide)} orders by event time, so the ordering is not consistent with
 * equals.
 */
public class TaxiRide implements Comparable<TaxiRide>, Serializable {

    // Static fields are never part of the serialized form, so "transient" is not needed here.
    private static final DateTimeFormatter timeFormatter =
            DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();

    public long rideId;
    public boolean isStart;
    public DateTime startTime;
    public DateTime endTime;
    public float startLon;
    public float startLat;
    public float endLon;
    public float endLat;
    public short passengerCnt;
    public long taxiId;
    public long driverId;

    /** Creates an empty ride whose start and end times are both set to {@code new DateTime()}. */
    public TaxiRide() {
        this.startTime = new DateTime();
        this.endTime = new DateTime();
    }

    /** Creates a ride with every field supplied by the caller; no validation is performed. */
    public TaxiRide(long rideId, boolean isStart, DateTime startTime, DateTime endTime,
                    float startLon, float startLat, float endLon, float endLat,
                    short passengerCnt, long taxiId, long driverId) {
        this.rideId = rideId;
        this.isStart = isStart;
        this.startTime = startTime;
        this.endTime = endTime;
        this.startLon = startLon;
        this.startLat = startLat;
        this.endLon = endLon;
        this.endLat = endLat;
        this.passengerCnt = passengerCnt;
        this.taxiId = taxiId;
        this.driverId = driverId;
    }

    /**
     * Parses one comma-separated record into a {@code TaxiRide}.
     *
     * <p>The record must have exactly 11 fields: rideId, {@code START} or {@code END}, two
     * timestamps in {@code yyyy-MM-dd HH:mm:ss} format, startLon, startLat, endLon, endLat,
     * passengerCnt, taxiId, driverId. For a START record the first timestamp is the start
     * time and the second the end time; for an END record the order is reversed. Empty
     * coordinate fields are read as {@code 0.0f}.
     *
     * @param line the text record to parse
     * @return the parsed ride
     * @throws RuntimeException with message {@code "Invalid record: " + line} if the field
     *     count is wrong, the event type is unknown, or a number or timestamp cannot be parsed
     */
    public static TaxiRide fromString(String line) {
        String[] tokens = line.split(",");
        if (tokens.length != 11) {
            throw new RuntimeException("Invalid record: " + line);
        }

        TaxiRide ride = new TaxiRide();
        try {
            ride.rideId = Long.parseLong(tokens[0]);

            switch (tokens[1]) {
                case "START":
                    ride.isStart = true;
                    ride.startTime = DateTime.parse(tokens[2], timeFormatter);
                    ride.endTime = DateTime.parse(tokens[3], timeFormatter);
                    break;
                case "END":
                    ride.isStart = false;
                    ride.endTime = DateTime.parse(tokens[2], timeFormatter);
                    ride.startTime = DateTime.parse(tokens[3], timeFormatter);
                    break;
                default:
                    throw new RuntimeException("Invalid record: " + line);
            }

            ride.startLon = tokens[4].length() > 0 ? Float.parseFloat(tokens[4]) : 0.0f;
            ride.startLat = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
            ride.endLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
            ride.endLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
            ride.passengerCnt = Short.parseShort(tokens[8]);
            ride.taxiId = Long.parseLong(tokens[9]);
            ride.driverId = Long.parseLong(tokens[10]);
        } catch (IllegalArgumentException iae) {
            // NumberFormatException (numeric fields) is a subclass of IllegalArgumentException,
            // which is also what Joda-Time throws for a timestamp that does not match the
            // pattern. Catching the parent type reports both as an invalid record.
            throw new RuntimeException("Invalid record: " + line, iae);
        }

        return ride;
    }

    /** Renders the ride as a comma-separated record in the field order read by {@link #fromString}. */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(rideId).append(",");
        sb.append(isStart ? "START" : "END").append(",");
        sb.append(startTime.toString(timeFormatter)).append(",");
        sb.append(endTime.toString(timeFormatter)).append(",");
        sb.append(startLon).append(",");
        sb.append(startLat).append(",");
        sb.append(endLon).append(",");
        sb.append(endLat).append(",");
        sb.append(passengerCnt).append(",");
        sb.append(taxiId).append(",");
        sb.append(driverId);
        return sb.toString();
    }

    /**
     * Orders rides by event time; when two events share a timestamp, a START event sorts
     * before an END event. A {@code null} argument sorts before this ride.
     */
    @Override
    public int compareTo(TaxiRide other) {
        if (other == null) {
            return 1;
        }
        int compareTimes = Long.compare(this.getEventTime(), other.getEventTime());
        if (compareTimes != 0) {
            return compareTimes;
        }
        if (this.isStart == other.isStart) {
            return 0;
        }
        return this.isStart ? -1 : 1;
    }

    /** Two rides are equal when they have the same {@code rideId}; other fields are ignored. */
    @Override
    public boolean equals(Object other) {
        return other instanceof TaxiRide
                && this.rideId == ((TaxiRide) other).rideId;
    }

    /** Hash derived from {@code rideId} only, matching {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return (int) this.rideId;
    }

    /** Returns the start time in milliseconds for a START event, otherwise the end time. */
    public long getEventTime() {
        if (isStart) {
            return startTime.getMillis();
        } else {
            return endTime.getMillis();
        }
    }

    /**
     * Returns the result of {@code GeoUtils.getEuclideanDistance} between the given point and
     * this ride's start location (START event) or end location (END event). The arguments are
     * narrowed to {@code float} before the call.
     */
    public double getEuclideanDistance(double longitude, double latitude) {
        if (this.isStart) {
            return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.startLon, this.startLat);
        } else {
            return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.endLon, this.endLat);
        }
    }
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论