我正在尝试在Go中构建一个Web搜寻器,我想在其中指定并发工作器的最大数量。只要队列中有要探索的链接,他们都将工作。当队列中的元素少于工作者时,工作者应大喊大叫,但如果发现更多链接,请恢复工作。
我试过的代码是
const max_workers = 6 // simulating links with int func crawl(wg *sync.WaitGroup, queue chan int) { for element := range queue { wg.Done() // why is defer here causing a deadlock? fmt.Println("adding 2 new elements ") if element%2 == 0 { wg.Add(2) queue <- (element*100 + 11) queue <- (element*100 + 33) } } } func main() { var wg sync.WaitGroup queue := make(chan int, 10) queue <- 0 queue <- 1 queue <- 2 queue <- 3 var min int if (len(queue) < max_workers) { min = len(queue) } else { min = max_workers } for i := 0; i < min; i++ { wg.Add(1) go crawl(&wg, queue) } wg.Wait() close(queue) }
链接到游乐场
这似乎可行,但是有一个陷阱:开始时,我必须用多个元素填充队列。我希望它从(单个)种子页面(在我的示例中queue <- 0)开始,然后动态地增加/缩小工作池。
queue <- 0
我的问题是:
我如何获得行为?
为什么延迟会wg.Done()导致死锁?wg.Done()实际完成该功能是否正常?我认为没有defergoroutine不会等待其他部分完成(在解析HTML的实际工作示例中可能会花费更长的时间)。
wg.Done()
defer
如果您对“ Go web爬网程序”(或“ golang Web爬网程序”)使用自己喜欢的网络搜索,则会发现许多示例,包括: Go Tour Exercise:Web Crawler。Go中也有一些关于并发的讨论,涵盖了这种情况。
Go中执行此操作的“标准”方法根本不需要涉及等待组。为了回答您的问题之一,defer只有在函数返回时才运行与之排队的事物。您具有运行时间长的功能,因此请勿defer在这样的循环中使用。
“标准”方式是在自己的goroutine中启动想要的许多工人。他们都从同一个频道读取“工作”,如果/无事可做则阻止。完全完成该通道后,它们都会退出。
在诸如履带式的情况下,工人们会发现更多的“工作”要做,并想将他们排队。您不希望他们写回同一通道,因为它只有有限的缓冲量(或没有缓冲!),最终您将阻止所有尝试排队更多工作的工人!
一个简单的解决方案是使用一个单独的通道(例如每个工作人员都有in <-chan Job, out chan<- Job)和一个队列/过滤器goroutine来读取这些请求,将它们附加到一个切片上,该切片要么任意增大要么对其进行全局限制,并且从切片的开头馈入另一个通道(即,从一个通道读取一个简单的for- select循环并写入另一个通道)。该代码通常还负责跟踪已完成的操作(例如访问的URL映射),并丢弃传入的重复请求。
in <-chan Job, out chan<- Job
队列goroutine可能看起来像这样(此处的参数名称过于冗长):
type Job string func queue(toWorkers chan<- Job, fromWorkers <-chan Job) { var list []Job done := make(map[Job]bool) for { var send chan<- Job var item Job if len(list) > 0 { send = toWorkers item = list[0] } select { case send <- item: // We sent an item, remove it list = list[1:] case thing := <-fromWorkers: // Got a new thing if !done[thing] { list = append(list, thing) done[thing] = true } } } }
这个简单的示例掩盖了一些内容。如终止。而且,如果“作业”是您要使用的较大结构,chan *Job而[]*Job不是。在这种情况下,您还需要将映射类型更改为从作业中提取的某些键(例如, Job.URL也许),并且您想要list[0] = nil在list = list[1:]摆脱*Job指针的引用之前先做一下,并让垃圾收集器更早地进行操作。 。
chan *Job
[]*Job
Job.URL
list[0] = nil
list = list[1:]
*Job
编辑:有关干净终止的一些注意事项。
有几种方法可以像上面那样干净地终止代码。可以使用一个等待组,但是Add / Done调用的放置需要仔细进行,您可能需要另一个goroutine来进行Wait(然后关闭其中一个通道以开始关闭)。工人不应关闭其输出通道,因为有多个工人,并且您不能多次关闭通道。队列goroutine在不知道何时完成工作的情况下无法告诉何时关闭通往工作人员的通道。
过去,当我使用与上面非常相似的代码时,我在“队列” goroutine中使用了本地“杰出”计数器(这避免了互斥量或等待组所需的任何同步开销)。将工作发送给工人时,未完成工作的数量会增加。当工人说完成后,它又减少了。我的代码恰好为此提供了另一个渠道(除了要排队的其他节点之外,我的“队列”也在收集结果)。它在自己的通道上可能更干净,但是可以在现有通道上使用特殊值(例如nil Job指针)。无论如何,有了这样的计数器,本地列表上的现有长度检查只需要查看列表为空并且是时候终止时没有什么未完成的事情。
例如:
if len(list) > 0 { send = toWorkers item = list[0] } else if outstandingJobs == 0 { close(toWorkers) return }