How to map a JSON object to a column in SQL
Question
I am trying to load and transform data from an API into Azure SQL using the ADF copy activity. The JSON returned by the source API has the following format:
{
"fdgdhgfh": {
"so2_production": 7hjhgj953,
"battery_charge": jkjlkj,
"battery_discharge": kjlklj,
"critical_load_energy": 4ljljh4
},
"9fsdsfb": {
"so2_production": asdasd,
"battery_charge": sdaasf,
"battery_discharge": ewewrwer,
"critical_load_energy": bmvkbjk
}
}
I want to map the object keys ("fdgdhgfh", "9fsdsfb") to a column in Azure SQL using the copy activity. By default, when I import schemas, I only get the nested properties "so2_production", "battery_charge", "battery_discharge", and "critical_load_energy" to map to SQL columns, but I want to map the object key itself, not its values.
Answer 1
Score: 0
AFAIK, it might not be possible to achieve your desired result using the copy activity alone.
I was able to get it done using a combination of variables and a Script activity.
NOTE: This approach only works when there is no pagination, because it relies on a Web activity. If the API is paginated, you need to call the Web activity in every iteration. If you can use a data flow, that is the better option, combining derived column and flatten transformations, because data flows support REST sources and pagination.
These are my variables and the flow of the pipeline.
- Here I used a Lookup activity on a blob to read your JSON. In your case, use a Web activity here.
- Then I converted the JSON to a string and stored it in a variable named jsonstring:
@string(activity('Lookup1').output.value[0])
In your case it will be:
@string(activity('Web1').output)
- After that, I split that variable on the delimiter
},
and stored the result in the split1 variable:
@split(substring(variables('jsonstring'),1,add(length(variables('jsonstring')),-2)), '},')
The substring call strips the outer braces, so each element of the resulting array holds one top-level key together with its object.
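To make the split step concrete, here is a minimal Python sketch that mimics the expression outside ADF. The numeric values are placeholders rather than data from the question, the helper adf_substring is a hypothetical stand-in for the ADF substring(text, startIndex, length) function, and the sketch assumes that string() serializes the object as compact JSON without spaces.

```python
import json

# Hypothetical payload: same shape as in the question, placeholder numbers.
payload = {
    "fdgdhgfh": {"so2_production": 1, "battery_charge": 2},
    "9fsdsfb": {"so2_production": 3, "battery_charge": 4},
}


def adf_substring(text: str, start: int, length: int) -> str:
    """Stand-in for ADF substring(text, startIndex, length)."""
    return text[start:start + length]


# Assumption: string(...) produces compact JSON, reproduced here
# with separators=(",", ":").
jsonstring = json.dumps(payload, separators=(",", ":"))

# Drop the outer braces, then split on "}," as the pipeline does.
inner = adf_substring(jsonstring, 1, len(jsonstring) - 2)
split1 = inner.split("},")

for item in split1:
    print(item)
```

Each printed element starts with a quoted top-level key followed by :{ and the object body. Because the delimiter is plain text, an object that itself contains nested objects would be split in the wrong place, so this only suits flat inner objects like the ones in the question.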
- I used a ForEach activity and passed it the above array:
@variables('split1')
- Inside the ForEach, I used the expression below in each iteration to store the JSON key in the cols variable:
@substring(split(item(),':{')[0],1,add(length(split(item(),':{')[0]),-2))
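The key extraction can be checked the same way. This is a rough Python equivalent of the expression, not ADF code: extract_key and adf_substring are hypothetical helper names, and the two items are hand-written examples of what one split element looks like.

```python
def adf_substring(text: str, start: int, length: int) -> str:
    """Stand-in for ADF substring(text, startIndex, length)."""
    return text[start:start + length]


def extract_key(item: str) -> str:
    """Mimic substring(split(item(), ':{')[0], 1, length - 2)."""
    # Everything before the first ':{' is the key, still wrapped in quotes.
    quoted = item.split(":{")[0]
    # Skip the opening quote and stop before the closing one.
    return adf_substring(quoted, 1, len(quoted) - 2)


# Hand-written examples of split elements (placeholder values).
items = [
    '"fdgdhgfh":{"so2_production":1,"battery_charge":2',
    '"9fsdsfb":{"so2_production":3,"battery_charge":4}',
]

keys = [extract_key(item) for item in items]
print(keys)  # ['fdgdhgfh', '9fsdsfb']
```

The extraction depends on the key being immediately followed by :{ with no whitespace, which is why the compact string form matters.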
- After that, I used the expression below to store the so2_production value in the values variable:
@activity('Lookup1').output.value[0][variables('cols')]['so2_production']
In your case it will be:
@activity('Web1').output[variables('cols')]['so2_production']
- Inside the same ForEach, after getting the key and the object value, I used a Script activity to insert the data from the variables into the target table:
insert into sample1(uid,usource) values('@{variables('cols')}','@{variables('values')}')
This inserts one row per iteration, containing the key and its object value.
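As a sanity check of the overall result, the loop can be simulated locally. The sketch below is only an illustration under several assumptions: an in-memory SQLite database stands in for Azure SQL, the payload values are placeholders, and the parameterized insert is a substitution of mine, not what the Script activity does (it interpolates the variables directly into the statement text).

```python
import sqlite3

# Hypothetical payload: same shape as in the question, placeholder numbers.
payload = {
    "fdgdhgfh": {"so2_production": 1, "battery_charge": 2},
    "9fsdsfb": {"so2_production": 3, "battery_charge": 4},
}

# SQLite in memory stands in for the Azure SQL target table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sample1 (uid TEXT, usource TEXT)")

for key, obj in payload.items():
    value = str(obj["so2_production"])

    # Roughly the statement text the Script activity would end up with
    # after interpolating the cols and values variables.
    rendered = f"insert into sample1(uid,usource) values('{key}','{value}')"
    print(rendered)

    # Parameterized form, used here so quotes in a value cannot break the SQL.
    conn.execute("INSERT INTO sample1 (uid, usource) VALUES (?, ?)", (key, value))

rows = conn.execute("SELECT uid, usource FROM sample1 ORDER BY rowid").fetchall()
print(rows)
```

One row is written per top-level key. With the interpolated form, a key or value containing a single quote would produce invalid SQL, so that case is worth guarding against if the API data is not under your control.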
My Pipeline JSON:
{
  "name": "pipeline3",
  "properties": {
    "activities": [
      {
        "name": "Lookup1",
        "type": "Lookup",
        "dependsOn": [],
        "policy": {
          "timeout": "0.12:00:00",
          "retry": 0,
          "retryIntervalInSeconds": 30,
          "secureOutput": false,
          "secureInput": false
        },
        "userProperties": [],
        "typeProperties": {
          "source": {
            "type": "JsonSource",
            "storeSettings": {
              "type": "AzureBlobFSReadSettings",
              "recursive": true,
              "enablePartitionDiscovery": false
            },
            "formatSettings": {
              "type": "JsonReadSettings"
            }
          },
          "dataset": {
            "referenceName": "sourcejson",
            "type": "DatasetReference"
          },
          "firstRowOnly": false
        }
      },
      {
        "name": "Lookup output to string",
        "type": "SetVariable",
        "dependsOn": [
          {
            "activity": "Lookup1",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "userProperties": [],
        "typeProperties": {
          "variableName": "jsonstring",
          "value": {
            "value": "@string(activity('Lookup1').output.value[0])",
            "type": "Expression"
          }
        }
      },
      {
        "name": "split on string",
        "type": "SetVariable",
        "dependsOn": [
          {
            "activity": "Lookup output to string",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "userProperties": [],
        "typeProperties": {
          "variableName": "split1",
          "value": {
            "value": "@split(substring(variables('jsonstring'),1,add(length(variables('jsonstring')),-2)), '},')",
            "type": "Expression"
          }
        }
      },
      {
        "name": "ForEach1",
        "type": "ForEach",
        "dependsOn": [
          {
            "activity": "split on string",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ],
        "userProperties": [],
        "typeProperties": {
          "items": {
            "value": "@variables('split1')",
            "type": "Expression"
          },
          "isSequential": true,
          "activities": [
            {
              "name": "Cols",
              "type": "SetVariable",
              "dependsOn": [],
              "userProperties": [],
              "typeProperties": {
                "variableName": "cols",
                "value": {
                  "value": "@substring(split(item(),':{')[0],1,add(length(split(item(),':{')[0]),-2))",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "values",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "Cols",
                  "dependencyConditions": [
                    "Succeeded"
                  ]
                }
              ],
              "userProperties": [],
              "typeProperties": {
                "variableName": "values",
                "value": {
                  "value": "@activity('Lookup1').output.value[0][variables('cols')]['so2_production']",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "Script1",
              "type": "Script",
              "dependsOn": [
                {
                  "activity": "values",
                  "dependencyConditions": [
                    "Succeeded"
                  ]
                }
              ],
              "policy": {
                "timeout": "0.12:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
              },
              "userProperties": [],
              "linkedServiceName": {
                "referenceName": "AzureSqlDatabase1",
                "type": "LinkedServiceReference"
              },
              "typeProperties": {
                "scripts": [
                  {
                    "type": "Query",
                    "text": {
                      "value": "insert into sample1(uid,usource) values('@{variables('cols')}','@{variables('values')}')",
                      "type": "Expression"
                    }
                  }
                ],
                "scriptBlockExecutionTimeout": "02:00:00"
              }
            }
          ]
        }
      }
    ],
    "variables": {
      "jsonstring": {
        "type": "String"
      },
      "split1": {
        "type": "Array"
      },
      "cols": {
        "type": "String"
      },
      "values": {
        "type": "String"
      }
    },
    "annotations": []
  }
}