英文:
Boost.ASIO how to use strands with c++20 coroutines
问题
考虑以下代码:
```cpp
#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
namespace io = boost::asio;
class test {
public:
test(io::any_io_executor e) : exe{ std::move(e) } {}
io::awaitable<void> delay(size_t sec) {
io::steady_timer t{ exe };
t.expires_after(std::chrono::seconds{ sec });
co_await t.async_wait(io::use_awaitable);
}
io::awaitable<void> print_and_delay(int num) {
std::cout << num << '\n';
co_await delay(1);
}
io::awaitable<void> d_print() {
co_await print_and_delay(1);
co_await print_and_delay(2);
co_await print_and_delay(3);
}
void start() {
for (size_t i = 0; i != 3; ++i) {
io::co_spawn(exe, d_print(), io::detached);
}
}
protected:
io::any_io_executor exe;
};
int main() {
io::io_context ctx;
test t{ ctx.get_executor() };
t.start();
ctx.run();
return 0;
}
输出:
1
1
1
(延迟1秒)
2
2
2
(延迟1秒)
3
3
3
(延迟1秒)
现在我想要使d_print
的执行串行:每次只运行一个d_print
协程。
期望输出:
1
2
3
1
2
3
1
2
3
每行都延迟1秒。
我可以通过使用strand
来实现这个目标吗?
我想做的是在三个print_and_delay
周围添加类似于synchorized(st) {
的东西。当执行器尝试运行以下协程时,阻止它们,直到正在运行的协程完成执行。
<details>
<summary>英文:</summary>
Consider the following code:
```cpp
#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
namespace io = boost::asio;
class test {
public:
test(io::any_io_executor e) : exe{ std::move(e) } {}
io::awaitable<void> delay(size_t sec) {
io::steady_timer t{ exe };
t.expires_after(std::chrono::seconds{ sec });
co_await t.async_wait(io::use_awaitable);
}
io::awaitable<void> print_and_delay(int num) {
std::cout << num << '\n';
co_await delay(1);
}
io::awaitable<void> d_print() {
co_await print_and_delay(1);
co_await print_and_delay(2);
co_await print_and_delay(3);
}
void start() {
for (size_t i = 0; i != 3; ++i) {
io::co_spawn(exe, d_print(), io::detached);
}
}
protected:
io::any_io_executor exe;
};
int main() {
io::io_context ctx;
test t{ ctx.get_executor() };
t.start();
ctx.run();
return 0;
}
Output:
1
1
1
(delay 1 second)
2
2
2
(delay 1 second)
3
3
3
(delay 1 second)
Now I want to make the execution of d_print
serial: only one d_print
coroutine runs at once.
Expected output:
1
2
3
1
2
3
1
2
3
With each line delayed for 1 second.
Can I achieve this through the usage of strand
s?
What I want to do is adding something like synchorized(st) {
around the three print_and_delay
s. When the executor tries to run the following coroutines,block them until the running coroutine finishes executing.
答案1
得分: 2
Strands 已经保护通过它发布的代码免受并发执行的影响。
行为是正确的,添加假定的 "synchronized" 关键字也不会改变它。
你所追求的不是对并发/重叠执行的保护 - 因为它已经在工作中。相反,你想要在前一个停止之前避免发布新的协程(d_print
)实例。
解决方案
这个问题的经典解决方案是排队。
> 在全双工 IO 中也会出现类似的需求,需要将输出消息排队以避免交错写入。
Strands 仍然有用于避免对队列的非同步访问:
#include <boost/asio.hpp>
#include <chrono>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
class test {
public:
test(asio::any_io_executor e) : strand_{make_strand(std::move(e))} {}
static asio::awaitable<void> delay(size_t sec) {
co_await asio::steady_timer{co_await asio::this_coro::executor, std::chrono::seconds{sec}} //
.async_wait(asio::use_awaitable);
}
asio::awaitable<void> print_and_delay(int num) {
std::cout << num << '\n';
co_await delay(1);
}
asio::awaitable<void> d_print() {
co_await print_and_delay(1);
co_await print_and_delay(2);
co_await print_and_delay(3);
}
void start() {
post(strand_, std::bind(&test::do_start, this));
}
private:
asio::strand<asio::any_io_executor> strand_;
std::deque<asio::awaitable<void> > queue_;
asio::awaitable<void> drain_queue() { // 在 strand 上运行
while (!queue_.empty()) {
co_await std::move(queue_.front());
queue_.pop_front();
}
}
void do_start() { // 在 strand 上运行
bool not_running = queue_.empty();
for (size_t i = 0; i != 3; ++i)
queue_.push_back(d_print());
if (not_running)
co_spawn(strand_, std::bind(&test::drain_queue, this), asio::detached);
}
};
int main() {
asio::io_context ctx;
test t{ctx.get_executor()};
t.start();
ctx.run();
}
其他想法
你还可以使用实验性的 Asio Channels 功能,这可以简化线程安全性并提供更多的灵活性。
英文:
Strands already protect the code posted through it against concurrent execution.
The behaviour is correct, and adding the putative "synchronized" keyword would not change it either.
What you are after is not protection against concurrent/overlapping execution - because it is already working. Instead you want to avoid posting the new coroutine (d_print
) instance before the previous stopped.
Solution
The classical solution to this is queuing.
> Similar requirements come up with e.g. full-duplex IO where outgoing messages have to be queued to avoid interleaving writes.
Strands are still useful to avoid unsynchronized access to the queue:
#include <boost/asio.hpp>
#include <chrono>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
class test {
public:
test(asio::any_io_executor e) : strand_{make_strand(std::move(e))} {}
static asio::awaitable<void> delay(size_t sec) {
co_await asio::steady_timer{co_await asio::this_coro::executor, std::chrono::seconds{sec}} //
.async_wait(asio::use_awaitable);
}
asio::awaitable<void> print_and_delay(int num) {
std::cout << num << '\n';
co_await delay(1);
}
asio::awaitable<void> d_print() {
co_await print_and_delay(1);
co_await print_and_delay(2);
co_await print_and_delay(3);
}
void start() {
post(strand_, std::bind(&test::do_start, this));
}
private:
asio::strand<asio::any_io_executor> strand_;
std::deque<asio::awaitable<void> > queue_;
asio::awaitable<void> drain_queue() { // runs on strand
while (!queue_.empty()) {
co_await std::move(queue_.front());
queue_.pop_front();
}
}
void do_start() { // runs on strand
bool not_running = queue_.empty();
for (size_t i = 0; i != 3; ++i)
queue_.push_back(d_print());
if (not_running)
co_spawn(strand_, std::bind(&test::drain_queue, this), asio::detached);
}
};
int main() {
asio::io_context ctx;
test t{ctx.get_executor()};
t.start();
ctx.run();
}
Other Thoughts
You can also use the experimental Asio Channels feature, which could simplify thread-safety and give more flexibility.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论