awaitable operator|| 在计时器到期时不返回

huangapple go评论76阅读模式
英文:

asio: awaitable operator|| don't return when timer expires

问题

以下是您提供的代码的翻译部分:

从一个较大的代码库中提取的附加代码具有我无法解释的行为。

问题出现在“run”函数中,我希望等待由async_initiate返回的可等待完成的最长时间

async_initiate启动的异步操作从不完成(在此示例中是因为我从未调用处理程序,在实际程序中是因为它正在等待网络数据包),但是即使计时器到期,协程也会停在co_await上

asio版本是与boost 1.81.0一起提供的

// 此函数仅存在以保持io_context繁忙
asio::awaitable<void> busy() {
    auto exec = co_await asio::this_coro::executor;

    using asio::ip::udp;
    auto socket = udp::socket(exec, udp::endpoint(udp::v4(), 40000));

    uint8_t msg[1024];
    std::cout << "reading from socket...\n";
    co_await socket.async_receive(asio::buffer(msg), asio::use_awaitable);
}

std::optional<asio::any_completion_handler<void(int)>> stored;

asio::awaitable<void> run() {
    std::cout << "run() called\n";

    auto exec = co_await asio::this_coro::executor;
    asio::steady_timer timer{exec, std::chrono::seconds(2)};

    auto initiate = [&]([[maybe_unused]] asio::any_completion_handler<void(int)> handler) {
        // 故意不调用处理程序
        //
        // 仅出于此示例的缘故,将其移至“stored”以排除处理程序析构的任何副作用
        stored = std::move(handler);
    };

    co_await (asio::async_initiate<const asio::use_awaitable_t<>, void(int)>(initiate, asio::use_awaitable)
              || timer.async_wait(asio::use_awaitable));
    std::cout << "done\n";
}

int main() {
    asio::io_context io;

    asio::co_spawn(io, busy, asio::detached);
    asio::co_spawn(io, run, asio::detached);
    io.run();
}

请注意,我已经将HTML实体(如<和>)转换回了正常的C++代码标记。如果您需要更多的帮助或有其他问题,请随时提出。

英文:

The attached code, extracted from a larger codebase, has behavior that I cannot
explain.

The problem is in the run function where I would like to wait up to a maximum
time that the awaitable returned by async_initiate completes.

The async operation initiated by async_initiate never completes (in this
example because I never call the handler, in the real program because it is
waiting for a network packet), but the coroutine is stuck on the co_await even
if the timer expires.

The asio version is the one shipped with boost 1.81.0

#include &lt;boost/asio/any_completion_handler.hpp&gt;
#include &lt;boost/asio/any_io_executor.hpp&gt;
#include &lt;boost/asio/async_result.hpp&gt;
#include &lt;boost/asio/co_spawn.hpp&gt;
#include &lt;boost/asio/detached.hpp&gt;
#include &lt;boost/asio/experimental/awaitable_operators.hpp&gt;
#include &lt;boost/asio/io_context.hpp&gt;
#include &lt;boost/asio/ip/udp.hpp&gt;
#include &lt;boost/asio/steady_timer.hpp&gt;
#include &lt;boost/asio/use_awaitable.hpp&gt;
#include &lt;iostream&gt;

namespace asio = boost::asio;
using namespace asio::experimental::awaitable_operators;

// this function exists only to keep the io_context busy
asio::awaitable&lt;void&gt; busy() {
    auto exec = co_await asio::this_coro::executor;

    using asio::ip::udp;
    auto socket = udp::socket(exec, udp::endpoint(udp::v4(), 40000));

    uint8_t msg[1024];
    std::cout &lt;&lt; &quot;reading from socket...\n&quot;;
    co_await socket.async_receive(asio::buffer(msg), asio::use_awaitable);
}

std::optional&lt;asio::any_completion_handler&lt;void(int)&gt;&gt; stored;

asio::awaitable&lt;void&gt; run() {
    std::cout &lt;&lt; &quot;run() called\n&quot;;

    auto exec = co_await asio::this_coro::executor;
    asio::steady_timer timer{exec, std::chrono::seconds(2)};

    auto initiate = [&amp;]([[maybe_unused]] asio::any_completion_handler&lt;void(int)&gt; handler) {
        // don&#39;t call the handler on purpose
        //
        // move it on `stored` only for the sake of this example, to rule out
        // any side-effect of the handler destructor
        stored = std::move(handler);
    };

    co_await (asio::async_initiate&lt;const asio::use_awaitable_t&lt;&gt;, void(int)&gt;(initiate, asio::use_awaitable)
              || timer.async_wait(asio::use_awaitable));
    std::cout &lt;&lt; &quot;done\n&quot;;
}

int main() {
    asio::io_context io;

    asio::co_spawn(io, busy, asio::detached);
    asio::co_spawn(io, run, asio::detached);
    io.run();
}

I know that operator|| is waiting for "a success" but this should not be the cause of the problem because the time completes (or should complete) without an error

答案1

得分: 4

以下是您请求的内容的翻译:

async_initiate启动的异步操作永远不会完成(在此示例中,因为我从未调用处理程序,

如果不完成,您也无法观察到它以operation_aborted完成。

在实际程序中,因为它正在等待网络数据包,但是协程却在co_await上卡住,即使定时器已经过期。

让我们来测试一下:

在Coliru上查看

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>

namespace asio = boost::asio;
using namespace asio::experimental::awaitable_operators;
using namespace std::chrono_literals;

asio::awaitable<void> busy() {
    auto exec = co_await asio::this_coro::executor;

    using asio::ip::udp;

    try {
        while (true) {
            auto socket = udp::socket(exec, udp::endpoint(udp::v4(), 40000));

            uint8_t msg[1024];
            std::cout << "reading from socket..." << std::endl;
            auto n = co_await socket.async_receive(asio::buffer(msg), asio::use_awaitable);

            std::cout << "received " << n << " bytes" << std::endl;
        }
    } catch (boost::system::system_error const& se) {
        auto const& ec = se.code();
        std::cout << "receive failed: " << ec.message() << " at " << ec.location() << std::endl;
    }
}

asio::awaitable<void> run() {
    std::cout << "run() called\n";

    auto exec = co_await asio::this_coro::executor;
    asio::steady_timer timer{exec, 2s};

    co_await (busy() || timer.async_wait(asio::use_awaitable));
    std::cout << "done\n";
}

int main() {
    asio::io_context io;

    asio::co_spawn(io, run, asio::detached);
    io.run();
}

awaitable operator|| 在计时器到期时不返回

与广告一样运作。如果您喜欢避免异常处理,您可以使用as_tupleredirect_error

asio::awaitable<void> busy() {
    auto exec = co_await asio::this_coro::executor;

    for (error_code ec; !ec.failed();) {
        auto socket = asio::ip::udp::socket{exec, {{}, 40000}};

        uint8_t msg[1024];
        std::cout << "reading from socket..." << std::endl;
        auto n = co_await socket.async_receive(asio::buffer(msg), redirect_error(asio::use_awaitable, ec));

        std::cout << "received " << n << " bytes (" << ec.message() << ")" << std::endl;
    }
}

您自己的异步初始化?

您自己的IO对象/操作可能不支持取消。如果您想要,可以在cancellation_slot上实现它。让我们还展示如何使用工作保护来保持执行上下文的活动:

template <typename Token>
auto async_stuff(Token token) {
    auto initiate = [](auto handler) {
        auto cs   = asio::get_associated_cancellation_slot(handler);
        auto work = make_work_guard(asio::get_associated_executor(handler));

        cs.assign([work, h = std::move(handler)](asio::cancellation_type type) mutable {
            using ct = asio::cancellation_type;
            if (ct::none != (type & ct::terminal))
                std::cout << "cancellation_type terminal" << std::endl;
            if (ct::none != (type & ct::partial))
                std::cout << "cancellation_type partial" << std::endl;
            if (ct::none != (type & ct::total))
                std::cout << "cancellation_type total" << std::endl;

            work.reset();
            std::move(h)(asio::error::operation_aborted, -1);
        });
    };

    return asio::async_initiate<Token, void(error_code, int)>(initiate, token);
}

您可以像这样绑定您的取消槽:

在这里查看

int main() {
    asio::thread_pool io(1);

    asio::cancellation_signal cancel;
    auto token = asio::bind_cancellation_slot(cancel.slot(), asio::use_awaitable);
    asio::co_spawn(io, async_stuff(token), asio::detached);

    using namespace std::chrono_literals;
    std::this_thread::sleep_for(2s);
    cancel.emit(asio::cancellation_type::terminal);

    io.join();
}

实验性操作符使用parallel_group,它在底层使用取消槽。

英文:

> The async operation initiated by async_initiate never completes (in this example because I never call the handler,

If you don't complete, you can't witness that it completes with operation_aborted either.

> in the real program because it is waiting for a network packet), but the coroutine is stuck on the co_await even if the timer expires.

Let's put that to the test:

Live On Coliru

#include &lt;boost/asio.hpp&gt;
#include &lt;boost/asio/experimental/awaitable_operators.hpp&gt;
#include &lt;iostream&gt;
namespace asio = boost::asio;
using namespace asio::experimental::awaitable_operators;
using namespace std::chrono_literals;
asio::awaitable&lt;void&gt; busy() {
auto exec = co_await asio::this_coro::executor;
using asio::ip::udp;
try {
while (true) {
auto socket = udp::socket(exec, udp::endpoint(udp::v4(), 40000));
uint8_t msg[1024];
std::cout &lt;&lt; &quot;reading from socket...&quot; &lt;&lt; std::endl;
auto n = co_await socket.async_receive(asio::buffer(msg), asio::use_awaitable);
std::cout &lt;&lt; &quot;received &quot; &lt;&lt; n &lt;&lt; &quot; bytes&quot; &lt;&lt; std::endl;
}
} catch (boost::system::system_error const&amp; se) {
auto const&amp; ec = se.code();
std::cout &lt;&lt; &quot;receive failed: &quot; &lt;&lt; ec.message() &lt;&lt; &quot; at &quot; &lt;&lt; ec.location() &lt;&lt; std::endl;
}
}
asio::awaitable&lt;void&gt; run() {
std::cout &lt;&lt; &quot;run() called\n&quot;;
auto exec = co_await asio::this_coro::executor;
asio::steady_timer timer{exec, 2s};
co_await (busy() || timer.async_wait(asio::use_awaitable));
std::cout &lt;&lt; &quot;done\n&quot;;
}
int main() {
asio::io_context io;
asio::co_spawn(io, run, asio::detached);
io.run();
}

awaitable operator|| 在计时器到期时不返回

Works as advertised. If you prefer to avoid exception handling, you can use as_tuple or redirect_error:

asio::awaitable&lt;void&gt; busy() {
auto exec = co_await asio::this_coro::executor;
for (error_code ec; !ec.failed();) {
auto socket = asio::ip::udp::socket{exec, {{}, 40&#39;000}};
uint8_t msg[1024];
std::cout &lt;&lt; &quot;reading from socket...&quot; &lt;&lt; std::endl;
auto n = co_await socket.async_receive(asio::buffer(msg), redirect_error(asio::use_awaitable, ec));
std::cout &lt;&lt; &quot;received &quot; &lt;&lt; n &lt;&lt; &quot; bytes (&quot; &lt;&lt; ec.message() &lt;&lt; &quot;)&quot; &lt;&lt; std::endl;
}
}

Your Own Async Initiatiation?

Your own IO objects/operations may not support cancellation. If you want, implement is on cancellation_slot. Let's also show how to use a work guard to keep the execution context alive:

template &lt;typename Token&gt;
auto async_stuff(Token token) {
auto initiate = [](auto handler) {
auto cs   = asio::get_associated_cancellation_slot(handler);
auto work = make_work_guard(asio::get_associated_executor(handler));
cs.assign([work, h = std::move(handler)](asio::cancellation_type type) mutable {
using ct = asio::cancellation_type;
if (ct::none != (type &amp; ct::terminal))
std::cout &lt;&lt; &quot;cancellation_type terminal&quot; &lt;&lt; std::endl;
if (ct::none != (type &amp; ct::partial))
std::cout &lt;&lt; &quot;cancellation_type partial&quot; &lt;&lt; std::endl;
if (ct::none != (type &amp; ct::total))
std::cout &lt;&lt; &quot;cancellation_type total&quot; &lt;&lt; std::endl;
work.reset();
std::move(h)(asio::error::operation_aborted, -1);
});
};
return asio::async_initiate&lt;Token, void(error_code, int)&gt;(initiate, token);
}

You can bind your cancellation slot like this:

Live

int main() {
asio::thread_pool io(1);
asio::cancellation_signal cancel;
auto token = asio::bind_cancellation_slot(cancel.slot(), asio::use_awaitable);
asio::co_spawn(io, async_stuff(token), asio::detached);
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
cancel.emit(asio::cancellation_type::terminal);
io.join();
}

The experimental operators use parallel_group which use cancellation slots under the hood.

huangapple
  • 本文由 发表于 2023年4月13日 19:00:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/76004637.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定