Go routines started with for-loop - one or many channels?
Question
I would like to load some json files ("&lt;name_A-J&gt;.json") using goroutines called from a for-loop. I'd like the loading to be parallelized (processing the first files while the other files are still being loaded).
<b>Q1</b>. Since the number of files may vary (new ones may be added), I would use a list of file names (auto-generating the names only in this example), hence the for-loop. Is that optimal?
<b>Q2</b>. What would be the most effective use of channel(s)?
<b>Q3</b>. How would I define the channel(s) if a unique channel for each load operation (as in the example code below) is needed?
Example code (to be compacted, and made capable of loading the files using a list of file names):
func load_json(aChan chan byte, s string) {
	// load "filename" + s + ".json"
	// confirm to the channel
	aChan <- 0
}

func do_stuff() {
	// .. with the newly loaded json
}

func main() {
	chan_A := make(chan byte)
	go load_json(chan_A, "_classA")
	chan_B := make(chan byte)
	go load_json(chan_B, "_classB")
	chan_C := make(chan byte)
	go load_json(chan_C, "_classC")
	chan_D := make(chan byte)
	go load_json(chan_D, "_classD")
	<-chan_A
	// Now, do stuff with Class A
	<-chan_B
	// etc...
	<-chan_C
	<-chan_D
	fmt.Println("Done.")
}
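To answer Q1-Q3 in one sketch: a single shared channel is usually enough, so a unique channel per load operation is not needed. Each goroutine sends one acknowledgement, and the caller simply receives once per file. The following is a minimal sketch under that assumption (the file names and the loadAll helper are illustrative, not from the original post):

```go
package main

import "fmt"

// loadAll launches one goroutine per name and waits for all of them on a
// single shared channel. One channel suffices: each goroutine sends one
// acknowledgement, so the caller receives exactly len(names) times.
// It returns the number of acknowledgements received.
func loadAll(names []string) int {
	done := make(chan string)
	for _, n := range names {
		go func(name string) {
			// ... read "filename" + name + ".json" here ...
			done <- name
		}(n)
	}
	acks := 0
	for range names {
		fmt.Println("loaded:", <-done) // completion order is nondeterministic
		acks++
	}
	return acks
}

func main() {
	loadAll([]string{"_classA", "_classB", "_classC", "_classD"})
	fmt.Println("Done.")
}
```

Note that the loop variable is passed as a parameter to the goroutine, so each goroutine sees its own name rather than the last value of the loop variable.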
<b>EDIT:</b>
I designed a simplified test solution based on the ideas suggested by "Tom" (see below). In my case I split the task into three phases, using one channel per phase to control the execution. However, I tend to get deadlocks with this code (see the execution results and the note below the code).
Run this code on the <b>PlayGround</b>.
How can I avoid the deadlocks in this code?
package main

import "fmt"

type TJsonFileInfo struct {
	FileName string
}

type TChannelTracer struct { // Will count & display visited phases A, B, C
	A, B, C int
}

var ChannelTracer TChannelTracer

var jsonFileList = []string{
	"./files/classA.json",
	"./files/classB.json",
	"./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
	var newFileInfo TJsonFileInfo
	newFileInfo.FileName = aFileName
	// file, e := ioutil.ReadFile(newFileInfo.FileName)...
	ChannelTracer.A += 1
	fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
	aResultQueueChan <- &newFileInfo
}

func UnmarshalFile(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
	FileInfo := <-aWorkQueueChan
	ChannelTracer.B += 1
	fmt.Printf("B. Marshalled file: %s\n", FileInfo.FileName)
	aResultQueueChan <- FileInfo
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
	FileInfo := <-aWorkQueueChan
	ChannelTracer.C += 1
	fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
	aDoneQueueChan <- FileInfo
}

func main() {
	marshalChan := make(chan *TJsonFileInfo)
	processChan := make(chan *TJsonFileInfo)
	doneProcessingChan := make(chan *TJsonFileInfo)
	for _, fileName := range jsonFileList {
		go LoadJsonFiles(fileName, marshalChan)
		go UnmarshalFile(marshalChan, processChan)
		go ProcessWork(processChan, doneProcessingChan)
	}
	for {
		select {
		case result := <-marshalChan:
			result.FileName = result.FileName // dummy use
		case result := <-processChan:
			result.FileName = result.FileName // dummy use
		case result := <-doneProcessingChan:
			result.FileName = result.FileName // dummy use
			fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
		}
	}
}
}
/**
RESULTS (for phases A, B and C):
A. Loaded file: ./files/classA.json
A. Loaded file: ./files/classB.json
A. Loaded file: ./files/classC.json
B. Marshalled file: ./files/classB.json
B. Marshalled file: ./files/classC.json
C. Processed file: ./files/classB.json
C. Processed file: ./files/classC.json
Done. Channels visited: {3 2 2} // ChannelTracer for phase A, B and C
Done. Channels visited: {3 2 2}
fatal error: all goroutines are asleep - deadlock!
*/
<i><b>Note</b> that this code doesn't access the file system so it should run on the PlayGround</i>.
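The trace above suggests what goes wrong: each UnmarshalFile/ProcessWork goroutine handles exactly one item, main's select also competes for marshalChan and processChan (stealing items from the pipeline stages), and the final for loop blocks forever once everything is drained. A minimal sketch of one common fix (an assumed rework, not the original code): run each stage as a goroutine that loops over its input channel, and close channels downstream so the consuming loop can terminate:

```go
package main

import (
	"fmt"
	"sync"
)

// runPipeline pushes every file through two stages and returns the results.
// Stage B is a single goroutine looping over its input channel; closing a
// channel tells the next stage (and finally the caller) that no more work
// is coming, so nothing blocks forever.
func runPipeline(files []string) []string {
	loaded := make(chan string)
	processed := make(chan string)

	// Stage A: fan out one goroutine per file, close `loaded` when all are done.
	var wg sync.WaitGroup
	for _, f := range files {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			loaded <- name // pretend the file is read here
		}(f)
	}
	go func() { wg.Wait(); close(loaded) }()

	// Stage B: one goroutine drains `loaded`, then closes `processed`.
	go func() {
		for f := range loaded {
			processed <- f // pretend the JSON is unmarshalled here
		}
		close(processed)
	}()

	// The range ends when `processed` is closed -- no deadlock.
	var results []string
	for f := range processed {
		results = append(results, f)
	}
	return results
}

func main() {
	for _, r := range runPipeline([]string{"classA.json", "classB.json", "classC.json"}) {
		fmt.Println("done:", r)
	}
}
```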
<b>EDIT2:</b> Apart from the unsafe "ChannelTracer", I can avoid the deadlock only by consuming doneProcessingChan the same number of times as there are file tasks.
Run the code here: <b>Playground</b>
func main() {
	marshalChan := make(chan *TJsonFileInfo)
	processChan := make(chan *TJsonFileInfo)
	doneProcessingChan := make(chan *TJsonFileInfo)
	go UnmarshalFiles(marshalChan, processChan)
	go ProcessWork(processChan, doneProcessingChan)
	for _, fileName := range jsonFileList {
		go LoadJsonFiles(fileName, marshalChan)
	}
	// Read doneProcessingChan the same number of times
	// as there are spawned tasks (files) above:
	for i := 0; i < len(jsonFileList); i++ {
		<-doneProcessingChan
		fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
	}
}
// RIL
Answer 1
Score: 2
Building on the answer by @BraveNewCurrency, I have composed a simplistic example program for you:
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

type Result struct {
	Some    string
	Another string
	AndAn   int
}

func generateWork(work chan *os.File) {
	files := []string{
		"/home/foo/a.json",
		"/home/foo/b.json",
		"/home/foo/c.json",
	}
	for _, path := range files {
		file, e := os.Open(path)
		if e != nil {
			panic(e)
		}
		work <- file
	}
}

func processWork(work chan *os.File, done chan Result) {
	file := <-work
	decoder := json.NewDecoder(file)
	result := Result{}
	decoder.Decode(&result)
	done <- result
}

func main() {
	work := make(chan *os.File)
	go generateWork(work)
	done := make(chan Result)
	for i := 0; i < 100; i++ {
		go processWork(work, done)
	}
	for {
		select {
		case result := <-done:
			// a result is available
			fmt.Println(result)
		}
	}
}
Note that this program won't work on the playground because file-system access is disallowed there.
Edit:
To answer the edition in your question, I've taken the code and changed some small things:
package main

import (
	_ "encoding/json"
	"fmt"
	_ "io/ioutil"
	_ "os"
)

type TJsonMetaInfo struct {
	MetaSystem string
}

type TJsonFileInfo struct {
	FileName string
}

type TChannelTracer struct { // Will count & display visited phases A, B, C
	A, B, C int
}

var ChannelTracer TChannelTracer

var jsonFileList = []string{
	"./files/classA.json",
	"./files/classB.json",
	"./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
	newFileInfo := TJsonFileInfo{aFileName}
	// file, e := ioutil.ReadFile(newFileInfo.FileName)
	// etc...
	ChannelTracer.A += 1
	fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
	aResultQueueChan <- &newFileInfo
}

func UnmarshalFiles(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
	for {
		FileInfo := <-aWorkQueueChan
		ChannelTracer.B += 1
		fmt.Printf("B. Unmarshalled file: %s\n", FileInfo.FileName)
		aResultQueueChan <- FileInfo
	}
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
	for {
		FileInfo := <-aWorkQueueChan
		ChannelTracer.C += 1
		fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
		aDoneQueueChan <- FileInfo
	}
}

func main() {
	marshalChan := make(chan *TJsonFileInfo)
	processChan := make(chan *TJsonFileInfo)
	doneProcessingChan := make(chan *TJsonFileInfo)
	go UnmarshalFiles(marshalChan, processChan)
	go ProcessWork(processChan, doneProcessingChan)
	for _, fileName := range jsonFileList {
		go LoadJsonFiles(fileName, marshalChan)
	}
	for {
		select {
		case result := <-doneProcessingChan:
			result.FileName = result.FileName // dummy use
			fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
		}
	}
}
Note that this code still deadlocks, but only at the end, when all work is complete, in the final empty for loop in main().
Note also that these lines:
ChannelTracer.A += 1
ChannelTracer.B += 1
ChannelTracer.C += 1
are not concurrency-safe. This means that in a multi-threaded environment, two goroutines might try to increment the same counter at the same time, resulting in a wrong count. To work around this issue, take a look at the following packages:
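The package list was cut off in this copy of the answer; a hedged sketch of what it presumably pointed at, using Go's sync and sync/atomic packages to make such a counter safe:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countConcurrent increments a shared counter from n goroutines using
// atomic.AddInt64, which is safe where a plain `counter++` could lose
// updates under concurrent access.
func countConcurrent(n int) int64 {
	var counter int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&counter, 1) // safe concurrent increment
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(countConcurrent(100)) // always 100 with atomic increments
}
```

A sync.Mutex around the increment would work just as well; atomic operations are simply the lighter-weight choice for plain counters like ChannelTracer's A, B, and C fields.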
Answer 2
Score: 0
You should structure your program this way:
- The main routine creates a channel for "work to do" and probably one for "done work" (both channels should probably have some buffering).
- Spin off one goroutine to generate the file list and put the names into the "work to do" channel.
- Spin up N goroutines (in a for-loop) to process files. Each routine reads a file from the "work to do" channel, processes it, and sends the response to the "done work" channel.
- The main routine waits on "done work" and prints the results or whatever.
The optimal "N" above varies depending on the problem:
- If your work is CPU bound, the optimal N is about the number of processors in your system.
- If your work is disk bound, performance may actually go down as you increase N, because multiple workers cause more random I/O.
- If your work pulls files from many remote computers (think web crawling), then the optimal N might be very high (100 or even 1000).
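The steps above can be sketched as a small worker pool. This is an illustrative sketch, not the answerer's code: the file names and the processAll helper are placeholders, and closing the channels is what lets everything shut down cleanly:

```go
package main

import (
	"fmt"
	"sync"
)

// processAll implements the structure described above: a producer goroutine
// fills a buffered "work to do" channel, N workers drain it into a "done
// work" channel, and the caller collects the results.
func processAll(paths []string, workers int) []string {
	todo := make(chan string, len(paths)) // "work to do", buffered
	done := make(chan string, len(paths)) // "done work", buffered

	// One goroutine generates the file list, then closes `todo`
	// so the workers' range loops can end.
	go func() {
		for _, p := range paths {
			todo <- p
		}
		close(todo)
	}()

	// N workers read, "process", and report.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range todo {
				done <- "processed " + p // JSON decoding would go here
			}
		}()
	}
	go func() { wg.Wait(); close(done) }()

	// The caller waits on "done work" until it is closed.
	var results []string
	for r := range done {
		results = append(results, r)
	}
	return results
}

func main() {
	for _, r := range processAll([]string{"a.json", "b.json", "c.json"}, 4) {
		fmt.Println(r)
	}
}
```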