英文:
Exploding Nested Struct In Spark Dataframe having Different Schema
问题
我有一个 JSON,其具有以下模式:
|-- Pool: struct (nullable = true)
| |-- 1: struct (nullable = true)
| | |-- Client: struct (nullable = true)
| | | |-- 1: struct (nullable = true)
| | | | |-- Active: boolean (nullable = true)
| | | | |-- Alias: string (nullable = true)
| | | | |-- Chaddr: string (nullable = true)
| | | |-- 2: struct (nullable = true)
| | | | |-- Active: boolean (nullable = true)
| | | | |-- Alias: string (nullable = true)
| | | | |-- Chaddr: string (nullable = true)
| |-- 2: struct (nullable = true)
| | |-- Alias: string (nullable = true)
| | |-- Chaddr: string (nullable = true)
| | |-- ChaddrMask: string (nullable = true)
| | |-- Client: struct (nullable = true)
| | | |-- 1: struct (nullable = true)
| | | | |-- Active: boolean (nullable = true)
| | | | |-- Alias: string (nullable = true)
| | | | |-- Chaddr: string (nullable = true)
而我试图实现的输出是:
PoolId ClientID Client_Active
1 1 true
1 2 false
2 1 true
此模式随着 JSON 的变化而变化。例如,现在有 2 个 Pool ID,可能会有另一个 JSON 包含 5 个 Pool ID,客户端 ID 也一样。
问题是:
- 我们不能在结构上使用 Explode。
- Pool 无法转换为 Map,因为每次客户端都有不同的客户端 ID,导致每行的模式都不同。
有没有想法如何实现这个?
我尝试了这个链接,将结构转换为 Map 然后进行拆分,但当不同 Pool 中的客户端 ID 不同时它不起作用。
英文:
I have a json which has below schema:
|-- Pool: struct (nullable = true)
| |-- 1: struct (nullable = true)
| | |-- Client: struct (nullable = true)
| | | |-- 1: struct (nullable = true)
| | | | |-- Active: boolean (nullable = true)
| | | | |-- Alias: string (nullable = true)
| | | | |-- Chaddr: string (nullable = true)
| | | |-- 2: struct (nullable = true)
| | | | |-- Active: boolean (nullable = true)
| | | | |-- Alias: string (nullable = true)
| | | | |-- Chaddr: string (nullable = true)
| |-- 2: struct (nullable = true)
| | |-- Alias: string (nullable = true)
| | |-- Chaddr: string (nullable = true)
| | |-- ChaddrMask: string (nullable = true)
| | |-- Client: struct (nullable = true)
| | | |-- 1: struct (nullable = true)
| | | | |-- Active: boolean (nullable = true)
| | | | |-- Alias: string (nullable = true)
| | | | |-- Chaddr: string (nullable = true)
And the output that i am trying to achieve is:
PoolId ClientID Client_Active
1 1 true
1 2 false
2 1 true
This schema keeps on changing with json.Eg for now there are 2 Pool id, there may be another json which will have 5 Pool Id and same is with CLient Id.
The problem with is :
- We cant use Explode on struct.
- Pool cant be converted to Map as each time client has different client ID that leads to different schema for each row.
Any thought how to achieve this?
I have tried this link for converting to Struct to Map and then exploding but it doesn't work when there are different numbers of Client IDs in different Pool.
答案1
得分: 4
以下是翻译好的部分:
从我的角度来看,你只需要定义一个UDF。
这是一个示例:
- 定义一个投影案例类(你希望作为结果结构的内容):
case class Projection(PoolId: String, ClientID: String, Client_Active: Boolean)
- 定义一个UDF,就像下面这样,允许你同时处理你的结构(字段)和数据:
val myUdf = udf{r: Row =>
r.schema.fields.flatMap{rf =>
val poolId = rf.name
val pool = r.getAs[Row](poolId)
val clientRow = pool.getAs[Row]("Client")
clientRow.schema.fields.map{cr =>
val clientId = cr.name
val isActive = clientRow.getAs[Row](clientId).getAs[Boolean]("Active")
Projection(poolId, clientId, isActive)
}
}
}
- 使用你的UDF:
val newDF = df.select(explode(myUdf($"Pool")).as("projection"))
.select("projection.*")
.cache
newDF.show(false)
输出是预期的:
+------+--------+-------------+
|PoolId|ClientID|Client_Active|
+------+--------+-------------+
|1 |1 |true |
|1 |2 |false |
|2 |1 |true |
+------+--------+-------------+
英文:
From my perspective you only need to define an UDF.
Here's an example :
- Define a projection case class (what you want as a resulting structure)
case class Projection(PoolId: String, ClientID: String, Client_Active: Boolean)
- Define an UDF like the one below, allowing you to work both with your structure (fields) and data:
val myUdf = udf{r: Row =>
r.schema.fields.flatMap{rf =>
val poolId = rf.name
val pool = r.getAs[Row](poolId)
val clientRow = pool.getAs[Row]("Client")
clientRow.schema.fields.map{cr =>
val clientId = cr.name
val isActive = clientRow.getAs[Row](clientId).getAs[Boolean]("Active")
Projection(poolId, clientId, isActive)
}
}
}
- Use your
UDF
:
val newDF = df.select(explode(myUdf($"Pool")).as("projection"))
.select("projection.*")
.cache
newDF.show(false)
The output is the expected one :
+------+--------+-------------+
|PoolId|ClientID|Client_Active|
+------+--------+-------------+
|1 |1 |true |
|1 |2 |false |
|2 |1 |true |
+------+--------+-------------+
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论