How to cast nested struct in Java Spark (Unsupported NullType)

Question

I am loading MongoDB data into a Hive table and trying to resolve the "Unsupported NullType" error raised by saveAsTable.
Sample data schema:

root
 |-- level1: struct (nullable = true)
 |    |-- level2: struct (nullable = true)
 |    |    |-- level3_1: null (nullable = true)
 |    |    |-- level3_2: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- level4: null (nullable = true)

I tried functions.lit, like this:

df = df.withColumn("level1.level2.level3_1", functions.lit("null").cast("string"))
       .withColumn("level1.level2.level3_2.level4", functions.lit("null").cast("string"));

but this only adds two new top-level columns and leaves the nested fields unchanged:

root
 |-- level1: struct (nullable = true)
 |    |-- level2: struct (nullable = true)
 |    |    |-- level3_1: null (nullable = true)
 |    |    |-- level3_2: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- level4: null (nullable = true)
 |-- level1.level2.level3_1: string (nullable = false)
 |-- level1.level2.level3_2.level4: string (nullable = false)

I also checked df.na().fill(), but it does not seem to change the schema.

The desired result is:

root
 |-- level1: struct (nullable = true)
 |    |-- level2: struct (nullable = true)
 |    |    |-- level3_1: string (nullable = true)
 |    |    |-- level3_2: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- level4: string (nullable = true)

so that I can save the loaded MongoDB data as a Hive table.

Has anyone worked on this who could advise me on how to cast a nested NullType, or how to deal with NullType in Java? Ideally I am looking for a systematic, general solution that scales to more complex data.
Many thanks

Answer 1

Score: 1

One idea is to create a schema that uses StringType in place of NullType and to read the data with that schema.

import static org.apache.spark.sql.types.DataTypes.*;

import java.util.ArrayList;
import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

// Target schema: the same shape as the source, with StringType where NullType was inferred.
StructType schema = createStructType(Arrays.asList(
    createStructField("level1", createStructType(Arrays.asList(
        createStructField("level2", createStructType(Arrays.asList(
            createStructField("level3_1", StringType, true),
            createStructField("level3_2", createArrayType(createStructType(Arrays.asList(
                createStructField("level4", StringType, true)))), true)
            )), true))), true)));

// ss is an existing SparkSession. Replace new ArrayList<>() with your own rows.
Dataset<Row> df = ss.createDataFrame(new ArrayList<>(), schema);
df.printSchema();

Output:

root
 |-- level1: struct (nullable = true)
 |    |-- level2: struct (nullable = true)
 |    |    |-- level3_1: string (nullable = true)
 |    |    |-- level3_2: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- level4: string (nullable = true)

EDIT:

I have added a more concrete example here to illustrate the idea. I hope it helps.

import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.types.DataTypes.*;

@Test
public void test() {
    SparkSession ss = SparkSession.builder().master("local").appName("test").getOrCreate();

    // Step 1) Read your MongoDB data. (The NullType field 'level' is added manually here for illustration.)
    // https://docs.mongodb.com/spark-connector/master/python/read-from-mongodb/
    Dataset<Row> data = ss.read().json("test.json").withColumn("level", lit(null));
    data.printSchema();

    // The schema you want: 'level' is declared as StringType instead of NullType.
    StructType schema = createStructType(Arrays.asList(
        createStructField("_id", LongType, true),
        createStructField("level", StringType, true)));

    // Step 2) Create newData using the schema you defined.
    // Note: collectAsList() pulls every row to the driver, so this only suits small datasets.
    Dataset<Row> newData = ss.createDataFrame(data.collectAsList(), schema);
    newData.printSchema();

    // Step 3) Load newData into Hive.
}
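
Since the question asks for a general solution, here is a minimal sketch of a variant that avoids writing the target schema by hand: walk the existing schema recursively, swap every NullType for StringType, and cast each top-level column to its rewritten type. This is not part of the answer above. NullTypeFixer, replaceNullTypes and castNullTypes are hypothetical names introduced for illustration. The sketch assumes that Spark's cast accepts a struct-to-struct or array-to-array cast whose only change is NullType to StringType, and that top-level column names contain no dots (df.col would otherwise read them as nested paths).

```java
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper class; not part of Spark or of the answer above.
public final class NullTypeFixer {

    private NullTypeFixer() {}

    /** Returns a copy of the given type in which every NullType is replaced by StringType. */
    public static DataType replaceNullTypes(DataType type) {
        if (DataTypes.NullType.equals(type)) {
            return DataTypes.StringType;
        }
        if (type instanceof StructType) {
            StructField[] fields = ((StructType) type).fields();
            StructField[] fixed = new StructField[fields.length];
            for (int i = 0; i < fields.length; i++) {
                StructField f = fields[i];
                // Keep name, nullability and metadata; only the data type may change.
                fixed[i] = new StructField(
                    f.name(), replaceNullTypes(f.dataType()), f.nullable(), f.metadata());
            }
            return new StructType(fixed);
        }
        if (type instanceof ArrayType) {
            ArrayType a = (ArrayType) type;
            return DataTypes.createArrayType(replaceNullTypes(a.elementType()), a.containsNull());
        }
        if (type instanceof MapType) {
            MapType m = (MapType) type;
            return DataTypes.createMapType(
                replaceNullTypes(m.keyType()), replaceNullTypes(m.valueType()), m.valueContainsNull());
        }
        // Any other type is returned unchanged.
        return type;
    }

    /** Casts every top-level column to its NullType-free counterpart, keeping column names. */
    public static Dataset<Row> castNullTypes(Dataset<Row> df) {
        StructField[] fields = df.schema().fields();
        Column[] columns = new Column[fields.length];
        for (int i = 0; i < fields.length; i++) {
            StructField f = fields[i];
            columns[i] = df.col(f.name()).cast(replaceNullTypes(f.dataType())).as(f.name());
        }
        return df.select(columns);
    }
}
```

With this helper, usage would look like NullTypeFixer.castNullTypes(df).write().saveAsTable(...). Unlike the collectAsList() approach, the cast stays a distributed transformation, so no rows are pulled to the driver.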

Published April 6, 2020, 05:18:52
Source: https://go.coder-hub.com/61049642.html