ADF - Complex nested array in Data Flow


I am a bit new to using Data Flow in ADF, so I'm asking for your help. Here's the situation:

I would like to transform a JSON file from a REST API in Data Flow to create a tabular/logical table structure. After the transformation, I would like to send this data + structure to an Azure SQL Database.

Here is an overview of what I've done:

Created a Pipeline to copy data from a website to my Data Lake.

[screenshots: copy pipeline]

This is the input source:

{
    "id":[
        {
            "value":40051
        }
    ],
    "uuid":[
        {
            "value":"0ca12ac9-d94b-44cf-a35b-8b7256006cf8"
        }
    ],
    "revision_id":[
        {
            "value":1452381
        }
    ],
    "langcode":[
        {
            "value":"nl"
        }
    ],
    "type":[
        {
            "target_id":"par_chart",
            "target_type":"paragraphs_type",
            "target_uuid":"2c3143a2-bd78-4b4d-afb6-19160de928f2"
        }
    ],
    "status":[
        {
            "value":true
        }
    ],
    "created":[
        {
            "value":"2019-10-17T12:08:05+00:00",
            "format":"Y-m-d\\TH:i:sP"
        }
    ],
    "parent_id":[
        {
            "value":"2561"
        }
    ],
    "parent_type":[
        {
            "value":"node"
        }
    ],
    "parent_field_name":[
        {
            "value":"field_paragraphs"
        }
    ],
    "behavior_settings":[
        {
            "value":[
            ]
        }
    ],
    "default_langcode":[
        {
            "value":true
        }
    ],
    "revision_translation_affected":[
        {
            "value":true
        }
    ],
    "content_translation_source":[
        {
            "value":"und"
        }
    ],
    "content_translation_outdated":[
        {
            "value":false
        }
    ],
    "content_translation_changed":[
        {
            "value":"2023-05-09T09:48:38+00:00",
            "format":"Y-m-d\\TH:i:sP"
        }
    ],
    "field_par_chart":[
        {
            "csv":"[[\"\",\"Percentage behandeling\",\"Percentage behandeling\",\"Percentage behandeling\"],[\"2010\",\"19.4\",null,\"\"],[\"2011\",\"16.6\",null,\"\"],[\"2012\",\"15.4\",null,\"\"],[\"2013\",\"13.5\",null,\"\"],[\"2014\",\"13\",null,\"\"],[\"2015\",\"13\",null,\"\"],[\"2016\",\"14.1\",null,\"\"],[\"2017\",\"17.7\",null,\"\"],[\"2018\",\"24\",null,\"\"],[\"2019\",\"\",\"27.7\",\"\"],[\"2020\",\"\",\"31.9\",\"\"],[\"2021*\",\"\",\"\",\"42.9\"],[\"2022*\",null,\"\",\"46.2\"]]",
            "csv_url":"",
            "config":"{\"chart\":{\"type\":\"line\",\"renderTo\":{\"hcEvents\":{\"mousedown\":[{\"order\":null}],\"touchstart\":[{\"order\":null}],\"mouseover\":[{\"order\":null}],\"mouseout\":[{\"order\":null}]},\"__EV_STORE_KEY@7\":{}}},\"xAxis\":[{\"type\":\"category\",\"index\":0,\"isX\":true}],\"yAxis\":[{\"title\":{\"text\":\"Percentage\",\"offset\":-81.859375},\"min\":0,\"tickInterval\":5,\"index\":0,\"events\":{}}],\"templateName\":\"lineBasic\",\"series\":[{\"type\":null,\"animation\":false,\"data\":[{\"name\":\"2010\",\"y\":19.4},{\"name\":\"2011\",\"y\":16.6},{\"name\":\"2012\",\"y\":15.4},{\"name\":\"2013\",\"y\":13.5},{\"name\":\"2014\",\"y\":13},{\"name\":\"2015\",\"y\":13},{\"name\":\"2016\",\"y\":14.1},{\"name\":\"2017\",\"y\":17.7},{\"name\":\"2018\",\"y\":24},{\"name\":\"2019\",\"y\":null},{\"name\":\"2020\",\"y\":null},{\"name\":\"2021*\",\"y\":null},{\"name\":\"2022*\",\"y\":null}],\"name\":\"Percentage behandeling\",\"_colorIndex\":0,\"_symbolIndex\":0},{\"type\":\"line\",\"animation\":false,\"data\":[{\"name\":\"2010\",\"y\":null},{\"name\":\"2011\",\"y\":null},{\"name\":\"2012\",\"y\":null},{\"name\":\"2013\",\"y\":null},{\"name\":\"2014\",\"y\":null},{\"name\":\"2015\",\"y\":null},{\"name\":\"2016\",\"y\":null},{\"name\":\"2017\",\"y\":null},{\"name\":\"2018\",\"y\":null},{\"name\":\"2019\",\"y\":27.7},{\"name\":\"2020\",\"y\":31.9},{\"name\":\"2021*\",\"y\":null},{\"name\":\"2022*\",\"y\":null}],\"name\":\"Percentage behandeling\"},{\"type\":\"line\",\"animation\":false,\"data\":[{\"y\":null,\"name\":\"2010\"},{\"y\":null,\"name\":\"2011\"},{\"y\":null,\"name\":\"2012\"},{\"y\":null,\"name\":\"2013\"},{\"y\":null,\"name\":\"2014\"},{\"y\":null,\"name\":\"2015\"},{\"y\":null,\"name\":\"2016\"},{\"y\":null,\"name\":\"2017\"},{\"y\":null,\"name\":\"2018\"},{\"y\":null,\"name\":\"2019\"},{\"y\":null,\"name\":\"2020\"},{\"y\":42.9,\"name\":\"2021*\"},{\"y\":46.2,\"name\":\"2022*\"}],\"name\":\"Percentage behandeling\"}],\"title\":{\"text\":\"Trend in wachttijden voor behandeling in ziekenhuis langer dan de Treeknorm\"},\"legend\":{\"enabled\":false}}"
        }
    ],
    "field_par_extra_info":[
        {
            "value":"singlecard"
        }
    ],
    "field_par_hidden":[
        {
            "value":false
        }
    ],
    "field_par_text":[
        {
            "value":"<ul>\r\n\t<li><em>Door meerdere veranderingen van de bron en methode zijn de percentages van 2010-2018, 2019-2020 en 2021-2022 niet goed met elkaar te vergelijken.</em></li>\r\n\t<li><em><span><span><span>De wachttijden van 2021 zijn enkel gebaseerd op wachttijden gemeten van augustus 2021 t/m december 2021 vanwege een verandering in de methode. In de eerste helft van 2021 was 28,7% van de wachttijden voor behandeling langer dan de treeknorm.</span></span></span></em></li>\r\n\t<li><em>In 2020 brak de coronapandemie uit. Hiermee moet rekening worden gehouden bij het interpreteren van de trends.</em></li>\r\n</ul>\r\n\r\n<p><strong>Bron </strong> <br />\r\nWachttijdenonderzoek, Mediquest<br />\r\nWachttijdenregistratie NZa<br />\r\n<strong>Verslagjaar  t/m</strong><br />\r\n2022<br />\r\n<strong>Laatste update gegevens </strong><br />\r\n24 mei 2023<br />\r\n<strong>Updatefrequentie </strong><br />\r\nJaarlijks<br />\r\n<strong>Meer info</strong><br />\r\n<a href=\"https://www.vzinfo.nl/prestatie-indicatoren/wachttijd-langer-dan-treeknorm-behandeling\" target=\"_blank\">Prestatie-indicatoren gezondheidszorg op VZinfo.nl</a></p>\r\n",
            "format":"volledige_html",
            "processed":"<ul><li><em>Door meerdere veranderingen van de bron en methode zijn de percentages van 2010-2018, 2019-2020 en 2021-2022 niet goed met elkaar te vergelijken.</em></li>\n\t<li><em><span><span><span>De wachttijden van 2021 zijn enkel gebaseerd op wachttijden gemeten van augustus 2021 t/m december 2021 vanwege een verandering in de methode. In de eerste helft van 2021 was 28,7% van de wachttijden voor behandeling langer dan de treeknorm.</span></span></span></em></li>\n\t<li><em>In 2020 brak de coronapandemie uit. Hiermee moet rekening worden gehouden bij het interpreteren van de trends.</em></li>\n</ul><p><strong>Bron </strong> <br />\nWachttijdenonderzoek, Mediquest<br />\nWachttijdenregistratie NZa<br /><strong>Verslagjaar  t/m</strong><br />\n2022<br /><strong>Laatste update gegevens </strong><br />\n24 mei 2023<br /><strong>Updatefrequentie </strong><br />\nJaarlijks<br /><strong>Meer info</strong><br /><a href=\"https://www.vzinfo.nl/prestatie-indicatoren/wachttijd-langer-dan-treeknorm-behandeling\" target=\"_blank\">Prestatie-indicatoren gezondheidszorg op VZinfo.nl</a></p>\n\n"
        }
    ],
    "field_par_text_bgcolor":[
        {
            "value":"bg-gray-lightest"
        }
    ],
    "field_par_text_position":[
        {
            "value":"below"
        }
    ],
    "field_par_title":[
        {
            "value":"Trend "
        }
    ],
    "field_par_title_class":[
    ],
    "field_par_title_enable":[
        {
            "value":false
        }
    ],
    "field_par_title_tag":[
        {
            "value":"h4"
        }
    ]
}

This is the sink part of the ADF pipeline:

It's saved in the Data Lake as a .json file.

[screenshot: sink configuration]

Important: I only need to transform this part of the JSON element:
field_par_chart with its "csv" element
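To see why this part is tricky, note that the "csv" value is doubly encoded: it is a string that itself contains JSON (an array of row arrays). A minimal Python sketch, using a shortened sample of the csv value rather than the full string, illustrates this:

```python
import json

# A shortened sample of the "csv" value inside field_par_chart[0] above;
# the value is a string that itself contains JSON (an array of row arrays).
csv_string = '[["","Percentage behandeling"],["2010","19.4"],["2011","16.6"]]'

rows = json.loads(csv_string)   # decode the inner JSON into a list of lists
header, data = rows[0], rows[1:]
```

So any tabular transformation has to decode (or string-split) this inner array-of-arrays first.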

[screenshot: the field_par_chart "csv" element]

The expected data structure must look like the image below and be saved as a text file.

[screenshot: expected output table]

Any suggestions on how to do this with an ADF Pipeline and/or ADF Data Flow?

Many thanks for your time and effort!

Answer 1

Score: 1

If your target columns are limited and the column names are known, you can try the approach below.

In your expected output, 3 columns have the same name, which neither ADF nor SQL supports. So I have ignored the first array (the column-names array) from the field_par_chart field.

As you want the data only from the field_par_chart field, remove the remaining fields from the source using a select transformation.

[screenshot: select transformation]

Then I used 3 derived column transformations.

derived column1:

It splits the string and generates an array of arrays with the dynamic expression below:

map(split(replace(field_par_chart_csv,'[',''),'],'),split(replace(replace(#item,'"',''),']',''),','))

[screenshot: derived column 1 output]
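As a sanity check outside ADF, the effect of this replace/split chain can be mimicked in Python. This is only a sketch on a shortened sample string mirroring the Data Flow expression above, not part of the data flow itself:

```python
# Shortened sample of the field_par_chart csv string.
csv = '[["","Percentage behandeling"],["2010","19.4"],["2011","16.6"]]'

# Mirror of: map(split(replace(csv,'[',''),'],'),
#                split(replace(replace(#item,'"',''),']',''),','))
# Dropping '[' and splitting on '],' yields one piece per row; the inner
# replace/split strips quotes and the trailing ']]' and splits the cells.
arr = [
    piece.replace('"', '').replace(']', '').split(',')
    for piece in csv.replace('[', '').split('],')
]
# arr is an array of arrays, one sub-array per chart row
```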

derived column2:

It skips the first sub-array (the array of column names) because of the duplicate column names, then unfolds all the remaining sub-arrays and converts them into rows, as shown below, using this dynamic expression:

unfold(slice(arr,2))

[screenshot: rows after unfold]

derived column3:

It generates the required columns from the row arrays. Here, I am giving the column names manually and converting the values from string to double. You can give whatever column names you want.

[screenshot: derived column 3 settings]
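Taken together, derived columns 2 and 3 behave roughly like the Python sketch below (a shortened two-value-column sample). Note that ADF's slice() and array indexing are 1-based; modeling toDouble() on a non-numeric cell like 'null' as NULL (None here) is my assumption:

```python
# Rows as produced by derived column 1 (shortened sample with two value
# columns; 'null' cells survive the string split as the literal text 'null').
arr = [
    ['', 'Percentage behandeling', 'Percentage behandeling'],  # header row
    ['2010', '19.4', 'null'],
    ['2019', 'null', '27.7'],
]

def to_double(s):
    # Assumption: like toDouble() in ADF, non-numeric input becomes NULL (None).
    try:
        return float(s)
    except ValueError:
        return None

# slice(arr, 2) skips the 1-based first element (the header row); unfold()
# then emits one output row per sub-array, and new_arr[1], new_arr[2], ...
# pick the 1-based cells of each row.
result = [
    {'Category': row[0],
     'Percentage behandeling1': to_double(row[1]),
     'Percentage behandeling2': to_double(row[2])}
    for row in arr[1:]
]
```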

In the sink, specify your target SQL table, give the mapping for only those 4 columns, and remove the extra columns that came from the previous transformations.

[screenshot: sink mapping]

Result:

[screenshot: result table]

Execute this data flow from a pipeline and you can load this data into your target SQL table.

My Dataflow JSON for your reference:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "derivedColumn1"
                },
                {
                    "name": "select1"
                },
                {
                    "name": "derivedColumn2"
                },
                {
                    "name": "derivedColumn3"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          id as (value as integer)[],",
                "          uuid as (value as string)[],",
                "          revision_id as (value as integer)[],",
                "          langcode as (value as string)[],",
                "          type as (target_id as string, target_type as string, target_uuid as string)[],",
                "          status as (value as boolean)[],",
                "          created as (value as string, format as string)[],",
                "          parent_id as (value as string)[],",
                "          parent_type as (value as string)[],",
                "          parent_field_name as (value as string)[],",
                "          behavior_settings as (value as string[])[],",
                "          default_langcode as (value as boolean)[],",
                "          revision_translation_affected as (value as boolean)[],",
                "          content_translation_source as (value as string)[],",
                "          content_translation_outdated as (value as boolean)[],",
                "          content_translation_changed as (value as string, format as string)[],",
                "          field_par_chart as (csv as string, csv_url as string, config as string)[],",
                "          field_par_extra_info as (value as string)[],",
                "          field_par_hidden as (value as boolean)[],",
                "          field_par_text as (value as string, format as string, processed as string)[],",
                "          field_par_text_bgcolor as (value as string)[],",
                "          field_par_text_position as (value as string)[],",
                "          field_par_title as (value as string)[],",
                "          field_par_title_class as string[],",
                "          field_par_title_enable as (value as boolean)[],",
                "          field_par_title_tag as (value as string)[]",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false,",
                "     documentForm: 'singleDocument') ~> source1",
                "select1 derive(arr = map(split(replace(field_par_chart_csv,'[',''),'],'),split(replace(replace(#item,'\"',''),']',''),','))) ~> derivedColumn1",
                "source1 select(mapColumn(",
                "          field_par_chart_csv = field_par_chart[1].csv",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "derivedColumn1 derive(new_arr = unfold(slice(arr,2))) ~> derivedColumn2",
                "derivedColumn2 derive(Category = new_arr[1],",
                "          {Percentage behandeling1} = toDouble(new_arr[2]),",
                "          {Percentage behandeling2} = toDouble(new_arr[3]),",
                "          {Percentage behandeling3} = toDouble(new_arr[4])) ~> derivedColumn3",
                "derivedColumn3 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     deletable:false,",
                "     insertable:true,",
                "     updateable:false,",
                "     upsertable:false,",
                "     recreate:true,",
                "     format: 'table',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     errorHandlingOption: 'stopOnFirstError',",
                "     mapColumn(",
                "          Category,",
                "          {Percentage behandeling1},",
                "          {Percentage behandeling2},",
                "          {Percentage behandeling3}",
                "     )) ~> sink1"
            ]
        }
    }
}

huangapple
  • Posted on 2023-07-31 20:18:57
  • When reposting, please keep the original link: https://go.coder-hub.com/76803553.html