Boost asio. Client udp efficiency
1.1 This multi-threaded implementation appears suitable for sending data at a frequency of ~100 Hz. It uses separate strands (`m_send_strand` and `m_rcv_strand`) to ensure thread safety when handling send and receive operations, respectively. The use of async operations in Boost Asio also allows for efficient handling of I/O without blocking threads. However, the overall efficiency depends on factors like the hardware and network conditions.
1.2 A single-threaded client, as in the example you provided, can also be efficient for many use cases. In a single-threaded client, both reading and writing operations are handled in a single thread, which can simplify the code but may limit scalability on multi-core systems. The choice between a multi-threaded and single-threaded client depends on your specific requirements and performance goals.
- The transfer of data between tasks using `std::move` with a `unique_ptr` (`std::move(pData)`) is correct and efficient. It allows ownership of the `DataBuffer` to be transferred from one task to another without making unnecessary copies, which is beneficial for performance.
- The number of threads allocated to the client for processing reads and writes depends on various factors, including the available CPU cores and the specific requirements of your application. In your code, you have created three threads to run the `asio::io_context`. The number of threads can be adjusted based on the desired level of parallelism and the performance characteristics of your system (see the sketch after this list).
- For a TCP client, the same principles apply regarding multi-threading. You can design a multi-threaded TCP client in much the same way as your UDP client, using strands and async operations to handle concurrency. The number of threads can likewise be tuned to the TCP client's specific needs.

Keep in mind that the optimal number of threads and the overall design of your network client can vary based on factors such as the amount of data being transferred, the network conditions, and the available hardware resources. Performance testing and profiling can help fine-tune your implementation for your specific use case.
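For illustration, here is a minimal sketch of that thread-count choice (my example, not from the original post; the work guard and the use of `hardware_concurrency()` are assumptions to keep it self-contained):

```cpp
#include <boost/asio.hpp>
#include <algorithm>
#include <thread>
#include <vector>

int main() {
    namespace asio = boost::asio;
    asio::io_context ctx;
    // keep run() from returning until we explicitly release the guard
    auto guard = asio::make_work_guard(ctx);

    // n_threads == 1 gives the single-threaded variant from 1.2 (no strands needed);
    // a small pool sized to the hardware covers the multi-threaded case
    unsigned n_threads = std::max(1u, std::thread::hardware_concurrency());
    std::vector<std::thread> pool;
    for (unsigned i = 0; i < n_threads; ++i)
        pool.emplace_back([&ctx] { ctx.run(); });

    // ... create the client and post work here ...

    guard.reset(); // let run() return once the posted work drains
    for (auto& t : pool)
        t.join();
}
```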
Question
I have implemented a UDP session in a multi-threaded environment.
```cpp
#include <boost/asio.hpp>
#include <array>
#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <thread>
#include <vector>

namespace asio = boost::asio;

using RawDataArray=std::array<unsigned char,65000>;
class StaticBuffer
{
private:
RawDataArray m_data;
std::size_t m_n_avail;
public:
StaticBuffer():m_data(),m_n_avail(0){}
StaticBuffer(std::size_t n_bytes):m_data(),m_n_avail(n_bytes){}
StaticBuffer(const StaticBuffer& other)
{
std::cout<<"ctor cpy\n";
m_data=other.m_data;
m_n_avail=other.m_n_avail;
}
StaticBuffer(const StaticBuffer& other,std::size_t n_bytes)
{
std::cout<<"ctor cpy\n";
m_data=other.m_data;
m_n_avail=n_bytes;
}
StaticBuffer(const RawDataArray& data,std::size_t n_bytes)
{
std::cout<<"ctor static buff\n";
m_data=data;
m_n_avail=n_bytes;
}
void set_size(int n)
{
m_n_avail=n;
}
void set_max_size(){m_n_avail=m_data.size();}
std::size_t max_size()const {return m_data.size();}
unsigned char& operator[](std::size_t i){return m_data[i];}
const unsigned char& operator[] (std::size_t i)const{return m_data[i];}
StaticBuffer& operator=(const StaticBuffer& other)
{
if (this == &other)
return *this;
m_data = other.m_data;
m_n_avail = other.m_n_avail;
return *this;
}
void push_back(unsigned char val)
{
    if (m_n_avail<m_data.size())
    {
        m_data[m_n_avail]=val;
        ++m_n_avail;
    }else
        throw "Out of memory";
}
void reset(){m_n_avail=0;}
unsigned char* data(){return m_data.data();}
const unsigned char* data()const {return m_data.data();}
std::size_t size()const{return m_n_avail;}
~StaticBuffer(){}
};
class UDPSeassion;
using DataBuffer = StaticBuffer;
using DataBufferPtr=std::unique_ptr<DataBuffer>;
using ExternalReadHandler=std::function<void(DataBufferPtr)>;
class UDPSeassion:public std::enable_shared_from_this<UDPSeassion>
{
private:
asio::io_context& m_ctx;
asio::ip::udp::socket m_socket;
asio::ip::udp::endpoint m_endpoint;
std::string m_addr;
unsigned short m_port;
asio::io_context::strand m_send_strand;
std::deque<DataBufferPtr> m_dq_send;
asio::io_context::strand m_rcv_strand;
DataBufferPtr m_rcv_data;
ExternalReadHandler external_rcv_handler;
private:
void do_send_data_from_dq()
{
if (m_dq_send.empty())
return;
m_socket.async_send_to(
asio::buffer(m_dq_send.front()->data(),m_dq_send.front()->size()),
m_endpoint,
asio::bind_executor(m_send_strand,[this](const boost::system::error_code& er, std::size_t bytes_transferred){
if (!er)
{
m_dq_send.pop_front();
do_send_data_from_dq();
}else
{
//post to logger
}
}));
}
void do_read(const boost::system::error_code& er, std::size_t bytes_transferred)
{
if (!er)
{
m_rcv_data->set_size(bytes_transferred);
asio::post(m_ctx,[this,data=std::move(m_rcv_data)]()mutable{ external_rcv_handler(std::move(data));});
m_rcv_data=std::make_unique<DataBuffer>();
m_rcv_data->set_max_size();
async_read();
}
}
public:
UDPSeassion(asio::io_context& ctx,const std::string& addr, unsigned short port):
m_ctx(ctx),
m_socket(ctx),
m_endpoint(asio::ip::address::from_string(addr),port),
m_addr(addr),
m_port(port),
m_send_strand(ctx),
m_dq_send(),
m_rcv_strand(ctx),
m_rcv_data(std::make_unique<DataBuffer>(65000))
{}
~UDPSeassion(){}
const std::string& get_host()const{return m_addr;}
unsigned short get_port(){return m_port;}
template<typename ExternalReadHandlerCallable>
void set_read_data_handler(ExternalReadHandlerCallable&& handler)
{
external_rcv_handler=std::forward<ExternalReadHandlerCallable>(handler);
}
void start()
{
m_socket.open(asio::ip::udp::v4());
async_read();
}
void async_read()
{
m_socket.async_receive_from(
asio::buffer(m_rcv_data->data(),m_rcv_data->size()),
m_endpoint,
asio::bind_executor(m_rcv_strand,std::bind(&UDPSeassion::do_read,this,std::placeholders::_1,std::placeholders::_2) )
);
}
void async_send(DataBufferPtr pData)
{
asio::post(m_ctx,
asio::bind_executor(m_send_strand,[this,pDt=std::move(pData)]()mutable{
m_dq_send.emplace_back(std::move(pDt));
if (m_dq_send.size()==1)
do_send_data_from_dq();
}));
}
};
void handler_read(DataBufferPtr pdata)
{
// decoding raw_data -> decod_data
// lock mutext
// queue.push_back(decod_data)
// unlock mutext
//for view pdata
std::stringstream ss;
ss<<"thread handler: "<<std::this_thread::get_id()<<" "<<pdata->data()<<" "<<pdata->size()<<std::endl;
std::cout<<ss.str()<<std::endl;
}
int main()
{
asio::io_context ctx;
//auto work_guard = asio::make_work_guard(ctx);
std::cout<<"MAIN thread: "<<std::this_thread::get_id()<<std::endl;
StaticBuffer b{4};
b[0]='A';
b[1]='B';
b[2]='C';
b[3]='\n';
UDPSeassion client(ctx,"127.0.0.1",11223);
client.set_read_data_handler(handler_read);
client.start();
std::vector<std::thread> threads;
for (int i=0;i<3;++i)
{
threads.emplace_back([&](){
std::stringstream ss;
ss<<"run thread: "<<std::this_thread::get_id()<<std::endl;
std::cout<<ss.str();
ctx.run();
std::cout<<"end thread\n";
}
);
}
client.async_send(std::make_unique<StaticBuffer>(b));
ctx.run();
for (auto& t:threads)
t.join();
return 0;
}
```
In the code above, the main emphasis is on the `UDPSeassion` class. The `StaticBuffer` class is written to provide just the essential functionality. I have some questions:
- Suppose this class will be built into a system that runs at a frequency of ~100 Hz: every 10 ms, the system will send its state through the client.
  1.1 Is it done properly for a multi-threaded environment? How efficient is this implementation?
  1.2 How efficient is a client implementation that contains only one thread serving both reading and writing (as in the linked example)?
- Is the buffer transfer between tasks correct (`std::move` of the `unique_ptr` data)?
- In practice, how many threads are given to a client to process reads and writes?
- And for a TCP client?

I will be very grateful for detailed answers to my questions. Thank you very much))
Answer 1
Score: 2
I'd simplify a ton.
- you "use" `enable_shared_from_this` but none of the asynchronous operations capture `shared_from_this`. In fact, you don't even allocate `UDPSession` shared, so it would be [Undefined Behaviour]() to use `shared_from_this` at all.
- no-op destructors are implied. If you must declare them, `= default` them
- the `m_rcv_strand` is deprecated - use `strand<>` instead
- why is there a separate strand for send/receive? Sure, 1 read operation is allowed to overlap 1 write operations, but you still cannot access the shared objects (like `m_socket`) without proper synchronization
- you **have** strands but seem to erronously not post to them where relevant (e.g. `post(m_ctx, bind_executor(m_send_strand, ....))` is conflicting)
- you have a laborious buffer type that /seemingly/ aims to avoid allocation, but you wrap it in a unique_ptr anyways ¯\\_(ツ)_/¯
- `set_read_data_handler` doesn't need to be a template. Since you're erasing to `std::function` anyways, there's zero benefit over just using:
```cpp
void set_read_data_handler(ExternalReadHandler handler) {
    external_rcv_handler = std::move(handler);
}
```
- You have repeated magic constants (e.g. `65000`)
- You seem to be missing a socket `bind()` call
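For reference, a minimal sketch of the missing call (my illustration; the address and port are the ones from the question). Without a `bind()`, and before any send implicitly binds the socket, `async_receive_from` has no local port to receive on:

```cpp
#include <boost/asio.hpp>

int main() {
    namespace asio = boost::asio;
    using asio::ip::udp;

    asio::io_context ctx;
    udp::socket socket(ctx);
    udp::endpoint endpoint(asio::ip::address::from_string("127.0.0.1"), 11223);

    socket.open(udp::v4());
    socket.bind(endpoint); // gives the socket the local port that async_receive_from listens on
}
```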
In short I'd replace the buffer with something sensible:
```cpp
using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
static_assert(StaticBuffer::static_capacity == 65000);
```
> Since you seem to expect a text protocol, chances are your average message is (much) smaller, so I reckon it may be much faster to just use `std::string` or even `boost::container::small_vector<...>`.
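As a concrete illustration of that suggestion (a sketch; the inline capacity of 320 is an assumed median message size, to be tuned against your real traffic):

```cpp
#include <boost/container/small_vector.hpp>
#include <cstdint>

// messages up to 320 bytes live inline (no heap allocation);
// larger ones transparently spill over to the heap
using DataBuffer = boost::container::small_vector<std::uint8_t, 320>;
```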
Not really required but to allow for elegant, asio-standard use:
```cpp
using asio::buffer;
static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }
```
See a much simplified version **[Live On Coliru](http://coliru.stacked-crooked.com/a/357c3d0ed3d6feba)**
```cpp
#include <boost/asio.hpp>
#include <boost/container/static_vector.hpp>
#include <atomic>
#include <deque>
#include <functional>
#include <iomanip>
#include <iostream>
#include <list>
#include <mutex>
#include <string>
#include <thread>
namespace { // user-friendly logging
static std::mutex s_console_mx;
static std::atomic_int t_id_gen = 0;
thread_local int t_id = ++t_id_gen;
template <typename... T> static inline void trace(T const&... args) {
std::lock_guard lk(s_console_mx);
((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
}
} // namespace
namespace asio = boost::asio;
using asio::ip::udp;
using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
static_assert(StaticBuffer::static_capacity == 65000);
// not really required but to allow for elegant, asio-standard use:
using asio::buffer;
static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }
using ExternalReadHandler = std::function<void(StaticBuffer&&)>;
class UDPSession {
private:
using error_code = boost::system::error_code;
asio::any_io_executor m_ex;
std::string m_addr;
uint16_t m_port;
udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
udp::socket m_socket{make_strand(m_ex)};
std::deque<StaticBuffer> m_dq_send;
StaticBuffer m_rcv_data;
ExternalReadHandler external_rcv_handler;
public:
UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
: m_ex(ex)
, m_addr(addr)
, m_port(port) {}
std::string const& get_host() const { return m_addr; }
uint16_t get_port() { return m_port; }
void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
void start() {
m_socket.open(udp::v4());
m_socket.bind(m_endpoint);
do_read();
}
void send(StaticBuffer data) {
asio::post(m_socket.get_executor(), [this, d = std::move(data)]() mutable {
m_dq_send.emplace_back(std::move(d));
if (m_dq_send.size() == 1)
send_loop();
});
}
private:
void do_read() {
m_rcv_data.assign(m_rcv_data.static_capacity, '\0');
m_socket.async_receive_from(
buffer(m_rcv_data), m_endpoint,
std::bind(&UDPSession::on_read, this, std::placeholders::_1, std::placeholders::_2));
}
void on_read(error_code er, size_t bytes_transferred) {
if (!er) {
m_rcv_data.resize(bytes_transferred);
asio::post(m_ex, [this, data = std::move(m_rcv_data)]() mutable {
external_rcv_handler(std::move(data));
});
do_read();
}
}
void send_loop() {
if (m_dq_send.empty())
return;
m_socket.async_send_to(buffer(m_dq_send.front()), m_endpoint,
[this](error_code er, size_t /*bytes_transferred*/) {
if (!er) {
m_dq_send.pop_front();
send_loop();
} // else { /* post to logger */ }
});
}
};
void handler_read(StaticBuffer&& pdata) {
if (!pdata.empty()) {
std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}
}
int main() {
asio::io_context ctx;
auto work_guard = asio::make_work_guard(ctx);
trace("Main thread");
std::list<std::thread> threads;
for (int i = 0; i < 3; ++i)
threads.emplace_back([&]() {
trace("START");
ctx.run();
trace("END");
});
UDPSession client(ctx.get_executor(), "127.0.0.1", 11223);
client.set_read_data_handler(handler_read);
client.start();
client.send({'A', 'B', 'C', '\n'});
work_guard.reset();
for (auto& t : threads)
t.join();
}
```
The live demo on Coliru "eats" words from `main.cpp`. Here's a local dictionary demo:
[![enter image description here][1]][1]
## EXTRA: Thread Pool, `shared_from_this`
You might have noticed I changed to `any_io_executor` instead of `io_context&`. That way you can easily switch to `asio::thread_pool` instead of doing it manually (poorly¹).
Let's also re-instate `shared_from_this`, but correctly this time.
For simplicity I've used a static buffer ONLY for the receive buffer (because that's how datagram protocols roll), and just used `vector` (or `small_vector`) for the `DataBuffer`.
**[Live On Coliru](http://coliru.stacked-crooked.com/a/d0a40db7f55c70ce)**
```cpp
#include <boost/asio.hpp>
#include <array>
#include <atomic>
#include <deque>
#include <functional>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
namespace { // user-friendly logging
static std::mutex s_console_mx;
static std::atomic_int t_id_gen = 0;
thread_local int t_id = ++t_id_gen;
template <typename... T> static inline void trace(T const&... args) {
std::lock_guard lk(s_console_mx);
((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
}
} // namespace
namespace asio = boost::asio;
using asio::ip::udp;
//using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
using DataBuffer = std::vector<uint8_t>;
using ExternalReadHandler = std::function<void(DataBuffer&&)>;
class UDPSession : public std::enable_shared_from_this<UDPSession> {
private:
using error_code = boost::system::error_code;
asio::any_io_executor m_ex;
std::string m_addr;
uint16_t m_port;
udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
udp::socket m_socket{make_strand(m_ex)};
std::deque<DataBuffer> m_dq_send;
std::array<uint8_t, 65000> m_rcv_data;
ExternalReadHandler external_rcv_handler;
public:
UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
: m_ex(ex)
, m_addr(addr)
, m_port(port) {}
std::string const& get_host() const { return m_addr; }
uint16_t get_port() { return m_port; }
void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
void start() {
m_socket.open(udp::v4());
m_socket.bind(m_endpoint);
do_read();
}
void send(DataBuffer data) {
asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
m_dq_send.emplace_back(std::move(d));
if (m_dq_send.size() == 1)
send_loop();
});
}
private:
void do_read() {
using namespace std::placeholders;
m_socket.async_receive_from( //
asio::buffer(m_rcv_data), m_endpoint,
std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));
}
void on_read(error_code er, size_t bytes_transferred) {
if (!er) {
asio::post(
m_ex,
[this, self=shared_from_this(),
data = DataBuffer(m_rcv_data.data(), m_rcv_data.data() + bytes_transferred)]() mutable {
external_rcv_handler(std::move(data));
});
do_read();
}
}
void send_loop() {
if (m_dq_send.empty())
return;
m_socket.async_send_to( //
asio::buffer(m_dq_send.front()), m_endpoint,
[this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
if (!er) {
m_dq_send.pop_front();
send_loop();
} // else { /* post to logger */ }
});
}
};
void handler_read(DataBuffer&& pdata) {
if (!pdata.empty()) {
std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}
}
int main() {
trace("Main thread");
asio::thread_pool ctx(4);
{
auto client = std::make_shared<UDPSession>(ctx.get_executor(), "127.0.0.1", 11223);
client->set_read_data_handler(handler_read);
client->start();
client->send({'A', 'B', 'C', '\n'});
} // client stays alive through shared ownership
ctx.join();
}
```
As icing on the cake, you can template the entire thing on the concrete executor type and avoid type-erasing the executor type:
```cpp
template <typename Executor>
class UDPSession : public std::enable_shared_from_this<UDPSession<Executor>> {
    using socket_t = asio::basic_datagram_socket<udp, asio::strand<Executor>>;
    // ...
```
See it **[Live On Coliru](http://coliru.stacked-crooked.com/a/a8d2bcb987e9d82d)**
```cpp
#include <boost/asio.hpp>
#include <array>
#include <atomic>
#include <deque>
#include <functional>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
namespace { // user-friendly logging
static std::mutex s_console_mx;
static std::atomic_int t_id_gen = 0;
thread_local int t_id = ++t_id_gen;
template <typename... T> static inline void trace(T const&... args) {
std::lock_guard lk(s_console_mx);
((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
}
} // namespace
namespace asio = boost::asio;
using asio::ip::udp;
//using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
using DataBuffer = std::vector<uint8_t>;
using ExternalReadHandler = std::function<void(DataBuffer&&)>;
template <typename Executor>
class UDPSession : public std::enable_shared_from_this<UDPSession<Executor> > {
using socket_t = asio::basic_datagram_socket<udp, asio::strand<Executor>>;
using error_code = boost::system::error_code;
Executor m_ex;
std::string m_addr;
uint16_t m_port;
udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
socket_t m_socket{make_strand(m_ex)};
std::deque<DataBuffer> m_dq_send;
std::array<uint8_t, 65000> m_rcv_data;
ExternalReadHandler external_rcv_handler;
using std::enable_shared_from_this<UDPSession>::shared_from_this;
public:
UDPSession(Executor ex, std::string const& addr, uint16_t port)
: m_ex(ex)
, m_addr(addr)
, m_port(port) {}
std::string const& get_host() const { return m_addr; }
uint16_t get_port() { return m_port; }
void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
void start() {
m_socket.open(udp::v4());
m_socket.bind(m_endpoint);
do_read();
}
void send(DataBuffer data) {
asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
m_dq_send.emplace_back(std::move(d));
if (m_dq_send.size() == 1)
send_loop();
});
}
private:
void do_read() {
using namespace std::placeholders;
m_socket.async_receive_from( //
asio::buffer(m_rcv_data), m_endpoint,
std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));
}
void on_read(error_code er, size_t bytes_transferred) {
if (!er) {
auto f = m_rcv_data.data(), l = f + bytes_transferred;
asio::post(m_ex, [self = shared_from_this(), data = DataBuffer(f, l)]() mutable {
self->external_rcv_handler(std::move(data));
});
do_read();
}
}
void send_loop() {
if (m_dq_send.empty())
return;
m_socket.async_send_to( //
asio::buffer(m_dq_send.front()), m_endpoint,
[this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
if (!er) {
m_dq_send.pop_front();
send_loop();
} // else { /* post to logger */ }
});
}
};
void handler_read(DataBuffer&& pdata) {
if (!pdata.empty()) {
std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}
}
int main() {
trace("Main thread");
using Ex = asio::thread_pool::executor_type;
asio::thread_pool ctx(4);
{
auto client = std::make_shared<UDPSession<Ex> >(ctx.get_executor(), "127.0.0.1", 11223);
client->set_read_data_handler(handler_read);
client->start();
client->send({'A', 'B', 'C', '\n'});
} // client stays alive through shared ownership
ctx.join();
}
```
Another local demo:
[![enter image description here][2]][2]
-----
¹ you'd need at least exception handling in the runner threads: https://stackoverflow.com/questions/44500818/should-the-exception-thrown-by-boostasioio-servicerun-be-caught
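A minimal sketch of such a runner thread (my illustration, following the linked answer's advice):

```cpp
#include <boost/asio.hpp>
#include <exception>
#include <iostream>
#include <thread>

int main() {
    boost::asio::io_context ctx;
    auto guard = boost::asio::make_work_guard(ctx);

    std::thread runner([&ctx] {
        for (;;) {
            try {
                ctx.run();
                break; // run() returned normally: no more pending work
            } catch (std::exception const& e) {
                // an exception from a completion handler unwinds out of run();
                // log it and restart the loop so the thread keeps serving
                std::cerr << "handler threw: " << e.what() << '\n';
            }
        }
    });

    // ... post work here ...
    guard.reset();
    runner.join();
}
```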
[1]: https://i.stack.imgur.com/qbKv6.gif
[2]: https://i.stack.imgur.com/y2534.gif