小编典典

工作线程池

go

http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-
golang/提供的示例中,很多地方都引用了该示例。

func (d *Dispatcher) dispatch() {
for {
    select {
    case job := <-JobQueue:
        // a job request has been received
        go func(job Job) {
            // try to obtain a worker job channel that is available.
            // this will block until a worker is idle
            jobChannel := <-d.WorkerPool

            // dispatch the job to the worker job channel
            jobChannel <- job
         }(job)
    }
}
}

MaxWorker分派服务完许多工作后,工人池(chan
chan工作)会不会耗尽?因为<-d.WorkerPool从信道和信道工作拉出第一类型后没有被补充dispatcher.Run()被调用的第一次?还是我想念/误读了什么?如何为WorkerPool补充可用的工作渠道?

go func(job Job) {
            // try to obtain a worker job channel that is available.
            // this will block until a worker is idle
            jobChannel := <-d.WorkerPool

            // dispatch the job to the worker job channel
            jobChannel <- job
        }(job)

阅读 251

收藏
2020-07-02

共1个答案

小编典典

如果您仔细阅读了工人的守则,您会发现

w.WorkerPool <- w.JobChannel

每次循环开始时,工作人员本身的渠道都会被退回

我在下面复制整个功能:

func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}
2020-07-02