在Java Spark中更新结构类型列中的值。

huangapple go评论81阅读模式
英文:

Update value in struct type column in java spark

问题

我希望能够在嵌套数据集中更新值。为此,我在Spark中创建了一个嵌套数据集,其架构结构如下:

root

 |-- field_a: 字符串(可为空 = false)

 |-- field_b: 结构体(可为空 = true)

 |    |-- field_d: 结构体(可为空 = false)
          |-- field_not_to_update: 字符串(可为空 = true)

 |        |-- field_to_update: 字符串(可为空 = false)
 |   field_c: 字符串(可为空 = false)

现在,我想要在数据集中更新 field_to_update 的值。我尝试过:

aFooData.withColumn("field_b.field_d.field_to_update", lit("updated_val"))

也尝试过:

aFooData.foreach(new ClassWithForEachFunction());

其中 ClassWithForEachFunction 实现了 ForEachFunction<Row>,并具有方法 public void call(Row aRow) 来更新 field_to_update 属性。我还尝试过使用 lambda,但是它抛出了“Task not serializable”异常,因此不得不选择较长的过程。

迄今为止,这些方法都没有取得成果,使用 foreach 得到的数据集和第二种情况中带有名称 field_b.field_d.field_to_update 的新列是相同的。有关此事的其他建议吗?

英文:

I want capability to update value in nested dataset. For this I have a created as nested Dataset in Spark. It has below schema structure:-

root

 |-- field_a: string (nullable = false)

 |-- field_b: struct (nullable = true)

 |    |-- field_d: struct(nullable = false)
          |-- field_not_to_update: string(nullable = true)

 |        |-- field_to_update: string(nullable = false)
 |   field_c: string (nullable = false)

Now I wanted to update value in field_to_update in the dataset. I have tried

aFooData.withColumn(&quot;field_b.field_d.field_to_update&quot;, lit(&quot;updated_val&quot;)

Also tried,

aFooData.foreach(new ClassWithForEachFunction());

where ClassWithForEachFunction implements ForEachFunction&lt;Row&gt; and has method public void call(Row aRow) to update field_to_update attribute. Tried same with lamda as well but it was throwing Task not serializable exception so has to go for long process.

None of them are fruitful so far and I am getting same Dataset with foreach and new column with name field_b.field_d.field_to_update in second case. Any other suggestions for same?

答案1

得分: 1

请检查以下代码。

  • 从结构中提取字段
  • 更新所需字段。
  • 重新构建结构。
df.show(false)
+-------+--------------+
|field_a|field_b       |
+-------+--------------+
|parentA|[srinivas, 20]|
|parentB|[ravi, 30]    |
+-------+--------------+

df.printSchema
root
 |-- field_a: string (nullable = true)
 |-- field_b: struct (nullable = true)
 |    |-- field_to_update: string (nullable = true)
 |    |-- field_not_to_update: integer (nullable = true)

df.select("field_a","field_b.field_to_update","field_b.field_not_to_update")
  .withColumn("field_to_update", lit("updated_val"))
  .select(col("field_a"), struct(col("field_to_update"), col("field_not_to_update")).as("field_b"))
  .show(false)
+-------+-----------------+
|field_a|field_b          |
+-------+-----------------+
|parentA|[updated_val, 20]|
|parentB|[updated_val, 30]|
+-------+-----------------+
英文:

Please check below code.

  • Extract the fields from struct
  • Update the required filed.
  • Reconstruct the struct back.
scala&gt; df.show(false)
+-------+--------------+
|field_a|field_b       |
+-------+--------------+
|parentA|[srinivas, 20]|
|parentB|[ravi, 30]    |
+-------+--------------+


scala&gt; df.printSchema
root
 |-- field_a: string (nullable = true)
 |-- field_b: struct (nullable = true)
 |    |-- field_to_update: string (nullable = true)
 |    |-- field_not_to_update: integer (nullable = true)


scala&gt; df.select(&quot;field_a&quot;,&quot;field_b.field_to_update&quot;,&quot;field_b.field_not_to_update&quot;).withColumn(&quot;field_to_update&quot;,lit(&quot;updated_val&quot;)).select(col(&quot;field_a&quot;),struct(col(&quot;field_to_update&quot;),col(&quot;field_not_to_update&quot;)).as(&quot;field_b&quot;)).show(false)
+-------+-----------------+
|field_a|field_b          |
+-------+-----------------+
|parentA|[updated_val, 20]|
|parentB|[updated_val, 30]|
+-------+-----------------+

答案2

得分: 0

你需要重新构建整个模式,你可以使用以下语句在单个实例中完成。

import org.apache.spark.sql.functions.{lit, struct};

df.select(
  df.col("field_a"), // 保留不变的字段
  struct( // 第一级字段必须重新构建
     lit("updated_value").alias("field_to_update"), // 转换或设置新元素
     df.col("fb.field_not_to_update").alias("field_not_to_update") // 保留不变的子元素并保留姓氏
  ).alias("field_b"), // 我们还必须保留姓名
  df.col("field_c")
);

在 Java 中的语法与此相同。

英文:

You will have to reconstruct the whole schema, you can do it in a single instance with the following sentence.

import org.apache.spark.sql.functions.{lit, struct}

df.select(
  df(&quot;field_a&quot;), // keep the fields that don&#39;t change
  struct( // the field at first level must be reconstructed
     lit(&quot;updated_value&quot;) as &quot;field_to_update&quot;, // transform or set the new elements
     df(&quot;fb.field_not_to_update&quot;) as &quot;field_not_to_update&quot; // keep the unchanged sub elements and keep the last name
  ) as &quot;field_b&quot;, // and we have to keep the name
  df(&quot;field_c&quot;)
)

The syntax will be the same in java

答案3

得分: 0

更加“类似于Java”的方法是将数据帧转换为(有类型的)数据集,然后使用 [map](https://spark.apache.org/docs/2.4.4/api/java/org/apache/spark/sql/Dataset.html#map-org.apache.spark.api.java.function.MapFunction-org.apache.spark.sql.Encoder-) 调用来更改数据。从Java的角度来看,代码很容易处理。但缺点是你需要为给定的模式编写三个 [Java bean 类](https://www.oracle.com/technetwork/java/javase/documentation/spec-136004.html)。
```lang-java
Dataset<Bean1> ds = df.as(Encoders.bean(Bean1.class));

Dataset<Bean1> updatedDs = ds.map((MapFunction<Bean1, Bean1>) row -> {
    row.getField_b().getField_d().setField_to_update("updated");
    return row;
}, Encoders.bean(Bean1.class));

使用这三个 Bean 类

public static class Bean1 implements Serializable {
    private String field_a;
    private Bean2 field_b;
    private String field_c;

    //getters and setters
}

public static class Bean2 implements Serializable {
    private Bean3 field_d;

    //getter and setter
}

public static class Bean3 implements Serializable {
    private String field_not_to_update;
    private String field_to_update;

    //getters and setters
}

<details>
<summary>英文:</summary>

A more &quot;Java-like&quot; approach would be to convert the dataframe into a (typed) dataset and then use a [map](https://spark.apache.org/docs/2.4.4/api/java/org/apache/spark/sql/Dataset.html#map-org.apache.spark.api.java.function.MapFunction-org.apache.spark.sql.Encoder-) call to change the data. From a Java perspective the code is easy to handle. But the downside is that you would need three [Java bean classes](https://www.oracle.com/technetwork/java/javase/documentation/spec-136004.html) for the given schema.
```lang-java
Dataset&lt;Bean1&gt; ds = df.as(Encoders.bean(Bean1.class));

Dataset&lt;Bean1&gt; updatedDs = ds.map((MapFunction&lt;Bean1, Bean1&gt;) row -&gt; {
    row.getField_b().getField_d().setField_to_update(&quot;updated&quot;);
    return row;
}, Encoders.bean(Bean1.class));

with the three Bean classes

public static class Bean1 implements Serializable {
    private String field_a;
    private Bean2 field_b;
    private String field_c;

    //getters and setters
}

public static class Bean2 implements Serializable {
    private Bean3 field_d;

    //getter and setter
}

public static class Bean3 implements Serializable {
    private String field_not_to_update;
    private String field_to_update;

    //getters and setters
}

答案4

得分: 0

方法withField是在Spark 3.1.1中引入的。根据文档:

> 通过名称在StructType中添加/替换字段的表达式。

以下Scala代码片段显示了这一点:

import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions

val structSchema = new StructType()
.add(new StructField("field_a", StringType, true))
.add(new StructField("field_b", 
new StructType().add(new StructField("field_not_to_update", StringType, true))
.add(new StructField("field_to_update", StringType, true)), 
true))
.add(new StructField("field_c", StringType, true))

val df = Seq("""{"field_a": "some-value-a", "field_b": {"field_not_to_update": "field_not_to_update_b", "field_to_update": "field_to_update_b"}, "field_c": "some-value-c"}""").toDF
val processedDf = df.withColumn(
          "processed_payload",
          functions.from_json(functions.col("value"), structSchema)
        )
val finalDf = processedDf.select("processed_payload.*")
println("输入数据帧:")
finalDf.show(truncate=false)
val updatedDf = finalDf.withColumn("field_b", functions.col("field_b").withField("field_to_update", functions.lit("UPDATED_VALUE")))
println("更新后的数据帧:")
updatedDf.show(truncate=false)

以上代码片段的输出如下:

输入数据帧:
+------------+------------------------------------------+------------+
|field_a     |field_b                                   |field_c     |
+------------+------------------------------------------+------------+
|some-value-a|{field_not_to_update_b, field_to_update_b}|some-value-c|
+------------+------------------------------------------+------------+

更新后的数据帧:
+------------+--------------------------------------+------------+
|field_a     |field_b                               |field_c     |
+------------+--------------------------------------+------------+
|some-value-a|{field_not_to_update_b, UPDATED_VALUE}|some-value-c|
+------------+--------------------------------------+------------+
英文:

The method withField was introduced in Spark 3.1.1. As per the documentation: -

> An expression that adds/replaces field in StructType by name.

The following code snippet (in scala) shows this: -

import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions

val structSchema = new StructType()
.add(new StructField(&quot;field_a&quot;, StringType, true))
.add(new StructField(&quot;field_b&quot;, 
new StructType().add(new StructField(&quot;field_not_to_update&quot;, StringType, true))
.add(new StructField(&quot;field_to_update&quot;, StringType, true)), 
true))
.add(new StructField(&quot;field_c&quot;, StringType, true))

val df = Seq(&quot;&quot;&quot;{&quot;field_a&quot;: &quot;some-value-a&quot;, &quot;field_b&quot;: {&quot;field_not_to_update&quot;: &quot;field_not_to_update_b&quot;, &quot;field_to_update&quot;: &quot;field_to_update_b&quot;}, &quot;field_c&quot;: &quot;some-value-c&quot;}&quot;&quot;&quot;).toDF
val processedDf = df.withColumn(
          &quot;processed_payload&quot;,
          functions.from_json(functions.col(&quot;value&quot;), structSchema)
        )
val finalDf = processedDf.select(&quot;processed_payload.*&quot;)
println(&quot;Input dataframe: &quot;)
finalDf.show(truncate=false)
val updatedDf = finalDf.withColumn(&quot;field_b&quot;, functions.col(&quot;field_b&quot;).withField(&quot;field_to_update&quot;, functions.lit(&quot;UPDATED_VALUE&quot;)))
println(&quot;Updated dataframe: &quot;)
updatedDf.show(truncate=false)

The output of the above code snippet is as follows: -

Input dataframe: 
+------------+------------------------------------------+------------+
|field_a     |field_b                                   |field_c     |
+------------+------------------------------------------+------------+
|some-value-a|{field_not_to_update_b, field_to_update_b}|some-value-c|
+------------+------------------------------------------+------------+
Updated dataframe: 
+------------+--------------------------------------+------------+
|field_a     |field_b                               |field_c     |
+------------+--------------------------------------+------------+
|some-value-a|{field_not_to_update_b, UPDATED_VALUE}|some-value-c|
+------------+--------------------------------------+------------+

huangapple
  • 本文由 发表于 2020年5月5日 18:48:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/61611328.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定