How do I make this program thread-safe? Would channels be the best implementation, and if so, how?
Question
I'm using Go (Golang) and I'm trying to make this program thread-safe. It takes a number as a parameter (the number of consumer tasks to start), reads lines from an input, and accumulates a word count. I want it to be thread-safe, but I don't want to simply lock everything; it needs to be efficient. Should I use channels? How do I do this?
package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"sync"
)

// Consumer task to operate on queue
func consumer_task(task_num int) {
	fmt.Printf("I'm consumer task #%v ", task_num)
	fmt.Println("Line being popped off queue: " + queue[0])
	queue = queue[1:]
}

// Initialize queue
var queue = make([]string, 0)

func main() {
	// Initialize wait group
	var wg sync.WaitGroup

	// Get number of tasks to run from user
	var numof_tasks int
	fmt.Print("Enter number of tasks to run: ")
	fmt.Scan(&numof_tasks)

	// Open file
	file, err := os.Open("test.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// Scanner to scan the file
	scanner := bufio.NewScanner(file)

	// Loop through each line in the file and append it to the queue
	for scanner.Scan() {
		line := scanner.Text()
		queue = append(queue, line)
	}
	// Check for read errors only after scanning has finished
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}

	// Start specified # of consumer tasks
	for i := 1; i <= numof_tasks; i++ {
		wg.Add(1)
		go func(i int) {
			consumer_task(i)
			wg.Done()
		}(i)
	}
	wg.Wait()
	fmt.Println("All done")
	fmt.Println(queue)
}
Answer 1
Score: 2
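You have a data race on the slice queue: several goroutines read queue[0] and reassign queue at the same time with no synchronization. Goroutines that pop elements off the head of the queue must do so in a controlled manner, either by guarding the slice with a sync.Mutex lock or by using a channel to manage the "queue" of work items.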
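For comparison, the sync.Mutex option might look like the minimal sketch below. It is not the answerer's code: lineQueue, pop and drainQueue are hypothetical names chosen for illustration. The lock is held only while the head of the slice is read and removed, not while a line is processed, and pop reports an empty queue instead of indexing queue[0] on an empty slice (which the original consumer_task would panic on when there are more tasks than lines).

```go
package main

import (
	"fmt"
	"sync"
)

// lineQueue is a hypothetical slice-backed queue whose pop operation is
// guarded by a mutex, so only one goroutine at a time reads and shrinks it.
type lineQueue struct {
	mu    sync.Mutex
	lines []string
}

// pop removes and returns the head of the queue. The boolean is false when
// the queue is empty, avoiding an index-out-of-range panic on queue[0].
func (q *lineQueue) pop() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.lines) == 0 {
		return "", false
	}
	line := q.lines[0]
	q.lines = q.lines[1:]
	return line, true
}

// drainQueue starts numWorkers goroutines that keep popping lines until the
// queue is empty, and returns how many lines were handled in total.
func drainQueue(lines []string, numWorkers int) int {
	q := &lineQueue{lines: lines}
	var wg sync.WaitGroup
	var totalMu sync.Mutex // guards total
	total := 0

	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			handled := 0
			for {
				line, ok := q.pop()
				if !ok {
					break // queue is empty, this worker is done
				}
				// The line is processed outside the lock.
				fmt.Printf("task %d consuming: %s\n", id, line)
				handled++
			}
			totalMu.Lock()
			total += handled
			totalMu.Unlock()
		}(i)
	}
	wg.Wait()
	return total
}

func main() {
	n := drainQueue([]string{"first line", "second line", "third line"}, 2)
	fmt.Println("lines handled:", n)
}
```

The per-line messages interleave in a nondeterministic order, but every line is popped exactly once, so the last line printed is "lines handled: 3".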
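To convert what you have to use channels, change the worker to take an input channel as its queue and range over that channel, so each worker can handle more than one item: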
func consumer_task(task_num int, ch <-chan string) {
	fmt.Printf("I'm consumer task #%v\n", task_num)
	for item := range ch {
		fmt.Printf("task %d consuming: Line item: %v\n", task_num, item)
	}
	// each worker drops out of its loop once the channel is closed
}
Change queue from a slice to a channel (the global slice variable is no longer needed) and feed items into it like so:
queue := make(chan string)
go func() {
	// Loop through each line in the file and send it to the queue
	for scanner.Scan() {
		queue <- scanner.Text()
	}
	close(queue) // signal to workers that there are no more items
}()
Then update your work dispatcher code to pass the channel in:
go func(i int) {
	defer wg.Done()
	consumer_task(i, queue) // add the queue parameter
}(i)
You can see the example code here: https://go.dev/play/p/AzHyztipUZI