amqp-cpp library: channel->cancel(consumerTag) doesn't seem to cancel

Question

I'm using the AMQP-CPP library (not the amqp C library) to connect to RabbitMQ. Consuming from a queue works, as does publishing. I don't use exchanges or anything else, just one consumer on one queue. When I try to cancel the consumer, I still receive messages after the DeferredCancel::onSuccess callback has run. Also, the std::string consumer argument passed to that callback is empty; shouldn't it be the consumerTag again?

Here's what I'm observing:

// publish many messages to "queueName"

m_channel->consume("queueName", "hardcodedConsumerTag").onReceived(
    [](const AMQP::Message &m, uint64_t deliveryTag, bool redelivered) {
        std::cout << std::string(m.body(), m.bodySize()) << std::endl;
    });

m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer) {
    std::cout << "should have stopped consuming for: " << consumer << std::endl;
});

output:

message 1
should have stopped consuming for: (here is just an empty string)
message 2
message 3
... until all messages have been delivered

I would have expected the messages to stop after the output "should have stopped consuming" is printed.

Answer 1

Score: 0

It turns out the consume request had not yet been processed when the cancel() request was sent. RabbitMQ (or the AMQP-CPP library) reports "success" even though no consumer was canceled, because no consumer existed on the RabbitMQ side yet. Only THEN is the consume() processed, which is why I saw the behavior described above.

I fixed it by wrapping everything in callbacks. I maintain my own list of DeferredQueue and DeferredConsumer objects and record whether each onSuccess callback has already run (there doesn't seem to be a "pending" equivalent in AMQP-CPP).

If the onSuccess callback has NOT run yet, I overwrite it with one that cancels; if it has already run, I can just cancel normally.

// publish many messages to "queueName"

// fragment from inside a class: onSuccessExecuted, m_channel and cancel()
// are assumed to be members
bool onSuccessExecuted = false;

auto& deferredConsumer = m_channel->consume("queueName", "hardcodedConsumerTag");

deferredConsumer.onReceived([](const AMQP::Message &m, uint64_t deliveryTag, bool redelivered){
      // body() is not null-terminated, so pass the size explicitly
      std::cout << std::string(m.body(), m.bodySize()) << std::endl;
    });

deferredConsumer.onSuccess([this](){
      onSuccessExecuted=true;
      // do stuff you want to do when starting consuming a queue
    });

// later, when we want to stop consuming:
if (onSuccessExecuted == false){
  // this overwrites the previous onSuccess callback
  deferredConsumer.onSuccess([this](){
      cancel();
      // must still be set if we might want to cancel again later
      onSuccessExecuted=true;
    });
} else {
  // if onSuccess has already been executed we just cancel normally,
  // as the onSuccess callback won't be executed again
  cancel();
}

void cancel() {
  m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){
        std::cout << "should have stopped consuming for: " << consumer << std::endl;
      });
}
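
For anyone who wants the bookkeeping in one place, here is a minimal sketch of the same "cancel now, or as soon as the consumer is confirmed" idea, written without any AMQP-CPP types so it compiles on its own. ConsumerTracker, registerPending, markStarted, requestCancel and replayQuestionScenario are hypothetical names I made up for illustration; none of them exist in AMQP-CPP. The assumption is that in real code the cancel function would call m_channel->cancel(tag) and that markStarted would be called from the consume onSuccess callback.

```cpp
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical helper (not part of AMQP-CPP): remembers per consumer tag
// whether the consumer has been confirmed, and defers a cancel until then.
class ConsumerTracker {
    enum class State { Pending, CancelRequested, Active };
    using StateMap = std::unordered_map<std::string, State>;

public:
    using CancelFn = std::function<void(const std::string &tag)>;

    // cancelFn is whatever actually sends the cancel; in real code it is
    // assumed to wrap m_channel->cancel(tag).
    explicit ConsumerTracker(CancelFn cancelFn) : m_cancel(std::move(cancelFn)) {}

    // Call right after issuing consume(): the consumer is not confirmed yet.
    void registerPending(const std::string &tag) { m_state[tag] = State::Pending; }

    // Call from the consume onSuccess callback.
    void markStarted(const std::string &tag) {
        auto it = m_state.find(tag);
        if (it == m_state.end()) return;
        const bool cancelWasRequested = (it->second == State::CancelRequested);
        it->second = State::Active;
        if (cancelWasRequested) sendCancel(it);  // deferred cancel goes out now
    }

    // Call whenever the application wants to stop consuming.
    void requestCancel(const std::string &tag) {
        auto it = m_state.find(tag);
        if (it == m_state.end()) return;          // unknown or already cancelled
        if (it->second == State::Active) sendCancel(it);
        else it->second = State::CancelRequested; // too early: remember it
    }

private:
    void sendCancel(StateMap::iterator it) {
        const std::string tag = it->first;  // copy the key before erasing
        m_state.erase(it);
        m_cancel(tag);
    }

    StateMap m_state;
    CancelFn m_cancel;
};

// Replays the ordering from the question: cancel is requested before the
// consumer has been confirmed, so it is held back until confirmation.
std::vector<std::string> replayQuestionScenario() {
    std::vector<std::string> log;
    ConsumerTracker tracker([&log](const std::string &tag) {
        log.push_back("cancel sent for " + tag);
    });
    tracker.registerPending("hardcodedConsumerTag");
    tracker.requestCancel("hardcodedConsumerTag");  // deferred, nothing sent
    log.push_back("consume confirmed");
    tracker.markStarted("hardcodedConsumerTag");    // cancel is sent here
    return log;
}
```

Calling replayQuestionScenario() from your own main() yields a log in which "consume confirmed" comes before the cancel entry. Keeping the state in a map keyed by consumer tag is just one way to handle several consumers; a single bool, as in the answer above, is enough for one.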

Posted by huangapple on August 8, 2023 at 23:43:37
Original link: https://go.coder-hub.com/76861171.html