英文:
goroutine channels over a for loop
问题
我的main
函数从文件中读取JSON,将其解组为结构体,将其转换为另一种结构体类型,并通过标准输出打印格式化的JSON。
我正在尝试使用goroutine和通道来为我的for
循环添加并发性。
func main() {
muvMap := map[string]string{"male": "M", "female": "F"}
fileA, err := os.Open("serviceAfileultimate.json")
if err != nil {
panic(err)
}
defer fileA.Close()
data := make([]byte, 10000)
count, err := fileA.Read(data)
if err != nil {
panic(err)
}
dataBytes := data[:count]
var servicesA ServiceA
json.Unmarshal(dataBytes, &servicesA)
var servicesB = make([]ServiceB, servicesA.Count)
goChannels := make(chan ServiceB, servicesA.Count)
for i := 0; i < servicesA.Count; i++ {
go func() {
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Address").SetString(Merge(&servicesA.Users[i].Location))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Date_Of_Birth").SetString(dateCopyTransform(servicesA.Users[i].Dob))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Email").SetString(servicesA.Users[i].Email)
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Fullname").SetString(Merge(&servicesA.Users[i].Name))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Gender").SetString(muvMap[servicesA.Users[i].Gender])
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Phone").SetString(servicesA.Users[i].Cell)
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Username").SetString(servicesA.Users[i].Username)
goChannels <- servicesB[i]
}()
}
for index := range goChannels {
json.NewEncoder(os.Stdout).Encode(index)
}
}
它可以编译,但返回的消息类似于:
goroutine 1 [chan receive]: main.main() C://.....go.94 +0x55b.
英文:
My main
function reads json from a file, unmarshals it into a struct, converts it into another struct type and spits out formatted JSON through stdout.
I'm trying to implement goroutines and channels to add concurrency to my for
loop.
func main() {
muvMap := map[string]string{"male": "M", "female": "F"}
fileA, err := os.Open("serviceAfileultimate.json")
if err != nil {
panic(err)
}
defer fileA.Close()
data := make([]byte, 10000)
count, err := fileA.Read(data)
if err != nil {
panic(err)
}
dataBytes := data[:count]
var servicesA ServiceA
json.Unmarshal(dataBytes, &servicesA)
var servicesB = make([]ServiceB, servicesA.Count)
goChannels := make(chan ServiceB, servicesA.Count)
for i := 0; i < servicesA.Count; i++ {
go func() {
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Address").SetString(Merge(&servicesA.Users[i].Location))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Date_Of_Birth").SetString(dateCopyTransform(servicesA.Users[i].Dob))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Email").SetString(servicesA.Users[i].Email)
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Fullname").SetString(Merge(&servicesA.Users[i].Name))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Gender").SetString(muvMap[servicesA.Users[i].Gender])
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Phone").SetString(servicesA.Users[i].Cell)
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Username").SetString(servicesA.Users[i].Username)
goChannels <- servicesB[i]
}()
}
for index := range goChannels {
json.NewEncoder(os.Stdout).Encode(index)
}
}
It compiles but is returning messages like:
goroutine 1 [chan receive]: main.main() C://.....go.94 +0x55b.
答案1
得分: 1
你正在打印通道的信息,而不是其中包含的数据。你不需要循环,只需要接收然后打印。
json := <-index
json.NewEncoder(os.Stdout).Encode(json)
现在我需要指出的是,这段代码不会阻塞。如果你想一直读取,直到所有工作完成,你需要一些形式的锁定/协调机制。
通常会看到类似以下的代码:
for {
select {
case json := <-jsonChannel:
// 处理数据
case <-abort:
// 退出循环
}
}
处理这种情况。另外,只是提醒一下,你正在使用默认容量初始化通道(意味着它是一个缓冲通道),这相当奇怪。我建议你查阅一些关于这个主题的教程,因为整体上你的设计需要一些改进才能成为非并发实现的改进。最后,你可以找到一些库来为你抽象出一部分工作,大多数人可能会建议你这样做。这里有一个例子:https://github.com/lytics/squaredance
英文:
You're printing the channels info, not the data it contains. You don't want a loop, you just want to receive then print.
json := <-index
json.NewEncoder(os.Stdout).Encode(json)
Now I do I need to point out, that code is not going to block. If you want to keep reading until all work is done you need some kind of locking/coordination mechanism.
You'll often see things like
for {
select {
case json := <-jsonChannel:
// do stuff
case <-abort:
// get out of here
}
}
To deal with that. Also, just fyi you're initializing your channel with a default capacity (meaning it's a buffered channel) which is pretty odd. I'd recommend reviewing some tutorials on the topic cause overall your design needs some work actually be an improvement of non-concurrent implementations. Lastly you can find libraries to abstract some of this work for you and most people would probably recommend you do. Here's an example; https://github.com/lytics/squaredance
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论