因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go 并发中的经典fanIn函数。
我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。
有没有正确/推荐的方法来实现这一目标?
您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。
去游乐场:https : //play.golang.org/p/jwdtDXVHJk
package main import ( "fmt" "time" ) func producer(iters int) <-chan int { c := make(chan int) go func() { for i := 0; i < iters; i++ { c <- i time.Sleep(1 * time.Second) } close(c) }() return c } func consumer(cin <-chan int) { for i := range cin { fmt.Println(i) } } func fanOut(ch <-chan int, size, lag int) []chan int { cs := make([]chan int, size) for i, _ := range cs { // The size of the channels buffer controls how far behind the recievers // of the fanOut channels can lag the other channels. cs[i] = make(chan int, lag) } go func() { for i := range ch { for _, c := range cs { c <- i } } for _, c := range cs { // close all our fanOut channels when the input channel is exhausted. close(c) } }() return cs } func fanOutUnbuffered(ch <-chan int, size int) []chan int { cs := make([]chan int, size) for i, _ := range cs { // The size of the channels buffer controls how far behind the recievers // of the fanOut channels can lag the other channels. cs[i] = make(chan int) } go func() { for i := range ch { for _, c := range cs { c <- i } } for _, c := range cs { // close all our fanOut channels when the input channel is exhausted. close(c) } }() return cs } func main() { c := producer(10) chans := fanOutUnbuffered(c, 3) go consumer(chans[0]) go consumer(chans[1]) consumer(chans[2]) }
需要注意的重要部分是一旦输入通道用完后如何关闭输出通道。同样,如果输出通道之一阻塞了发送,它将阻止其他输出通道的发送。我们通过设置通道的缓冲区大小来控制延迟量。