
Is there a way to define the relationship type using a dataframe column in spark?

Question


  import pandas as pd

  _list = []
  _dict = {}
  _dict['ENV'] = "DEV"
  _dict['PRIVILEGE'] = "DML"
  _dict['ROLE'] = "ROLE1"
  _dict['DATABASE'] = "Database1"
  _list.append(_dict)
  _dict['ENV'] = "DEV"
  _dict['PRIVILEGE'] = "DDL"
  _dict['ROLE'] = "ROLE2"
  _dict['DATABASE'] = "Database1"
  _list.append(_dict)
  df = pd.DataFrame(_list)
  df = spark.createDataFrame(df)
  df.write.format("org.neo4j.spark.DataSource") \
      .mode("Overwrite") \
      .option("relationship", df["PRIVILEGE"]) \
      .option("relationship.save.strategy", "keys") \
      .option("relationship.target.labels", "DATABASE") \
      .option("relationship.target.node.keys", "DATABASE, ENV") \
      .option("relationship.target.save.mode", "overwrite") \
      .option("relationship.source.labels", "ROLE") \
      .option("relationship.source.save.mode", "overwrite") \
      .option("relationship.source.node.keys", "ROLE, ENV") \
      .save()

It is writing the type as Column<'PRIVILEGE'>, when I want the value.

Answer 1

Score: 1

df["PRIVILEGE"] 返回一个 org.apache.spark.sql.Column,所以你所使用的关系类型只是 Column.toString() 的值(即 Column<'PRIVILEGE'>)。

"relationship" 选项期望其参数(关系类型)是一个固定的字符串。它不支持从 DataFrame 列中动态获取关系类型名称。

因此,你应该为每个关系类型创建一个单独的 DataFrame,并为每个 DataFrame 单独进行写入操作。

另外,顺便说一句,由于你对每个 _list 元素重用了相同的 _dict 实例,所有 _list 元素都将相同。你需要为每个元素创建一个新的字典。

英文:

df["PRIVILEGE"] returns an org.apache.spark.sql.Column, so what you are using as the relationship type is just the value of Column.toString() (i.e., Column<'PRIVILEGE'>).

The "relationship" option expects its argument (the relationship type) to be a fixed string. It does not support getting the relationship type name dynamically from a DataFrame column.

Therefore, you should create a separate DataFrame for each relationship type and have a separate write for each DataFrame.
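A minimal sketch of that per-type loop, reusing the options from the question (url/authentication options are omitted here, as they were in the question):

  # Collect the distinct PRIVILEGE values, then do one write per value,
  # passing each value as the fixed relationship type.
  for priv in [row["PRIVILEGE"] for row in df.select("PRIVILEGE").distinct().collect()]:
      df.filter(df["PRIVILEGE"] == priv) \
          .write.format("org.neo4j.spark.DataSource") \
          .mode("Overwrite") \
          .option("relationship", priv) \
          .option("relationship.save.strategy", "keys") \
          .option("relationship.target.labels", "DATABASE") \
          .option("relationship.target.node.keys", "DATABASE, ENV") \
          .option("relationship.target.save.mode", "overwrite") \
          .option("relationship.source.labels", "ROLE") \
          .option("relationship.source.save.mode", "overwrite") \
          .option("relationship.source.node.keys", "ROLE, ENV") \
          .save()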

And, by the way, since you reuse the same _dict instance for every _list element, all _list elements will be identical. You need to create a new dict for every element.
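For example, a minimal sketch of the fix, reusing the question's column names and spark session:

  import pandas as pd

  # Build an independent dict per row instead of mutating one shared instance.
  _list = [
      {"ENV": "DEV", "PRIVILEGE": "DML", "ROLE": "ROLE1", "DATABASE": "Database1"},
      {"ENV": "DEV", "PRIVILEGE": "DDL", "ROLE": "ROLE2", "DATABASE": "Database1"},
  ]
  df = spark.createDataFrame(pd.DataFrame(_list))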

Answer 2

Score: 0


Please do raise an issue so the team can consider how this could be implemented.

The approaches I've seen used are to either:

  • iterate one relationship type at a time (as mentioned by @cybersam and @jose_bacoy), or
  • write the nodes first, then create the relationships with a Cypher query that calls apoc.create.relationship, which accepts a dynamic type (shown after the loop below).

E.g., for the first approach, get the unique values of the PRIVILEGE column (these are the relationship types):

  relTypeColumn = "PRIVILEGE"
  uniquerelTypes = df.dropDuplicates([relTypeColumn])
  relTypes = uniquerelTypes.select(relTypeColumn).collect()

Then iterate over those relationship types, filter the DataFrame to the rows that match, and write each subset to Neo4j:

  for row in relTypes:
      relType = row[relTypeColumn]
      relsubset = df.filter(df[relTypeColumn] == relType)
      relsubset.write.format("org.neo4j.spark.DataSource") \
          .mode("Overwrite") \
          .option("relationship", relType) \
          .option("relationship.save.strategy", "keys") \
          .option("relationship.target.labels", "DATABASE") \
          .option("relationship.target.node.keys", "DATABASE, ENV") \
          .option("relationship.target.save.mode", "overwrite") \
          .option("relationship.source.labels", "ROLE") \
          .option("relationship.source.save.mode", "overwrite") \
          .option("relationship.source.node.keys", "ROLE, ENV") \
          .option("url", neo4j_url) \
          .option("authentication.type", "basic") \
          .option("authentication.basic.username", neo4j_user) \
          .option("authentication.basic.password", neo4j_password) \
          .save()

For the second approach, first write the nodes:

  # Write the DATABASE nodes (the ROLE nodes would need a similar write so
  # the MATCH clauses in the relationship query below can find them).
  df.write.format("org.neo4j.spark.DataSource") \
      .mode("Overwrite") \
      .option("labels", ":DATABASE") \
      .option("node.keys", "DATABASE,ENV") \
      .option("schema.optimization.type", "NODE_CONSTRAINTS") \
      .option("url", neo4j_url) \
      .option("authentication.type", "basic") \
      .option("authentication.basic.username", neo4j_user) \
      .option("authentication.basic.password", neo4j_password) \
      .save()

Then write the relationships using APOC, which allows passing dynamic values in from the DataFrame:

  df.write.format("org.neo4j.spark.DataSource") \
      .mode("Overwrite") \
      .option("query", """
          MATCH (database:DATABASE {DATABASE:event.DATABASE, ENV:event.ENV})
          MATCH (role:ROLE {ROLE:event.ROLE, ENV:event.ENV})
          WITH database, role, event
          CALL apoc.create.relationship(database, event.PRIVILEGE, {}, role) YIELD rel
          RETURN count(role)""") \
      .option("url", neo4j_url) \
      .option("authentication.type", "basic") \
      .option("authentication.basic.username", neo4j_user) \
      .option("authentication.basic.password", neo4j_password) \
      .save()
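Note that apoc.create.relationship is an APOC procedure, so this second approach assumes the APOC plugin is installed on the Neo4j server.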
