Why does my C++ Asio TCP server disconnect previous clients when a new one tries to connect?

huangapple go评论66阅读模式
英文:

Why does my C++ Asio TCP server disconnect previous clients when a new one tries to connect?

问题

I tried setting up a simple client-server tcp program using Boost::Asio features wrapped up in my "physicalserver" and "physicalclient" classes. On connection, the server tries in a loop to send data to a client; the client waits in a loop for data from the server.
I tried creating simple and minimal examples showing the problem:

client.cpp:

#include "..\..\CppLibraries\Networks\client\physicalclient.hpp"

#include <iostream>
#include <conio.h>

using namespace dnw;

// Client-side connection handler (registered through setOnConnection).
// Performs blocking 256-byte reads from the server forever and discards the
// received data. The first read error is reported, waits for a key press and
// terminates the whole process with exit code 1.
//   socket - connected socket to read from
//   client - owning client object (not used here)
void connectionCallback(Client::Socket& socket, Client& client)
{
    std::cout << "Connected!\n";

    std::array<std::byte, 256> data;

    // The handler never returns normally: either the loop keeps reading or
    // the catch block exits the process.
    try
    {
        for(;;)
        {
            boost::asio::read(socket, boost::asio::buffer(data));
        }
    }
    catch(boost::system::system_error& e)
    {
        std::cout << "Failed to receive! Server closed?\n";
        getch();
        exit(1);
    }
}

// Client entry point: connects to localhost:10666 and lets connectionCallback
// process the connection on the client's worker thread.
// Returns 1 when the connection cannot be established.
int main()
{
    PhysicalClient client;

    client.setOnConnection(connectionCallback);

    try
    {
        client.connect("localhost", "10666");
    }
    catch(const std::exception& e)
    {
        std::cerr << "Server didn't respond...\n";
        getch();
        return 1; // nothing to wait for without a connection
    }

    // Block on console input instead of spinning in `while(true);`: an empty
    // infinite loop burns a CPU core and may legally be removed by the
    // optimizer. The client stays alive until Enter is pressed.
    std::cin.get();
}

server.cpp:

#include "..\..\CppLibraries\Networks\server\physicalserver.hpp"

using namespace dnw;

// Server-side connection handler (registered through setOnConnection).
// Keeps sending 256-byte blocks to the client until a write fails, then logs
// the failure and returns.
//   socket - socket of the accepted client
//   server - owning server object (not used here)
void connectionCallback(Server::Socket& socket, Server& server)
{
    std::cout << "Client connected!\n";

    // Value-initialized so that zeros, not indeterminate stack contents, are
    // put on the wire.
    std::array<std::byte, 256> data {};

    try
    {
        for(;;)
        {
            boost::asio::write(socket, boost::asio::buffer(data));
        }
    }
    catch(const boost::system::system_error& e)
    {
        std::cerr << "Failed to write! Client disconnected?\n";
    }
}

// Server entry point: registers connectionCallback for port 10666 and starts
// the listener thread for that port.
int main()
{
    PhysicalServer server;

    server.setOnConnection(10666, connectionCallback);
    server.startListening(10666);

    // Block on console input instead of spinning in `while(true);`: an empty
    // infinite loop burns a CPU core and may legally be removed by the
    // optimizer. The server runs until Enter is pressed.
    // NOTE(review): the listener and client threads are detached, so they are
    // not joined before `server` is destroyed on return.
    std::cin.get();
}

physicalclient.cpp:

#include "physicalclient.hpp"

#include <thread>

// Builds an unconnected client: both the socket and the resolver are bound to
// this object's `context`.
// NOTE(review): relies on `context` being declared before `socket` and
// `resolver` in the header - confirm.
dnw::PhysicalClient::PhysicalClient()
    : socket(context), resolver(context)
{
}

// Resolves addr:port, connects the socket (blocking) and then runs the
// registered processor on a detached thread, handing it references to the
// socket and to this client.
// Does nothing when the socket is already open.
// Errors from resolve/connect propagate to the caller as exceptions.
void dnw::PhysicalClient::connect(const std::string_view addr, const std::string_view port)
{
    if(!socket.is_open())
    {
        const auto endpoints = resolver.resolve(addr, port);
        boost::asio::connect(socket, endpoints);

        // The worker is never joined; it keeps using `socket` and `*this`.
        std::thread {processor, std::ref(socket), std::ref(*this)}.detach();
    }
}

void dnw::PhysicalClient::setOnConnection(const Processor& proc)
{
    processor = proc;
}

void dnw::PhysicalClient::disconnect()
{
    socket.close();
}

// Closes the connection when the client goes away.
dnw::PhysicalClient::~PhysicalClient()
{
    // disconnect() uses the throwing close(); an exception leaving a
    // destructor calls std::terminate, so a close failure is deliberately
    // ignored here - there is nothing left to do with the socket anyway.
    try
    {
        disconnect();
    }
    catch(...)
    {
    }
}

physicalserver.cpp:

#include "physicalserver.hpp"

#include <algorithm>
#include <thread>

#include <iostream>

// Starts listening on every port that currently has an entry in `ports`.
void dnw::PhysicalServer::startListening()
{
    for(auto it = ports.begin(); it != ports.end(); ++it)
    {
        startListening(it->first);
    }
}

// Clears the listening flag of every port that has an entry in `ports`.
void dnw::PhysicalServer::stopListening()
{
    for(auto it = ports.begin(); it != ports.end(); ++it)
    {
        stopListening(it->first);
    }
}

// Marks `port` as listening and runs listen(port) on a detached thread.
// Creates the entry for `port` in `ports` if it does not exist yet.
void dnw::PhysicalServer::startListening(const Port port)
{
    std::get<bool>(ports[port]) = true;

    // A non-static member function has to be passed as a pointer to member;
    // the bare name `listen` is only accepted as a compiler extension.
    std::thread thread {&dnw::PhysicalServer::listen, this, port};

    thread.detach();
}

// Clears the listening flag of `port`. listen() tests the flag only at the
// top of its accept loop, i.e. after the current blocking accept() returns.
void dnw::PhysicalServer::stopListening(const Port port)
{
    auto& entry = ports[port];
    std::get<bool>(entry) = false;
}

void dnw::PhysicalServer::listen(const Port port)
{
    using namespace boost::asio::ip;

    static tcp::acceptor acceptor {
        context,
        {tcp::v4(), port}
    };

    while(std::get<bool>(ports[port]))
    {
        Socket socket {context};

        acceptor.accept(socket);

        auto& sockets {std::get<std::vector<Socket>>(ports[port])}; 

        sockets.push_back(std::move(socket));

        std::thread thread {std::get<Processor>(ports[port]), std::ref(sockets.back()), std::ref(*this)};
        
        thread.detach();
    }
}

// Registers `proc` as the connection handler of `port`, creating the entry
// for `port` in `ports` if necessary.
void dnw::PhysicalServer::setOnConnection(const Port port, const Processor& proc)
{
    auto& entry = ports[port];
    std::get<Processor>(entry) = proc;
}

// Closes and forgets the client sockets of every port in `ports`.
void dnw::PhysicalServer::closeConnection()
{
    for(auto& entry : ports)
    {
        closeConnection(entry.first);
    }
}

// Closes every client socket stored for `port` and empties the port's socket
// list. Creates an (empty) entry for `port` if none exists.
void dnw::PhysicalServer::closeConnection(Port port)
{
    auto& clients = std::get<std::vector<Socket>>(ports[port]);

    std::for_each(clients.begin(), clients.end(), [](Socket& client) { client.close(); });

    clients.clear();
}

// Closes one client connection and removes its socket from the list of the
// port it was accepted on. `socket` must be a reference to a socket stored in
// this server (as handed to the connection handler). Does nothing when the
// socket is not found.
// May throw: getSocketPort() queries local_endpoint(), which fails for a
// socket that is no longer open.
void dnw::PhysicalServer::closeConnection(Socket& socket)
{
    auto& sockets {std::get<std::vector<Socket>>(ports[getSocketPort(socket)])};

    // Match by object identity. All sockets accepted on one port have the
    // same local endpoint, so comparing local_endpoint() always selected the
    // first stored socket and erased the wrong connection.
    const auto socket_iter {
        std::find_if(
            sockets.begin(),
            sockets.end(),
            [&socket](const Socket& candidate)
            {
                return &candidate == &socket;
            }
        )
    };

    if(socket_iter != sockets.end())
    {
        socket_iter->close();
        // `socket` refers to the erased element and is invalid from here on.
        sockets.erase(socket_iter);
    }
}

// Returns the number of stored client sockets summed over all ports.
size_t dnw::PhysicalServer::getClientCount() const
{
    size_t count = 0;

    for(auto it = ports.begin(); it != ports.end(); ++it)
    {
        count += getClientCount(it->first);
    }

    return count;
}

// Returns the number of client sockets stored for `port`.
// Uses ports.at(port), so a port without an entry is reported by an exception
// rather than by inserting a new entry.
size_t dnw::PhysicalServer::getClientCount(const Port port) const
{
    const auto& entry = ports.at(port);
    const auto& clients = std::get<std::vector<Socket>>(entry);

    return clients.size();
}

// Returns the local (server-side) port number `socket` is bound to.
// local_endpoint() is the throwing overload, so an error is reported as an
// exception.
dnw::Server::Port dnw::PhysicalServer::getSocketPort(const Socket& socket)
{
    const auto endpoint = socket.local_endpoint();

    return endpoint.port();
}

// Calls `action` for every stored client socket of every port.
void dnw::PhysicalServer::applyOnClient(const std::function<void(Socket&)>& action)
{
    for(auto& entry : ports)
    {
        applyOnClient(entry.first, action);
    }
}

// Calls `action` for every client socket stored for `port`, in storage order.
// Creates an (empty) entry for `port` if none exists.
void dnw::PhysicalServer::applyOnClient(const Port port, const std::function<void(Socket&)>& action)
{
    auto& clients = std::get<std::vector<Socket>>(ports[port]);

    std::for_each(clients.begin(), clients.end(), action);
}

I was expecting multiple client connections to be allowed; however, whenever I try to start a new client, the previous one gets disconnected (I get "Failed to write! Client disconnected?" from the server). I'm using a different socket for every client, so I don't know why the program is behaving this way.

英文:

I tried setting up a simple client-server tcp program using Boost::Asio features wrapped up in my "physicalserver" and "physicalclient" classes. On connection, the server tries in a loop to send data to a client; the client waits in a loop for data from the server.
I tried creating simple and minimal examples showing the problem:

client.cpp:

#include &quot;..\..\CppLibraries\Networks\client\physicalclient.hpp&quot;
#include &lt;iostream&gt;
#include &lt;conio.h&gt;
using namespace dnw;
void connectionCallback(Client::Socket&amp; socket, Client&amp; client)
{
std::cout &lt;&lt; &quot;Connected!\n&quot;;
std::array&lt;std::byte, 256&gt; data;
while(true)
{
try
{
boost::asio::read(socket, boost::asio::buffer(data));
} catch(boost::system::system_error&amp; e)
{
std::cout &lt;&lt; &quot;Failed to receive! Server closed?\n&quot;;
getch();
exit(1);
}
}
}
int main()
{
PhysicalClient client;
client.setOnConnection(connectionCallback);
try
{
client.connect(&quot;localhost&quot;, &quot;10666&quot;);
} catch(std::exception&amp; e)
{
std::cerr &lt;&lt; &quot;Server didn&#39;t respond...\n&quot;;
getch();
}
while(true);
}

server.cpp:

#include &quot;..\..\CppLibraries\Networks\server\physicalserver.hpp&quot;
using namespace dnw;
void connectionCallback(Server::Socket&amp; socket, Server&amp; server)
{
std::cout &lt;&lt; &quot;Client connected!\n&quot;;
std::array&lt;std::byte, 256&gt; data;
while(true)
{
try
{
boost::asio::write(socket, boost::asio::buffer(data));
} catch(boost::system::system_error&amp; e)
{
std::cerr &lt;&lt; &quot;Failed to write! Client disconnected?\n&quot;;
break;
}
}
}
int main()
{
PhysicalServer server;
server.setOnConnection(10666, connectionCallback);
server.startListening(10666);
while(true);
}

physicalclient.cpp:

#include &quot;physicalclient.hpp&quot;
#include &lt;thread&gt;
dnw::PhysicalClient::PhysicalClient()
:
socket(context),
resolver(context)
{
}
void dnw::PhysicalClient::connect(const std::string_view addr, const std::string_view port)
{
if(socket.is_open())
{
return;
}
boost::asio::connect(socket, resolver.resolve(addr, port));
std::thread thread {processor, std::ref(socket), std::ref(*this)};
thread.detach();
}
void dnw::PhysicalClient::setOnConnection(const Processor&amp; proc)
{
processor = proc;
}
void dnw::PhysicalClient::disconnect()
{
socket.close();
}
dnw::PhysicalClient::~PhysicalClient()
{
disconnect();
}

physicalserver.cpp:

#include &quot;physicalserver.hpp&quot;
#include &lt;algorithm&gt;
#include &lt;thread&gt;
#include &lt;iostream&gt;
void dnw::PhysicalServer::startListening()
{
for(auto&amp; pair : ports)
{
startListening(pair.first);
}
}
void dnw::PhysicalServer::stopListening()
{
for(auto&amp; pair : ports)
{
stopListening(pair.first);
}
}
void dnw::PhysicalServer::startListening(const Port port)
{
std::get&lt;bool&gt;(ports[port]) = true;
std::thread thread {listen, this, port};
thread.detach();
}
void dnw::PhysicalServer::stopListening(const Port port)
{
std::get&lt;bool&gt;(ports[port]) = false;
}
void dnw::PhysicalServer::listen(const Port port)
{
using namespace boost::asio::ip;
static tcp::acceptor acceptor {
context,
{tcp::v4(), port}
};
while(std::get&lt;bool&gt;(ports[port]))
{
Socket socket {context};
acceptor.accept(socket);
auto&amp; sockets {std::get&lt;std::vector&lt;Socket&gt;&gt;(ports[port])}; 
sockets.push_back(std::move(socket));
std::thread thread {std::get&lt;Processor&gt;(ports[port]), std::ref(sockets.back()), std::ref(*this)};
thread.detach();
}
}
void dnw::PhysicalServer::setOnConnection(const Port port, const Processor&amp; proc)
{
std::get&lt;Processor&gt;(ports[port]) = proc;
}
void dnw::PhysicalServer::closeConnection()
{
for(auto&amp; [key, tuple] : ports)
{
closeConnection(key);
}
}
void dnw::PhysicalServer::closeConnection(Port port)
{
auto&amp; sockets {std::get&lt;std::vector&lt;Socket&gt;&gt;(ports[port])};
for(auto&amp; socket : sockets)
{
socket.close();
}
sockets.clear();
}
void dnw::PhysicalServer::closeConnection(Socket&amp; socket)
{
using std::get;
auto&amp; sockets {get&lt;std::vector&lt;Socket&gt;&gt;(ports[getSocketPort(socket)])};
auto socket_iter {
std::find_if(
sockets.begin(),
sockets.end(),
[&amp;](auto&amp; s2)
{
return s2.local_endpoint() == socket.local_endpoint();
} 
)
};
if(socket_iter != sockets.end())
{
socket.close();
sockets.erase(socket_iter);
}
}
size_t dnw::PhysicalServer::getClientCount() const
{
size_t total {};
for(auto&amp; pair : ports)
{
total += getClientCount(pair.first);
}
return total;
}
size_t dnw::PhysicalServer::getClientCount(const Port port) const
{
return std::get&lt;std::vector&lt;Socket&gt;&gt;(ports.at(port)).size();
}
dnw::Server::Port dnw::PhysicalServer::getSocketPort(const Socket&amp; socket)
{
return socket.local_endpoint().port();
}
void dnw::PhysicalServer::applyOnClient(const std::function&lt;void(Socket&amp;)&gt;&amp; action)
{
for(auto&amp; [key, port] : ports)
{
applyOnClient(key, action);
}
}
void dnw::PhysicalServer::applyOnClient(const Port port, const std::function&lt;void(Socket&amp;)&gt;&amp; action)
{
std::for_each(
std::get&lt;std::vector&lt;Socket&gt;&gt;(ports[port]).begin(),
std::get&lt;std::vector&lt;Socket&gt;&gt;(ports[port]).end(),
action
);
}

I was expecting multiple client connections to be allowed; however, whenever I try to start a new client, the previous one gets disconnected (I get "Failed to write! Client disconnected?" from the server). I'm using a different socket for every client, so I don't know why the program is behaving this way.

答案1

得分: 1

这是您提供的代码的翻译:

使它简化1000倍。您很可能会在调试器中看到您的问题。

例如,从以下开始:

**[Live On Coliru](http://coliru.stacked-crooked.com/a/e4aa50e5460cf289)**

```cpp
#include <boost/asio.hpp>
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <string_view>
#include <thread>
#include <tuple>
#include <vector>

using boost::asio::ip::tcp;
using Server = struct PhysicalServer;
using Client = struct PhysicalClient;
using Socket = tcp::socket;

struct PhysicalServer {
    using Port      = uint16_t;
    using Processor = std::function<void(Socket& s, Server&)>;
    using Conn      = std::tuple<bool, std::vector<Socket>, Processor>;

    std::map<Port, Conn>    ports;
    boost::asio::io_context context;

    void startListening() {
        for (auto& pair : ports) {
            startListening(pair.first);
        }
    }

    void stopListening() {
        for (auto& pair : ports) {
            stopListening(pair.first);
        }
    }

    void startListening(const Port port) {
        std::get<bool>(ports[port]) = true;
        std::thread{&PhysicalServer::listen, this, port}.detach();
    }

    void stopListening(const Port port) { std::get<bool>(ports[port]) = false; }

    void listen(const Port port) {
        using namespace boost::asio::ip;

        static tcp::acceptor acceptor{context, {tcp::v4(), port}};

        while (std::get<bool>(ports[port])) {
            Socket socket{context};

            acceptor.accept(socket);

            auto& sockets{std::get<std::vector<Socket>>(ports[port])};

            sockets.push_back(std::move(socket));

            std::thread{std::get<Processor>(ports[port]), std::ref(sockets.back()), std::ref(*this)}.detach();
        }
    }

    void setOnConnection(const Port port, Processor const& proc) { std::get<Processor>(ports[port]) = proc; }

    void closeConnection() {
        for (auto& [key, tuple] : ports) {
            closeConnection(key);
        }
    }

    void closeConnection(Port port) {
        auto& sockets{std::get<std::vector<Socket>>(ports[port])};

        for (auto& socket : sockets) {
            socket.close();
        }

        sockets.clear();
    }

    void closeConnection(Socket& socket) {
        using std::get;

        auto& sockets{get<std::vector<Socket>>(ports[getSocketPort(socket)])};

        auto socket_iter{std::find_if(sockets.begin(), sockets.end(), [&](auto& s2) {
            return s2.local_endpoint() == socket.local_endpoint();
        })};

        if (socket_iter != sockets.end()) {
            socket.close();
            sockets.erase(socket_iter);
        }
    }

    size_t getClientCount() const {
        size_t total{};

        for (auto& pair : ports) {
            total += getClientCount(pair.first);
        }

        return total;
    }

    size_t getClientCount(const Port port) const {
        return std::get<std::vector<Socket>>(ports.at(port)).size();
    }

    Server::Port getSocketPort(Socket const& socket) { return socket.local_endpoint().port(); }

    void applyOnClient(std::function<void(Socket&)> const& action) {
        for (auto& [key, port] : ports) {
            applyOnClient(key, action);
        }
    }

    void applyOnClient(const Port port, std::function<void(Socket&)> const& action) {
        std::for_each(std::get<std::vector<Socket>>(ports[port]).begin(),
                      std::get<std::vector<Socket>>(ports[port]).end(), action);
    }
};

using Client = struct PhysicalClient;
struct PhysicalClient {
    using Processor = std::function<void(Socket& s, Client&)>;
    boost::asio::io_context context;
    Socket                  socket{context};
    tcp::resolver           resolver{context};
    Processor               processor;

    PhysicalClient() : socket(context), resolver(context) {}

    void connect(const std::string_view addr, const std::string_view port) {
        if (socket.is_open()) {
            return;
        }

        boost::asio::connect(socket, resolver.resolve(addr, port));
        std::thread{processor, std::ref(socket), std::ref(*this)}.detach();
    }

    void setOnConnection(Processor const& proc) { processor = proc; }
    void disconnect() { socket.close(); }
    ~PhysicalClient() { disconnect(); }
};

#include <iostream>

/// Client-side handler: performs blocking 256-byte reads and prints the size
/// of every block received. Returns after the first read error, which is
/// printed together with its description.
void clientCallback(Socket& socket, Client& /*client*/) {
    std::cout << "Connected!\n";

    std::array<std::byte, 256> data;

    try {
        for (;;) {
            const auto received = boost::asio::read(socket, boost::asio::buffer(data));
            std::cout << "Received " << received << " bytes" << std::endl;
        }
    } catch (boost::system::system_error& e) {
        std::cout << "Failed to receive! " << e.what() << std::endl;
        // exit(1);
    }
}

/// Server-side handler: keeps sending 256-byte blocks to the client and
/// prints the size of every block sent. Returns after the first write error.
void serverCallback(Socket& socket, Server& /*server*/) {
    std::cout << "Client connected!\n";

    // Value-initialized so that zeros, not indeterminate stack contents, are
    // put on the wire.
    std::array<std::byte, 256> data{};

    try {
        while (auto n = boost::asio::write(socket, boost::asio::buffer(data))) {
            std::cout << "Sent " << n << " bytes" << std::endl;
        }
    } catch (boost::system::system_error const&) {
        std::cerr << "Failed to write! Client disconnected?\n";
    }
}

/// Runs as server when started with at least one argument, as client
/// otherwise. Both modes keep running until a line is entered on stdin.
/// Returns 1 when the client cannot connect.
int main(int argc, char**) {
    if (argc > 1) { // server
        PhysicalServer server;

        server.setOnConnection(10666, serverCallback);
        server.startListening(/*10666*/);
        std::cin.ignore(1024, '\n');
    } else { // client
        PhysicalClient client;

        client.setOnConnection(clientCallback);

        try {
            client.connect("127.0.0.1", "10666");
        } catch (std::exception const& e) {
            std::cerr << "Connection failure: " << e.what() << std::endl;
            return 1;
        }

        // Keep `client` alive while its detached handler thread is using the
        // socket; returning right after connect() would destroy it at once.
        std::cin.ignore(1024, '\n');
    }
}

这是您提供的代码的翻译。如果您有任何其他问题或需要进一步的帮助,请告诉我。

英文:

Make that 1000x simpler. You will likely see your issue in a debugger.

E.g. start with

Live On Coliru

#include &lt;boost/asio.hpp&gt;
#include &lt;iostream&gt;
#include &lt;map&gt;
#include &lt;thread&gt;
#include &lt;tuple&gt;
using boost::asio::ip::tcp;
using Server = struct PhysicalServer;
using Client = struct PhysicalClient;
using Socket = tcp::socket;
struct PhysicalServer {
using Port      = uint16_t;
using Processor = std::function&lt;void(Socket&amp; s, Server&amp;)&gt;;
using Conn      = std::tuple&lt;bool, std::vector&lt;Socket&gt;, Processor&gt;;
std::map&lt;Port, Conn&gt;    ports;
boost::asio::io_context context;
void startListening() {
for (auto&amp; pair : ports) {
startListening(pair.first);
}
}
void stopListening() {
for (auto&amp; pair : ports) {
stopListening(pair.first);
}
}
void startListening(const Port port) {
std::get&lt;bool&gt;(ports[port]) = true;
std::thread{&amp;PhysicalServer::listen, this, port}.detach();
}
void stopListening(const Port port) { std::get&lt;bool&gt;(ports[port]) = false; }
void listen(const Port port) {
using namespace boost::asio::ip;
static tcp::acceptor acceptor{context, {tcp::v4(), port}};
while (std::get&lt;bool&gt;(ports[port])) {
Socket socket{context};
acceptor.accept(socket);
auto&amp; sockets{std::get&lt;std::vector&lt;Socket&gt;&gt;(ports[port])};
sockets.push_back(std::move(socket));
std::thread{std::get&lt;Processor&gt;(ports[port]), std::ref(sockets.back()), std::ref(*this)}.detach();
}
}
void setOnConnection(const Port port, Processor const&amp; proc) { std::get&lt;Processor&gt;(ports[port]) = proc; }
void closeConnection() {
for (auto&amp; [key, tuple] : ports) {
closeConnection(key);
}
}
void closeConnection(Port port) {
auto&amp; sockets{std::get&lt;std::vector&lt;Socket&gt;&gt;(ports[port])};
for (auto&amp; socket : sockets) {
socket.close();
}
sockets.clear();
}
void closeConnection(Socket&amp; socket) {
using std::get;
auto&amp; sockets{get&lt;std::vector&lt;Socket&gt;&gt;(ports[getSocketPort(socket)])};
auto socket_iter{std::find_if(sockets.begin(), sockets.end(), [&amp;](auto&amp; s2) {
return s2.local_endpoint() == socket.local_endpoint();
})};
if (socket_iter != sockets.end()) {
socket.close();
sockets.erase(socket_iter);
}
}
size_t getClientCount() const {
size_t total{};
for (auto&amp; pair : ports) {
total += getClientCount(pair.first);
}
return total;
}
size_t getClientCount(const Port port) const {
return std::get&lt;std::vector&lt;Socket&gt;&gt;(ports.at(port)).size();
}
Server::Port getSocketPort(Socket const&amp; socket) { return socket.local_endpoint().port(); }
void applyOnClient(std::function&lt;void(Socket&amp;)&gt; const&amp; action) {
for (auto&amp; [key, port] : ports) {
applyOnClient(key, action);
}
}
void applyOnClient(const Port port, std::function&lt;void(Socket&amp;)&gt; const&amp; action) {
std::for_each(std::get&lt;std::vector&lt;Socket&gt;&gt;(ports[port]).begin(),
std::get&lt;std::vector&lt;Socket&gt;&gt;(ports[port]).end(), action);
}
};
using Client = struct PhysicalClient;
struct PhysicalClient {
using Processor = std::function&lt;void(Socket&amp; s, Client&amp;)&gt;;
boost::asio::io_context context;
Socket                  socket{context};
tcp::resolver           resolver{context};
Processor               processor;
PhysicalClient() : socket(context), resolver(context) {}
void connect(const std::string_view addr, const std::string_view port) {
if (socket.is_open()) {
return;
}
boost::asio::connect(socket, resolver.resolve(addr, port));
std::thread{processor, std::ref(socket), std::ref(*this)}.detach();
}
void setOnConnection(Processor const&amp; proc) { processor = proc; }
void disconnect() { socket.close(); }
~PhysicalClient() { disconnect(); }
};
#include &lt;iostream&gt;
void clientCallback(Socket&amp; socket, Client&amp; /*client*/) {
std::cout &lt;&lt; &quot;Connected!\n&quot;;
std::array&lt;std::byte, 256&gt; data;
try {
while (true) {
auto n = boost::asio::read(socket, boost::asio::buffer(data));
std::cout &lt;&lt; &quot;Received &quot; &lt;&lt; n &lt;&lt; &quot; bytes&quot; &lt;&lt; std::endl;
}
} catch (boost::system::system_error&amp; e) {
std::cout &lt;&lt; &quot;Failed to receive! &quot; &lt;&lt; e.what() &lt;&lt; std::endl;
// exit(1);
}
}
void serverCallback(Socket&amp; socket, Server&amp; /*server*/) {
std::cout &lt;&lt; &quot;Client connected!\n&quot;;
std::array&lt;std::byte, 256&gt; data;
try {
while (auto n = boost::asio::write(socket, boost::asio::buffer(data))) {
std::cout &lt;&lt; &quot;Sent &quot; &lt;&lt; n &lt;&lt; &quot; bytes&quot; &lt;&lt; std::endl;
}
} catch (boost::system::system_error&amp; e) {
std::cerr &lt;&lt; &quot;Failed to write! Client disconnected?\n&quot;;
}
}
int main(int argc, char**) {
if (argc &gt; 1) { // server
PhysicalServer server;
server.setOnConnection(10666, serverCallback);
server.startListening(/*10666*/);
std::cin.ignore(1024, &#39;\n&#39;);
} else { // client
PhysicalClient client;
client.setOnConnection(clientCallback);
try {
client.connect(&quot;127.0.0.1&quot;, &quot;10666&quot;);
} catch (std::exception&amp; e) {
std::cerr &lt;&lt; &quot;Connection failure: &quot; &lt;&lt; e.what() &lt;&lt; std::endl;
return 1;
}
}
}

Some glaring errors are around the threads. The detach() is a strong sign that you are giving away control (there will not be a graceful shutdown).

Worse, you're liberally accessing ports across threads. That's a data race and the result is Undefined Behaviour. Beyond that, at the very least make the bool listening flag an atomic_bool.

The tuple abuse is really not doing you any favours either. Why not just use a class instead?

//using Conn      = std::tuple&lt;bool, std::vector&lt;Socket&gt;, Processor&gt;;
struct Conn {
bool                listening = false;
std::vector&lt;Socket&gt; sockets;
Processor           processor;
};
// ... 
ports[port].listening = true;

Or, even just use structured bindings on the tuple instead:

// Accept loop for one port, written with structured bindings on the Conn tuple.
void listen(Port port) {
    // NOTE: function-local static, so this is still a single acceptor shared
    // by every port.
    static tcp::acceptor acceptor{context, {tcp::v4(), port}};
    auto& [listening, sockets, processor] = ports[port];
    // Test the bound element: a std::tuple has no `.listening` member.
    while (listening) {
        sockets.push_back(acceptor.accept());
        std::thread{processor, std::ref(sockets.back()), std::ref(*this)}.detach();
    }
}

Orders of magnitude better already. Next up, don't forget local_endpoint may throw (e.g. when the socket isn't valid anymore).

Your Question

Likely, the most central issue is this:

static tcp::acceptor acceptor{context, {tcp::v4(), port}};

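You're using the same acceptor instance for all your ports. That won't work. Moving the acceptor into the Conn object/tuple may help. Note also that sockets.push_back(...) may reallocate the vector; that moves every stored socket and invalidates the std::ref(sockets.back()) references already handed to earlier client threads, which by itself can make an existing connection fail as soon as a new client is accepted. A container with stable element addresses (e.g. std::list<Socket>) avoids that: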

// Accept loop for one port; the acceptor now lives in the port's Conn entry.
void listen(Port port) {
    auto& [acceptor, listening, sockets, processor] = ports[port];
    acceptor = {context, {{}, port}};
    // Use the bound `listening`: it works for a struct and for a tuple alike
    // and avoids a second map lookup per iteration.
    while (listening) {
        sockets.push_back(acceptor.accept());
        std::thread{processor, std::ref(sockets.back()), std::ref(*this)}.detach();
    }
}

However, you have to rethink your close() approach, as that is a data race again, but also, you're destructing the socket objects from another thread. If you need cancellation in Asio, you need asynchronous operations. There's no way you can safely destruct (or even close()) the socket while it is being used in a synchronous operation.

Here's the midway point that still has the data races and shutdown problems:

#include &lt;boost/asio.hpp&gt;
#include &lt;iostream&gt;
#include &lt;map&gt;
#include &lt;thread&gt;
#include &lt;tuple&gt;
using boost::asio::ip::tcp;
using Server = struct PhysicalServer;
using Client = struct PhysicalClient;
using Socket = tcp::socket;
static boost::asio::system_executor context;
struct PhysicalServer {
using Port      = uint16_t;
using Processor = std::function&lt;void(Socket&amp; s, Server&amp;)&gt;;
// using Conn      = std::tuple&lt;bool, std::vector&lt;Socket&gt;,
// Processor&gt;;
struct Conn {
tcp::acceptor       acceptor{context};
std::atomic_bool    listening = false;
std::vector&lt;Socket&gt; sockets;
Processor           processor;
};
std::map&lt;Port, Conn&gt; ports;
void startListening()
{
for(auto&amp; pair : ports)
startListening(pair.first);
}
void stopListening()
{
for(auto&amp; pair : ports)
stopListening(pair.first);
}
void startListening(Port port)
{
ports[port].listening = true;
std::thread{&amp;PhysicalServer::listen, this, port}.detach();
}
void stopListening(Port port)
{
ports[port].listening = false;
}
void listen(Port port) {
auto&amp; [acceptor, listening, sockets, processor] = ports[port];
acceptor = {context, {{}, port}};
while (ports[port].listening) {
sockets.push_back(acceptor.accept());
std::thread{
processor, std::ref(sockets.back()), std::ref(*this)}
.detach();
}
}
void setOnConnection(Port port, Processor const&amp; proc) {
ports[port].processor = proc;
}
void closeConnection() {
for(auto&amp; [key, tuple] : ports)
closeConnection(key);
}
void closeConnection(Port port) {
auto&amp; sockets = ports[port].sockets;
for(auto&amp; socket : sockets)
socket.close();
sockets.clear();
}
void closeConnection(Socket&amp; socket) {
auto&amp; sockets{ports[getSocketPort(socket)].sockets};
auto it = std::find_if( //
sockets.begin(), sockets.end(),
[&amp;](auto&amp; s2) { return s2.local_endpoint() == socket.local_endpoint(); });
if (it != sockets.end()) {
socket.close();
sockets.erase(it);
}
}
size_t getClientCount() const
{
size_t total = 0;
for (auto&amp; [port, conn] : ports)
total += getClientCount(port);
return total;
}
size_t       getClientCount(Port port) const { return ports.at(port).sockets.size(); }
Server::Port getSocketPort(Socket const&amp; socket) { return socket.local_endpoint().port(); }
void applyOnClient(std::function&lt;void(Socket&amp;)&gt; const&amp; action) {
for (auto&amp; [key, port] : ports) {
applyOnClient(key, action);
}
}
void applyOnClient(Port port, std::function&lt;void(Socket&amp;)&gt; const&amp; action) {
auto&amp; sockets = ports[port].sockets;
std::for_each(sockets.begin(), sockets.end(), action);
}
};
using Client = struct PhysicalClient;
struct PhysicalClient
{
using Processor = std::function&lt;void(Socket&amp; s, Client&amp;)&gt;;
Socket        socket{context};
tcp::resolver resolver{context};
Processor     processor;
PhysicalClient()
: socket(context)
, resolver(context)
{
}
void connect(std::string_view addr, std::string_view port)
{
if(socket.is_open())
{
return;
}
boost::asio::connect(socket, resolver.resolve(addr, port));
std::thread{processor, std::ref(socket), std::ref(*this)}
.detach();
}
void setOnConnection(Processor const&amp; proc)
{
processor = proc;
}
void disconnect()
{
socket.close();
}
~PhysicalClient()
{
disconnect();
}
};
#include &lt;iostream&gt;
void clientCallback(Socket&amp; socket, Client&amp; /*client*/)
{
std::cout &lt;&lt; &quot;Connected!\n&quot;;
std::array&lt;std::byte, 256&gt; data;
try
{
while(true)
{
auto n =
boost::asio::read(socket, boost::asio::buffer(data));
std::cout &lt;&lt; &quot;Received &quot; &lt;&lt; n &lt;&lt; &quot; bytes&quot; &lt;&lt; std::endl;
}
} catch(boost::system::system_error&amp; e)
{
std::cout &lt;&lt; &quot;Failed to receive! &quot; &lt;&lt; e.what() &lt;&lt; std::endl;
// exit(1);
}
}
void serverCallback(Socket&amp; socket, Server&amp; /*server*/)
{
std::cout &lt;&lt; &quot;Client connected!\n&quot;;
std::array&lt;std::byte, 256&gt; data;
try
{
while(auto n = boost::asio::write(
socket, boost::asio::buffer(data)))
{
std::cout &lt;&lt; &quot;Sent &quot; &lt;&lt; n &lt;&lt; &quot; bytes&quot; &lt;&lt; std::endl;
}
} catch(boost::system::system_error&amp; e)
{
std::cerr &lt;&lt; &quot;Failed to write! Client disconnected?\n&quot;;
}
}
int main(int argc, char**) try {
if (argc &gt; 1) { // server
PhysicalServer server;
server.setOnConnection(10666, serverCallback);
server.startListening(/*10666*/);
std::cin.ignore(1024, &#39;\n&#39;);
} else { // client
PhysicalClient client;
client.setOnConnection(clientCallback);
client.connect(&quot;127.0.0.1&quot;, &quot;10666&quot;);
std::cin.ignore(1024, &#39;\n&#39;);
}
} catch (std::exception&amp; e) {
std::cerr &lt;&lt; &quot;failure: &quot; &lt;&lt; e.what() &lt;&lt; std::endl;
return 1;
}

Why does my C++ Asio TCP server disconnect previous clients when a new one tries to connect?

huangapple
  • 本文由 发表于 2023年5月20日 21:47:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/76295553.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定