英文:
amqp-cpp library: channel->cancel(consumerTag) doesn't seem to cancel
问题
我正在使用AMQP-CPP库(而不是amqp C库)连接到RabbitMQ。消费队列和发布消息都可以正常工作。我没有使用交换机或其他任何东西,只是在一个队列上使用一个消费者。当尝试取消队列时,在执行DeferredCancel::onSuccess回调之后,我仍然收到消息。此外,callback(std::string consumer)是空的,这应该再次是consumerTag吗?
这是我观察到的情况:
// 向"queueName"发布多条消息
m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([](AMQP::Message m){std::cout << m.body()<< std::endl;});
m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){std::cout << "should have stopped consuming for: " << consumer << std::endl});
输出:
message 1
should have stopped consuming for: (这里只是一个空字符串)
message 2
message 3
... 直到所有消息都被传递完毕
我本来希望在打印输出"should have stopped consuming"之后消息停止传递。
英文:
I'm using the AMQP-CPP library (not the amqp C library) to connect to RabbitMQ. Consuming a queue works, as does publishing. I don't use exchanges or anything, just one consumer on one queue. When attempting to cancel a queue, I'm still receiving messages after the DeferredCancel::onSuccess callback is executed. Also, the callback(std::string consumer) is empty, should this be the consumerTag yet again?
Here's what I'm observing:
// publish many messages to "queueName"
m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([](AMQP::Message m){std::cout << m.body()<< std::endl;});
m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){std::cout << "should have stopped consuming for: " << consumer << std::endl});
output:
message 1
should have stopped consuming for: (here is just an empty string)
message 2
message 3
... until all messages have been delivered
I would have expected the messages to stop after the output "should have stopped consuming" is printed.
答案1
得分: 0
原来,在发送cancel()请求时,consume请求尚未被处理。RabbitMQ/AMQP-CPP库会响应"success",即使没有取消任何消费者,因为在RabbitMQ端还没有消费者存在。然后,consume()请求被处理,这就是我看到上述行为的原因。
我通过将所有内容包装在回调函数中来修复了这个问题。我维护了自己的DeferredQueue和DeferredConsumer列表,并存储了onSuccess回调是否已经执行(因为在AMQP-CPP中似乎没有"pending"的等价物)。
如果onSuccess回调尚未执行,我会覆盖onSuccess回调;如果已经执行,我可以正常取消。
// publish many messages to "queueName"
bool onSuccessExecuted = false;
auto& deferredConsumer = m_channel->consume("queueName", "hardcodedConsumerTag");
deferredConsumer.onReceived([](AMQP::Message m){
std::cout << m.body()<< std::endl;
});
deferredConsumer.onSuccess([&](){
onSuccessExecuted=true;
// do stuff you want to do when starting consuming a queue
});
if (onSuccessExecuted == false){
// this overwrites the previous onSuccess callback
deferredConsumer.onSuccess([this](){
cancel();
// must still be set if we might want to cancel again later
onSuccessExecuted=true;
});
} else {
// if onSuccess has already been executed we just cancel normally,
// as the onSuccess callback won't be executed again
cancel();
}
void cancel() {
m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){
std::cout << "should have stopped consuming for: " << consumer << std::endl;
});
}
英文:
Turns out the consume request has not yet been processed when the cancel() request is sent. RabbitMQ/the AMQP-CPP library responds "success" even though no consumer was canceled, since no consumer existed on the RabbitMQ side yet. THEN the consume() is processed, which is why I was seeing the aforementioned behavior.
I fixed it by wrapping everything in callbacks. I'm maintaining my own list of DeferredQueue and DeferredConsumer, and store if the onSuccess callback has already been executed (since there doesn't seem to be a "pending" equivalent in AMQP-CPP).
If the onSuccess callback has NOT yet been executed, I override the onSuccess callback, if it has already been executed I can just cancel normally.
// publish many messages to "queueName"
bool onSuccessExecuted = false;
auto& deferredConsumer = m_channel->consume("queueName", "hardcodedConsumerTag");
deferredConsumer.onReceived([](AMQP::Message m){
std::cout << m.body()<< std::endl;
});
deferredConsumer.onSuccess([&](){
onSuccessExecuted=true;
// do stuff you want to do when starting consuming a queue
});
if (onSuccessExecuted == false){
// this overwrites the previous onSuccess callback
deferredConsumer.onSuccess([this](){
cancel();
// must still be set if we might want to cancel again later
onSuccessExecuted=true;
}
} else {
// if onSuccess has already been executed we just cancel normally,
// as the onSuccess callback won't be executed again
cancel();
}
void cancel() {
m_channel->cancel("hardcodedConsumerTag").onSuccess([](std::string consumer){
std::cout << "should have stopped consuming for: " << consumer << std::endl
});
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论