How to avoid concurrent callbacks to user defined routine?
Question
I am attempting to modify some Boost code so that it is compatible with Autoit. The original project can be found here. My version can be found here. I could use some help in determining how to prevent multiple concurrent callbacks into the user supplied Autoit routine.
Here is the existing on_read callback --
    /// Callback registered by async_read. It calls user registered callback to actually process the data. And then issue another async_read to wait for data from server again.
    /// \param ec instance of error code
    /// \param bytes_transferred
    void
    on_read(
        beast::error_code ec,
        std::size_t bytes_transferred) {
        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> in on read" << std::endl;
        }
        boost::ignore_unused(bytes_transferred);
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            if(!Is_Connected) {
                return;
            }
        }
        // error occurs
        if (ec) {
            if(on_fail_cb)
                on_fail_cb(L"read");
            return fail(ec, L"read");
        }
        const std::string data = beast::buffers_to_string(buffer_.data());
        const std::wstring wdata(data.begin(), data.end());
        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << L"] " << wdata << std::endl;
        }
        // The next section is where my issue resides
        if (on_data_cb)
            on_data_cb(wdata.c_str(), wdata.length());
        buffer_.consume(buffer_.size());
        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
        }
        ws_.async_read(
            buffer_,
            beast::bind_front_handler(
                &session::on_read,
                shared_from_this()));
        // Close the WebSocket connection
        // ws_.async_close(websocket::close_code::normal,
        //     beast::bind_front_handler(
        //         &session::on_close,
        //         shared_from_this()));
    }
The line `if (on_data_cb) on_data_cb(wdata.c_str(), wdata.length());` executes the callback into Autoit, and I need to know how I can prevent this from executing more than once at a time. I am not well versed in C++ / Boost, so please be gentle.
Answer 1
Score: 0
The gentle answer would be to point to the documentation: Strands: Use Threads Without Explicit Locking

In reality you don't show enough code. For example, we have no way of knowing

- what execution context is being used. If you're using an `io_context` with a single service thread `run()`-ing it, you already have the implicit strand and a guarantee that no handlers ever run simultaneously
- what executor the IO object(s) bind to. In your code, the only object visible is `ws_`, which we'll assume to be something like

      net::io_context ctx_;
      websocket::stream<tcp::socket> ws_{ctx_};

Now, in case you want to have multiple threads servicing `ctx_`, you could bind the `ws_` to a strand executor instead:

    websocket::stream<tcp::socket> ws_{make_strand(ctx_)};

Now, as long as you make sure your own accesses (e.g. `async_` initiations) are on the proper strand, your code is already safe. If you want, and you don't mind hardcoding the executor type, you can assert this:

    auto strand = ws_.get_executor().target<net::strand<net::io_context::executor_type>>();
    assert(strand && strand->running_in_this_thread());
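
To make the strand guarantee more concrete, here is a small standard-library-only model of the idea. It is a sketch for illustration, not how Asio implements strands: `SerialQueue`, `DemoResult`, and `run_demo` are hypothetical names, and the `on_data` lambda merely stands in for a user callback such as `on_data_cb`. Tasks can be posted from any thread, but at most one runs at a time.

```cpp
#include <atomic>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, simplified model of a strand (NOT Asio's implementation):
// tasks may be posted from any thread, but at most one task runs at a time.
class SerialQueue {
public:
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push_back(std::move(task));
            if (draining_)
                return;       // another thread is already running tasks and will pick this one up
            draining_ = true; // this thread becomes the one that runs queued tasks
        }
        drain();
    }

private:
    void drain() {
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock(mtx_);
                if (queue_.empty()) {
                    draining_ = false;
                    return;
                }
                task = std::move(queue_.front());
                queue_.pop_front();
            }
            task(); // runs outside the lock, so other threads can keep posting
        }
    }

    std::mutex mtx_;
    std::deque<std::function<void()>> queue_;
    bool draining_ = false;
};

struct DemoResult {
    int calls;          // how many callbacks ran in total
    int max_concurrent; // highest number of callbacks active at the same moment
};

// Posts `per_thread` stand-in callbacks from each of `threads` threads.
DemoResult run_demo(int threads, int per_thread) {
    SerialQueue strand_like;
    std::atomic<int> active{0}, max_seen{0}, calls{0};

    auto on_data = [&] {
        int now = ++active;
        int prev = max_seen.load();
        while (now > prev && !max_seen.compare_exchange_weak(prev, now)) {
        }
        ++calls;
        --active;
    };

    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back([&] {
            for (int i = 0; i < per_thread; ++i)
                strand_like.post(on_data);
        });
    for (auto& th : pool)
        th.join();

    return {calls.load(), max_seen.load()};
}
```

Calling `run_demo(4, 1000)` should report 4000 calls with a maximum concurrency of 1, even though four threads post at once. With Asio you would not write this yourself; `make_strand` gives you the same non-concurrency guarantee for completion handlers.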
> ### Pro tip:
>
> If you really commit to a particular executor type, consider statically binding that type:
>
>     using Context = net::io_context::executor_type;
>     using Executor = net::io_context::executor_type;
>     using Strand = net::strand<net::io_context::executor_type>;
>     using Socket = net::basic_stream_socket<tcp, Strand>;
>
>     Context ctx_;
>     websocket::stream<Socket> ws_{make_strand(ctx_)};
>
> This avoids the overhead of type-erased executors, and you can simplify the assert:
>
>     assert(ws_.get_executor().running_in_this_thread());
Side Notes

- Instead of locking a full mutex to inspect a boolean, use `atomic_bool`.
- Use standard library features (`std::mutex`, `std::lock_guard`) to make things easier.
- Consider building for the ANSI build, or check your widening approaches: `const std::wstring wdata(data.begin(), data.end());` is an antipattern. Look here https://en.cppreference.com/w/cpp/string/multibyte or here for more information https://en.cppreference.com/w/cpp/locale/ctype/widen
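
To illustrate the last point, here is a minimal sketch of why per-byte widening goes wrong, assuming the incoming payload is UTF-8 (the question does not say what encoding the server sends). `widen_per_byte` and `utf8_to_wide` are hypothetical helper names. The decoder does no validation of malformed input and is only meant to show the difference; on Windows the usual route would be `MultiByteToWideChar`.

```cpp
#include <cstddef>
#include <string>

// The pattern from the question: every byte becomes one wchar_t,
// so a multi-byte UTF-8 character turns into several bogus characters.
std::wstring widen_per_byte(const std::string& s) {
    return std::wstring(s.begin(), s.end());
}

// Hypothetical helper: minimal UTF-8 decoder (assumes well-formed input,
// performs no validation of continuation bytes or overlong encodings).
std::wstring utf8_to_wide(const std::string& s) {
    std::wstring out;
    std::size_t i = 0;
    while (i < s.size()) {
        const unsigned char lead = static_cast<unsigned char>(s[i++]);
        unsigned long cp = 0;
        std::size_t extra = 0; // number of continuation bytes that follow
        if (lead < 0x80)                { cp = lead; }
        else if ((lead & 0xE0) == 0xC0) { cp = lead & 0x1F; extra = 1; }
        else if ((lead & 0xF0) == 0xE0) { cp = lead & 0x0F; extra = 2; }
        else if ((lead & 0xF8) == 0xF0) { cp = lead & 0x07; extra = 3; }
        else                            { cp = 0xFFFD; } // invalid lead byte

        for (; extra > 0 && i < s.size(); --extra, ++i)
            cp = (cp << 6) | (static_cast<unsigned char>(s[i]) & 0x3F);

        if (sizeof(wchar_t) == 2 && cp > 0xFFFF) {
            // 16-bit wchar_t (e.g. Windows): encode as a UTF-16 surrogate pair
            cp -= 0x10000;
            out.push_back(static_cast<wchar_t>(0xD800 + (cp >> 10)));
            out.push_back(static_cast<wchar_t>(0xDC00 + (cp & 0x3FF)));
        } else {
            out.push_back(static_cast<wchar_t>(cp));
        }
    }
    return out;
}
```

For the five-byte UTF-8 string `"caf\xC3\xA9"` ("café"), `widen_per_byte` yields five wide characters, while `utf8_to_wide` yields four, the last being U+00E9.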
Demo
Obligatory "live" code:
    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <atomic>
    #include <cassert>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <string>
    namespace net = boost::asio;
    namespace beast = boost::beast;
    namespace websocket = beast::websocket;
    using net::ip::tcp;

    static std::mutex s_consoleMtx;

    static void fail(beast::error_code ec, std::string txt) {
        std::cerr << txt << ": " << ec.message() << " at " << ec.location() << std::endl;
    }

    #define ARCH_LABEL "STACKO"

    struct session : std::enable_shared_from_this<session> {
        using Context = net::io_context::executor_type;
        using Executor = net::io_context::executor_type;
        using Strand = net::strand<net::io_context::executor_type>;
        using Socket = net::basic_stream_socket<tcp, Strand>;

        // Note: Context names an executor type here, which has no default
        // constructor; a constructor (not shown) has to initialize ctx_,
        // e.g. from an existing io_context's get_executor().
        Context ctx_;
        websocket::stream<Socket> ws_{make_strand(ctx_)};

        static bool const EnableVerbose = true;
        std::atomic_bool Is_Connected{false};
        beast::flat_buffer buffer_;
        std::function<void(std::string)> on_fail_cb;
        std::function<void(char const*, size_t)> on_data_cb;

        /// Callback registered by async_read. It calls user registered
        /// callback to actually process the data. And then issue another
        /// async_read to wait for data from server again.
        /// \param ec instance of error code
        /// \param bytes_transferred
        void on_read(beast::error_code ec, [[maybe_unused]] size_t bytes_transferred) {
            if (EnableVerbose) {
                std::lock_guard<std::mutex> guard(s_consoleMtx);
                std::cout << "<WsDll-" ARCH_LABEL "> in on read" << std::endl;
            }

            if (!Is_Connected)
                return;

            // error occurs
            if (ec) {
                if (on_fail_cb)
                    on_fail_cb("read");
                return fail(ec, "read");
            }

            std::string const data = beast::buffers_to_string(buffer_.data());
            if (EnableVerbose) {
                std::lock_guard<std::mutex> guard(s_consoleMtx);
                std::cout << "<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << "] " << data << std::endl;
            }

            if (on_data_cb)
                on_data_cb(data.c_str(), data.length());

            buffer_.consume(buffer_.size());

            if (EnableVerbose) {
                std::lock_guard<std::mutex> guard(s_consoleMtx);
                std::cout << "<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
            }

            // on_read must be running on the strand that ws_ is bound to
            assert(ws_.get_executor().running_in_this_thread());
            ws_.async_read(buffer_, beast::bind_front_handler(&session::on_read, shared_from_this()));
        }
    };