如何将 Spark Java 中 Row 中的结构字段转换为 Avro 记录。

huangapple go评论70阅读模式
英文:

How to convert a struct field in a Row to an avro record in Spark Java

问题

MyStruct convertToAvro(Object avroField) {
    GenericRecord genericRecord = (GenericRecord) avroField; // Cast the Avro field to GenericRecord
    MyStruct myStruct = new MyStruct();

    // Assuming MyStruct fields are named "field1", "field2", etc.
    myStruct.setField1((String) genericRecord.get("field1"));
    myStruct.setField2((int) genericRecord.get("field2"));
    // Repeat for other fields

    return myStruct;
}

请注意,上述代码假定你的 Avro 记录是一个 GenericRecord,并且 MyStruct 中的字段和 Avro 记录中的字段名称是一致的。你需要根据你实际的数据结构进行调整。

英文:

I have a use case where I want to convert a struct field to an Avro record. The struct field originally maps to an Avro type. The input data is avro files and the struct field corresponds to a field in the input avro records.

Below is what I want to achieve in pseudocode.

DataSet<Row> data = loadInput(); // data is of form (foo, bar, myStruct) from avro data. 

// do some joins to add more data
data = doJoins(data); // now data is of form (a, b, myStruct)

// transform DataSet<Row> to DataSet<MyType> 
DataSet<MyType> myData = data.map(row -> myUDF(row), encoderOfMyType);

// method `myUDF` definition
MyType myUDF(Row row) {
  String a = row.getAs("a");
  String b = row.getAs("b");

  // MyStruct is the generated avro class that corresponds to field myStruct 
  MyStruct myStruct = convertToAvro(row.getAs("myStruct"));

  return generateMyType(a, b, myStruct);
}

My question is: how can I implement the convertToAvro method in above pseudocode?

答案1

得分: 3

文档中:

> Avro包提供了函数to_avro,将列编码为Avro格式的二进制,并提供from_avro()函数,将Avro二进制数据解码为列。这两个函数都将一列转换为另一列,输入/输出SQL数据类型可以是复杂类型或原始类型。

函数to_avro可替代convertToAvro方法:

import static org.apache.spark.sql.avro.functions.*;

//将结构列的Avro模式放入字符串中
//在我的示例中,我假设结构由两个字段组成:
//一个长字段(s1)和一个字符串字段(s2)
String schema = "...";

data = ...

//添加一个包含结构的二进制列
Dataset<Row> data2 = df.withColumn("to_avro", to_avro(data.col("myStruct"), schema));
df2.printSchema();
df2.show(false);

输出:

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- mystruct: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)
 |-- to_avro: binary (nullable = true)

+----+----+----------+----------------------------+
|a   |b   |mystruct  |to_avro                     |
+----+----+----------+----------------------------+
|foo1|bar1|[1, one]  |[00 02 00 06 6F 6E 65]      |
|foo2|bar2|[3, three]|[00 06 00 0A 74 68 72 65 65]|
+----+----+----------+----------------------------+

要将Avro列转换回来,可以使用函数from_avro

Dataset<Row> data3 = data2.withColumn("from_avro", from_avro(data2.col("to_avro"), schema));
df3.printSchema();
df3.show();

输出:

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- mystruct: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)
 |-- to_avro: binary (nullable = true)
 |-- from_avro: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)

+----+----+----------+--------------------+----------+
|   a|   b|  mystruct|             to_avro| from_avro|
+----+----+----------+--------------------+----------+
|foo1|bar1|  [1, one]|[00 02 00 06 6F 6...|  [1, one]|
|foo2|bar2|[3, three]|[00 06 00 0A 74 6...|[3, three]|
+----+----+----------+--------------------+----------+

关于UDF的一点说明:在问题中,您在UDF内执行了到Avro格式的转换。我更喜欢在UDF中仅包含实际的业务逻辑,并将格式转换放在外部。这将逻辑和格式转换分开。如果需要,您可以在创建Avro列后删除原始列mystruct

英文:

From the documentation:
> The Avro package provides function to_avro to encode a column as binary in Avro format, and from_avro() to decode Avro binary data into a column. Both functions transform one column to another column, and the input/output SQL data type can be a complex type or a primitive type.

The function to_avro acts as replacement for the convertToAvro method:

import static org.apache.spark.sql.avro.functions.*;

//put the avro schema of the struct column into a string
//in my example I assume that the struct consists of a two fields:
//a long field (s1) and a string field (s2)
String schema = &quot;{\&quot;type\&quot;:\&quot;record\&quot;,\&quot;name\&quot;:\&quot;mystruct\&quot;,&quot; +
        &quot;\&quot;namespace\&quot;:\&quot;topLevelRecord\&quot;,\&quot;fields\&quot;:[{\&quot;name\&quot;:\&quot;s1\&quot;,&quot; +
        &quot;\&quot;type\&quot;:[\&quot;long\&quot;,\&quot;null\&quot;]},{\&quot;name\&quot;:\&quot;s2\&quot;,\&quot;type\&quot;:&quot; +
        &quot;[\&quot;string\&quot;,\&quot;null\&quot;]}]},\&quot;null\&quot;]}&quot;;

data = ...

//add an additional column containing the struct as binary column
Dataset&lt;Row&gt; data2 = df.withColumn(&quot;to_avro&quot;, to_avro(data.col(&quot;myStruct&quot;), schema));
df2.printSchema();
df2.show(false);

prints

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- mystruct: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)
 |-- to_avro: binary (nullable = true)

+----+----+----------+----------------------------+
|a   |b   |mystruct  |to_avro                     |
+----+----+----------+----------------------------+
|foo1|bar1|[1, one]  |[00 02 00 06 6F 6E 65]      |
|foo2|bar2|[3, three]|[00 06 00 0A 74 68 72 65 65]|
+----+----+----------+----------------------------+

To convert the avro column back, the function from_avro can be used:

Dataset&lt;Row&gt; data3 = data2.withColumn(&quot;from_avro&quot;, from_avro(data2.col(&quot;to_avro&quot;), schema));
df3.printSchema();
df3.show();

Output:

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- mystruct: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)
 |-- to_avro: binary (nullable = true)
 |-- from_avro: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: string (nullable = true)

+----+----+----------+--------------------+----------+
|   a|   b|  mystruct|             to_avro| from_avro|
+----+----+----------+--------------------+----------+
|foo1|bar1|  [1, one]|[00 02 00 06 6F 6...|  [1, one]|
|foo2|bar2|[3, three]|[00 06 00 0A 74 6...|[3, three]|
+----+----+----------+--------------------+----------+

A word about the udf: in the question you performed the transformation to the avro format within the udf. I would prefer to include only the actual business logic in the udf and keep the format transformation outside. This separates the logic and the format transformation. If necessary, you can drop the original column mystruct after creating the avro column.

huangapple
  • 本文由 发表于 2020年9月17日 01:30:20
  • 转载请务必保留本文链接:https://go.coder-hub.com/63925187.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定