PySpark Create Relationship between DataFrame Columns

Question

I am trying to implement some logic to get a relationship between ID and Link based on the logic below.

Logic -

  • If id 1 is linked with 2 and 2 is linked with 3, then the relations are 1 -> 2, 1 -> 3, 2 -> 1, 2 -> 3, 3 -> 1, 3 -> 2
  • Similarly, if 1 is linked with 4, 4 with 7, and 7 with 5, then the relations are 1 -> 4, 1 -> 5, 1 -> 7, 4 -> 1, 4 -> 5, 4 -> 7, 5 -> 1, 5 -> 4, 5 -> 7, 7 -> 1, 7 -> 4, 7 -> 5
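
Read this way, the rule says that two ids are related whenever a chain of links connects them, in either direction. A minimal plain-Python sketch of that reading, using a breadth-first search from every id (the function name `related_pairs` is made up for illustration and is not part of any library):

```python
from collections import defaultdict, deque

def related_pairs(edges):
    """Return sorted (id, link) pairs for ids connected by a chain of links."""
    # Treat every link as undirected: 1 -> 4 also means 4 -> 1.
    neighbours = defaultdict(set)
    for a, b in edges:
        neighbours[a].add(b)
        neighbours[b].add(a)

    pairs = []
    for start in neighbours:
        # Breadth-first search collects everything reachable from `start`.
        seen = {start}
        queue = deque([start])
        while queue:
            node = queue.popleft()
            for nxt in neighbours[node]:
                if nxt not in seen:
                    seen.add(nxt)
                    queue.append(nxt)
        pairs.extend((start, other) for other in seen if other != start)
    return sorted(pairs)

# The chain from the second bullet: 1 with 4, 4 with 7, 7 with 5.
chain = [(1, 4), (4, 7), (7, 5)]
print(related_pairs(chain))
```

For this chain the sketch yields 12 pairs, including the ones that start from 7.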

Input DataFrame -

+---+----+
| id|link|
+---+----+
|  1|   2|
|  3|   1|
|  4|   2|
|  6|   5|
|  9|   7|
|  9|  10|
+---+----+

I am trying to achieve the output below -

+---+----+
| Id|Link|
+---+----+
|  1|   2|
|  1|   3|
|  1|   4|
|  2|   1|
|  2|   3|
|  2|   4|
|  3|   1|
|  3|   2|
|  3|   4|
|  4|   1|
|  4|   2|
|  4|   3|
|  5|   6|
|  6|   5|
|  7|   9|
|  7|  10|
|  9|   7|
|  9|  10|
| 10|   7|
| 10|   9|
+---+----+

I have tried many approaches, but none of them work. I have tried the following code:

from pyspark.sql.functions import asc

df = spark.createDataFrame([(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)], ["id", "link"])
# Collect the distinct ids and links to the driver
ids = df.select("id").distinct().rdd.flatMap(lambda x: x).collect()
links = df.select("link").distinct().rdd.flatMap(lambda x: x).collect()
# Build every id x link combination (this ignores which ids are actually chained together)
combinations = [(src, dst) for src in ids for dst in links]
df_combinations = spark.createDataFrame(combinations, ["id", "link"])
result = df_combinations.join(df, ["id", "link"], "left_anti").union(df).dropDuplicates()
result = result.sort(asc("id"), asc("link"))

and

from pyspark.sql import Window
from pyspark.sql import functions as F

df = spark.createDataFrame([(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)], ["id", "link"])

# Pair every row with every other row that has a different id
combinations = df.alias("a").crossJoin(df.alias("b")) \
    .filter(F.col("a.id") != F.col("b.id")) \
    .select(F.col("a.id").alias("a_id"), F.col("b.id").alias("b_id"), F.col("a.link").alias("a_link"), F.col("b.link").alias("b_link"))

# Attempt as posted: it does not produce the desired output
window = Window.partitionBy("a_id").orderBy("a_id", "b_link")
paths = combinations.groupBy("a_id", "b_link") \
    .agg(F.first("b_id").over(window).alias("id")) \
    .groupBy("id").agg(F.collect_list("b_link").alias("links"))

result = paths.select("id", F.explode("links").alias("link"))
result = result.union(df.selectExpr("id as id_", "link as link_"))

Any help would be much appreciated.

Answer 1

Score: 2

This is not a general approach, but you can use the graphframes package. Setting it up can be a struggle, but once it works the solution is simple.

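import os
# Make the graphframes Python bindings packaged in the jar importable
sc.addPyFile(os.path.expanduser('graphframes-0.8.1-spark3.0-s_2.12.jar'))

from graphframes import GraphFrame

# Edges: one row per (id, link) pair; vertices: every id that appears on either side
e = df.select('id', 'link').toDF('src', 'dst')
v = e.select('src').toDF('id') \
  .union(e.select('dst')) \
  .distinct()

g = GraphFrame(v, e)

# connectedComponents needs a checkpoint directory
sc.setCheckpointDir("/tmp/graphframes")
cc = g.connectedComponents()

# Pair every vertex with the other vertices of its component
cc.join(cc.withColumnRenamed('id', 'link'), ['component'], 'inner') \
  .drop('component') \
  .filter('id != link') \
  .show()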

+---+----+
| id|link|
+---+----+
|  7|  10|
|  7|   9|
|  3|   2|
|  3|   4|
|  3|   1|
|  5|   6|
|  6|   5|
|  9|  10|
|  9|   7|
|  1|   2|
|  1|   4|
|  1|   3|
| 10|   9|
| 10|   7|
|  4|   2|
|  4|   1|
|  4|   3|
|  2|   4|
|  2|   1|
|  2|   3|
+---+----+

The connectedComponents method returns a component id for each vertex. The id is shared by all vertices in the same group (vertices connected by edges) and differs between groups that have no edge between them. So you can take the Cartesian product within each component and drop the pairs that match a vertex with itself.
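
To make the idea concrete without a Spark cluster, here is a minimal plain-Python sketch of the same two steps: label each vertex with a component id, then self-join on that label. The labelling below is a simple repeated-minimum propagation chosen for illustration; it is an assumption of this sketch, not GraphFrames' actual algorithm, and the helper name `component_labels` is made up.

```python
def component_labels(edges):
    """Map each vertex to the smallest vertex id in its connected component."""
    label = {v: v for edge in edges for v in edge}
    changed = True
    while changed:
        changed = False
        for a, b in edges:
            # Both endpoints of an edge must end up with the same (lowest) label.
            low = min(label[a], label[b])
            if label[a] != low or label[b] != low:
                label[a] = label[b] = low
                changed = True
    return label

edges = [(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)]
label = component_labels(edges)

# Self-join on the component label, then drop the (v, v) rows.
pairs = sorted(
    (a, b)
    for a in label
    for b in label
    if label[a] == label[b] and a != b
)
print(len(pairs))  # 20 rows, the same count as the table above
```

The self-join here plays the role of the join on 'component' in the Spark code, and the `a != b` condition corresponds to the 'id != link' filter.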

Added answer

Inspired by the above approach, I looked around and found the networkx package.

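import networkx as nx
from pyspark.sql import functions as f

# Collect the edges to the driver and build an undirected graph
pdf = df.toPandas()
G = nx.from_pandas_edgelist(pdf, 'id', 'link')
# One row per component, holding the list of its vertex ids
components = [[list(c)] for c in nx.connected_components(G)]

df2 = spark.createDataFrame(components, ['array']) \
  .withColumn('component', f.monotonically_increasing_id()) \
  .select('component', f.explode('array').alias('id'))

# Pair every vertex with the other vertices of its component
df2.join(df2.withColumnRenamed('id', 'link'), ['component'], 'inner') \
  .drop('component') \
  .filter('id != link') \
  .show()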

+---+----+
| id|link|
+---+----+
|  1|   2|
|  1|   3|
|  1|   4|
|  2|   1|
|  2|   3|
|  2|   4|
|  3|   1|
|  3|   2|
|  3|   4|
|  4|   1|
|  4|   2|
|  4|   3|
|  5|   6|
|  6|   5|
|  9|  10|
|  9|   7|
| 10|   9|
| 10|   7|
|  7|   9|
|  7|  10|
+---+----+
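
Since this approach already collects the data to the driver, the pairing step could also be done in plain Python before going back to Spark. A minimal sketch, with the components hard-coded to what nx.connected_components would be expected to yield for the sample data (an assumption of this sketch, so that it runs without networkx):

```python
from itertools import permutations

# Components of the sample graph, hard-coded for illustration.
components = [{1, 2, 3, 4}, {5, 6}, {7, 9, 10}]

# Every ordered pair of distinct vertices within each component.
pairs = sorted(p for c in components for p in permutations(sorted(c), 2))
print(pairs[:3])
```

The resulting list of tuples could then be passed to spark.createDataFrame(pairs, ['id', 'link']) if a Spark DataFrame is needed.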

Posted by huangapple on 2023-02-14 04:10:46
When reposting, please keep the link to this article: https://go.coder-hub.com/75440729.html