我希望我的go常规工作者(ProcessToDo()在下面的代码中)在关闭所有“排队”工作之前等待。
ProcessToDo()
工作例程具有一个“待办事项”通道(已缓冲),通过该通道将工作发送给它。它有一个“完成”通道来告诉它开始关闭。该文档说,如果满足多个选择之一,则通道上的选择将选择一个“伪随机值”……这意味着在所有缓冲工作完成之前将触发关闭(返回)。
在下面的代码示例中,我希望所有20条消息都可以打印…
package main import ( "time" "fmt" ) func ProcessToDo(done chan struct{}, todo chan string) { for { select { case work, ok := <-todo: if !ok { fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n") return } fmt.Printf("todo: %q\n", work) time.Sleep(100 * time.Millisecond) case _, ok := <-done: if ok { fmt.Printf("Shutting down ProcessToDo - done message received!\n") } else { fmt.Printf("Shutting down ProcessToDo - done channel closed!\n") } close(todo) return } } } func main() { done := make(chan struct{}) todo := make(chan string, 100) go ProcessToDo(done, todo) for i := 0; i < 20; i++ { todo <- fmt.Sprintf("Message %02d", i) } fmt.Println("*** all messages queued ***") time.Sleep(1 * time.Second) close(done) time.Sleep(4 * time.Second) }
done您完全不需要使用通道,因为您可以通过关闭todo通道本身来发出关闭信号。
done
todo
并for range在通道上使用,它将迭代直到通道关闭且其缓冲区为空。
for range
您应该有一个done通道,但只有这样,goroutine本身才能发出信号,表明它已完成工作,因此主goroutine可以继续或退出。
此变体与您的变体等效,简单得多,并且不需要time.Sleep()调用来等待其他goroutine(反正太错误和不确定)。在GoPlayground上尝试一下:
time.Sleep()
func ProcessToDo(done chan struct{}, todo chan string) { for work := range todo { fmt.Printf("todo: %q\n", work) time.Sleep(100 * time.Millisecond) } fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n") done <- struct{}{} // Signal that we processed all jobs } func main() { done := make(chan struct{}) todo := make(chan string, 100) go ProcessToDo(done, todo) for i := 0; i < 20; i++ { todo <- fmt.Sprintf("Message %02d", i) } fmt.Println("*** all messages queued ***") close(todo) <-done // Wait until the other goroutine finishes all jobs }
还要注意,工作程序goroutine应该使用来表示信号已完成,defer因此,如果主工作程序以某种意外的方式或发生紧急情况返回,则不会阻塞主工作程序。因此,它应该这样开始:
defer
defer func() { done <- struct{}{} // Signal that we processed all jobs }()
您还可以sync.WaitGroup用于将主goroutine同步到worker(以等待它)。实际上,如果您打算使用多个工作程序例程,那比从done通道读取多个值更干净。同样,WaitGroup由于它带有一个Done()方法(这是一个函数调用),因此用来表示完成也更简单,因此您不需要匿名函数:
sync.WaitGroup
WaitGroup
Done()
defer wg.Done()
有关完整的示例,请参见JimB的答案WaitGroup。
使用forrange信道同步,因此你不需要任何额外的代码,将同步访问:如果你想使用多工作够程也是地道的todo通道或在收到该职位。并且,如果您关闭中的todo频道main(),则会正确地向所有工作程序发出信号。但是,当然,所有排队的作业将只被接收和处理一次。
forrange
main()
现在,使用WaitGroup用于使主goroutine等待工作程序使用的变体(JimB的答案):如果您希望有1个以上的工作程序goroutine,该怎么办?同时(最有可能并行)处理您的工作?
您需要在代码中添加/更改的唯一一件事是:真正启动多个代码:
for i := 0; i < 10; i++ { wg.Add(1) go ProcessToDo(todo) }
现在,您无需更改任何其他内容,便有了一个正确的并发应用程序,该应用程序使用10个并发goroutine接收并处理您的作业。而且我们没有使用任何“丑陋的”time.Sleep()(我们只使用了“丑陋的”,而只是模拟了缓慢的处理,而不是等待其他goroutine),并且您不需要任何额外的同步。