通过Azure数据工厂逐增量加载MongoDB数据。

huangapple go评论81阅读模式
英文:

Load MongoDB data incrementally through Azure data factory

问题

我想使用Azure数据工厂逐渐将数据从MongoDB加载到Azure存储。我找不到任何相关的文档来执行此操作。如果有使用Azure数据工厂实现此目标的方法,将不胜感激。我已经检查了下面的链接,但没有一个讨论从MongoDB逐渐加载数据。

https://learn.microsoft.com/en-us/azure/data-factory/connector-mongodb?tabs=data-factory

从MongoDB逐渐加载数据。

英文:

I would like to load data from MongoDB incrementally to Azure storage using Azure data factory. I couldn't find any relavent documentation to do this..Appreicate if there is a way to achieve this with Azure data factory. I have already checked the below links but none of them talk about loading data incrementally from mongoDB.

https://learn.microsoft.com/en-us/azure/data-factory/connector-mongodb?tabs=data-factory

Load data from mongodb incrementally.

答案1

得分: 1

为了将数据从Azure Cosmos DB - MongoDB API增量复制到Azure Blob存储,您需要维护水印表。该表将包含上次管道运行的时间戳值。如果您将水印表存储在MongoDB API中,ADF没有查询它的选项。因此,我将水印表存储在Azure SQL数据库中。

通过Azure数据工厂逐增量加载MongoDB数据。

最初,将值存储为1900-01-01在水印表中。

在ADF中需要按照以下步骤操作:

  • 使用指向水印表的数据集获取查找活动。将查询设置为select date_col from watermark

通过Azure数据工厂逐增量加载MongoDB数据。

  • 然后在查找活动旁边获取复制活动。使用MongoDB的源数据集。在筛选器中输入以下内容以筛选大于水印表值的行。
{"created_date":{$gt:@{activity('Lookup1').output.firstRow.date_col}}}
  • 创建一个接收数据集,并将文件名设置为动态的。@concat('filename_',utcnow())。这将将文件名与创建时的日期时间连接在一起。

通过Azure数据工厂逐增量加载MongoDB数据。

  • 在复制活动后获取脚本活动,并将查询设置为:
update watermark
set date_col='@{utcNow()}'

这将使用当前的UTC更新watermark表。因此,在下一个管道运行中,MongoDB API中创建的任何行,它们的创建时间晚于当前UTC,将被复制到新文件中。

英文:

In order to copy data from azure cosmos dB - mongo dB Api to azure blob storage incrementally, you need to maintain the watermark table. This table will have the timestamp value of the last pipeline run. If you store the watermark table in mongo dB API, ADF doesn't have option to query it. Thus, I am taking the watermark table in azure SQL database to store this.

通过Azure数据工厂逐增量加载MongoDB数据。

Initially, value is stored as 1900-01-01 in watermark table.
Steps to be followed in ADF:

  • Take the lookup activity with the dataset pointing to watermark table. Give the query as select date_col from watermark

通过Azure数据工厂逐增量加载MongoDB数据。

  • Then take the copy activity next to lookup activity. Take the source dataset for mongo DB. In filter, type the following to filter the rows greater than watermark table value.
{"created_date":{$gt:@{activity('Lookup1').output.firstRow.date_col}}
  • Create a sink dataset and keep the filename as dynamic. @concat('filename_',utcnow()). This will concat the filename with the datetime at when it got created.

通过Azure数据工厂逐增量加载MongoDB数据。

  • Take the script activity after copy activity and give the query as,
update watermark
set date_col='@{utcNow()}'

This will update the watermark table with current UTC. Thus, in next pipeline run, any rows which are created after the current UTC in mongo dB API, that will be copied to new file.

huangapple
  • 本文由 发表于 2023年7月10日 22:40:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/76654844.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定