How to avoid concurrent callbacks to a user-defined routine?

Question

I am attempting to modify some Boost code so that it is compatible with Autoit. The original project can be found here. My version can be found here. I could use some help in determining how to prevent multiple concurrent callbacks into the user supplied Autoit routine.

Here is the existing on_read callback --

/// Callback registered by async_read. It calls user registered callback to actually process the data. And then issue another async_read to wait for data from server again.
/// \param ec instance of error code
/// \param bytes_transferred
void
on_read(
        beast::error_code ec,
        std::size_t bytes_transferred) {
    if(EnableVerbose)
    {
        boost::lock_guard<boost::mutex> guard(mtx_);
        std::wcout << L"<WsDll-" ARCH_LABEL "> in on read" << std::endl;
    }
    boost::ignore_unused(bytes_transferred);

    {
        boost::lock_guard<boost::mutex> guard(mtx_);
        if(!Is_Connected) {
            return;
        }
    }

    // error occurs
    if (ec) {
        if(on_fail_cb)
            on_fail_cb(L"read");
        return fail(ec, L"read");
    }

    const std::string data = beast::buffers_to_string(buffer_.data());
    const std::wstring wdata(data.begin(), data.end());
    if(EnableVerbose)
    {
        boost::lock_guard<boost::mutex> guard(mtx_);
        std::wcout << L"<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << L"] " << wdata << std::endl;
    }

    //  The next section is where my issue resides

    if (on_data_cb)
        on_data_cb(wdata.c_str(), wdata.length());

    buffer_.consume(buffer_.size());

    if(EnableVerbose)
    {
        boost::lock_guard<boost::mutex> guard(mtx_);
        std::wcout << L"<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
    }
    ws_.async_read(
            buffer_,
            beast::bind_front_handler(
                    &session::on_read,
                    shared_from_this()));

    // Close the WebSocket connection
    // ws_.async_close(websocket::close_code::normal,
    //     beast::bind_front_handler(
    //         &session::on_close,
    //         shared_from_this()));
}

The code if (on_data_cb) on_data_cb(wdata.c_str(), wdata.length()); executes the callback into Autoit, and I need to know how I can prevent this from executing more than once at a time. I am not well versed in C++ / Boost, so please be gentle. ;-)

Answer 1

Score: 0

The gentle answer would be to point to the documentation: Strands: Use Threads Without Explicit Locking

In reality you don't show enough code. For example, we have no way of knowing

  • what execution context is being used. If you're using an io_context with a single service thread run()-ing it, you already have the implicit strand and a guarantee that no handlers ever run simultaneously

  • what executor the IO object(s) bind to. In your code, the only object visible is ws_, which we'll assume to be something like

    net::io_context                ctx_;
    websocket::stream<tcp::socket> ws_{ctx_};

    Now, in case you want to have multiple threads servicing ctx_, you could bind the ws_ to a strand executor instead:

    websocket::stream<tcp::socket> ws_{make_strand(ctx_)};

    Now, as long as you make sure your own accesses (e.g. async_ initiations) are on the proper strand, your code is already safe. If you want, and you don't mind hardcoding the executor type, you can assert this:

    auto strand = ws_.get_executor().target<net::strand<net::io_context::executor_type>>();
    assert(strand && strand->running_in_this_thread());

> ### Pro tip:
>
> If you really commit to a particular executor type, consider statically binding that type:
>
> using Context = net::io_context;
> using Executor = net::io_context::executor_type;
> using Strand = net::strand<net::io_context::executor_type>;
> using Socket = net::basic_stream_socket<tcp, Strand>;
>
> Context ctx_;
> websocket::stream<Socket> ws_{make_strand(ctx_)};
>
> This avoids the overhead of type-erased executors, and you can
> simplify the assert:
>
> assert(ws_.get_executor().running_in_this_thread());

Side Notes

Demo

Obligatory "live" code:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <atomic>
#include <cassert>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
namespace net       = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using net::ip::tcp;

static std::mutex s_consoleMtx;

static void fail(beast::error_code ec, std::string txt) {
    std::cerr << txt << ": " << ec.message() << " at " << ec.location() << std::endl;
}

#define ARCH_LABEL "STACKO"
struct session : std::enable_shared_from_this<session> {
    using Context  = net::io_context; // the execution context itself
    using Executor = net::io_context::executor_type;
    using Strand   = net::strand<net::io_context::executor_type>;
    using Socket   = net::basic_stream_socket<tcp, Strand>;

    Context                   ctx_;
    websocket::stream<Socket> ws_{make_strand(ctx_)}; // all ws_ handlers run on this strand

    static bool const  EnableVerbose = true;
    std::atomic_bool   Is_Connected{false};
    beast::flat_buffer buffer_;

    std::function<void(std::string)>         on_fail_cb;
    std::function<void(char const*, size_t)> on_data_cb;

    /// Callback registered by async_read. It calls user registered
    /// callback to actually process the data. And then issue another
    /// async_read to wait for data from server again.
    /// \param ec instance of error code
    /// \param bytes_transferred
    void on_read(beast::error_code ec, [[maybe_unused]] size_t bytes_transferred) {
        if (EnableVerbose) {
            std::lock_guard<std::mutex> guard(s_consoleMtx);
            std::cout << "<WsDll-" ARCH_LABEL "> in on read" << std::endl;
        }

        if (!Is_Connected)
            return;

        // error occurs
        if (ec) {
            if (on_fail_cb)
                on_fail_cb("read");
            return fail(ec, "read");
        }

        std::string const data = beast::buffers_to_string(buffer_.data());
        if (EnableVerbose) {
            std::lock_guard<std::mutex> guard(s_consoleMtx);
            std::cout << "<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << "] " << data << std::endl;
        }

        // Runs on the strand, so this callback is never entered concurrently
        if (on_data_cb)
            on_data_cb(data.c_str(), data.length());

        buffer_.consume(buffer_.size());

        if (EnableVerbose) {
            std::lock_guard<std::mutex> guard(s_consoleMtx);
            std::cout << "<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
        }

        assert(ws_.get_executor().running_in_this_thread());
        ws_.async_read(buffer_, beast::bind_front_handler(&session::on_read, shared_from_this()));
    }
};

huangapple
  • Posted on April 11, 2023, 01:31:07
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75979289.html