小编典典

随时可以运行x个goroutine

go

我看到了很多有关如何使Go等待x数量的goroutine完成的教程和示例,但是我想做的是确保始终有x数量的运行,因此一旦结束,就会启动一个新的goroutine。

具体来说,我有几十万个“要做的事情”正在处理MySQL产生的一些事情。所以它是这样的:

db, err := sql.Open("mysql", connection_string)
checkErr(err)
defer db.Close()

rows,err := db.Query(`SELECT id FROM table`)
checkErr(err)
defer rows.Close()

var id uint
for rows.Next() {
    err := rows.Scan(&id)
    checkErr(err)
    go processTheThing(id)
    }
checkErr(err)
rows.Close()

目前,它将启动数十万个线程processTheThing()。我需要的是启动了最多x个数字(我们称其为20)goroutines。因此,它从在前20行中启动20开始,然后从当前goroutine中的一个结束后的下一个id启动新的goroutine。因此,在任何时间点总是有20个正在运行。

我敢肯定这是非常简单/标准的,但是我似乎无法在任何教程或示例中找到很好的解释,或者如何做到这一点。


阅读 297

收藏
2020-07-02

共1个答案

小编典典

感谢大家帮助我解决这个问题。但是,尽管您都帮助我理解了该技术,但我认为没有人真正提供既可行又简单/易于理解的功能。

最后,我认为作为对我的特定问题的解答,我认为它更易于理解和实用,因此,如果其他人有相同的问题,我将在此处发布。

不知何故,最终看起来很像OneOfOne发布的内容,这很棒,因为现在我明白了。但是我一开始发现OneOfOne的代码很难理解,因为将函数传递给函数使人们很难理解到底是什么。我认为这种方式更有意义:

package main

import (
"fmt"
"sync"
)

const xthreads = 5 // Total number of threads to use, excluding the main() thread

func doSomething(a int) {
    fmt.Println("My job is",a)
    return
}

func main() {
    var ch = make(chan int, 50) // This number 50 can be anything as long as it's larger than xthreads
    var wg sync.WaitGroup

    // This starts xthreads number of goroutines that wait for something to do
    wg.Add(xthreads)
    for i:=0; i<xthreads; i++ {
        go func() {
            for {
                a, ok := <-ch
                if !ok { // if there is nothing to do and the channel has been closed then end the goroutine
                    wg.Done()
                    return
                }
                doSomething(a) // do the thing
            }
        }()
    }

    // Now the jobs can be added to the channel, which is used as a queue
    for i:=0; i<50; i++ {
        ch <- i // add i to the queue
    }

    close(ch) // This tells the goroutines there's nothing else to do
    wg.Wait() // Wait for the threads to finish
}
2020-07-02