可以使用上下文和缓冲通道作为队列吗?我不确定这是否是线程安全的。

huangapple go评论68阅读模式
英文:

Is it possible to use contexts and buffered channels as queue? And I'm not sure if this is thread safe or not

问题

我需要创建一个将数据传递给多个消费者的队列。我可以使用带缓冲的通道和上下文来实现吗?我不确定这是否是线程安全的。

以下是我所说的示例代码:

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"
)

func main() {
	runQueue()
}

func runQueue() {
	// 当缓冲区满时,发送通道将被阻塞
	queue := make(chan string, 10000)

	// 如果消费者太少,通道缓冲区将被填满,发送通道将被阻塞
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	consumerCount := 5
	go runProducer(queue, ctx, cancel)
	for i := 0; i < consumerCount; i++ {
		go runConsumer(queue, ctx)
	}
	select {
	case <-ctx.Done():
		// 关闭通道以让goroutine获取ctx.Done()
		close(queue)
	}
}

func runConsumer(queue chan string, ctx context.Context) {
	for {
		data := <-queue
		select {
		case <-ctx.Done():
			return
		default:

		}
		fmt.Println(data)
		<-time.After(time.Millisecond * 1000)
	}
}

func runProducer(queue chan string, ctx context.Context, cancel context.CancelFunc) {
	for {
		fmt.Println("从服务器获取数据")
		select {
		case <-ctx.Done():
			return
		default:

		}
		// dataList将从其他服务器填充
		dataList, err := getSomethingFromServer()
		if err != nil {
			if err.Error() == "非常严重的错误" {
				cancel()
				return
			}
			fmt.Println(err)
			continue
		}
		select {
		case <-ctx.Done():
			return
		default:

		}
		for _, el := range dataList {
			queue <- el
		}
		<-time.After(time.Millisecond * 2000)
	}
}

func getSomethingFromServer() ([]string, error) {
	var newList []string
	for i := 1; i < 4; i++ {
		newList = append(newList, strconv.Itoa(i))
	}
	return newList, nil
}

这段代码是线程安全的吗?我的逻辑是否正确?

如果有任何错误,我希望能得到反馈。

如果有更好的做法,请告诉我。

英文:

I need to create a queue that passes data to multiple consumers.
Can I make it using buffered channel and context?
And I'm not sure if this is thread safe or not

Here's the sample code I'm talking about:

package main

import (
	&quot;context&quot;
	&quot;fmt&quot;
	&quot;strconv&quot;
	&quot;time&quot;
)

func main() {
	runQueue()
}

func runQueue() {
	// When the buffer is full
	// sending channel is blocked
	queue := make(chan string, 10000)

	// If there are too few consumer,
	// the channel buffer will be full, and the sending channel will be blocked.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	consumerCount := 5
	go runProducer(queue, ctx, cancel)
	for i := 0; i &lt; consumerCount; i++ {
		go runConsumer(queue, ctx)
	}
	select {
	case &lt;-ctx.Done():
		// close channel to let goroutine get ctx.Done()
		close(queue)
	}
}

func runConsumer(queue chan string, ctx context.Context) {
	for {
		data := &lt;-queue
		select {
		case &lt;-ctx.Done():
			return
		default:

		}
		fmt.Println(data)
		&lt;-time.After(time.Millisecond * 1000)
	}
}

func runProducer(queue chan string, ctx context.Context, cancel context.CancelFunc) {
	for {
		fmt.Println(&quot;get data from server&quot;)
		select {
		case &lt;-ctx.Done():
			return
		default:

		}
		// dataList will be filled from other server
		dataList, err := getSomethingFromServer()
		if err != nil {
			if err.Error() == &quot;very fatal error&quot; {
				cancel()
				return
			}
			fmt.Println(err)
			continue
		}
		select {
		case &lt;-ctx.Done():
			return
		default:

		}
		for _, el := range dataList {
			queue &lt;- el
		}
		&lt;-time.After(time.Millisecond * 2000)
	}
}

func getSomethingFromServer() ([]string, error) {
	var newList []string
	for i := 1; i &lt; 4; i++ {
		newList = append(newList, strconv.Itoa(i))
	}
	return newList, nil
}

Is it thread safe?
And Is my logic going well?

If there are any mistakes, I would like to receive feedback

Please let me know if there is a better practice.

答案1

得分: 2

  1. 上下文(Context)是线程安全的。https://go.dev/blog/context

> 上下文(Context)可以被多个 goroutine 同时使用。代码可以将一个上下文(Context)传递给任意数量的 goroutine,并通过取消该上下文(Context)来向它们发送信号。

因此,在 Go 语言中,由于你无法确定 goroutine 在哪些线程(相同/不同)上运行,所以上下文(Context)是线程安全的。

  1. 通道(Channel)是线程安全的 - https://go.dev/ref/spec#Channel_types

> 单个通道(Channel)可以在发送语句、接收操作和调用内置函数 cap 和 len 的过程中被任意数量的 goroutine 使用,而无需进一步的同步。

通道(Channel)在底层使用了互斥锁(mutex)。
https://github.com/golang/go/blob/master/src/runtime/chan.go#L51

  1. 关于并发模式,请参考一些非常好的 Go 博客文章:
英文:
  1. Contexts are thread-safe. https://go.dev/blog/context

> A Context is safe for simultaneous use by multiple goroutines. Code can pass a single Context to any number of goroutines and cancel that Context to signal all of them.

So in go realms safe by multiple goroutines ~ thread-safe, since you never know on which threads (same/different) goroutines are running

  1. Channels are thread-safe - https://go.dev/ref/spec#Channel_types

> A single channel may be used in send statements, receive operations, and calls to the built-in functions cap and len by any number of goroutines without further synchronization

Channels use a mutex under-the-hood
https://github.com/golang/go/blob/master/src/runtime/chan.go#L51

  1. For concurrency patterns take a look at really good go blog posts:

huangapple
  • 本文由 发表于 2021年12月22日 12:49:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/70444331.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定