Best way to implement global counters for highly concurrent applications?

Question

What is the best way to implement global counters for a highly concurrent application? In my case I may have 10K-20K goroutines performing "work", and I want to count the number and types of items that the routines are working on collectively...

The "classic" synchronous coding style would look like:

var work_counter int32 // int32, so it matches atomic.AddInt32

func GoWorkerRoutine() {
    for {
        // do work
        atomic.AddInt32(&work_counter, 1)
    }    
}

Now this gets more complicated because I want to track the "type" of work being done, so really I'd need something like this:

var work_counter map[string]int
var work_mux sync.Mutex

func GoWorkerRoutine() {
    for {
        // do work
        work_mux.Lock()
        work_counter["type1"]++
        work_mux.Unlock()
    }    
}

It seems like there should be a "go"-optimized way using channels, something similar to this:

var work_counter int
var work_chan chan int // make() called somewhere else (buffered)

// started somewhere else
func GoCounterRoutine() {
    for {
        select {
            case c := <-work_chan:
                work_counter += c
                break
        }
    }
}

func GoWorkerRoutine() {
    for {
        // do work
        work_chan <- 1
    }    
}

This last example is still missing the map, but that's easy enough to add. Will this style provide better performance than a simple atomic increment? I can't tell whether this is more or less complicated when we're talking about concurrent access to a global value versus something that may block on I/O...

Thoughts are appreciated.

Update 5/28/2013:

I tested a couple of implementations, and the results were not what I expected. Here's my counter source code:

package helpers

type CounterIncrementStruct struct {
    bucket string
    value int
}

type CounterQueryStruct struct {
    bucket string
    channel chan int
}

var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int

func CounterInitialize() {
    counter = make(map[string]int)
    counterIncrementChan = make(chan CounterIncrementStruct, 0)
    counterQueryChan = make(chan CounterQueryStruct, 100)
    counterListChan = make(chan chan map[string]int, 100)
    go goCounterWriter()
}

func goCounterWriter() {
    for {
        select {
            case ci := <-counterIncrementChan:
                if len(ci.bucket) == 0 { return }
                counter[ci.bucket] += ci.value
                break
            case cq := <-counterQueryChan:
                val, found := counter[cq.bucket]
                if found {
                    cq.channel <- val
                } else {
                    cq.channel <- -1    
                }
                break
            case cl := <-counterListChan:
                nm := make(map[string]int)
                for k, v := range counter {
                    nm[k] = v
                }
                cl <- nm
                break
        }
    }
}

func CounterIncrement(bucket string, counter int) {
    if len(bucket) == 0 || counter == 0 { return }
    counterIncrementChan <- CounterIncrementStruct{bucket, counter}
}

func CounterQuery(bucket string) int {
    if len(bucket) == 0 { return -1 }
    reply := make(chan int)
    counterQueryChan <- CounterQueryStruct{bucket, reply}
    return <-reply
}

func CounterList() map[string]int {
    reply := make(chan map[string]int)
    counterListChan <- reply
    return <-reply
}

It uses channels for both writes and reads, which seems logical.

Here are my test cases:

func bcRoutine(b *testing.B, e chan bool) {
    for i := 0; i < b.N; i++ {
        CounterIncrement("abc123", 5)
        CounterIncrement("def456", 5)
        CounterIncrement("ghi789", 5)
        CounterIncrement("abc123", 5)
        CounterIncrement("def456", 5)
        CounterIncrement("ghi789", 5)
    }
    e <- true
}

func BenchmarkChannels(b *testing.B) {
    b.StopTimer()
    CounterInitialize()
    e := make(chan bool)
    b.StartTimer()
    
    go bcRoutine(b, e)
    go bcRoutine(b, e)
    go bcRoutine(b, e)
    go bcRoutine(b, e)
    go bcRoutine(b, e)
    
    <-e
    <-e
    <-e
    <-e
    <-e
    
}

var mux sync.Mutex
var m map[string]int
func bmIncrement(bucket string, value int) {
    mux.Lock()
    m[bucket] += value
    mux.Unlock()
}

func bmRoutine(b *testing.B, e chan bool) {
    for i := 0; i < b.N; i++ {
        bmIncrement("abc123", 5)
        bmIncrement("def456", 5)
        bmIncrement("ghi789", 5)
        bmIncrement("abc123", 5)
        bmIncrement("def456", 5)
        bmIncrement("ghi789", 5)
    }
    e <- true
}

func BenchmarkMutex(b *testing.B) {
    b.StopTimer()
    m = make(map[string]int)
    e := make(chan bool)
    b.StartTimer()
    
    for i := 0; i < b.N; i++ {
        bmIncrement("abc123", 5)
        bmIncrement("def456", 5)
        bmIncrement("ghi789", 5)
        bmIncrement("abc123", 5)
        bmIncrement("def456", 5)
        bmIncrement("ghi789", 5)
    }
    
    go bmRoutine(b, e)
    go bmRoutine(b, e)
    go bmRoutine(b, e)
    go bmRoutine(b, e)
    go bmRoutine(b, e)
    
    <-e
    <-e
    <-e
    <-e
    <-e
    
}

I implemented a simple benchmark with just a mutex around the map (testing writes only), and benchmarked both with 5 goroutines running in parallel. Here are the results:

$ go test --bench=. helpers
PASS
BenchmarkChannels         100000             15560 ns/op
BenchmarkMutex   1000000              2669 ns/op
ok      helpers 4.452s

I would not have expected the mutex to be that much faster...

Further thoughts?


Answer 1

Score: 37

If you're trying to synchronize a pool of workers (e.g. allow n goroutines to crunch away at some amount of work) then channels are a very good way to go about it, but if all you actually need is a counter (e.g. page views) then they are overkill. The sync and sync/atomic packages are there to help.

import "sync/atomic"

type count32 int32

func (c *count32) inc() int32 {
    return atomic.AddInt32((*int32)(c), 1)
}

func (c *count32) get() int32 {
    return atomic.LoadInt32((*int32)(c))
}

Go Playground example


Answer 2

Score: 21

Don't use sync/atomic - from the linked page:

> Package atomic provides low-level atomic memory primitives useful for implementing synchronization algorithms. These functions require great care to be used correctly. Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package.

Last time I had to do this, I benchmarked something that looked like your second example with a mutex and something that looked like your third example with a channel. The channel code won when things got really busy, but make sure you make the channel buffer big.


Answer 3

Score: 11

Don't be afraid of using mutexes and locks just because you think they're "not proper Go". In your second example it's absolutely clear what's going on, and that counts for a lot. You will have to try it yourself to see how contended that mutex is, and whether the added complication improves performance.

If you do need more performance, perhaps sharding is the best way to go:
http://play.golang.org/p/uLirjskGeN

The downside is that your counts will only be as up-to-date as your sharding decides. There may also be a performance hit from calling time.Since() so much, but, as always, measure first.


Answer 4

Score: 8

The other answer using sync/atomic is suited for things like page counters, but not for submitting unique identifiers to an external API. To do that, you need an "increment-and-return" operation, which can only be implemented as a CAS loop.

Here's a CAS loop around an int32 to generate unique message IDs:

import "sync/atomic"

type UniqueID struct {
    counter int32
}

func (c *UniqueID) Get() int32 {
    for {
        val := atomic.LoadInt32(&c.counter)
        if atomic.CompareAndSwapInt32(&c.counter, val, val+1) {
            return val
        }
    }
}

To use it, simply do:

requestID := client.msgID.Get()
form.Set("id", requestID)

This has an advantage over channels in that it doesn't require as many extra idle resources - existing goroutines are used as they ask for IDs, rather than one goroutine for every counter your program needs.

TODO: benchmark against channels. I'm going to guess that channels are worse in the no-contention case and better in the high-contention case, as they have queuing while this code simply spins attempting to win the race.


Answer 5

Score: 7

Old question, but I just stumbled upon this and it may help: https://github.com/uber-go/atomic

Basically, the engineers at Uber have built a few nice utility functions on top of the sync/atomic package.

I haven't tested this in production yet, but the codebase is very small and the implementation of most functions is quite standard.

Definitely preferable to using channels or basic mutexes.


Answer 6

Score: 3

The last one was close:

package main

import "fmt"

func main() {
    ch := make(chan int, 3)
    go GoCounterRoutine(ch)
    go GoWorkerRoutine(1, ch)
    // not run as a goroutine because main() would just end
    GoWorkerRoutine(2, ch)

}

// started somewhere else
func GoCounterRoutine(ch chan int) {
    counter := 0
    for {
        ch <- counter
        counter += 1
    }
}

func GoWorkerRoutine(n int, ch chan int) {
    for seq := range ch {
        // do work:
        fmt.Println(n, seq)
    }
}

This introduces a single point of failure: if the counter goroutine dies, everything is lost. That may not matter if all the goroutines run on one machine, but it can become a problem if they are scattered across a network. Making the counter immune to failures of single nodes in a cluster requires special algorithms.


Answer 7

Score: 3

I implemented this with a simple map + mutex, which seems to be the best way to handle this since it is the "simplest way" (which is what Go says to use when choosing between locks and channels).

package main

import (
	"fmt"
	"sync"
)

type single struct {
	mu     sync.Mutex
	values map[string]int64
}

var counters = single{
	values: make(map[string]int64),
}

func (s *single) Get(key string) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.values[key]
}

func (s *single) Incr(key string) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.values[key]++
	return s.values[key]
}

func main() {
	fmt.Println(counters.Incr("bar"))
	fmt.Println(counters.Incr("bar"))
	fmt.Println(counters.Incr("bar"))
	fmt.Println(counters.Get("foo"))
	fmt.Println(counters.Get("bar"))

}

You can run the code at https://play.golang.org/p/9bDMDLFBAY. I also made a simple packaged version on gist.github.com.


Answer 8

Score: 2

See for yourself and let me know what you think.

src/test/helpers/helpers.go

package helpers

type CounterIncrementStruct struct {
	bucket string
	value  int
}

type CounterQueryStruct struct {
	bucket  string
	channel chan int
}

var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int

func CounterInitialize() {
	counter = make(map[string]int)
	counterIncrementChan = make(chan CounterIncrementStruct, 0)
	counterQueryChan = make(chan CounterQueryStruct, 100)
	counterListChan = make(chan chan map[string]int, 100)
	go goCounterWriter()
}

func goCounterWriter() {
	for {
		select {
		case ci := <-counterIncrementChan:
			if len(ci.bucket) == 0 {
				return
			}
			counter[ci.bucket] += ci.value
			break
		case cq := <-counterQueryChan:
			val, found := counter[cq.bucket]
			if found {
				cq.channel <- val
			} else {
				cq.channel <- -1
			}
			break
		case cl := <-counterListChan:
			nm := make(map[string]int)
			for k, v := range counter {
				nm[k] = v
			}
			cl <- nm
			break
		}
	}
}

func CounterIncrement(bucket string, counter int) {
	if len(bucket) == 0 || counter == 0 {
		return
	}
	counterIncrementChan <- CounterIncrementStruct{bucket, counter}
}

func CounterQuery(bucket string) int {
	if len(bucket) == 0 {
		return -1
	}
	reply := make(chan int)
	counterQueryChan <- CounterQueryStruct{bucket, reply}
	return <-reply
}

func CounterList() map[string]int {
	reply := make(chan map[string]int)
	counterListChan <- reply
	return <-reply
}

src/test/distributed/distributed.go

package distributed

type Counter struct {
	buckets map[string]int
	incrQ   chan incrQ
	readQ   chan readQ
	sumQ    chan chan int
}

func New() Counter {
	c := Counter{
		buckets: make(map[string]int, 100),
		incrQ:   make(chan incrQ, 1000),
		readQ:   make(chan readQ, 0),
		sumQ:    make(chan chan int, 0),
	}
	go c.run()
	return c
}

func (c Counter) run() {
	for {
		select {
		case a := <-c.readQ:
			a.res <- c.buckets[a.bucket]
		case a := <-c.sumQ:
			var sum int
			for _, cnt := range c.buckets {
				sum += cnt
			}
			a <- sum
		case a := <-c.incrQ:
			c.buckets[a.bucket] += a.count
		}
	}
}

func (c Counter) Get(bucket string) int {
	res := make(chan int)
	c.readQ <- readQ{bucket: bucket, res: res}
	return <-res
}

func (c Counter) Sum() int {
	res := make(chan int)
	c.sumQ <- res
	return <-res
}

type readQ struct {
	bucket string
	res    chan int
}

type incrQ struct {
	bucket string
	count  int
}

func (c Counter) Agent(bucket string, limit int) *Agent {
	a := &Agent{
		bucket:   bucket,
		limit:    limit,
		sendIncr: c.incrQ,
	}
	return a
}

type Agent struct {
	bucket   string
	limit    int
	count    int
	sendIncr chan incrQ
}

func (a *Agent) Incr(n int) {
	a.count += n
	if a.count > a.limit {
		select {
		case a.sendIncr <- incrQ{bucket: a.bucket, count: a.count}:
			a.count = 0
		default:
		}
	}
}

func (a *Agent) Done() {
	a.sendIncr <- incrQ{bucket: a.bucket, count: a.count}
	a.count = 0
}

src/test/helpers_test.go

package counters

import (
	"sync"
	"testing"
)

var mux sync.Mutex
var m map[string]int

func bmIncrement(bucket string, value int) {
	mux.Lock()
	m[bucket] += value
	mux.Unlock()
}

func BenchmarkMutex(b *testing.B) {
	b.StopTimer()
	m = make(map[string]int)
	buckets := []string{
		"abc123",
		"def456",
		"ghi789",
	}
	b.StartTimer()

	var wg sync.WaitGroup
	wg.Add(b.N)
	for i := 0; i < b.N; i++ {
		go func() {
			for _, b := range buckets {
				bmIncrement(b, 5)
			}
			for _, b := range buckets {
				bmIncrement(b, 5)
			}
			wg.Done()
		}()
	}
	wg.Wait()
}

src/test/distributed_test.go

package counters

import (
	"sync"
	"test/counters/distributed"
	"testing"
)

func BenchmarkDistributed(b *testing.B) {
	b.StopTimer()
	counter := distributed.New()
	agents := []*distributed.Agent{
		counter.Agent("abc123", 100),
		counter.Agent("def456", 100),
		counter.Agent("ghi789", 100),
	}
	b.StartTimer()

	var wg sync.WaitGroup
	wg.Add(b.N)
	for i := 0; i < b.N; i++ {
		go func() {
			for _, a := range agents {
				a.Incr(5)
			}
			for _, a := range agents {
				a.Incr(5)
			}
			wg.Done()
		}()
	}
	for _, a := range agents {
		a.Done()
	}
	wg.Wait()
}

results

$ go test --bench=. --count 10 -benchmem
goos: linux
goarch: amd64
pkg: test/counters
BenchmarkDistributed-4   	 3356620	       351 ns/op	      24 B/op	       0 allocs/op
BenchmarkDistributed-4   	 3414073	       368 ns/op	      11 B/op	       0 allocs/op
BenchmarkDistributed-4   	 3371878	       374 ns/op	       7 B/op	       0 allocs/op
BenchmarkDistributed-4   	 3240631	       387 ns/op	       3 B/op	       0 allocs/op
BenchmarkDistributed-4   	 3169230	       389 ns/op	       2 B/op	       0 allocs/op
BenchmarkDistributed-4   	 3177606	       386 ns/op	       0 B/op	       0 allocs/op
BenchmarkDistributed-4   	 3064552	       390 ns/op	       0 B/op	       0 allocs/op
BenchmarkDistributed-4   	 3065877	       409 ns/op	       2 B/op	       0 allocs/op
BenchmarkDistributed-4   	 2924686	       400 ns/op	       1 B/op	       0 allocs/op
BenchmarkDistributed-4   	 3049873	       389 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutex-4         	 1000000	      1106 ns/op	      17 B/op	       0 allocs/op
BenchmarkMutex-4         	  948331	      1246 ns/op	       9 B/op	       0 allocs/op
BenchmarkMutex-4         	 1000000	      1244 ns/op	      12 B/op	       0 allocs/op
BenchmarkMutex-4         	 1000000	      1246 ns/op	      11 B/op	       0 allocs/op
BenchmarkMutex-4         	 1000000	      1228 ns/op	       1 B/op	       0 allocs/op
BenchmarkMutex-4         	 1000000	      1235 ns/op	       2 B/op	       0 allocs/op
BenchmarkMutex-4         	 1000000	      1244 ns/op	       1 B/op	       0 allocs/op
BenchmarkMutex-4         	 1000000	      1214 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutex-4         	  956024	      1233 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutex-4         	 1000000	      1213 ns/op	       0 B/op	       0 allocs/op
PASS
ok  	test/counters	37.461s

If you change the limit value to 1000, the code gets much faster:

$ go test --bench=. --count 10 -benchmem
goos: linux
goarch: amd64
pkg: test/counters
BenchmarkDistributed-4   	 5463523	       221 ns/op	       0 B/op	       0 allocs/op
BenchmarkDistributed-4   	 5455981	       220 ns/op	       0 B/op	       0 allocs/op
BenchmarkDistributed-4   	 5591240	       213 ns/op	       0 B/op	       0 allocs/op
BenchmarkDistributed-4   	 5277915	       212 ns/op	       0 B/op	       0 allocs/op
BenchmarkDistributed-4   	 5430421	       213 ns/op	       0 B/op	       0 allocs/op
BenchmarkDistributed-4   	 5374153	       226 ns/op	       0 B/op	       0 allocs/op
BenchmarkDistributed-4   	 5656743	       219 ns/op	       0 B/op	       0 allocs/op
BenchmarkDistributed-4   	 5337343	       211 ns/op	       0 B/op	       0 allocs/op
BenchmarkDistributed-4   	 5353845	       217 ns/op	       0 B/op	       0 allocs/op
BenchmarkDistributed-4   	 5416137	       217 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutex-4         	 1000000	      1002 ns/op	     135 B/op	       0 allocs/op
BenchmarkMutex-4         	 1253211	      1141 ns/op	      58 B/op	       0 allocs/op
BenchmarkMutex-4         	 1000000	      1261 ns/op	       3 B/op	       0 allocs/op
BenchmarkMutex-4         	  987345	      1678 ns/op	      59 B/op	       0 allocs/op
BenchmarkMutex-4         	  925371	      1247 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutex-4         	 1000000	      1259 ns/op	       2 B/op	       0 allocs/op
BenchmarkMutex-4         	  978800	      1248 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutex-4         	  982144	      1213 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutex-4         	  975681	      1254 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutex-4         	  994789	      1205 ns/op	       0 B/op	       0 allocs/op
PASS
ok  	test/counters	34.314s

Changing the length of Counter.incrQ will also greatly affect performance, at the cost of more memory.


Answer 9

Score: 0

If your work counter types are not dynamic, i.e. you can write them all out upfront, I don't think you'll get much simpler or faster than this.

No mutex, no channel, no map. Just a statically sized array and an enum.

import "sync/atomic"

type WorkType int

const (
	WorkType1 WorkType = iota
	WorkType2
	WorkType3
	WorkType4
	NumWorkTypes
)

var workCounter [NumWorkTypes]int64

func updateWorkCount(workType WorkType, delta int) {
	atomic.AddInt64(&workCounter[workType], int64(delta))
}

Usage looks like this:

updateWorkCount(WorkType1, 1)

If you sometimes need to work with work types as strings for display purposes, you can generate code with a tool like stringer.


huangapple
  • Published on May 28, 2013 at 11:07:52
  • Please retain this link when reposting: https://go.coder-hub.com/16783273.html