运行Go异步操作并写入映射。

huangapple go评论99阅读模式
英文:

Run Go asynchronous operations and write to map

问题

我有一个项目,试图在Go中同时运行无限个BigQuery。父项目是完全由Python编写的。我需要能够跟踪查询结果,就像在一个映射中一样。

输入:

{
'reports_portal': 'select * from reports_portal',
'billing_portal': 'select * from billing_portal',
}

输出:

{
'reports_portal': [23, 123, 5234, 632],
'billing_portal': [23, 123, 5234, 632],
}

等等。

由于这些BigQuery非常慢(从用户界面的角度来看,SRE需要等待15-30秒才能获得结果),它们需要异步运行。

我首先尝试将项目异步写入映射:

package main

import (
    "fmt"
)


func add_to_map(m map[string] string, word string) map[string]string { // stores word -> word + " plus more letters" in m and returns m
    added_word := word + " plus more letters"
    m[word] = added_word // unsynchronized write to the shared map (the runtime.mapassign call in the reported trace)
    return m
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        go add_to_map(words_map, this_word) // one goroutine per word, all writing to the same map with no lock
    }
    fmt.Println(words_map) // reads the map without waiting for the goroutines started above
}

但是会报错:

$ go run try_asynchronous.go 
fatal error: concurrent map writes

goroutine 7 [running]:
runtime.throw(0x10b3b96, 0x15)
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/panic.go:596 +0x95 fp=0xc420032eb8 sp=0xc420032e98
runtime.mapassign(0x109ad20, 0xc420016270, 0xc420032fa0, 0x10b3268)
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/hashmap.go:499 +0x667 fp=0xc420032f58 sp=0xc420032eb8
main.add_to_map(0xc420016270, 0x10b1ba0, 0x3, 0x0)
    /tmp/golang-w-python/try_asynchronous.go:10 +0xa3 fp=0xc420032fc0 sp=0xc420032f58
runtime.goexit()
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc420032fc8 sp=0xc420032fc0
created by main.main
    /tmp/golang-w-python/try_asynchronous.go:19 +0xc8

goroutine 1 [runnable]:
fmt.(*pp).fmtString(0xc42001e0c0, 0x10b1f52, 0x7, 0x76)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:424 +0x1a2
fmt.(*pp).printValue(0xc42001e0c0, 0x10953c0, 0xc42000e260, 0x98, 0x76, 0x1)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:729 +0x27aa
fmt.(*pp).printValue(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x15, 0x76, 0x0)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:750 +0x103d
fmt.(*pp).printArg(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x76)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:682 +0x217
fmt.(*pp).doPrintln(0xc42001e0c0, 0xc420045f28, 0x1, 0x1)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:1138 +0xa1
fmt.Fprintln(0x1108140, 0xc42000c018, 0xc420045f28, 0x1, 0x1, 0xc420045ef0, 0xc420045ee0, 0x1087218)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:247 +0x5c
fmt.Println(0xc420045f28, 0x1, 0x1, 0x10b1e6f, 0x6, 0x0)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:257 +0x57
main.main()
    /tmp/golang-w-python/try_asynchronous.go:21 +0x132
exit status 2

基于需要同时运行多个查询并尝试通过名称跟踪结果,我期望在异步过程中写入映射。但是fatal error: concurrent map writes表示不能这样做。

我不明白:

  1. 为什么不能这样做?
  2. 我应该怎么做才能同时运行这些bigquery?

编辑:

我最接近的一个返回结果的方法不是异步的:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var mutex sync.Mutex
var wg sync.WaitGroup

// random_sleep blocks the calling goroutine for rand.Intn(3000) milliseconds,
// i.e. a pseudo-random delay in [0, 3000) ms.
func random_sleep() {
	time.Sleep(time.Duration(rand.Intn(3000)) * time.Millisecond)
}

// add_to_map stores word -> word + " plus more letters" in m and calls
// wg.Done when it returns.
func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    // The mutex is taken before the sleep and released only on return, so
    // each goroutine holds the lock for its whole delay and the delays run
    // one after another instead of overlapping.
    mutex.Lock()
    defer mutex.Unlock()
    fmt.Println("Before sleep")
    random_sleep()
    m[word] = added_word
    // Println does not interpret format verbs, so "%v" is printed literally.
    fmt.Println("Added word %v", word)
}

// main starts one add_to_map goroutine per word, blocks on wg.Wait, and then
// prints the shared map.
func main() {
	animals := []string{"giraffe", "cat", "dog", "turtle"}
	results := make(map[string]string)

	// Register every goroutine up front; equivalent to Add(1) per iteration.
	wg.Add(len(animals))
	for _, animal := range animals {
		go add_to_map(results, animal)
	}

	wg.Wait()
	fmt.Println(results)
}

结果是错误的:

Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]

Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]

结果应该非常快,不超过3秒(我认为是随机数的最大值):

期望结果:

Before sleep
Before sleep
Before sleep
Before sleep
Added word %v cat
Added word %v giraffe
Added word %v turtle
Added word %v dog
英文:

I have this project that tries to run unlimited bigqueries at the same time in Go. The parent project is all Python. I need to be able to keep track of the query results, like in a map.

Input:

{
 'reports_portal': 'select * from reports_portal',
 'billing_portal': 'select * from billing_portal',
}

output:

{
 'reports_portal': [23, 123, 5234, 632],
 'billing_portal': [23, 123, 5234, 632],
}

and so on

These bigqueries need to be run asynchronously as they're very slow (from a UI perspective, an SRE waiting 15-30 seconds for results).

I first try to asynchronously write items to a map:

package main

import (
    "fmt"
)


func add_to_map(m map[string] string, word string) map[string]string { // stores word -> word + " plus more letters" in m and returns m
    added_word := word + " plus more letters"
    m[word] = added_word // unsynchronized write to the shared map (the runtime.mapassign call in the reported trace)
    return m
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        go add_to_map(words_map, this_word) // one goroutine per word, all writing to the same map with no lock
    }
    fmt.Println(words_map) // reads the map without waiting for the goroutines started above
}

blows up like:

$ go run try_asynchronous.go 
fatal error: concurrent map writes

goroutine 7 [running]:
runtime.throw(0x10b3b96, 0x15)
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/panic.go:596 +0x95 fp=0xc420032eb8 sp=0xc420032e98
runtime.mapassign(0x109ad20, 0xc420016270, 0xc420032fa0, 0x10b3268)
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/hashmap.go:499 +0x667 fp=0xc420032f58 sp=0xc420032eb8
main.add_to_map(0xc420016270, 0x10b1ba0, 0x3, 0x0)
    /tmp/golang-w-python/try_asynchronous.go:10 +0xa3 fp=0xc420032fc0 sp=0xc420032f58
runtime.goexit()
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc420032fc8 sp=0xc420032fc0
created by main.main
    /tmp/golang-w-python/try_asynchronous.go:19 +0xc8

goroutine 1 [runnable]:
fmt.(*pp).fmtString(0xc42001e0c0, 0x10b1f52, 0x7, 0x76)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:424 +0x1a2
fmt.(*pp).printValue(0xc42001e0c0, 0x10953c0, 0xc42000e260, 0x98, 0x76, 0x1)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:729 +0x27aa
fmt.(*pp).printValue(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x15, 0x76, 0x0)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:750 +0x103d
fmt.(*pp).printArg(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x76)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:682 +0x217
fmt.(*pp).doPrintln(0xc42001e0c0, 0xc420045f28, 0x1, 0x1)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:1138 +0xa1
fmt.Fprintln(0x1108140, 0xc42000c018, 0xc420045f28, 0x1, 0x1, 0xc420045ef0, 0xc420045ee0, 0x1087218)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:247 +0x5c
fmt.Println(0xc420045f28, 0x1, 0x1, 0x10b1e6f, 0x6, 0x0)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:257 +0x57
main.main()
    /tmp/golang-w-python/try_asynchronous.go:21 +0x132
exit status 2

based on needing to run many queries at once and trying to keep track of the results by their name, I expected to write to a map during asynchronous. But fatal error: concurrent map writes says you can't.

I don't understand

  1. why not
  2. what I should do to run these bigqueries simultaneously.

EDIT:

The closest thing I have, that returns results, is not asynchronous:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var mutex sync.Mutex
var wg sync.WaitGroup

// random_sleep pauses the caller for a pseudo-random number of milliseconds
// drawn from rand.Intn(3000), so the delay is always below three seconds.
func random_sleep() {
	delay := time.Duration(rand.Intn(3000)) * time.Millisecond
	time.Sleep(delay)
}

// add_to_map stores word -> word + " plus more letters" in m and calls
// wg.Done when it returns.
func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    // The mutex is taken before the sleep and released only on return, so
    // each goroutine holds the lock for its whole delay and the delays run
    // one after another instead of overlapping.
    mutex.Lock()
    defer mutex.Unlock()
    fmt.Println("Before sleep")
    random_sleep()
    m[word] = added_word
    // Println does not interpret format verbs, so "%v" is printed literally.
    fmt.Println("Added word %v", word)
}


// main launches add_to_map in its own goroutine for each word, waits on wg,
// and prints the resulting map.
func main() {
	words := []string{"giraffe", "cat", "dog", "turtle"}
	words_map := make(map[string]string)

	for i := 0; i < len(words); i++ {
		wg.Add(1)
		go add_to_map(words_map, words[i])
	}

	wg.Wait()
	fmt.Println(words_map)
}

Results are wrong:

cchilders:~/work_projects/metricsportal/golang_integration (feature/golang-query) 
$ go run try_async.go 
Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]

cchilders:~/work_projects/metricsportal/golang_integration (feature/golang-query) 
$ go run try_async.go 
Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]

Results should be very fast, no longer than 3 seconds (the max of random I think):

Expectation - 

Before sleep
Before sleep
Before sleep
Before sleep
Added word %v cat
Added word %v giraffe
Added word %v turtle
Added word %v dog

答案1

得分: 1

你的代码中存在两个不同的问题:

1)即使你总是写入不同的键,但在没有锁定映射的情况下无法同时进行写入操作:https://golang.org/doc/faq#atomic_maps

因此,在访问映射时,你需要确保获得独占访问权。

2)在打印映射之前,你需要等待所有的goroutine完成(这就是为什么在你修改后的代码中结果不一致的原因)。

根据你的示例,解决这两个问题的简单方法如下:

package main

import (
    "fmt"
    "sync"
)

var mutex sync.Mutex
var wg sync.WaitGroup

// add_to_map records word -> word + " plus more letters" in m. The write is
// made while holding mutex, and wg.Done runs when the function returns.
func add_to_map(m map[string]string, word string) {
	defer wg.Done()

	mutex.Lock()
	defer mutex.Unlock()
	m[word] = word + " plus more letters"
}


// main starts one add_to_map goroutine per word, blocks on wg.Wait until the
// counter drops to zero, and then prints the map.
func main() {
	animals := []string{"giraffe", "cat", "dog", "turtle"}
	results := make(map[string]string)

	// Register every goroutine up front; equivalent to Add(1) per iteration.
	wg.Add(len(animals))
	for _, animal := range animals {
		go add_to_map(results, animal)
	}

	wg.Wait()
	fmt.Println(results)
}

希望对你有所帮助!

英文:

You have two different issues in your code:

  1. Even if you are always writing to different keys, you can't do that simultaneously without locking the map: https://golang.org/doc/faq#atomic_maps

So, you need to just make sure you get exclusive access to the map when accessing it.

  2. You need to wait for all goroutines to finish before printing the map (that's why you get inconsistent results in your edited code)

A simple way to solve both issues based on your example:

package main

import (
    "fmt"
    "sync"
)

var mutex sync.Mutex
var wg sync.WaitGroup

// add_to_map builds the value for word, then stores it in m while holding
// mutex. wg.Done is deferred so it runs after the lock has been released.
func add_to_map(m map[string]string, word string) {
	suffixed := word + " plus more letters"

	defer wg.Done()
	mutex.Lock()
	defer mutex.Unlock()
	m[word] = suffixed
}


// main launches add_to_map in its own goroutine for each word, waits on wg,
// and prints the resulting map.
func main() {
	words := []string{"giraffe", "cat", "dog", "turtle"}
	words_map := make(map[string]string)

	for i := 0; i < len(words); i++ {
		wg.Add(1)
		go add_to_map(words_map, words[i])
	}

	wg.Wait()
	fmt.Println(words_map)
}

答案2

得分: 1

好的,让我澄清一些事情并帮助你。

你不需要从这里返回修改后的映射,因为你的函数得到的是映射的引用而不是副本。(先不提你完全忽略了返回值这一点)

// add_to_map stores word -> word + suffix in m and returns the same map it
// was given.
func add_to_map(m map[string]string, word string) map[string]string {
	m[word] = word + "加上更多字母"
	return m
}

接下来,你需要同步对映射的访问。你可以使用互斥锁来实现。

import "sync"

var mutex sync.Mutex // 全局变量,也可以作为局部变量创建

// add_to_map computes the value for word and stores it in m while holding
// mutex.
func add_to_map(m map[string]string, word string) {
	// Any long-running computation belongs here, before the lock is taken.
	result := word + "加上更多字母"

	// The result is ready: take the lock, and release it on return.
	mutex.Lock()
	defer mutex.Unlock()

	// Write the result into the shared map.
	m[word] = result
}

请注意,在Go 1.9中将会有一种并发映射(Concurrent Map)类型。

编辑
你需要等待所有的Go协程完成,因为你的main()现在在它们之前完成。你可以使用WaitGroup来实现这一点。

package main

import (
    "fmt"
    "sync"
)

var mutex sync.Mutex
var wg sync.WaitGroup

// add_to_map computes the value for word outside the lock, then stores it in
// m while holding mutex. wg.Done runs when the function returns.
func add_to_map(m map[string]string, word string) {
	defer wg.Done()

	// Do the heavy work here, before taking the lock.
	result := word + "加上更多字母"

	mutex.Lock()
	defer mutex.Unlock()
	m[word] = result
}

// main starts one add_to_map goroutine per word, blocks on wg.Wait until the
// counter drops to zero, and then prints the map.
func main() {
	words := []string{"长颈鹿", "猫", "狗", "乌龟"}
	results := make(map[string]string)

	// Register every goroutine up front; equivalent to Add(1) per iteration.
	wg.Add(len(words))
	for _, w := range words {
		go add_to_map(results, w)
	}

	wg.Wait()
	fmt.Println(results)
}
英文:

OK let me clarify some things and help you.

You don't need to return a modified map from here because your
function gets a reference to map not copy of it. (Let's ignore the fact
that you are completely ignoring return value)

// add_to_map stores word -> word + " plus more letters" in m and returns the
// same map it was given.
func add_to_map(m map[string]string, word string) map[string]string {
	m[word] = word + " plus more letters"
	return m
}

Next thing is that you need to synchronize access to map. You can use
mutex for this.

import "sync"

var mutex sync.Mutex // global variable, but it can also be created as a local one

// add_to_map computes the value for word and stores it in m while holding
// mutex.
func add_to_map(m map[string]string, word string) {
	// Any long-running computation belongs here, before the lock is taken.
	result := word + " plus more letters"

	// The result is ready: take the lock, and release it on return.
	mutex.Lock()
	defer mutex.Unlock()

	// Write the result into the shared map.
	m[word] = result
}

Note that in Go 1.9 there will be a Concurrent Map type.

Edit:
You need to wait for all go-routines to finish because your main() now finishes before them. You can do this by using WaitGroup

package main

import (
    "fmt"
    "sync"
)

var mutex sync.Mutex
var wg sync.WaitGroup

// add_to_map computes the value for word outside the lock, then stores it in
// m while holding mutex. wg.Done runs when the function returns.
func add_to_map(m map[string]string, word string) {
	defer wg.Done()

	// Do the heavy work here, before taking the lock.
	result := word + " plus more letters"

	mutex.Lock()
	defer mutex.Unlock()
	m[word] = result
}


// main launches add_to_map in its own goroutine for each word, waits on wg,
// and prints the resulting map.
func main() {
	words := []string{"giraffe", "cat", "dog", "turtle"}
	words_map := make(map[string]string)

	for i := 0; i < len(words); i++ {
		wg.Add(1)
		go add_to_map(words_map, words[i])
	}

	wg.Wait()
	fmt.Println(words_map)
}

答案3

得分: 0

(代表原帖作者发布的解决方案)

我对虚假延迟的使用是错误的,这两个解决方案都有效。谢谢:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var mutex sync.Mutex
var wg sync.WaitGroup

// random_sleep blocks the calling goroutine for rand.Intn(3000) milliseconds,
// i.e. a pseudo-random delay in [0, 3000) ms.
func random_sleep() {
	millis := rand.Intn(3000)
	time.Sleep(time.Millisecond * time.Duration(millis))
}


// add_to_map sleeps for a random delay without holding the lock, then stores
// word -> word + " plus more letters" in m under mutex. wg.Done runs when the
// function returns.
func add_to_map(m map[string]string, word string) {
	defer wg.Done()

	fmt.Println("Before sleep")
	random_sleep() // the delay happens before the lock is taken
	entry := word + " plus more letters"

	mutex.Lock()
	defer mutex.Unlock()
	m[word] = entry
	// Println does not interpret format verbs, so "%v" is printed literally.
	fmt.Println("Added word %v", word)
}


// main starts one add_to_map goroutine per word, blocks on wg.Wait until the
// counter drops to zero, and then prints the map.
func main() {
	animals := []string{"giraffe", "cat", "dog", "turtle"}
	results := make(map[string]string)

	// Register every goroutine up front; equivalent to Add(1) per iteration.
	wg.Add(len(animals))
	for _, animal := range animals {
		go add_to_map(results, animal)
	}

	wg.Wait()
	fmt.Println(results)
}

结果:

$ go run try_async.go 
Before sleep
Before sleep
Before sleep
Before sleep
Added word %v dog
Added word %v giraffe
Added word %v cat
Added word %v turtle
map[turtle:turtle plus more letters dog:dog plus more letters giraffe:giraffe plus more letters cat:cat plus more letters]
英文:

(Posted solution on behalf of the OP).

My usage of fake delay was wrong, the solutions both work. Thank you:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var mutex sync.Mutex
var wg sync.WaitGroup

// random_sleep pauses the caller for a pseudo-random number of milliseconds
// drawn from rand.Intn(3000), so the delay is always below three seconds.
func random_sleep() {
	time.Sleep(time.Duration(rand.Intn(3000)) * time.Millisecond)
}


// add_to_map sleeps for a random delay without holding the lock, then stores
// word -> word + " plus more letters" in m under mutex. wg.Done runs when the
// function returns.
func add_to_map(m map[string]string, word string) {
	defer wg.Done()

	fmt.Println("Before sleep")
	random_sleep() // the delay happens before the lock is taken
	entry := word + " plus more letters"

	mutex.Lock()
	defer mutex.Unlock()
	m[word] = entry
	// Println does not interpret format verbs, so "%v" is printed literally.
	fmt.Println("Added word %v", word)
}


// main launches add_to_map in its own goroutine for each word, waits on wg,
// and prints the resulting map.
func main() {
	words := []string{"giraffe", "cat", "dog", "turtle"}
	words_map := make(map[string]string)

	for i := 0; i < len(words); i++ {
		wg.Add(1)
		go add_to_map(words_map, words[i])
	}

	wg.Wait()
	fmt.Println(words_map)
}

Result:

$ go run try_async.go 
Before sleep
Before sleep
Before sleep
Before sleep
Added word %v dog
Added word %v giraffe
Added word %v cat
Added word %v turtle
map[turtle:turtle plus more letters dog:dog plus more letters giraffe:giraffe plus more letters cat:cat plus more letters]

huangapple
  • 本文由 发表于 2017年7月29日 00:40:04
  • 转载请务必保留本文链接:https://go.coder-hub.com/45378537.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定