英文:
Implementing an lock-free unbounded queue with new atomic.Pointer types
问题
我正在尝试实现Michael和Scott的这个非阻塞队列。
我尝试使用Go 1.19中引入的新的atomic.Pointer类型,但是我的应用程序中出现了数据竞争。
这是我的实现代码:
package queue
import (
"errors"
"sync/atomic"
)
// LockfreeQueue表示具有入队和出队通用值操作的FIFO结构。
// 参考:https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
head atomic.Pointer[node[T]]
tail atomic.Pointer[node[T]]
}
// node表示队列中的一个节点
type node[T any] struct {
value T
next atomic.Pointer[node[T]]
}
// newNode创建并初始化一个节点
func newNode[T any](v T) *node[T] {
return &node[T]{value: v}
}
// NewQueue创建并初始化一个LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
var head atomic.Pointer[node[T]]
var tail atomic.Pointer[node[T]]
n := &node[T]{}
head.Store(n)
tail.Store(n)
return &LockFreeQueue[T]{
head: head,
tail: tail,
}
}
// Enqueue向队列中添加一系列请求
func (q *LockFreeQueue[T]) Enqueue(v T) {
n := newNode(v)
for {
tail := q.tail.Load()
next := tail.next.Load()
if tail == q.tail.Load() {
if next == nil {
if tail.next.CompareAndSwap(next, n) {
q.tail.CompareAndSwap(tail, n)
return
}
} else {
q.tail.CompareAndSwap(tail, next)
}
}
}
}
// Dequeue从队列中移除一个请求
func (q *LockFreeQueue[T]) Dequeue() (T, error) {
for {
head := q.head.Load()
tail := q.tail.Load()
next := head.next.Load()
if head == q.head.Load() {
if head == tail {
if next == nil {
return head.value, errors.New("队列为空")
}
q.tail.CompareAndSwap(tail, next)
} else {
v := next.value
if q.head.CompareAndSwap(head, next) {
return v, nil
}
}
}
}
}
// 检查队列是否为空。
func (q *LockFreeQueue[T]) IsEmpty() bool {
return q.head.Load() == q.tail.Load()
}
我在这里找到了另一种实现,它在我的应用程序中没有数据竞争,但我似乎无法确定两者之间的确切区别。
任何帮助或反馈将不胜感激!
英文:
I am trying to implement this non-blocking queue from Michael and Scott.
I am trying to use the new atomic.Pointer types introduced in Go 1.19 but I am getting a data race in my application.
Here is my implementation:
package queue
import (
"errors"
"sync/atomic"
)
// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
head atomic.Pointer[node[T]]
tail atomic.Pointer[node[T]]
}
// node represents a node in the queue
type node[T any] struct {
value T
next atomic.Pointer[node[T]]
}
// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
return &node[T]{value: v}
}
// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
var head atomic.Pointer[node[T]]
var tail atomic.Pointer[node[T]]
n := &node[T]{}
head.Store(n)
tail.Store(n)
return &LockFreeQueue[T]{
head: head,
tail: tail,
}
}
// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
n := newNode(v)
for {
tail := q.tail.Load()
next := tail.next.Load()
if tail == q.tail.Load() {
if next == nil {
if tail.next.CompareAndSwap(next, n) {
q.tail.CompareAndSwap(tail, n)
return
}
} else {
q.tail.CompareAndSwap(tail, next)
}
}
}
}
// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() (T, error) {
for {
head := q.head.Load()
tail := q.tail.Load()
next := head.next.Load()
if head == q.head.Load() {
if head == tail {
if next == nil {
return head.value, errors.New("queue is empty")
}
q.tail.CompareAndSwap(tail, next)
} else {
v := next.value
if q.head.CompareAndSwap(head, next) {
return v, nil
}
}
}
}
}
// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
return q.head.Load() == q.tail.Load()
}
I found a different implementation here which works in my application without a data race, but I can't seem to figure out what is exactly different between the two.
Any help or feedback is appreciated!
答案1
得分: 1
原来是改变了一些东西解决了问题。
第一个改变:
var n = node[T]{}
head.Store(&n)
tail.Store(&n)
第二个改变是改变了Dequeue()
的返回类型。
最终的文件如下所示:
package queue
import (
"sync/atomic"
)
// LockfreeQueue 表示具有入队和出队通用操作的FIFO结构。
// 参考:https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
head atomic.Pointer[node[T]]
tail atomic.Pointer[node[T]]
}
// node 表示队列中的一个节点
type node[T any] struct {
value T
next atomic.Pointer[node[T]]
}
// newNode 创建并初始化一个节点
func newNode[T any](v T) *node[T] {
return &node[T]{value: v}
}
// NewQueue 创建并初始化一个LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
var head atomic.Pointer[node[T]]
var tail atomic.Pointer[node[T]]
var n = node[T]{}
head.Store(&n)
tail.Store(&n)
return &LockFreeQueue[T]{
head: head,
tail: tail,
}
}
// Enqueue 向队列中添加一系列请求
func (q *LockFreeQueue[T]) Enqueue(v T) {
n := newNode(v)
for {
tail := q.tail.Load()
next := tail.next.Load()
if tail == q.tail.Load() {
if next == nil {
if tail.next.CompareAndSwap(next, n) {
q.tail.CompareAndSwap(tail, n)
return
}
} else {
q.tail.CompareAndSwap(tail, next)
}
}
}
}
// Dequeue 从队列中移除一个请求
func (q *LockFreeQueue[T]) Dequeue() T {
var t T
for {
head := q.head.Load()
tail := q.tail.Load()
next := head.next.Load()
if head == q.head.Load() {
if head == tail {
if next == nil {
return t
}
q.tail.CompareAndSwap(tail, next)
} else {
v := next.value
if q.head.CompareAndSwap(head, next) {
return v
}
}
}
}
}
// 检查队列是否为空。
func (q *LockFreeQueue[T]) IsEmpty() bool {
return q.head.Load() == q.tail.Load()
}
英文:
It turned out that changing some things fixed the issue.
First change:
var n = node[T]{}
head.Store(&n)
tail.Store(&n)
Second change was changing the Dequeue()
return signature.
The final file looks like this:
package queue
import (
"sync/atomic"
)
// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
head atomic.Pointer[node[T]]
tail atomic.Pointer[node[T]]
}
// node represents a node in the queue
type node[T any] struct {
value T
next atomic.Pointer[node[T]]
}
// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
return &node[T]{value: v}
}
// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
var head atomic.Pointer[node[T]]
var tail atomic.Pointer[node[T]]
var n = node[T]{}
head.Store(&n)
tail.Store(&n)
return &LockFreeQueue[T]{
head: head,
tail: tail,
}
}
// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
n := newNode(v)
for {
tail := q.tail.Load()
next := tail.next.Load()
if tail == q.tail.Load() {
if next == nil {
if tail.next.CompareAndSwap(next, n) {
q.tail.CompareAndSwap(tail, n)
return
}
} else {
q.tail.CompareAndSwap(tail, next)
}
}
}
}
// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() T {
var t T
for {
head := q.head.Load()
tail := q.tail.Load()
next := head.next.Load()
if head == q.head.Load() {
if head == tail {
if next == nil {
return t
}
q.tail.CompareAndSwap(tail, next)
} else {
v := next.value
if q.head.CompareAndSwap(head, next) {
return v
}
}
}
}
}
// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
return q.head.Load() == q.tail.Load()
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论