Using apoc.periodic.commit to insert an endless JSON stream into Neo4j
Question
I'm new to Neo4j, and am attempting to insert data from a JSON stream into the database. The root element of the JSON stream is an array, and each element of the array is an object containing a key/value pair and a nested array.
Sample of the JSON stream:
[
  {
    "access_point": 4864834,
    "objects": [
      {"class_id": 10, "name": "iphone", "snr": 0.557461},
      {"class_id": 7, "name": "android", "snr": 0.822390},
      {"class_id": 7, "name": "android", "snr": 0.320850},
      {"class_id": 2, "name": "pc", "snr": 0.915604}
    ]
  },
  {
    "access_point": 4864835,
    "objects": [
      {"class_id": 12, "name": "iphone", "snr": 0.268736},
      {"class_id": 10, "name": "android", "snr": 0.585927},
      {"class_id": 7, "name": "android", "snr": 0.821383},
      {"class_id": 2, "name": "pc", "snr": 0.254997},
      {"class_id": 7, "name": "android", "snr": 0.326559},
      {"class_id": 2, "name": "pc", "snr": 0.905473}
    ]
  }
]
Because it is an endless stream, I need to do batch commits, as apoc.load.json will never reach the end of the array.
So far my query looks like this:
CALL apoc.periodic.commit("
  CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json', '$[*]')
  YIELD value AS accesspoint
  MERGE (f:Accesspoint {id: accesspoint.access_point, name: accesspoint.access_point})
  FOREACH (object IN accesspoint.objects |
    MERGE (f)-[r:OBSERVED]->(:Object {class_id: object.class_id, name: object.name, access_point_id: accesspoint.access_point}))",
  {limit: 10, batchSize: 10});
This, of course, is not referencing a JSON stream, but static JSON on my GitHub.
Is there a way to tell it to persist after n elements in the array?
Answer 1
Score: 3
It looks like you should be using apoc.periodic.iterate instead of apoc.periodic.commit. For example:
CALL apoc.periodic.iterate(
  "CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json', '$[*]')
   YIELD value AS ap",
  "MERGE (f:Accesspoint {id: ap.access_point, name: ap.access_point})
   FOREACH (obj IN ap.objects |
     MERGE (f)-[r:OBSERVED]->(:Object {class_id: obj.class_id, name: obj.name, access_point_id: ap.access_point}))",
  {batchSize: 10});
apoc.periodic.iterate is documented to support the batchSize option, which processes N executions of the second Cypher statement in a single transaction.
Answer 2
Score: 1
Because I have access to the source of the data, we were able to modify how it outputs the JSON. We switched it to JSONL (line-delimited JSON), where each line of JSON is essentially treated as its own JSON document. I did draw heavily on @cybersam's answer, and also on Michael Hunger's, so thank you.
We changed the source JSON to JSONL like the following:
{"access_point":4864834, "objects": [{"class_id":10, "name":"iphone", "snr":0.557461}, {"class_id":7, "name":"android", "snr":0.822390}, {"class_id":7, "name":"android", "snr":0.320850}, {"class_id":2, "name":"pc", "snr":0.915604}]}
{"access_point":4864835, "objects": [{"class_id":12, "name":"iphone", "snr":0.268736}, {"class_id":10, "name":"android", "snr":0.585927}, {"class_id":7, "name":"android", "snr":0.821383}]}
and my Neo4j Cypher query looked like the following:
CALL apoc.periodic.iterate(
  "CALL apoc.load.jsonArray('http://13.68.174.185:8899/', null)
   YIELD value AS ap",
  "MERGE (f:AccessPoint {id: ap.frame_id, name: ap.access_point_id})
   FOREACH (obj IN ap.objects |
     MERGE (f)-[r:OBSERVED]->(:Object {class_id: obj.class_id, name: obj.name, access_point_id: ap.ap_id}))",
  {batchSize: 1});
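To sanity-check what the stream has committed so far while it is running, a quick aggregate over the merged nodes can help. A minimal sketch against the labels used in this answer (purely illustrative, assumes the model above):

// Count observed objects per access point loaded so far.
MATCH (f:AccessPoint)-[:OBSERVED]->(o:Object)
RETURN f.id AS access_point, count(o) AS observed
ORDER BY observed DESC
LIMIT 10;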