How to share pthread synchronisation primitives between C++ and Rust?

Question

I have a C++ program and a Rust program, and between them I have successfully got them talking over POSIX shared memory (C++ and rust).

What I am now trying to do is synchronise them. I already managed to create a working, but inefficient, primitive system using an atomic bool (creating the AtomicBool on the rust side like this).

However, I would really like to use a mutex/condvar to synchronise between the threads, and this is where I am stuck.

I seem to be able to initialise the C++ side of it, following this example pretty much word for word.

I have attempted to translate it directly into rust:

    let raw_shm = shm.get_shm();

    let mut mtx_attrs = MaybeUninit::<nix::libc::pthread_mutexattr_t>::uninit();
    if unsafe { nix::libc::pthread_mutexattr_init(mtx_attrs.as_mut_ptr()) } != 0 {
        panic!("failed to create mtx_attrs");
    };
    let mtx_attrs = unsafe { mtx_attrs.assume_init() };

    let mut cond_attrs = MaybeUninit::<nix::libc::pthread_condattr_t>::uninit();
    if unsafe { nix::libc::pthread_condattr_init(cond_attrs.as_mut_ptr()) } != 0 {
        panic!("failed to create cond_attrs");
    };
    let cond_attrs = unsafe { cond_attrs.assume_init() };

    if unsafe {
        nix::libc::pthread_mutexattr_setpshared(
            &mtx_attrs as *const _ as *mut _,
            PTHREAD_PROCESS_SHARED,
        )
    } != 0
    {
        panic!("failed to set mtx as process shared");
    };

    if unsafe {
        nix::libc::pthread_condattr_setpshared(
            &cond_attrs as *const _ as *mut _,
            PTHREAD_PROCESS_SHARED,
        )
    } != 0
    {
        panic!("failed to set cond as process shared");
    };

    // I know that these offsets are correct, having used `offsetof` on the C++ side
    let mtx_start = unsafe { &raw_shm.as_slice()[3110416] };
    let mtx = unsafe { &*(mtx_start as *const _ as *const pthread_mutex_t) };
    let cond_start = unsafe { &raw_shm.as_slice()[3110440] };
    let cond = unsafe { &*(cond_start as *const _ as *const pthread_mutex_t) };

    if unsafe {
        nix::libc::pthread_mutex_init(&mtx as *const _ as *mut _, &mtx_attrs as *const _ as *mut _)
    } != 0
    {
        panic!("failed to init mtx");
    };

    if unsafe {
        nix::libc::pthread_cond_init(
            &cond as *const _ as *mut _,
            &cond_attrs as *const _ as *mut _,
        )
    } != 0
    {
        panic!("failed to init cond");
    };

All of that passes with return values of 0... so far so good.

I can now test it in one of two ways:

  1. I can set the trivial C++ program going and have it stop, waiting at the condvar:

    if (pthread_mutex_lock(&shmp->mutex) != 0)
        throw("Error locking mutex");
    if (pthread_cond_wait(&shmp->condition, &shmp->mutex) != 0)
        throw("Error waiting for condition variable");

and in rust:

    let sig = unsafe { nix::libc::pthread_cond_signal(&cond as *const _ as *mut _) };
    dbg!(sig);

Despite returning 0 (i.e. success), my C++ program is not released past the condvar; it remains waiting as if it never received a signal.

  2. I can set off another trivial C++ program which endlessly signals the condition variable in a loop:

    for (unsigned int count = 0;; count++) {
        if (pthread_cond_signal(condition) != 0)
            throw("Error");
        // sleep for a bit
    }

and then in rust, something like:

    loop {
        if unsafe { nix::libc::pthread_mutex_lock(&mtx as *const _ as *mut _) } > 0 {
            panic!("Failed to acquire lock")
        };
        if unsafe {
            nix::libc::pthread_cond_wait(&cond as *const _ as *mut _, &mtx as *const _ as *mut _)
        } > 0
        {
            panic!("Failed to acquire lock")
        };
    }

Doing it this way around, the call to lock the mutex is successful, but I get an EINVAL on pthread_cond_wait defined here, which I cannot seem to rectify...

I feel like I'm close... any thoughts on how to get this to work? (this is mostly just a proof of concept).

Answer 1

Score: 0


For posterity, I have managed to get this working.

To clarify how the program is structured: there are two binaries, one C++ and one Rust. The Rust program spawns the C++ program using std::process::Command.

Error handling and imports elided for brevity.
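
As a rough idea of what the elided error handling could look like, here is a minimal sketch. The helper name `check` is hypothetical and not part of the original answer. It relies on the fact that pthread functions report failure by returning the error number directly rather than through `errno`, so any non-zero return value is treated as an error.

```rust
use std::io;

/// Hypothetical helper: turns a pthread-style return code into a `Result`.
/// `rc` is the value returned by a pthread call; `what` names the call
/// and is only used to build the error message.
fn check(rc: i32, what: &str) -> Result<(), String> {
    if rc == 0 {
        Ok(())
    } else {
        // The return value is itself an errno-style code, so the standard
        // library can render it as a readable OS error description.
        Err(format!("{what} failed: {}", io::Error::from_raw_os_error(rc)))
    }
}
```

A call site might then read `check(unsafe { pthread_mutex_lock(mutex) }, "pthread_mutex_lock")?;` inside a function returning `Result<(), String>`.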

  1. The Rust program starts and creates a new shared memory block (removing an existing block if there is one, to ensure the program always starts in a fresh state). I use the shared_memory crate to handle the details for me; it also provides useful helpers such as access to a raw pointer to the start of the memory block.

The shared memory block is structured as follows:

    #[repr(C)]
    struct SharedMemoryLayout {
        ready: std::sync::atomic::AtomicBool,
        mutex: libc::pthread_mutex_t,
        condition: libc::pthread_cond_t,
    }

Shared memory blocks are initialised with zeros, so ready will always be false to begin with.
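
Both processes have to agree on where each field sits inside the block, so it can help to compute the field offsets from the struct definition and compare them with `offsetof` on the C++ side. The sketch below shows one way to do that. `MutexStandIn` and `CondStandIn` are hypothetical placeholders for `libc::pthread_mutex_t` and `libc::pthread_cond_t`, used only so the example builds without the libc crate; their sizes and alignment are illustrative assumptions, not the real values for any particular platform.

```rust
use std::mem::MaybeUninit;
use std::ptr::addr_of;
use std::sync::atomic::AtomicBool;

// Hypothetical stand-ins for the pthread types (assumed sizes/alignment).
#[repr(C, align(8))]
struct MutexStandIn([u8; 40]);
#[repr(C, align(8))]
struct CondStandIn([u8; 48]);

#[repr(C)]
struct SharedMemoryLayout {
    ready: AtomicBool,
    mutex: MutexStandIn,
    condition: CondStandIn,
}

/// Returns the byte offsets of `mutex` and `condition` within the layout.
fn field_offsets() -> (usize, usize) {
    let storage = MaybeUninit::<SharedMemoryLayout>::uninit();
    let base = storage.as_ptr();
    // SAFETY: `base` points into `storage`, and `addr_of!` only computes
    // field addresses; the uninitialised memory is never read.
    unsafe {
        let mutex = addr_of!((*base).mutex) as usize - base as usize;
        let condition = addr_of!((*base).condition) as usize - base as usize;
        (mutex, condition)
    }
}
```

With the real libc types in place of the stand-ins, the returned values would be the ones to check against the C++ struct.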

  2. The Rust program spawns the C++ program with std::process::Command::spawn and then waits in a loop until ready is true.

    let proc = Command::new("/path/to/c++/binary").spawn().unwrap();
    let ptr: *mut u8 = /* pointer to first byte of shared memory block */;
    let ready: &AtomicBool = unsafe { &*(ptr as *mut bool as *const AtomicBool) };
    loop {
        if ready.load(Ordering::SeqCst) {
            break
        } else {
            thread::sleep(Duration::from_secs(1));
        }
    }
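
If the C++ process fails before it sets the flag, the polling loop above never exits. A possible variation, sketched here under the assumption that giving up after a deadline is acceptable, bounds the wait. The function name `wait_until_ready` and the one-millisecond polling interval are illustrative choices, not part of the original answer.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// Polls `flag` until it becomes true or `timeout` elapses.
/// Returns true if the flag was observed set, false on timeout.
fn wait_until_ready(flag: &AtomicBool, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if flag.load(Ordering::SeqCst) {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Short sleep so the loop does not spin at full speed.
        thread::sleep(Duration::from_millis(1));
    }
}
```

The caller can then decide what a timeout means, for example killing the child process and reporting an error.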
  3. The C++ program opens the shared memory block and mmaps it into its local address space.

    struct SharedMemoryLayout
    {
        std::atomic_bool ready;
        pthread_mutex_t mutex;
        pthread_cond_t condition;
    };

    int fd = shm_open("name_of_shared_memory_block", O_RDWR, S_IRUSR | S_IWUSR);
    struct SharedMemoryLayout *sync = (SharedMemoryLayout *)mmap(NULL, sizeof(*sync), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  4. The C++ program carries on and proceeds to initialise the mutex and the condition, before marking the memory block as ready.

    pthread_mutexattr_t mutex_attributes;
    pthread_condattr_t condition_attributes;

    pthread_mutexattr_init(&mutex_attributes);
    pthread_condattr_init(&condition_attributes);

    // Allow both primitives to be used from more than one process.
    pthread_mutexattr_setpshared(&mutex_attributes, PTHREAD_PROCESS_SHARED);
    pthread_condattr_setpshared(&condition_attributes, PTHREAD_PROCESS_SHARED);

    pthread_mutex_init(&sync->mutex, &mutex_attributes);
    pthread_cond_init(&sync->condition, &condition_attributes);

    pthread_mutexattr_destroy(&mutex_attributes);
    pthread_condattr_destroy(&condition_attributes);

    // Only now tell the Rust side that the primitives are usable.
    std::atomic_bool *ready = &sync->ready;
    ready->store(true);

And then enter a loop signalling on the condition:

    for (unsigned int count = 0;; count++) {
        // do something
        sleep(1);
        pthread_cond_signal(&sync->condition);
    }
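  5. By now the Rust program will have been released from the loop in step 2. Materialise the mutex and condition that were initialised in step 4:

    // Byte offsets of the fields within SharedMemoryLayout. The values are
    // specific to the target platform and must match offsetof() on the C++ side.
    let mutex = unsafe { ptr.offset(4) as *mut pthread_mutex_t };
    let condition = unsafe { ptr.offset(32) as *mut pthread_cond_t };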
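
Note that these pointers are computed from ptr itself, so they address the shared block. In the question, `mtx` and `cond` were already references, and the calls passed `&mtx as *const _ as *mut _`. That takes the address of the local variable holding the reference, not the address of the shared memory, which is a likely contributor to the behaviour described there. The sketch below illustrates the difference with a plain `u64`; the function names are made up for the example.

```rust
/// Casting the reference itself yields the address of the data it refers to.
fn addr_of_referent(r: &u64) -> usize {
    r as *const u64 as usize
}

/// Taking a reference to the reference and casting that yields the address
/// of the local variable that stores the reference, which is a different
/// location from the data.
fn addr_of_reference(r: &u64) -> usize {
    let local: &u64 = r;
    &local as *const &u64 as usize
}
```

With inferred `_` casts the compiler accepts either form when a `*mut pthread_mutex_t` is expected, so the mistake does not show up as a type error.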

And now we can wait on the condition, getting notified by the C++ program.

    loop {
        unsafe {
            pthread_mutex_lock(mutex);
            pthread_cond_wait(condition, mutex);
            pthread_mutex_unlock(mutex);
            // Do something
        }
    }
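
One caveat with the loop above: POSIX allows `pthread_cond_wait` to wake up spuriously, and a signal sent while nobody is waiting is lost. The usual remedy is to guard the wait with a predicate that is changed under the mutex. The sketch below shows that pattern using `std::sync::Mutex` and `Condvar` as an in-process stand-in, since it is not the pthread API from the answer. With pthreads, the flag would be an extra field in the shared memory block (an assumption, as the original layout has no such field), checked in a `while` loop around `pthread_cond_wait`.

```rust
use std::sync::{Condvar, Mutex};

/// Blocks until the flag guarded by the mutex is true, re-checking it after
/// every wakeup. A spurious wakeup simply loops again, and a notification
/// sent before the wait started is still seen because the flag stays set.
fn wait_for_flag(pair: &(Mutex<bool>, Condvar)) {
    let (lock, cvar) = pair;
    let mut flagged = lock.lock().unwrap();
    while !*flagged {
        flagged = cvar.wait(flagged).unwrap();
    }
    // Consume the event so that the next call waits for a new one.
    *flagged = false;
}

/// Sets the flag while holding the mutex, then wakes one waiter.
fn notify_flag(pair: &(Mutex<bool>, Condvar)) {
    let (lock, cvar) = pair;
    *lock.lock().unwrap() = true;
    cvar.notify_one();
}
```

The signalling side in C++ would correspondingly lock the mutex, set the flag, unlock, and then call `pthread_cond_signal`.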

huangapple
  • Published on 10 February 2023 at 02:43:38
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75403100.html