实现一个无锁的无界队列,使用新的atomic.Pointer类型。

huangapple go评论75阅读模式
英文:

Implementing an lock-free unbounded queue with new atomic.Pointer types

问题

我正在尝试实现Michael和Scott的这个非阻塞队列。

我尝试使用Go 1.19中引入的新的atomic.Pointer类型,但是我的应用程序中出现了数据竞争。

这是我的实现代码:

package queue

import (
	"errors"
	"sync/atomic"
)

// LockfreeQueue表示具有入队和出队通用值操作的FIFO结构。
// 参考:https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
	head atomic.Pointer[node[T]]
	tail atomic.Pointer[node[T]]
}

// node表示队列中的一个节点
type node[T any] struct {
	value T
	next  atomic.Pointer[node[T]]
}

// newNode创建并初始化一个节点
func newNode[T any](v T) *node[T] {
	return &node[T]{value: v}
}

// NewQueue创建并初始化一个LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
	var head atomic.Pointer[node[T]]
	var tail atomic.Pointer[node[T]]
	n := &node[T]{}
	head.Store(n)
	tail.Store(n)
	return &LockFreeQueue[T]{
		head: head,
		tail: tail,
	}
}

// Enqueue向队列中添加一系列请求
func (q *LockFreeQueue[T]) Enqueue(v T) {
	n := newNode(v)
	for {
		tail := q.tail.Load()
		next := tail.next.Load()
		if tail == q.tail.Load() {
			if next == nil {
				if tail.next.CompareAndSwap(next, n) {
					q.tail.CompareAndSwap(tail, n)
					return
				}
			} else {
				q.tail.CompareAndSwap(tail, next)
			}
		}
	}
}

// Dequeue从队列中移除一个请求
func (q *LockFreeQueue[T]) Dequeue() (T, error) {
	for {
		head := q.head.Load()
		tail := q.tail.Load()
		next := head.next.Load()
		if head == q.head.Load() {
			if head == tail {
				if next == nil {
					return head.value, errors.New("队列为空")
				}
				q.tail.CompareAndSwap(tail, next)
			} else {
				v := next.value
				if q.head.CompareAndSwap(head, next) {
					return v, nil
				}
			}
		}
	}
}

// 检查队列是否为空。
func (q *LockFreeQueue[T]) IsEmpty() bool {
	return q.head.Load() == q.tail.Load()
}

我在这里找到了另一种实现,它在我的应用程序中没有数据竞争,但我似乎无法确定两者之间的确切区别。

任何帮助或反馈将不胜感激!

英文:

I am trying to implement this non-blocking queue from Michael and Scott.

I am trying to use the new atomic.Pointer types introduced in Go 1.19 but I am getting a data race in my application.

Here is my implementation:

package queue

import (
	"errors"
	"sync/atomic"
)

// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
	head atomic.Pointer[node[T]]
	tail atomic.Pointer[node[T]]
}

// node represents a node in the queue
type node[T any] struct {
	value T
	next  atomic.Pointer[node[T]]
}

// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
	return &node[T]{value: v}
}

// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
	var head atomic.Pointer[node[T]]
	var tail atomic.Pointer[node[T]]
	n := &node[T]{}
	head.Store(n)
	tail.Store(n)
	return &LockFreeQueue[T]{
		head: head,
		tail: tail,
	}
}

// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
	n := newNode(v)
	for {
		tail := q.tail.Load()
		next := tail.next.Load()
		if tail == q.tail.Load() {
			if next == nil {
				if tail.next.CompareAndSwap(next, n) {
					q.tail.CompareAndSwap(tail, n)
					return
				}
			} else {
				q.tail.CompareAndSwap(tail, next)
			}
		}
	}
}

// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() (T, error) {
	for {
		head := q.head.Load()
		tail := q.tail.Load()
		next := head.next.Load()
		if head == q.head.Load() {
			if head == tail {
				if next == nil {
					return head.value, errors.New("queue is empty")
				}
				q.tail.CompareAndSwap(tail, next)
			} else {
				v := next.value
				if q.head.CompareAndSwap(head, next) {
					return v, nil
				}
			}
		}
	}
}

// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
        return q.head.Load() == q.tail.Load()
}

I found a different implementation here which works in my application without a data race, but I can't seem to figure out what is exactly different between the two.

Any help or feedback is appreciated!

答案1

得分: 1

原来是改变了一些东西解决了问题。

第一个改变:

var n = node[T]{}
head.Store(&n)
tail.Store(&n)

第二个改变是改变了Dequeue()的返回类型。

最终的文件如下所示:

package queue

import (
	"sync/atomic"
)

// LockfreeQueue 表示具有入队和出队通用操作的FIFO结构。
// 参考:https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
	head atomic.Pointer[node[T]]
	tail atomic.Pointer[node[T]]
}

// node 表示队列中的一个节点
type node[T any] struct {
	value T
	next  atomic.Pointer[node[T]]
}

// newNode 创建并初始化一个节点
func newNode[T any](v T) *node[T] {
	return &node[T]{value: v}
}

// NewQueue 创建并初始化一个LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
	var head atomic.Pointer[node[T]]
	var tail atomic.Pointer[node[T]]
	var n = node[T]{}
	head.Store(&n)
	tail.Store(&n)
	return &LockFreeQueue[T]{
		head: head,
		tail: tail,
	}
}

// Enqueue 向队列中添加一系列请求
func (q *LockFreeQueue[T]) Enqueue(v T) {
	n := newNode(v)
	for {
		tail := q.tail.Load()
		next := tail.next.Load()
		if tail == q.tail.Load() {
			if next == nil {
				if tail.next.CompareAndSwap(next, n) {
					q.tail.CompareAndSwap(tail, n)
					return
				}
			} else {
				q.tail.CompareAndSwap(tail, next)
			}
		}
	}
}

// Dequeue 从队列中移除一个请求
func (q *LockFreeQueue[T]) Dequeue() T {
	var t T
	for {
		head := q.head.Load()
		tail := q.tail.Load()
		next := head.next.Load()
		if head == q.head.Load() {
			if head == tail {
				if next == nil {
					return t
				}
				q.tail.CompareAndSwap(tail, next)
			} else {
				v := next.value
				if q.head.CompareAndSwap(head, next) {
					return v
				}
			}
		}
	}
}

// 检查队列是否为空。
func (q *LockFreeQueue[T]) IsEmpty() bool {
	return q.head.Load() == q.tail.Load()
}
英文:

It turned out that changing some things fixed the issue.

First change:

var n = node[T]{}
head.Store(&n)
tail.Store(&n)

Second change was changing the Dequeue() return signature.

The final file looks like this:

package queue

import (
	"sync/atomic"
)

// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
	head atomic.Pointer[node[T]]
	tail atomic.Pointer[node[T]]
}

// node represents a node in the queue
type node[T any] struct {
	value T
	next  atomic.Pointer[node[T]]
}

// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
	return &node[T]{value: v}
}

// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
	var head atomic.Pointer[node[T]]
	var tail atomic.Pointer[node[T]]
	var n = node[T]{}
	head.Store(&n)
	tail.Store(&n)
	return &LockFreeQueue[T]{
		head: head,
		tail: tail,
	}
}

// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
	n := newNode(v)
	for {
		tail := q.tail.Load()
		next := tail.next.Load()
		if tail == q.tail.Load() {
			if next == nil {
				if tail.next.CompareAndSwap(next, n) {
					q.tail.CompareAndSwap(tail, n)
					return
				}
			} else {
				q.tail.CompareAndSwap(tail, next)
			}
		}
	}
}

// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() T {
	var t T
	for {
		head := q.head.Load()
		tail := q.tail.Load()
		next := head.next.Load()
		if head == q.head.Load() {
			if head == tail {
				if next == nil {
					return t
				}
				q.tail.CompareAndSwap(tail, next)
			} else {
				v := next.value
				if q.head.CompareAndSwap(head, next) {
					return v
				}
			}
		}
	}
}

// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
	return q.head.Load() == q.tail.Load()
}

huangapple
  • 本文由 发表于 2023年4月22日 08:11:59
  • 转载请务必保留本文链接:https://go.coder-hub.com/76077440.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定