json object is too complex for ADF dataflow to parse/roll

Question

I have a JSON document that looks like the one below:

(screenshot of the JSON omitted; see the sample data in the answer below)

I need to collect the seller ID and seller name for now. I was trying to achieve this in an ADF data flow, but for some reason the flatten transformation could not roll it up by sellers. It seems this is not a valid array.

What I tried:

  1. I tried some other approaches, such as copying it into blob storage, tweaking settings like the file pattern ('Array of objects'), and feeding that in as a source to the data flow. That didn't work.
  2. I tried to use an aggregate transformation, but I am not sure how to apply it. I am new to data flows.

If it is not possible in a data flow at all, can it be done easily in PySpark? I am open to that as well.

Answer 1

Score: 1

  • I have taken the following sample data in order to get the sellerID and sellerName.
{
"timestamp":1686036525840,
"tokensLeft":1,
"refillIn":1,
"refillRate":1,
"tokenFlowReduction":0.0,
"tokensConsumed":2,
"processingTimeInMs":0,
"sellers":{
"ABC":{
"trackedSince":1,
"domainId":1,
"sellerId":"XX",
"sellerName":"LS",
"csv":[
[
6062020
],
[
6062020
]
],
"lastUpdate":6536604,
"isScammer":false,
"hasFBA":true,
"totalStorefrontAsins":[
1,
44
],
"sellerCategoryStatistics":[
{
"catId":1,
"productCount":2,
"avg30SalesRank":3,
"productCountWithAmazonOffer":5
},
{
"catId":3,
"productCount":11,
"avg30SalesRank":72203,
"productCountWithAmazonOffer":3
}
],
"sellerBrandStatistics":[
{
"brand":"1",
"productCount":3,
"avg30SalesRank":18820,
"productCountWithAmazonOffer":0
},
{
"brand":"l3",
"productCount":3,
"avg30SalesRank":32525,
"productCountWithAmazonOffer":1
},
{
"brand":"1",
"productCount":3,
"avg30SalesRank":40102,
"productCountWithAmazonOffer":1
},
{
"brand":"1",
"productCount":1,
"avg30SalesRank":5315,
"productCountWithAmazonOffer":0
}
],
"shipsFromChina":false,
"address":[
"1"
],
"recentFeedback":[
{
"rating":10,
"date":6531840,
"feedback":"Never .  Black ",
"isStriked":true
},
{
"rating":50,
"date":6523200,
"feedback":"I received.",
"isStriked":false
},
{
"rating":50,
"date":6521760,
"feedback":"Fast ship a",
"isStriked":false
},
{
"rating":50,
"date":6518880,
"feedback":"It  to .",
"isStriked":false
},
{
"rating":20,
"date":6517440,
"feedback":"I r .",
"isStriked":true
}
],
"lastRatingUpdate":6536604,
"neutralRating":[
0,
0,
0,
0
],
"negativeRating":[
0,
0,
2,
2
],
"positiveRating":[
100,
100,
98,
98
],
"ratingCount":[
7,
23,
59,
59
],
"currentRating":1,
"currentRatingCount":1,
"ratingsLast30Days":6
},
"XYZ":{
"trackedSince":2795761,
"domainId":1,
"sellerId":"CVB",
"sellerName":"ABC",
"csv":[
[
271
],
[
6,
46101
]
],
"lastUpdate":6536586,
"isScammer":false,
"hasFBA":true,
"totalStorefrontAsins":[
1,
1
],
"sellerCategoryStatistics":[
{
"catId":1,
"productCount":131,
"avg30SalesRank":11,
"productCountWithAmazonOffer":1
},
{
"catId":3760911,
"productCount":106,
"avg30SalesRank":101900,
"productCountWithAmazonOffer":8
},
{
"catId":1055398,
"productCount":93,
"avg30SalesRank":107441,
"productCountWithAmazonOffer":41
},
{
"catId":3760901,
"productCount":60,
"avg30SalesRank":11,
"productCountWithAmazonOffer":3
},
{
"catId":11,
"productCount":11,
"avg30SalesRank":11,
"productCountWithAmazonOffer":5
},
{
"catId":11,
"productCount":28,
"avg30SalesRank":489,
"productCountWithAmazonOffer":23
},
{
"catId":1,
"productCount":1,
"avg30SalesRank":1,
"productCountWithAmazonOffer":0
},
{
"catId":1,
"productCount":12,
"avg30SalesRank":1,
"productCountWithAmazonOffer":0
},
{
"catId":1,
"productCount":8,
"avg30SalesRank":1,
"productCountWithAmazonOffer":4
},
{
"catId":1,
"productCount":6,
"avg30SalesRank":1,
"productCountWithAmazonOffer":3
}
],
"sellerBrandStatistics":[
{
"brand":"1e",
"productCount":35,
"avg30SalesRank":1,
"productCountWithAmazonOffer":0
}
]
}
}
}
  • Now, using the select transformation with rule-based mapping, I have selected the properties of sellers.

(screenshot of the select transformation omitted)

  • I have used a derived column transformation to add a new column called columns with the value columnNames('select1'), which gives an array column whose value is ["ABC","XYZ"] (the keys we need).

(screenshot of the derived column transformation omitted)

  • I am using a cache sink and writing to the activity output. The following is the complete data flow JSON:
{
"name": "dataflow1",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "Json1",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"name": "sink1"
}
],
"transformations": [
{
"name": "select1"
},
{
"name": "derivedColumn1"
}
],
"scriptLines": [
"source(output(",
"          timestamp as integer,",
"          tokensLeft as integer,",
"          refillIn as integer,",
"          refillRate as integer,",
"          tokenFlowReduction as double,",
"          tokensConsumed as integer,",
"          processingTimeInMs as integer,",
"          sellers as (ABC as (trackedSince as integer, domainId as integer, sellerId as string, sellerName as string, csv as integer[][], lastUpdate as integer, isScammer as boolean, hasFBA as boolean, totalStorefrontAsins as integer[], sellerCategoryStatistics as (catId as integer, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[], sellerBrandStatistics as (brand as string, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[], shipsFromChina as boolean, address as string[], recentFeedback as (rating as integer, date as integer, feedback as string, isStriked as boolean)[], lastRatingUpdate as integer, neutralRating as integer[], negativeRating as integer[], positiveRating as integer[], ratingCount as integer[], currentRating as integer, currentRatingCount as integer, ratingsLast30Days as integer), XYZ as (trackedSince as integer, domainId as integer, sellerId as string, sellerName as string, csv as integer[][], lastUpdate as integer, isScammer as boolean, hasFBA as boolean, totalStorefrontAsins as integer[], sellerCategoryStatistics as (catId as integer, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[], sellerBrandStatistics as (brand as string, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[]))",
"     ),",
"     allowSchemaDrift: true,",
"     validateSchema: false,",
"     ignoreNoFilesFound: false,",
"     documentForm: 'singleDocument') ~> source1",
"source1 select(mapColumn(",
"          each(sellers,match(true()))",
"     ),",
"     skipDuplicateMapInputs: true,",
"     skipDuplicateMapOutputs: true) ~> select1",
"select1 derive(columns = columnNames('select1')) ~> derivedColumn1",
"derivedColumn1 sink(validateSchema: false,",
"     skipDuplicateMapInputs: true,",
"     skipDuplicateMapOutputs: true,",
"     store: 'cache',",
"     format: 'inline',",
"     output: true,",
"     saveOrder: 1) ~> sink1"
]
}
}
}

(screenshot of the pipeline omitted)
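
For reference, with the sample data above, the cached sink output on the data flow activity would look roughly like this (abridged; the remaining seller fields are omitted):

"runStatus": {
  "output": {
    "sink1": {
      "value": [
        {
          "ABC": { "sellerId": "XX", "sellerName": "LS", ... },
          "XYZ": { "sellerId": "CVB", "sellerName": "ABC", ... },
          "columns": [ "ABC", "XYZ" ]
        }
      ]
    }
  }
}

This is why the expressions below index into value[0].columns and value[0][item()].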

  • Now, iterate over the columns array in a ForEach activity, with the items value set to @activity('Data flow1').output.runStatus.output.sink1.value[0].columns

  • Inside the ForEach, I first have a set variable activity to build each object containing sellerId and sellerName, with the following dynamic content:

{
"sellerId":"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerId}",
"sellerName":"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerName}"
}

(screenshot of the set variable activity omitted)

  • Now, I have used an append variable activity after this to append the object to an array variable, using the dynamic content @json(variables('tp')). Since tp is a string variable, it has to be parsed back into JSON before being appended.

(screenshot of the append variable activity omitted)

  • I have used another set variable activity for demonstration purposes to show the value of this array that was just built.

(screenshot of the final array value omitted)
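
With the sample data, the array built by the loop (which the last set variable activity copies into test) would be:

[
  { "sellerId": "XX", "sellerName": "LS" },
  { "sellerId": "CVB", "sellerName": "ABC" }
]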

  • Now you can write this variable's value to a file as JSON. Refer to the solution provided in this question. The following is the pipeline JSON:
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "Data flow1",
"type": "ExecuteDataFlow",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataflow": {
"referenceName": "dataflow1",
"type": "DataFlowReference"
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "None",
"cacheSinks": {
"firstRowOnly": false
}
}
},
{
"name": "ForEach1",
"type": "ForEach",
"dependsOn": [
{
"activity": "Data flow1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('Data flow1').output.runStatus.output.sink1.value[0].columns",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [
{
"activity": "Set variable1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "req",
"value": {
"value": "@json(variables('tp'))",
"type": "Expression"
}
}
},
{
"name": "Set variable1",
"type": "SetVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "tp",
"value": {
"value": "{\n    \"sellerId\":\"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerId}\",\n    \"sellerName\":\"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerName}\"\n}",
"type": "Expression"
}
}
}
]
}
},
{
"name": "Set variable2",
"type": "SetVariable",
"dependsOn": [
{
"activity": "ForEach1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "test",
"value": {
"value": "@variables('req')",
"type": "Expression"
}
}
}
],
"variables": {
"tp": {
"type": "String"
},
"req": {
"type": "Array"
},
"test": {
"type": "Array"
}
},
"annotations": []
}
}
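
As for the PySpark alternative raised in the question, a minimal sketch could parse the sellers object as a map (so the dynamic keys such as ABC and XYZ do not have to be hard-coded) and then explode it. The file path and the reduced schema below are assumptions based on the sample data above.

# Minimal PySpark sketch (assumption: the file holds a single JSON document
# shaped like the sample above; "sellers.json" is a placeholder path).
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# Only the two fields we need; from_json simply ignores the other seller fields.
seller_schema = StructType([
    StructField("sellerId", StringType()),
    StructField("sellerName", StringType()),
])
doc_schema = StructType([
    StructField("sellers", MapType(StringType(), seller_schema)),
])

# Read the whole document as one string and parse "sellers" as a map, so the
# dynamic keys ("ABC", "XYZ", ...) become map entries instead of struct fields.
raw = spark.read.text("sellers.json", wholetext=True)
parsed = raw.select(F.from_json(F.col("value"), doc_schema).alias("doc"))

result = (
    parsed
    .select(F.explode(F.col("doc.sellers")).alias("sellerKey", "seller"))
    .select("sellerKey", "seller.sellerId", "seller.sellerName")
)
result.show()
# With the sample data this yields two rows: (ABC, XX, LS) and (XYZ, CVB, ABC).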
