How to get OData source file updated data into sink file (Azure SQL Server) using Azure Data Factory
Question
How can I copy updated data from an OData source into a sink (Azure SQL Server) using Azure Data Factory?
How do I create a pipeline from an OData source to the sink that picks up only the updated records, using Azure Data Factory?
Answer 1
Score: 1
To load data incrementally from an OData source into a SQL database, you need an incrementing key column in the source. An incrementing key is a unique identifier on each row whose value increases whenever new rows are added. This column is used to tell the rows that have already been copied apart from the rows that still need to be copied to the sink.
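To make the idea concrete, here is a minimal Python sketch of how a watermark value splits source rows into those already copied and those still pending. The row data, the `modified` column name, and the `split_by_watermark` helper are all hypothetical and only illustrate the comparison; they are not part of Azure Data Factory.

```python
from datetime import datetime

# Hypothetical source rows; "modified" plays the role of the incrementing key.
source_rows = [
    {"id": 1, "modified": datetime(2023, 1, 1)},
    {"id": 2, "modified": datetime(2023, 2, 1)},
    {"id": 3, "modified": datetime(2023, 3, 1)},
]

def split_by_watermark(rows, watermark, key="modified"):
    """Return (already_copied, still_to_copy) relative to the watermark."""
    copied = [r for r in rows if r[key] <= watermark]
    pending = [r for r in rows if r[key] > watermark]
    return copied, pending

copied, pending = split_by_watermark(source_rows, datetime(2023, 1, 15))
print([r["id"] for r in copied])   # [1]
print([r["id"] for r in pending])  # [2, 3]
```

Only the pending rows would be sent to the sink on the next run.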
- Create a watermark table in the SQL database and insert the initial value.
- If your incrementing key is a date, use 1900-01-01 as the initial value. If it is a number, start with 0.
  create table watermark_table (watermark_column datetime2)
  insert into watermark_table values ('1900-01-01')
- In the Data Factory pipeline, add a Lookup activity and create a source dataset for the watermark table.
- Then add a Copy activity. Use an OData connector dataset as the source and a dataset for the SQL database table as the sink.
- In the source, enter the query as
  $filter=<incremental-column-name> gt '@{activity('Lookup1').output.firstRow.watermark_column}'
  Replace <incremental-column-name> with the respective key column. Lookup1 is the name of the Lookup activity added in the earlier step. If the key is numeric, omit the single quotes around the expression.
- Then add a Script activity and select the linked service for the SQL database. Enter the script as
  update watermark_table
  set watermark_column = (select max(<incremental-column-name>) from <sink-table-name>)
  Replace <incremental-column-name> and <sink-table-name> with the respective column and table names.
This replaces the old watermark with the highest key value among the rows loaded into the sink, so the next run copies only rows beyond that value.
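The Lookup, Copy, and Script steps above can be simulated locally to see how the watermark moves between runs. The following is a rough sketch using Python's built-in sqlite3 module in place of Azure SQL and a plain list in place of the OData source. The `sink_table` name, the `modified` column, and the `run_incremental_copy` function are assumptions made for illustration, and the `COALESCE` guard (which keeps the old watermark when the sink is still empty) is an addition that is not in the script above.

```python
import sqlite3

def run_incremental_copy(conn, source_rows):
    """Simulate one pipeline run; returns the number of rows copied."""
    cur = conn.cursor()
    # Lookup activity: read the current watermark.
    (watermark,) = cur.execute(
        "SELECT watermark_column FROM watermark_table"
    ).fetchone()
    # Copy activity: keep only rows whose key is greater than the watermark,
    # which is what the $filter ... gt ... query asks the source to do.
    new_rows = [row for row in source_rows if row[1] > watermark]
    cur.executemany(
        "INSERT INTO sink_table (id, modified) VALUES (?, ?)", new_rows
    )
    # Script activity: move the watermark to the highest key now in the sink.
    # COALESCE is an added guard so an empty sink does not null the watermark.
    cur.execute(
        "UPDATE watermark_table SET watermark_column = "
        "COALESCE((SELECT MAX(modified) FROM sink_table), watermark_column)"
    )
    conn.commit()
    return len(new_rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE watermark_table (watermark_column TEXT)")
conn.execute("INSERT INTO watermark_table VALUES ('1900-01-01')")
conn.execute("CREATE TABLE sink_table (id INTEGER, modified TEXT)")

# ISO-formatted date strings compare correctly as plain text.
source = [(1, "2023-01-01"), (2, "2023-02-01")]
first = run_incremental_copy(conn, source)   # copies both rows
source.append((3, "2023-03-01"))
second = run_incremental_copy(conn, source)  # copies only the new row
third = run_incremental_copy(conn, source)   # nothing new to copy
print(first, second, third)  # 2 1 0
```

Note that a row is only picked up again if its key value rises above the stored watermark, so capturing updates to existing rows would require a key such as a last-modified timestamp.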
Reference: Delta copy from a database using a control table - Azure Data Factory | Microsoft Learn