英文:
json object is too complex for ADF dataflow to parse/roll
问题
我需要收集卖家的ID和卖家的名称。我尝试在ADF数据流中实现这一目标,但在展开转换中,由于某种原因无法按卖家汇总。似乎这不是一个有效的数组。
我尝试过以下方法:
- 我尝试了一些其他方法,比如将它复制到blob中,并调整了一些设置,比如将文件模式设置为“对象数组”,然后将其作为数据流的源。这没有起作用。
- 尝试使用一些聚合转换,但不确定如何应用它。我对数据流是新手。
如果在数据流中完全不可能实现,那么在pyspark中是否可以轻松完成?
英文:
I have a json which looks like below:
I need to collect the seller ID and seller Name for now. I was trying to achieve in adf data flow but it was un able to roll it by sellers for some reason in the flatten transformation . Seems like this not a valid array.
what I tried:
- I tried some other ways like copying it into blob and tweaked some
settings like file pattern as 'array of objects' and feed it as a source to dataflow which
didn't work. - Try to use some aggregate transformation but not sure how to apply it. I am new to dataflow.
Can it be done easily in pyspark if its not possible in dataflow at all is also I am open to.
答案1
得分: 1
I have taken the following sample data in order to get the sellerID and sellerName
.
Now, using the select transformation with rule-based mapping, I have selected the properties of sellers
.
I have used derived column transformation to add a new column called columns
with the value as columnNames('select1')
which would give me an array column with value as ["ABC","XYZ"]
(the keys which we need).
Now, iterate over the columns array as shown in the above image using the items value as @activity('Data flow1').output.runStatus.output.sink1.value[0].columns
Inside the for each, first I have a set variable activity to build each object containing sellerId and sellerName
.
Now, I have used append variable activity after this to append this object to an array variable using the dynamic content @json(variables('tp'))
.
Now you can write this variable's value to a file as JSON.
英文:
- I have taken the following sample data in order to get the
sellerID and sellerName
.
{
"timestamp":1686036525840,
"tokensLeft":1,
"refillIn":1,
"refillRate":1,
"tokenFlowReduction":0.0,
"tokensConsumed":2,
"processingTimeInMs":0,
"sellers":{
"ABC":{
"trackedSince":1,
"domainId":1,
"sellerId":"XX",
"sellerName":"LS",
"csv":[
[
6062020
],
[
6062020
]
],
"lastUpdate":6536604,
"isScammer":false,
"hasFBA":true,
"totalStorefrontAsins":[
1,
44
],
"sellerCategoryStatistics":[
{
"catId":1,
"productCount":2,
"avg30SalesRank":3,
"productCountWithAmazonOffer":5
},
{
"catId":3,
"productCount":11,
"avg30SalesRank":72203,
"productCountWithAmazonOffer":3
}
],
"sellerBrandStatistics":[
{
"brand":"1",
"productCount":3,
"avg30SalesRank":18820,
"productCountWithAmazonOffer":0
},
{
"brand":"l3",
"productCount":3,
"avg30SalesRank":32525,
"productCountWithAmazonOffer":1
},
{
"brand":"1",
"productCount":3,
"avg30SalesRank":40102,
"productCountWithAmazonOffer":1
},
{
"brand":"1",
"productCount":1,
"avg30SalesRank":5315,
"productCountWithAmazonOffer":0
}
],
"shipsFromChina":false,
"address":[
"1"
],
"recentFeedback":[
{
"rating":10,
"date":6531840,
"feedback":"Never . Black ",
"isStriked":true
},
{
"rating":50,
"date":6523200,
"feedback":"I received.",
"isStriked":false
},
{
"rating":50,
"date":6521760,
"feedback":"Fast ship a",
"isStriked":false
},
{
"rating":50,
"date":6518880,
"feedback":"It to .",
"isStriked":false
},
{
"rating":20,
"date":6517440,
"feedback":"I r .",
"isStriked":true
}
],
"lastRatingUpdate":6536604,
"neutralRating":[
0,
0,
0,
0
],
"negativeRating":[
0,
0,
2,
2
],
"positiveRating":[
100,
100,
98,
98
],
"ratingCount":[
7,
23,
59,
59
],
"currentRating":1,
"currentRatingCount":1,
"ratingsLast30Days":6
},
"XYZ":{
"trackedSince":2795761,
"domainId":1,
"sellerId":"CVB",
"sellerName":"ABC",
"csv":[
[
271
],
[
6,
46101
]
],
"lastUpdate":6536586,
"isScammer":false,
"hasFBA":true,
"totalStorefrontAsins":[
1,
1
],
"sellerCategoryStatistics":[
{
"catId":1,
"productCount":131,
"avg30SalesRank":11,
"productCountWithAmazonOffer":1
},
{
"catId":3760911,
"productCount":106,
"avg30SalesRank":101900,
"productCountWithAmazonOffer":8
},
{
"catId":1055398,
"productCount":93,
"avg30SalesRank":107441,
"productCountWithAmazonOffer":41
},
{
"catId":3760901,
"productCount":60,
"avg30SalesRank":11,
"productCountWithAmazonOffer":3
},
{
"catId":11,
"productCount":11,
"avg30SalesRank":11,
"productCountWithAmazonOffer":5
},
{
"catId":11,
"productCount":28,
"avg30SalesRank":489,
"productCountWithAmazonOffer":23
},
{
"catId":1,
"productCount":1,
"avg30SalesRank":1,
"productCountWithAmazonOffer":0
},
{
"catId":1,
"productCount":12,
"avg30SalesRank":1,
"productCountWithAmazonOffer":0
},
{
"catId":1,
"productCount":8,
"avg30SalesRank":1,
"productCountWithAmazonOffer":4
},
{
"catId":1,
"productCount":6,
"avg30SalesRank":1,
"productCountWithAmazonOffer":3
}
],
"sellerBrandStatistics":[
{
"brand":"1e",
"productCount":35,
"avg30SalesRank":1,
"productCountWithAmazonOffer":0
}
]
}
}
}
- Now, using the select transformation with rule-based mapping, I have selected the properties of
sellers
.
- I have used derived column transformation to add a new column called
columns
with the value ascolumnNames('select1')
which would give me an array column with value as["ABC","XYZ"]
(the keys which we need).
- I have using sink cache and writing to the activity output. The following is the complete Dataflow JSON:
{
"name": "dataflow1",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "Json1",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"name": "sink1"
}
],
"transformations": [
{
"name": "select1"
},
{
"name": "derivedColumn1"
}
],
"scriptLines": [
"source(output(",
" timestamp as integer,",
" tokensLeft as integer,",
" refillIn as integer,",
" refillRate as integer,",
" tokenFlowReduction as double,",
" tokensConsumed as integer,",
" processingTimeInMs as integer,",
" sellers as (ABC as (trackedSince as integer, domainId as integer, sellerId as string, sellerName as string, csv as integer[][], lastUpdate as integer, isScammer as boolean, hasFBA as boolean, totalStorefrontAsins as integer[], sellerCategoryStatistics as (catId as integer, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[], sellerBrandStatistics as (brand as string, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[], shipsFromChina as boolean, address as string[], recentFeedback as (rating as integer, date as integer, feedback as string, isStriked as boolean)[], lastRatingUpdate as integer, neutralRating as integer[], negativeRating as integer[], positiveRating as integer[], ratingCount as integer[], currentRating as integer, currentRatingCount as integer, ratingsLast30Days as integer), XYZ as (trackedSince as integer, domainId as integer, sellerId as string, sellerName as string, csv as integer[][], lastUpdate as integer, isScammer as boolean, hasFBA as boolean, totalStorefrontAsins as integer[], sellerCategoryStatistics as (catId as integer, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[], sellerBrandStatistics as (brand as string, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[]))",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false,",
" documentForm: 'singleDocument') ~> source1",
"source1 select(mapColumn(",
" each(sellers,match(true()))",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> select1",
"select1 derive(columns = columnNames('select1')) ~> derivedColumn1",
"derivedColumn1 sink(validateSchema: false,",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" store: 'cache',",
" format: 'inline',",
" output: true,",
" saveOrder: 1) ~> sink1"
]
}
}
}
-
Now, iterate over the columns array as shown in the above image using the items value as
@activity('Data flow1').output.runStatus.output.sink1.value[0].columns
-
Inside the for each, first I have a set variable activity to build each object containing
sellerId and sellerName
with following dynamic content:
{
"sellerId":"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerId}",
"sellerName":"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerName}"
}
- Now, I have used append variable activity after this to append this object to an array variable using the dynamic content
@json(variables('tp'))
.
- I have used another set variable activity for demonstration purposes to show the value of this array that was just built.
- Now you can write this variable's value to a file as JSON. Refer to the solution provided in this question. The following is the Pipeline Json:
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "Data flow1",
"type": "ExecuteDataFlow",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataflow": {
"referenceName": "dataflow1",
"type": "DataFlowReference"
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "None",
"cacheSinks": {
"firstRowOnly": false
}
}
},
{
"name": "ForEach1",
"type": "ForEach",
"dependsOn": [
{
"activity": "Data flow1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('Data flow1').output.runStatus.output.sink1.value[0].columns",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [
{
"activity": "Set variable1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "req",
"value": {
"value": "@json(variables('tp'))",
"type": "Expression"
}
}
},
{
"name": "Set variable1",
"type": "SetVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "tp",
"value": {
"value": "{\n \"sellerId\":\"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerId}\",\n \"sellerName\":\"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerName}\"\n}",
"type": "Expression"
}
}
}
]
}
},
{
"name": "Set variable2",
"type": "SetVariable",
"dependsOn": [
{
"activity": "ForEach1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "test",
"value": {
"value": "@variables('req')",
"type": "Expression"
}
}
}
],
"variables": {
"tp": {
"type": "String"
},
"req": {
"type": "Array"
},
"test": {
"type": "Array"
}
},
"annotations": []
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论