Exploding Nested Struct In Spark Dataframe having Different Schema
Question
I have a JSON document with the following schema:
 |-- Pool: struct (nullable = true)
 |    |-- 1: struct (nullable = true)
 |    |    |-- Client: struct (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
 |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
 |    |-- 2: struct (nullable = true)
 |    |    |-- Alias: string (nullable = true)
 |    |    |-- Chaddr: string (nullable = true)
 |    |    |-- ChaddrMask: string (nullable = true)
 |    |    |-- Client: struct (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
The output I am trying to achieve is:
 PoolId ClientID Client_Active
 1      1        true
 1      2        false
 2      1        true
This schema changes from one JSON document to the next. For example, this one has 2 Pool IDs, but another JSON may have 5 Pool IDs, and the same is true for Client IDs.
The problems are:
- We can't use explode on a struct.
- Pool can't be converted to a Map, because the client IDs differ each time, which leads to a different schema for each row.
Any thoughts on how to achieve this?
I tried the approach from this link (converting the struct to a Map and then exploding it), but it doesn't work when different pools contain different numbers of client IDs.
Answer 1
Score: 4
As I see it, you only need to define a UDF.
Here's an example:
- Define a projection case class describing the structure you want as the result:
 
case class Projection(PoolId: String, ClientID: String, Client_Active: Boolean)
- Define a UDF like the one below, which lets you work with both the structure (the field names) and the data:
 
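import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val myUdf = udf { r: Row =>
  // One iteration per pool field ("1", "2", ...): the IDs are read from the schema, not hard-coded
  r.schema.fields.flatMap { rf =>
    val poolId = rf.name
    val pool = r.getAs[Row](poolId)
    // A pool absent from this row is null, since the inferred schema covers all rows
    val clientRow = if (pool == null) null else pool.getAs[Row]("Client")
    if (clientRow == null) Array.empty[Projection]
    else clientRow.schema.fields.flatMap { cr =>
      val clientId = cr.name
      // Skip clients that are null in this row
      Option(clientRow.getAs[Row](clientId)).map { client =>
        Projection(poolId, clientId, client.getAs[Boolean]("Active"))
      }
    }
  }
}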
- Use your UDF:
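import org.apache.spark.sql.functions.explode
import spark.implicits._ // enables the $"..." column syntax; assumes a SparkSession named spark

val newDF = df.select(explode(myUdf($"Pool")).as("projection"))
  .select("projection.*")
  .cache
newDF.show(false)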
The output is as expected:
+------+--------+-------------+
|PoolId|ClientID|Client_Active|
+------+--------+-------------+
|1     |1       |true         |
|1     |2       |false        |
|2     |1       |true         |
+------+--------+-------------+
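If you would rather avoid a UDF, the same flattening can also be sketched with built-in functions only, because the pool and client IDs are already available in the DataFrame's schema on the driver. This is an alternative to the UDF above, not part of it. explodePools is a hypothetical helper name, and the sketch assumes every pool struct contains a Client struct whose entries have an Active field, as in the question's schema.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: reads the pool and client IDs from the schema on the driver,
// builds one (PoolId, ClientID, Client_Active) struct per pair, then explodes them.
// Assumes every pool struct has a "Client" struct field.
def explodePools(df: DataFrame): DataFrame = {
  val poolType = df.schema("Pool").dataType.asInstanceOf[StructType]

  val entries = for {
    poolField <- poolType.fields.toSeq
    clientType = poolField.dataType.asInstanceOf[StructType]
      .apply("Client").dataType.asInstanceOf[StructType]
    clientField <- clientType.fields.toSeq
  } yield struct(
    lit(poolField.name).as("PoolId"),
    lit(clientField.name).as("ClientID"),
    // Backticks quote the numeric field names in the nested column path
    col(s"Pool.`${poolField.name}`.Client.`${clientField.name}`.Active").as("Client_Active")
  )

  df.select(explode(array(entries: _*)).as("projection"))
    .select("projection.*")
    // Drop entries whose Active value is null, e.g. a pool or client absent from that row
    .where(col("Client_Active").isNotNull)
}
```

Usage would be val newDF = explodePools(df). The trade-off is that the column list is generated once from the schema that spark.read.json inferred for the whole DataFrame, so Spark can optimize the plan without calling a black-box function per row. Unlike the UDF above, this sketch also drops clients whose Active value is null, whereas the UDF reports them as false.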