在Haskell中使用MVar创建一个线程安全的队列(也许?)

huangapple go评论61阅读模式
英文:

Making a threadsafe queue in Haskell using MVar (maybe?)

问题

在Haskell中,我想创建一个线程安全的队列,具有两个操作:

  1. 从队列中移除所有元素。
  2. 将一个元素添加到队列中。

我的第一个想法是使用类型 MVar (NonEmpty a)

操作将如下实现:

  1. 只需使用 takeMVar。这将使 MVar 变为空。将返回的列表反转,因为如下所述,元素以相反的顺序存储。
  2. 如果 MVar 为空,将包含要插入的元素的单元素列表插入其中。但如果 MVar 中有数据,则将该元素附加到列表中。

问题在于,我在 MVar 的文档中看不到如何以原子方式执行第2点。我可以使用 putMVarmodifyMVar,但是(我推测)这两个函数都有可能阻塞(如果 MVar 为空或满的话),但是插入到队列中不应该阻塞(只有从中读取才会阻塞)。

我推测是否有解决此问题的方法?MVar 只是错误的基元,还是我可以以某种不同的方式组合它们?

英文:

In Haskell, I want to create a threadsafe queue, with two operations:

  1. Remove ALL elements from the queue.
  2. Add 1 element to the queue.

My first thought was to use the type MVar (NonEmpty a)

The operations will be implemented as follows:

  1. Just use takeMVar. This will leave the MVar empty. Reverse the returned list because as noted below, the elements are stored in reverse order.
  2. IF the MVar is empty, insert the singleton list with the element to insert. But if the MVar has data in it, append that element to the list.

The problem is that, I can't see in the documentation for MVar how to do point 2 atomically. I can putMVar, or I can modifyMVar, but (I presume) both of these functions could potentially block (if the MVar is full or empty respectively), but inserting into the queue should never block (only reading from it should).

I presume there's a way around this issue? Is MVar just the wrong primitive or can I combine them in some different way?

答案1

得分: 4

一个实现并发数据结构的简单方法是STM,因为它允许您将任意序列的操作变为原子操作。使用retry也很容易进行阻塞:pop会被挂起,只有在之前读取的变量之一被写入后,才会从头开始重新运行。

import Control.Concurrent.STM

newtype Queue a = Queue (TVar [a])

push :: a -> Queue a -> IO ()
push x (Queue q) = atomically $ do
  xs <- readTVar q
  writeTVar q (x : xs)

pop :: Queue a -> IO [a]
pop (Queue q) = atomically $ do
  xs <- readTVar q
  case xs of
    [] -> retry
    _ -> do
      writeTVar q []
      return (reverse xs)
英文:

An easy way to implement concurrent data structures is STM, because it lets you make arbitrary sequences of operations atomic. It's also easy to block with retry: pop is suspended and will re-run from the start only when one of the variables it read previously has been written to.

import Control.Concurrent.STM

newtype Queue a = Queue (TVar [a])

push :: a -&gt; Queue a -&gt; IO ()
push x (Queue q) = atomically $ do
  xs &lt;- readTVar q
  writeTVar q (x : xs)

pop :: Queue a -&gt; IO [a]
pop (Queue q) = atomically $ do
  xs &lt;- readTVar q
  case xs of
    [] -&gt; retry
    _ -&gt; do
      writeTVar q []
      return (reverse xs)

huangapple
  • 本文由 发表于 2023年6月5日 09:44:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/76403099.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定