How to share pthread synchronisation primitives between C++ and Rust?

Question

I have a C++ program and a Rust program, and I have successfully got them talking to each other over POSIX shared memory (C++ and Rust).

What I am now trying to do is synchronise them. I already managed to create a working, but inefficient, primitive system using an atomic bool (creating the AtomicBool on the Rust side like this).

However, I would really like to use a mutex/condvar to synchronise between the two processes, and this is where I am stuck.

I seem to be able to initialise the C++ side of it, following this example pretty much word for word.

I have attempted to translate it directly into Rust:

    let raw_shm = shm.get_shm();

    let mut mtx_attrs = MaybeUninit::<nix::libc::pthread_mutexattr_t>::uninit();
    if unsafe { nix::libc::pthread_mutexattr_init(mtx_attrs.as_mut_ptr()) } != 0 {
        panic!("failed to create mtx_attrs");
    };
    let mtx_attrs = unsafe { mtx_attrs.assume_init() };

    let mut cond_attrs = MaybeUninit::<nix::libc::pthread_condattr_t>::uninit();
    if unsafe { nix::libc::pthread_condattr_init(cond_attrs.as_mut_ptr()) } != 0 {
        panic!("failed to create cond_attrs");
    };
    let cond_attrs = unsafe { cond_attrs.assume_init() };

    if unsafe {
        nix::libc::pthread_mutexattr_setpshared(
            &mtx_attrs as *const _ as *mut _,
            PTHREAD_PROCESS_SHARED,
        )
    } != 0
    {
        panic!("failed to set mtx as process shared");
    };

    if unsafe {
        nix::libc::pthread_condattr_setpshared(
            &cond_attrs as *const _ as *mut _,
            PTHREAD_PROCESS_SHARED,
        )
    } != 0
    {
        panic!("failed to set cond as process shared");
    };

    // I know that these offsets are correct, having used `offsetof` on the C++ side
    let mtx_start = unsafe { &raw_shm.as_slice()[3110416] };
    let mtx = unsafe { &*(mtx_start as *const _ as *const pthread_mutex_t) };
    let cond_start = unsafe { &raw_shm.as_slice()[3110440] };
    let cond = unsafe { &*(cond_start as *const _ as *const pthread_mutex_t) };

    if unsafe {
        nix::libc::pthread_mutex_init(&mtx as *const _ as *mut _, &mtx_attrs as *const _ as *mut _)
    } != 0
    {
        panic!("failed to init mtx");
    };
    if unsafe {
        nix::libc::pthread_cond_init(
            &cond as *const _ as *mut _,
            &cond_attrs as *const _ as *mut _,
        )
    } != 0
    {
        panic!("failed to init cond");
    };

All of that passes with return values of 0... so far so good.

I can now test it in one of two ways:

  1. I can set the trivial C++ program going and have it stop and wait at the condvar:
if (pthread_mutex_lock(&shmp->mutex) != 0)
    throw("Error locking mutex");
if (pthread_cond_wait(&shmp->condition, &shmp->mutex) != 0)
    throw("Error waiting for condition variable");

and in Rust:

let sig = unsafe { nix::libc::pthread_cond_signal(&cond as *const _ as *mut _) };
dbg!(sig);

Despite returning 0 (i.e. success), my C++ program is not released past the condvar; it remains waiting as if it never received a signal.

  2. I can set off another trivial C++ program which endlessly signals the condition variable in a loop:
    for (unsigned int count = 0;; count++) {
        if (pthread_cond_signal(condition) != 0)
            throw("Error");
        // sleep for a bit
    }

and then in Rust, something like:

    loop {
        if unsafe { nix::libc::pthread_mutex_lock(&mtx as *const _ as *mut _) } > 0 {
            panic!("Failed to acquire lock")
        };
        if unsafe {
            nix::libc::pthread_cond_wait(&cond as *const _ as *mut _, &mtx as *const _ as *mut _)
        } > 0
        {
            panic!("Failed to acquire lock")
        };
    }

Doing it this way around, the call to lock the mutex is successful, but pthread_cond_wait returns EINVAL (defined here), which I cannot seem to rectify...

I feel like I'm close... any thoughts on how to get this to work? (this is mostly just a proof of concept).

Answer 1

Score: 0


For posterity, I have managed to get this working.

To clarify how the program is structured, there are two binaries: one C++ and one Rust. The Rust program spawns the C++ program using std::process::Command.

Error handling and imports elided for brevity.

  1. The Rust program starts and creates a new shared memory block (removing an existing block if it exists, to ensure the program always starts in a fresh state). I use the shared_memory crate to handle the details for me, and that also provides useful helpers such as access to a raw pointer to the start of the memory block.

The shared memory block is structured like the following:

#[repr(C)]
struct SharedMemoryLayout {
    ready: std::sync::atomic::AtomicBool,
    mutex: libc::pthread_mutex_t,
    condition: libc::pthread_cond_t,
}

Shared memory blocks are initialised with zeros, so ready will always be false to begin with.
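
The byte offsets of mutex and condition inside this layout depend on the platform's sizes and alignments for pthread_mutex_t and pthread_cond_t, so any offsets hard-coded on the Rust side are worth checking against the C++ compiler. A minimal sketch of such a check, assuming the same field order as above (the function names mutex_offset, condition_offset and print_layout are made up for illustration):

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <pthread.h>
#include <type_traits>

// Same field order as the SharedMemoryLayout struct described in the answer.
struct SharedMemoryLayout {
    std::atomic_bool ready;
    pthread_mutex_t mutex;
    pthread_cond_t condition;
};

// offsetof is only well defined for standard-layout types, and only such
// types can be mirrored by a #[repr(C)] struct on the Rust side.
static_assert(std::is_standard_layout<SharedMemoryLayout>::value,
              "layout must be standard-layout to be shared with Rust");

// Hypothetical helpers: report where each primitive lives in the block.
std::size_t mutex_offset() { return offsetof(SharedMemoryLayout, mutex); }
std::size_t condition_offset() { return offsetof(SharedMemoryLayout, condition); }

// Prints the numbers to compare with the offsets used on the Rust side.
void print_layout() {
    // Sanity check: the condition variable starts after the mutex ends.
    assert(condition_offset() >= mutex_offset() + sizeof(pthread_mutex_t));
    std::printf("mutex offset:     %zu\n", mutex_offset());
    std::printf("condition offset: %zu\n", condition_offset());
    std::printf("total size:       %zu\n", sizeof(SharedMemoryLayout));
}
```

Calling print_layout() once from the C++ binary's main gives the values for that particular build. The printed numbers are deliberately not shown here because they differ between 32-bit and 64-bit targets.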

  2. The Rust program spawns the C++ program with std::process::Command::spawn and then waits in a loop until ready is true.
let proc = Command::new("/path/to/c++/binary").spawn().unwrap();

let ptr: *mut u8 = /* pointer to first byte of shared memory block */;
let ready: &AtomicBool = unsafe { &*(ptr as *const AtomicBool) };

loop {
    if ready.load(Ordering::SeqCst) {
        break
    } else {
        thread::sleep(Duration::from_secs(1));
    }
}
  3. The C++ program opens the shared memory block and mmaps it into its local address space.
struct SharedMemoryLayout
{
    std::atomic_bool ready;
    pthread_mutex_t mutex;
    pthread_cond_t condition;
};

int fd = shm_open("name_of_shared_memory_block", O_RDWR, S_IRUSR | S_IWUSR);
struct SharedMemoryLayout *sync = (SharedMemoryLayout *)mmap(NULL, sizeof(*sync), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
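
The snippet above leaves out error handling. shm_open returns -1 on failure and mmap returns MAP_FAILED, so a version with checks might look roughly like the sketch below. The helper name map_shared_block and its create flag are assumptions for illustration. In the setup described here the Rust side creates the block, so the C++ side would pass create = false.

```cpp
#include <cassert>
#include <cstddef>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

// Hypothetical helper: opens (optionally creating) a POSIX shared memory
// object and maps `size` bytes of it. Returns nullptr on any failure.
void *map_shared_block(const char *name, std::size_t size, bool create) {
    assert(name != nullptr && size > 0);

    int flags = O_RDWR | (create ? O_CREAT : 0);
    int fd = shm_open(name, flags, S_IRUSR | S_IWUSR);
    if (fd == -1)
        return nullptr;

    // A freshly created object has length zero and must be sized first.
    if (create && ftruncate(fd, static_cast<off_t>(size)) == -1) {
        close(fd);
        return nullptr;
    }

    void *addr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd); // the mapping stays valid after the descriptor is closed
    return addr == MAP_FAILED ? nullptr : addr;
}
```

The returned pointer can then be cast to SharedMemoryLayout * as in the original snippet, after checking it is not nullptr.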
  4. The C++ program carries on and proceeds to initialise the mutex and the condition, before marking the memory block as ready.
pthread_mutexattr_t mutex_attributes;
pthread_condattr_t condition_attributes;

pthread_mutexattr_init(&mutex_attributes);
pthread_condattr_init(&condition_attributes);

pthread_mutexattr_setpshared(&mutex_attributes, PTHREAD_PROCESS_SHARED);
pthread_condattr_setpshared(&condition_attributes, PTHREAD_PROCESS_SHARED);

pthread_mutex_init(&sync->mutex, &mutex_attributes);
pthread_cond_init(&sync->condition, &condition_attributes);

pthread_mutexattr_destroy(&mutex_attributes);
pthread_condattr_destroy(&condition_attributes);

std::atomic_bool *ready = &sync->ready;
ready->store(true);

And then enter a loop signalling on the condition:

for (unsigned int count = 0;; count++) {
    // do something
    sleep(1);
    pthread_cond_signal(&sync->condition);
}
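
The initialisation in step 4 ignores the pthread return codes. A sketch of the same sequence with checks is shown below, assuming it is wrapped in a helper (the name init_process_shared is made up). Only one process should run it: POSIX leaves re-initialising an already initialised mutex or condition variable undefined, which is why the Rust side here only uses the primitives and never initialises them.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical helper: initialises a process-shared mutex and condition
// variable in place. Returns 0 on success or the first pthread error code.
int init_process_shared(pthread_mutex_t *mutex, pthread_cond_t *condition) {
    assert(mutex != nullptr && condition != nullptr);

    pthread_mutexattr_t mutex_attributes;
    pthread_condattr_t condition_attributes;

    int rc = pthread_mutexattr_init(&mutex_attributes);
    if (rc != 0)
        return rc;
    rc = pthread_condattr_init(&condition_attributes);
    if (rc != 0) {
        pthread_mutexattr_destroy(&mutex_attributes);
        return rc;
    }

    // Mark both primitives as usable from more than one process.
    rc = pthread_mutexattr_setpshared(&mutex_attributes, PTHREAD_PROCESS_SHARED);
    if (rc == 0)
        rc = pthread_condattr_setpshared(&condition_attributes, PTHREAD_PROCESS_SHARED);

    if (rc == 0)
        rc = pthread_mutex_init(mutex, &mutex_attributes);
    if (rc == 0) {
        rc = pthread_cond_init(condition, &condition_attributes);
        if (rc != 0)
            pthread_mutex_destroy(mutex); // undo the half-finished setup
    }

    // The attribute objects are no longer needed once init has run.
    pthread_mutexattr_destroy(&mutex_attributes);
    pthread_condattr_destroy(&condition_attributes);
    return rc;
}
```

With the layout from step 3 this would be called as init_process_shared(&sync->mutex, &sync->condition), and ready would only be set to true when it returns 0.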
  5. Now, the Rust program will have been released from the loop in step 2. Materialise the mutex and condition that were initialised in step 4.
let mutex = unsafe {ptr.offset(4) as *mut pthread_mutex_t};
let condition = unsafe {ptr.offset(32) as *mut pthread_cond_t};

And now we can wait on the condition, getting notified by the C++ program.

loop {
    unsafe {
        pthread_mutex_lock(mutex);
        pthread_cond_wait(condition, mutex);
        pthread_mutex_unlock(mutex);
        
        // Do something
    }
}
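
One caveat with the loop above: a condition variable does not remember signals, so a pthread_cond_signal sent while the waiter is busy in "Do something" is lost, and POSIX also allows pthread_cond_wait to return spuriously. The usual remedy is to pair the condition with a predicate protected by the mutex. The sketch below shows the pattern in C++, assuming an extra counter field (called pending here, which is not part of the original layout). The Rust side would make the same calls through libc.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical extension of the shared layout: `pending` counts events that
// have been signalled but not yet consumed. It is only touched under `mutex`.
struct Channel {
    pthread_mutex_t mutex;
    pthread_cond_t condition;
    unsigned pending;
};

// Signalling side: record the event under the mutex, then signal.
int notify(Channel *ch) {
    assert(ch != nullptr);
    int rc = pthread_mutex_lock(&ch->mutex);
    if (rc != 0)
        return rc;
    ++ch->pending;
    pthread_cond_signal(&ch->condition);
    return pthread_mutex_unlock(&ch->mutex);
}

// Waiting side: re-check the predicate in a loop so that spurious wakeups
// are ignored and events recorded before the wait began are not missed.
int wait_for_event(Channel *ch) {
    assert(ch != nullptr);
    int rc = pthread_mutex_lock(&ch->mutex);
    if (rc != 0)
        return rc;
    while (ch->pending == 0) {
        rc = pthread_cond_wait(&ch->condition, &ch->mutex);
        if (rc != 0) {
            pthread_mutex_unlock(&ch->mutex);
            return rc;
        }
    }
    --ch->pending; // consume exactly one event
    return pthread_mutex_unlock(&ch->mutex);
}
```

In the shared-memory setting the mutex and condition inside Channel would still need to be initialised as process-shared, as in step 4, and pending would start at zero because the block is zero-initialised.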

huangapple
  • Posted on 10 February 2023 at 02:43:38
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75403100.html