Is there a way to define the relationship type using a DataFrame column in Spark?


Question

I am writing relationships to Neo4j with the Spark connector, and I want the relationship type to come from a DataFrame column:

import pandas as pd

_list = []
_dict = {}

_dict['ENV'] = "DEV"
_dict['PRIVILEGE'] = "DML"
_dict['ROLE'] = "ROLE1"
_dict['DATABASE'] = "Database1"
_list.append(_dict)

_dict['ENV'] = "DEV"
_dict['PRIVILEGE'] = "DDL"
_dict['ROLE'] = "ROLE2"
_dict['DATABASE'] = "Database1"
_list.append(_dict)

df = pd.DataFrame(_list)
df = spark.createDataFrame(df)

df.write.format("org.neo4j.spark.DataSource") \
  .mode("Overwrite")  \
  .option("relationship", df["PRIVILEGE"]) \
  .option("relationship.save.strategy", "keys") \
  .option("relationship.target.labels", "DATABASE") \
  .option("relationship.target.node.keys", "DATABASE, ENV") \
  .option("relationship.target.save.mode", "overwrite") \
  .option("relationship.source.labels", "ROLE") \
  .option("relationship.source.save.mode", "overwrite") \
  .option("relationship.source.node.keys", "ROLE, ENV") \
  .save()

It is writing the relationship type as Column<'PRIVILEGE'>, when I want the actual column value (e.g. DML or DDL).


Answer 1

Score: 1

df["PRIVILEGE"] 返回一个 org.apache.spark.sql.Column,所以你所使用的关系类型只是 Column.toString() 的值(即 Column<'PRIVILEGE'>)。

"relationship" 选项期望其参数(关系类型)是一个固定的字符串。它不支持从 DataFrame 列中动态获取关系类型名称。

因此,你应该为每个关系类型创建一个单独的 DataFrame,并为每个 DataFrame 单独进行写入操作。
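
For example, a minimal sketch of this approach, using the DataFrame from the question (Neo4j connection options omitted, as in the question):

# The two PRIVILEGE values from the question's data; in general you would
# collect the distinct values from the DataFrame first.
for rel_type in ["DML", "DDL"]:
    subset = df.filter(df["PRIVILEGE"] == rel_type)
    subset.write.format("org.neo4j.spark.DataSource") \
      .mode("Overwrite") \
      .option("relationship", rel_type) \
      .option("relationship.save.strategy", "keys") \
      .option("relationship.target.labels", "DATABASE") \
      .option("relationship.target.node.keys", "DATABASE, ENV") \
      .option("relationship.target.save.mode", "overwrite") \
      .option("relationship.source.labels", "ROLE") \
      .option("relationship.source.save.mode", "overwrite") \
      .option("relationship.source.node.keys", "ROLE, ENV") \
      .save()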

Also, as a side note: since you reuse the same _dict instance for every _list element, all the _list elements end up identical. You need to create a new dict for each element.
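
For example, a minimal fix is to build a fresh dict per row (values as in the question):

# A new dict per element, so the two rows stay distinct.
_list = [
    {"ENV": "DEV", "PRIVILEGE": "DML", "ROLE": "ROLE1", "DATABASE": "Database1"},
    {"ENV": "DEV", "PRIVILEGE": "DDL", "ROLE": "ROLE2", "DATABASE": "Database1"},
]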


Answer 2

Score: 0

Please do raise an issue so the team can consider how this could be implemented.

The approaches I've seen used are to either:

  • iterate over one relationship type at a time (as mentioned by @cybersam and @jose_bacoy), or
  • write the nodes first, then create the relationships with a Cypher query that uses apoc.create.relationship to take the type from a DataFrame column.

For the first approach, get the unique values of the PRIVILEGE column; these become the relationship types:

relTypeColumn = "PRIVILEGE"
uniquerelTypes = df.dropDuplicates([relTypeColumn])
relTypes = uniquerelTypes.select(relTypeColumn).collect()

Then iterate over those relationship types, filter the DataFrame down to the matching rows, and write each subset to Neo4j:

for row in relTypes:
    relType = row[relTypeColumn]
    relsubset = df.filter(df[relTypeColumn] == relType)
    # One write per relationship type; "relationship" now receives a plain string.
    relsubset.write.format("org.neo4j.spark.DataSource") \
      .mode("Overwrite") \
      .option("relationship", relType) \
      .option("relationship.save.strategy", "keys") \
      .option("relationship.target.labels", "DATABASE") \
      .option("relationship.target.node.keys", "DATABASE, ENV") \
      .option("relationship.target.save.mode", "overwrite") \
      .option("relationship.source.labels", "ROLE") \
      .option("relationship.source.save.mode", "overwrite") \
      .option("relationship.source.node.keys", "ROLE, ENV") \
      .option("url", neo4j_url) \
      .option("authentication.type", "basic") \
      .option("authentication.basic.username", neo4j_user) \
      .option("authentication.basic.password", neo4j_password) \
      .save()

For the second approach, first write the nodes:

df.write.format("org.neo4j.spark.DataSource") \
.mode("Overwrite")  \
.option("labels", ":DATABASE") \
.option("node.keys", "DATABASE,ENV") \
.option("schema.optimization.type", "NODE_CONSTRAINTS") \
.option("url", neo4j_url) \
.option("authentication.type", "basic") \
.option("authentication.basic.username", neo4j_user) \
.option("authentication.basic.password", neo4j_password) \
.save()

Then write the relationships using apoc, which allows passing dynamic values from the DataFrame:

df.write.format("org.neo4j.spark.DataSource") \
.mode("Overwrite")  \
.option("query", """
MATCH (database:DATABASE {DATABASE:event.DATABASE, ENV:event.ENV}) 
MATCH (role:ROLE {ROLE:event.ROLE, ENV:event.ENV})
WITH database, role, event
CALL apoc.create.relationship(database, event.PRIVILEGE, {}, role) YIELD rel
return count(role)""") \
.option("url", neo4j_url) \
.option("authentication.type", "basic") \
.option("authentication.basic.username", neo4j_user) \
.option("authentication.basic.password", neo4j_password) \
.save()
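
Note that apoc.create.relationship is an APOC procedure, so the query-based approach assumes the APOC plugin is installed on the Neo4j server.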
