TCPStream随机等待并重试,如果服务器未监听。

huangapple go评论71阅读模式
英文:

TCPStream random wait and retry if server is not listening

问题

你的客户端代码包含了对TCP连接的尝试以及在连接不成功时进行重试的逻辑。你使用了一个循环来检查连接状态,然后在成功连接后执行数据写入操作。然而,有一些问题可能导致你遇到困难:

  1. 在每次循环中,你都尝试创建一个新的TCP连接,这可能导致资源泄漏或连接的重新创建,而不是维护一个已建立的连接。

  2. 你在循环内进行了写入操作,但在没有错误处理的情况下退出循环。这可能导致写入失败时程序继续运行。

  3. 在写入之后,你没有关闭连接,这可能导致连接资源泄漏。

你可以尝试以下修改:

use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt;
use std::error::Error;
use socket2;
use std::{time};
use tokio::time::{sleep, Duration};

#[tokio::main]
pub async fn match_tcp_client(address: String, self_ip: String) -> Result<(), Box<dyn Error>> {
    // Connect to a peer
    println!("trying to connect from {} to address {}", self_ip, address);

    loop {
        match TcpStream::connect(address.clone()).await {
            Ok(mut stream) => {
                let sock_ref = socket2::SockRef::from(&stream);

                let mut ka = socket2::TcpKeepalive::new();
                ka = ka.with_time(time::Duration::from_secs(20));
                ka = ka.with_interval(time::Duration::from_secs(20));

                sock_ref.set_tcp_keepalive(&ka)?;

                // Write some data.
                if let Err(e) = stream.write_all(self_ip.as_bytes()).await {
                    println!("Error writing data: {:?}", e);
                }
                if let Err(e) = stream.write_all(b"hello world!EOF").await {
                    println!("Error writing data: {:?}", e);
                }
                // Close the connection when done.
                drop(stream);
                println!("Connection closed.");
                break;
            }
            Err(_) => {
                // Sleep for a random duration before retrying.
                let sleep_duration = Duration::from_millis(rand::random::<u64>() % 10000);
                sleep(sleep_duration).await;
            }
        }
    }

    Ok(())
}

这个修改的代码会保持连接的重用,仅在连接失败时才进行重试,并且在写入后关闭连接。请确保你的依赖库中包含了 "rand" crate 以生成随机数。

英文:

My client code is like this:

use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt;
use std::error::Error;
use socket2;
use std::{ time};

    #[tokio::main]
    pub async fn match_tcp_client(address: String, self_ip: String) -&gt; Result&lt;(), Box&lt;dyn Error&gt;&gt; {
        // Connect to a peer
        println!(&quot;trying to connect from {} to address {}&quot;, self_ip, address);
    
        let mut stream = TcpStream::connect(address.clone()).await?;
    
    
        println!(&quot;connected from {} to address {}&quot;, self_ip, address);
    
    
        let sock_ref = socket2::SockRef::from(&amp;stream);
    
        let mut ka = socket2::TcpKeepalive::new();
        ka = ka.with_time(time::Duration::from_secs(20));
        ka = ka.with_interval(time::Duration::from_secs(20));
    
        sock_ref.set_tcp_keepalive(&amp;ka)?;
    
    
        // Write some data.
        stream.write_all(self_ip.as_bytes()).await?;
        stream.write_all(b&quot;hello world!EOF&quot;).await?;
        
        
        Ok(())
    }

I want client to wait a random time and then retry if server if not active (and listening ) yet.
Please note, the TcpKeepalive I tried didnt work.

Is there something like a loop where I can check if server is alive, if not wait a random time and try again?

Something I tried:

  use tokio::net::TcpStream;
    use tokio::io::AsyncWriteExt;
    use std::error::Error;
    use socket2;
    use std::{ time};
    use tokio::time::{ sleep, Duration};
    
    #[tokio::main]
    pub async fn match_tcp_client(address: String, self_ip: String) -&gt; Result&lt;(), Box&lt;dyn Error&gt;&gt; {
        // Connect to a peer
        println!(&quot;trying to connect from {} to address {}&quot;, self_ip, address);
    
        while TcpStream::connect(address.clone()).await.is_err()
        {
            sleep(Duration::from_millis(10)).await;
        }
    
        let mut stream = TcpStream::connect(address.clone()).await?;
    
    
        println!(&quot;connected from {} to address {}&quot;, self_ip, address);
    
    
        let sock_ref = socket2::SockRef::from(&amp;stream);
    
        let mut ka = socket2::TcpKeepalive::new();
        ka = ka.with_time(time::Duration::from_secs(20));
        ka = ka.with_interval(time::Duration::from_secs(20));
    
        sock_ref.set_tcp_keepalive(&amp;ka)?;
    
    
        // Write some data.
        stream.write_all(self_ip.as_bytes()).await?;
        stream.write_all(b&quot;hello world!EOF&quot;).await?;
        
        
        Ok(())
    }

But it is not working as well. In this case, though for all EC2 instances (I am using 4 instance), I am getting the log:

connected from A to address A
trying to connect from A to address B
connected from A to address B
trying to connect from A to address C
connected from A to address C
trying to connect from A to address D
connected from A to address D

But then for some reason the stream writing is not starting and the instances remain paused.

Edit:
I also tried this (same result as before):

 use tokio::net::TcpStream;
    use tokio::io::AsyncWriteExt;
    use std::error::Error;
    use socket2;
    use std::{ time};
    use tokio::time::{ sleep, Duration};
    
    #[tokio::main]
    pub async fn match_tcp_client(address: String, self_ip: String) -&gt; Result&lt;(), Box&lt;dyn Error&gt;&gt; {
        // Connect to a peer
        println!(&quot;trying to connect from {} to address {}&quot;, self_ip, address);
    
        
            loop{
        if TcpStream::connect(address.clone()).await.is_ok(){
        let mut stream = TcpStream::connect(address.clone()).await?;
        stream.set_linger(None)?;
    
        println!(&quot;connected from {} to address {}&quot;, self_ip, address);
    
    
        let sock_ref = socket2::SockRef::from(&amp;stream);
    
        let mut ka = socket2::TcpKeepalive::new();
        ka = ka.with_time(time::Duration::from_secs(20));
        ka = ka.with_interval(time::Duration::from_secs(20));
    
        sock_ref.set_tcp_keepalive(&amp;ka)?;
    
    
        // Write some data.
        stream.write_all(self_ip.as_bytes()).await?;
        stream.write_all(b&quot;hello world!EOF&quot;).await?;
        
        stream.shutdown().await?;
        break;
        }
    
    }
        Ok(())
    }

答案1

得分: 1

你试图连接到地址,然后在建立连接后断开连接并尝试重新连接。这不是正确的做法。相反,启动一个循环,尝试连接,如果成功,则从循环中返回包含Some(stream)Option,否则在重试之前等待,如果超过最大尝试次数,则从循环中返回None。尽管我认为在这里使用Result更好。

另一件事是,在等待write后,你在函数末尾丢弃了流,认为数据已经被发送!当write方法完成时,这意味着操作系统已经将提供的缓冲区复制到内核空间,不再需要用户缓冲区。然后,内核开始以块的形式写入数据,并在失败的情况下重试,直到成功或连接被认为已断开。

现在,当你丢弃流时,操作系统会继续在后台发送数据吗?这取决于滞留以及是否启用滞留。不同的操作系统和库有不同的默认设置。从C++ Asio网络库中来,滞留默认是禁用的,如果流被丢弃,缓存的数据将不会被发送。

要强制内核发送数据,请使用setsocketopt以启用滞留(如果Tokio允许)。

或者更好地使用shutdown,它将始终强制内核刷新套接字上累积的所有数据,从而确保如果连接仍然活动,则所有数据都已传输。

编辑:Tokio允许配置滞留:https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_linger

此外,这里有shutdown:https://docs.rs/tokio/latest/tokio/io/trait.AsyncWriteExt.html#method.shutdown

英文:

You are trying to connect to the address then dropping the connection if it was established and try to connect again. That is is not how to do it. Instead start a loop and try to connect and if successful return an Option from the loop containing Some(stream) otherwise wait before retrying and if you exceeded the maximum try times return a None from the loop. Although I think Result will be more favourable here.

Another thing is that you are dropping the stream in the end of the function after awaiting write thinking that the data was already sent! When write method completes this means that the OS has copied the supplied buffers into the kernel space and it no longer needs the user buffers. The kernel then starts writing the data in chunks and failed writes are retried until success pr connection is considered broken.

Now when you drop the stream will the OS continue sending the data in the background? This depends on lingering and whether it is enabled or not. Different OSes and libraries have different default settings. Coming from c++ asio networking library lingering is disabled by default and if the stream is dropped cached data will not be sent.

To force the kernel to send your data either use setsocketopt to enable lingering if tokio allows you to do this.

Or better use shutdown which will always force the kernel to flush all accumulated data on the socket and thus ensure that all data are transferred if the connection is still active.

Edit: tokio allows you to configure linger: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_linger

Also there is shutdown here: https://docs.rs/tokio/latest/tokio/io/trait.AsyncWriteExt.html#method.shutdown

huangapple
  • 本文由 发表于 2023年5月6日 20:55:43
  • 转载请务必保留本文链接:https://go.coder-hub.com/76189015.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定