英文:
How to transform in DataFrame in PySpark?
问题
以下是翻译好的部分:
我在 Py Spark 中有一个数据框,其中包含列:id、name、value。
列名应为每个id取值`A、B、C`。value列包含数值。
样本数据框:
data = [
(1, "A", 0),
(1, "B", 2),
(1, "C", 0),
(2, "A", 5),
(2, "B", 0),
(2, "C", 2),
(3, "A", 7),
(3, "B", 8),
(3, "C", 9),
]
columns = ["id", "event_name", "event_value"]
df = spark.createDataFrame(data, columns)
df.show()
作为输出,需要一个具有以下列的数据框:id、event_name、event_value,根据以下条件:
- 每个id在某一行中有值A,如果对于name = A的行中value为0,则删除该id的所有value为0的行,仅保留name为A或value > 0的行。
- 每个id在某一行中都有值A,如果在name = A的行中value > 0,则不删除该id的任何行,无论该id的value列中有什么值。
因此,我需要的结果如下:
id | name | value
----|------|------
1 | A | 0
1 | B | 2
2 | A | 5
2 | B | 0
2 | C | 2
3 | A | 7
3 | B | 8
3 | C | 9
如何在 Py Spark 中实现这个目标?
英文:
I have a data frame in Py Spark with columns: id, name, value.
The column name should take the values A,B,C
for each id. The value column has numeric values.
Sample data frame:`
data = [
(1, "A", 0),
(1, "B", 2),
(1, "C", 0),
(2, "A", 5),
(2, "B", 0),
(2, "C", 2),
(3, "A", 7),
(3, "B", 8),
(3, "C", 9),
]
columns = ["id", "event_name", "event_value"]
df = spark.createDataFrame(data, columns)
df.show()
`id | name | value
----|------|------
1 | A | 0
1 | B | 2
1 | C | 0
2 | A | 5
2 | B | 0
2 | C | 2
3 | A | 7
3 | B | 8
3 | C | 9`
`As output it needs a data frame with columns: id, event_name, event_value according to the following conditions:
- each id has value A in one of the rows, if for the row with name = A in the row value is 0 then delete for this id all rows where it has value 0 in value, leave only the row where in name is value A or value > 0
- each id has the value A in one of the rows, if for a row with name = A there is a value > 0 in the value column, then do not delete any row for that id, no matter what values are in the value for that id
So, as a result I need something like below:
`
`id | name | value
----|------|------
1 | A | 0
1 | B | 2
2 | A | 5
2 | B | 0
2 | C | 2
3 | A | 7
3 | B | 8
3 | C | 9`
How can I do that in Py Spark ?
答案1
得分: 1
按照id
对数据框进行分区,创建一个布尔标志以检查在任何行中event_name
和event_value
是否为('A', 0),然后将此标志与其他提供的条件一起用于对数据框进行filter
。
exp = F.expr("event_name = 'A' AND event_value = 0").cast('int')
counts = F.sum(exp).over(Window.partitionBy('id'))
mask = ~((F.col('event_value') == 0) & (counts > 0)) | (F.col('event_name') == 'A')
df1 = df.withColumn('mask', mask).filter('mask')
结果如下:
+---+----------+-----------+----+
| id|event_name|event_value|mask|
+---+----------+-----------+----+
| 1| A| 0|true|
| 1| B| 2|true|
| 2| A| 5|true|
| 2| B| 0|true|
| 2| C| 2|true|
| 3| A| 7|true|
| 3| B| 8|true|
| 3| C| 9|true|
+---+----------+-----------+----+
英文:
Partition the dataframe by id
to create a boolean flag to check the condition when event_name
and event_value
are ('A', 0) in any of the rows then use this flag along with the other provided conditions to filter
the dataframe.
exp = F.expr("event_name = 'A' AND event_value = 0").cast('int')
counts = F.sum(exp).over(Window.partitionBy('id'))
mask = ~((F.col('event_value') == 0) & (counts > 0)) | (F.col('event_name') == 'A')
df1 = df.withColumn('mask', mask).filter('mask')
+---+----------+-----------+----+
| id|event_name|event_value|mask|
+---+----------+-----------+----+
| 1| A| 0|true|
| 1| B| 2|true|
| 2| A| 5|true|
| 2| B| 0|true|
| 2| C| 2|true|
| 3| A| 7|true|
| 3| B| 8|true|
| 3| C| 9|true|
+---+----------+-----------+----+
答案2
得分: 0
使用window
函数,然后根据要求进行filter
。请查看下面的代码:
scala> spark.table("input").show(false)
+---+----------+-----------+
|id |event_name|event_value|
+---+----------+-----------+
|1 |A |0 |
|1 |B |2 |
|1 |C |0 |
|2 |A |5 |
|2 |B |0 |
|2 |C |2 |
|3 |A |7 |
|3 |B |8 |
|3 |C |9 |
+---+----------+-----------+
scala> :paste
// 进入粘贴模式(ctrl-D完成)
spark.sql("""
WITH wn_input AS (
SELECT
*,
(RANK() OVER(PARTITION BY ID ORDER BY event_name, event_value)) AS rid
FROM input
)
SELECT
id,
event_name AS name,
event_value AS value
FROM wn_input WHERE not ( rid = 1 AND event_name = 'A' AND event_value = 0)
""").show(false)
// 退出粘贴模式,现在进行解释。
+---+----+-----+
|id |name|value|
+---+----+-----+
|1 |B |2 |
|1 |C |0 |
|2 |A |5 |
|2 |B |0 |
|2 |C |2 |
|3 |A |7 |
|3 |B |8 |
|3 |C |9 |
+---+----+-----+
英文:
Use window
function & then filter
based on requirement. Please check below code
scala> spark.table("input").show(false)
+---+----------+-----------+
|id |event_name|event_value|
+---+----------+-----------+
|1 |A |0 |
|1 |B |2 |
|1 |C |0 |
|2 |A |5 |
|2 |B |0 |
|2 |C |2 |
|3 |A |7 |
|3 |B |8 |
|3 |C |9 |
+---+----------+-----------+
scala> :paste
// Entering paste mode (ctrl-D to finish)
spark.sql("""
WITH wn_input AS (
SELECT
*,
(RANK() OVER(PARTITION BY ID ORDER BY event_name, event_value)) AS rid
FROM input
)
SELECT
id,
event_name AS name,
event_value AS value
FROM wn_input WHERE not ( rid = 1 AND event_name = 'A' AND event_value = 0)
""").show(false)
// Exiting paste mode, now interpreting.
+---+----+-----+
|id |name|value|
+---+----+-----+
|1 |B |2 |
|1 |C |0 |
|2 |A |5 |
|2 |B |0 |
|2 |C |2 |
|3 |A |7 |
|3 |B |8 |
|3 |C |9 |
+---+----+-----+
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论