英文:
EasyNetQ - prefetchcount parameter ignored
问题
我遇到了以下问题 - 我需要将我的操作排队,以避免它们并行运行。 因此,我添加了RabbitMQ队列,该队列由接收来自多个客户端的请求的API方法提供。 问题是,尽管我在连接字符串和订阅上都将Prefetchcount设置为1,但它仍然并发处理这些任务。
这是我的连接和订阅以及回调测试过程
ConnectionConfiguration conConfig = new ConnectionConfiguration()
{
UserName = own_s.RABBIT_USER,
Password = own_s.RABBIT_PASSWORD,
VirtualHost = own_s.RABBIT_VHOST,
PrefetchCount = 1
};
await _bus.PubSub.SubscribeAsync<applypkg>(
"topic",
(req) => ApplyPackageService(req),
(cfg) => cfg.WithPrefetchCount(1));
async void ApplyPackageService(applypkg msg)
{
var g = Guid.NewGuid();
Log.Information($"Callback started: {g}");
await Task.Delay(20000);
Log.Information($"Callback ended: {g}");
}
不幸的是,它不能按预期工作:
===> 08:37:41 [Information] Callback started: 6f27a664-977b-47ca-9eac-81a124066403
===> 08:37:46 [Information] Callback started: cbb15964-f69d-47c6-9225-3c4fc39b326e
===> 08:37:58 [Information] Callback started: 7c086690-7b7f-449c-b74e-615dda001a27
===> 08:38:01 [Information] Callback ended: 6f27a664-977b-47ca-9eac-81a124066403
===> 08:38:05 [Information] Callback started: 4466ce86-6d16-4707-a54e-b166555a92d8
===> 08:38:06 [Information] Callback ended: cbb15964-f69d-47c6-9225-3c4fc39b326e
===> 08:38:18 [Information] Callback ended: 7c086690-7b7f-449c-b74e-615dda001a27
===> 08:38:25 [Information] Callback ended: 4466ce86-6d16-4707-a54e-b166555a92d8
Rabbit Management应用程序显示队列中Prefetchcount的正确值,因此它必须是客户端中的某些内容。
有人遇到过类似的行为吗?我在论坛的某处读到,这取决于EasyNetQ,但我无法确定哪个是正常的。
英文:
I'm having a following issue - I need to have my operations queued to avoid them running in parallel. Therefore, I added RabbitMQ queue which is fed by API method taking requests from multiple clients. The issue is, that despite I set Prefetchcount to 1 both on connection string and subscription, it still processes those tasks concurrently.
Here is my connection and subscription along with callback test procedure
ConnectionConfiguration conConfig = new ConnectionConfiguration()
{
UserName = own_s.RABBIT_USER,
Password = own_s.RABBIT_PASSWORD,
VirtualHost = own_s.RABBIT_VHOST,
PrefetchCount = 1
};
await _bus.PubSub.SubscribeAsync<applypkg>(
"topic",
(req) => ApplyPackageService(req),
(cfg) => cfg.WithPrefetchCount(1));
async void ApplyPackageService(applypkg msg)
{
var g = Guid.NewGuid();
Log.Information($"Callback started: {g}");
await Task.Delay(20000);
Log.Information($"Callback ended: {g}");
}
Unfortunately, it does not work as expected:
===> 08:37:41 [Information] Callback started: 6f27a664-977b-47ca-9eac-81a124066403
===> 08:37:46 [Information] Callback started: cbb15964-f69d-47c6-9225-3c4fc39b326e
===> 08:37:58 [Information] Callback started: 7c086690-7b7f-449c-b74e-615dda001a27
===> 08:38:01 [Information] Callback ended: 6f27a664-977b-47ca-9eac-81a124066403
===> 08:38:05 [Information] Callback started: 4466ce86-6d16-4707-a54e-b166555a92d8
===> 08:38:06 [Information] Callback ended: cbb15964-f69d-47c6-9225-3c4fc39b326e
===> 08:38:18 [Information] Callback ended: 7c086690-7b7f-449c-b74e-615dda001a27
===> 08:38:25 [Information] Callback ended: 4466ce86-6d16-4707-a54e-b166555a92d8
Rabbit Management app shows proper value of prefetchcount in the queue, so it has to be something in the client.
Anybody has encountered similar behaviour? I've read somewhere on the forum, that it is dependent on the EasyNetQ, but I could not determine which one is OK.
答案1
得分: 0
Prefetch count and dispatcher concurrency are 2 different things.
https://github.com/EasyNetQ/EasyNetQ/issues/1467
Can you to set the ConsumerDispatcherConcurrency on the connection?
英文:
Prefetch count and dispatcher concurrency are 2 different things.
https://github.com/EasyNetQ/EasyNetQ/issues/1467
Can you to set the ConsumerDispatcherConcurrency on the connection?
答案2
得分: 0
SubscribeAsync
接受一个异步方法作为消息处理程序。您想要在异步的 ApplyPackageService
函数中处理消息,但通过使用匿名方法,这就是您实际的消息处理程序。由于您在此方法中调用异步的 ApplyPackageService
方法而不等待结果(await),EasyNetQ 的消息处理立即完成,然后处理下一条消息。
要依次处理消息,可以在您的 ApplyPackageService
函数中添加一个 CancellationToken
参数,将返回类型更改为 Task 并直接将该方法用作 MessageHandler:
await _bus.PubSub.SubscribeAsync<applypkg>(
"topic",
ApplyPackageService,
(cfg) => cfg.WithPrefetchCount(1));
async Task ApplyPackageService(applypkg msg, CancellationToken ct)
{
var g = Guid.NewGuid();
Log.Information($"Callback started: {g}");
await Task.Delay(20000);
Log.Information($"Callback ended: {g}");
}
英文:
SubscribeAsync
takes an async method as message handler. You want to handle the messages in the asynchronous ApplyPackageService
function, but by using the anonymous method, this is your actual message handler. And since you call the asynchronous ApplyPackageService
method in this method without waiting for the result (await), the message handling for EasyNetQ is immediately completed and the next message is handled.
To handle the messages one after the other, you can add a CancellationToken
parameter to your ApplyPackageService
function, change the return type to Task and use the method directly as a MessageHandler:
await _bus.PubSub.SubscribeAsync<applypkg>(
"topic",
ApplyPackageService,
(cfg) => cfg.WithPrefetchCount(1));
async Task ApplyPackageService(applypkg msg, CancellationToken ct)
{
var g = Guid.NewGuid();
Log.Information($"Callback started: {g}");
await Task.Delay(20000);
Log.Information($"Callback ended: {g}");
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论