ADF copy activity

Question


I have 3 files at my source: a.txt, b.txt, and c.txt. However, a.txt and c.txt are already present in my destination (ADLS), so my copy activity should copy only the remaining file, b.txt. How can I achieve this using ADF pipeline activities?

I have tried using Get Metadata activities on both source and destination to check whether the files already exist at the destination. But when copying inside an If activity, it copies all the files, not just the ones that are missing.

Answer 1

Score: 1

  1. Use two Get Metadata activities in parallel to get the childItems lists of the source and the sink.
  2. Once both Get Metadata activities have succeeded, use a Filter activity with the source childItems array as its items and a condition of the form @not(contains(<sink childItems array>, item())).
  3. Then leverage a ForEach activity with the Filter activity's output as the iteration input and copy the missing files (a concrete variant of the condition is sketched below).
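
Note that each childItems entry is an object of the form { "name": ..., "type": ... }, so contains() on the raw arrays only matches when the whole object is identical in both listings. If you prefer to compare by file name alone, a variant like the following should also work (a sketch, assuming the two Get Metadata activities are named 'source file list' and 'sink file list' as in the demonstration in Answer 2; string() serializes the sink array so contains() performs a substring check on the name):

    items: @activity('source file list').output.childItems
    condition: @not(contains(string(activity('sink file list').output.childItems), item().name))

One caveat of the substring check: a file name that is a prefix of another (e.g. b.txt vs. ab.txt) could match spuriously, so the object comparison is safer when both listings have the same shape.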

Answer 2

Score: 1

I agree with what @Nandan has suggested. The following demonstrates the same approach. Below is the output of the Get Metadata activity on the source:

[image: childItems output of the source Get Metadata activity]
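
For reference, the childItems list returned by Get Metadata has this shape (illustrative values, assuming the three files named in the question):

    {
        "childItems": [
            { "name": "a.txt", "type": "File" },
            { "name": "b.txt", "type": "File" },
            { "name": "c.txt", "type": "File" }
        ]
    }

The sink listing below would similarly contain entries for a.txt and c.txt only.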

  • And my sink has files as shown in the image below:

[image: files present in the sink]

  • Using a Filter activity, get the file names that are not present in your sink, and then use a ForEach to copy these filtered files. The following dynamic content can be used for the filter:

    items: @activity('source file list').output.childItems
    condition: @not(contains(activity('sink file list').output.childItems,item()))

[image: output of the Filter activity]
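
With these inputs, the Filter activity's output would look roughly like this (illustrative values; only b.txt survives the condition), which is why the ForEach below iterates over output.Value:

    {
        "ItemsCount": 3,
        "FilteredItemsCount": 1,
        "Value": [
            { "name": "b.txt", "type": "File" }
        ]
    }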

  • Now you can iterate through this filtered list by using @activity('Filter1').output.Value as the items of your ForEach activity. The following is the complete pipeline JSON for the above implementation:
{
    "name": "pipeline2",
    "properties": {
        "activities": [
            {
                "name": "source file list",
                "type": "GetMetadata",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "DelimitedText1",
                        "type": "DatasetReference"
                    },
                    "fieldList": [
                        "childItems"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings"
                    }
                }
            },
            {
                "name": "sink file list",
                "type": "GetMetadata",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "DelimitedText2",
                        "type": "DatasetReference"
                    },
                    "fieldList": [
                        "childItems"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "DelimitedTextReadSettings"
                    }
                }
            },
            {
                "name": "Filter1",
                "type": "Filter",
                "dependsOn": [
                    {
                        "activity": "source file list",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    },
                    {
                        "activity": "sink file list",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('source file list').output.childItems",
                        "type": "Expression"
                    },
                    "condition": {
                        "value": "@not(contains(activity('sink file list').output.childItems,item()))",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Filter1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Filter1').output.Value",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Copy data1",
                            "type": "Copy",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "source": {
                                    "type": "DelimitedTextSource",
                                    "storeSettings": {
                                        "type": "AzureBlobFSReadSettings",
                                        "recursive": true,
                                        "enablePartitionDiscovery": false
                                    },
                                    "formatSettings": {
                                        "type": "DelimitedTextReadSettings"
                                    }
                                },
                                "sink": {
                                    "type": "DelimitedTextSink",
                                    "storeSettings": {
                                        "type": "AzureBlobFSWriteSettings"
                                    },
                                    "formatSettings": {
                                        "type": "DelimitedTextWriteSettings",
                                        "quoteAllText": true,
                                        "fileExtension": ".txt"
                                    }
                                },
                                "enableStaging": false,
                                "translator": {
                                    "type": "TabularTranslator",
                                    "typeConversion": true,
                                    "typeConversionSettings": {
                                        "allowDataTruncation": true,
                                        "treatBooleanAsNumber": false
                                    }
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "DelimitedText3",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "DelimitedText4",
                                    "type": "DatasetReference"
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "annotations": []
    }
}
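
One detail worth noting: in the JSON above, the Copy activity's input dataset (DelimitedText3) is not shown with any parameters, so each iteration would copy whatever the dataset points to. For the ForEach to copy only the current file, the source (and, if needed, sink) dataset would typically take a file name parameter bound to the current item; the datasets in the screenshots may already be set up this way. A minimal sketch of the dataset reference, assuming a hypothetical fileName parameter defined on DelimitedText3:

    "inputs": [
        {
            "referenceName": "DelimitedText3",
            "type": "DatasetReference",
            "parameters": {
                "fileName": {
                    "value": "@item().name",
                    "type": "Expression"
                }
            }
        }
    ]

Inside the dataset itself, that parameter would then be used in the file path, e.g. "fileName": { "value": "@dataset().fileName", "type": "Expression" } in the dataset's location.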
  • Since the end goal is for the sink to contain every file the source has, another approach you can consider is to use a Delete activity inside your ForEach to delete the file in the sink first and then copy it.

  • First get the list of files present in the source using a Get Metadata activity. Iterate through this list and, for each file, apply the delete operation first and then the copy (a sketch of such a Delete activity follows this list).
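
A minimal sketch of such a Delete activity inside the ForEach (the activity name and the hypothetical fileName parameter on the sink dataset are assumptions, not taken from the pipeline above):

    {
        "name": "Delete existing file",
        "type": "Delete",
        "typeProperties": {
            "dataset": {
                "referenceName": "DelimitedText4",
                "type": "DatasetReference",
                "parameters": {
                    "fileName": {
                        "value": "@item().name",
                        "type": "Expression"
                    }
                }
            },
            "enableLogging": false,
            "storeSettings": {
                "type": "AzureBlobFSReadSettings",
                "recursive": false
            }
        }
    }

The Copy activity would then be made to depend on this Delete activity succeeding, so that every source file overwrites its counterpart in the sink.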

NOTE: If the content of a file that already exists in both source and sink is different, go with the approach demonstrated above (as suggested by @Nandan).


Posted by huangapple on 2023-04-17 04:11:24. Original link: https://go.coder-hub.com/76030095.html