How do I make this program thread-safe? Would channels be the best implementation, and if so, how?

Question

I'm using Go and trying to make this program thread-safe. It takes a number as a parameter (the number of consumer tasks to start), reads lines from an input, and accumulates a word count. I want the threads to be safe, but I don't want to just lock everything; it needs to be efficient. Should I use channels? How do I do this?

```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"sync"
)

// Consumer task to operate on queue
// (reads and reslices the shared package-level slice without any locking)
func consumer_task(task_num int) {
	fmt.Printf("I'm consumer task #%v ", task_num)
	fmt.Println("Line being popped off queue: " + queue[0])
	queue = queue[1:]
}

// Initialize queue
var queue = make([]string, 0)

func main() {
	// Initialize wait group
	var wg sync.WaitGroup
	// Get number of tasks to run from user
	var numof_tasks int
	fmt.Print("Enter number of tasks to run: ")
	fmt.Scan(&numof_tasks)
	// Open file
	file, err := os.Open("test.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	// Scanner to scan the file
	scanner := bufio.NewScanner(file)
	// Loop through each line in the file and append it to the queue
	for scanner.Scan() {
		line := scanner.Text()
		queue = append(queue, line)
	}
	// Check for read errors after the loop: Err reports what stopped Scan
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	// Start specified # of consumer tasks
	for i := 1; i <= numof_tasks; i++ {
		wg.Add(1)
		go func(i int) {
			consumer_task(i)
			wg.Done()
		}(i)
	}
	wg.Wait()
	fmt.Println("All done")
	fmt.Println(queue)
}
```

Answer 1

Score: 2

You have a data race on the slice `queue`. Concurrent goroutines popping elements off the head of the queue must do so in a controlled manner, either by guarding the slice with a `sync.Mutex` lock or by using a channel to manage the "queue" of work items.
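For comparison, here is a minimal sketch of the `sync.Mutex` option, assuming the lines have already been read into a slice as in the question. The `safeQueue` type, its `pop` method, and `consumeAll` are hypothetical names, not from the original answer. The lock is held only while the head is removed, not while a line is processed, so workers contend only briefly. `pop` also reports an empty queue instead of panicking the way `queue[0]` does when there are more tasks than lines.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// safeQueue (hypothetical) guards the shared slice with a sync.Mutex so only
// one goroutine at a time can read and reslice it.
type safeQueue struct {
	mu    sync.Mutex
	items []string
}

// pop removes and returns the head of the queue; ok is false when it is empty.
func (q *safeQueue) pop() (item string, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return "", false
	}
	item = q.items[0]
	q.items = q.items[1:]
	return item, true
}

// consumeAll starts numTasks workers that pop until the queue is empty and
// returns the total number of lines they handled.
func consumeAll(q *safeQueue, numTasks int) int {
	var wg sync.WaitGroup
	var handled int64
	for i := 1; i <= numTasks; i++ {
		wg.Add(1)
		go func(taskNum int) {
			defer wg.Done()
			for {
				line, ok := q.pop()
				if !ok {
					return // queue drained, this worker is done
				}
				// Processing happens outside the lock.
				fmt.Printf("task %d popped: %s\n", taskNum, line)
				atomic.AddInt64(&handled, 1)
			}
		}(i)
	}
	wg.Wait()
	return int(atomic.LoadInt64(&handled))
}

func main() {
	q := &safeQueue{items: []string{"line 1", "line 2", "line 3"}}
	fmt.Println("lines handled:", consumeAll(q, 4)) // lines handled: 3
}
```

The per-line output order varies from run to run; only the final count is deterministic.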

To convert your code to use channels, update the worker to take an input channel as its queue and range over that channel, so each worker can handle more than one task:

```go
func consumer_task(task_num int, ch <-chan string) {
	fmt.Printf("I'm consumer task #%v\n", task_num)
	for item := range ch {
		fmt.Printf("task %d consuming: Line item: %v\n", task_num, item)
	}
	// each worker will drop out of its loop when the channel is closed
}
```
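To see why the `range` loop lets each worker exit cleanly, here is a small sketch (the `drain` helper is hypothetical, not from the answer): `range` over a channel keeps receiving values, including any still buffered after `close`, and ends only once the channel is both closed and empty.

```go
package main

import "fmt"

// drain collects everything a worker would receive from ch. The range loop
// ends on its own once ch is closed and every sent value has been received.
func drain(ch <-chan string) []string {
	var got []string
	for item := range ch {
		got = append(got, item)
	}
	return got
}

func main() {
	// A buffered channel lets us send all items before any receiver runs.
	ch := make(chan string, 3)
	ch <- "line 1"
	ch <- "line 2"
	ch <- "line 3"
	close(ch)              // closing does not discard the buffered values
	fmt.Println(drain(ch)) // prints [line 1 line 2 line 3]
}
```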

Change `queue` from a slice to a channel (created in `main`, replacing the package-level slice) and feed items into it like so:

```go
queue := make(chan string)
go func() {
	// Loop through each line in the file and send it to the queue
	for scanner.Scan() {
		queue <- scanner.Text()
	}
	close(queue) // signal to workers that there are no more items
}()
```

Then update your work-dispatcher code to pass in the channel:

```go
go func(i int) {
	consumer_task(i, queue) // add the queue parameter
	wg.Done()
}(i)
```

You can see the example code here: https://go.dev/play/p/AzHyztipUZI

  • Posted by huangapple on March 21, 2022 06:20:56
  • Link (please keep when reposting): https://go.coder-hub.com/71551099.html