Exploding a Nested Struct in a Spark DataFrame with a Varying Schema

Question

I have a JSON document with the following schema:

 |-- Pool: struct (nullable = true)
 |    |-- 1: struct (nullable = true)
 |    |    |-- Client: struct (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
 |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
 |    |-- 2: struct (nullable = true)
 |    |    |-- Alias: string (nullable = true)
 |    |    |-- Chaddr: string (nullable = true)
 |    |    |-- ChaddrMask: string (nullable = true)
 |    |    |-- Client: struct (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)

The output I am trying to achieve is:

 PoolId ClientID Client_Active
 1      1        true
 1      2        false
 2      1        true

The schema changes from one JSON document to the next. For example, this one has 2 pool IDs, but another may have 5, and the same goes for client IDs.

The problems are:

  1. explode cannot be used on a struct.
  2. Pool cannot be converted to a map, because the client IDs differ each time, which leads to a different schema for each row.

Any thoughts on how to achieve this?

I tried the approach from this link, converting the struct to a map and then exploding it, but it does not work when different pools have different numbers of client IDs.

Answer 1

Score: 4

From my perspective, you only need to define a UDF.

Here's an example:

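  1. Define a projection case class (the shape you want for each output row):
case class Projection(PoolId: String, ClientID: String, Client_Active: Boolean)
  2. Define a UDF like the one below, which lets you work with both the structure (field names) and the data:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{explode, udf}
import spark.implicits._ // for $"..."; assumes a SparkSession named spark

val myUdf = udf { r: Row =>
  // Each field of the Pool struct is one pool; the field name is the pool ID.
  r.schema.fields.flatMap { rf =>
    val poolId = rf.name
    val pool = r.getAs[Row](poolId)
    val clientRow = pool.getAs[Row]("Client")
    // Each field of the Client struct is one client; the field name is the client ID.
    clientRow.schema.fields.map { cr =>
      val clientId = cr.name
      val isActive = clientRow.getAs[Row](clientId).getAs[Boolean]("Active")
      Projection(poolId, clientId, isActive)
    }
  }
}
  3. Use your UDF:
// The UDF returns an array of Projection, so explode yields one row per (pool, client) pair.
val newDF = df.select(explode(myUdf($"Pool")).as("projection"))
    .select("projection.*")
    .cache

newDF.show(false)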

The output is as expected:

+------+--------+-------------+
|PoolId|ClientID|Client_Active|
+------+--------+-------------+
|1     |1       |true         |
|1     |2       |false        |
|2     |1       |true         |
+------+--------+-------------+
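
The same result can probably be reached without a UDF by reading the pool and client IDs from the DataFrame schema on the driver and generating one struct column per (pool, client) pair. The following is only a sketch under assumptions: the object name FlattenPools, the helper flatten, and the sample JSON are hypothetical (only the Active flags are taken from the question; the Alias, Chaddr and ChaddrMask values are placeholders), and it assumes every column under Pool.<id>.Client.<id> has an Active field.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}
import org.apache.spark.sql.types.StructType

object FlattenPools {
  // Hypothetical sample input matching the schema in the question.
  // Only the Active flags come from the question; other values are placeholders.
  val sampleJson: String =
    """{"Pool":{""" +
      """"1":{"Client":{"1":{"Active":true,"Alias":"a","Chaddr":"x"},""" +
      """"2":{"Active":false,"Alias":"b","Chaddr":"y"}}},""" +
      """"2":{"Alias":"p","Chaddr":"z","ChaddrMask":"m",""" +
      """"Client":{"1":{"Active":true,"Alias":"c","Chaddr":"w"}}}}}"""

  // Builds one struct(PoolId, ClientID, Client_Active) per (pool, client) pair
  // found in the schema, then explodes the array of those structs into rows.
  def flatten(df: DataFrame): DataFrame = {
    val poolType = df.schema("Pool").dataType.asInstanceOf[StructType]
    val projections = poolType.fields.toSeq.flatMap { pool =>
      pool.dataType match {
        // Skip pools that have no Client struct at all.
        case p: StructType if p.fieldNames.contains("Client") =>
          p("Client").dataType match {
            case c: StructType =>
              c.fieldNames.toSeq.map { clientId =>
                // Backticks keep numeric field names from being misparsed.
                val active = col(s"Pool.`${pool.name}`.Client.`$clientId`.Active")
                struct(
                  lit(pool.name).as("PoolId"),
                  lit(clientId).as("ClientID"),
                  active.as("Client_Active"))
              }
            case _ => Seq.empty
          }
        case _ => Seq.empty
      }
    }
    df.select(explode(array(projections: _*)).as("projection"))
      .select("projection.*")
      // With several input records, IDs absent from a record come back as null.
      .where(col("Client_Active").isNotNull)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("flatten-pools")
      .getOrCreate()
    import spark.implicits._

    val df = spark.read.json(Seq(sampleJson).toDS())
    flatten(df).show(false)
    spark.stop()
  }
}
```

Because the column list is derived from the schema at planning time, this sketch adapts to however many pool and client IDs a given JSON document contains, and it keeps the work in built-in Spark expressions rather than a UDF. It does not handle a Pool struct with no clients at all, since exploding an empty array of structs would need separate treatment.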

huangapple
  • Published on January 6, 2020, 14:46:57
  • When reposting, please keep the link to this article: https://go.coder-hub.com/59607791.html