Understanding async function calls in Rust with RabbitMQ

Question

I'm new to Rust and am struggling a bit with async functions. The goal of my program is to send messages to a RabbitMQ queue.

I have defined two functions, one to get the channel and another one to actually send the message:

// Get the channel
async fn get_amqp_channel() -> Channel {
    let connection_arguments = OpenConnectionArguments::new(RABBIT_SERVER_URL, PORT, USER, PASSWORD);
    let connection = Connection::open(&connection_arguments).await.unwrap();
    return connection.open_channel(None).await.unwrap();
}

// Send the message
async fn send_amqp_message(channel: &Channel, routing_key: &str, message: String) {
    let publish_arguments = BasicPublishArguments::new(EXCHANGE, routing_key);
    channel.basic_publish(BasicProperties::default(), message.into_bytes(), publish_arguments).await.unwrap();
}

If I call them from an async function like this, the message is never sent:

fn send_command() {
    // Build the message
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(send_message(message_type, serde_json::to_string(&message).unwrap()));
}

async fn send_message(message_type: String, message: String) {
    let channel = get_amqp_channel().await;
    send_amqp_message(&channel, get_routing_key(message_type).as_str(), message).await;
}

But, if I combine both functions into a single one, then everything works fine:

async fn send_message(message_type: String, message: String) {
    // Body of get_amqp_channel
    let connection_arguments = OpenConnectionArguments::new(RABBIT_SERVER_URL, PORT, USER, PASSWORD);
    let connection = Connection::open(&connection_arguments).await.unwrap();
    let channel = connection.open_channel(None).await.unwrap();
    
    // Body of send_amqp_message
    let publish_arguments = BasicPublishArguments::new(EXCHANGE, get_routing_key(message_type).as_str());
    channel.basic_publish(BasicProperties::default(), message.into_bytes(), publish_arguments).await.unwrap();
}

As far as I understand, there shouldn't be any difference, since the block_on call makes the program wait for the future to complete. Where is my mistake?

Any help would be welcome.

Thank you very much.

Answer 1

Score: 0

Finally, I found the problem: get_amqp_channel returns a Channel struct, which refers internally to the Connection it was opened on. That Connection is a local variable of get_amqp_channel, so once control returns to the caller the connection is no longer active. By the time the message is sent, there is no connection linked to the channel (channel.is_connection_open() == false).

It seems that the connection is closed automatically as soon as the connection variable goes out of scope. In the combined version, connection stays in scope until after basic_publish completes, which is why that version works.
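To make the scoping issue concrete, here is a minimal std-only sketch. The Connection and Channel types below are hypothetical stand-ins written for this illustration, not the real RabbitMQ client API; they only assume, as observed above, that the connection closes itself when dropped and that a channel does not keep its connection alive. One possible fix, under that assumption, is to return the connection together with the channel so the caller owns both.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a client connection: marks itself closed when dropped.
struct Connection {
    open: Arc<AtomicBool>,
}

/// Hypothetical stand-in for a channel: it only shares the "open" flag,
/// so holding a Channel does not keep the Connection alive.
struct Channel {
    connection_open: Arc<AtomicBool>,
}

impl Connection {
    fn open() -> Self {
        Connection { open: Arc::new(AtomicBool::new(true)) }
    }

    fn open_channel(&self) -> Channel {
        Channel { connection_open: Arc::clone(&self.open) }
    }
}

impl Drop for Connection {
    fn drop(&mut self) {
        // Runs when the owning variable goes out of scope.
        self.open.store(false, Ordering::SeqCst);
    }
}

impl Channel {
    fn is_connection_open(&self) -> bool {
        self.connection_open.load(Ordering::SeqCst)
    }
}

/// Mirrors the failing version: `connection` is dropped when the function returns.
fn get_channel_only() -> Channel {
    let connection = Connection::open();
    connection.open_channel()
}

/// One possible fix: hand the connection back to the caller along with the channel.
fn get_connection_and_channel() -> (Connection, Channel) {
    let connection = Connection::open();
    let channel = connection.open_channel();
    (connection, channel)
}

fn main() {
    let orphan = get_channel_only();
    println!("channel only: open = {}", orphan.is_connection_open()); // prints "channel only: open = false"

    // Binding to `_connection` (not `_`) keeps the connection alive until the end of main.
    let (_connection, channel) = get_connection_and_channel();
    println!("connection kept: open = {}", channel.is_connection_open()); // prints "connection kept: open = true"
}
```

Applied to the question's code, this would mean having get_amqp_channel return something like (Connection, Channel) and keeping the connection bound to a variable in send_message until the publish has finished. Whether the real client needs an explicit close afterwards is not covered here.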

huangapple
  • Posted on May 29, 2023 at 19:00:49
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76356764.html