一个线程可以启动异步IO,而另一个线程可以运行完成处理程序,等等。

huangapple go评论83阅读模式
英文:

Can one thread initiate async IO but another thread run the completion handler, etc

问题

我正在探索将多线程C++应用程序转换为基于boost asio和完成处理程序的异步网络编程风格的选项。但是有两个关键问题:

  1. 一个线程能否发起异步写操作,而另一个线程运行其完成处理程序?第一个线程属于一个框架(因此不能成为异步线程池的一部分),但它是唯一能创建工作的线程。框架线程还将异步写入套接字,但它立即返回到运行框架代码,因此无法使用。第二个线程由我的代码创建,因此可以运行完成处理程序。boost asio是否有办法让框架线程发起异步IO,而另一个线程运行生成的完成处理程序?
  2. 如果异步模型不区分计算和IO线程,并且线程不可抢占(除非我们使用协程),那么IO完成处理程序(例如读取)是否可能被正在执行计算的线程无限期阻止运行?

根据我的评论给@sehe的回答:

我的目标是让插件通过异步写入(通过异步写入)为我的代码的服务器端异步发布工作,并让我的线程运行写入和读取完成处理程序,后者用于处理服务器端响应。我试图用两个线程模拟上述场景。一个在io-context的控制下(代表我的线程),另一个不在io-context的控制下(代表3D应用程序插件线程)。以下是我的代码:

#include <iostream>
#include <boost/asio.hpp>

boost::asio::io_context io_context_;
boost::asio::ip::tcp::socket socket_(io_context_);
boost::asio::streambuf receive_buffer_;

void read_completion_handler(const boost::system::error_code& ec, std::size_t bytes_transferred);
void write_completion_handler(const boost::system::error_code& ec, std::size_t bytes_transferred);

// 插件(不属于io-context的一部分)调用此函数
void sendNextMsg()
{
    io_context_.run();
    while(true)
    {
        std::string msg;
        std::cout << "msg > " << std::flush;
        std::cin >> msg;
        boost::asio::async_write(socket_, boost::asio::buffer(msg + "\n"), write_completion_handler);
    }
}

void write_completion_handler(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
    if(ec)
    {
        std::cerr << "send failed: " << ec.message() << std::endl;
        sendNextMsg();
    }
    else
    {
        std::cout << "Write succeeded" << std::endl;
        boost::asio::async_read_until(socket_, receive_buffer_, "\n", read_completion_handler);
    }
}

void read_completion_handler(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
    if(ec)
    {
        std::cerr << "receive failed: " << ec.message() << std::endl;
    }
    else
    {
        const char* data = boost::asio::buffer_cast<const char*>(receive_buffer_.data());
        std::cout << "Server response: " << data << std::endl;
        receive_buffer_.consume(bytes_transferred);
    }
    
    sendNextMsg();
}

int main()
{
    boost::system::error_code ec;
    socket_.connect(boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 7777), ec);
    
    if(ec)
    {
        std::cerr<< "[Connection error] " << ec.message() <<std::endl;
        exit(-1);
    }

    std::thread{sendNextMsg}.detach();

    io_context_.run();
    return 0;
}

// 结果:应用程序与服务器建立连接,但在调用`sendNextMsg`后终止。
英文:

I am exploring options for converting a multithreaded C++ application to an asynchronous style of networking, based on boost asio and completion handlers. However there are two sticking points:

  1. Can one thread make an async write but another thread run its completion handler? The first thread belongs to a framework (and therefore cannot be part of the async thread pool) but is the only thread that can create work. The framework thread also makes the async write to a socket, but it immediately returns to running framework code, and is therefore unavailable. The second thread is created by my code is thus available to run the completion handler. Does boost asio have a way for the framework thread to initiate asynchronous IO but another thread run the resulting completion handler?
  2. If the async model does not draw a distinction between compute and IO threads and threads aren't preemptible (unless we are using coroutines) isn't there a real possibility that an IO completion handler, e.g. read, can be indefinitely prevented from running by threads doing compute?

Following my comment to @sehe's answer:

My objective is to have the plug-in asynchronously post work for the server side of my code (via an async write) and but have my own thread run the write and read completion handers, the latter being invoked for server side responses. I am trying to model the above scenario with a with two threads. One under the control of the io-context (representing my own thread) and the other not under the control of the io-context (representing the 3d application plug-in thread). Here is my feeble attempt:

#include &lt;iostream&gt;
#include &lt;boost/asio.hpp&gt;
boost::asio::io_context io_context_;
boost::asio::ip::tcp::socket socket_(io_context_);
boost::asio::streambuf receive_buffer_;
void read_completion_handler(const boost::system::error_code&amp; ec, std::size_t bytes_transferred);
void write_completion_handler(const boost::system::error_code&amp; ec, std::size_t bytes_transferred);
// &quot;plug-in&quot; (not part of io-context) calls this
void sendNextMsg()
{
io_context_.run();
while(true)
{
std::string msg;
std::cout &lt;&lt; &quot;msg &gt; &quot; &lt;&lt; std::flush;
std::cin &gt;&gt; msg;
boost::asio::async_write(socket_, boost::asio::buffer(msg + &quot;\n&quot;), write_completion_handler);
}
}
void write_completion_handler(const boost::system::error_code&amp; ec, std::size_t bytes_transferred)
{
if(ec)
{
std::cerr &lt;&lt; &quot;send failed: &quot; &lt;&lt; ec.message() &lt;&lt; std::endl;
sendNextMsg();
}
else
{
std::cout &lt;&lt; &quot;Write succeeded&quot; &lt;&lt; std::endl;
boost::asio::async_read_until(socket_, receive_buffer_, &quot;\n&quot;, read_completion_handler);
}
}
void read_completion_handler(const boost::system::error_code&amp; ec, std::size_t bytes_transferred)
{
if(ec)
{
std::cerr &lt;&lt; &quot;receive failed: &quot; &lt;&lt; ec.message() &lt;&lt; std::endl;
}
else
{
const char* data = boost::asio::buffer_cast&lt;const char*&gt;(receive_buffer_.data());
std::cout &lt;&lt; &quot;Server response: &quot; &lt;&lt; data &lt;&lt; std::endl;
receive_buffer_.consume(bytes_transferred);
}
sendNextMsg();
}
int main()
{
boost::system::error_code ec;
socket_.connect(boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(&quot;127.0.0.1&quot;), 7777), ec);
if(ec)
{
std::cerr&lt;&lt; &quot;[Connection error] &quot; &lt;&lt; ec.message() &lt;&lt;std::endl;
exit(-1);
}
std::thread{sendNextMsg}.detach();
io_context_.run();
return 0;
}

Outcome: application establishes a connection with the server but terminates after calling sendNextMsg.

答案1

得分: 1

Sure, here is the translated content:

Q. 一个线程能异步写入,但另一个线程运行它的完成处理程序吗?

可以。Asio提供了无需线程的并发

你有控制权。你可以选择多少线程进行IO或发布任务。你可以选择使用多少执行上下文。你可以选择多少线程服务这些执行上下文。你可以选择处理程序可能在哪里运行。

在处理所有涉及线程的应用程序中,你可能需要保护共享状态免受并发访问。你可以选择是否将其应用于你的应用程序,以及如何确保它(队列、strands、经典线程同步原语等)。

一个最简单的例子是,一个线程发布一个操作,处理程序在其他地方运行。假设

asio::io_context ioc;

在线演示

std::cout << "从 #" << thread_id << " 发布" << std::endl;
post(ioc, []{ std::cout << "来自 #" << thread_id << " 的消息" << std::endl; });
std::thread([&ioc] { ioc.run(); }).join();

会输出在线演示

从 #0 发布
来自 #1 的消息

类似地,

asio::steady_timer tim(ioc, 1s);
std::cout << "等待来自 #" << thread_id << " 的消息" << std::endl;
tim.async_wait([](error_code ec){ std::cout << ec.message() << " 来自 #" << thread_id << std::endl; });

输出在线演示

等待来自 #0 的消息
成功 来自 #1

更有趣的是:

asio::io_context ioc;
asio::thread_pool pool(10);

在这里

auto handler = [](auto name, error_code ec) {
sleep_for(1s);
println("处理程序名称:", name, " (错误码:", ec.message(), ")";
};
asio::steady_timer tim(ioc, 1s);
tim.async_wait(bind(handler, "IO", _1));
tim.async_wait(bind_executor(pool, bind(handler, "Pool", _1)));
ioc.run(); // 注意,运行在主线程上!

将会输出在线演示

     0ms 线程 #0:	主线程进入
2000ms 线程 #0:	处理程序名称: IO (错误码: Success)
2000ms 线程 #0:	IO完成,等待线程池
3001ms 线程 #1:	处理程序名称: Pool (错误码: Success)
3001ms 线程 #0:	主线程退出

在线程池中添加(可选地序列化的)任务:

auto task = [](auto name) {
return [=] { sleep_for(1s); println("任务: ", name, " 完成"); };
};
for (auto ex = pool.get_executor(); std::string i : {"1", "2", "3", "4", "5"})
asio::post(bind_executor(ex, task("任务 " + i)));
for (auto strand = make_strand(pool); std::string i : {"6", "7", "8", "9", "10"})
asio::post(ioc, bind_executor(strand, task("串行 " + i)));

输出在线演示

     0ms 线程 #0:	主线程进入
1000ms 线程 #1:	任务: 任务 3 完成
1000ms 线程 #2:	任务: 任务 4 完成
1000ms 线程 #3:	任务: 串行 6 完成
1000ms 线程 #4:	任务: 任务 5 完成
1000ms 线程 #5:	任务: 任务 1 完成
1000ms 线程 #6:	任务: 任务 2 完成
2000ms 线程 #0:	处理程序名称: IO (错误码: Success)
2000ms 线程 #0:	IO完成,等待线程池
2001ms 线程 #3:	任务: 串行 7 完成
3001ms 线程 #7:	处理程序名称: Pool (错误码: Success)
3001ms 线程 #3:	任务: 串行 8 完成
4001ms 线程 #3:	任务: 串行 9 完成
5001ms 线程 #3:	任务: 串行 10 完成
5002ms 线程 #0:	主线程退出

> 注意 尽管“串行”任务在线程池上被序列化了,但它们不能保证在特定线程上运行。在这种情况下,由于Asio优化strand执行的方式,它们恰好在特定线程上运行。

> ### Q. 第一个线程属于一个框架(因此不能成为异步线程池的一部分),但是它是唯一能够创建工作的线程。

这甚至不一定准确。例如,你可以从框架线程定期调用poll_one()。此外,你可以从自己的线程中asio::thread_pool::attach

但这些可能与你的应用程序无关。

> *框架线程还可以向套接字进行异步写

英文:

> ### Q. Can one thread make an async write but another thread run its completion handler?

Yes. Asio is affords concurrency without threads.

You are in control. You choose how many threads do IO or post tasks. You choose how many execution contexts you use. You choose how many threads service these execution contexts. You choose where handlers may run.

As in all applications that deal with threads you may need to protect shared state from concurrent access. You choose whether that applies to your application, and how to ensure it (queues, strands, classic thread synchronization primitives etc).

Simplest of examples where one thread posts an operation, and the handler runs elsewhere. Assume

asio::io_context ioc;

Live On Coliru

std::cout &lt;&lt; &quot;Posting from #&quot; &lt;&lt; thread_id &lt;&lt; std::endl;
post(ioc, []{ std::cout &lt;&lt; &quot;Hello from #&quot; &lt;&lt; thread_id &lt;&lt; std::endl; });
std::thread([&amp;ioc] { ioc.run(); }).join();

Prints Live

Posting from #0
Hello from #1

Similarly,

asio::steady_timer tim(ioc, 1s);
std::cout &lt;&lt; &quot;Waiting from #&quot; &lt;&lt; thread_id &lt;&lt; std::endl;
tim.async_wait([](error_code ec){ std::cout &lt;&lt; ec.message() &lt;&lt; &quot; from #&quot; &lt;&lt; thread_id &lt;&lt; std::endl; });

Prints Live

Waiting from #0
Success from #1

Somewhat more interestingly:

asio::io_context ioc;
asio::thread_pool pool(10);

Here

auto handler = [](auto name, error_code ec) {
sleep_for(1s);
println(&quot;handler name:&quot;, name, &quot; (ec:&quot;, ec.message(), &quot;)&quot;);
};
asio::steady_timer tim(ioc, 1s);
tim.async_wait(bind(handler, &quot;IO&quot;, _1));
tim.async_wait(bind_executor(pool, bind(handler, &quot;Pool&quot;, _1)));
ioc.run(); // Note, on main thread!

Would print Live

     0ms thread #0:	Main enter
2000ms thread #0:	handler name:IO (ec:Success)
2000ms thread #0:	IO done, waiting for pool
3001ms thread #1:	handler name:Pool (ec:Success)
3001ms thread #0:	Main exit

Adding (optionally serialized) tasks on the pool:

auto task = [](auto name) {
return [=] { sleep_for(1s); println(&quot;task: &quot;, name, &quot; done&quot;); };
};
for (auto ex = pool.get_executor(); std::string i : {&quot;1&quot;, &quot;2&quot;, &quot;3&quot;, &quot;4&quot;, &quot;5&quot;})
asio::post(bind_executor(ex, task(&quot;Task &quot; + i)));
for (auto strand = make_strand(pool); std::string i : {&quot;6&quot;, &quot;7&quot;, &quot;8&quot;, &quot;9&quot;, &quot;10&quot;})
asio::post(ioc, bind_executor(strand, task(&quot;Serial &quot; + i)));

Prints Live:

     0ms thread #0:	Main enter
1000ms thread #1:	task: Task 3 done
1000ms thread #2:	task: Task 4 done
1000ms thread #3:	task: Serial 6 done
1000ms thread #4:	task: Task 5 done
1000ms thread #5:	task: Task 1 done
1000ms thread #6:	task: Task 2 done
2000ms thread #0:	handler name:IO (ec:Success)
2000ms thread #0:	IO done, waiting for pool
2001ms thread #3:	task: Serial 7 done
3001ms thread #7:	handler name:Pool (ec:Success)
3001ms thread #3:	task: Serial 8 done
4001ms thread #3:	task: Serial 9 done
5001ms thread #3:	task: Serial 10 done
5002ms thread #0:	Main exit

> Note Although the "Serial" tasks are serialized on the pool, they are not guaranteed to run on a specific thread. In this case they happen to, because of how Asio optimizes strand execution if possible.

> ### Q. The first thread belongs to a framework (and therefore cannot be part of the async thread pool) but is the only thread that can create work.

This is not even necessarily accurate. E.g. you can invoke poll_one() regurlarly from a framework thread. Also, you can asio::thread_pool::attach from your own threads.

But these may not be relevant to your application.

> The framework thread also makes the async write to a socket, but it
> immediately returns to running framework code, and is therefore
> unavailable. The second thread is created by my code is thus available
> to run the completion handler. Does boost asio have a way for the
> framework thread to initiate asynchronous IO but another thread run
> the resulting completion handler?

There are many ways. For one, you could pass futures to the framework, which can await them where it requires the result, e.g.

 std::future&lt;size_t&gt; n = asio::async_read(s, buf, asio::use_future);
// ... later:
framework::handle_result(app_state, n.get());

> Note here n.get() may throw system_error on error

You can also post any packaged task like that:

 std::future&lt;int&gt; answer = post(ioc, std::packaged_task&lt;int()&gt; {
sleep_for(100ms);
return 42;
});

Secondly, drawing from the above, you could

  1. instantiate a "framework-side" execution context that you will not run blocking continuously, but instead poll()/poll_one() regularly.
  2. post your completion to this execution context

> Note if you bind the executor to the handler at initiation, composed operations are required to run all intermediate handlers there. If this is not what you want/require, consider indirecting like e.g.

  asio::async_read(s, buf, [app_state](error_code ec, size_t n) {
asio::dispatch(framework_executor, [=] {
framework::handle_result(app_state, ec, n); 
}
});

> ### Q. If the async model does not draw a distinction between compute and IO threads and threads aren't preemptible (unless we are using coroutines) isn't there a real possibility that an IO completion handler, e.g. read, can be indefinitely prevented from running by threads doing compute?

Yes. This is solved by separating threads (like in the examples I already showed) and/or by (priority) queuing tasks so that you can throttle them.

Listing For Reference

Anti bit-rot the combined listing of the initial examples above:

Live On Coliru

#include &lt;boost/asio.hpp&gt;
#include &lt;iomanip&gt;
#include &lt;iostream&gt;
namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace std::placeholders;
using boost::system::error_code;
namespace { // fancy tracing for illustration
using std::this_thread::sleep_for;
static int       next_thread_id = 0;
thread_local int thread_id      = next_thread_id++;
constexpr auto    now   = std::chrono::steady_clock::now;
static auto const start = now();
static std::mutex console_mx;
static void println(auto const&amp;... a) {
std::lock_guard lk(console_mx);
std::cout &lt;&lt; std::setw(10) &lt;&lt; (now() - start) / 1ms &lt;&lt; &quot;ms thread #&quot; &lt;&lt; thread_id &lt;&lt; &quot;:\t&quot;;
(std::cout &lt;&lt; ... &lt;&lt; a) &lt;&lt; std::endl;
};
} // namespace
int main() {
println(&quot;Main enter&quot;);
asio::io_context ioc;
asio::thread_pool pool(10);
auto handler = [](auto name, error_code ec) {
sleep_for(1s);
println(&quot;handler name:&quot;, name, &quot; (ec:&quot;, ec.message(), &quot;)&quot;);
};
asio::steady_timer tim(ioc, 1s);
tim.async_wait(bind(handler, &quot;IO&quot;, _1));
tim.async_wait(bind_executor(pool, bind(handler, &quot;Pool&quot;, _1)));
auto task = [](auto name) {
return [=] { sleep_for(1s); println(&quot;task: &quot;, name, &quot; done&quot;); };
};
for (auto ex = pool.get_executor(); std::string i : {&quot;1&quot;, &quot;2&quot;, &quot;3&quot;, &quot;4&quot;, &quot;5&quot;})
asio::post(bind_executor(ex, task(&quot;Task &quot; + i)));
for (auto strand = make_strand(pool); std::string i : {&quot;6&quot;, &quot;7&quot;, &quot;8&quot;, &quot;9&quot;, &quot;10&quot;})
asio::post(ioc, bind_executor(strand, task(&quot;Serial &quot; + i)));
ioc.run(); // Note, on main thread!
println(&quot;IO done, waiting for pool&quot;);
pool.join();
println(&quot;Main exit&quot;);
}

huangapple
  • 本文由 发表于 2023年4月20日 07:56:52
  • 转载请务必保留本文链接:https://go.coder-hub.com/76059605.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定