在两列中对连续数值进行分组的PySpark代码:

huangapple go评论49阅读模式
英文:

grouping values that are sequence in two columns pyspark

问题

我有以下数据框(df):

index      initial_range     final_range
1            1000000              5999999
2            6000000              6299999
3            6300000              6399999
4            6400000              6499999
5            6600000              6699999
6            6700000              6749999
7            6750000              6799999
8            7000000              7399999
9            7600000              7699999
10           7700000              7749999
11           7750000              7799999
12           6500000              6549999

可以看到,'initial_range'字段和'final_range'字段是不连续的区间。当我们比较索引1和索引2时,我们可以观察到'final_range'字段的值结束后的下一个索引2的'initial_range'值是序列+1。因此,在这个示例中,索引1的值结束于5999999,而索引2的值开始于6000000。我需要将这些情况分组并返回以下数据框(df):

index      initial_range     final_range       grouping
1            1000000              5999999      1000000-6549999
2            6000000              6299999      1000000-6549999
3            6300000              6399999      1000000-6549999
4            6400000              6499999      1000000-6549999
5            6600000              6699999      6600000-6799999
6            6700000              6749999      6600000-6799999
7            6750000              6799999      6600000-6799999
8            7000000              7399999      7000000-7399999
9            7600000              7699999      7600000-7799999
10           7700000              7749999      7600000-7799999
11           7750000              7799999      7600000-7799999
12           6500000              6549999      1000000-6549999

请注意,在'grouping'字段中有一些新的区间,这些区间是最小(initial)值和最大(final)值,直到序列被中断。

一些细节:

  • 在索引4和5之间,序列+1被中断,所以新的'grouping'发生了变化。换句话说,每当序列被中断,就需要写入一个新的序列。
  • 在索引12中,出现了1000000-6549999的分组,因为6500000是索引4中6499999的下一个数字。

我尝试了这段代码:

comparison = df == df.shift() + 1
df['grouping'] = comparison['initial_range'] & comparison['final_range']

但是,这个逻辑序列并没有生效。

有谁可以帮助我?

英文:

I have the follow df:

index      initial_range	  final_range
1            1000000	          5999999
2            6000000	          6299999
3            6300000	          6399999
4            6400000	          6499999
5            6600000	          6699999
6            6700000	          6749999
7            6750000	          6799999
8            7000000	          7399999
9            7600000	          7699999
10           7700000	          7749999
11           7750000	          7799999
12           6500000              6549999

See that the 'initial_range' field and 'final_range' field are intervals of abrangency.
When we compare row index 1 and index 2, we observe that the end of the value of the 'final_range' field starts in the next one as sequence+1 in the 'initial_range' index 2. So, in the example ended in 5999999 and started in 6000000 in index 2. I need grouping this cases and return the follow df:

index      initial_range	  final_range       grouping
1            1000000	          5999999    1000000-6549999
2            6000000	          6299999    1000000-6549999
3            6300000	          6399999    1000000-6549999
4            6400000	          6499999    1000000-6549999
5            6600000	          6699999    6600000-6799999
6            6700000	          6749999    6600000-6799999
7            6750000	          6799999    6600000-6799999
8            7000000	          7399999    7000000-7399999
9            7600000	          7699999    7600000-7799999
10           7700000	          7749999    7600000-7799999
11           7750000	          7799999    7600000-7799999
12           6500000              6549999    1000000-6549999

See, that the grouping field there are a news abrangencies, that are the values min(initial) and max(final), until the sequence is broken.

Some details:

  • The index 4 for 5 the sequence+1 is broken, so the new 'grouping' change. In other words, every time the sequence is broken a new sequence needs to be written.
  • In index 12 the grouping 1000000-6549999 appear again, because the 6500000 is the next number of 6499999 in index 4.

I tried this code:

comparison = df == df.shift()+1
df['grouping'] = comparison['initial_range'] & comparison['final_range']

But, the logic sequence, don't worked.

Can anyone help me?

答案1

得分: 1

以下是代码的中文翻译:

这个有点复杂下面是我的答案

首先我在使用UDF所以性能可能会稍差

```python
import copy
import pyspark.sql.functions as F
from pyspark.sql.types import *

rn = 0

def check_vals(x, y):
    global rn
    
    if (y != None) and (int(x)+1) == int(y):
        return rn + 1
    else:
        # 使用copy进行深拷贝,而不是浅拷贝。
        res = copy.copy(rn)
        # 增加以便下一个值从+1开始
        rn += 1
        # 返回相同的值,因为我们要使用它进行分组
        return res + 1
    
    return 0

rn_udf = F.udf(lambda x, y: check_vals(x, y), IntegerType())

接下来,

from pyspark.sql.window import Window

# 我们想根据initial_value检查final_range的最终范围值
w = Window().orderBy(F.col('initial_range'))

# 首先将initial_range的下一行值放在名为nextRange的列中,以便我们可以进行比较
# 检查final_range+1 == nextRange,如果是,使用rn值,如果不是,然后使用rn并递增它以供下一次迭代使用。
# 现在在check_1列创建的分区中找到最大和最小值。
# 连接最小值和最大值
# 按ID排序以获取初始排序,我必须将其转换为整数,但您可能不需要它
# 删除所有计算的值
df.withColumn('nextRange', F.lead('initial_range').over(w)) \
    .withColumn('check_1', rn_udf("final_range", "nextRange")) \
    .withColumn('min_val', F.min("initial_range").over(Window.partitionBy("check_1"))) \
    .withColumn('max_val', F.max("final_range").over(Window.partitionBy("check_1"))) \
    .withColumn('range', F.concat("min_val", F.lit("-"), "max_val")) \
    .orderBy(F.col("ID").cast(IntegerType())) \
    .drop("nextRange", "check_1", "min_val", "max_val") \
    .show(truncate=False)

输出:

+---+-------------+-----------+---------------+
|ID |initial_range|final_range|range          |
+---+-------------+-----------+---------------+
|1  |1000000      |5999999    |1000000-6549999|
|2  |6000000      |6299999    |1000000-6549999|
|3  |6300000      |6399999    |1000000-6549999|
|4  |6400000      |6499999    |1000000-6549999|
|5  |6600000      |6699999    |6600000-6799999|
|6  |6700000      |6749999    |6600000-6799999|
|7  |6750000      |6799999    |6600000-6799999|
|8  |7000000      |7399999    |7000000-7399999|
|9  |7600000      |7699999    |7600000-7799999|
|10 |7700000      |7749999    |7600000-7799999|
|11 |7750000      |7799999    |7600000-7799999|
|12 |6500000      |6549999    |1000000-6549999|
+---+-------------+-----------+---------------+

<details>
<summary>英文:</summary>

Well this was a tough one, here is my answer,

First of all, I am using UDF so expect the performance to be a little bad,

import copy
import pyspark.sql.functions as F
from pyspark.sql.types import *

rn = 0

def check_vals(x, y):
global rn

if (y != None) and (int(x)+1) == int(y):
    return rn + 1
else:
    # Using copy to deepcopy and not forming a shallow one.
    res = copy.copy(rn)
    # Increment so that the next value with start form +1
    rn += 1
    # Return the same value as we want to group using this
    return res + 1

return 0

rn_udf = F.udf(lambda x, y: check_vals(x, y), IntegerType())


Next,

from pyspark.sql.window import Window

We want to check the final_range values according to the initial_value

w = Window().orderBy(F.col('initial_range'))

First of all take the next row values of initial range in a column called nextRange so that we can compare

Check if the final_range+1 == nextRange, if yes use rn value, if not then use rn and increment it for the next iteration.

Now find the max and min values in the partition created by the check_1 column.

Concat min and max values

order it by ID to get the initial ordering, I have to cast it to integer but you might not need it

drop all calculated values

df.withColumn('nextRange', F.lead('initial_range').over(w))
.withColumn('check_1', rn_udf("final_range", "nextRange"))
.withColumn('min_val', F.min("initial_range").over(Window.partitionBy("check_1")))
.withColumn('max_val', F.max("final_range").over(Window.partitionBy("check_1")))
.withColumn('range', F.concat("min_val", F.lit("-"), "max_val"))
.orderBy(F.col("ID").cast(IntegerType()))
.drop("nextRange", "check_1", "min_val", "max_val")
.show(truncate=False)


Output:

+---+-------------+-----------+---------------+
|ID |initial_range|final_range|range |
+---+-------------+-----------+---------------+
|1 |1000000 |5999999 |1000000-6549999|
|2 |6000000 |6299999 |1000000-6549999|
|3 |6300000 |6399999 |1000000-6549999|
|4 |6400000 |6499999 |1000000-6549999|
|5 |6600000 |6699999 |6600000-6799999|
|6 |6700000 |6749999 |6600000-6799999|
|7 |6750000 |6799999 |6600000-6799999|
|8 |7000000 |7399999 |7000000-7399999|
|9 |7600000 |7699999 |7600000-7799999|
|10 |7700000 |7749999 |7600000-7799999|
|11 |7750000 |7799999 |7600000-7799999|
|12 |6500000 |6549999 |1000000-6549999|
+---+-------------+-----------+---------------+


</details>



huangapple
  • 本文由 发表于 2023年2月9日 01:07:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/75389284.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定