
PySpark Create Relationship between DataFrame Columns






I am trying to build the relationship between id and link according to the logic below.

Logic -

  • If id 1 is linked to 2 and 2 is linked to 3, then the relations are 1 -> 2, 1 -> 3, 2 -> 1, 2 -> 3, 3 -> 1, 3 -> 2
  • Similarly, if 1 is linked to 4, 4 to 7 and 7 to 5, then the relations are 1 -> 4, 1 -> 5, 1 -> 7, 4 -> 1, 4 -> 5, 4 -> 7, 5 -> 1, 5 -> 4, 5 -> 7, 7 -> 1, 7 -> 4, 7 -> 5
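Read literally, the rule treats every link as undirected and relates each id to every other id it can reach through a chain of links. As a minimal sketch under that reading (plain Python rather than Spark, with a hypothetical helper named `relations`), a breadth-first search makes the expected pairs easy to check on small inputs:

```python
from collections import defaultdict, deque


def relations(links):
    """Return sorted (id, link) pairs for every two distinct ids that are
    connected through a chain of links (links treated as undirected)."""
    # Build an undirected adjacency list from the (id, link) pairs.
    adjacency = defaultdict(set)
    for a, b in links:
        adjacency[a].add(b)
        adjacency[b].add(a)

    pairs = set()
    for start in adjacency:
        # Breadth-first search collects every id reachable from `start`.
        seen = {start}
        queue = deque([start])
        while queue:
            node = queue.popleft()
            for neighbour in adjacency[node]:
                if neighbour not in seen:
                    seen.add(neighbour)
                    queue.append(neighbour)
        # Relate `start` to every reachable id except itself.
        pairs.update((start, other) for other in seen if other != start)
    return sorted(pairs)


print(relations([(1, 2), (2, 3)]))
# [(1, 2), (1, 3), (2, 1), (2, 3), (3, 1), (3, 2)]
```

This is only meant to pin down the intended semantics; it runs on a single machine and is not a Spark solution.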

Input DataFrame -

```
+---+----+
| id|link|
+---+----+
|  1|   2|
|  3|   1|
|  4|   2|
|  6|   5|
|  9|   7|
|  9|  10|
+---+----+
```

I am trying to achieve the output below:

```
+---+----+
| Id|Link|
+---+----+
|  1|   2|
|  1|   3|
|  1|   4|
|  2|   1|
|  2|   3|
|  2|   4|
|  3|   1|
|  3|   2|
|  3|   4|
|  4|   1|
|  4|   2|
|  4|   3|
|  5|   6|
|  6|   5|
|  7|   9|
|  7|  10|
|  9|   7|
|  9|  10|
| 10|   7|
| 10|   9|
+---+----+
```

I have tried many approaches, but none of them work. Here are two of my attempts:

```python
from pyspark.sql.functions import asc

df = spark.createDataFrame([(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)], ["id", "link"])
ids = df.select("Id").distinct().rdd.flatMap(lambda x: x).collect()
links = df.select("Link").distinct().rdd.flatMap(lambda x: x).collect()
combinations = [(id, link) for id in ids for link in links]
df_combinations = spark.createDataFrame(combinations, ["Id", "Link"])
result = df_combinations.join(df, ["Id", "Link"], "left_anti").union(df).dropDuplicates()
result = result.sort(asc("Id"), asc("Link"))
```


```python
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col

df = spark.createDataFrame([(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)], ["id", "link"])
combinations = df.alias("a").crossJoin(df.alias("b")) \
    .filter(F.col("") != F.col("")) \
    .select(col("").alias("a_id"), col("").alias("b_id"), col("").alias("a_link"), col("").alias("b_link"))
window = Window.partitionBy("a_id").orderBy("a_id", "b_link")
paths = combinations.groupBy("a_id", "b_link") \
    .agg(F.first("b_id").over(window).alias("id")) \
    .groupBy("id").agg(F.collect_list("b_link").alias("links"))
result = paths.select("id", F.explode("links").alias("link"))
result = result.union(df.selectExpr("id as id_", "link as link_"))
```

Any help would be much appreciated.


Answer (score: 2)





This is not a general-purpose approach, but you can use the graphframes package. It can be tricky to set up, but once it is available the solution is simple:

```python
import os

# Make the graphframes Python package (bundled inside the jar) importable
sc.addPyFile(os.path.expanduser('graphframes-0.8.1-spark3.0-s_2.12.jar'))
from graphframes import *

# Edges: each (id, link) row becomes a src -> dst edge
e = df.select('id', 'link').toDF('src', 'dst')
# Vertices: every distinct id appearing on either side of an edge
v = e.select('src').toDF('id') \
    .union(e.select('dst')) \
    .distinct()
g = GraphFrame(v, e)

# connectedComponents() needs a checkpoint directory
sc.setCheckpointDir("/tmp/graphframes")
df = g.connectedComponents()

# Pair every vertex with every other vertex in the same component
df.join(df.withColumnRenamed('id', 'link'), ['component'], 'inner') \
    .drop('component') \
    .filter('id != link') \
    .show()
```

```
+---+----+
| id|link|
+---+----+
|  7|  10|
|  7|   9|
|  3|   2|
|  3|   4|
|  3|   1|
|  5|   6|
|  6|   5|
|  9|  10|
|  9|   7|
|  1|   2|
|  1|   4|
|  1|   3|
| 10|   9|
| 10|   7|
|  4|   2|
|  4|   1|
|  4|   3|
|  2|   4|
|  2|   1|
|  2|   3|
+---+----+
```

The connectedComponents method assigns each vertex a component id. All vertices that are connected to each other through edges share the same component id, and vertex groups with no edge between them get different ids. You can therefore take the Cartesian product of the vertices within each component (a self-join on component) and drop the rows where a vertex is paired with itself.
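To illustrate the component-then-pairs idea outside Spark, here is a minimal plain-Python sketch. The helper `component_pairs` is hypothetical; it uses a small union-find (disjoint-set) structure as a stand-in for `connectedComponents`, then uses `itertools.permutations` to produce every ordered pair of distinct ids within each component, which mirrors the self-join on `component` followed by `filter('id != link')`:

```python
from collections import defaultdict
from itertools import permutations


def component_pairs(edges):
    """Label connected components with union-find, then pair up ids per component."""
    parent = {}

    def find(x):
        # Register unseen vertices, then walk to the root with path halving.
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[rb] = ra

    for src, dst in edges:
        union(src, dst)

    # Stand-in for connectedComponents(): component root -> member ids.
    components = defaultdict(list)
    for vertex in list(parent):
        components[find(vertex)].append(vertex)

    # Stand-in for the self-join minus id == link: permutations(members, 2)
    # yields every ordered pair of distinct members of one component.
    pairs = []
    for members in components.values():
        pairs.extend(permutations(members, 2))
    return sorted(pairs)


edges = [(1, 2), (3, 1), (4, 2), (6, 5), (9, 7), (9, 10)]
for id_, link in component_pairs(edges):
    print(id_, link)
```

On the question's input this yields the same 20 pairs as the expected output table. Union-find is used here only for compactness; any connected-components routine gives the same grouping.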

Update: an alternative approach

Inspired by the approach above, I looked around and found the networkx package.

```python
import networkx as nx
from pyspark.sql import functions as f

# Collect the edges to the driver and build an undirected graph
df = df.toPandas()
G = nx.from_pandas_edgelist(df, 'id', 'link')

# One row per connected component, holding the list of its ids
components = [[list(c)] for c in nx.connected_components(G)]
df2 = spark.createDataFrame(components, ['array']) \
    .withColumn('component', f.monotonically_increasing_id()) \
    .select('component', f.explode('array').alias('id'))

# Pair every id with every other id in the same component
df2.join(df2.withColumnRenamed('id', 'link'), ['component'], 'inner') \
    .drop('component') \
    .filter('id != link') \
    .show()
```

```
+---+----+
| id|link|
+---+----+
|  1|   2|
|  1|   3|
|  1|   4|
|  2|   1|
|  2|   3|
|  2|   4|
|  3|   1|
|  3|   2|
|  3|   4|
|  4|   1|
|  4|   2|
|  4|   3|
|  5|   6|
|  6|   5|
|  9|  10|
|  9|   7|
| 10|   9|
| 10|   7|
|  7|   9|
|  7|  10|
+---+----+
```

  • Published on 2023-02-14 04:10:46



