Transfer blob by SAS URI in Azure functions

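Question

I'm working on an Azure Functions application written in C#. One of my tasks is to create a function that copies a blob to another storage account, using a SAS URI to access the source blob. It is important to wait until the operation has completed (or failed) and then make the appropriate changes in the database.

But there is a potential problem: the copy operation could take longer than an activity function is allowed to run. The solution could be to use a durable function to start the copy and monitor its status. But I have no idea how to orchestrate such a process properly: what to pass to the "copy" activity, and what to return so it can be passed to the "check" activity... I'm sorry for such a basic question, but I'm new to Azure Functions. I'd appreciate any suggestions.

Update:

Inspired by Kristin's suggestion, I've developed the following orchestration: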

[FunctionName("UploadProjectBlob")]
public async Task RunOrchestrator(
    [OrchestrationTrigger] IDurableOrchestrationContext context,
    ILogger log)
{
    string sourceSas = "https://...";
    string destination = $"test/bigfile_{context.CurrentUtcDateTime.Ticks}.dat";

    await context.CallActivityAsync(nameof(StartCopying), (sourceSas, destination));
    bool done = false;
    bool success = false;
    do
    {
        // Use the orchestration's replay-safe clock; DateTime.UtcNow is not deterministic across replays.
        DateTime fireAt = context.CurrentUtcDateTime + TimeSpan.FromSeconds(60);
        await context.CreateTimer(fireAt, CancellationToken.None);
        var status = await context.CallActivityAsync<CopyStatus>(nameof(CheckStatus), destination);
        switch (status)
        {
            case CopyStatus.Pending:
                // not done, continue the loop
                continue;
            case CopyStatus.Success:
                // handle completed copy
                done = success = true;
                break;
            case CopyStatus.Failed:
                // handle failed copy
                done = true;
                break;
            case CopyStatus.Aborted:
                // handle aborted copy
                done = true;
                break;
            // in case new values are added to the BlobCopyStatus
            default:
                throw new Exception($"Unknown blob copy status {status}");
        }
    } 
    while (!done);

    if (success)
    {
        log.LogInformation("Copy success");
    }
    else
    {
        log.LogInformation("Copy failed");
    }

}

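[FunctionName(nameof(StartCopying))]
public async Task StartCopying(
    [ActivityTrigger] IDurableActivityContext inputs, 
    ILogger log)
{
    var (sas, destination) = inputs.GetInput<(string, string)>();
    var blobClient = CreateBlobClient(destination);
    // Await the call so the activity completes only after the service has accepted
    // the copy request (and so that errors surface); the copy itself runs server-side.
    await blobClient.StartCopyFromUriAsync(new Uri(sas));
}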

[FunctionName(nameof(CheckStatus))]
public async Task<CopyStatus> CheckStatus(
    [ActivityTrigger] IDurableActivityContext inputs,
    ILogger log)
{
    var destination = inputs.GetInput<string>();
    var blobClient = CreateBlobClient(destination);
    var properties = await blobClient.GetPropertiesAsync();
    var status = properties.Value.BlobCopyStatus ?? CopyStatus.Failed;
    return status;
}

public BlobClient CreateBlobClient(string name)
{
    BlobServiceClient blobServiceClient = new BlobServiceClient(Environment.GetEnvironmentVariable("STORAGE_CS"));
    BlobContainerClient blobContainerClient = blobServiceClient.GetBlobContainerClient("emails");
    return blobContainerClient.GetBlobClient(name);
}

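It works for me and seems safe with respect to timeouts in activity functions. But I'd appreciate feedback from people who have experience with Azure Functions. Is such a solution safe? Can it be improved somehow?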


Answer 1

Score: 1

I'm guessing you've looked into how to actually create Durable Functions, and that you know how OrchestrationTrigger and ActivityTrigger are built.

The gist of it is to use await blobClient.StartCopyFromUriAsync(uri) in your ActivityTrigger.

The blobClient instance on which you invoke this method should belong to the destination storage account.

The uri variable must be the source blob, including a SAS token (that is, the URI will be something like https://uri.to.source.storage/blob-location?sasToken=value). Remember to give read access when you build this SAS token.
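
As a rough illustration of building such a read-only SAS URI, here is a minimal sketch using BlobSasBuilder from the Azure.Storage.Blobs package. It assumes you have the source account's name and shared key available; the class name SasExample, the method name BuildReadOnlySasUri and all parameter names are made up for this example, and the blob name is assumed not to need URL escaping.

```csharp
using System;
using Azure.Storage;
using Azure.Storage.Sas;

public static class SasExample
{
    // Hypothetical helper: returns a URI for the source blob carrying a
    // read-only SAS token that expires after the given lifetime.
    public static Uri BuildReadOnlySasUri(
        string accountName,
        string accountKey,
        string containerName,
        string blobName,
        TimeSpan lifetime)
    {
        var credential = new StorageSharedKeyCredential(accountName, accountKey);

        var sasBuilder = new BlobSasBuilder
        {
            BlobContainerName = containerName,
            BlobName = blobName,
            Resource = "b", // "b" = the SAS applies to a single blob
            ExpiresOn = DateTimeOffset.UtcNow.Add(lifetime)
        };
        // Read access is all the destination service needs to copy from the source.
        sasBuilder.SetPermissions(BlobSasPermissions.Read);

        // Assumes the default public-cloud blob endpoint for the account.
        var uriBuilder = new UriBuilder(
            $"https://{accountName}.blob.core.windows.net/{containerName}/{blobName}")
        {
            Query = sasBuilder.ToSasQueryParameters(credential).ToString()
        };
        return uriBuilder.Uri;
    }
}
```

The lifetime should comfortably exceed the expected copy duration, since the destination service reads from the source for as long as the copy is running.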

In your ActivityTrigger you'd start the copy operation, and subsequently query the copy operation's current status by calling blobClient.GetPropertiesAsync and reading properties.Value.BlobCopyStatus.

A simple example of how it would look in your ActivityTrigger:

// start the copy operation
_ = await destinationBlobClient.StartCopyFromUriAsync(sourceBlobUriWithSasToken);

// keep querying the status until the copy is no longer pending
var finished = false;
while (!finished)
{
    // to avoid spamming the service, only query the
    // properties once every 5 seconds
    await Task.Delay(TimeSpan.FromSeconds(5));

    // get the copy status and handle accordingly
    var properties = await destinationBlobClient.GetPropertiesAsync();
    switch (properties.Value.BlobCopyStatus)
    {
        case Azure.Storage.Blobs.Models.CopyStatus.Pending:
            // not done, continue the loop
            continue;

        case Azure.Storage.Blobs.Models.CopyStatus.Success:
            // handle completed copy
            finished = true;
            break;

        case Azure.Storage.Blobs.Models.CopyStatus.Failed:
            // handle failed copy
            finished = true;
            break;

        case Azure.Storage.Blobs.Models.CopyStatus.Aborted:
            // handle aborted copy
            finished = true;
            break;

        // in case new values are added to the BlobCopyStatus
        // (a null status, meaning no copy information, also ends up here)
        default:
            throw new InvalidOperationException(
                $"Unknown blob copy status {properties.Value.BlobCopyStatus}");
    }
}
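
A loop like this runs inside a single activity, so it still has to finish before the function timeout. One way to make that explicit is to give the polling an overall deadline. The following is only a sketch of that idea and deliberately avoids the storage SDK: CopyState is a stand-in for Azure.Storage.Blobs.Models.CopyStatus, and CopyPoller, WaitForCompletionAsync and the checkStatus delegate are hypothetical names introduced for this example.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for Azure.Storage.Blobs.Models.CopyStatus,
// so the sketch compiles without the storage SDK.
public enum CopyState { Pending, Success, Failed, Aborted }

public static class CopyPoller
{
    // Polls checkStatus until it reports a non-pending state or the timeout
    // would be exceeded. Returns null if the copy is still pending by then.
    public static async Task<CopyState?> WaitForCompletionAsync(
        Func<Task<CopyState>> checkStatus,
        TimeSpan interval,
        TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        while (true)
        {
            CopyState state = await checkStatus();
            if (state != CopyState.Pending)
            {
                return state; // Success, Failed or Aborted
            }
            if (DateTime.UtcNow + interval > deadline)
            {
                return null; // out of time; the caller decides what to do next
            }
            await Task.Delay(interval);
        }
    }
}
```

In a real activity, checkStatus would wrap the GetPropertiesAsync call shown above, and the timeout would be chosen below the function app's configured timeout. If copies can run longer than that, the polling belongs in the orchestrator with durable timers rather than in the activity.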

I'm not sure if it's better, but an alternative is to have a regular Function that calls blobClient.StartCopyFromUriAsync and returns immediately. Then have a BlobTrigger that is generic enough to listen to any changes in your destination storage account (if the destination storage account is variable, I'm not sure this approach is possible, because of the limited pattern matching that BlobTrigger supports). I don't remember whether a change in BlobCopyStatus fires a BlobTrigger, which is of course also necessary for this alternative to be viable. You could test it yourself. In any case, if the previously mentioned things are non-issues, you could do something like this as well:

[FunctionName("BlobCopyStatusListener")]
public static async Task Bleh(
    [BlobTrigger("destination-pattern")] BlobClient blobClient)
{
    var properties = await blobClient.GetPropertiesAsync();
    switch (properties.Value.BlobCopyStatus)
    {
        ...
    }
}

huangapple
  • Posted on July 24, 2023 at 17:15:32
  • Please keep this link when reposting: https://go.coder-hub.com/76752993.html