PySpark Create Relationship between DataFrame Columns
Question
I am trying to implement some logic to get the relationship between id and link based on the logic below.
Logic -
- If id 1 has a link with 2, and 2 has a link with 3, then the relations are 1 -> 2, 1 -> 3, 2 -> 1, 2 -> 3, 3 -> 1, 3 -> 2
- Similarly, if 1 is linked with 4, 4 with 7, and 7 with 5, then the relations are 1 -> 4, 1 -> 5, 1 -> 7, 4 -> 1, 4 -> 5, 4 -> 7, 5 -> 1, 5 -> 4, 5 -> 7, 7 -> 1, 7 -> 4, 7 -> 5 (a small sketch of the rule follows below)
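In other words, ids that are linked directly or indirectly form one group, and the expected output is every ordered pair inside that group. A quick plain-Python sketch of the rule (illustration only; the expand helper is hypothetical and just demonstrates the grouping, it is not the Spark solution I am after):
from itertools import permutations

def expand(edges):
    # Merge overlapping {id, link} sets into groups, then emit every
    # ordered pair inside each group.
    groups = []
    for a, b in edges:
        merged = {a, b}
        rest = []
        for g in groups:
            if g & merged:
                merged |= g
            else:
                rest.append(g)
        groups = rest + [merged]
    return sorted(p for g in groups for p in permutations(sorted(g), 2))

print(expand([(1, 2), (2, 3)]))
# [(1, 2), (1, 3), (2, 1), (2, 3), (3, 1), (3, 2)]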
Input DataFrame -
+---+----+
| id|link|
+---+----+
| 1| 2|
| 3| 1|
| 4| 2|
| 6| 5|
| 9| 7|
| 9| 10|
+---+----+
I am trying to achieve the output below -
+---+----+
| Id|Link|
+---+----+
| 1| 2|
| 1| 3|
| 1| 4|
| 2| 1|
| 2| 3|
| 2| 4|
| 3| 1|
| 3| 2|
| 3| 4|
| 4| 1|
| 4| 2|
| 4| 3|
| 5| 6|
| 6| 5|
| 7| 9|
| 7| 10|
| 9| 7|
| 9| 10|
| 10| 7|
| 10| 9|
+---+----+
I have tried many approaches, but none of them work. I have tried the following code as well:
from pyspark.sql.functions import asc

df = spark.createDataFrame([(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)], ["id", "link"])

# Build every (id, link) combination from the distinct ids and links.
ids = df.select("id").distinct().rdd.flatMap(lambda x: x).collect()
links = df.select("link").distinct().rdd.flatMap(lambda x: x).collect()
combinations = [(id, link) for id in ids for link in links]
df_combinations = spark.createDataFrame(combinations, ["id", "link"])

# Keep the combinations that are not existing edges, then add the edges back.
result = df_combinations.join(df, ["id", "link"], "left_anti").union(df).dropDuplicates()
result = result.sort(asc("id"), asc("link"))
and
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

df = spark.createDataFrame([(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)], ["id", "link"])

# Cross join every row with every other row that has a different id.
combinations = df.alias("a").crossJoin(df.alias("b")) \
    .filter(F.col("a.id") != F.col("b.id")) \
    .select(col("a.id").alias("a_id"), col("b.id").alias("b_id"),
            col("a.link").alias("a_link"), col("b.link").alias("b_link"))

window = Window.partitionBy("a_id").orderBy("a_id", "b_link")

# Try to collect the reachable links per id, then flatten back to pairs.
paths = combinations.groupBy("a_id", "b_link") \
    .agg(F.first("b_id").over(window).alias("id")) \
    .groupBy("id").agg(F.collect_list("b_link").alias("links"))
result = paths.select("id", F.explode("links").alias("link"))
result = result.union(df.selectExpr("id as id_", "link as link_"))
Any help would be much appreciated.
Answer 1
Score: 2
This is not a general approach, but you can use the graphframes package. You might struggle to set it up, but once it is working the result is simple.
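If the setup is the sticking point, one common alternative to shipping a local jar is resolving the package from the spark-packages repository when the session starts. A minimal sketch, assuming Spark 3.0 with Scala 2.12 (adjust the coordinates to your build):
from pyspark.sql import SparkSession

# Resolve graphframes from the spark-packages repo instead of a local jar.
# The coordinates below are an assumption for Spark 3.0 / Scala 2.12.
spark = SparkSession.builder \
    .appName("graphframes-demo") \
    .config("spark.jars.packages", "graphframes:graphframes:0.8.1-spark3.0-s_2.12") \
    .config("spark.jars.repositories", "https://repos.spark-packages.org") \
    .getOrCreate()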
import os

# Ship the graphframes jar to the executors and make it importable.
sc.addPyFile(os.path.expanduser('graphframes-0.8.1-spark3.0-s_2.12.jar'))
from graphframes import *

# Edges are the (id, link) pairs; vertices are all distinct ids on either side.
e = df.select('id', 'link').toDF('src', 'dst')
v = e.select('src').toDF('id') \
    .union(e.select('dst')) \
    .distinct()
g = GraphFrame(v, e)

# connectedComponents() requires a checkpoint directory.
sc.setCheckpointDir("/tmp/graphframes")
df = g.connectedComponents()

# Pair every vertex with every other vertex in the same component.
df.join(df.withColumnRenamed('id', 'link'), ['component'], 'inner') \
    .drop('component') \
    .filter('id != link') \
    .show()
+---+----+
| id|link|
+---+----+
| 7| 10|
| 7| 9|
| 3| 2|
| 3| 4|
| 3| 1|
| 5| 6|
| 6| 5|
| 9| 10|
| 9| 7|
| 1| 2|
| 1| 4|
| 1| 3|
| 10| 9|
| 10| 7|
| 4| 2|
| 4| 1|
| 4| 3|
| 2| 4|
| 2| 1|
| 2| 3|
+---+----+
The connectedComponents method returns a component id for each vertex; the id is shared by every vertex in the same group (vertices connected by edges, and separated when no edge leads to another component). So you can take the cartesian product within each component, excluding each vertex paired with itself.
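To make the shape of that result concrete, here is a small sketch with a hand-built stand-in for the connectedComponents() output (the component ids 0, 1, 2 are arbitrary labels chosen for illustration; GraphFrames emits its own ids):
# One row per vertex with its component label (hand-built for illustration).
cc = spark.createDataFrame(
    [(1, 0), (2, 0), (3, 0), (4, 0), (5, 1), (6, 1), (7, 2), (9, 2), (10, 2)],
    ['id', 'component'])

# Same expansion as above: all ordered pairs within a component, minus self-pairs.
cc.join(cc.withColumnRenamed('id', 'link'), ['component'], 'inner') \
    .drop('component') \
    .filter('id != link') \
    .orderBy('id', 'link') \
    .show()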
Added answer
Inspired by the approach above, I looked around and found the networkx package.
import networkx as nx
from pyspark.sql import functions as f

# Build an undirected graph from the edges and take its connected components.
df = df.toPandas()
G = nx.from_pandas_edgelist(df, 'id', 'link')
components = [[list(c)] for c in nx.connected_components(G)]

# One row per component holding its member ids, then explode to (component, id).
df2 = spark.createDataFrame(components, ['array']) \
    .withColumn('component', f.monotonically_increasing_id()) \
    .select('component', f.explode('array').alias('id'))

# All ordered pairs within a component, minus self-pairs.
df2.join(df2.withColumnRenamed('id', 'link'), ['component'], 'inner') \
    .drop('component') \
    .filter('id != link') \
    .show()
+---+----+
| id|link|
+---+----+
| 1| 2|
| 1| 3|
| 1| 4|
| 2| 1|
| 2| 3|
| 2| 4|
| 3| 1|
| 3| 2|
| 3| 4|
| 4| 1|
| 4| 2|
| 4| 3|
| 5| 6|
| 6| 5|
| 9| 10|
| 9| 7|
| 10| 9|
| 10| 7|
| 7| 9|
| 7| 10|
+---+----+
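One note on this approach: toPandas() pulls the entire DataFrame to the driver. If the edge table has more columns than the two used here, collecting only the (id, link) pairs keeps that transfer small. A sketch of the same idea, applied to the Spark df before the toPandas() call:
import networkx as nx

# Collect only the two edge columns instead of converting the whole DataFrame.
edges = df.select('id', 'link').rdd.map(tuple).collect()
G = nx.from_edgelist(edges)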