我需要使用单个任务队列和单个结果队列来启动多个工作程序(worker)。每个工作程序都应在各自独立的 goroutine 中运行。我需要等到所有工作程序都完成工作、并且任务队列为空之后,再退出程序。我准备了一个关于 goroutine 同步的小例子。主要思路是对已入队的任务进行计数,并等待所有工作程序完成工作。但是当前的实现有时会遗漏一些值。为什么会出现这种情况?应该如何解决?示例代码:
import (
	"fmt"
	"os"
	"os/signal"
	"strconv"
)

// num_workers is the number of worker goroutines started by main.
const num_workers = 5

// workerChannel is the channel type that carries task values to the workers.
type workerChannel chan uint64

// workCh is the shared task queue. main creates it without a capacity, so it
// is unbuffered.
var workCh workerChannel

// cntChannel carries the +1/-1 deltas consumed by taskCounter. main also
// creates it unbuffered.
var cntChannel chan int

// tskCnt is the running total of the deltas received by taskCounter: +1 for
// every putTask call and -1 for every getTask call.
// NOTE(review): it is written by the taskCounter goroutine and read by the
// producer goroutine in main without a mutex or atomic access, which is a
// data race.
var tskCnt int64

// InitWorker runs one worker loop: it receives a task value from input,
// reports the pickup through getTask, and then sends a "Worker <num>:<value>"
// string on result. The loop has no exit condition, so a goroutine running it
// never returns.
func InitWorker(input workerChannel, result chan string, num int) {
	for {
		// A select with a single case behaves like a plain receive.
		select {
		case inp := <-input:
			// NOTE(review): the counter is decremented here, before the
			// result has been delivered. The counter can therefore reach zero
			// while this worker is still blocked on the send below.
			getTask()
			result <- ("Worker " + strconv.Itoa(num) + ":" + strconv.FormatUint(inp, 10))
		}
	}
}

// taskCounter adds every delta received from inp to tskCnt. It loops forever
// and is meant to run in a single dedicated goroutine.
func taskCounter(inp chan int) {
	for {
		val := <-inp
		tskCnt += int64(val)
	}
}

// putTask prints val, sends +1 to the task counter and then queues val on
// workCh. Both channels are unbuffered, so each send blocks until a receiver
// takes the value. The body is wrapped in an immediately invoked function
// literal, which runs synchronously.
func putTask(val uint64) {
	func() {
		fmt.Println("Put ", val)
		cntChannel <- int(1)
		workCh <- val
	}()
}

// getTask sends -1 to the task counter to record that a task has been taken
// from the queue.
func getTask() {
	func() {
		cntChannel <- int(-1)
	}()
}

// main starts the workers, the counter and a producer goroutine, then prints
// results until it receives either an interrupt or the producer's done signal.
func main() {
	// Service channels.
	// NOTE(review): the os/signal documentation asks for a buffered channel
	// here, because Notify does not block when delivering a signal.
	abort := make(chan os.Signal)
	done := make(chan bool)
	// Result queue (unbuffered).
	result := make(chan string)
	// Task queue (unbuffered).
	workCh = make(workerChannel)
	// Start the workers.
	for i := uint(0); i < num_workers; i++ {
		go InitWorker(workCh, result, int(i))
	}
	// Start the counter used for synchronisation.
	cntChannel = make(chan int)
	go taskCounter(cntChannel)
	// Producer goroutine: queues 21 tasks, then tries to wait for completion.
	go func() {
		for i := uint(0); i < 21; i++ {
			putTask(uint64(i))
		}
		// Wait for all tasks to be processed before closing the application.
		// NOTE(review): these waits do not establish that. len reports the
		// number of elements buffered in a channel, and all three channels
		// are unbuffered, so each len check is false on its first evaluation.
		// The tskCnt loop is an unsynchronised busy-wait, and tskCnt can
		// reach zero before main has received the last results.
		for len(cntChannel) != 0 {}
		for tskCnt != 0 {}
		for len(workCh) != 0 {}
		for len(result) != 0 {}
		// Signal completion.
		done <- true
	}()
	signal.Notify(abort, os.Interrupt)
	for {
		// NOTE(review): once done is ready, workers may still be blocked
		// sending on result. Taking the done case calls os.Exit, so those
		// pending results are never printed.
		select {
		case <-abort:
			fmt.Println("Aborted.")
			os.Exit(0)
		// Print results.
		case res := <-result:
			fmt.Println(res)
		case <-done:
			fmt.Println("Done")
			os.Exit(0)
		}
	}
}
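原实现之所以会遗漏值,是因为任务计数在工作程序发送结果之前就已递减,而对无缓冲通道调用 len 始终返回 0,所以 done 可能在结果尚未全部打印时就已发出。解决方法:使用 sync.WaitGroup 等待 goroutine 完成;关闭通道,使在该通道上进行 range 循环读取的 goroutine 退出循环。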
package main

import (
	"fmt"
	"sync"
)

// num_workers is the size of the worker pool.
const num_workers = 5

// workerChannel carries task values from the producer to the workers.
type workerChannel chan uint64

// worker formats every task it receives from tasks and sends the text on out.
// It returns once tasks has been closed and drained, marking itself done on wg.
func worker(id int, tasks workerChannel, out chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range tasks {
		out <- fmt.Sprintf("worker %d, task %d", id, task)
	}
}

// produce queues the values 0..count-1 on tasks and then closes the channel,
// which is what lets the workers' range loops terminate.
func produce(tasks workerChannel, count int) {
	defer close(tasks)
	for n := 0; n < count; n++ {
		tasks <- uint64(n)
	}
}

// main fans 21 tasks out to num_workers goroutines and prints each result.
// It returns after every worker has finished and the result channel has been
// closed.
func main() {
	tasks := make(workerChannel)
	output := make(chan string)

	// Launch the pool, registering each worker before it starts.
	var pending sync.WaitGroup
	for id := 0; id < num_workers; id++ {
		pending.Add(1)
		go worker(id, tasks, output, &pending)
	}

	// Feed the pool from its own goroutine so main is free to drain output.
	go produce(tasks, 21)

	// Close output only after every worker is done, so no send hits a closed
	// channel.
	go func() {
		pending.Wait()
		close(output)
	}()

	// Print results until output is closed.
	for line := range output {
		fmt.Println(line)
	}
}
https://play.golang.org/p/ZifpzsP6fNv