Using apoc.periodic.commit to insert an endless JSON stream into Neo4j

Question

I'm new to Neo4j and am trying to insert data from a JSON stream into the database. The root element of the JSON stream is an array; each element of the array is an object that contains a key/value pair and an array.

Sample of the JSON stream:

[
{
 "access_point":4864834, 
 "objects": [ 
  {"class_id":10, "name":"iphone", "snr":0.557461}, 
  {"class_id":7, "name":"android", "snr":0.822390}, 
  {"class_id":7, "name":"android", "snr":0.320850}, 
  {"class_id":2, "name":"pc", "snr":0.915604}
 ] 
}, 
{
 "access_point":4864835, 
 "objects": [ 
  {"class_id":12, "name":"iphone", "snr":0.268736}, 
  {"class_id":10, "name":"android", "snr":0.585927}, 
  {"class_id":7, "name":"android", "snr":0.821383}, 
  {"class_id":2, "name":"pc", "snr":0.254997}, 
  {"class_id":7, "name":"android", "snr":0.326559}, 
  {"class_id":2, "name":"pc", "snr":0.905473}
 ] 
}, 

Because it is an endless stream, I need to do batch commits, since apoc.load.json will never reach the end of the array.

So far my query looks like this:

CALL apoc.periodic.commit("
CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json','$[*]')
YIELD value as accesspoint MERGE(f:Accesspoint {id: accesspoint.access_point, name: accesspoint.access_point})
FOREACH(object IN accesspoint.objects | MERGE (f)-[r:OBSERVED]->(:Object {class_id:object.class_id, name:object.name, access_point_id:accesspoint.access_point}))",
{limit:10, batchSize: 10});

This of course does not reference a JSON stream, only a static JSON file in my GitHub repository.

Is there a way to tell it to persist (commit) after every n elements of the array?

Answer 1

Score: 3

It looks like you should be using apoc.periodic.iterate instead of apoc.periodic.commit. For example:

CALL apoc.periodic.iterate(
  "CALL apoc.load.json('https://raw.githubusercontent.com/jdharri/testjson/master/test.json','$[*]')
YIELD value AS ap",
  "MERGE(f:Accesspoint {id: ap.access_point, name: ap.access_point})
   FOREACH(obj IN ap.objects |
     MERGE (f)-[r:OBSERVED]->(:Object {class_id:obj.class_id, name:obj.name, access_point_id:ap.access_point}))",
{batchSize: 10});

apoc.periodic.iterate is documented to support the batchSize option, which processes N executions of the second Cypher statement in a single transaction.
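
To make the batching idea concrete, here is a minimal Python sketch of grouping rows from a possibly endless source into fixed-size chunks, with each chunk standing in for one transaction. It only illustrates the concept and is not how APOC is implemented; the `batches` helper and the generated records are hypothetical.

```python
from itertools import count, islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batches(rows: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield lists of up to batch_size rows; also works on endless iterables."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return  # source exhausted (never happens for an endless stream)
        yield batch


# Hypothetical stand-in for an endless stream of access-point records.
endless = ({"access_point": 4864834 + i, "objects": []} for i in count())

# Take only the first three batches so the demonstration terminates.
first_three = list(islice(batches(endless, 10), 3))
print([len(b) for b in first_three])  # each batch would map to one commit
```

Because each batch is produced lazily, work can be committed after every 10 rows without ever needing to see the end of the input.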

Answer 2

Score: 1

Because I have access to the source of the data, we were able to modify how it outputs the JSON. We switched it to JSONL (line-delimited JSON), where each line is essentially treated as its own JSON document. I drew heavily on @cybersam's answer, and also on Michael Hunger, so thank you.

I changed the source JSON to JSONL like the following:


{"access_point":4864834, "objects": [{"class_id":10, "name":"iphone", "snr":0.557461}, {"class_id":7, "name":"android", "snr":0.822390}, {"class_id":7, "name":"android", "snr":0.320850}, {"class_id":2, "name":"pc", "snr":0.915604}]}
{"access_point":4864835, "objects": [{"class_id":12, "name":"iphone", "snr":0.268736}, {"class_id":10, "name":"android", "snr":0.585927}, {"class_id":7, "name":"android", "snr":0.821383}]}
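
As a rough illustration of the source-side change, the Python sketch below writes each record as one compact JSON line and reads such lines back one at a time. The function names `to_jsonl` and `from_jsonl` are hypothetical and the records are trimmed versions of the sample above; the actual data source in this answer may be implemented quite differently.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def to_jsonl(records: Iterable[Dict[str, Any]]) -> Iterator[str]:
    """Serialize each record as a single line of JSON."""
    for record in records:
        # Without indent, json.dumps emits no raw newlines, so one record == one line.
        yield json.dumps(record, separators=(",", ":"))


def from_jsonl(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Parse each non-empty line as its own JSON document."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


records = [
    {"access_point": 4864834,
     "objects": [{"class_id": 10, "name": "iphone", "snr": 0.557461}]},
    {"access_point": 4864835,
     "objects": [{"class_id": 12, "name": "iphone", "snr": 0.268736}]},
]

lines = list(to_jsonl(records))
for line in lines:
    print(line)
```

Since every line is a complete document, a consumer can handle records as they arrive instead of waiting for a closing `]` that an endless array never sends.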

My Neo4j Cypher query then looked like the following:

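// First statement: streams one row per JSON line from the source.
// Second statement: runs once per row, committed in batches of batchSize.
// Note: frame_id, access_point_id and ap_id are the field names used in the original post;
// the JSONL sample above only shows access_point and objects.
CALL apoc.periodic.iterate(
"CALL apoc.load.jsonArray('http://13.68.174.185:8899/',null)
YIELD value AS ap",
"MERGE(f:AccessPoint {id: ap.frame_id, name: ap.access_point_id})
FOREACH(obj IN ap.objects |
  MERGE (f)-[r:OBSERVED]->(:Object {class_id:obj.class_id, name:obj.name, access_point_id:ap.ap_id}))",
{batchSize: 1});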

Posted on January 7, 2020, 01:59:53