Understanding async function calls in Rust with RabbitMQ
Question
I'm new to Rust and I'm struggling a bit with async functions. The goal of my program is to send messages to a RabbitMQ queue.
I have defined two functions, one to get the channel and another to actually send the message:
// Get the channel
async fn get_amqp_channel() -> Channel {
    let connection_arguments = OpenConnectionArguments::new(RABBIT_SERVER_URL, PORT, USER, PASSWORD);
    let connection = Connection::open(&connection_arguments).await.unwrap();
    return connection.open_channel(None).await.unwrap();
}
// Send the message
async fn send_amqp_message(channel: &Channel, routing_key: &str, message: String) {
    let publish_arguments = BasicPublishArguments::new(EXCHANGE, routing_key);
    channel.basic_publish(BasicProperties::default(), message.into_bytes(), publish_arguments).await.unwrap();
}
If I call them from an async function like this, the message is never sent:
fn send_command() {
    // Build the message
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(send_message(message_type, serde_json::to_string(&message).unwrap()));
}
async fn send_message(message_type: String, message: String) {
    let channel = get_amqp_channel().await;
    send_amqp_message(&channel, get_routing_key(message_type).as_str(), message).await;
}
But, if I combine both functions into a single one, then everything works fine:
async fn send_message(message_type: String, message: String) {
    // Get the channel
    let connection_arguments = OpenConnectionArguments::new(RABBIT_SERVER_URL, PORT, USER, PASSWORD);
    let connection = Connection::open(&connection_arguments).await.unwrap();
    let channel = connection.open_channel(None).await.unwrap();
    // Send the message
    let publish_arguments = BasicPublishArguments::new(EXCHANGE, get_routing_key(message_type).as_str());
    channel.basic_publish(BasicProperties::default(), message.into_bytes(), publish_arguments).await.unwrap();
}
As far as I understand, there shouldn't be any difference, since the block_on call forces the whole program to wait for the future to complete. Where is my mistake?
Any help would be welcome.
Thank you very much.
Answer 1
Score: 0
I finally found the problem: the get_amqp_channel function returns a Channel struct, which has a Connection element inside. When control returns to the calling function, the connection is no longer active, so by the time the message is sent there is no open connection linked to the channel (channel.is_connection_open() == false).
It seems to me that the connection is closed automatically as soon as the connection variable goes out of scope. That would also explain why the combined version works: there, the connection variable stays in scope until after basic_publish has completed.
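The behaviour described above can be reproduced with a small standard-library-only model. This is only a sketch: the Connection and Channel types below are hypothetical stand-ins written for illustration, not the real RabbitMQ client types, and the "close on drop" behaviour is modelled on the assumption that the answer's observation is correct. The function names channel_only and connection_and_channel are likewise made up for this example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for a client connection; NOT the real library type.
struct Connection {
    open: Arc<AtomicBool>,
}

// Hypothetical stand-in for a channel that shares the connection's state.
struct Channel {
    connection_open: Arc<AtomicBool>,
}

impl Connection {
    fn open() -> Connection {
        Connection { open: Arc::new(AtomicBool::new(true)) }
    }

    fn open_channel(&self) -> Channel {
        Channel { connection_open: Arc::clone(&self.open) }
    }
}

impl Drop for Connection {
    // Models "closing on drop": runs when the connection value goes out of scope.
    fn drop(&mut self) {
        self.open.store(false, Ordering::SeqCst);
    }
}

impl Channel {
    fn is_connection_open(&self) -> bool {
        self.connection_open.load(Ordering::SeqCst)
    }
}

// Mirrors get_amqp_channel: `connection` is dropped when the function returns,
// so the returned channel points at a connection that is already closed.
fn channel_only() -> Channel {
    let connection = Connection::open();
    connection.open_channel()
}

// Returns the connection as well, so the caller decides how long it lives.
fn connection_and_channel() -> (Connection, Channel) {
    let connection = Connection::open();
    let channel = connection.open_channel();
    (connection, channel)
}
```

In this model, the channel returned by channel_only() reports a closed connection immediately, while the channel from connection_and_channel() stays usable for as long as the caller holds on to the returned connection. Applied to the question's code, one possible fix would be to have get_amqp_channel return both the connection and the channel, or to open the connection in send_message and pass it down.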