我有以下代码实现了工作队列:
package main import ( "fmt" "net/http" "io" "time" ) var ( linkQueue chan Link scraperQueue chan chan Link ) func CycleDirectory(page int) { linkQueue <- Link{Name: "asd"} } type Link struct { Name string } func (s Scraper) Start() { fmt.Println("Started") go func() { for { s.ScraperQueue <- s.Link select { case link := <-s.Link: fmt.Println(fmt.Sprintf("%v", s.Id) + ": Received " + link.Name) case <-s.QuitChan: fmt.Println("Closed") return } } }() } func (s Scraper) Stop() { go func() { s.QuitChan <- true }() } type Scraper struct { Id int Link chan Link ScraperQueue chan chan Link QuitChan chan bool } func InitScraper(id int, scraperQueue chan chan Link) Scraper { return Scraper { Id: id, Link: make(chan Link), ScraperQueue: scraperQueue, QuitChan: make(chan bool), } } func HelloServer(w http.ResponseWriter, req *http.Request) { io.WriteString(w, "hello, world!\n") } func main() { linkQueue = make(chan Link, 2000) numScrapers := 2 scraperQueue = make(chan chan Link, numScrapers) for i := 0; i < numScrapers; i++ { s := InitScraper(i+1, scraperQueue) s.Start() } go func() { for { select { case link := <-linkQueue: go func() { scraper := <-scraperQueue scraper <- link }() } } }() CycleDirectory(1) // time.Sleep(1 * time.Millisecond) for { // select { // } } // http.HandleFunc("/hello", HelloServer) // http.ListenAndServe(":12345", nil) }
使用包含if语句(或内部没有任何内容)的for循环运行此代码,爬虫不会打印收到的消息。使用net / http中的ListenAndServe函数进行阻止,它将打印收到的消息。使用睡眠阻止1毫秒,我收到了消息。并将select语句放入for循环中,我也收到消息。
为什么没有select语句的for循环不允许执行在工作队列中发送的消息,以及我将如何处理这一问题。我需要在for循环中使用if语句来检查是否所有工作都已完成,因此我可以退出循环并结束程序。
更新资料
Amd的建议是解决此问题的方法。这是我使用sync.WaitGroup包主要的更新代码
import ( "fmt" "sync" ) var ( linkQueue chan Link scraperQueue chan chan Link wg sync.WaitGroup ) func CycleDirectory(page int) { wg.Add(1) linkQueue <- Link{Name: "asd"} } type Link struct { Name string } func (s Scraper) Start() { fmt.Println("Started") go func() { for { s.ScraperQueue <- s.Link select { case link := <-s.Link: Scrape(s.Id, link.Name) s.Stop() case <-s.QuitChan: fmt.Println("Closed") wg.Done() return } } }() } func (s Scraper) Stop() { go func() { s.QuitChan <- true }() } type Scraper struct { Id int Link chan Link ScraperQueue chan chan Link QuitChan chan bool } func Scrape(id int, name string) { fmt.Println(fmt.Sprintf("%v", id) + ": Received " + name) } func InitScraper(id int, scraperQueue chan chan Link) Scraper { return Scraper { Id: id, Link: make(chan Link), ScraperQueue: scraperQueue, QuitChan: make(chan bool), } } func main() { linkQueue = make(chan Link, 2000) numScrapers := 2 scraperQueue = make(chan chan Link, numScrapers) for i := 0; i < numScrapers; i++ { s := InitScraper(i+1, scraperQueue) s.Start() } go func() { for { select { case link := <-linkQueue: go func() { scraper := <-scraperQueue scraper <- link }() } } }() CycleDirectory(1) wg.Wait() fmt.Println("Done") }
您可以sync.WaitGroup用来停止程序退出,直到完成所有工作。 在The Go Playground上尝试一下:
sync.WaitGroup
package main import ( "fmt" "sync" "time" ) var ( wg sync.WaitGroup ) func main() { wg.Add(1) go func() { defer wg.Done() time.Sleep(2 * time.Second) }() fmt.Println("Wait...") wg.Wait() fmt.Println("Done.") }