为高度并发的应用程序实现全局计数器的最佳方法是什么?就我而言,我可能有10K-20K go例程执行“工作”,并且我想计算这些例程共同处理的项目的数量和类型…
“经典”同步编码样式如下所示:
var work_counter int func GoWorkerRoutine() { for { // do work atomic.AddInt32(&work_counter,1) } }
现在,这变得更加复杂了,因为我想跟踪工作的“类型”,所以实际上我需要这样的东西:
var work_counter map[string]int var work_mux sync.Mutex func GoWorkerRoutine() { for { // do work work_mux.Lock() work_counter["type1"]++ work_mux.Unlock() } }
似乎应该使用渠道或类似方式来“优化”优化方式:
var work_counter int var work_chan chan int // make() called somewhere else (buffered) // started somewher else func GoCounterRoutine() { for { select { case c := <- work_chan: work_counter += c break } } } func GoWorkerRoutine() { for { // do work work_chan <- 1 } }
最后一个示例仍然缺少地图,但是添加起来很容易。这种样式会提供比简单的原子增量更好的性能吗?当我们谈论并发访问全局值而不是可能阻止I / O完成的事情时,我不能说这是多少复杂?
思想受到赞赏。
2013年5月28日更新:
我测试了几个实现,结果不是我期望的,这是我的反源码:
package helpers import ( ) type CounterIncrementStruct struct { bucket string value int } type CounterQueryStruct struct { bucket string channel chan int } var counter map[string]int var counterIncrementChan chan CounterIncrementStruct var counterQueryChan chan CounterQueryStruct var counterListChan chan chan map[string]int func CounterInitialize() { counter = make(map[string]int) counterIncrementChan = make(chan CounterIncrementStruct,0) counterQueryChan = make(chan CounterQueryStruct,100) counterListChan = make(chan chan map[string]int,100) go goCounterWriter() } func goCounterWriter() { for { select { case ci := <- counterIncrementChan: if len(ci.bucket)==0 { return } counter[ci.bucket]+=ci.value break case cq := <- counterQueryChan: val,found:=counter[cq.bucket] if found { cq.channel <- val } else { cq.channel <- -1 } break case cl := <- counterListChan: nm := make(map[string]int) for k, v := range counter { nm[k] = v } cl <- nm break } } } func CounterIncrement(bucket string, counter int) { if len(bucket)==0 || counter==0 { return } counterIncrementChan <- CounterIncrementStruct{bucket,counter} } func CounterQuery(bucket string) int { if len(bucket)==0 { return -1 } reply := make(chan int) counterQueryChan <- CounterQueryStruct{bucket,reply} return <- reply } func CounterList() map[string]int { reply := make(chan map[string]int) counterListChan <- reply return <- reply }
它使用通道进行写入和读取,这似乎是合乎逻辑的。
这是我的测试用例:
func bcRoutine(b *testing.B,e chan bool) { for i := 0; i < b.N; i++ { CounterIncrement("abc123",5) CounterIncrement("def456",5) CounterIncrement("ghi789",5) CounterIncrement("abc123",5) CounterIncrement("def456",5) CounterIncrement("ghi789",5) } e<-true } func BenchmarkChannels(b *testing.B) { b.StopTimer() CounterInitialize() e:=make(chan bool) b.StartTimer() go bcRoutine(b,e) go bcRoutine(b,e) go bcRoutine(b,e) go bcRoutine(b,e) go bcRoutine(b,e) <-e <-e <-e <-e <-e } var mux sync.Mutex var m map[string]int func bmIncrement(bucket string, value int) { mux.Lock() m[bucket]+=value mux.Unlock() } func bmRoutine(b *testing.B,e chan bool) { for i := 0; i < b.N; i++ { bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) } e<-true } func BenchmarkMutex(b *testing.B) { b.StopTimer() m=make(map[string]int) e:=make(chan bool) b.StartTimer() for i := 0; i < b.N; i++ { bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) bmIncrement("abc123",5) bmIncrement("def456",5) bmIncrement("ghi789",5) } go bmRoutine(b,e) go bmRoutine(b,e) go bmRoutine(b,e) go bmRoutine(b,e) go bmRoutine(b,e) <-e <-e <-e <-e <-e }
我实现了一个简单的基准测试,在地图周围只有一个互斥锁(只是测试写操作),并使用5个并行运行的goroutine进行了基准测试。结果如下:
$ go test --bench=. helpers PASS BenchmarkChannels 100000 15560 ns/op BenchmarkMutex 1000000 2669 ns/op ok helpers 4.452s
我不会期望互斥体会这么快…
进一步的想法?
不要使用同步/原子 -从链接页面
原子包提供了用于实现同步算法的低级原子内存原语。这些功能需要格外小心才能正确使用。除特殊的低层应用程序外,最好通过通道或sync软件包的功能来完成同步
上次我必须执行此操作时,我使用互斥对象对类似于您的第二个示例的东西进行了基准测试,对类似于通道的第三个示例进行了基准测试。当事情真的很忙时,通道代码会赢,但是请确保将通道缓冲区设置得很大。