注意-Go中的新手。
我编写了一个多路复用器,该多路复用器 应将 一组通道的输出合并为一个。对建设性的批评感到满意。
func Mux(channels []chan big.Int) chan big.Int { // Count down as each channel closes. When hits zero - close ch. n := len(channels) // The channel to output to. ch := make(chan big.Int, n) // Make one go per channel. for _, c := range channels { go func() { // Pump it. for x := range c { ch <- x } // It closed. n -= 1 // Close output if all closed now. if n == 0 { close(ch) } }() } return ch }
我正在测试:
func fromTo(f, t int) chan big.Int { ch := make(chan big.Int) go func() { for i := f; i < t; i++ { fmt.Println("Feed:", i) ch <- *big.NewInt(int64(i)) } close(ch) }() return ch } func testMux() { r := make([]chan big.Int, 10) for i := 0; i < 10; i++ { r[i] = fromTo(i*10, i*10+10) } all := Mux(r) // Roll them out. for l := range all { fmt.Println(l) } }
但是我的输出很奇怪:
Feed: 0 Feed: 10 Feed: 20 Feed: 30 Feed: 40 Feed: 50 Feed: 60 Feed: 70 Feed: 80 Feed: 90 Feed: 91 Feed: 92 Feed: 93 Feed: 94 Feed: 95 Feed: 96 Feed: 97 Feed: 98 Feed: 99 {false [90]} {false [91]} {false [92]} {false [93]} {false [94]} {false [95]} {false [96]} {false [97]} {false [98]} {false [99]}
所以对我的问题:
我需要所有输入通道具有与输出通道相同的权限-即,我不能从一个通道获得所有输出,而从下一个通道获得所有输出,等等。
对于任何有兴趣的人-这是修复之后的最终代码,并且正确地(大概)使用了 sync.WaitGroup
sync.WaitGroup
import ( "math/big" "sync" ) /* Multiplex a number of channels into one. */ func Mux(channels []chan big.Int) chan big.Int { // Count down as each channel closes. When hits zero - close ch. var wg sync.WaitGroup wg.Add(len(channels)) // The channel to output to. ch := make(chan big.Int, len(channels)) // Make one go per channel. for _, c := range channels { go func(c <-chan big.Int) { // Pump it. for x := range c { ch <- x } // It closed. wg.Done() }(c) } // Close the channel when the pumping is finished. go func() { // Wait for everyone to be done. wg.Wait() // Close. close(ch) }() return ch }
从中产生的每个goroutine Mux最终都从同一通道中拉出,因为c在循环的每次迭代中都会进行更新- 它们不仅仅捕获的值c。如果将通道传递给goroutine,您将获得预期的结果,如下所示:
Mux
c
for _, c := range channels { go func(c <-chan big.Int) { ... }(c) }
您可以在此处测试此修改。
另一个可能的问题是您对n变量的处理:如果使用进行运行GOMAXPROCS != 1,则可能有两个goroutines试图一次对其进行更新。该sync.WaitGroup类型将是等待goroutine完成的更安全的方法。
n
GOMAXPROCS != 1