Go routines started with for-loop - one or many channels?


I would like to load some JSON files ("<name_A-J>.json") using goroutines called from a for-loop. I'd like the loading to be parallelized (processing the first files while the other files are still being loaded).

<b>Q1</b>. Since the number of files may vary (new ones may be added), I would use a (file) list with filenames (autogenerating the names only in this example), so I'd like to use a for-loop. Is that optimal?

<b>Q2</b>. What would be the most effective use of channel(s)?

<b>Q3</b>. How would I define the channel(s) if a unique channel for each load operation (as in the example code below) is needed?

Example code (to be compacted & capable of loading the files using a list of file names):


func load_json(aChan chan byte, s string) {
    // load "filename" + s + ".json"
    // confirm to the channel
    aChan <- 0
}

func do_stuff() {
    // .. with the newly loaded json
}

func main() {
    chan_A := make(chan byte)
    go load_json(chan_A, "_classA")
    
    chan_B := make(chan byte)
    go load_json(chan_B, "_classB")
    
    chan_C := make(chan byte)
    go load_json(chan_C, "_classC")
    
    chan_D := make(chan byte)
    go load_json(chan_D, "_classD")

    
    <-chan_A
    // Now, do stuff with Class A
    <-chan_B
    // etc...
    <-chan_C
    <-chan_D
    fmt.Println("Done.")
}

<b>EDIT:</b>
I designed a simplified test solution based on the ideas suggested by "Tom" (see below). In my case I split the task into three phases, using one channel per phase to control the execution. However, I tend to get deadlocks with this code (see the execution results and the note below the code).

Run this code on the <b>PlayGround</b>.

How can I avoid the deadlocks in this code?

type TJsonFileInfo struct {
    FileName string
}
type TChannelTracer struct {  // Will count & display visited phases A, B, C
    A, B, C int
}
var ChannelTracer TChannelTracer

var jsonFileList = []string{
    "./files/classA.json",
    "./files/classB.json",
    "./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
    var newFileInfo TJsonFileInfo
    newFileInfo.FileName = aFileName
    // file, e := ioutil.ReadFile(newFileInfo.FileName)...
    ChannelTracer.A += 1
    fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
    aResultQueueChan <- &newFileInfo
}

func UnmarshalFile(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
    FileInfo := <-aWorkQueueChan
    ChannelTracer.B += 1
    fmt.Printf("B. Marshalled file: %s\n", FileInfo.FileName)
    aResultQueueChan <- FileInfo
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
    FileInfo := <-aWorkQueueChan
    ChannelTracer.C += 1
    fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
    aDoneQueueChan <- FileInfo
}

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)

    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
        go UnmarshalFile(marshalChan, processChan)
        go ProcessWork(processChan, doneProcessingChan)
    }

    for {
        select {
        case result := <-marshalChan:
            result.FileName = result.FileName // dummy use
        case result := <-processChan:
            result.FileName = result.FileName // dummy use
        case result := <-doneProcessingChan:
            result.FileName = result.FileName // dummy use
            fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
        }
    }
}
}

/**
RESULTS (for phases A, B and C):

A. Loaded file: ./files/classA.json
A. Loaded file: ./files/classB.json
A. Loaded file: ./files/classC.json
B. Marshalled file: ./files/classB.json
B. Marshalled file: ./files/classC.json
C. Processed file: ./files/classB.json 
C. Processed file: ./files/classC.json 
Done. Channels visited: {3 2 2}     // ChannelTracer for phase A, B and C
Done. Channels visited: {3 2 2}
fatal error: all goroutines are asleep - deadlock!
*/

<i><b>Note</b> that this code doesn't access the file system so it should run on the PlayGround</i>.

<b>EDIT2:</b> - Apart from the unsafe "ChannelTracer", I can avoid deadlocks only by consuming doneProcessingChan the same number of times as there are file tasks.
Run the code here: <b>Playground</b>

func main() {
    marshalChan := make(chan *TJsonFileInfo)
    processChan := make(chan *TJsonFileInfo)
    doneProcessingChan := make(chan *TJsonFileInfo)
    
    go UnmarshalFiles(marshalChan, processChan)
    go ProcessWork(processChan, doneProcessingChan)
    
    for _, fileName := range jsonFileList {
        go LoadJsonFiles(fileName, marshalChan)
    }
    
    //  Read doneProcessingChan the same number of times
    //  as the spawned tasks (files) above:
    for i := 0; i < len(jsonFileList); i++ {
        <-doneProcessingChan
        fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
    }
}

// RIL

Answer 1

Score: 2


Building on the answer by @BraveNewCurrency, I have composed a simple example program for you:

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

type Result struct {
	Some    string
	Another string
	AndAn   int
}

func generateWork(work chan *os.File) {
	files := []string{
		"/home/foo/a.json",
		"/home/foo/b.json",
		"/home/foo/c.json",
	}
	for _, path := range files {
		file, e := os.Open(path)
		if e != nil {
			panic(e)
		}
		work <- file
	}
}

func processWork(work chan *os.File, done chan Result) {
	file := <-work
	decoder := json.NewDecoder(file)
	result := Result{}
	decoder.Decode(&result)
	done <- result
}

func main() {
	work := make(chan *os.File)
	go generateWork(work)
	done := make(chan Result)
	for i := 0; i < 100; i++ {
		go processWork(work, done)
	}
	for {
		select {
		case result := <-done:
			// a result is available
			fmt.Println(result)
		}
	}
}

Note that this program won't work on the playground because file-system access is disallowed there.

Edit:

To address the edit in your question, I've taken your code and changed a few small things:

package main

import (
	_ "encoding/json"
	"fmt"
	_ "io/ioutil"
	_ "os"
)

type TJsonMetaInfo struct {
	MetaSystem string
}

type TJsonFileInfo struct {
	FileName string
}

type TChannelTracer struct { // Will count & display visited phases A, B, C
	A, B, C int
}

var ChannelTracer TChannelTracer

var jsonFileList = []string{
	"./files/classA.json",
	"./files/classB.json",
	"./files/classC.json",
}

func LoadJsonFiles(aFileName string, aResultQueueChan chan *TJsonFileInfo) {
	newFileInfo := TJsonFileInfo{aFileName}
	// file, e := ioutil.ReadFile(newFileInfo.FileName)
	// etc...
	ChannelTracer.A += 1
	fmt.Printf("A. Loaded file: %s\n", newFileInfo.FileName)
	aResultQueueChan <- &newFileInfo
}

func UnmarshalFiles(aWorkQueueChan chan *TJsonFileInfo, aResultQueueChan chan *TJsonFileInfo) {
	for {
		FileInfo := <-aWorkQueueChan
		ChannelTracer.B += 1
		fmt.Printf("B. Unmarshalled file: %s\n", FileInfo.FileName)
		aResultQueueChan <- FileInfo
	}
}

func ProcessWork(aWorkQueueChan chan *TJsonFileInfo, aDoneQueueChan chan *TJsonFileInfo) {
	for {
		FileInfo := <-aWorkQueueChan
		ChannelTracer.C += 1
		fmt.Printf("C. Processed file: %s \n", FileInfo.FileName)
		aDoneQueueChan <- FileInfo
	}
}

func main() {
	marshalChan := make(chan *TJsonFileInfo)
	processChan := make(chan *TJsonFileInfo)
	doneProcessingChan := make(chan *TJsonFileInfo)

	go UnmarshalFiles(marshalChan, processChan)
	go ProcessWork(processChan, doneProcessingChan)

	for _, fileName := range jsonFileList {
		go LoadJsonFiles(fileName, marshalChan)
	}

	for {
		select {
		case result := <-doneProcessingChan:
			result.FileName = result.FileName // dummy use
			fmt.Printf("Done%s Channels visited: %v\n", ".", ChannelTracer)
		}
	}
}

Note that this code still deadlocks, but only at the end, once all work is complete, in the final for/select loop in main().

Note also that these lines:

ChannelTracer.A += 1
ChannelTracer.B += 1
ChannelTracer.C += 1

are not concurrency-safe. This means that in a multi-threaded environment, one goroutine and another might try to increment the same counter at the same time, resulting in a wrong count. To work around this issue, take a look at the following packages:

Answer 2

Score: 0


You should structure your program this way:

  1. the main routine creates a channel for "work to do" and probably one for "done work" (both channels should probably have some buffering)

  2. spin off one goroutine to generate the file list and put them in the "work to do" channel.

  3. spin up N goroutines (in a for loop) to process files. The routine will read the file from the "work to do" channel, process it, and send the response to the "done work" channel.

  4. the main routine waits on "done work" and prints them or whatever.

The optimal "N" above varies depending on the problem:

  • If your work is CPU bound, the optimal N should be about the number of processors in your system.
  • If your work is disk bound, performance may actually go down as you increase N because multiple workers will cause more random I/O.
  • If your work pulls files from many remote computers (think webcrawling), then the optimal N might be very high (100 or even 1000).

huangapple
  • Posted on June 1, 2013 at 21:03:04
  • Please retain this link when republishing: https://go.coder-hub.com/16873000.html