这是我的整个源代码:
package main import ( "sync/atomic" "unsafe" "sync" "fmt" "time" "runtime" ) const ( MAX_DATA_SIZE = 100 ) // lock free queue type Queue struct { head unsafe.Pointer tail unsafe.Pointer } // one node in queue type Node struct { val interface{} next unsafe.Pointer } // constructor func New() (q *Queue) { queue := new(Queue) queue.head = unsafe.Pointer(new(Node)) queue.tail = queue.head return queue } // queue functions func (self *Queue) enQueue(val interface{}) { newValue := unsafe.Pointer(&Node{val: val, next: nil}) var tail,next unsafe.Pointer for { tail = self.tail next = ((*Node)(tail)).next if atomic.CompareAndSwapPointer(&next, nil, newValue) { atomic.CompareAndSwapPointer(&self.tail, tail, newValue) break }else{ for next != nil { tail = next } } } } func (self *Queue) deQueue() (val interface{}, success bool){ var head,next unsafe.Pointer for { head = self.head next = ((*Node)(head)).next if next == nil { return nil, false }else { if atomic.CompareAndSwapPointer(&(self.head), head, next) { val = ((*Node)(next)).val return val, true } } } return nil, false } func main() { //runtime.GOMAXPROCS(runtime.NumCPU()) fmt.Println(runtime.GOMAXPROCS(-1)) var wg sync.WaitGroup wg.Add(20) queue := New() for i := 0; i < 10; i++ { go func() { defer wg.Done() for j := 0; j < MAX_DATA_SIZE; j++ { t := time.Now() fmt.Println("enqueue111") fmt.Println("enq = ", t) fmt.Println("enqueue222") queue.enQueue(t) } }() } for i := 0; i < 10; i++ { go func() { ok := false var val interface{} defer wg.Done() for j := 0; j < MAX_DATA_SIZE; j++ { val,ok = queue.deQueue() for !ok { val,ok = queue.deQueue() } fmt.Println("deq = ",val) } }() } wg.Wait() }
代码卡在了 fmt.Println(“ enq =”,t)上, 但是我不知道为什么,这太奇怪了。
deQueue在失败情况下无限循环,这阻塞了CPU。Goroutine在执行CPU工作时不会屈服。GOMAXPROCS必须大于等于2才能获得CPU并行性。
只是为了踢,这是使用高阶通道的线程安全,无阻塞队列实现:https : //gist.github.com/3668150