Flink Collector issue when collecting objects that contain a Map of Object values

# Question

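I'm facing an issue: when I collect objects from a Flink flatMap collector, the values don't come through correctly. I get object references instead of the actual values.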

dataStream.filter(new FilterFunction<GenericRecord>() {
      @Override
      public boolean filter(GenericRecord record) throws Exception {
        if (record.get("user_id") != null) {
          return true;
        }
        return false;
      }
    }).flatMap(new ProfileEventAggregateFlatMapFunction(aggConfig))
        .map(new MapFunction<ProfileEventAggregateEmittedTuple, String>() {
          @Override
          public String map(
              ProfileEventAggregateEmittedTuple profileEventAggregateEmittedTupleNew)
              throws Exception {
            String res=null;
            try {
              ObjectMapper mapper = new ObjectMapper();
              mapper.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
              res= mapper.writeValueAsString(profileEventAggregateEmittedTupleNew);
            } catch (Exception e) {
              e.printStackTrace();
            }
            return res;
          }
        }).print();

public class ProfileEventAggregateFlatMapFunction extends
    RichFlatMapFunction<GenericRecord, ProfileEventAggregateEmittedTuple> {

  private final ProfileEventAggregateTupleEmitter aggregator;
  ObjectMapper mapper = ObjectMapperPool.getInstance().get();

  public ProfileEventAggregateFlatMapFunction(String config) throws IOException {
    this.aggregator = new ProfileEventAggregateTupleEmitter(config);
  }

  @Override
  public void flatMap(GenericRecord event,
      Collector<ProfileEventAggregateEmittedTuple> collector) throws Exception {
    // Emit every aggregate tuple computed for this event
    List<ProfileEventAggregateEmittedTuple> aggregateTuples = aggregator.runAggregates(event);

    for (ProfileEventAggregateEmittedTuple tuple : aggregateTuples) {
      collector.collect(tuple);
    }
  }
}

Debug results:
The tuple I am collecting in the collector:

tuple = {ProfileEventAggregateEmittedTuple@7880} 
 profileType = "userprofile"
 key = "1152473"
 businessType = "keyless"
 name = "consumer"
 aggregates = {ArrayList@7886}  size = 1
  0 = {ProfileEventAggregate@7888} "geo_id {geo_id=1} {keyless_select_destination_cnt=1, total_estimated_distance=12.5}"
   entityType = "geo_id"
   dimension = {LinkedHashMap@7891}  size = 1
    "geo_id" -> {Integer@7897} 1
     key = "geo_id"
     value = {Integer@7897} 1
   metrics = {LinkedHashMap@7892}  size = 2
    "keyless_select_destination_cnt" -> {Long@7773} 1
     key = "keyless_select_destination_cnt"
     value = {Long@7773} 1
    "total_estimated_distance" -> {Double@7904} 12.5
     key = "total_estimated_distance"
     value = {Double@7904} 12.5

This is what I get in my map function, `.map(new MapFunction<ProfileEventAggregateEmittedTuple, String>()`:

profileEventAggregateEmittedTuple = {ProfileEventAggregateEmittedTuple@7935} 
 profileType = "userprofile"
 key = "1152473"
 businessType = "keyless"
 name = "consumer"
 aggregates = {GenericData$Array@7948}  size = 1
  0 = {ProfileEventAggregate@7950} "geo_id {geo_id=java.lang.Object@863dce2} {keyless_select_destination_cnt=java.lang.Object@7cdb4bfc, total_estimated_distance=java.lang.Object@52e81f57}"
   entityType = "geo_id"
   dimension = {HashMap@7952}  size = 1
    "geo_id" -> {Object@7957} 
     key = "geo_id"
     value = {Object@7957} 
      Class has no fields
   metrics = {HashMap@7953}  size = 2
    "keyless_select_destination_cnt" -> {Object@7962} 
     key = "keyless_select_destination_cnt"
     value = {Object@7962} 
      Class has no fields
    "total_estimated_distance" -> {Object@7963} 

Please help me understand what is happening and why I am not getting the correct data.

public class ProfileEventAggregateEmittedTuple implements Cloneable, Serializable {
  private String profileType;
  private String key;
  private String businessType;
  private String name;
  private List<ProfileEventAggregate> aggregates = new ArrayList<ProfileEventAggregate>();
  private long startTime;
  private long endTime;

  // getters and setters omitted

  @Override
  public ProfileEventAggregateEmittedTuple clone() {
    ProfileEventAggregateEmittedTuple clone = new ProfileEventAggregateEmittedTuple();

    clone.setProfileType(this.profileType);
    clone.setKey(this.key);
    clone.setBusinessType(this.businessType);
    clone.setName(this.name);

    for (ProfileEventAggregate aggregate : this.aggregates) {
      clone.addAggregate(aggregate.clone());
    }
    return clone;
  }
}

public class ProfileEventAggregate  implements Cloneable, Serializable {

  private String entityType;
  private Map<String, Object> dimension = new LinkedHashMap<String, Object>();
  private Map<String, Object> metrics = new LinkedHashMap<String, Object>();

  // getters and setters omitted

  @Override
  public ProfileEventAggregate clone()  {
    ProfileEventAggregate clone = new ProfileEventAggregate();

    clone.setEntityType(this.entityType);
    clone.getDimension().putAll(this.getDimension());
    clone.getMetrics().putAll(this.metrics);
    return clone;
  }
}

# Answer 1
**Score**: 1
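When you don't [`enableObjectReuse`][1], objects passed between chained operators are copied with your configured serializer (it seems to be Avro here, judging by the `GenericData$Array` in the second debug dump).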
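In your case, you use `Map<String, Object>`, from which no plausible schema can be inferred: the value type `Object` has no fields, so the copied values come back as empty `java.lang.Object` instances.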
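A deliberately simplified, JDK-only model of why the values turn into `java.lang.Object@<hash>`: a copy driven by the declared type (all that a schema inferred from `Map<String, Object>` can know) rather than by the runtime type. This is not Flink's or Avro's actual code; `SchemaBlindCopy` and `copyByDeclaredType` are hypothetical names, and the model assumes the inferred schema treats `Object` as a record with no fields, which is consistent with the "Class has no fields" lines in the debug output.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaBlindCopy {

    // Hypothetical copier that, like a schema derived from Map<String, Object>,
    // knows only the declared value type (Object), not the runtime type (Long, Double, ...).
    public static Map<String, Object> copyByDeclaredType(Map<String, Object> source,
                                                         Class<?> declaredValueType) {
        Map<String, Object> copy = new HashMap<>(); // a plain HashMap, as in the second debug dump
        for (Map.Entry<String, Object> e : source.entrySet()) {
            try {
                // "Record" with no fields: create the declared type and copy nothing into it.
                Object fresh = declaredValueType.getDeclaredConstructor().newInstance();
                copy.put(e.getKey(), fresh);
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("cannot instantiate " + declaredValueType, ex);
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> metrics = new LinkedHashMap<>();
        metrics.put("keyless_select_destination_cnt", 1L);
        metrics.put("total_estimated_distance", 12.5);

        System.out.println(metrics); // {keyless_select_destination_cnt=1, total_estimated_distance=12.5}
        // Each value prints as java.lang.Object@<hash>, just like in the map function.
        System.out.println(copyByDeclaredType(metrics, Object.class));
    }
}
```

The keys survive while the values are replaced by bare `Object` instances, which is the pattern visible in the map function's debug output.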
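The easiest fix is to enable object reuse (`env.getConfig().enableObjectReuse()`, where `env` is your `StreamExecutionEnvironment`). Otherwise, make sure your serializer matches your data. For example, you could add a unit test that runs your POJO through `AvroSerializer#copy`, and make sure your POJO is [properly annotated][2] if you want to stick with Avro reflect. Even better, go with a schema-first approach, where you [generate your Java POJO from an Avro schema][3] and use specific Avro.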
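A unit test along those lines could take roughly the following shape. In a real test you would pass the object through the serializer Flink actually uses (for example `AvroSerializer#copy` from flink-avro); to keep this sketch JDK-only and runnable, it uses a Java-serialization round trip as a stand-in copier. `CopyRoundTripCheck`, `preservesValues`, and `javaSerializationCopy` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.UnaryOperator;

public class CopyRoundTripCheck {

    // True if the copy has the same keys and equal values.
    // Long.equals and Double.equals also compare the boxed type, so a bare Object never matches.
    public static boolean preservesValues(Map<String, Object> original,
                                          UnaryOperator<Map<String, Object>> copier) {
        Map<String, Object> copy = copier.apply(original);
        if (!copy.keySet().equals(original.keySet())) {
            return false;
        }
        for (Map.Entry<String, Object> e : original.entrySet()) {
            if (!Objects.equals(e.getValue(), copy.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Stand-in for the serializer under test: a Java-serialization round trip.
    @SuppressWarnings("unchecked")
    public static Map<String, Object> javaSerializationCopy(Map<String, Object> in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new LinkedHashMap<>(in));
            }
            try (ObjectInputStream read = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Map<String, Object>) read.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> metrics = new LinkedHashMap<>();
        metrics.put("keyless_select_destination_cnt", 1L);
        metrics.put("total_estimated_distance", 12.5);

        System.out.println(preservesValues(metrics, CopyRoundTripCheck::javaSerializationCopy)); // true

        // A copier that keeps the keys but loses the values fails the check.
        UnaryOperator<Map<String, Object>> lossy = m -> {
            Map<String, Object> c = new HashMap<>();
            m.keySet().forEach(k -> c.put(k, new Object()));
            return c;
        };
        System.out.println(preservesValues(metrics, lossy)); // false
    }
}
```

Java serialization keeps the runtime classes (`Long`, `Double`), so it passes; a copy that behaves like the one in the question fails the same check.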
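Let's discuss some alternatives:
* Use `GenericRecord`. Instead of converting it to a Java type, access the `GenericRecord` directly. This is usually the only option when the full record is flexible (e.g., your job accepts any input and writes it out to S3).
* Denormalize the schema. Instead of something like `class Event { int id; Map<String, Object> data; }`, you would use `class EventInformation { int id; String predicate; Object value; }`. You would need to group all information for processing. However, you will run into the same type issues with Avro.
* Use a wide schema. Building on the previous approach, if the different predicates are known beforehand, you can use them to craft a wide schema such as `class Event { int id; Long predicate1; Integer predicate2; ... String predicateN; }`, where all of the entries are nullable and most of them are indeed `null`. Encoding `null` is very cheap.
* Ditch Avro. Avro is fully typed, and you may want something more dynamic. Protobuf has [Any][4] to support arbitrary submessages.
* Use Kryo. Kryo can serialize arbitrary object trees, at the cost of being slower and having more overhead.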
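A sketch of the wide-schema option, applied to the keys visible in the question's debug output (`geo_id`, `keyless_select_destination_cnt`, `total_estimated_distance`). It assumes those keys are known in advance and that their runtime types are the ones shown in the dump; `WideProfileEventAggregate` and `fromMaps` are hypothetical names. Every field has a concrete, nullable type, so a schema can be derived for it; with Avro reflect, nullable fields would additionally need Avro's `@Nullable` annotation (`org.apache.avro.reflect.Nullable`), which is left out here to keep the sketch JDK-only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical wide-schema POJO: one nullable, concretely typed field per known key,
// instead of Map<String, Object> values whose type cannot be inferred.
public class WideProfileEventAggregate {
    public String entityType;
    public Integer geoId;                    // dimension "geo_id"
    public Long keylessSelectDestinationCnt; // metric "keyless_select_destination_cnt"
    public Double totalEstimatedDistance;    // metric "total_estimated_distance"

    public WideProfileEventAggregate() {}    // no-arg constructor for reflection-based serializers

    // Converts the map-based representation; absent keys stay null.
    // The casts assume the runtime types seen in the debug output (Integer, Long, Double).
    public static WideProfileEventAggregate fromMaps(String entityType,
                                                     Map<String, Object> dimension,
                                                     Map<String, Object> metrics) {
        WideProfileEventAggregate w = new WideProfileEventAggregate();
        w.entityType = entityType;
        w.geoId = (Integer) dimension.get("geo_id");
        w.keylessSelectDestinationCnt = (Long) metrics.get("keyless_select_destination_cnt");
        w.totalEstimatedDistance = (Double) metrics.get("total_estimated_distance");
        return w;
    }

    @Override
    public String toString() {
        return entityType + " geo_id=" + geoId
                + " keyless_select_destination_cnt=" + keylessSelectDestinationCnt
                + " total_estimated_distance=" + totalEstimatedDistance;
    }

    public static void main(String[] args) {
        Map<String, Object> dimension = new LinkedHashMap<>();
        dimension.put("geo_id", 1);
        Map<String, Object> metrics = new LinkedHashMap<>();
        metrics.put("keyless_select_destination_cnt", 1L);
        metrics.put("total_estimated_distance", 12.5);

        // prints: geo_id geo_id=1 keyless_select_destination_cnt=1 total_estimated_distance=12.5
        System.out.println(fromMaps("geo_id", dimension, metrics));
    }
}
```

The trade-off is the one described in the list: the class has to change whenever a new dimension or metric is introduced, but each field's type is explicit.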
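If you want to write the data out, you also need a solution that adds type information for proper deserialization. For an example, check out this [JSON question][5], but there are other ways to implement it.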
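One minimal way to carry the type information along, as a JDK-only sketch: store a type tag next to each value so the reader knows which class to rebuild. `TypedValueCodec`, `TypedValue`, and the tag strings are hypothetical, only a few value types are handled, and the sketch is independent of the linked JSON answer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TypedValueCodec {

    // Hypothetical wire form: a type tag plus the value's string form.
    // Both parts are plain Strings, so any serializer can handle them.
    public static final class TypedValue {
        public final String type;
        public final String value;

        public TypedValue(String type, String value) {
            this.type = type;
            this.value = value;
        }
    }

    public static TypedValue encode(Object v) {
        if (v == null) return new TypedValue("null", null);
        if (v instanceof Integer) return new TypedValue("int", v.toString());
        if (v instanceof Long) return new TypedValue("long", v.toString());
        if (v instanceof Double) return new TypedValue("double", v.toString());
        if (v instanceof String) return new TypedValue("string", (String) v);
        throw new IllegalArgumentException("unsupported type: " + v.getClass().getName());
    }

    // Uses the tag to rebuild a value of the original runtime class.
    public static Object decode(TypedValue t) {
        switch (t.type) {
            case "null": return null;
            case "int": return Integer.valueOf(t.value);
            case "long": return Long.valueOf(t.value);
            case "double": return Double.valueOf(t.value);
            case "string": return t.value;
            default: throw new IllegalArgumentException("unknown type tag: " + t.type);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> metrics = new LinkedHashMap<>();
        metrics.put("keyless_select_destination_cnt", 1L);
        metrics.put("total_estimated_distance", 12.5);

        Map<String, Object> restored = new LinkedHashMap<>();
        metrics.forEach((k, v) -> restored.put(k, decode(encode(v))));
        System.out.println(restored.equals(metrics)); // true
        System.out.println(restored.get("keyless_select_destination_cnt").getClass().getSimpleName()); // Long
    }
}
```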
[1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
[2]: https://avro.apache.org/docs/current/api/java/org/apache/avro/reflect/package-summary.html
[3]: https://avro.apache.org/docs/1.10.0/gettingstartedjava.html
[4]: https://developers.google.com/protocol-buffers/docs/proto3#any
[5]: https://stackoverflow.com/a/30386694/10299342

huangapple
  • Published on September 25, 2020 at 20:06:05
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64063833.html