英文:
Is it possible to use contexts and buffered channels as queue? And I'm not sure if this is thread safe or not
问题
我需要创建一个将数据传递给多个消费者的队列。我可以使用带缓冲的通道和上下文来实现吗?我不确定这是否是线程安全的。
以下是我所说的示例代码:
package main
import (
"context"
"fmt"
"strconv"
"time"
)
func main() {
runQueue()
}
func runQueue() {
// 当缓冲区满时,发送通道将被阻塞
queue := make(chan string, 10000)
// 如果消费者太少,通道缓冲区将被填满,发送通道将被阻塞
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
consumerCount := 5
go runProducer(queue, ctx, cancel)
for i := 0; i < consumerCount; i++ {
go runConsumer(queue, ctx)
}
select {
case <-ctx.Done():
// 关闭通道以让goroutine获取ctx.Done()
close(queue)
}
}
func runConsumer(queue chan string, ctx context.Context) {
for {
data := <-queue
select {
case <-ctx.Done():
return
default:
}
fmt.Println(data)
<-time.After(time.Millisecond * 1000)
}
}
func runProducer(queue chan string, ctx context.Context, cancel context.CancelFunc) {
for {
fmt.Println("从服务器获取数据")
select {
case <-ctx.Done():
return
default:
}
// dataList将从其他服务器填充
dataList, err := getSomethingFromServer()
if err != nil {
if err.Error() == "非常严重的错误" {
cancel()
return
}
fmt.Println(err)
continue
}
select {
case <-ctx.Done():
return
default:
}
for _, el := range dataList {
queue <- el
}
<-time.After(time.Millisecond * 2000)
}
}
func getSomethingFromServer() ([]string, error) {
var newList []string
for i := 1; i < 4; i++ {
newList = append(newList, strconv.Itoa(i))
}
return newList, nil
}
这段代码是线程安全的吗?我的逻辑是否正确?
如果有任何错误,我希望能得到反馈。
如果有更好的做法,请告诉我。
英文:
I need to create a queue that passes data to multiple consumers.
Can I make it using buffered channel and context?
And I'm not sure if this is thread safe or not
Here's the sample code I'm talking about:
package main
import (
"context"
"fmt"
"strconv"
"time"
)
func main() {
runQueue()
}
func runQueue() {
// When the buffer is full
// sending channel is blocked
queue := make(chan string, 10000)
// If there are too few consumer,
// the channel buffer will be full, and the sending channel will be blocked.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
consumerCount := 5
go runProducer(queue, ctx, cancel)
for i := 0; i < consumerCount; i++ {
go runConsumer(queue, ctx)
}
select {
case <-ctx.Done():
// close channel to let goroutine get ctx.Done()
close(queue)
}
}
func runConsumer(queue chan string, ctx context.Context) {
for {
data := <-queue
select {
case <-ctx.Done():
return
default:
}
fmt.Println(data)
<-time.After(time.Millisecond * 1000)
}
}
func runProducer(queue chan string, ctx context.Context, cancel context.CancelFunc) {
for {
fmt.Println("get data from server")
select {
case <-ctx.Done():
return
default:
}
// dataList will be filled from other server
dataList, err := getSomethingFromServer()
if err != nil {
if err.Error() == "very fatal error" {
cancel()
return
}
fmt.Println(err)
continue
}
select {
case <-ctx.Done():
return
default:
}
for _, el := range dataList {
queue <- el
}
<-time.After(time.Millisecond * 2000)
}
}
func getSomethingFromServer() ([]string, error) {
var newList []string
for i := 1; i < 4; i++ {
newList = append(newList, strconv.Itoa(i))
}
return newList, nil
}
Is it thread safe?
And Is my logic going well?
If there are any mistakes, I would like to receive feedback
Please let me know if there is a better practice.
答案1
得分: 2
- 上下文(Context)是线程安全的。https://go.dev/blog/context
> 上下文(Context)可以被多个 goroutine 同时使用。代码可以将一个上下文(Context)传递给任意数量的 goroutine,并通过取消该上下文(Context)来向它们发送信号。
因此,在 Go 语言中,由于你无法确定 goroutine 在哪些线程(相同/不同)上运行,所以上下文(Context)是线程安全的。
- 通道(Channel)是线程安全的 - https://go.dev/ref/spec#Channel_types
> 单个通道(Channel)可以在发送语句、接收操作和调用内置函数 cap 和 len 的过程中被任意数量的 goroutine 使用,而无需进一步的同步。
通道(Channel)在底层使用了互斥锁(mutex)。
https://github.com/golang/go/blob/master/src/runtime/chan.go#L51
- 关于并发模式,请参考一些非常好的 Go 博客文章:
英文:
- Contexts are thread-safe. https://go.dev/blog/context
> A Context is safe for simultaneous use by multiple goroutines. Code can pass a single Context to any number of goroutines and cancel that Context to signal all of them.
So in go realms safe by multiple goroutines ~ thread-safe, since you never know on which threads (same/different) goroutines are running
- Channels are thread-safe - https://go.dev/ref/spec#Channel_types
> A single channel may be used in send statements, receive operations, and calls to the built-in functions cap and len by any number of goroutines without further synchronization
Channels use a mutex under-the-hood
https://github.com/golang/go/blob/master/src/runtime/chan.go#L51
- For concurrency patterns take a look at really good go blog posts:
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论