Asio. Error: “I/O 操作已中止,因为线程退出或应用程序请求”

huangapple go评论98阅读模式
英文:

Asio. Error: "The I/O operation has been aborted because of either a thread exit or an application request"

问题

我正试图使用 asio 创建一个异步服务器,但当接受器调用 async_accept 函数时,我收到以下错误消息 I/o 操作已因线程退出或应用程序请求而中止,这导致进程无法继续。我尝试更改端口,但不起作用。当我尝试创建同步服务器时,它可以工作。另外,我将 asio 作为一个独立的库使用,而不是从 Boost 中引用。

这是我遇到错误的函数:

// Queues one asynchronous accept on the shared `socket` member. On success the
// handler starts a read on that socket and then queues the next accept; on
// failure it logs the error message and returns without re-arming.
void startAsyncAccept()
{
    // NOTE(review): every accept reuses the same `socket` object, so the next
    // accept is issued while a read may still be pending on it; this is reported
    // to surface as asio::error::operation_aborted.
    acceptor.async_accept(socket, [&](const asio::error_code& error)
        {
            if(!error)
            {
                std::cout << "Client is connected" << "\n";
                startAsyncRead();
            }
            else
            {
                
                std::cerr << "错误信息: " << error.message() << '\n';
                // NOTE(review): `return 0;` makes the lambda's deduced return
                // type int, yet the success path falls off the end without
                // returning a value.
                return 0;
                
            }
            // Only reached on the success path: accept the next client.
            startAsyncAccept();
        }
    );
}

这是我的异步服务器:

// Asynchronous chat-style TCP server as posted in the question. It listens on
// 127.0.0.1:2000, reads newline-terminated messages from the accepted socket and
// attempts to relay each message to every socket stored in `sockets`.
class Server
{
public:
    // Binds the acceptor to 127.0.0.1:2000 and immediately queues the first accept.
    Server(asio::io_context& io_context) : acceptor(io_context, asio::ip::tcp::endpoint(asio::ip::address::from_string("127.0.0.1"), 2000)), socket(io_context)
    {
        startAsyncAccept();
    }
    ~Server() {}
private:
    // Member variables
    asio::ip::tcp::acceptor acceptor;
    // Single socket object that every accept and every read operates on.
    asio::ip::tcp::socket socket;
    // Single input buffer used by every read.
    asio::streambuf buffer;
    // Intended list of connected clients; nothing in this class ever inserts
    // into it, so it stays empty.
    std::vector<asio::ip::tcp::socket> sockets;

    // Functions

    // Queues one asynchronous accept on `socket`. On success the handler starts
    // a read and then queues the next accept; on failure it logs the error and
    // returns without re-arming.
    void startAsyncAccept()
    {
        // NOTE(review): the same `socket` is handed to every accept while a read
        // may still be pending on it; this is reported to surface as
        // asio::error::operation_aborted.
        acceptor.async_accept(socket, [&](const asio::error_code& error)
            {
                if(!error)
                {
                    std::cout << "Client is connected" << "\n";
                    startAsyncRead();
                }
                else
                {
                    
                    std::cerr << "错误信息: " << error.message() << '\n';
                    // NOTE(review): `return 0;` makes the lambda's deduced
                    // return type int, yet the success path falls off the end
                    // without returning a value.
                    return 0;
                    
                }
                // Only reached on the success path: accept the next client.
                startAsyncAccept();
            }
        );
    }

    // Reads from `socket` into `buffer` up to and including the next '\n',
    // prints the line, forwards it to every entry of `sockets`, consumes the
    // bytes and queues the next read. Any read error is treated as a disconnect.
    void startAsyncRead()
    {
        asio::async_read_until(socket, buffer, '\n', [&](const asio::error_code& error, size_t length)
            {
                if(!error)
                {
                    // Copy the first `length` bytes (the line including '\n') out of the buffer.
                    std::string message(asio::buffers_begin(buffer.data()), asio::buffers_begin(buffer.data()) + length);
                    std::cout << "Received from client: " << message;

                    // NOTE(review): `message` is a local of this handler while the
                    // writes below are asynchronous; asio::buffer(message) presumably
                    // only refers to its storage, so it may be gone before a write
                    // completes — confirm.
                    for(auto& clients : sockets)
                    {
                        asio::async_write(clients, asio::buffer(message), [&](const asio::error_code& error, size_t length) 
                            {
                                if(error)
                                {
                                    std::cerr << "Failed to write to client: " << error.message() << "\n";
                                }
                            }
                        );
                    }
                    // Drop the bytes that were just copied into `message`.
                    buffer.consume(length);
                    startAsyncRead();
                }
                else
                {
                    std::cout << "Client disconnected." << std::endl;
                    removeClient();
                }
            }
        );
    }

    // Sends a fixed greeting on `socket`; on failure logs and calls removeClient().
    // Not called from anywhere inside this class.
    void startAsyncWrite() 
    {
        asio::async_write(socket, asio::buffer("Connected to chat Server. \n"), [&](const asio::error_code& error, size_t length) 
            {
                if(error)
                {
                    std::cerr << "Failed to write to the client: " << error.message() << "\n";
                    removeClient();
                }
            }
        );
    }

    // Erases from `sockets` the element whose address equals the address of the
    // `socket` member.
    // NOTE(review): `socket` is a separate member rather than an element of
    // `sockets`, so the address comparison cannot match and nothing is erased.
    void removeClient()
    {
        auto it = std::find_if(sockets.begin(), sockets.end(),
            [&](const auto& client_socket)
            {
                return &client_socket == &socket;
            });

        if (it != sockets.end())
        {
            sockets.erase(it);
        }
    }
};
英文:

I am trying to create an async server using asio, but when the acceptor calls the async_accept function I get the error "The I/O operation has been aborted because of either a thread exit or an application request", which prevents the process from continuing. I have tried changing the port, but that doesn't help. When I write a synchronous server instead, it works.
Also I use asio as a separate lib from boost.

Here's the function where I get the error:

void startAsyncAccept()
{
acceptor.async_accept(socket, [&amp;](const asio::error_code&amp; error)
{
if(!error)
{
std::cout &lt;&lt; &quot;Client is connected&quot; &lt;&lt; &quot;\n&quot;;
startAsyncRead();
}
else
{
std::cerr &lt;&lt; &quot;It is: &quot; &lt;&lt; error.message() &lt;&lt; &#39;\n&#39;;
return 0;
}
startAsyncAccept();
}
);
}

And here's my async server:

class Server
{
public:
Server(asio::io_context&amp; io_context) : acceptor(io_context, asio::ip::tcp::endpoint(asio::ip::address::from_string(&quot;127.0.0.1&quot;), 2000)), socket(io_context)
{
startAsyncAccept();
}
~Server() {}
private:
// Members
asio::ip::tcp::acceptor acceptor;
asio::ip::tcp::socket socket;
asio::streambuf buffer;
std::vector&lt;asio::ip::tcp::socket&gt; sockets;
// Functions
void startAsyncAccept()
{
acceptor.async_accept(socket, [&amp;](const asio::error_code&amp; error)
{
if(!error)
{
std::cout &lt;&lt; &quot;Client is connected&quot; &lt;&lt; &quot;\n&quot;;
startAsyncRead();
}
else
{
std::cerr &lt;&lt; &quot;It is: &quot; &lt;&lt; error.message() &lt;&lt; &#39;\n&#39;;
return 0;
}
startAsyncAccept();
}
);
}
void startAsyncRead()
{
asio::async_read_until(socket, buffer, &#39;\n&#39;, [&amp;](const asio::error_code&amp; error, size_t length)
{
if(!error)
{
std::string message(asio::buffers_begin(buffer.data()), asio::buffers_begin(buffer.data()) + length);
std::cout &lt;&lt; &quot;Received from client: &quot; &lt;&lt; message;
for(auto&amp; clients : sockets)
{
asio::async_write(clients, asio::buffer(message), [&amp;](const asio::error_code&amp; error, size_t length) 
{
if(error)
{
std::cerr &lt;&lt; &quot;Failed to write to client: &quot; &lt;&lt; error.message() &lt;&lt; &quot;\n&quot;;
}
}
);
}
buffer.consume(length);
startAsyncRead();
}
else
{
std::cout &lt;&lt; &quot;Client disconnected.&quot; &lt;&lt; std::endl;
removeClient();
}
}
);
}
void startAsyncWrite() 
{
asio::async_write(socket, asio::buffer(&quot;Connected to chat Server. \n&quot;), [&amp;](const asio::error_code&amp; error, size_t length) 
{
if(error)
{
std::cerr &lt;&lt; &quot;Failed to write to the client: &quot; &lt;&lt; error.message() &lt;&lt; &quot;\n&quot;;
removeClient();
}
}
);
}
void removeClient()
{
auto it = std::find_if(sockets.begin(), sockets.end(),
[&amp;](const auto&amp; client_socket)
{
return &amp;client_socket == &amp;socket;
});
if (it != sockets.end())
{
sockets.erase(it);
}
}
};

答案1

得分: 2

首先,存在未定义行为,因为 lambda 表达式在某些路径上缺少返回语句。这似乎是一种复制粘贴错误,所以我将删除无意义的 return 0;

真正的问题

真正的问题是你在每次接受连接时都使用相同的 socket。这意味着你在再次调用 startAsyncAccept 时会重置启动第一个 asyncRead 的套接字。这会导致 asio::error::operation_aborted 错误代码。

简单地说,不要使用相同的套接字。将其移入一个 "会话" 对象中,或者根据你的需求将它们放入你的 sockets 集合中。

修复?

具有讽刺意味的是,很难知道你原本想如何使用 sockets,因为在你当前的代码中它实际上从未被使用(除了那些永远不会迭代的循环,因为 sockets 总是为空)。更糟糕的是,它没有生效反而是件好事,否则你会遇到更多的未定义行为:std::vector 在扩容时会重新分配内存,使所有指向已有套接字的引用失效。

处理这个问题的通常方法不是维护一组套接字,而是维护一组指向会话对象的(弱)共享指针。为了简化起见,我在这里把 std::vector 改为 std::list 并删除有问题的 socket 变量,以修复引用稳定性问题。

tcp::socket              socket;
asio::streambuf          buffer;
std::vector<tcp::socket> sockets;

变成

asio::streambuf        buffer;
std::list<tcp::socket> sockets;

还有关于 async_write 的一些大问题:

  • 你使用一个局部变量 message 作为缓冲区。这再次引发了未定义行为,因为它的生命周期在写操作完成之前结束。
  • 你必须避免重叠调用。因为你无法控制 async_read_until 何时完成(你可能有任意数量的已连接客户端),所以你不能确保没有写操作重叠。唯一的解决方案是具有“出箱”队列 - 通常是每个客户端一个,以实现每个连接的独立操作。

另外,startAsyncWrite 的用途不清楚(它未被使用,也无助于解决上述问题)。removeClient 也一样:即使你填充了 sockets,它也不会起任何作用,只会因套接字引用失效而导致未定义行为。我暂时删除了这两个函数。

最后,你使用了 streambuf,但似乎可以直接使用动态字符串缓冲区。无论如何,请把 consume 调用放在紧挨着实际读取(消费)数据的位置,以免在某些错误条件下漏掉它。

简化版 V1

这个简化版本解决了你提出的问题以及 message 的生命周期问题。

在 Coliru 上查看

class Server {
  public:
    Server(asio::io_context& io_context) //
        : acceptor(io_context, {{}, 2000}) {
        startAsyncAccept();
    }

  private:
    // 成员
    tcp::acceptor          acceptor;
    asio::streambuf        buffer;
    std::list<tcp::socket> sockets;

    void startAsyncAccept() {
        acceptor.async_accept([&](error_code error, tcp::socket accepted) {
            if (!error) {
                auto& client = sockets.emplace_back(std::move(accepted));
                std::cout << "Client is connected\n";
                startAsyncRead(client);
            } else {
                std::cerr << "It is: " << error.message() << "\n";
            }
            startAsyncAccept();
        });
    }

    void startAsyncRead(tcp::socket& client) {
        asio::async_read_until(client, buffer, '\n', [&](error_code error, size_t n) {
            if (!error) {
                auto f = asio::buffers_begin(buffer.data()), l = f + static_cast<ptrdiff_t>(n);
                auto message = std::make_shared<std::string>(f, l);
                buffer.consume(n);
                std::cout << "Received from client: " << message;

                for (auto& peer : sockets) {
                    // SEHE: TODO FIXME serializing writes per peer!
                    asio::async_write(
                        peer, asio::buffer(*message), [&](error_code error, size_t /*length*/) {
                            if (error) {
                                std::cerr << "Failed to write to client: " << error.message() << "\n";
                            }
                        });
                }
                startAsyncRead(client);
            } else {
                std::cout << "Client disconnected." << std::endl;
                // SEHE: TODO ERASE
            }
        });
    }
};

修复:客户端会话

修复其他问题需要更多工作:

[早餐后]

更新 你绝对需要一个单独的 Session,因为我之前(早餐前)错过了,你还在错误地使用相同的 streambuf 实例处理所有连接... Session 类型是一个逻辑上将传入缓冲区、传出缓冲区队列和套接字分组在一起的好地方。

请注意,一切都取决于正确的关注点分离:服务器管理监听,聊天室管理已连接的客户端,会话管理单个客户端。

**

英文:

First of all, there's Undefined Behaviour because the lambda misses a return on some paths. This seems to be a copy-paste error of some sort, so I'll just remove the return 0; that was meaningless.

The Real Problem

The real problem is that you use the same socket on each accept. That means that you reset the socket that you started the first asyncRead on as soon as you call startAsyncAccept again. That causes the asio::error::operation_aborted error code.

Simply don't use the same socket. Move it into a "session" object e.g. Or, as you seem to have wanted to do, put them in your collection of sockets.

Fixing?

Ironically, it is hard to know how you wanted to use sockets since it is literally unused in your current code (except for the loops that will never iterate, because sockets is always empty).

What's worse, it's only good that it didn't work, because otherwise you would get more UB: std::vector will reallocate to grow capacity, meaning that any references to existing sockets are invalidated.

The usual way to deal with this is not to have a collection of sockets, but rather a collection of (weak) shared pointers to session objects. For simplicity, let me fix the reference stability here by changing std::vector to std::list and removing the problematic socket variable.

tcp::socket              socket;
asio::streambuf          buffer;
std::vector&lt;tcp::socket&gt; sockets;

Becomes

asio::streambuf        buffer;
std::list&lt;tcp::socket&gt; sockets;

There are big problems with async_write as well:

  • you use a local variable message as the buffer. This is UB again, because its lifetime ends before the write operations complete
  • you MUST avoid overlapping calls. Because you don't control when async_read_until completes (and you might have an arbitrary number of connected clients), you cannot be sure no writes overlap. The only solution there is to have an "outbox" queue - typically per client, for independent operation of each connection.

As a sidenote, it's unclear what startAsyncWrite was supposed to do (it was unused, and doesn't help to solve the problem just described). Same with removeClient which would never do anything even if you had filled sockets, it would just cause UB due to the invalidation of socket references. I dropped those two for now.

Finally, you use a streambuf, where it seems you might just use a dynamic string buffer directly. In any case, put the consume call right next to the point where the data is actually consumed (read out of the buffer), so that you don't risk missing it under some error condition.

Simplified V1

This simplified version removes the problem that you ask about as well as the lifetime problem of the message:

Live On Coliru

class Server {
public:
Server(asio::io_context&amp; io_context) //
: acceptor(io_context, {{}, 2000}) {
startAsyncAccept();
}
private:
// Members
tcp::acceptor          acceptor;
asio::streambuf        buffer;
std::list&lt;tcp::socket&gt; sockets;
void startAsyncAccept() {
acceptor.async_accept([&amp;](error_code error, tcp::socket accepted) {
if (!error) {
auto&amp; client = sockets.emplace_back(std::move(accepted));
std::cout &lt;&lt; &quot;Client is connected\n&quot;;
startAsyncRead(client);
} else {
std::cerr &lt;&lt; &quot;It is: &quot; &lt;&lt; error.message() &lt;&lt; &quot;\n&quot;;
}
startAsyncAccept();
});
}
void startAsyncRead(tcp::socket&amp; client) {
asio::async_read_until(client, buffer, &#39;\n&#39;, [&amp;](error_code error, size_t n) {
if (!error) {
auto f = asio::buffers_begin(buffer.data()), l = f + static_cast&lt;ptrdiff_t&gt;(n);
auto message = std::make_shared&lt;std::string&gt;(f, l);
buffer.consume(n);
std::cout &lt;&lt; &quot;Received from client: &quot; &lt;&lt; message;
for (auto&amp; peer : sockets) {
// SEHE: TODO FIXME serializing writes per peer!
asio::async_write(
peer, asio::buffer(*message), [&amp;, message](error_code error, size_t /*length*/) {
if (error) {
std::cerr &lt;&lt; &quot;Failed to write to client: &quot; &lt;&lt; error.message() &lt;&lt; &quot;\n&quot;;
}
});
}
startAsyncRead(client);
} else {
std::cout &lt;&lt; &quot;Client disconnected.&quot; &lt;&lt; std::endl;
// SEHE: TODO ERASE
}
});
}
};

Fixed: Client Sessions

Some more work to fix the other problems:

[after breakfast]

UPDATE You absolutely need a separate Session, because I missed earlier (before breakfast) that you were also falsely using the same streambuf instance for all connections... A Session type makes for a logical place to group the incoming buffer, outgoing buffer queue and socket.

We use enable_shared_from_this together with shared_from_this() to get automatic lifetime management when a connection gets bad/closed.

Note that it all comes down to correct Separation Of Concerns: The Server manages the listening, the ChatRoom manages connected clients, the Session manages a single client.

Live On Coliru

#include &lt;boost/asio.hpp&gt;
#include &lt;deque&gt;
#include &lt;iostream&gt;
#include &lt;list&gt;
#ifndef STANDALONE_ASIO
namespace asio = boost::asio;
using boost::system::error_code;
#else
using asio::error_code;
#endif
using asio::ip::tcp;
struct Session;
using SessionPtr = std::shared_ptr&lt;Session&gt;;
using Handle     = std::weak_ptr&lt;Session&gt;;
struct ChatRoom {
std::list&lt;Handle&gt; clients_;
void garbage_collect();
void broadcast(std::string_view message);
// just for example:
void direct_message(std::string recipient, std::string_view message);
};
struct Session : std::enable_shared_from_this&lt;Session&gt; {
Session(tcp::socket s, ChatRoom&amp; room) : socket_(std::move(s)), room_(room) {}
void start() { readLoop(); }
void send(std::string_view message){
outbox_.emplace_back(message);
if (outbox_.size() == 1)
writeLoop();
}
private:
tcp::socket             socket_;
ChatRoom&amp;               room_;
asio::streambuf         incoming_;
std::deque&lt;std::string&gt; outbox_;
void readLoop() {
asio::async_read_until(
socket_, incoming_, &#39;\n&#39;, [&amp;, self = shared_from_this()](error_code ec, size_t n) {
if (ec) {
std::cout &lt;&lt; &quot;Client disconnect (&quot; &lt;&lt; ec.message() &lt;&lt; &quot;)&quot; &lt;&lt; std::endl;
return;
}
auto f = asio::buffers_begin(incoming_.data()), l = f + static_cast&lt;ptrdiff_t&gt;(n);
room_.broadcast(std::string(f, l));
incoming_.consume(n);
readLoop();
});
}
void writeLoop() {
if (outbox_.empty())
return;
asio::async_write( //
socket_, asio::buffer(outbox_.front()),
[this, self = shared_from_this()](error_code ec, size_t /*length*/) {
outbox_.pop_front();
if (ec)
std::cerr &lt;&lt; &quot;Failed to write to client: &quot; &lt;&lt; ec.message() &lt;&lt; &quot;\n&quot;;
else
writeLoop();
});
}
};
class Server {
public:
Server(asio::any_io_executor ex) : acceptor_(ex, {{}, 2000}) { acceptLoop(); }
private:
tcp::acceptor acceptor_;
ChatRoom      room_;
void acceptLoop() {
room_.garbage_collect(); // optionally prevent dead connections piling up
acceptor_.async_accept([&amp;](error_code ec, tcp::socket accepted) {
if (!ec) {
auto sess = std::make_shared&lt;Session&gt;(std::move(accepted), room_);
room_.clients_.push_back(sess);
std::cout &lt;&lt; &quot;Client is connected\n&quot;;
sess-&gt;start();
} else {
std::cerr &lt;&lt; &quot;Accept error: &quot; &lt;&lt; ec.message() &lt;&lt; &quot;\n&quot;;
}
acceptLoop();
});
}
};
void ChatRoom::garbage_collect() {
clients_.remove_if(std::mem_fn(&amp;Handle::expired));
}
void ChatRoom::broadcast(std::string_view message) {
for (auto&amp; handle : clients_) {
if (auto peer = handle.lock()) {
peer-&gt;send(message);
}
}
}
using namespace std::chrono_literals;
int main() {
asio::io_context io;
Server chat(io.get_executor());
io.run_for(30s);
}

Testing with a number of clients:

# Spawn 10 background clients; each sends a greeting and a goodbye after random
# sub-second delays and prints every line it receives from the server.
for a in {1..10}; do (
    sleep 1.$RANDOM
    echo "hello from $a"
    sleep 1.$RANDOM
    echo "bye from $a"
) | nc 127.0.0.1 2000 -w3 | (while read line; do echo "Client $a received '$line'"; done) & done

Prints output like:

Asio. Error: “I/O 操作已中止,因为线程退出或应用程序请求”

Also live on Coliru Asio. Error: “I/O 操作已中止,因为线程退出或应用程序请求”

huangapple
  • 本文由 发表于 2023年8月4日 07:25:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/76832126.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定