json object is too complex for ADF dataflow to parse/roll


Question

I have a JSON document that looks like this:

[image: JSON sample]

I need to collect the seller ID and seller name for now. I tried to do this in an ADF data flow, but for some reason the flatten transformation could not unroll by sellers. It seems that sellers is not a valid array.

What I tried:

  1. I tried a few other approaches, such as copying the JSON into Blob storage, changing settings such as the file pattern to 'array of objects', and feeding it to the data flow as a source. That didn't work.
  2. I tried to use an aggregate transformation, but I am not sure how to apply it. I am new to data flows.

If this is not possible in a data flow at all, can it be done easily in PySpark? I am open to that as well.

Answer 1

Score: 1

  • I took the following sample data to demonstrate how to get sellerId and sellerName.
{
   "timestamp":1686036525840,
   "tokensLeft":1,
   "refillIn":1,
   "refillRate":1,
   "tokenFlowReduction":0.0,
   "tokensConsumed":2,
   "processingTimeInMs":0,
   "sellers":{
      "ABC":{
         "trackedSince":1,
         "domainId":1,
         "sellerId":"XX",
         "sellerName":"LS",
         "csv":[
            [
               6062020
            ],
            [
               6062020
            ]
         ],
         "lastUpdate":6536604,
         "isScammer":false,
         "hasFBA":true,
         "totalStorefrontAsins":[
            1,
            44
         ],
         "sellerCategoryStatistics":[
            {
               "catId":1,
               "productCount":2,
               "avg30SalesRank":3,
               "productCountWithAmazonOffer":5
            },
            {
               "catId":3,
               "productCount":11,
               "avg30SalesRank":72203,
               "productCountWithAmazonOffer":3
            }
         ],
         "sellerBrandStatistics":[
            {
               "brand":"1",
               "productCount":3,
               "avg30SalesRank":18820,
               "productCountWithAmazonOffer":0
            },
            {
               "brand":"l3",
               "productCount":3,
               "avg30SalesRank":32525,
               "productCountWithAmazonOffer":1
            },
            {
               "brand":"1",
               "productCount":3,
               "avg30SalesRank":40102,
               "productCountWithAmazonOffer":1
            },
            {
               "brand":"1",
               "productCount":1,
               "avg30SalesRank":5315,
               "productCountWithAmazonOffer":0
            }
         ],
         "shipsFromChina":false,
         "address":[
            "1"
         ],
         "recentFeedback":[
            {
               "rating":10,
               "date":6531840,
               "feedback":"Never .  Black ",
               "isStriked":true
            },
            {
               "rating":50,
               "date":6523200,
               "feedback":"I received.",
               "isStriked":false
            },
            {
               "rating":50,
               "date":6521760,
               "feedback":"Fast ship a",
               "isStriked":false
            },
            {
               "rating":50,
               "date":6518880,
               "feedback":"It  to .",
               "isStriked":false
            },
            {
               "rating":20,
               "date":6517440,
               "feedback":"I r .",
               "isStriked":true
            }
         ],
         "lastRatingUpdate":6536604,
         "neutralRating":[
            0,
            0,
            0,
            0
         ],
         "negativeRating":[
            0,
            0,
            2,
            2
         ],
         "positiveRating":[
            100,
            100,
            98,
            98
         ],
         "ratingCount":[
            7,
            23,
            59,
            59
         ],
         "currentRating":1,
         "currentRatingCount":1,
         "ratingsLast30Days":6
      },
      "XYZ":{
         "trackedSince":2795761,
         "domainId":1,
         "sellerId":"CVB",
         "sellerName":"ABC",
         "csv":[
            [
               271
            ],
            [
               6,
               46101
            ]
         ],
         "lastUpdate":6536586,
         "isScammer":false,
         "hasFBA":true,
         "totalStorefrontAsins":[
            1,
            1
         ],
         "sellerCategoryStatistics":[
            {
               "catId":1,
               "productCount":131,
               "avg30SalesRank":11,
               "productCountWithAmazonOffer":1
            },
            {
               "catId":3760911,
               "productCount":106,
               "avg30SalesRank":101900,
               "productCountWithAmazonOffer":8
            },
            {
               "catId":1055398,
               "productCount":93,
               "avg30SalesRank":107441,
               "productCountWithAmazonOffer":41
            },
            {
               "catId":3760901,
               "productCount":60,
               "avg30SalesRank":11,
               "productCountWithAmazonOffer":3
            },
            {
               "catId":11,
               "productCount":11,
               "avg30SalesRank":11,
               "productCountWithAmazonOffer":5
            },
            {
               "catId":11,
               "productCount":28,
               "avg30SalesRank":489,
               "productCountWithAmazonOffer":23
            },
            {
               "catId":1,
               "productCount":1,
               "avg30SalesRank":1,
               "productCountWithAmazonOffer":0
            },
            {
               "catId":1,
               "productCount":12,
               "avg30SalesRank":1,
               "productCountWithAmazonOffer":0
            },
            {
               "catId":1,
               "productCount":8,
               "avg30SalesRank":1,
               "productCountWithAmazonOffer":4
            },
            {
               "catId":1,
               "productCount":6,
               "avg30SalesRank":1,
               "productCountWithAmazonOffer":3
            }
         ],
         "sellerBrandStatistics":[
            {
               "brand":"1e",
               "productCount":35,
               "avg30SalesRank":1,
               "productCountWithAmazonOffer":0
            }
         ]
      }
   }
}
  • Next, using a select transformation with rule-based mapping, I selected the properties of sellers, so that each seller key becomes its own column.

  • I then used a derived column transformation to add a new column called columns with the value columnNames('select1'). This produces an array column containing ["ABC","XYZ"], which are the keys we need.
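
For readers more comfortable with code than with the data flow designer, the select and derived column steps can be approximated in plain Python. This is only a sketch of the idea, not what ADF runs internally: the `document` literal is a trimmed-down stand-in for the sample data, and `hoist_sellers` is a hypothetical helper name.

```python
import json

# Trimmed-down stand-in for the sample document (assumption: only the
# fields needed for this illustration are kept).
document = json.loads("""
{
  "timestamp": 1686036525840,
  "sellers": {
    "ABC": {"sellerId": "XX", "sellerName": "LS"},
    "XYZ": {"sellerId": "CVB", "sellerName": "ABC"}
  }
}
""")


def hoist_sellers(doc):
    """Mimic the select step: promote each child of `sellers` to a top-level column."""
    return dict(doc["sellers"])


row = hoist_sellers(document)

# Mimic the derived column: the column names are the seller keys.
row_columns = list(row.keys())
print(row_columns)  # ['ABC', 'XYZ']
```

Here `sellers` is an object keyed by seller rather than an array, so there is no array for a flatten transformation to unroll. Collecting the key names first gives later steps something to iterate over.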

  • I am using a cache sink and writing its contents to the activity output. The following is the complete data flow JSON:
{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "select1"
                },
                {
                    "name": "derivedColumn1"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          timestamp as integer,",
                "          tokensLeft as integer,",
                "          refillIn as integer,",
                "          refillRate as integer,",
                "          tokenFlowReduction as double,",
                "          tokensConsumed as integer,",
                "          processingTimeInMs as integer,",
                "          sellers as (ABC as (trackedSince as integer, domainId as integer, sellerId as string, sellerName as string, csv as integer[][], lastUpdate as integer, isScammer as boolean, hasFBA as boolean, totalStorefrontAsins as integer[], sellerCategoryStatistics as (catId as integer, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[], sellerBrandStatistics as (brand as string, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[], shipsFromChina as boolean, address as string[], recentFeedback as (rating as integer, date as integer, feedback as string, isStriked as boolean)[], lastRatingUpdate as integer, neutralRating as integer[], negativeRating as integer[], positiveRating as integer[], ratingCount as integer[], currentRating as integer, currentRatingCount as integer, ratingsLast30Days as integer), XYZ as (trackedSince as integer, domainId as integer, sellerId as string, sellerName as string, csv as integer[][], lastUpdate as integer, isScammer as boolean, hasFBA as boolean, totalStorefrontAsins as integer[], sellerCategoryStatistics as (catId as integer, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[], sellerBrandStatistics as (brand as string, productCount as integer, avg30SalesRank as integer, productCountWithAmazonOffer as integer)[]))",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false,",
                "     documentForm: 'singleDocument') ~> source1",
                "source1 select(mapColumn(",
                "          each(sellers,match(true()))",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "select1 derive(columns = columnNames('select1')) ~> derivedColumn1",
                "derivedColumn1 sink(validateSchema: false,",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     store: 'cache',",
                "     format: 'inline',",
                "     output: true,",
                "     saveOrder: 1) ~> sink1"
            ]
        }
    }
}

  • Now iterate over the columns array with a ForEach activity, using the items value @activity('Data flow1').output.runStatus.output.sink1.value[0].columns

  • Inside the ForEach, a Set variable activity first builds each object containing sellerId and sellerName, with the following dynamic content:

{
"sellerId":"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerId}",
"sellerName":"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerName}"
}

  • After this, an Append variable activity appends the object to an array variable using the dynamic content @json(variables('tp')).
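
The ForEach loop with its Set variable and Append variable activities amounts to roughly the following, sketched in plain Python. The `sink_row` dictionary is an assumed stand-in for `value[0]` of the cache sink output, and the names `tp` and `req` are borrowed from the pipeline only for readability.

```python
import json

# Assumed shape of sink1.value[0]: one entry per seller key plus the
# `columns` array added by the derived column transformation.
sink_row = {
    "ABC": {"sellerId": "XX", "sellerName": "LS"},
    "XYZ": {"sellerId": "CVB", "sellerName": "ABC"},
    "columns": ["ABC", "XYZ"],
}

req = []  # plays the role of the array variable
for item in sink_row["columns"]:  # ForEach over the key names
    seller = sink_row[item]  # same lookup as value[0][item()]
    # Set variable: build the object as a JSON string.
    tp = json.dumps(
        {"sellerId": seller["sellerId"], "sellerName": seller["sellerName"]}
    )
    # Append variable: parse the string back into an object, like @json(...).
    req.append(json.loads(tp))

print(req)
```

The round trip through a string mirrors the pipeline, where `tp` is declared as a String variable and `@json(...)` turns it back into an object before it is appended.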

  • I have used another set variable activity for demonstration purposes to show the value of this array that was just built.

  • Now you can write this variable's value to a file as JSON. Refer to the solution provided in this question. The following is the pipeline JSON:
{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Data flow1",
                "type": "ExecuteDataFlow",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataflow": {
                        "referenceName": "dataflow1",
                        "type": "DataFlowReference"
                    },
                    "compute": {
                        "coreCount": 8,
                        "computeType": "General"
                    },
                    "traceLevel": "None",
                    "cacheSinks": {
                        "firstRowOnly": false
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Data flow1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Data flow1').output.runStatus.output.sink1.value[0].columns",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Append variable1",
                            "type": "AppendVariable",
                            "dependsOn": [
                                {
                                    "activity": "Set variable1",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "req",
                                "value": {
                                    "value": "@json(variables('tp'))",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Set variable1",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "tp",
                                "value": {
                                    "value": "{\n    \"sellerId\":\"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerId}\",\n    \"sellerName\":\"@{activity('Data flow1').output.runStatus.output.sink1.value[0][item()].sellerName}\"\n}",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "Set variable2",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "ForEach1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "test",
                    "value": {
                        "value": "@variables('req')",
                        "type": "Expression"
                    }
                }
            }
        ],
        "variables": {
            "tp": {
                "type": "String"
            },
            "req": {
                "type": "Array"
            },
            "test": {
                "type": "Array"
            }
        },
        "annotations": []
    }
}
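
If the pipeline machinery is more than the task needs, the same extraction and the final write to a file can be sketched in a few lines of plain Python. This is an alternative outside ADF rather than part of the pipeline above; `extract_sellers`, the file name `sellers.json`, and the inline `sample` document are hypothetical.

```python
import json
import os
import tempfile


def extract_sellers(doc):
    """Return one {sellerId, sellerName} object per entry under `sellers`."""
    return [
        {"sellerId": s.get("sellerId"), "sellerName": s.get("sellerName")}
        for s in doc.get("sellers", {}).values()
    ]


# Reduced stand-in for the sample data (assumption: extra fields are ignored).
sample = {
    "sellers": {
        "ABC": {"sellerId": "XX", "sellerName": "LS", "hasFBA": True},
        "XYZ": {"sellerId": "CVB", "sellerName": "ABC", "hasFBA": True},
    }
}

result = extract_sellers(sample)

# Write the array to a JSON file (a temporary directory keeps the sketch tidy).
out_path = os.path.join(tempfile.mkdtemp(), "sellers.json")
with open(out_path, "w", encoding="utf-8") as fh:
    json.dump(result, fh, indent=2)

# Read the file back to confirm what was written.
with open(out_path, encoding="utf-8") as fh:
    written = json.load(fh)
print(written)
```

Iterating over the values of `sellers` avoids hard-coding the seller keys, so the sketch does not depend on knowing "ABC" or "XYZ" in advance.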

huangapple
  • Posted on 2023-06-06 12:25:33
  • Source link: https://go.coder-hub.com/76411438.html