英文:
Why my golang lock free queue always stuck there?
问题
这是我的代码:
package main
import (
"sync/atomic"
"unsafe"
"sync"
"fmt"
"time"
)
const (
MAX_DATA_SIZE = 100
)
// 无锁队列
type Queue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
// 队列中的一个节点
type Node struct {
val interface{}
next unsafe.Pointer
}
// 队列函数
func (self *Queue) enQueue(val interface{}) {
newValue := unsafe.Pointer(&Node{val: val, next: nil})
var tail,next unsafe.Pointer
for {
tail = self.tail
next = ((*Node)(tail)).next
if next != nil {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
}else if atomic.CompareAndSwapPointer(&((*Node)(tail).next), nil, newValue){
break
}
}
}
func (self *Queue) deQueue() (val interface{}, success bool){
var head,tail,next unsafe.Pointer
for {
head = self.head
tail = self.tail
next = ((*Node)(head)).next
if head == tail {
if next == nil {
return nil, false
}else {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
}
}else {
val = ((*Node)(next)).val
if atomic.CompareAndSwapPointer(&(self.head), head, next) {
return val, true
}
}
}
return
}
func main() {
var wg sync.WaitGroup
wg.Add(20)
queue := new(Queue)
queue.head = unsafe.Pointer(new(Node))
queue.tail = queue.head
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
t := time.Now()
queue.enQueue(t)
fmt.Println("enq = ", t)
}
}()
}
for i := 0; i < 10; i++ {
go func() {
ok := false
var val interface{}
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
val,ok = queue.deQueue()
for !ok {
val,ok = queue.deQueue()
}
fmt.Println("deq = ",val)
}
}()
}
wg.Wait()
}
问题是,有时代码运行正常,但有时会失败并且没有响应。
我的代码有什么问题吗?
英文:
Here is my code:
package main
import (
"sync/atomic"
"unsafe"
"sync"
"fmt"
"time"
)
const (
MAX_DATA_SIZE = 100
)
// lock free queue
type Queue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
// one node in queue
type Node struct {
val interface{}
next unsafe.Pointer
}
// queue functions
func (self *Queue) enQueue(val interface{}) {
newValue := unsafe.Pointer(&Node{val: val, next: nil})
var tail,next unsafe.Pointer
for {
tail = self.tail
next = ((*Node)(tail)).next
if next != nil {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
}else if atomic.CompareAndSwapPointer(&((*Node)(tail).next), nil, newValue){
break
}
}
}
func (self *Queue) deQueue() (val interface{}, success bool){
var head,tail,next unsafe.Pointer
for {
head = self.head
tail = self.tail
next = ((*Node)(head)).next
if head == tail {
if next == nil {
return nil, false
}else {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
}
}else {
val = ((*Node)(next)).val
if atomic.CompareAndSwapPointer(&(self.head), head, next) {
return val, true
}
}
}
return
}
func main() {
var wg sync.WaitGroup
wg.Add(20)
queue := new(Queue)
queue.head = unsafe.Pointer(new(Node))
queue.tail = queue.head
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
t := time.Now()
queue.enQueue(t)
fmt.Println("enq = ", t)
}
}()
}
for i := 0; i < 10; i++ {
go func() {
ok := false
var val interface{}
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
val,ok = queue.deQueue()
for !ok {
val,ok = queue.deQueue()
}
fmt.Println("deq = ",val)
}
}()
}
wg.Wait()
}
The problem is, sometimes the code runs ok, but sometimes it fails and just gets stuck with no response.
Is there any problem in my code?
答案1
得分: 5
在这段代码中有很多主动等待的情况,我强烈建议像Nick的代码一样干净地使用通道。
然而,这是我对于确切的原始问题“为什么它卡住了?”的答案:每个goroutine何时让出执行权没有保证,而且在无限循环中时,它很可能永远不会让出执行权。
你可以通过在每个可能无限循环中使用runtime.Gosched()来修复这个问题:
Gosched让出处理器,允许其他goroutine运行。它不会挂起当前的goroutine,所以执行会自动恢复。
这个增强的代码几乎和原始代码一样快,但永远不会卡住:
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
)
const (
MAX_DATA_SIZE = 100
)
// 无锁队列
type Queue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
// 队列中的一个节点
type Node struct {
val interface{}
next unsafe.Pointer
}
// 队列函数
func (self *Queue) enQueue(val interface{}) {
newValue := unsafe.Pointer(&Node{val: val, next: nil})
var tail, next unsafe.Pointer
for {
tail = self.tail
next = ((*Node)(tail)).next
if next != nil {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
} else if atomic.CompareAndSwapPointer(&((*Node)(tail).next), nil, newValue) {
break
}
runtime.Gosched()
}
}
func (self *Queue) deQueue() (val interface{}, success bool) {
var head, tail, next unsafe.Pointer
for {
head = self.head
tail = self.tail
next = ((*Node)(head)).next
if head == tail {
if next == nil {
return nil, false
} else {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
}
} else {
val = ((*Node)(next)).val
if atomic.CompareAndSwapPointer(&(self.head), head, next) {
return val, true
}
}
runtime.Gosched()
}
return
}
func main() {
var wg sync.WaitGroup
wg.Add(20)
queue := new(Queue)
queue.head = unsafe.Pointer(new(Node))
queue.tail = queue.head
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
t := time.Now()
queue.enQueue(t)
fmt.Println("enq = ", t)
}
}()
}
for i := 0; i < 10; i++ {
go func() {
ok := false
var val interface{}
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
val, ok = queue.deQueue()
for !ok {
val, ok = queue.deQueue()
runtime.Gosched()
}
fmt.Println("deq = ", val)
}
}()
}
wg.Wait()
}
英文:
There is a lot of active waiting in this code, and I strongly recommend a clean use of channel just like Nick's nice code.
However, here is my answer to the exact original question "Why is it stuck?" : there is no garantee of when each goroutine will yield to let the others execute, and most probably it will never yield when inside an infinite loop.
You can fix this by using runtime.Gosched() inside each possibly-infinite for loop :
> Gosched yields the processor, allowing other goroutines to run. It
> does not suspend the current goroutine, so execution resumes
> automatically.
This enhanced code runs almost as fast as the original, but never hangs :
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
)
const (
MAX_DATA_SIZE = 100
)
// lock free queue
type Queue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
// one node in queue
type Node struct {
val interface{}
next unsafe.Pointer
}
// queue functions
func (self *Queue) enQueue(val interface{}) {
newValue := unsafe.Pointer(&Node{val: val, next: nil})
var tail, next unsafe.Pointer
for {
tail = self.tail
next = ((*Node)(tail)).next
if next != nil {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
} else if atomic.CompareAndSwapPointer(&((*Node)(tail).next), nil, newValue) {
break
}
runtime.Gosched()
}
}
func (self *Queue) deQueue() (val interface{}, success bool) {
var head, tail, next unsafe.Pointer
for {
head = self.head
tail = self.tail
next = ((*Node)(head)).next
if head == tail {
if next == nil {
return nil, false
} else {
atomic.CompareAndSwapPointer(&(self.tail), tail, next)
}
} else {
val = ((*Node)(next)).val
if atomic.CompareAndSwapPointer(&(self.head), head, next) {
return val, true
}
}
runtime.Gosched()
}
return
}
func main() {
var wg sync.WaitGroup
wg.Add(20)
queue := new(Queue)
queue.head = unsafe.Pointer(new(Node))
queue.tail = queue.head
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
t := time.Now()
queue.enQueue(t)
fmt.Println("enq = ", t)
}
}()
}
for i := 0; i < 10; i++ {
go func() {
ok := false
var val interface{}
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
val, ok = queue.deQueue()
for !ok {
val, ok = queue.deQueue()
runtime.Gosched()
}
fmt.Println("deq = ", val)
}
}()
}
wg.Wait()
}
答案2
得分: 3
这是根据@mkb的建议重写的代码(除了无限队列大小)。
它不会锁死。
我建议您使用通道,除非您有非常好的理由不使用,因为Go团队花了很多精力使它们可靠、高性能且易于使用。
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
const (
MAX_DATA_SIZE = 100
)
func main() {
runtime.GOMAXPROCS(4)
var wg sync.WaitGroup
wg.Add(20)
queue := make(chan time.Time, 10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
t := time.Now()
queue <- t
fmt.Println("enq = ", t)
}
}()
}
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
val := <-queue
fmt.Println("deq = ", val)
}
}()
}
wg.Wait()
}
英文:
Here is the above re-written with channels as @mkb suggested (bar the infinite queue size).
It doesn't lock up.
I'd suggest you use channels unless you have a really good reason not to as the Go team have spend a great deal of effort making them reliable, high performance and easy to use.
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
const (
MAX_DATA_SIZE = 100
)
func main() {
runtime.GOMAXPROCS(4)
var wg sync.WaitGroup
wg.Add(20)
queue := make(chan time.Time, 10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
t := time.Now()
queue <- t
fmt.Println("enq = ", t)
}
}()
}
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < MAX_DATA_SIZE; j++ {
val := <-queue
fmt.Println("deq = ", val)
}
}()
}
wg.Wait()
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论