Asio. Error: "The I/O operation has been aborted because of either a thread exit or an application request"
Question
I am trying to create an async server using Asio, but when the acceptor calls the async_accept function I get the error "The I/O operation has been aborted because of either a thread exit or an application request", which prevents the process from continuing. I have tried changing the port, but that doesn't help. When I make a synchronous server instead, it works.
Also, I use Asio as a standalone library rather than through Boost.
Here's the function where I get the error:
void startAsyncAccept()
{
    acceptor.async_accept(socket, [&](const asio::error_code& error)
    {
        if(!error)
        {
            std::cout << "Client is connected" << "\n";
            startAsyncRead();
        }
        else
        {
            std::cerr << "It is: " << error.message() << '\n';
            return 0;
        }
        startAsyncAccept();
    }
    );
}
And here's my async server:
class Server
{
public:
    Server(asio::io_context& io_context) : acceptor(io_context, asio::ip::tcp::endpoint(asio::ip::address::from_string("127.0.0.1"), 2000)), socket(io_context)
    {
        startAsyncAccept();
    }
    ~Server() {}
private:
    // Members
    asio::ip::tcp::acceptor acceptor;
    asio::ip::tcp::socket socket;
    asio::streambuf buffer;
    std::vector<asio::ip::tcp::socket> sockets;
    // Functions
    void startAsyncAccept()
    {
        acceptor.async_accept(socket, [&](const asio::error_code& error)
        {
            if(!error)
            {
                std::cout << "Client is connected" << "\n";
                startAsyncRead();
            }
            else
            {
                std::cerr << "It is: " << error.message() << '\n';
                return 0;
            }
            startAsyncAccept();
        }
        );
    }
    void startAsyncRead()
    {
        asio::async_read_until(socket, buffer, '\n', [&](const asio::error_code& error, size_t length)
        {
            if(!error)
            {
                std::string message(asio::buffers_begin(buffer.data()), asio::buffers_begin(buffer.data()) + length);
                std::cout << "Received from client: " << message;
                for(auto& clients : sockets)
                {
                    asio::async_write(clients, asio::buffer(message), [&](const asio::error_code& error, size_t length)
                    {
                        if(error)
                        {
                            std::cerr << "Failed to write to client: " << error.message() << "\n";
                        }
                    }
                    );
                }
                buffer.consume(length);
                startAsyncRead();
            }
            else
            {
                std::cout << "Client disconnected." << std::endl;
                removeClient();
            }
        }
        );
    }
    void startAsyncWrite()
    {
        asio::async_write(socket, asio::buffer("Connected to chat Server. \n"), [&](const asio::error_code& error, size_t length)
        {
            if(error)
            {
                std::cerr << "Failed to write to the client: " << error.message() << "\n";
                removeClient();
            }
        }
        );
    }
    void removeClient()
    {
        auto it = std::find_if(sockets.begin(), sockets.end(),
            [&](const auto& client_socket)
            {
                return &client_socket == &socket;
            });
        if (it != sockets.end())
        {
            sockets.erase(it);
        }
    }
};
Answer 1
Score: 2
First of all, there's Undefined Behaviour because the lambda misses a return on some paths. This seems to be a copy-paste error of some sort, so I'll just remove the return 0; that was meaningless.
The Real Problem
The real problem is that you use the same socket on each accept. That means that you reset the socket that you started the first asyncRead on as soon as you call startAsyncAccept again. That causes the asio::error::operation_aborted error code.
Simply don't use the same socket. Move it into a "session" object, for example. Or, as you seem to have wanted to do, put them in your collection of sockets.
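To make the difference concrete, here is a minimal, self-contained sketch (not from the original post; it assumes a standalone Asio build with <asio.hpp>, and accept_loop is an illustrative name) of the move-accepting async_accept overload, where every connection gets its own socket:
#include <asio.hpp>
#include <iostream>

using asio::ip::tcp;

// Each completed accept delivers a brand-new tcp::socket by value, so starting the
// next accept never cancels I/O that is already running on a previous client.
void accept_loop(tcp::acceptor& acceptor) {
    acceptor.async_accept([&acceptor](asio::error_code ec, tcp::socket peer) {
        if (!ec) {
            // In a real server, move `peer` into a session object or a container here
            // instead of assigning it to a shared member socket.
            std::cout << "accepted " << peer.remote_endpoint() << "\n";
        }
        accept_loop(acceptor); // keep accepting further clients
    });
}

int main() {
    asio::io_context io;
    tcp::acceptor acceptor(io, {{}, 2000});
    accept_loop(acceptor);
    io.run();
}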
Fixing?
Ironically, it is hard to know how you wanted to use sockets, since it is literally unused in your current code (except for the loops that will never iterate, because sockets is always empty).
What's worse, it is just as well that it didn't work, because otherwise you would get more UB: std::vector reallocates to grow its capacity, meaning that any references to existing sockets are invalidated.
The usual way to deal with this is not to have a collection of sockets, but rather a collection of (weak) shared pointers to session objects. For simplicity, let me fix the reference stability here by changing std::vector to std::list and removing the problematic socket variable.
tcp::socket socket;
asio::streambuf buffer;
std::vector<tcp::socket> sockets;
Becomes
asio::streambuf buffer;
std::list<tcp::socket> sockets;
There are big problems with async_write as well:
- You use a local variable message as the buffer. This is UB again, because its lifetime ends before the write operation completes.
- You MUST avoid overlapping calls. Because you don't control when async_read_until completes (and you might have an arbitrary number of connected clients), you cannot be sure no writes overlap. The only solution is to have an "outbox" queue - typically per client, for independent operation of each connection (see the sketch after this list).
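As a rough illustration of that outbox idea (the Session class further down implements exactly this; Connection, queue_write and do_write are made-up names, and the usual asio/deque/string includes are assumed):
// Sketch only: per-connection outbox so at most one async_write is in flight.
struct Connection {
    asio::ip::tcp::socket socket_;
    std::deque<std::string> outbox_;

    explicit Connection(asio::ip::tcp::socket s) : socket_(std::move(s)) {}

    void queue_write(std::string msg) {
        outbox_.push_back(std::move(msg));
        if (outbox_.size() == 1)          // no write currently in flight
            do_write();
    }

  private:
    void do_write() {
        asio::async_write(socket_, asio::buffer(outbox_.front()),
            [this](asio::error_code ec, std::size_t /*n*/) {
                outbox_.pop_front();      // the front string stayed alive for the whole write
                if (!ec && !outbox_.empty())
                    do_write();           // start the next queued message
            });
        // Note: the real Session below also captures shared_from_this() so the object
        // stays alive while the write is pending; this sketch omits that.
    }
};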
As a side note, it's unclear what startAsyncWrite was supposed to do (it was unused, and doesn't help to solve the problem just described). The same goes for removeClient, which would never do anything even if you had filled sockets; it would just cause UB due to the invalidation of socket references. I dropped those two for now.
Finally, you use a streambuf where it seems you might just use a dynamic string buffer directly. In any case, put the consume call right next to the code that actually extracts the data, so that you don't risk skipping it under some error condition.
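For reference, a small sketch of the dynamic string buffer variant (not part of the original answer; this is a fragment of a per-connection class, with socket_, incoming_ and read_loop as illustrative names):
std::string incoming_;   // asio::dynamic_buffer grows/shrinks this string directly

void read_loop() {
    asio::async_read_until(socket_, asio::dynamic_buffer(incoming_), '\n',
        [this](asio::error_code ec, std::size_t n) {
            if (ec)
                return;
            std::string line = incoming_.substr(0, n); // includes the trailing '\n'
            incoming_.erase(0, n);                      // consume right where we copy
            // ... use `line` ...
            read_loop();
        });
}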
Simplified V1
This simplified version removes the problems that you asked about, as well as the lifetime problem with message:
class Server {
  public:
    Server(asio::io_context& io_context) //
        : acceptor(io_context, {{}, 2000}) {
        startAsyncAccept();
    }

  private:
    // Members
    tcp::acceptor acceptor;
    asio::streambuf buffer;
    std::list<tcp::socket> sockets;

    void startAsyncAccept() {
        acceptor.async_accept([&](error_code error, tcp::socket accepted) {
            if (!error) {
                auto& client = sockets.emplace_back(std::move(accepted));
                std::cout << "Client is connected\n";
                startAsyncRead(client);
            } else {
                std::cerr << "It is: " << error.message() << "\n";
            }
            startAsyncAccept();
        });
    }

    void startAsyncRead(tcp::socket& client) {
        asio::async_read_until(client, buffer, '\n', [&](error_code error, size_t n) {
            if (!error) {
                auto f = asio::buffers_begin(buffer.data()), l = f + static_cast<ptrdiff_t>(n);
                auto message = std::make_shared<std::string>(f, l);
                buffer.consume(n);
                std::cout << "Received from client: " << *message;
                for (auto& peer : sockets) {
                    // SEHE: TODO FIXME serializing writes per peer!
                    asio::async_write(
                        peer, asio::buffer(*message), [&, message](error_code error, size_t /*length*/) {
                            if (error) {
                                std::cerr << "Failed to write to client: " << error.message() << "\n";
                            }
                        });
                }
                startAsyncRead(client);
            } else {
                std::cout << "Client disconnected." << std::endl;
                // SEHE: TODO ERASE
            }
        });
    }
};
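For completeness, V1 assumes roughly the same preamble as the full listing further down; a minimal driver could look like this (assuming a standalone Asio build, otherwise use the Boost spellings shown in the full listing):
#include <asio.hpp>
#include <iostream>
#include <list>
#include <memory>

using asio::ip::tcp;
using asio::error_code;

// ... class Server from the V1 snippet above ...

int main() {
    asio::io_context io;
    Server server(io); // accepts on port 2000
    io.run();
}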
Fixed: Client Sessions
Some more work to fix the other problems:
[after breakfast]
UPDATE You absolutely need a separate Session, because I missed earlier (before breakfast) that you were also, incorrectly, using the same streambuf instance for all connections... A Session type makes for a logical place to group the incoming buffer, the outgoing buffer queue and the socket.
We use enable_shared_from_this together with shared_from_this() to get automatic lifetime management when a connection goes bad or is closed.
Note that it all comes down to correct Separation Of Concerns: The Server manages the listening, the ChatRoom manages connected clients, the Session manages a single client.
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <list>

#ifndef STANDALONE_ASIO
namespace asio = boost::asio;
using boost::system::error_code;
#else
using asio::error_code;
#endif

using asio::ip::tcp;

struct Session;
using SessionPtr = std::shared_ptr<Session>;
using Handle     = std::weak_ptr<Session>;

struct ChatRoom {
    std::list<Handle> clients_;

    void garbage_collect();
    void broadcast(std::string_view message);

    // just for example:
    void direct_message(std::string recipient, std::string_view message);
};

struct Session : std::enable_shared_from_this<Session> {
    Session(tcp::socket s, ChatRoom& room) : socket_(std::move(s)), room_(room) {}

    void start() { readLoop(); }

    void send(std::string_view message) {
        outbox_.emplace_back(message);
        if (outbox_.size() == 1)
            writeLoop();
    }

  private:
    tcp::socket socket_;
    ChatRoom& room_;
    asio::streambuf incoming_;
    std::deque<std::string> outbox_;

    void readLoop() {
        asio::async_read_until(
            socket_, incoming_, '\n', [&, self = shared_from_this()](error_code ec, size_t n) {
                if (ec) {
                    std::cout << "Client disconnect (" << ec.message() << ")" << std::endl;
                    return;
                }
                auto f = asio::buffers_begin(incoming_.data()), l = f + static_cast<ptrdiff_t>(n);
                room_.broadcast(std::string(f, l));
                incoming_.consume(n);
                readLoop();
            });
    }

    void writeLoop() {
        if (outbox_.empty())
            return;

        asio::async_write( //
            socket_, asio::buffer(outbox_.front()),
            [this, self = shared_from_this()](error_code ec, size_t /*length*/) {
                outbox_.pop_front();
                if (ec)
                    std::cerr << "Failed to write to client: " << ec.message() << "\n";
                else
                    writeLoop();
            });
    }
};

class Server {
  public:
    Server(asio::any_io_executor ex) : acceptor_(ex, {{}, 2000}) { acceptLoop(); }

  private:
    tcp::acceptor acceptor_;
    ChatRoom room_;

    void acceptLoop() {
        room_.garbage_collect(); // optionally prevent dead connections piling up

        acceptor_.async_accept([&](error_code ec, tcp::socket accepted) {
            if (!ec) {
                auto sess = std::make_shared<Session>(std::move(accepted), room_);
                room_.clients_.push_back(sess);
                std::cout << "Client is connected\n";
                sess->start();
            } else {
                std::cerr << "Accept error: " << ec.message() << "\n";
            }
            acceptLoop();
        });
    }
};

void ChatRoom::garbage_collect() {
    clients_.remove_if(std::mem_fn(&Handle::expired));
}

void ChatRoom::broadcast(std::string_view message) {
    for (auto& handle : clients_) {
        if (auto peer = handle.lock()) {
            peer->send(message);
        }
    }
}

using namespace std::chrono_literals;

int main() {
    asio::io_context io;
    Server chat(io.get_executor());

    io.run_for(30s);
}
Testing with a number of clients:
for a in {1..10}; do (
    sleep 1.$RANDOM
    echo "hello from $a"
    sleep 1.$RANDOM
    echo "bye from $a"
) | nc 127.0.0.1 2000 -w3 | (while read line; do echo "Client $a received '$line'"; done) & done
Prints output like:
Also live on Coliru