How to transform a DataFrame in PySpark?

Question


I have a data frame in PySpark with columns: id, name, value.
The column name should take the values A, B, C for each id. The value column contains numeric values.

Sample data frame:

```python
data = [
    (1, "A", 0),
    (1, "B", 2),
    (1, "C", 0),
    (2, "A", 5),
    (2, "B", 0),
    (2, "C", 2),
    (3, "A", 7),
    (3, "B", 8),
    (3, "C", 9),
]
columns = ["id", "event_name", "event_value"]
df = spark.createDataFrame(data, columns)
df.show()
```
```
id | name | value
---|------|------
1  | A    | 0
1  | B    | 2
1  | C    | 0
2  | A    | 5
2  | B    | 0
2  | C    | 2
3  | A    | 7
3  | B    | 8
3  | C    | 9
```

As output, I need a data frame with columns id, event_name, event_value, according to the following conditions:

1. Each id has the value A in one of its rows. If the row with name = A has value 0, then delete all of that id's rows where value is 0, keeping only the rows where name is A or value > 0.
2. If the row with name = A has value > 0, then do not delete any rows for that id, regardless of the values in its value column.

So, as a result I need something like below:

```
id | name | value
---|------|------
1  | A    | 0
1  | B    | 2
2  | A    | 5
2  | B    | 0
2  | C    | 2
3  | A    | 7
3  | B    | 8
3  | C    | 9
```

How can I do that in PySpark?

Answer 1

Score: 1


Partition the dataframe by id to create a boolean flag that checks whether any row in the partition has event_name = 'A' with event_value = 0, then use this flag along with the other provided conditions to filter the dataframe:

```python
from pyspark.sql import functions as F, Window

# count rows per id where (event_name, event_value) == ('A', 0)
exp = F.expr("event_name = 'A' AND event_value = 0").cast('int')
counts = F.sum(exp).over(Window.partitionBy('id'))

# keep a row unless its id is flagged and its value is 0 (A rows always survive)
mask = ~((F.col('event_value') == 0) & (counts > 0)) | (F.col('event_name') == 'A')
df1 = df.withColumn('mask', mask).filter('mask')
```

```
+---+----------+-----------+----+
| id|event_name|event_value|mask|
+---+----------+-----------+----+
|  1|         A|          0|true|
|  1|         B|          2|true|
|  2|         A|          5|true|
|  2|         B|          0|true|
|  2|         C|          2|true|
|  3|         A|          7|true|
|  3|         B|          8|true|
|  3|         C|          9|true|
+---+----------+-----------+----+
```
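Since the window-based mask only depends on whether an id's group contains an ('A', 0) row, the same per-group logic can be checked in plain Python without a Spark session. This is a minimal sketch for illustration, using the sample data from the question:

```python
data = [
    (1, "A", 0), (1, "B", 2), (1, "C", 0),
    (2, "A", 5), (2, "B", 0), (2, "C", 2),
    (3, "A", 7), (3, "B", 8), (3, "C", 9),
]

# ids whose 'A' row has value 0: these groups get their zero-valued rows pruned
flagged = {i for i, name, value in data if name == "A" and value == 0}

# keep a row unless its id is flagged and its value is 0; 'A' rows always survive
result = [
    (i, name, value)
    for i, name, value in data
    if not (value == 0 and i in flagged) or name == "A"
]
# only (1, "C", 0) is dropped, matching the expected output
```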

Answer 2

Score: 0


Use a window function and then filter based on the requirement. Please check the code below:

```
scala> spark.table("input").show(false)
+---+----------+-----------+
|id |event_name|event_value|
+---+----------+-----------+
|1  |A         |0          |
|1  |B         |2          |
|1  |C         |0          |
|2  |A         |5          |
|2  |B         |0          |
|2  |C         |2          |
|3  |A         |7          |
|3  |B         |8          |
|3  |C         |9          |
+---+----------+-----------+
```
```
scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.sql("""
WITH wn_input AS (
  SELECT
    *,
    (RANK() OVER(PARTITION BY ID ORDER BY event_name, event_value)) AS rid
  FROM input
)
SELECT
  id,
  event_name AS name,
  event_value AS value
FROM wn_input WHERE NOT (rid = 1 AND event_name = 'A' AND event_value = 0)
""").show(false)

// Exiting paste mode, now interpreting.

+---+----+-----+
|id |name|value|
+---+----+-----+
|1  |B   |2    |
|1  |C   |0    |
|2  |A   |5    |
|2  |B   |0    |
|2  |C   |2    |
|3  |A   |7    |
|3  |B   |8    |
|3  |C   |9    |
+---+----+-----+
```
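Because event_name is the primary sort key, rid = 1 always lands on each id's 'A' row, so the WHERE clause drops only that row when its value is 0. The rank-based filter can be sketched in plain Python as follows (a hypothetical re-implementation for illustration, not the answer's own code):

```python
from itertools import groupby

data = [
    (1, "A", 0), (1, "B", 2), (1, "C", 0),
    (2, "A", 5), (2, "B", 0), (2, "C", 2),
    (3, "A", 7), (3, "B", 8), (3, "C", 9),
]

result = []
for _id, rows in groupby(sorted(data), key=lambda r: r[0]):
    # rid mimics RANK() OVER (PARTITION BY id ORDER BY event_name, event_value)
    for rid, (i, name, value) in enumerate(
        sorted(rows, key=lambda r: (r[1], r[2])), start=1
    ):
        # WHERE NOT (rid = 1 AND event_name = 'A' AND event_value = 0)
        if not (rid == 1 and name == "A" and value == 0):
            result.append((i, name, value))
```

Note that this keeps (1, "C", 0): unlike the first answer, only the 'A' row itself is removed when its value is 0.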

huangapple
  • Posted on 2023-08-11 03:20:09
  • Please keep this link when reposting: https://go.coder-hub.com/76878759.html