How to handle a database connection from multiple Service Bus queue message triggers within Azure Functions?

Question

I am using Azure Functions with a Service Bus queue message trigger to receive a queue message, process it, and store it in Azure SQL using Entity Framework. Some queue messages are processed correctly, but others throw the following exception:

Result: An exception occurred while iterating over the results of a query for context type 'webhooksecretply.DatabaseModels.igdbapi.IGDBAPIAppContext'.
System.InvalidOperationException: There is already an open DataReader associated with this Connection which must be closed first.

For reference, the database connection is opened through the following line:

Program.cs

s.AddDbContext<IGDBAPIAppContext>(options => options.UseSqlServer("connectionstring"));

I have tried to look at the stack trace, but I cannot exactly pinpoint the specific code that throws this exception. From what I am able to find, it always throws this DbCommand error:

Failed executing DbCommand (0ms) [Parameters=[p0='?' (Size = 4000), @__queueMessageProcessingRecord_Endpoint_1='?' (Size = 50), @__queueMessageProcessingRecord_Method_2='?' (Size = 50)], CommandType='Text', CommandTimeout='300']
SELECT [w].[id], [w].[endpoint], [w].[igdb_id], [w].[method], [w].[timestamp_added]
FROM (
    SELECT * FROM queuemessageprocessing WHERE timestamp_added >= @p0
) AS [w]
WHERE @__queueMessageProcessingRecord_Endpoint_1 = [w].[endpoint] AND @__queueMessageProcessingRecord_Method_2 = [w].[method]

The line below is the only one that uses the queuemessageprocessing table, and it is the first use of the context _IGDBAPIAppContext, so there should not be an open DataReader for _IGDBAPIAppContext before it runs:

ProcessQueueMessages.cs

List<DatabaseModels.igdbapi.QueueMessageProcessing> queueMessageProcessingResults = _IGDBAPIAppContext.QueueMessageProcessing
    .FromSqlRaw(
        "SELECT * FROM queuemessageprocessing WHERE timestamp_added >= {0}",
        DateTime.UtcNow.AddMinutes(-10).ToString("yyyy-MM-dd HH:mm:ss"))
    .Where(result => Equals(queueMessageProcessingRecord.Endpoint, result.Endpoint)
        && Equals(queueMessageProcessingRecord.Method, result.Method))
    .ToList();

From what I understand about this System.InvalidOperationException, Entity Framework returns an IQueryable and can therefore keep a DataReader open, but that should not be the case here because the query is enumerated with .ToList(). My best guess is that other queue messages fire the Service Bus queue message trigger and use the same database connection while an earlier message is still executing additional queries within ProcessQueueMessages.cs. I could be completely wrong about this hypothesis, so any insight into what else could cause the exception, or what I can look at in the Azure portal or Application Insights, would be greatly appreciated.

Update 1

I have tried adjusting the number of queue messages consumed at a time in host.json, but the exception is still thrown.

{
    "extensions": {
        "queues": {
            "batchSize": 1,
            "newBatchThreshold": 0
        }
    }
}

I also tried adding WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT = 1 to the App Service application settings.

These changes come from the following StackOverflow post: https://stackoverflow.com/questions/53124612/how-to-limit-concurrent-azure-function-executions
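
A note on the host.json above: the `queues` section configures the Azure Storage queue trigger, so it may have no effect on a Service Bus trigger, which could be why the exception persisted. Below is a minimal sketch of the Service Bus counterpart, assuming version 5.x of the Service Bus extension. The value 1 is only an illustration and is not taken from the original post.

```json
{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "maxConcurrentCalls": 1
        }
    }
}
```

In the 4.x extension the same setting is nested under `messageHandlerOptions`. In both versions the limit applies per function host instance, so it does not by itself restrict scale-out.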

Update 2

I have also tried adding MultipleActiveResultSets=true to the connection string, but I then receive this new exception:

Result: An exception occurred while iterating over the results of a query for context type 'webhooksecretply.DatabaseModels.igdbapi.IGDBAPIAppContext'.
System.InvalidOperationException: ExecuteReader requires an open and available Connection. The connection's current state is open.

Answer 1

Score: 1

I know it has been a few months since I posted this question, but I finally had time to work on it and found a solution. Essentially, I ended up receiving the Service Bus queue messages in batches.

This solution is taken from this GitHub documentation page with some binding modifications: https://github.com/Azure/azure-sdk-for-net/blob/Microsoft.Azure.WebJobs.Extensions.ServiceBus_5.0.0-beta.2/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/README.md#batch-triggers

[FunctionName("TriggerBatch")]
public static void Run(
    [ServiceBusTrigger("<queue_name>", Connection = "<connection_name>", IsBatched = true)] string[] messages,
    ILogger logger)
{
    // With IsBatched = true, one invocation receives an array of messages.
    foreach (string message in messages)
    {
        logger.LogInformation($"C# function triggered to process a message: {message}");
    }
}
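
With `IsBatched = true`, the number of messages handed to a single invocation can be capped in host.json. The fragment below is a sketch under the assumption that a 5.x release of the Service Bus extension is in use. The value 100 is an arbitrary illustration and does not come from the original answer.

```json
{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "maxMessageBatchSize": 100
        }
    }
}
```

In the 4.x extension the comparable setting is `batchOptions.maxMessageCount`.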

huangapple
  • Posted on 2023-02-08 at 12:43:04
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75381454.html