Flink Java API - Pojo Type to Tuple Datatype


Question


I am creating a small utility with the Java Flink API to learn its functionality. I am trying to read a CSV file and just print it, and I have developed a POJO class for the structure of the data. When I execute the code, I don't see the right values: Integer values are replaced with zeros, and String values with nulls. How do I map the data types of the attributes?

My Main Class:

```java
package org.karthick.flinkLab;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import javax.xml.crypto.Data;

public class CSVFileRead {
    public static void main(String[] args) throws Exception {
        System.out.println("--CSV File Reader using Flink's Data Set API--");
        ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<DataModel> csvInput = execEnv.readCsvFile("C:\\Flink\\Data\\IndividualDetails.csv")
                .pojoType(DataModel.class);
        csvInput.print();
    }
}
```

My POJO class (DataModel.class):

```java
package org.karthick.flinkLab;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple12;
import java.io.Serializable;
import java.util.Date;

public class DataModel<T extends Tuple>
        extends Tuple12<Integer, String, Date, Integer, String, String, String, String, String, String, Date, String>
        implements Serializable
{
    public Integer id;
    public String government_id;
    public Date diagnosed_date;
    public Integer age;
    public String detected_city;
    public String detected_district;
    public String detected_state;
    public String nationality;
    public String current_status;
    public Date status_change_date;
    public String notes;

    public DataModel() {}

    public String getNotes() {
        return notes;
    }
    public Date getStatus_change_date() {
        return status_change_date;
    }
    public String getCurrent_status() {
        return current_status;
    }
    public String getNationality() {
        return nationality;
    }
    public String getDetected_state() {
        return detected_state;
    }
    public String getDetected_district() {
        return detected_district;
    }
    public String getDetected_city() {
        return detected_city;
    }

    public String gender;

    public Date getDiagnosed_date() {
        return diagnosed_date;
    }
    public String getGender() {
        return gender;
    }
    public Integer getAge() {
        return age;
    }
    public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public String getGovernment_id() {
        return government_id;
    }
    public void setGovernment_id(String government_id) {
        this.government_id = government_id;
    }
}
```

When I execute the main method, I don't see the proper values. Sample result:

```
(0,,Tue May 19 16:50:38 IST 2020,0,,,,,,,Tue May 19 16:50:38 IST 2020,)
```

whereas I expect something like:

```
(2777,AP,Tue May 19 16:50:38 IST 2020,0,A,B,C,D,E,F,Tue May 19 16:50:38 IST 2020,G)
```

What could be missing here?

Answer 1

Score: 1


You are missing the column mapping from the CSV file to the POJO. Adding the mapping fixes the problem. The field names in the mapping must follow two rules:

  • The names must be exactly the same as the field names in the POJO.
  • The names must be listed in the same order as the corresponding columns in the CSV file.
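The two rules can be illustrated outside Flink with plain Java reflection. The sketch below is only an approximation of name-based mapping, not Flink's implementation: the classes `Person` and `CsvPojoMapper` are hypothetical, only `Integer` and `String` fields are handled, and lines are split naively on commas (no quoting or escaping).

```java
import java.lang.reflect.Field;

// Hypothetical POJO for this sketch only: public fields and a public no-arg constructor.
class Person {
    public Integer id;
    public String governmentId;
    public Integer age;

    public Person() {}
}

// Minimal sketch of name-based CSV-to-POJO mapping; not Flink's implementation.
final class CsvPojoMapper {
    private CsvPojoMapper() {}

    // Fills the field named fieldNames[i] from column i of a comma-separated line.
    static <T> T parseLine(String line, Class<T> type, String... fieldNames) {
        // The -1 limit keeps trailing empty columns, so "5,," yields three columns.
        String[] columns = line.split(",", -1);
        if (columns.length != fieldNames.length) {
            throw new IllegalArgumentException("Line has " + columns.length
                    + " columns but the mapping has " + fieldNames.length + " names");
        }
        try {
            T instance = type.getDeclaredConstructor().newInstance();
            for (int i = 0; i < fieldNames.length; i++) {
                // Rule 2: the i-th name in the mapping receives the i-th CSV column.
                Field field = lookup(type, fieldNames[i]);
                field.set(instance, convert(columns[i].trim(), field.getType()));
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot populate " + type.getName(), e);
        }
    }

    // Rule 1: the name must match a public field of the POJO exactly (case-sensitive).
    private static Field lookup(Class<?> type, String name) {
        try {
            return type.getField(name);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No public field '" + name + "' in " + type.getName(), e);
        }
    }

    // Converts a raw column to the field's type; only Integer and String are handled here.
    private static Object convert(String raw, Class<?> target) {
        if (raw.isEmpty()) {
            return null; // an empty column leaves the field null
        }
        if (target == Integer.class) {
            return Integer.valueOf(raw);
        }
        if (target == String.class) {
            return raw;
        }
        throw new IllegalArgumentException("Unsupported field type: " + target.getName());
    }
}
```

With the mapping `"id", "governmentId", "age"`, the line `2777,AP,42` fills all three fields. Swapping the names changes which column lands in which field, and a name that is not a public field (for example `"ID"`) is rejected with an `IllegalArgumentException` instead of silently leaving the field unset.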

You can define the mapping as follows:

```java
DataSet<DataModel> csvInput = execEnv.readCsvFile("C:\\Flink\\Data\\IndividualDetails.csv")
        .pojoType(DataModel.class, "id", "age", .........);
```

It should have thrown an error, but it didn't. This could be a bug.
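For comparison, a fail-fast check of the kind the answer expected might look like the sketch below. It is a hypothetical helper (`MappingCheck` and `Individual` are illustrative names), not something Flink provides. It is deliberately strict: it reports names that are not public fields, duplicate names, a name count that differs from the CSV column count, and POJO fields that no column fills.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical POJO used only for this sketch.
class Individual {
    public Integer id;
    public String governmentId;
    public Integer age;
}

// Hypothetical fail-fast check of a CSV-to-POJO field mapping; not part of Flink.
final class MappingCheck {
    private MappingCheck() {}

    // Returns every problem found; an empty list means the mapping looks consistent.
    static List<String> problems(Class<?> type, int csvColumnCount, String... fieldNames) {
        List<String> problems = new ArrayList<>();
        if (fieldNames.length != csvColumnCount) {
            problems.add("mapping lists " + fieldNames.length + " names for " + csvColumnCount + " CSV columns");
        }
        // Public instance fields are the only targets a name-based mapping can fill.
        Set<String> pojoFields = new HashSet<>();
        for (Field f : type.getFields()) {
            if (!Modifier.isStatic(f.getModifiers())) {
                pojoFields.add(f.getName());
            }
        }
        Set<String> seen = new HashSet<>();
        for (String name : fieldNames) {
            if (!pojoFields.contains(name)) {
                problems.add("unknown field: " + name);
            }
            if (!seen.add(name)) {
                problems.add("duplicate field: " + name);
            }
        }
        // Fields that no column fills would silently keep their default values.
        for (String name : pojoFields) {
            if (!seen.contains(name)) {
                problems.add("unmapped field: " + name);
            }
        }
        return problems;
    }

    // Throws instead of letting a broken mapping produce default-valued records.
    static void validate(Class<?> type, int csvColumnCount, String... fieldNames) {
        List<String> problems = problems(type, csvColumnCount, fieldNames);
        if (!problems.isEmpty()) {
            throw new IllegalArgumentException("Invalid mapping for " + type.getSimpleName() + ": " + problems);
        }
    }
}
```

Calling `MappingCheck.validate(Individual.class, 3)` with no names, which mirrors the `pojoType(DataModel.class)` call in the question, throws rather than yielding records full of defaults. Passing `"government_id"` for a field declared as `governmentId` reports both an unknown field and an unmapped one.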

huangapple
  • Posted on May 19, 2020 at 19:30:37
  • When reposting, please keep the link to this article: https://go.coder-hub.com/61889970.html