Is there a way to define the relationship type using a dataframe column in Spark?
Question
import pandas as pd
_list = []
_dict = {}
_dict['ENV'] = "DEV"
_dict['PRIVILEGE'] = "DML"
_dict['ROLE'] = "ROLE1"
_dict['DATABASE'] = "Database1"
_list.append(_dict)
_dict['ENV'] = "DEV"
_dict['PRIVILEGE'] = "DDL"
_dict['ROLE'] = "ROLE2"
_dict['DATABASE'] = "Database1"
_list.append(_dict)
df = pd.DataFrame(_list)
df = spark.createDataFrame(df)
df.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("relationship", df["PRIVILEGE"]) \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.target.labels", "DATABASE") \
    .option("relationship.target.node.keys", "DATABASE, ENV") \
    .option("relationship.target.save.mode", "overwrite") \
    .option("relationship.source.labels", "ROLE") \
    .option("relationship.source.save.mode", "overwrite") \
    .option("relationship.source.node.keys", "ROLE, ENV") \
    .save()
It is writing the relationship type as Column<'PRIVILEGE'>, when I want the actual column value ("DML" or "DDL").
Answer 1
Score: 1
df["PRIVILEGE"]
返回一个 org.apache.spark.sql.Column
,所以你所使用的关系类型只是 Column.toString()
的值(即 Column<'PRIVILEGE'>
)。
"relationship" 选项期望其参数(关系类型)是一个固定的字符串。它不支持从 DataFrame
列中动态获取关系类型名称。
因此,你应该为每个关系类型创建一个单独的 DataFrame
,并为每个 DataFrame
单独进行写入操作。
另外,顺便说一句,由于你对每个 _list
元素重用了相同的 _dict
实例,所有 _list
元素都将相同。你需要为每个元素创建一个新的字典。
英文:
df["PRIVILEGE"]
returns a org.apache.spark.sql.Column
, so what you are using as the relationship type is just the value of Column.toString()
(i.e., Column<'PRIVILEGE'>
).
The "relationship" option
expects its argument (the relationship type) to be a fixed string. It does not support getting the relationship type name dynamically from a DataFrame
column.
Therefore, you should create a separate DataFrame
for each relationship type and have a separate write
for each DataFrame
.
And, by the way, since you reuse the same _dict
instance for every _list
element, all _list
elements will be identical. You need to create a new dict for every element.
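For reference, a minimal sketch of both points (the variable names are illustrative, and spark is assumed to be an active SparkSession):

import pandas as pd

# Build each row as its own dict so the list elements stay distinct.
rows = [
    {"ENV": "DEV", "PRIVILEGE": "DML", "ROLE": "ROLE1", "DATABASE": "Database1"},
    {"ENV": "DEV", "PRIVILEGE": "DDL", "ROLE": "ROLE2", "DATABASE": "Database1"},
]
df = spark.createDataFrame(pd.DataFrame(rows))

# One DataFrame (and one write) per relationship type, so the type can be passed
# as a literal string, e.g. .option("relationship", "DML") in the write shown above.
dml_df = df.filter(df["PRIVILEGE"] == "DML")
ddl_df = df.filter(df["PRIVILEGE"] == "DDL")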
Answer 2
Score: 0
Please do raise an issue so the team can consider how it can be implemented.
The approaches I've seen used are to either:
- iterate one relationship type at a time (as mentioned by @cybersam and @jose_bacoy)
e.g. get the unique values of the PRIVILEGE column (these are the relationship types):
relTypeColumn = "PRIVILEGE"
uniquerelTypes = df.dropDuplicates([relTypeColumn])
relTypes = uniquerelTypes.select(relTypeColumn).collect()
Then iterate over those relationship types, filter the DataFrame to the matching rows, and write each subset to Neo4j:
for row in relTypes:
    relType = row[relTypeColumn]
    relsubset = df.filter(df[relTypeColumn] == relType)
    relsubset.write.format("org.neo4j.spark.DataSource") \
        .mode("Overwrite") \
        .option("relationship", relType) \
        .option("relationship.save.strategy", "keys") \
        .option("relationship.target.labels", "DATABASE") \
        .option("relationship.target.node.keys", "DATABASE, ENV") \
        .option("relationship.target.save.mode", "overwrite") \
        .option("relationship.source.labels", "ROLE") \
        .option("relationship.source.save.mode", "overwrite") \
        .option("relationship.source.node.keys", "ROLE, ENV") \
        .option("url", neo4j_url) \
        .option("authentication.type", "basic") \
        .option("authentication.basic.username", neo4j_user) \
        .option("authentication.basic.password", neo4j_password) \
        .save()
- use apoc.create.relationship or apoc.merge.relationship within a query
e.g. write the nodes first:
df.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("labels", ":DATABASE") \
    .option("node.keys", "DATABASE,ENV") \
    .option("schema.optimization.type", "NODE_CONSTRAINTS") \
    .option("url", neo4j_url) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", neo4j_user) \
    .option("authentication.basic.password", neo4j_password) \
    .save()
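The relationship query below also matches on :ROLE nodes, so those need to exist as well; a parallel write for them could look like this (a sketch reusing the same options, assuming ROLE and ENV are the keys that identify a role node):

df.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("labels", ":ROLE") \
    .option("node.keys", "ROLE,ENV") \
    .option("schema.optimization.type", "NODE_CONSTRAINTS") \
    .option("url", neo4j_url) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", neo4j_user) \
    .option("authentication.basic.password", neo4j_password) \
    .save()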
Then write the relationships using APOC, which allows passing dynamic values from the DataFrame (each row is available to the query as event):
df.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("query", """
        MATCH (database:DATABASE {DATABASE:event.DATABASE, ENV:event.ENV})
        MATCH (role:ROLE {ROLE:event.ROLE, ENV:event.ENV})
        WITH database, role, event
        CALL apoc.create.relationship(database, event.PRIVILEGE, {}, role) YIELD rel
        RETURN count(role)""") \
    .option("url", neo4j_url) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", neo4j_user) \
    .option("authentication.basic.password", neo4j_password) \
    .save()
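If re-running the job must not create duplicate relationships, apoc.merge.relationship (mentioned above) can be swapped in for apoc.create.relationship; a sketch, assuming an APOC version whose merge signature takes identifying, on-create, and on-match property maps:

df.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("query", """
        MATCH (database:DATABASE {DATABASE:event.DATABASE, ENV:event.ENV})
        MATCH (role:ROLE {ROLE:event.ROLE, ENV:event.ENV})
        WITH database, role, event
        // merge instead of create, so repeated runs reuse the existing relationship
        CALL apoc.merge.relationship(database, event.PRIVILEGE, {}, {}, role, {}) YIELD rel
        RETURN count(rel)""") \
    .option("url", neo4j_url) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", neo4j_user) \
    .option("authentication.basic.password", neo4j_password) \
    .save()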