在过去的几天里,我一直在尝试通过重构我的一个命令行实用程序来在Golang中进行并发,但是我被困住了。
这是原始代码(master分支)。
这是并发分支(x_concurrent分支)。
当我使用执行并发代码时go run jira_open_comment_emailer.go,defer wg.Done()如果将JIRA问题添加到此处的通道,则永不执行,这将导致我wg.Wait()永远挂起。
go run jira_open_comment_emailer.go
defer wg.Done()
wg.Wait()
我的想法是,我有大量的JIRA问题,我想为每个goroutine编写一个goroutine,以查看是否需要评论。如果是这样,我想将其添加到某种结构(我经过研究后选择了一个渠道),以后可以像队列一样从中读取信息,以建立电子邮件提醒。
这是代码的相关部分:
// Given an issue, determine if it has an open comment // Returns true if there is an open comment on the issue, otherwise false func getAndProcessComments(issue Issue, channel chan<- Issue, wg *sync.WaitGroup) { // Decrement the wait counter when the function returns defer wg.Done() needsReply := false // Loop over the comments in the issue for _, comment := range issue.Fields.Comment.Comments { commentMatched, err := regexp.MatchString("~"+config.JIRAUsername, comment.Body) checkError("Failed to regex match against comment body", err) if commentMatched { needsReply = true } if comment.Author.Name == config.JIRAUsername { needsReply = false } } // Only add the issue to the channel if it needs a reply if needsReply == true { // This never allows the defered wg.Done() to execute? channel <- issue } } func main() { start := time.Now() // This retrieves all issues in a search from JIRA allIssues := getFullIssueList() // Initialize a wait group var wg sync.WaitGroup // Set the number of waits to the number of issues to process wg.Add(len(allIssues)) // Create a channel to store issues that need a reply channel := make(chan Issue) for _, issue := range allIssues { go getAndProcessComments(issue, channel, &wg) } // Block until all of my goroutines have processed their issues. wg.Wait() // Only send an email if the channel has one or more issues if len(channel) > 0 { sendEmail(channel) } fmt.Printf("Script ran in %s", time.Since(start)) }
goroutines在发送到非缓冲通道时阻塞。最小的更改将取消goroutine的阻塞,即创建一个可处理所有问题的缓冲通道:
channel := make(chan Issue, len(allIssues))
并在调用wg.Wait()之后关闭通道。