我正在寻找一种解决方案,可以多路复用一些通道输出。
我有一个数据源,它是从io.Reader我发送到单个通道的读取的。另一方面,我有一个从通道读取的websocket请求处理程序。现在,发生了两个客户端创建一个websocket连接的情况,它们均从同一通道读取,但每个客户端仅获得部分消息。
io.Reader
代码示例(简体):
func (b *Bootloader) ReadLog() (<-chan []byte, error) { if b.logCh != nil { logrus.Warn("ReadLog called while channel already exists!") return b.logCh, nil // This is where we get problems } b.logCh = make(chan []byte, 0) go func() { buf := make([]byte, 1024) for { n, err := b.p.Read(buf) if err == nil { msg := make([]byte, n) copy(msg, buf[:n]) b.logCh <- msg } else { break } } close(b.logCh) b.logCh = nil }() return b.logCh, nil }
现在,当ReadLog()两次被调用时,第二个调用仅返回在第一个调用中创建的通道,这将导致上述问题。
ReadLog()
问题是: 如何进行适当的复用?
在发送或接收站点上关心多路复用是否更好/更容易/更意识形态?
我应该对接收者隐藏频道并使用回调吗?
我现在有点卡住了。任何提示都欢迎。
多路复用非常简单:创建要多路复用到的通道切片,启动一个goroutine,该例程从原始通道读取并将每个消息复制到该切片中的每个通道:
// Really this should be in Bootloader but this is just an example var consumers []chan []byte func (b *Bootloader) multiplex() { // We'll use a sync.once to make sure we don't start a bunch of these. sync.Once(func(){ go func() { // Every time a message comes over the channel... for v := range b.logCh { // Loop over the consumers... for _,cons := range consumers { // Send each one the message cons <- v } } }() }) }