EasyNetQ - prefetchcount parameter ignored

Question

I'm having the following issue: I need my operations to be queued so that they do not run in parallel. To that end, I added a RabbitMQ queue that is fed by an API method taking requests from multiple clients. The problem is that even though I set PrefetchCount to 1 both on the connection and on the subscription, the tasks are still processed concurrently.

Here are my connection configuration, my subscription, and the callback I use for testing:

ConnectionConfiguration conConfig = new ConnectionConfiguration()
{
    UserName = own_s.RABBIT_USER,
    Password = own_s.RABBIT_PASSWORD,
    VirtualHost = own_s.RABBIT_VHOST,
    PrefetchCount = 1
};

await _bus.PubSub.SubscribeAsync<applypkg>(
     "topic",
     (req) => ApplyPackageService(req),
     (cfg) => cfg.WithPrefetchCount(1));

async void ApplyPackageService(applypkg msg)
{
    var g = Guid.NewGuid();
    Log.Information($"Callback started: {g}");
    await Task.Delay(20000);
    Log.Information($"Callback ended: {g}");
}

Unfortunately, it does not work as expected:

===> 08:37:41 [Information] Callback started: 6f27a664-977b-47ca-9eac-81a124066403
===> 08:37:46 [Information] Callback started: cbb15964-f69d-47c6-9225-3c4fc39b326e
===> 08:37:58 [Information] Callback started: 7c086690-7b7f-449c-b74e-615dda001a27
===> 08:38:01 [Information] Callback ended: 6f27a664-977b-47ca-9eac-81a124066403
===> 08:38:05 [Information] Callback started: 4466ce86-6d16-4707-a54e-b166555a92d8
===> 08:38:06 [Information] Callback ended: cbb15964-f69d-47c6-9225-3c4fc39b326e
===> 08:38:18 [Information] Callback ended: 7c086690-7b7f-449c-b74e-615dda001a27
===> 08:38:25 [Information] Callback ended: 4466ce86-6d16-4707-a54e-b166555a92d8

The RabbitMQ management UI shows the correct prefetch count for the queue, so the cause has to be somewhere in the client.

Has anybody encountered similar behaviour? I've read somewhere on the forum that it depends on the EasyNetQ version, but I could not determine which version works correctly.

Answer 1

Score: 0

Prefetch count and dispatcher concurrency are two different things.

https://github.com/EasyNetQ/EasyNetQ/issues/1467

Can you try setting ConsumerDispatcherConcurrency on the connection?

Answer 2

Score: 0

SubscribeAsync takes an async method as its message handler. You want the messages to be handled in the asynchronous ApplyPackageService function, but because you wrap it in an anonymous method, that anonymous method is your actual message handler. Since it calls ApplyPackageService without awaiting the result, EasyNetQ considers the message handled as soon as the anonymous method returns, and the next message is delivered right away.

To handle the messages one after the other, add a CancellationToken parameter to your ApplyPackageService function, change its return type to Task, and pass the method directly as the message handler:

// Pass the Task-returning method itself, so EasyNetQ can await it.
await _bus.PubSub.SubscribeAsync<applypkg>(
     "topic",
     ApplyPackageService,
     (cfg) => cfg.WithPrefetchCount(1));

// Returns Task instead of void, so completion is visible to the caller.
async Task ApplyPackageService(applypkg msg, CancellationToken ct)
{
    var g = Guid.NewGuid();
    Log.Information($"Callback started: {g}");
    await Task.Delay(20000, ct); // honour cancellation while waiting
    Log.Information($"Callback ended: {g}");
}
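
The difference described above can be reproduced without RabbitMQ or EasyNetQ. The following is only a minimal sketch: DispatchAsync is a hypothetical stand-in for a consumer that delivers the next message once the handler's Task completes, and HandlerDemo, WorkAsync, FireAndForget, Awaitable and MaxOverlapAsync are illustrative names, not part of any library. It counts how many handlers overlap when the handler is an async void method wrapped in a lambda versus a Task-returning method that the dispatcher can await.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class HandlerDemo
{
    private static int _running;    // handlers currently executing
    private static int _maxRunning; // highest overlap observed so far

    // Hypothetical stand-in for the consumer (NOT EasyNetQ code): the next
    // message is delivered only after the handler's Task has completed.
    private static async Task DispatchAsync(int messages, Func<int, Task> handler)
    {
        for (var i = 0; i < messages; i++)
        {
            await handler(i);
        }
    }

    // Simulated work that records how many handlers run at the same time.
    private static async Task WorkAsync()
    {
        var now = Interlocked.Increment(ref _running);
        int seen;
        while (now > (seen = Volatile.Read(ref _maxRunning)))
        {
            Interlocked.CompareExchange(ref _maxRunning, now, seen);
        }
        await Task.Delay(200);
        Interlocked.Decrement(ref _running);
    }

    // Like the question: async void, so the caller has nothing to await.
    private static async void FireAndForget(int msg) => await WorkAsync();

    // Like the answer: returns the Task, so the caller can await completion.
    private static Task Awaitable(int msg) => WorkAsync();

    public static async Task<int> MaxOverlapAsync(bool awaitHandler)
    {
        _running = 0;
        _maxRunning = 0;

        Func<int, Task> handler;
        if (awaitHandler)
        {
            handler = Awaitable;
        }
        else
        {
            // The lambda returns at once; the real work keeps running.
            handler = msg => { FireAndForget(msg); return Task.CompletedTask; };
        }

        await DispatchAsync(3, handler);
        await Task.Delay(500); // give fire-and-forget work time to finish
        return _maxRunning;
    }

    public static async Task Main()
    {
        Console.WriteLine($"fire-and-forget overlap: {await MaxOverlapAsync(false)}");
        Console.WriteLine($"awaited overlap:         {await MaxOverlapAsync(true)}");
    }
}
```

With the awaited handler the overlap stays at 1, because the dispatcher cannot move on until the Task completes. With the fire-and-forget handler the dispatcher sees a completed Task immediately, so several handlers end up running at once, which matches the interleaved log in the question.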

huangapple
  • Posted on May 10, 2023 at 14:56:55
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76215669.html