如何检测Spark Graphframes中的循环?

huangapple go评论62阅读模式
英文:

how to detect a cycle in a Spark Graphframes?

问题

这里是一个代表有向图的Spark Graphframes数据框,这个图中可能存在一些循环。如何在Graphframe中检测这些循环呢?

例如,这是一个图:

| src | dst |
| --- | --- |
| 1   | 2   |
| 2   | 3   |
| 3   | 4   |
| 3   | 1   |
| 4   | 1   |

这个图中的循环应该是{1,2,3}和{1,2,3,4}。

英文:

Here is a Spark Graphframes df repsenting a directed graph, there may be some cycles in this graph. How can I detect the cycles in a Graphframe?

For example, here is a graph

| src | dst |
| --- | --- |
| 1   | 2   |
| 2   | 3   |
| 3   | 4   |
| 3   | 1   |
| 4   | 1   |

the cycle in this graph should be {1,2,3} and {1,2,3,4}.

答案1

得分: 1

虽然单独使用GraphFrames可能无法直接提供您任务所需的功能,但将其与NetworkX和PandasUDF结合使用被证明是一种有效的解决方案。首先,让我们探讨一下与您的示例特别相关的NetworkX的功能。

示例图的绘图:

[![示例图的可视化][1]][1]

Johnson的算法,是NetworkX中simple_cycles函数的基础,其时间复杂度针对这种类型的任务优于其他基于DFS修改的算法([来源][2])。

在NetworkX中查找循环的代码:

import pandas as pd
import networkx as nx

df_edges = pd.DataFrame({
    'src': [1, 2, 3, 3, 4],
    'dst': [2, 3, 4, 1, 1]
})
# 从DataFrame创建有向图
G = nx.from_pandas_edgelist(df_edges, source='src', target='dst', create_using=nx.DiGraph())
# 查找循环
cycles = list(nx.simple_cycles(G))
print(cycles) #输出: [[1, 2, 3], [1, 2, 3, 4]]

NetworkX的simple_cycles函数显然提供了所需的功能。然而,考虑到可能存在的可扩展性问题以及在Spark生态系统内运行的需求,有必要寻找一种以并行方式运行的解决方案。这就是PandasUDF(矢量化UDF)的用武之地。为了制定一个可扩展且通用的解决方案,我们的第一步是执行一个连通组件操作。GraphFrames方便地提供了这个功能,如下所示:

from graphframes import *
g = GraphFrame(df_edges)  
result = g.connectedComponents()

在获取来自connectedComponents函数的输出之后,通常会以[node,组件ID]的格式扩展原始的边DataFrame。这将产生一个以[src, dst, component]结构化的Spark DataFrame。

为了简洁起见,在示例的后续步骤中,我将手动生成这样一个Spark DataFrame。为了说明循环查找功能在不同连接的组件之间的并行化能力,我还将额外子图的边合并到边列表中。

假设这是扩展的边列表:

df_edges = pd.DataFrame({
    'src': [1, 2, 3, 3, 4, 5, 6, 7],
    'dst': [2, 3, 4, 1, 1, 6, 7, 5],
    'component': [1, 1, 1, 1, 1, 2, 2, 2]
})

然后,通过以下方式可视化扩展的图,由两个不同的连接组件组成:

[![扩展图][3]][3]

这是包含组件ID的扩展边列表在使用.show()时的外观:

+---+---+---------+
|src|dst|component|
+---+---+---------+
|  1|  2|        1|
|  2|  3|        1|
|  3|  4|        1|
|  3|  1|        1|
|  4|  1|        1|
|  5|  6|        2|
|  6|  7|        2|
|  7|  5|        2|
+---+---+---------+

接下来,我们定义一个Pandas UDF,可以应用于每个连接组件的分组。除了查找循环,这个函数还设计用于返回有用的信息,例如在每个组件基础上找到的循环数量以及构成每个循环的边的列表:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import json

schema = StructType([
    StructField('component', IntegerType()),
    StructField('no_of_cycles', IntegerType()),
    StructField('cyclelist', StringType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def find_cycles(pdf):
    G = nx.from_pandas_edgelist(pdf, source='src', target='dst', create_using=nx.DiGraph())
    cycles = list(nx.simple_cycles(G))
    cyclelist = json.dumps(cycles)
    num_cycles = len(cycles)
    return pd.DataFrame({'component': [pdf['component'].iloc[0]], 
                         'no_of_cycles': [num_cycles], 
                         'cyclelist': [cyclelist]})

现在定义了Pandas UDF,我们继续将此函数应用于每个单独的连接组件,如下所示:

cycles = spark_df_edges.groupby('component').apply(find_cycles).show(truncate=False)

最终,我们可以将这两个DataFrame连接起来:

from pyspark.sql.functions import broadcast
joined_df = spark_df_edges.join(broadcast(cycles), on='component', how='inner')
joined_df.show(truncate=False)

结果如下:

+---------+---+---+------------+-------------------------+
|component|src|dst|no_of_cycles|cyclelist                |
+---------+---+---+------------+-------------------------+
|1        |1  |2  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
|1        |2  |3  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
|1        |3  |4  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
|1        |3  |1  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
|1        |4  |1  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
|2        |5  |6  |1           |[[5, 6, 7]]              |
|2        |6  |

<details>
<summary>英文:</summary>



Although GraphFrames alone may not fully provide the requisite functionality for your task out of the box, combining it with NetworkX and PandasUDF proves to be an effective solution. Initially, let&#39;s explore the capabilities of NetworkX, particularly relevant to your example.



plot of example graph :

[![graph visualisation of example][1]][1]


Johnson&#39;s algorithm, which underpins the simple_cycles function in NetworkX, has better time complexity tailored for tasks of this nature than other DFS modification based algorithms ([source][2])

Code to find the cycles in NetworkX:


    import pandas as pd
    import networkx as nx
    
    df_edges = pd.DataFrame({
        &#39;src&#39;: [1, 2, 3, 3, 4],
        &#39;dst&#39;: [2, 3, 4, 1, 1]
    })
    # Create a directed graph from the dataframe
    G = nx.from_pandas_edgelist(df_edges, source=&#39;src&#39;, target=&#39;dst&#39;, create_using=nx.DiGraph())
    # Find cycles
    cycles = list(nx.simple_cycles(G))
    print(cycles) #output: [[1, 2, 3], [1, 2, 3, 4]]

The NetworkX function simple_cycles evidently provides the desired functionality. However, considering the potential scalability concerns and the need to function within the Spark ecosystem, it is beneficial to seek a solution that operates in a parallelized manner. This is where the utility of PandasUDF (vectorized UDF) shines. To formulate a scalable and generalizable solution, our first step is to execute a connected components operation. GraphFrames conveniently provides this capability, as demonstrated below:

    from graphframes import *
    g = GraphFrame(df_edges)  
    result = g.connectedComponents()

After obtaining the output from the connected components function, which would typically be in the format [node, component id], you can extend your original edge DataFrame with this component id. This results in a Spark DataFrame structured as [src, dst, component].

For the sake of brevity, I&#39;ll generate such a Spark DataFrame manually in the subsequent steps of the example. To illustrate the parallelization capabilities of the cycle finding function across distinct connected components, I&#39;ll also incorporate the edges of an additional subgraph into the edgelist.

Assuming this is the extended edgelist

    df_edges = pd.DataFrame({
        &#39;src&#39;: [1, 2, 3, 3, 4,5,6,7],
        &#39;dst&#39;: [2, 3, 4, 1, 1,6,7,5],
        &#39;component&#39; : [1,1,1,1,1,2,2,2]
    })

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    # Convert pandas example DataFrame to Spark DataFrame
    # this is in place of the processed output 
    # derived from both the original DataFrame 
    # and the connected components output.
    spark_df_edges = spark.createDataFrame(df_edges)

Here&#39;s a visualization of the expanded graph, composed of two distinct connected components:

[![extended graph][3]][3]

This is how the expanded edgelist, now integrated with the component id, appears when using .show()

    +---+---+---------+
    |src|dst|component|
    +---+---+---------+
    |  1|  2|        1|
    |  2|  3|        1|
    |  3|  4|        1|
    |  3|  1|        1|
    |  4|  1|        1|
    |  5|  6|        2|
    |  6|  7|        2|
    |  7|  5|        2|
    +---+---+---------+


Next, we define a Pandas UDF that can be applied on each group of connected components.In addition to finding cycles, this function is designed to return useful information such as the count of cycles found and a list of edges constituting each cycle on a per-component basis:

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import StructType, StructField, IntegerType,StringType
    import json
    
    schema = StructType([
        StructField(&#39;component&#39;, IntegerType()),
        StructField(&#39;no_of_cycles&#39;, IntegerType()),
        StructField(&#39;cyclelist&#39;, StringType())
    ])
    
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def find_cycles(pdf):
        G = nx.from_pandas_edgelist(pdf, source=&#39;src&#39;, target=&#39;dst&#39;, create_using=nx.DiGraph())
        cycles = list(nx.simple_cycles(G))
        cyclelist = json.dumps(cycles)
        num_cycles = len(cycles)
        return pd.DataFrame({&#39;component&#39;: [pdf[&#39;component&#39;].iloc[0]], 
                             &#39;no_of_cycles&#39;: [num_cycles], 
                             &#39;cyclelist&#39;: [cyclelist]})

With the Pandas UDF now defined, we proceed to apply this function to each individual connected component like this:

    cycles=spark_df_edges.groupby(&#39;component&#39;).apply(find_cycles).show(truncate=False)

The result cycles dataframe look like this:

    +---------+------------+-------------------------+
    |component|no_of_cycles|cyclelist                |
    +---------+------------+-------------------------+
    |1        |2           |[[1, 2, 3], [1, 2, 3, 4]]|
    |2        |1           |[[5, 6, 7]]              |
    +---------+------------+-------------------------+


Finally, we can join the two DataFrames:


    from pyspark.sql.functions import broadcast
    joined_df = spark_df_edges.join(broadcast(cycles), on=&#39;component&#39;, how=&#39;inner&#39;)
    joined_df.show(truncate=False)

result is

    +---------+---+---+------------+-------------------------+
    |component|src|dst|no_of_cycles|cyclelist                |
    +---------+---+---+------------+-------------------------+
    |1        |1  |2  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
    |1        |2  |3  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
    |1        |3  |4  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
    |1        |3  |1  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
    |1        |4  |1  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
    |2        |5  |6  |1           |[[5, 6, 7]]              |
    |2        |6  |7  |1           |[[5, 6, 7]]              |
    |2        |7  |5  |1           |[[5, 6, 7]]              |
    +---------+---+---+------------+-------------------------+

Note that we can use `broadcast` here as the number of rows in the cycles dataframe is the number of connected components which usually is much smaller than the number of rows in an edgelist. The broadcast function tells Spark to broadcast the smaller DataFrame to all the worker nodes, which can speed up the join operation if one DataFrame is much smaller than the other.


  [1]: https://i.stack.imgur.com/n1AIZ.png
  [2]: https://www.cs.tufts.edu/comp/150GA/homeworks/hw1/Johnson%2075.PDF
  [3]: https://i.stack.imgur.com/avrcU.png

</details>



# 答案2
**得分**: 0

你可以使用 [BFS][1] 算法来查找图中的循环

[1]: https://graphframes.github.io/graphframes/docs/_site/user-guide.html#breadth-first-search-bfs

<details>
<summary>英文:</summary>

You could use [BFS][1] algorithm to find cycles in your graph


  [1]: https://graphframes.github.io/graphframes/docs/_site/user-guide.html#breadth-first-search-bfs

</details>



# 答案3
**得分**: 0

我对'Spark Graphframe'一无所知

希望你会发现这对修改BFS以查找循环有用在标准BFS算法中代码会跟踪先前访问过的顶点当搜索当前顶点的可达邻居时会跳过先前访问过的顶点

在修改为查找循环的BFS中可能会遇到先前访问过的顶点因为存在循环为了检查这一点会应用Dijsktra算法以找到从当前顶点到图的其余部分然后返回先前访问的顶点的最短路径如果存在这样的路径那么就是一个循环

这里是这种情况的示意图

[![在此输入图像描述][1]][1]


这是算法的基本要点但你需要处理一些重要的细节

- 可能会多次检测到相同的循环需要代码来检查新发现的循环是否是新的还是已知的

- 多图节点对之间的多个边

- 具有多个组件的图

- 无向图。(你的问题指定了有向图所以你可以通过测试无向图来解决问题但是对算法的另一种修改也可以处理这些情况

也许一个实现这一算法的[C++代码链接][2]会有所帮助

  [1]: https://i.stack.imgur.com/BwQCh.png
  [2]: https://github.com/JamesBremner/PathFinder/blob/1e9d567dc2442a85bc6e62efb78b63af72c11537/src/GraphTheory.cpp#L413-L516

<details>
<summary>英文:</summary>

I know nothing about &#39;Spark Graphframe&#39;

In the hope you will find it useful, here is how to modify BFS to find cycles:

In the standard BFS algorithm, the code keeps track of which vertices have been previously visited.  When searching the reachable neighbors of the current vertex, a previously visited vertex is skipped.

In BFS modified to find cycles, encountering a previously visited vertex may happen because a cycle exists.

To check for this, Dijsktra is applied to find the shortest path from Current vertex, through the rest of the graph, and back to the previously visited vertex.  If such a path exists, then it is a cycle.

Here is an illustration of the situation

[![enter image description here][1]][1]


This is the basic essence of the algorithm, but you need to handle some significant details:

 - The same cycle may be detected multiple times.  Code is needed to check if a newly found cycle is novel or not

 - Multigraphs ( multiple edges between node pairs )

 - Graphs with more than one component

 - Undirected graphs. ( Your question specifies directed graphs, so you can get away with testing for undirected graphs.  However another modification to the algorithm will also handle these )

Maybe a 
[2] implementing this will be helpful?
[1]: https://i.stack.imgur.com/BwQCh.png [2]: https://github.com/JamesBremner/PathFinder/blob/1e9d567dc2442a85bc6e62efb78b63af72c11537/src/GraphTheory.cpp#L413-L516 </details>

huangapple
  • 本文由 发表于 2023年7月3日 14:18:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/76602251.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定