Go: learning channels & queueing, fatal error
Question
I'm trying to learn how to use channels to make a queue in Go for one of my other projects. My other project basically queues up database rows, and then does number crunching on the database using the details in the rows.
I don't want the same row to be processed by more than one worker at the same time, so the code needs to check whether a worker is currently working on that specific row ID, and if so, wait for it to finish. If it's not the same row ID, it can run asynchronously, but I also want to limit the number of asynchronous workers that can run at the same time. In my code below, I'm trying to limit it to three workers at the moment.
Here's what I have:
package main

import (
	"log"
	"strconv"
	"time"
)

// RowInfo holds the job info
type RowInfo struct {
	id int
}

// WorkerCount holds how many workers are currently running
var WorkerCount int

// WorkerLocked specifies whether a row ID is currently processing by a worker
var WorkerLocked map[string]bool

// Process the RowInfo
func worker(row RowInfo) {
	rowID := strconv.Itoa(row.id)
	WorkerCount++
	WorkerLocked[rowID] = true
	time.Sleep(1 * time.Second)
	log.Printf("ID rcvd: %d", row.id)
	WorkerLocked[rowID] = false
	WorkerCount--
}

// waiter will check if the row is already processing in a worker
// Block until it finishes completion, then dispatch
func waiter(row RowInfo) {
	rowID := strconv.Itoa(row.id)
	for WorkerLocked[rowID] == true {
		time.Sleep(1 * time.Second)
	}
	go worker(row)
}

func main() {
	jobsQueue := make(chan RowInfo, 10)
	WorkerLocked = make(map[string]bool)

	// Dispatcher waits for jobs on the channel and dispatches to waiter
	go func() {
		// Wait for a job
		for {
			// Only have a max of 3 workers running asynch at a time
			for WorkerCount > 3 {
				time.Sleep(1 * time.Second)
			}
			job := <-jobsQueue
			go waiter(job)
		}
	}()

	// Test the queue, send some data
	for i := 0; i < 12; i++ {
		r := RowInfo{
			id: i,
		}
		jobsQueue <- r
	}

	// Prevent exit!
	for {
		time.Sleep(1 * time.Second)
	}
}
And I'm getting this error, but it's intermittent: sometimes when I run it, it appears to work. Is there a race condition?
go run main.go
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x8 pc=0x4565e7]
goroutine 37 [running]:
main.worker(0x5)
/home/piiz/go/src/github.com/zzz/asynch/main.go:25 +0x94
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 1 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.main()
/home/piiz/go/src/github.com/zzz/asynch/main.go:73 +0xf8
goroutine 5 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.main.func1(0xc82008c000)
/home/piiz/go/src/github.com/zzz/asynch/main.go:55 +0x2d
created by main.main
/home/piiz/go/src/github.com/zzz/asynch/main.go:61 +0xa0
goroutine 35 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x2)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 36 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x4)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 34 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x1)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 12 [runnable]:
runtime.goexit1()
/usr/local/go/src/runtime/proc1.go:1732
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1697 +0x6
created by main.main.func1
/home/piiz/go/src/github.com/zzz/asynch/main.go:59 +0x8c
goroutine 19 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x8)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 20 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x0)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 16 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x9)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 33 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x3)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 18 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0x7)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 22 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:59 +0xf9
main.worker(0xa)
/home/piiz/go/src/github.com/zzz/asynch/main.go:27 +0xa1
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
goroutine 49 [runnable]:
main.worker(0x6)
/home/piiz/go/src/github.com/zzz/asynch/main.go:21
created by main.waiter
/home/piiz/go/src/github.com/zzz/asynch/main.go:42 +0xbb
exit status 2
Anyway, I am still learning, so if you look at my code and go "what the hell", well, I won't be surprised. Maybe I'm approaching this problem entirely wrong. Thanks.
Answer 1
Score: 2
If you're going to use the WorkerLocked map, you need to protect access to it using the sync package. You also need to protect WorkerCount in the same way (or using atomic operations). Doing something like that would also make sleeping unnecessary (using condition variables).
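A minimal sketch of what that could look like, assuming one mutex guards both the locked-row set and the worker count, and a condition variable replaces the sleep loops. The names rowTracker, acquire, release and runJobs are made up for illustration and are not part of the question's code; the map is keyed by int here instead of string.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// rowTracker is a hypothetical helper: one mutex protects the set of
// rows being processed and the number of running workers.
type rowTracker struct {
	mu      sync.Mutex
	cond    *sync.Cond
	locked  map[int]bool
	running int
	limit   int
	peak    int // highest value running has reached, for demonstration
}

func newRowTracker(limit int) *rowTracker {
	t := &rowTracker{locked: make(map[int]bool), limit: limit}
	t.cond = sync.NewCond(&t.mu)
	return t
}

// acquire blocks until the row is free and fewer than limit workers run.
func (t *rowTracker) acquire(id int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for t.locked[id] || t.running >= t.limit {
		t.cond.Wait() // releases mu while waiting, re-locks on wake-up
	}
	t.locked[id] = true
	t.running++
	if t.running > t.peak {
		t.peak = t.running
	}
}

// release frees the row and wakes all waiters so each re-checks its condition.
func (t *rowTracker) release(id int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.locked, id)
	t.running--
	t.cond.Broadcast()
}

// runJobs processes every id and returns the peak concurrency observed.
func runJobs(ids []int, limit int) int {
	t := newRowTracker(limit)
	var wg sync.WaitGroup
	for _, id := range ids {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			t.acquire(id)
			defer t.release(id)
			time.Sleep(10 * time.Millisecond) // stand-in for the real work
		}(id)
	}
	wg.Wait()
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.peak
}

func main() {
	// IDs 1 and 2 appear twice; the duplicates wait for the first to finish.
	peak := runJobs([]int{0, 1, 2, 3, 4, 5, 1, 2}, 3)
	fmt.Println("peak concurrent workers:", peak)
}
```

Running the original program with go run -race main.go should also point at the unsynchronized accesses to the map and the counter.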
Better yet, have 3 (or however many) workers waiting for rows to work on using channels. You would then distribute the rows to the various workers such that a particular row is always worked on by a particular worker (e.g., using row.id % 3 to determine which worker/channel to send the row to).
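Here is a rough sketch of that second idea, assuming non-negative row IDs and an exported ID field. The dispatch function and the handled slices are hypothetical names added only to make the routing visible; the real work would replace the append.

```go
package main

import (
	"fmt"
	"sync"
)

// RowInfo holds the job info.
type RowInfo struct {
	ID int
}

// dispatch sends each row to worker ID % numWorkers, so a given row ID is
// only ever handled by one worker and never processed concurrently.
// It returns, per worker, the IDs that worker handled, in order.
func dispatch(rows []RowInfo, numWorkers int) [][]int {
	queues := make([]chan RowInfo, numWorkers)
	handled := make([][]int, numWorkers)
	var wg sync.WaitGroup

	for w := 0; w < numWorkers; w++ {
		queues[w] = make(chan RowInfo, 10)
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			// The loop ends once the worker's channel is closed and drained.
			for row := range queues[w] {
				// Only this goroutine touches handled[w], so no lock is needed.
				handled[w] = append(handled[w], row.ID)
			}
		}(w)
	}

	for _, row := range rows {
		queues[row.ID%numWorkers] <- row // assumes row.ID >= 0
	}
	for _, q := range queues {
		close(q)
	}
	wg.Wait()
	return handled
}

func main() {
	rows := []RowInfo{{ID: 0}, {ID: 1}, {ID: 2}, {ID: 3}, {ID: 1}, {ID: 4}, {ID: 1}}
	for w, ids := range dispatch(rows, 3) {
		fmt.Printf("worker %d handled %v\n", w, ids)
	}
}
```

The trade-off is that the load is only as even as the ID distribution: if most rows share the same ID % 3, one worker does most of the work while the others sit idle.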
Answer 2
Score: 1
I highly recommend not using any locking in a situation like this, where you have workers that handle reads from a database. Locks and semaphores in general can cause a lot of issues and, in the end, leave you with a bunch of corrupted data. Trust me. Been there, done that. You need to be careful and avoid using them in cases like this. Locking is good if you wish to preserve data and maintain maps, for example, but not for the actual processing.
By locking goroutines you're slowing down your Go program unnecessarily. Go is designed to process things as fast as possible. Don't hold it back.
Here's some theory of my own that might help you understand what I'm trying to say a little better:
- To limit the workers to 3, just spawn 3 goroutines that receive from the queue. Each value sent on a channel is delivered to exactly one receiver, so two workers will never take the same job from the channel; you're safe here.
- make() already has a built-in limit that can be used nicely in this case: the channel's buffer capacity, which is its second parameter. So if you write queue := make(chan RowInfo, 10), the queue can hold up to 10 RowInfo values. When the loop that feeds the queue has filled all 10 slots, its next send blocks until a worker receives an item from the channel. So once the queue drops to 9, the database aggregator writes a 10th, and a worker will later take that one out.
This way you get the natural workflow that Go is all about. This is also called spawning pre-workers.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// RowInfo holds the job info
type RowInfo struct {
	ID int
}

// worker receives rows from queue until done is closed.
func worker(queue chan RowInfo, done chan bool) {
	fmt.Println("Starting worker...")
	for {
		select {
		case row := <-queue:
			fmt.Printf("Got row info: %v\n", row)
			// Hold the row for a second so we can see the full queue blocking the producer
			time.Sleep(1 * time.Second)
		case <-time.After(10 * time.Second):
			// Fires when no row has arrived for 10 seconds. It does not interrupt a
			// running job; a per-job timeout would have to live around the work itself.
			fmt.Println("No rows for 10 seconds. This is a place to do cleanup, e.g. mark stale jobs as failed in the database so they can be restarted later.")
		case <-done:
			fmt.Println("Got quit signal... Killing'em all")
			// return, not break: break would only leave the select, not the for loop
			return
		}
	}
}

// handleSigterm closes done when a signal arrives, which releases every receiver of done.
func handleSigterm(kill chan os.Signal, done chan bool) {
	<-kill
	close(done)
}

func main() {
	// Do not allow more than 10 records to be in the channel.
	queue := make(chan RowInfo, 10)
	done := make(chan bool)
	kill := make(chan os.Signal, 1)
	signal.Notify(kill, os.Interrupt, syscall.SIGTERM)
	go handleSigterm(kill, done)

	for i := 0; i < 3; i++ {
		go worker(queue, done)
	}

	// Should be an infinite loop in the end...
	go func() {
		for i := 0; i < 100; i++ {
			fmt.Printf("Queueing: %v\n", i)
			queue <- RowInfo{ID: i}
		}
	}()

	// Block until a signal closes done.
	<-done
	// Give it some time to process things before shutting down. This is a bad way of
	// doing things, but it is good enough for this example.
	time.Sleep(5 * time.Second)
}
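As the comment in main admits, sleeping before exit is only a stopgap. One common alternative, sketched here under the assumption that the producer knows when it has queued its last row, is to close the queue and wait on a sync.WaitGroup. The processAll function and its counter are hypothetical and exist only to make the result checkable.

```go
package main

import (
	"fmt"
	"sync"
)

// RowInfo holds the job info.
type RowInfo struct {
	ID int
}

// processAll starts numWorkers workers, feeds them n rows through a
// buffered channel, waits for all of them to finish, and returns how
// many rows were processed.
func processAll(n, numWorkers int) int {
	queue := make(chan RowInfo, 10)
	var wg sync.WaitGroup
	var mu sync.Mutex
	processed := 0

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// range ends once queue is closed and every buffered row is taken.
			for row := range queue {
				_ = row.ID // the real work on the row would go here
				mu.Lock()
				processed++
				mu.Unlock()
			}
		}()
	}

	for i := 0; i < n; i++ {
		queue <- RowInfo{ID: i} // blocks while the buffer is full
	}
	close(queue) // tells the workers that no more rows are coming
	wg.Wait()    // returns only after every worker has exited
	return processed
}

func main() {
	fmt.Println("rows processed:", processAll(100, 3))
}
```

Because wg.Wait returns only after the last worker has drained the channel, no row is dropped at shutdown and no fixed delay is needed.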
As for managing database state, you can have a status in the database saying "in-progress", so every time you select a row you also update it to mark it as in progress. This is of course just one way of doing it. By keeping some sort of mapping in Go, I would say you'd torture your service more than necessary.
Hope this helps!