不知道频道 长度时,我无法关闭频道
package main import ( "fmt" "time" ) func gen(ch chan int) { var i int for { time.Sleep(time.Millisecond * 10) ch <- i i++ // when no more data (e.g. from db, or event stream) if i > 100 { break } } // hot to close it properly? close(ch) } func receiver(ch chan int) { for i := range ch { fmt.Println("received:", i) } } func main() { ch := make(chan int) for i := 0; i < 10; i++ { go gen(ch) } receiver(ch) }
它给我错误
panic: send on closed channel goroutine 8 [running]: main.gen(0xc82001a0c0) /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:12 +0x57 created by main.main /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:35 +0xbd goroutine 1 [panicwait]: runtime.gopark(0x0, 0x0, 0x50b8e0, 0x9, 0x10, 0x1) /usr/lib/go/src/runtime/proc.go:185 +0x163 runtime.main() /usr/lib/go/src/runtime/proc.go:121 +0x2f4 runtime.goexit() /usr/lib/go/src/runtime/asm_amd64.s:1696 +0x1 goroutine 6 [sleep]: time.Sleep(0x989680) /usr/lib/go/src/runtime/time.go:59 +0xf9 main.gen(0xc82001a0c0) /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29 created by main.main /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:33 +0x79 goroutine 7 [sleep]: time.Sleep(0x989680) /usr/lib/go/src/runtime/time.go:59 +0xf9 main.gen(0xc82001a0c0) /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29 created by main.main /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:34 +0x9b exit status 2
这是合乎逻辑的-当第二个goroutine关闭通道试图发送给它时,它关闭了它。在这种情况下关闭渠道的最佳方法是什么?
通道关闭后,您将无法在该通道上发送更多值,否则会出现混乱。这就是您的经验。
这是因为您启动了使用同一个通道的多个goroutine,并且它们在该通道上发送值。然后关闭每个通道。而且由于它们未同步,因此一旦第一个goroutine到达将其关闭的点,其他人可能(并且他们将)仍然继续在其上发送值:恐慌!
您只能关闭该通道一次(尝试关闭一个已经关闭的通道也会引起混乱)。当所有发送值的goroutine完成后,您应该这样做。为此,您需要检测何时完成所有发送方goroutine。一种检测到此情况的惯用方法是使用sync.WaitGroup。
sync.WaitGroup
对于每个启动的发送方goroutine,我们在WaitGroupusing中添加1 WaitGroup.Add()。每个完成发送值的goroutine都可以通过调用发出信号WaitGroup.Done()。最好以延迟语句的方式执行此操作,因此,如果您的goroutine会突然终止(例如,紧急情况),WaitGroup.Done()仍会被调用,并且不会使其他goroutine挂起(等待赦免- WaitGroup.Done()永远不会出现的“缺失” 调用)。 )。
WaitGroup
WaitGroup.Add()
WaitGroup.Done()
并且WaitGroup.Wait()将等待,直到所有发送方goroutine都完成,并且只有在此之后,才关闭通道一次。我们希望检测到此“全局”完成事件并在处理正在进行的发送值时关闭通道,因此我们必须在自己的goroutine中执行此操作。
WaitGroup.Wait()
因为我们for ... range在通道上使用了构造,所以接收器goroutine将一直运行到通道关闭为止。并且由于它在主goroutine中运行,因此直到从通道正确接收并处理了所有值后,程序才会退出。所述for ... range构建体循环,直到接收到所有的值的信道被关闭之前已被发送。
for ... range
请注意,下面的解决方案也适用于缓冲通道和非缓冲通道,而无需进行修改(请尝试使用带有的缓冲通道ch := make(chan int, 100))。
ch := make(chan int, 100)
正确的解决方案(在Go Playground上尝试):
func gen(ch chan int, wg *sync.WaitGroup) { defer wg.Done() var i int for { time.Sleep(time.Millisecond * 10) ch <- i i++ // when no more data (e.g. from db, or event stream) if i > 100 { break } } } func receiver(ch chan int) { for i := range ch { fmt.Println("received:", i) } } func main() { ch := make(chan int) wg := &sync.WaitGroup{} for i := 0; i < 10; i++ { wg.Add(1) go gen(ch, wg) } go func() { wg.Wait() close(ch) }() receiver(ch) }
注意:
请注意,receiver(ch)在主goroutine 中运行非常重要,而代码WaitGroup在其自身的(非主)goroutine中等待和关闭通道的代码;而不是相反。如果您将它们切换为2,则可能会导致“提前退出”,即并非所有值都可以从通道中接收和处理。这样做的原因是因为Go程序在主goroutine完成时退出(请参阅:程序执行)。它不等待其他(非主)goroutine完成。因此,如果等待和关闭通道位于主goroutine中,则在关闭通道之后,程序可以随时退出,而无需等待其他goroutine,在这种情况下,该goroutine将循环从该通道接收值。
receiver(ch)