这是我的代码:
package main import ( "sync/atomic" "unsafe" "sync" "fmt" "time" ) const ( MAX_DATA_SIZE = 100 ) // lock free queue type Queue struct { head unsafe.Pointer tail unsafe.Pointer } // one node in queue type Node struct { val interface{} next unsafe.Pointer } // queue functions func (self *Queue) enQueue(val interface{}) { newValue := unsafe.Pointer(&Node{val: val, next: nil}) var tail,next unsafe.Pointer for { tail = self.tail next = ((*Node)(tail)).next if next != nil { atomic.CompareAndSwapPointer(&(self.tail), tail, next) }else if atomic.CompareAndSwapPointer(&((*Node)(tail).next), nil, newValue){ break } } } func (self *Queue) deQueue() (val interface{}, success bool){ var head,tail,next unsafe.Pointer for { head = self.head tail = self.tail next = ((*Node)(head)).next if head == tail { if next == nil { return nil, false }else { atomic.CompareAndSwapPointer(&(self.tail), tail, next) } }else { val = ((*Node)(next)).val if atomic.CompareAndSwapPointer(&(self.head), head, next) { return val, true } } } return } func main() { var wg sync.WaitGroup wg.Add(20) queue := new(Queue) queue.head = unsafe.Pointer(new(Node)) queue.tail = queue.head for i := 0; i < 10; i++ { go func() { defer wg.Done() for j := 0; j < MAX_DATA_SIZE; j++ { t := time.Now() queue.enQueue(t) fmt.Println("enq = ", t) } }() } for i := 0; i < 10; i++ { go func() { ok := false var val interface{} defer wg.Done() for j := 0; j < MAX_DATA_SIZE; j++ { val,ok = queue.deQueue() for !ok { val,ok = queue.deQueue() } fmt.Println("deq = ",val) } }() } wg.Wait() }
问题是,有时代码可以正常运行,但有时它会失败并且只会卡住而没有任何响应。
我的代码有问题吗?
这是上面改写的通道,建议使用@mkb(排除无限队列大小)。
它不会锁定。
我建议您使用渠道,除非您有充分的理由不这样做,因为Go团队已花费大量精力使其变得可靠,高性能且易于使用。
package main import ( "fmt" "runtime" "sync" "time" ) const ( MAX_DATA_SIZE = 100 ) func main() { runtime.GOMAXPROCS(4) var wg sync.WaitGroup wg.Add(20) queue := make(chan time.Time, 10) for i := 0; i < 10; i++ { go func() { defer wg.Done() for j := 0; j < MAX_DATA_SIZE; j++ { t := time.Now() queue <- t fmt.Println("enq = ", t) } }() } for i := 0; i < 10; i++ { go func() { defer wg.Done() for j := 0; j < MAX_DATA_SIZE; j++ { val := <-queue fmt.Println("deq = ", val) } }() } wg.Wait() }