How to cast nested struct in Java Spark (Unsupported NullType)
Question
I am loading MongoDB data into a Hive table and am trying to resolve the Unsupported NullType error thrown by saveAsTable.
Sample data schema:
root
|-- level1: struct (nullable = true)
| |-- level2: struct (nullable = true)
| | |-- level3_1: null (nullable = true)
| | |-- level3_2: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- level4: null (nullable = true)
I tried functions.lit, for example:
df = df.withColumn("level1.level2.level3_1", functions.lit("null").cast("string"))
       .withColumn("level1.level2.level3_2.level4", functions.lit("null").cast("string"));
but the result looks like this:
root
|-- level1: struct (nullable = true)
| |-- level2: struct (nullable = true)
| | |-- level3_1: null (nullable = true)
| | |-- level3_2: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- level4: null (nullable = true)
|-- level1.level2.level3_1: string (nullable = false)
|-- level1.level2.level3_2.level4: string (nullable = false)
I also checked df.na().fill(), but it does not seem to change the schema, since it only replaces null values rather than altering column types.
The desired result is:
root
|-- level1: struct (nullable = true)
| |-- level2: struct (nullable = true)
| | |-- level3_1: string (nullable = true)
| | |-- level3_2: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- level4: string (nullable = true)
so that I can save the loaded MongoDB data as a table in Hive.
Has anyone worked on this and could give me some advice on how to cast a nested NullType, or how to handle NullType in Java? Please consider a systematic/general solution that can scale to more complex data.
Many thanks.
Answer 1
Score: 1
One idea is to create a schema with StringType and read the data with that schema.
import static org.apache.spark.sql.types.DataTypes.*;
import java.util.ArrayList;
import java.util.Arrays;

StructType schema = createStructType(Arrays.asList(
    createStructField("level1", createStructType(Arrays.asList(
        createStructField("level2", createStructType(Arrays.asList(
            createStructField("level3_1", StringType, true),
            createStructField("level3_2", createArrayType(createStructType(Arrays.asList(
                createStructField("level4", StringType, true)))), true)
        )), true))), true)));

// Replace new ArrayList<>() with your dataset.
Dataset<Row> df = ss.createDataFrame(new ArrayList<>(), schema);
df.printSchema();
root
|-- level1: struct (nullable = true)
| |-- level2: struct (nullable = true)
| | |-- level3_1: string (nullable = true)
| | |-- level3_2: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- level4: string (nullable = true)
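As a side note, withColumn treats a dotted name like "level1.level2.level3_1" as a literal top-level column name, which is why the attempt in the question appended new columns instead of replacing the nested fields; supplying the full schema up front sidesteps that problem. Below is a minimal sketch of the "read data with the schema" step, assuming the MongoDB Spark connector; the format name, URI, and table name are placeholder assumptions, not part of the original answer.

// Sketch under assumptions: the format and option names depend on your
// mongo-spark-connector version; the URI and "my_hive_table" are placeholders.
Dataset<Row> df = ss.read()
    .format("mongo")
    .option("uri", "mongodb://localhost/mydb.mycollection") // hypothetical URI
    .schema(schema) // the explicit StringType schema above, so nothing is inferred as NullType
    .load();
df.write().saveAsTable("my_hive_table"); // should no longer fail with Unsupported NullType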
EDIT:
I added a more intuitive example here to convey my idea. I hope it helps you.
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.types.DataTypes.*;

@Test
public void test() {
    SparkSession ss = SparkSession.builder().master("local").appName("test").getOrCreate();

    // Step 1) Read your MongoDB data. (I added the NullType field 'level' manually for explanation.)
    // https://docs.mongodb.com/spark-connector/master/python/read-from-mongodb/
    Dataset<Row> data = ss.read().json("test.json").withColumn("level", lit(null));
    data.printSchema();

    StructType schema = createStructType(Arrays.asList(
        createStructField("_id", LongType, true),
        createStructField("level", StringType, true)));

    // Step 2) Create newData using the schema you defined.
    Dataset<Row> newData = ss.createDataFrame(data.collectAsList(), schema);
    newData.printSchema();

    // Step 3) Load newData into Hive.
}
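Regarding the question's request for a systematic/general solution: the same idea can be automated by rewriting the inferred schema programmatically instead of spelling it out by hand. The sketch below is my own illustration, not part of the original answer; replaceNullType is a hypothetical helper that walks a DataType and swaps every NullType for StringType, after which the corrected schema is re-applied with createDataFrame.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.*;

// Hypothetical helper: recursively rebuild a DataType, replacing every
// NullType with StringType so saveAsTable no longer sees NullType.
static DataType replaceNullType(DataType dt) {
    if (dt instanceof NullType) {
        return DataTypes.StringType;
    }
    if (dt instanceof StructType) {
        StructField[] src = ((StructType) dt).fields();
        StructField[] dst = new StructField[src.length];
        for (int i = 0; i < src.length; i++) {
            dst[i] = new StructField(src[i].name(),
                replaceNullType(src[i].dataType()),
                src[i].nullable(), src[i].metadata());
        }
        return new StructType(dst);
    }
    if (dt instanceof ArrayType) {
        ArrayType at = (ArrayType) dt;
        return new ArrayType(replaceNullType(at.elementType()), at.containsNull());
    }
    if (dt instanceof MapType) {
        MapType mt = (MapType) dt;
        return new MapType(replaceNullType(mt.keyType()),
            replaceNullType(mt.valueType()), mt.valueContainsNull());
    }
    return dt;
}

// Usage sketch: null values remain valid for nullable string fields,
// so the existing rows can be reused with the rewritten schema.
// StructType fixed = (StructType) replaceNullType(df.schema());
// Dataset<Row> fixedDf = ss.createDataFrame(df.javaRDD(), fixed);

Unlike collectAsList() in the example above, going through df.javaRDD() does not pull all data to the driver, and the recursion covers arbitrarily nested structs, arrays, and maps.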