英文:
How to implement a counter when using golang's goroutine?
问题
我正在尝试创建一个具有push和pop函数的队列结构。
我需要使用10个线程push数据,并使用另外10个线程pop数据,就像我在下面的代码中所做的那样。
问题:
- 我需要打印出我已经pushed/popped了多少次,但我不知道如何做到这一点。
- 有没有办法加快我的代码速度?我的代码对我来说太慢了。
package main
import (
"runtime"
"time"
)
const (
DATA_SIZE_PER_THREAD = 10000000
)
type Queue struct {
records string
}
func (self Queue) push(record chan interface{}) {
// 需要push计数器
record <- time.Now()
}
func (self Queue) pop(record chan interface{}) {
// 需要pop计数器
<- record
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
// 记录通道
record := make(chan interface{},1000000)
// 完成标志通道
finish := make(chan bool)
queue := new(Queue)
for i:=0; i<10; i++ {
go func() {
for j:=0; j<DATA_SIZE_PER_THREAD; j++ {
queue.push(record)
}
finish<-true
}()
}
for i:=0; i<10; i++ {
go func() {
for j:=0; j<DATA_SIZE_PER_THREAD; j++ {
queue.pop(record)
}
finish<-true
}()
}
for i:=0; i<20; i++ {
<-finish
}
}
英文:
I'm trying to make a queue struct that have push and pop functions.
I need to use 10 threads push and another 10 threads pop data, just like I did in the code below.
Questions:
- I need to print out how much I have pushed/popped, but I don't know how to do that.
- Is there anyway to speed up my code? The code is too slow for me.
package main
import (
"runtime"
"time"
)
const (
DATA_SIZE_PER_THREAD = 10000000
)
type Queue struct {
records string
}
func (self Queue) push(record chan interface{}) {
// need push counter
record <- time.Now()
}
func (self Queue) pop(record chan interface{}) {
// need pop counter
<- record
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
//record chan
record := make(chan interface{},1000000)
//finish flag chan
finish := make(chan bool)
queue := new(Queue)
for i:=0; i<10; i++ {
go func() {
for j:=0; j<DATA_SIZE_PER_THREAD; j++ {
queue.push(record)
}
finish<-true
}()
}
for i:=0; i<10; i++ {
go func() {
for j:=0; j<DATA_SIZE_PER_THREAD; j++ {
queue.pop(record)
}
finish<-true
}()
}
for i:=0; i<20; i++ {
<-finish
}
}
答案1
得分: 16
有几件事情你应该修复。
-
队列类型的方法应该有指针接收器。否则,每次方法调用都会创建当前队列类型的副本,对队列字段的任何更改都不会在方法调用之后持续存在。
-
等待所有协程完成可以使用
sync.WaitGroup
来完成。这是它专门设计的目的。 -
在队列类型内部维护一个线程安全的推入/弹出计数器可以使用
sync/atomic
包来完成。
就速度而言,从你的示例中,我不太确定你想要实现什么。如果你详细说明一下,可能会有一些优化方法。
这是我从你的代码中修改的一个示例:
package main
import (
"log"
"runtime"
"sync"
"sync/atomic"
"time"
)
const SizePerThread = 10000000
type Queue struct {
records string
count int64
}
func (q *Queue) push(record chan interface{}) {
record <- time.Now()
newcount := atomic.AddInt64(&q.count, 1)
log.Printf("Push: %d", newcount)
}
func (q *Queue) pop(record chan interface{}) {
<-record
newcount := atomic.AddInt64(&q.count, -1)
log.Printf("Pop: %d", newcount)
}
func main() {
var wg sync.WaitGroup
runtime.GOMAXPROCS(runtime.NumCPU())
record := make(chan interface{}, 1000000)
queue := new(Queue)
// 我们启动了20个协程。
// 让等待组知道它应该等待这些协程中的多少个完成。
wg.Add(20)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < SizePerThread; j++ {
queue.push(record)
}
}()
go func() {
defer wg.Done()
for j := 0; j < SizePerThread; j++ {
queue.pop(record)
}
}()
}
// 等待所有协程完成。
wg.Wait()
}
英文:
There are a few things you should fix.
-
The methods on the Queue type should have pointer receivers. Otherwise, every method
call will create a copy of the current queue type and any changes to queue fields will
not persist beyond the method call itself. -
Waiting for all routines to finish, can be done using a
sync.WaitGroup
. This
is specifically what it was designed for. -
Maintaining a thread-safe push/pop counter inside the queue type can be done by
using thesync/atomic
package.
As far as speed goes, from your example, I am not quite sure what you are trying to achieve. Any optimizations might come up if you elaborate on that a little.
Here is an example I modified from your code:
package main
import (
"log"
"runtime"
"sync"
"sync/atomic"
"time"
)
const SizePerThread = 10000000
type Queue struct {
records string
count int64
}
func (q *Queue) push(record chan interface{}) {
record <- time.Now()
newcount := atomic.AddInt64(&q.count, 1)
log.Printf("Push: %d", newcount)
}
func (q *Queue) pop(record chan interface{}) {
<-record
newcount := atomic.AddInt64(&q.count, -1)
log.Printf("Pop: %d", newcount)
}
func main() {
var wg sync.WaitGroup
runtime.GOMAXPROCS(runtime.NumCPU())
record := make(chan interface{}, 1000000)
queue := new(Queue)
// We are launching 20 goroutines.
// Let the waitgroup know it should wait for as many
// of them to finish.
wg.Add(20)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < SizePerThread; j++ {
queue.push(record)
}
}()
go func() {
defer wg.Done()
for j := 0; j < SizePerThread; j++ {
queue.pop(record)
}
}()
}
// Wait for all goroutines to finish.
wg.Wait()
}
答案2
得分: -5
问题1的答案:正如jimt建议的那样,sync/atomic有一些用于原子更新计数器的函数,这对你可能很有用。
问题2的答案:减小DATA_SIZE_PER_THREAD的值,或者更好的方法是使用以下程序:
package main
func main() {}
它以更高效的方式产生与你的程序相同的输出。
但是,说真的,我理解你写了一个小程序来探索一些概念。然而,你的程序存在一些问题。现在不是担心速度的时候,而是学习一些基本概念的时候。
英文:
Answer to question 1: As jimt suggested, sync/atomic has functions for atomically updating a counter, which may be useful to you.
Answer to question 2: Reduce the value of DATA_SIZE_PER_THREAD, or better yet, use the program
package main
func main() {}
which produces the same output as your program in a more efficent way.
Seriously though, I understand you have written a small program to explore some concepts. Your program contains a number of issues however. This is not the time to worry about speed, it is the time to learn some fundamental concepts.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论