TCPListener (Server) not getting accept request from Client for all clients before the server instance in ip list, when running in threads

Question

I have 4 EC2 instances and want to build a distributed network in which every instance sends data to every instance (including itself).

I first read the IP addresses from a file into a variable, ip_address_clone.

Say the list is like this:

A.A.A.A
B.B.B.B
C.C.C.C
D.D.D.D

Then I try to run a server and a client for each of them in threads, so that each instance has a sender and a receiver worker active for every instance (again, including itself).

thread::scope(|s| {
    s.spawn(|| {
        for _ip in ip_address_clone.clone() {
            let _result = newserver::handle_server(INITIAL_PORT + port_count);
        }
    });

    s.spawn(|| {
        let three_millis = time::Duration::from_millis(3);
        thread::sleep(three_millis);

        for ip in ip_address_clone.clone() {
            let self_ip_clone = self_ip.clone();

            let _result = newclient::match_tcp_client(
                [ip.to_string(), (INITIAL_PORT + port_count).to_string()].join(":"),
                self_ip_clone,
            );
        }
    });
});

The server code is:

use std::error::Error;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::tcp::ReadHalf;
use tokio::net::TcpListener;

#[tokio::main]
pub async fn handle_server(port: u32) -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind(["0.0.0.0".to_string(), port.to_string()].join(":"))
        .await
        .unwrap(); // open connection

    let (mut socket, _) = listener.accept().await.unwrap(); // starts listening
    println!("---continue---");

    let (reader, mut writer) = socket.split(); // tokio socket split to read and write concurrently

    let mut reader: BufReader<ReadHalf> = BufReader::new(reader);
    let mut line: String = String::new();

    loop {
        //loop to get all the data from client until EOF is reached

        let _bytes_read: usize = reader.read_line(&mut line).await.unwrap();

        if line.contains("EOF")
        //REACTOR to be used here
        {
            println!("EOF Reached");

            writer.write_all(line.as_bytes()).await.unwrap();
            println!("{}", line);

            line.clear();

            break;
        }
    }

    Ok(())
}

And the client code is:

use std::error::Error;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

#[tokio::main]
pub async fn match_tcp_client(address: String, self_ip: String) -> Result<(), Box<dyn Error>> {
    // Connect to a peer
    let mut stream = TcpStream::connect(address.clone()).await?;
    // Write some data.
    stream.write_all(self_ip.as_bytes()).await?;
    stream.write_all(b"hello world!EOF").await?;
    // stream.shutdown().await?;
    Ok(())
}

The problem is that I am not getting the communication I expect. In fact, the first instance I run (over ssh) receives all the data, the second one receives all data except from the first one, the third one receives all data except from the first and second ones, and so on.

Here's a log of the first instance:

Starting
execution type
nok
launched
---continue---
EOF Reached
A.A.A.Ahello world!EOF
---continue---
EOF Reached
B.B.B.Bhello world!EOF
---continue---
EOF Reached
C.C.C.Chello world!EOF
---continue---
EOF Reached
D.D.D.Dhello world!EOF

And the log of the second instance:

Starting
execution type
nok
launched
---continue---
EOF Reached
B.B.B.Bhello world!EOF
---continue---
EOF Reached
C.C.C.Chello world!EOF
---continue---
EOF Reached
D.D.D.Dhello world!EOF

It seems that although I am using threads, the communication remains synchronous, and a particular instance only gets data from itself and from the IPs listed after it in ip_address_clone.
You can see this from the number of times ---continue--- occurs in the second instance's log: its listener doesn't seem to accept the request from the first instance.

Answer 1

Score: 1

I think the evidence that "a node is only getting data to itself" strongly indicates that it is sending data only to its own port and not to the other ports (which have exactly the same number). I believe that unique ports should solve your problem.
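
The unique-port idea could be sketched roughly as follows. This is only an illustration: `peer_addresses` and `base_port` are hypothetical names that do not appear in the question, and the IPs are the placeholders from the question's list. It assigns the i-th entry the port `base_port + i` and builds the "ip:port" strings that the client would connect to.

```rust
/// Build "ip:port" strings, giving the i-th peer the port `base_port + i`.
/// `peer_addresses` and `base_port` are hypothetical names for this sketch.
fn peer_addresses(ips: &[&str], base_port: u16) -> Vec<String> {
    ips.iter()
        .enumerate()
        .map(|(i, ip)| format!("{}:{}", ip, base_port + i as u16))
        .collect()
}

fn main() {
    // Placeholder addresses taken from the question's example list.
    let ips = ["A.A.A.A", "B.B.B.B", "C.C.C.C", "D.D.D.D"];
    for addr in peer_addresses(&ips, 3000) {
        println!("{}", addr);
    }
}
```

Each server would then bind the port that corresponds to its own position in the list, so the list order has to be identical on every instance.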

  • You are using the same port number for all the instances. This can cause conflicts when multiple instances try to bind to the same port on the same host. Instead, use a unique port number for each instance, for example by adding an offset to a base port number (3000, 3001, ...). Having each instance bind to a unique port also makes development testing easier.
  • You are creating a new thread for each instance, but each thread handles only one connection. This can be inefficient and can limit the number of connections your program can handle. Instead, you can use Tokio's spawn function to spawn a task for each connection, which lets you handle multiple connections concurrently.
  • Also, the loop does not wait for the threads to finish before moving on to the next IP address. This can cause synchronization issues and lead to unexpected behavior.
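
The second and third points can be illustrated with a small sketch. To keep it self-contained it uses blocking `std::net` and `std::thread` instead of Tokio (in the question's code, `tokio::spawn` would play the role that `thread::spawn` plays here). One listener accepts connections in a loop, hands each connection to its own worker, and joins all workers before returning. The names `serve` and `demo`, the loopback address, and the "peer-N" messages are assumptions made for the example.

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
use std::thread;

/// Accept up to `expected` connections, handling each on its own thread.
/// Returns the first line received from every client, sorted.
fn serve(listener: TcpListener, expected: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let mut handlers = Vec::new();
    for stream in listener.incoming().take(expected) {
        let stream = match stream {
            Ok(s) => s,
            Err(e) => {
                eprintln!("accept failed: {}", e);
                continue;
            }
        };
        let tx = tx.clone();
        // One worker per connection, so a slow peer does not block the others.
        handlers.push(thread::spawn(move || {
            let mut line = String::new();
            if BufReader::new(stream).read_line(&mut line).is_ok() {
                let _ = tx.send(line.trim_end().to_string());
            }
        }));
    }
    drop(tx); // close our sender so `rx.iter()` ends once the workers finish
    for handler in handlers {
        let _ = handler.join(); // wait for every worker before returning
    }
    let mut lines: Vec<String> = rx.iter().collect();
    lines.sort();
    lines
}

/// Run one server and `clients` senders on the loopback interface.
fn demo(clients: usize) -> Vec<String> {
    // Port 0 asks the OS for a free port; the listener exists before any client connects.
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind failed");
    let addr = listener.local_addr().expect("no local address");
    let server = thread::spawn(move || serve(listener, clients));
    let senders: Vec<_> = (0..clients)
        .map(|i| {
            thread::spawn(move || {
                let mut stream = TcpStream::connect(addr).expect("connect failed");
                writeln!(stream, "peer-{} hello world!", i).expect("write failed");
            })
        })
        .collect();
    for sender in senders {
        sender.join().expect("client thread panicked");
    }
    server.join().expect("server thread panicked")
}

fn main() {
    for line in demo(4) {
        println!("{}", line);
    }
}
```

The listener is bound once and reused for every peer, instead of calling a bind-and-accept function once per IP as the question's loop does.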

On a personal note, testing asynchronous communication between distributed nodes is hard, especially when multiple threads are involved and they don't work.
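
One thing that may make this kind of failure easier to observe: the question's code stores each result in `_result` and never inspects it, so a connect attempt that fails (for example because the peer's listener is not up yet) goes unnoticed. The sketch below logs every failed attempt and retries a few times. It uses blocking `std::net` rather than Tokio to stay self-contained, and `connect_with_retry`, the attempt count, and the delays are hypothetical choices, not part of the original code.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

/// Try to connect up to `attempts` times, sleeping `delay` after each failure.
/// Every failure is logged instead of being silently discarded.
fn connect_with_retry(addr: SocketAddr, attempts: u32, delay: Duration) -> io::Result<TcpStream> {
    let mut last_err = io::Error::new(io::ErrorKind::Other, "no connection attempts were made");
    for attempt in 1..=attempts {
        match TcpStream::connect_timeout(&addr, Duration::from_secs(1)) {
            Ok(stream) => return Ok(stream),
            Err(e) => {
                eprintln!("attempt {}/{} to {} failed: {}", attempt, attempts, addr, e);
                last_err = e;
                thread::sleep(delay);
            }
        }
    }
    Err(last_err)
}

fn main() {
    // A local listener stands in for a remote peer in this demo.
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind failed");
    let addr = listener.local_addr().expect("no local address");
    match connect_with_retry(addr, 5, Duration::from_millis(100)) {
        Ok(_) => println!("connected to {}", addr),
        Err(e) => println!("giving up on {}: {}", addr, e),
    }
}
```

With logging like this on every instance, it becomes visible which peers were unreachable at the moment a client tried to reach them.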

huangapple
  • Posted on May 6, 2023 at 18:24:34
  • When reposting, please be sure to keep the link to this article: https://go.coder-hub.com/76188373.html