英文:
Making a threadsafe queue in Haskell using MVar (maybe?)
问题
在Haskell中,我想创建一个线程安全的队列,具有两个操作:
- 从队列中移除所有元素。
- 将一个元素添加到队列中。
我的第一个想法是使用类型 MVar (NonEmpty a)
。
操作将如下实现:
- 只需使用
takeMVar
。这将使MVar
变为空。将返回的列表反转,因为如下所述,元素以相反的顺序存储。 - 如果
MVar
为空,将包含要插入的元素的单元素列表插入其中。但如果MVar
中有数据,则将该元素附加到列表中。
问题在于,我在 MVar
的文档中看不到如何以原子方式执行第2点。我可以使用 putMVar
或 modifyMVar
,但是(我推测)这两个函数都有可能阻塞(如果 MVar
为空或满的话),但是插入到队列中不应该阻塞(只有从中读取才会阻塞)。
我推测是否有解决此问题的方法?MVar
只是错误的基元,还是我可以以某种不同的方式组合它们?
英文:
In Haskell, I want to create a threadsafe queue, with two operations:
- Remove ALL elements from the queue.
- Add 1 element to the queue.
My first thought was to use the type MVar (NonEmpty a)
The operations will be implemented as follows:
- Just use
takeMVar
. This will leave theMVar
empty. Reverse the returned list because as noted below, the elements are stored in reverse order. - IF the
MVar
is empty, insert the singleton list with the element to insert. But if theMVar
has data in it, append that element to the list.
The problem is that, I can't see in the documentation for MVar
how to do point 2 atomically. I can putMVar
, or I can modifyMVar
, but (I presume) both of these functions could potentially block (if the MVar
is full or empty respectively), but inserting into the queue should never block (only reading from it should).
I presume there's a way around this issue? Is MVar
just the wrong primitive or can I combine them in some different way?
答案1
得分: 4
一个实现并发数据结构的简单方法是STM,因为它允许您将任意序列的操作变为原子操作。使用retry
也很容易进行阻塞:pop
会被挂起,只有在之前读取的变量之一被写入后,才会从头开始重新运行。
import Control.Concurrent.STM
newtype Queue a = Queue (TVar [a])
push :: a -> Queue a -> IO ()
push x (Queue q) = atomically $ do
xs <- readTVar q
writeTVar q (x : xs)
pop :: Queue a -> IO [a]
pop (Queue q) = atomically $ do
xs <- readTVar q
case xs of
[] -> retry
_ -> do
writeTVar q []
return (reverse xs)
英文:
An easy way to implement concurrent data structures is STM, because it lets you make arbitrary sequences of operations atomic. It's also easy to block with retry
: pop
is suspended and will re-run from the start only when one of the variables it read previously has been written to.
import Control.Concurrent.STM
newtype Queue a = Queue (TVar [a])
push :: a -> Queue a -> IO ()
push x (Queue q) = atomically $ do
xs <- readTVar q
writeTVar q (x : xs)
pop :: Queue a -> IO [a]
pop (Queue q) = atomically $ do
xs <- readTVar q
case xs of
[] -> retry
_ -> do
writeTVar q []
return (reverse xs)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论