Boost.ASIO如何在C++20协程中使用strands。

huangapple go评论68阅读模式
英文:

Boost.ASIO how to use strands with c++20 coroutines

问题

考虑以下代码:
```cpp
#include <boost/asio.hpp>
#include <chrono>
#include <iostream>

namespace io = boost::asio;

class test {
public:
    test(io::any_io_executor e) : exe{ std::move(e) } {}

    io::awaitable<void> delay(size_t sec) {
        io::steady_timer t{ exe };
        t.expires_after(std::chrono::seconds{ sec });
        co_await t.async_wait(io::use_awaitable);
    }

    io::awaitable<void> print_and_delay(int num) {
        std::cout << num << '\n';
        co_await delay(1);
    }

    io::awaitable<void> d_print() {
        co_await print_and_delay(1);
        co_await print_and_delay(2);
        co_await print_and_delay(3);
    }

    void start() {
        for (size_t i = 0; i != 3; ++i) {
            io::co_spawn(exe, d_print(), io::detached);
        }
    }

protected:
    io::any_io_executor exe;
};

int main() {
    io::io_context ctx;
    test t{ ctx.get_executor() };
    t.start();
    ctx.run();
    return 0;
}

输出:

1
1
1
(延迟1)
2
2
2
(延迟1)
3
3
3
(延迟1)

现在我想要使d_print的执行串行:每次只运行一个d_print协程。

期望输出:

1
2
3
1
2
3
1
2
3

每行都延迟1秒。

我可以通过使用strand来实现这个目标吗?

我想做的是在三个print_and_delay周围添加类似于synchorized(st) {的东西。当执行器尝试运行以下协程时,阻止它们,直到正在运行的协程完成执行。


<details>
<summary>英文:</summary>

Consider the following code:
```cpp
#include &lt;boost/asio.hpp&gt;
#include &lt;chrono&gt;
#include &lt;iostream&gt;

namespace io = boost::asio;

class test {
public:
    test(io::any_io_executor e) : exe{ std::move(e) } {}

    io::awaitable&lt;void&gt; delay(size_t sec) {
        io::steady_timer t{ exe };
        t.expires_after(std::chrono::seconds{ sec });
        co_await t.async_wait(io::use_awaitable);
    }

    io::awaitable&lt;void&gt; print_and_delay(int num) {
        std::cout &lt;&lt; num &lt;&lt; &#39;\n&#39;;
        co_await delay(1);
    }

    io::awaitable&lt;void&gt; d_print() {
        co_await print_and_delay(1);
        co_await print_and_delay(2);
        co_await print_and_delay(3);
    }

    void start() {
        for (size_t i = 0; i != 3; ++i) {
            io::co_spawn(exe, d_print(), io::detached);
        }
    }

protected:
    io::any_io_executor exe;
};

int main() {
    io::io_context ctx;
    test t{ ctx.get_executor() };
    t.start();
    ctx.run();
    return 0;
}

Output:

1
1
1
(delay 1 second)
2
2
2
(delay 1 second)
3
3
3
(delay 1 second)

Now I want to make the execution of d_print serial: only one d_print coroutine runs at once.

Expected output:

1
2
3
1
2
3
1
2
3

With each line delayed for 1 second.

Can I achieve this through the usage of strands?

What I want to do is adding something like synchorized(st) { around the three print_and_delays. When the executor tries to run the following coroutines,block them until the running coroutine finishes executing.

答案1

得分: 2

Strands 已经保护通过它发布的代码免受并发执行的影响。

行为是正确的,添加假定的 "synchronized" 关键字也不会改变它。

你所追求的不是对并发/重叠执行的保护 - 因为它已经在工作中。相反,你想要在前一个停止之前避免发布新的协程(d_print)实例。

解决方案

这个问题的经典解决方案是排队。

> 在全双工 IO 中也会出现类似的需求,需要将输出消息排队以避免交错写入。

Strands 仍然有用于避免对队列的非同步访问:

在 Coliru 上实时查看

#include &lt;boost/asio.hpp&gt;
#include &lt;chrono&gt;
#include &lt;deque&gt;
#include &lt;iostream&gt;

namespace asio = boost::asio;

class test {
  public:
    test(asio::any_io_executor e) : strand_{make_strand(std::move(e))} {}

    static asio::awaitable&lt;void&gt; delay(size_t sec) {
        co_await asio::steady_timer{co_await asio::this_coro::executor, std::chrono::seconds{sec}} //
            .async_wait(asio::use_awaitable);
    }

    asio::awaitable&lt;void&gt; print_and_delay(int num) {
        std::cout &lt;&lt; num &lt;&lt; &#39;\n&#39;;
        co_await delay(1);
    }

    asio::awaitable&lt;void&gt; d_print() {
        co_await print_and_delay(1);
        co_await print_and_delay(2);
        co_await print_and_delay(3);
    }

    void start() {
        post(strand_, std::bind(&amp;test::do_start, this));
    }

  private:
    asio::strand&lt;asio::any_io_executor&gt; strand_;

    std::deque&lt;asio::awaitable&lt;void&gt; &gt; queue_;

    asio::awaitable&lt;void&gt; drain_queue() { // 在 strand 上运行
        while (!queue_.empty()) {
            co_await std::move(queue_.front());
            queue_.pop_front();
        }
    }

    void do_start() { // 在 strand 上运行
        bool not_running = queue_.empty();

        for (size_t i = 0; i != 3; ++i)
            queue_.push_back(d_print());

        if (not_running)
            co_spawn(strand_, std::bind(&amp;test::drain_queue, this), asio::detached);
    }
};

int main() {
    asio::io_context ctx;

    test t{ctx.get_executor()};
    t.start();

    ctx.run();
}

Boost.ASIO如何在C++20协程中使用strands。

其他想法

你还可以使用实验性的 Asio Channels 功能,这可以简化线程安全性并提供更多的灵活性。

英文:

Strands already protect the code posted through it against concurrent execution.

The behaviour is correct, and adding the putative "synchronized" keyword would not change it either.

What you are after is not protection against concurrent/overlapping execution - because it is already working. Instead you want to avoid posting the new coroutine (d_print) instance before the previous stopped.

Solution

The classical solution to this is queuing.

> Similar requirements come up with e.g. full-duplex IO where outgoing messages have to be queued to avoid interleaving writes.

Strands are still useful to avoid unsynchronized access to the queue:

Live On Coliru

#include &lt;boost/asio.hpp&gt;
#include &lt;chrono&gt;
#include &lt;deque&gt;
#include &lt;iostream&gt;
namespace asio = boost::asio;
class test {
public:
test(asio::any_io_executor e) : strand_{make_strand(std::move(e))} {}
static asio::awaitable&lt;void&gt; delay(size_t sec) {
co_await asio::steady_timer{co_await asio::this_coro::executor, std::chrono::seconds{sec}} //
.async_wait(asio::use_awaitable);
}
asio::awaitable&lt;void&gt; print_and_delay(int num) {
std::cout &lt;&lt; num &lt;&lt; &#39;\n&#39;;
co_await delay(1);
}
asio::awaitable&lt;void&gt; d_print() {
co_await print_and_delay(1);
co_await print_and_delay(2);
co_await print_and_delay(3);
}
void start() {
post(strand_, std::bind(&amp;test::do_start, this));
}
private:
asio::strand&lt;asio::any_io_executor&gt; strand_;
std::deque&lt;asio::awaitable&lt;void&gt; &gt; queue_;
asio::awaitable&lt;void&gt; drain_queue() { // runs on strand
while (!queue_.empty()) {
co_await std::move(queue_.front());
queue_.pop_front();
}
}
void do_start() { // runs on strand
bool not_running = queue_.empty();
for (size_t i = 0; i != 3; ++i)
queue_.push_back(d_print());
if (not_running)
co_spawn(strand_, std::bind(&amp;test::drain_queue, this), asio::detached);
}
};
int main() {
asio::io_context ctx;
test t{ctx.get_executor()};
t.start();
ctx.run();
}

Boost.ASIO如何在C++20协程中使用strands。

Other Thoughts

You can also use the experimental Asio Channels feature, which could simplify thread-safety and give more flexibility.

huangapple
  • 本文由 发表于 2023年5月28日 20:13:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/76351432.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定