实现一个无锁的无界队列,使用新的atomic.Pointer类型。

huangapple go评论107阅读模式
英文:

Implementing an lock-free unbounded queue with new atomic.Pointer types

问题

我正在尝试实现Michael和Scott的这个非阻塞队列。

我尝试使用Go 1.19中引入的新的atomic.Pointer类型,但是我的应用程序中出现了数据竞争。

这是我的实现代码:

  1. package queue
  2. import (
  3. "errors"
  4. "sync/atomic"
  5. )
  6. // LockfreeQueue表示具有入队和出队通用值操作的FIFO结构。
  7. // 参考:https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
  8. type LockFreeQueue[T any] struct {
  9. head atomic.Pointer[node[T]]
  10. tail atomic.Pointer[node[T]]
  11. }
  12. // node表示队列中的一个节点
  13. type node[T any] struct {
  14. value T
  15. next atomic.Pointer[node[T]]
  16. }
  17. // newNode创建并初始化一个节点
  18. func newNode[T any](v T) *node[T] {
  19. return &node[T]{value: v}
  20. }
  21. // NewQueue创建并初始化一个LockFreeQueue
  22. func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
  23. var head atomic.Pointer[node[T]]
  24. var tail atomic.Pointer[node[T]]
  25. n := &node[T]{}
  26. head.Store(n)
  27. tail.Store(n)
  28. return &LockFreeQueue[T]{
  29. head: head,
  30. tail: tail,
  31. }
  32. }
  33. // Enqueue向队列中添加一系列请求
  34. func (q *LockFreeQueue[T]) Enqueue(v T) {
  35. n := newNode(v)
  36. for {
  37. tail := q.tail.Load()
  38. next := tail.next.Load()
  39. if tail == q.tail.Load() {
  40. if next == nil {
  41. if tail.next.CompareAndSwap(next, n) {
  42. q.tail.CompareAndSwap(tail, n)
  43. return
  44. }
  45. } else {
  46. q.tail.CompareAndSwap(tail, next)
  47. }
  48. }
  49. }
  50. }
  51. // Dequeue从队列中移除一个请求
  52. func (q *LockFreeQueue[T]) Dequeue() (T, error) {
  53. for {
  54. head := q.head.Load()
  55. tail := q.tail.Load()
  56. next := head.next.Load()
  57. if head == q.head.Load() {
  58. if head == tail {
  59. if next == nil {
  60. return head.value, errors.New("队列为空")
  61. }
  62. q.tail.CompareAndSwap(tail, next)
  63. } else {
  64. v := next.value
  65. if q.head.CompareAndSwap(head, next) {
  66. return v, nil
  67. }
  68. }
  69. }
  70. }
  71. }
  72. // 检查队列是否为空。
  73. func (q *LockFreeQueue[T]) IsEmpty() bool {
  74. return q.head.Load() == q.tail.Load()
  75. }

我在这里找到了另一种实现,它在我的应用程序中没有数据竞争,但我似乎无法确定两者之间的确切区别。

任何帮助或反馈将不胜感激!

英文:

I am trying to implement this non-blocking queue from Michael and Scott.

I am trying to use the new atomic.Pointer types introduced in Go 1.19 but I am getting a data race in my application.

Here is my implementation:

  1. package queue
  2. import (
  3. "errors"
  4. "sync/atomic"
  5. )
  6. // LockfreeQueue represents a FIFO structure with operations to enqueue
  7. // and dequeue generic values.
  8. // Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
  9. type LockFreeQueue[T any] struct {
  10. head atomic.Pointer[node[T]]
  11. tail atomic.Pointer[node[T]]
  12. }
  13. // node represents a node in the queue
  14. type node[T any] struct {
  15. value T
  16. next atomic.Pointer[node[T]]
  17. }
  18. // newNode creates and initializes a node
  19. func newNode[T any](v T) *node[T] {
  20. return &node[T]{value: v}
  21. }
  22. // NewQueue creates and initializes a LockFreeQueue
  23. func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
  24. var head atomic.Pointer[node[T]]
  25. var tail atomic.Pointer[node[T]]
  26. n := &node[T]{}
  27. head.Store(n)
  28. tail.Store(n)
  29. return &LockFreeQueue[T]{
  30. head: head,
  31. tail: tail,
  32. }
  33. }
  34. // Enqueue adds a series of Request to the queue
  35. func (q *LockFreeQueue[T]) Enqueue(v T) {
  36. n := newNode(v)
  37. for {
  38. tail := q.tail.Load()
  39. next := tail.next.Load()
  40. if tail == q.tail.Load() {
  41. if next == nil {
  42. if tail.next.CompareAndSwap(next, n) {
  43. q.tail.CompareAndSwap(tail, n)
  44. return
  45. }
  46. } else {
  47. q.tail.CompareAndSwap(tail, next)
  48. }
  49. }
  50. }
  51. }
  52. // Dequeue removes a Request from the queue
  53. func (q *LockFreeQueue[T]) Dequeue() (T, error) {
  54. for {
  55. head := q.head.Load()
  56. tail := q.tail.Load()
  57. next := head.next.Load()
  58. if head == q.head.Load() {
  59. if head == tail {
  60. if next == nil {
  61. return head.value, errors.New("queue is empty")
  62. }
  63. q.tail.CompareAndSwap(tail, next)
  64. } else {
  65. v := next.value
  66. if q.head.CompareAndSwap(head, next) {
  67. return v, nil
  68. }
  69. }
  70. }
  71. }
  72. }
  73. // Check if the queue is empty.
  74. func (q *LockFreeQueue[T]) IsEmpty() bool {
  75. return q.head.Load() == q.tail.Load()
  76. }

I found a different implementation here which works in my application without a data race, but I can't seem to figure out what is exactly different between the two.

Any help or feedback is appreciated!

答案1

得分: 1

原来是改变了一些东西解决了问题。

第一个改变:

  1. var n = node[T]{}
  2. head.Store(&n)
  3. tail.Store(&n)

第二个改变是改变了Dequeue()的返回类型。

最终的文件如下所示:

  1. package queue
  2. import (
  3. "sync/atomic"
  4. )
  5. // LockfreeQueue 表示具有入队和出队通用操作的FIFO结构。
  6. // 参考:https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
  7. type LockFreeQueue[T any] struct {
  8. head atomic.Pointer[node[T]]
  9. tail atomic.Pointer[node[T]]
  10. }
  11. // node 表示队列中的一个节点
  12. type node[T any] struct {
  13. value T
  14. next atomic.Pointer[node[T]]
  15. }
  16. // newNode 创建并初始化一个节点
  17. func newNode[T any](v T) *node[T] {
  18. return &node[T]{value: v}
  19. }
  20. // NewQueue 创建并初始化一个LockFreeQueue
  21. func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
  22. var head atomic.Pointer[node[T]]
  23. var tail atomic.Pointer[node[T]]
  24. var n = node[T]{}
  25. head.Store(&n)
  26. tail.Store(&n)
  27. return &LockFreeQueue[T]{
  28. head: head,
  29. tail: tail,
  30. }
  31. }
  32. // Enqueue 向队列中添加一系列请求
  33. func (q *LockFreeQueue[T]) Enqueue(v T) {
  34. n := newNode(v)
  35. for {
  36. tail := q.tail.Load()
  37. next := tail.next.Load()
  38. if tail == q.tail.Load() {
  39. if next == nil {
  40. if tail.next.CompareAndSwap(next, n) {
  41. q.tail.CompareAndSwap(tail, n)
  42. return
  43. }
  44. } else {
  45. q.tail.CompareAndSwap(tail, next)
  46. }
  47. }
  48. }
  49. }
  50. // Dequeue 从队列中移除一个请求
  51. func (q *LockFreeQueue[T]) Dequeue() T {
  52. var t T
  53. for {
  54. head := q.head.Load()
  55. tail := q.tail.Load()
  56. next := head.next.Load()
  57. if head == q.head.Load() {
  58. if head == tail {
  59. if next == nil {
  60. return t
  61. }
  62. q.tail.CompareAndSwap(tail, next)
  63. } else {
  64. v := next.value
  65. if q.head.CompareAndSwap(head, next) {
  66. return v
  67. }
  68. }
  69. }
  70. }
  71. }
  72. // 检查队列是否为空。
  73. func (q *LockFreeQueue[T]) IsEmpty() bool {
  74. return q.head.Load() == q.tail.Load()
  75. }
英文:

It turned out that changing some things fixed the issue.

First change:

  1. var n = node[T]{}
  2. head.Store(&n)
  3. tail.Store(&n)

Second change was changing the Dequeue() return signature.

The final file looks like this:

  1. package queue
  2. import (
  3. "sync/atomic"
  4. )
  5. // LockfreeQueue represents a FIFO structure with operations to enqueue
  6. // and dequeue generic values.
  7. // Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
  8. type LockFreeQueue[T any] struct {
  9. head atomic.Pointer[node[T]]
  10. tail atomic.Pointer[node[T]]
  11. }
  12. // node represents a node in the queue
  13. type node[T any] struct {
  14. value T
  15. next atomic.Pointer[node[T]]
  16. }
  17. // newNode creates and initializes a node
  18. func newNode[T any](v T) *node[T] {
  19. return &node[T]{value: v}
  20. }
  21. // NewQueue creates and initializes a LockFreeQueue
  22. func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
  23. var head atomic.Pointer[node[T]]
  24. var tail atomic.Pointer[node[T]]
  25. var n = node[T]{}
  26. head.Store(&n)
  27. tail.Store(&n)
  28. return &LockFreeQueue[T]{
  29. head: head,
  30. tail: tail,
  31. }
  32. }
  33. // Enqueue adds a series of Request to the queue
  34. func (q *LockFreeQueue[T]) Enqueue(v T) {
  35. n := newNode(v)
  36. for {
  37. tail := q.tail.Load()
  38. next := tail.next.Load()
  39. if tail == q.tail.Load() {
  40. if next == nil {
  41. if tail.next.CompareAndSwap(next, n) {
  42. q.tail.CompareAndSwap(tail, n)
  43. return
  44. }
  45. } else {
  46. q.tail.CompareAndSwap(tail, next)
  47. }
  48. }
  49. }
  50. }
  51. // Dequeue removes a Request from the queue
  52. func (q *LockFreeQueue[T]) Dequeue() T {
  53. var t T
  54. for {
  55. head := q.head.Load()
  56. tail := q.tail.Load()
  57. next := head.next.Load()
  58. if head == q.head.Load() {
  59. if head == tail {
  60. if next == nil {
  61. return t
  62. }
  63. q.tail.CompareAndSwap(tail, next)
  64. } else {
  65. v := next.value
  66. if q.head.CompareAndSwap(head, next) {
  67. return v
  68. }
  69. }
  70. }
  71. }
  72. }
  73. // Check if the queue is empty.
  74. func (q *LockFreeQueue[T]) IsEmpty() bool {
  75. return q.head.Load() == q.tail.Load()
  76. }

huangapple
  • 本文由 发表于 2023年4月22日 08:11:59
  • 转载请务必保留本文链接:https://go.coder-hub.com/76077440.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定