Python/Neo4j Query Optimization

I have run out of ideas trying to find the cause of such low write speeds. My background is in relational databases, so I might be doing something wrong. To add 10 nodes and 45 connections I currently need 1.4 seconds (empty DB). That seems unacceptable to me; it should take on the order of milliseconds, if that.

**Requirement**

Create a method to add **snapshots** to the Neo4j database. One snapshot consists of 10 nodes (all with different labels and properties). I need to connect all nodes in this snapshot, unidirectionally and without self-connections, which equates to 10 nodes and C(10, 2) = 45 connections per snapshot. Relationships are created with the property `strength = 1`. Every time I add a relationship that already exists (meaning `match nodeA(oHash) -> nodeB(oHash)` finds a match), I just increment `strength` instead of creating a duplicate.
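
For reference, the implementation below consumes snapshot data shaped roughly like this (the labels and property names here are illustrative placeholders, not my real data; only the `nodes`, `label`, `previousMatchSnapshotNode`, and `currentMatchSnapshotNode` keys are what the code actually reads):

    # Hypothetical snapshot payload for SnapshotRepository.add_snapshot().
    # 'label' selects the node label; all other keys become node properties.
    snapshot = {
        "nodes": [
            {"label": "Team", "name": "Alpha", "score": 12},
            {"label": "Player", "name": "Bob", "rating": 7.5},
            # ... 8 more nodes, each with its own label and properties
        ],
        # Optional: nodes used to chain this snapshot to the previous one
        "previousMatchSnapshotNode": {"label": "Match", "matchId": 1},
        "currentMatchSnapshotNode": {"label": "Match", "matchId": 2},
    }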

**Measurements**

I compensated for all overhead with regard to the API, Python itself, etc. Currently, over **99.9%** of the execution time comes from querying Neo4j. I observed that creating the nodes is much slower than creating the connections (**about ~90%** of the total time).

To write one snapshot (10 nodes, 45 connections) into an empty database, my query (from Python) takes **1.4 seconds**, averaged over **100 runs**.
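
A minimal harness along these lines reproduces that measurement (a hypothetical sketch, not my exact benchmarking code; `snapshot` is the payload from the example above):

    # Hypothetical timing sketch: average add_snapshot() latency over 100 runs.
    import time

    repo = SnapshotRepository()
    durations = []
    for _ in range(100):
        repo.delete_all()  # start from an empty graph each run
        start = time.perf_counter()
        repo.add_snapshot(snapshot)
        durations.append(time.perf_counter() - start)
    print(f"avg: {sum(durations) / len(durations) * 1000:.1f} ms")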


**Indexes**

In the code shown below you will find a create-constraints method that I never call. This is because I already created the indexes on all node/label types, and I removed the call to reduce the overhead of checking for existing indexes. Every node has an "oHash" property, which is an MD5 hash of the JSON of all properties, excluding the internal Neo4j `<ID>`. This uniquely identifies my nodes, so I created a UNIQUE constraint on "oHash". As far as I understand, creating a UNIQUE constraint also creates an index on that property in Neo4j.
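
The one-off setup looked roughly like this (a sketch assuming Neo4j 4.4+ syntax; the URI, credentials, and the `Team` label are placeholders):

    from neo4j import GraphDatabase

    # Placeholder connection details.
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    with driver.session() as session:
        # A unique constraint implicitly creates a backing index on the property.
        session.run(
            "CREATE CONSTRAINT IF NOT EXISTS "
            "FOR (n:Team) REQUIRE n.oHash IS UNIQUE"
        )
        # Verify which indexes exist (SHOW INDEXES is available from Neo4j 4.3):
        for record in session.run("SHOW INDEXES"):
            print(record["name"], record["labelsOrTypes"], record["properties"])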

**Best Practices**

I used all recommended best practices I could find online. These include:

 1. Creating a single driver instance and reusing it
 2. Creating a single driver session and reusing it
 3. Using explicit transactions
 4. Using query parameters
 5. Creating a batch and executing as a single transaction

**Implementation**

Here is my current implementation:

    import json
    import hashlib
    import uuid
    from neo4j import GraphDatabase

    class SnapshotRepository:
        """A repository to handle snapshots in a Neo4j database."""

        def __init__(self):
            """Initialize a connection to the Neo4j database."""
            with open("config.json", "r") as file:
                config = json.load(file)
            self._driver = GraphDatabase.driver(
                config["uri"], auth=(config["username"], config["password"])
            )
            self._session = self._driver.session()

        def delete_all(self):
            """Delete all nodes and relationships from the graph."""
            self._session.run("MATCH (n) DETACH DELETE n")

        def add_snapshot(self, data):
            """
            Add a snapshot to the Neo4j database.

            Args:
                data (dict): The snapshot data to be added.
            """
            snapshot_id = str(uuid.uuid4())  # Generate a unique snapshot ID
            self._session.execute_write(self._add_complete_graph, data, snapshot_id)

        def _create_constraints(self, tx, labels):
            """
            Create uniqueness constraints for the specified labels.

            Args:
                tx (neo4j.Transaction): The transaction to be executed.
                labels (list): List of labels for which to create uniqueness constraints.
            """
            for label in labels:
                tx.run(f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{label}) REQUIRE n.oHash IS UNIQUE")

        @staticmethod
        def _calculate_oHash(node):
            """
            Calculate the oHash for a node based on its properties.

            Args:
                node (dict): The node properties.

            Returns:
                str: The calculated oHash.
            """
            properties = {k: v for k, v in node.items() if k not in ['id', 'snapshotId', 'oHash']}
            properties_json = json.dumps(properties, sort_keys=True)
            return hashlib.md5(properties_json.encode('utf-8')).hexdigest()

        def _create_or_update_nodes(self, tx, nodes, snapshot_id):
            """
            Create or update nodes in the graph.

            Args:
                tx (neo4j.Transaction): The transaction to be executed.
                nodes (list): The nodes to be created or updated.
                snapshot_id (str): The ID of the snapshot.
            """
            for node in nodes:
                node['oHash'] = self._calculate_oHash(node)
                node['snapshotId'] = snapshot_id
                tx.run("""
                    MERGE (n:{0} {{oHash: $oHash}})
                    ON CREATE SET n = $props
                    ON MATCH SET n = $props
                """.format(node['label']), oHash=node['oHash'], props=node)

        def _create_relationships(self, tx, prev, curr):
            """
            Create relationships between nodes in the graph.

            Args:
                tx (neo4j.Transaction): The transaction to be executed.
                prev (dict): The properties of the previous node.
                curr (dict): The properties of the current node.
            """
            if prev and curr:
                oHashA = self._calculate_oHash(prev)
                oHashB = self._calculate_oHash(curr)
                tx.run("""
                    MATCH (a:{0} {{oHash: $oHashA}}), (b:{1} {{oHash: $oHashB}})
                    MERGE (a)-[r:HAS_NEXT]->(b)
                    ON CREATE SET r.strength = 1
                    ON MATCH SET r.strength = r.strength + 1
                """.format(prev['label'], curr['label']), oHashA=oHashA, oHashB=oHashB)

        def _add_complete_graph(self, tx, data, snapshot_id):
            """
            Add a complete graph to the Neo4j database for a given snapshot.

            Args:
                tx (neo4j.Transaction): The transaction to be executed.
                data (dict): The snapshot data.
                snapshot_id (str): The ID of the snapshot.
            """
            nodes = data['nodes']
            self._create_or_update_nodes(tx, nodes, snapshot_id)
            tx.run("""
                MATCH (a {snapshotId: $snapshotId}), (b {snapshotId: $snapshotId})
                WHERE a.oHash < b.oHash
                MERGE (a)-[r:HAS]->(b)
                ON CREATE SET r.strength = 1, r.snapshotId = $snapshotId
                ON MATCH SET r.strength = r.strength + 1
            """, snapshotId=snapshot_id)
            self._create_relationships(tx, data.get('previousMatchSnapshotNode', None), data.get('currentMatchSnapshotNode', None))
All input and suggestions are welcome.



# Answer 1
**Score**: 1

The Cypher queries seem fine. I'm not sure how the Python driver handles the requests, but could it be that it sends each Cypher statement individually? That is, you may be seeing long durations because you are not waiting for a single network response but for 10 + 45 = 55 of them: 1.4 s / 55 ≈ 25 ms per request.
Also, if you print all the Cypher statements to the console, copy everything, and execute them manually in Neo4j's console, how long does the execution take then?

Btw, you can use the `UNWIND` keyword to load data and iterate over it. This reduces the time spent parsing Cypher statements as well as the overhead of many small requests: https://medium.com/neo4j/5-tips-tricks-for-fast-batched-updates-of-graph-structures-with-neo4j-and-cypher-73c7f693c8cc
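
For illustration, the per-node loop could collapse into one round trip along these lines (a sketch, not from the original answer; it assumes a shared placeholder label `SnapshotNode`, since Cypher does not allow labels to be parameterized):

    # Sketch: upsert a whole batch of nodes in a single query with UNWIND.
    # SnapshotNode is a placeholder label; per-node labels would need
    # separate statements or APOC.
    def upsert_nodes_batched(tx, nodes):
        tx.run("""
            UNWIND $nodes AS node
            MERGE (n:SnapshotNode {oHash: node.oHash})
            SET n = node
        """, nodes=nodes)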




# Answer 2
**Score**: 1

This may not be the main issue, but the `MATCH` clause in `_add_complete_graph` does not specify node labels for either `a` or `b`. That prevents the use of any index and forces a scan of all nodes to bind each of them.

You must specify node labels in order to use the corresponding indexes.

*By the way, the [label expression syntax](https://neo4j.com/docs/cypher-manual/current/syntax/expressions/#label-expressions) was enhanced in neo4j 5.0.*

[UPDATE]

If you do not know beforehand the labels of `a` and `b`, there is one workaround. You could add a new label (say, `Snapshot`) to all possible `a` and `b` nodes, and create an [index](https://neo4j.com/docs/cypher-manual/current/indexes-for-search-performance/) or [uniqueness constraint](https://neo4j.com/docs/cypher-manual/current/constraints/) on `:Snapshot(snapshotId)`.
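
A sketch of that workaround (hypothetical names; `Snapshot` is the extra label suggested above):

    # One-off setup: index the snapshotId property of the extra label.
    # CREATE INDEX snapshotId_idx IF NOT EXISTS FOR (n:Snapshot) ON (n.snapshotId)

    # With every node also carrying :Snapshot, the cross join can use the index.
    def create_snapshot_relationships(tx, snapshot_id):
        tx.run("""
            MATCH (a:Snapshot {snapshotId: $snapshotId}),
                  (b:Snapshot {snapshotId: $snapshotId})
            WHERE a.oHash < b.oHash
            MERGE (a)-[r:HAS]->(b)
            ON CREATE SET r.strength = 1, r.snapshotId = $snapshotId
            ON MATCH SET r.strength = r.strength + 1
        """, snapshotId=snapshot_id)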




# Answer 3
**Score**: 0

I have found the solution. For everyone who might stumble across this thread, here are the two things that were wrong with the code.

Firstly, I never close the session. Since I was dealing with GCP Functions, I completely forgot about this. I added it to the class destructor:

    def __del__(self):
        """Close the connection to the Neo4j database."""
        self._session.close()
        self._driver.close()

Secondly, I read in the Neo4j performance recommendations (https://neo4j.com/docs/python-manual/current/performance/) that not setting the database name when opening a session can cause significant overhead when many queries are executed, which was my situation:

    self._db_name = "neo4j"
    self._session = self._driver.session(database=self._db_name)

What I found in the profiler when running `EXPLAIN` is that the database lookup was a major part of most queries. I usually set the DB name once in an SQL connector and never think twice about it.
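
Putting both fixes together, the connection setup ends up looking like this (a condensed sketch of the repository shown above; the database name `neo4j` is the server default):

    import json
    from neo4j import GraphDatabase

    class SnapshotRepository:
        def __init__(self):
            with open("config.json", "r") as file:
                config = json.load(file)
            self._driver = GraphDatabase.driver(
                config["uri"], auth=(config["username"], config["password"])
            )
            # Pinning the database avoids home-database resolution on each query.
            self._session = self._driver.session(database="neo4j")

        def __del__(self):
            """Close the connection to the Neo4j database."""
            self._session.close()
            self._driver.close()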
