How to handle a database connection from multiple Service Bus queue message triggers within Azure Functions?
Question
I am using Azure Functions with a Service Bus queue trigger to receive queue messages, process them, and store the results in Azure SQL using Entity Framework. Some queue messages are processed correctly, but others throw the following exception:
Result: An exception occurred while iterating over the results of a query for context type 'webhooksecretply.DatabaseModels.igdbapi.IGDBAPIAppContext'.
System.InvalidOperationException: There is already an open DataReader associated with this Connection which must be closed first.
For reference, the database connection is opened through the following line:
Program.cs
s.AddDbContext<IGDBAPIAppContext>(options => options.UseSqlServer("connectionstring"));
I have tried to look at the stack trace, but I cannot exactly pinpoint the specific code that throws this exception. From what I am able to find, it always throws this DbCommand error:
Failed executing DbCommand (0ms) [Parameters=[p0='?' (Size = 4000), @__queueMessageProcessingRecord_Endpoint_1='?' (Size = 50), @__queueMessageProcessingRecord_Method_2='?' (Size = 50)], CommandType='Text', CommandTimeout='300']
SELECT [w].[id], [w].[endpoint], [w].[igdb_id], [w].[method], [w].[timestamp_added]
FROM (
SELECT * FROM queuemessageprocessing WHERE timestamp_added >= @p0
) AS [w]
WHERE @__queueMessageProcessingRecord_Endpoint_1 = [w].[endpoint] AND @__queueMessageProcessingRecord_Method_2 = [w].[method]
This is the only line that uses the queuemessageprocessing table, and it is the first use of the context _IGDBAPIAppContext, so there should not be an open DataReader for _IGDBAPIAppContext prior to this line:
ProcessQueueMessages.cs
List<DatabaseModels.igdbapi.QueueMessageProcessing> queueMessageProcessingResults = _IGDBAPIAppContext.QueueMessageProcessing
    .FromSqlRaw(
        "SELECT * FROM queuemessageprocessing WHERE timestamp_added >= {0}",
        DateTime.UtcNow.AddMinutes(-10).ToString("yyyy-MM-dd HH:mm:ss"))
    .Where(result => Equals(queueMessageProcessingRecord.Endpoint, result.Endpoint)
        && Equals(queueMessageProcessingRecord.Method, result.Method))
    .ToList();
From what I understand about this System.InvalidOperationException, Entity Framework would return an IQueryable and thus keep a DataReader open, but that should not be the case here because the query is enumerated with .ToList(). My best guess is that the exception occurs because other queue messages fire the Service Bus queue trigger against the same database connection while an earlier queue message is still executing additional queries within ProcessQueueMessages.cs. I could be completely wrong about this hypothesis, so any insights into what else could cause the exception, and what I can look at in the Azure portal or Application Insights, would be greatly appreciated.
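One way to test this hypothesis: EF Core documents that a DbContext instance is not thread-safe, and a context that is shared by overlapping operations can produce exactly this DataReader error. The sketch below gives each lookup its own short-lived context through IDbContextFactory (EF Core 5 or later, package Microsoft.EntityFrameworkCore). The entity, context, and class names are hypothetical stand-ins, the column mapping to the real table is omitted, and the query is rewritten in LINQ only for illustration. It is not confirmed that this is the cause in the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore;

// Hypothetical stand-in for the question's entity (column mapping omitted).
public class QueueMessageProcessing
{
    public int Id { get; set; }
    public string Endpoint { get; set; } = "";
    public string Method { get; set; } = "";
    public DateTime TimestampAdded { get; set; }
}

// Hypothetical stand-in for IGDBAPIAppContext.
public class AppContextSketch : DbContext
{
    public AppContextSketch(DbContextOptions<AppContextSketch> options) : base(options) { }

    public DbSet<QueueMessageProcessing> QueueMessageProcessing => Set<QueueMessageProcessing>();
}

public class RecentMessageLookup
{
    private readonly IDbContextFactory<AppContextSketch> _factory;

    // Assumes the factory was registered with something like:
    // services.AddDbContextFactory<AppContextSketch>(o => o.UseSqlServer(connectionString));
    public RecentMessageLookup(IDbContextFactory<AppContextSketch> factory) => _factory = factory;

    public List<QueueMessageProcessing> FindRecent(string endpoint, string method)
    {
        // A fresh context per call, disposed when the method returns,
        // so no other operation can be reading on the same context.
        using AppContextSketch db = _factory.CreateDbContext();

        DateTime cutoff = DateTime.UtcNow.AddMinutes(-10);
        return db.QueueMessageProcessing
            .Where(r => r.TimestampAdded >= cutoff && r.Endpoint == endpoint && r.Method == method)
            .ToList();
    }
}
```

If the error disappears with a per-call context, the original context was most likely being shared across overlapping operations, for example an async call that was not awaited.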
Update 1
I have tried adjusting the number of queue messages being consumed in host.json, but the exception is still thrown.
{
  "extensions": {
    "queues": {
      "batchSize": 1,
      "newBatchThreshold": 0
    }
  }
}
I also tried adding WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT = 1 in the App Service application settings.
These changes come from the following Stack Overflow post: https://stackoverflow.com/questions/53124612/how-to-limit-concurrent-azure-function-executions
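One thing worth checking here: the queues section of host.json configures the Storage queue trigger, while the Service Bus trigger reads its settings from extensions.serviceBus. A minimal sketch of the Service Bus equivalent follows. It assumes version 5.x or later of the Service Bus extension, where maxConcurrentCalls is a top-level setting (in 4.x it sits under messageHandlerOptions). The value 1 is only an illustration and applies per function app instance.

```json
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "maxConcurrentCalls": 1
    }
  }
}
```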
Update 2
I have also tried adding MultipleActiveResultSets=true to the connection string but receive this new exception:
Result: An exception occurred while iterating over the results of a query for context type 'webhooksecretply.DatabaseModels.igdbapi.IGDBAPIAppContext'.
System.InvalidOperationException: ExecuteReader requires an open and available Connection. The connection's current state is open.
Answer 1
Score: 1
I know it has been a few months since I posted this question, but I finally got some time to work on this and have figured out a solution. Essentially, batching messages in the Service Bus queue was the solution I ended up with.
This solution is taken from this GitHub documentation page with some binding modifications: https://github.com/Azure/azure-sdk-for-net/blob/Microsoft.Azure.WebJobs.Extensions.ServiceBus_5.0.0-beta.2/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/README.md#batch-triggers
[FunctionName("TriggerBatch")]
public static void Run(
    [ServiceBusTrigger("<queue_name>", Connection = "<connection_name>", IsBatched = true)] string[] messages,
    ILogger logger)
{
    foreach (string message in messages)
        logger.LogInformation($"C# function triggered to process a message: {message}");
}
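A plausible reason batching helps is that the messages in one batch are handled one after another inside a single invocation, so the database work for one message finishes before the next begins. The sketch below illustrates that pattern under assumptions: the class name and HandleMessageAsync are hypothetical placeholders for the real per-message database work, and it uses the in-process model (packages Microsoft.Azure.WebJobs.Extensions.ServiceBus and Microsoft.Extensions.Logging).

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public class ProcessBatchSketch
{
    [FunctionName("ProcessBatchSketch")]
    public async Task Run(
        [ServiceBusTrigger("<queue_name>", Connection = "<connection_name>", IsBatched = true)] string[] messages,
        ILogger logger)
    {
        foreach (string message in messages)
        {
            // Await each message before starting the next, so no two
            // database operations overlap within this invocation.
            await HandleMessageAsync(message, logger);
        }
    }

    // Hypothetical placeholder: the real implementation would run the
    // Entity Framework queries for a single message here.
    private static Task HandleMessageAsync(string message, ILogger logger)
    {
        logger.LogInformation("Processing message: {Message}", message);
        return Task.CompletedTask;
    }
}
```

This only serializes work inside one invocation. Separate invocations can still overlap, so each one should use its own DbContext.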