小编典典

渠道中的合并项目

go

我有一个接收任务并将其放入频道的功能。每个任务都有ID,一些属性以及放置结果的通道。看起来像这样

task.Result = make(chan *TaskResult)
queue <- task
result := <-task.Result
sendReponse(result)

另一个goroutine从通道中获取任务,对其进行处理,然后将结果放入任务的通道中

task := <-queue
task.Result <- doExpensiveComputation(task)

此代码可以正常工作。但是现在我要合并任务queue。任务处理是一项非常昂贵的操作,因此我希望一次处理具有相同ID的队列中的所有任务。我看到了两种方法。

首先,不要将具有相同ID的任务放入队列,因此,当现有任务到达时,它将等待其复制完成。这是伪代码

if newTask in queue {
  existing := queue.getById(newTask.ID)
  existing.waitForComplete()
  sendResponse(existing.ProcessingResult)
} else {
  queue.enqueue(newTask)
}

因此,我可以使用go通道和map来实现它以实现随机访问+一些同步手段,例如互斥体。我不喜欢这种方式的是,我必须在代码周围携带地图和通道,并使它们的内容保持同步。

第二种方法是将所有任务放入队列,但是在结果到达时从队列中提取任务和所有具有相同ID的任务,然后将结果发送到所有任务。这是伪代码

someTask := queue.dequeue()
result := doExpensiveComputation(someTask)
someTask.Result <- result
moreTasks := queue.getAllWithID(someTask.ID)
for _,theSameTask := range moreTasks {
  theSameTask.Result <- result
}

我有一个想法,如何以与上述相同的方式使用chan + map +互斥体来实现此目的。

这是一个问题:是否存在一些内置/现有的数据结构可用于解决此类问题?还有其他(更好)的方法吗?


阅读 215

收藏
2020-07-02

共1个答案

小编典典

如果我正确地理解了问题,我想到的最简单的解决方案是在任务发送者(放入queue)和工作人员(从取出queue)之间添加中间层。这(可能是例行程序)将负责存储当前任务(按ID)并将结果广播到每个匹配的任务。

伪代码:

go func() {
    active := make(map[TaskID][]Task)

    for {
        select {
        case task := <-queue:
            tasks := active[task.ID]
            // No tasks with such ID, start heavy work
            if len(tasks) == 0 {
                worker <- task
            }
            // Save task for the result
            active[task.ID] = append(active[task.ID], task)
        case r := <-response:
            // Broadcast to all tasks
            for _, task := range active[r.ID] {
                task.Result <- r.Result
            }
        }
    }
}()

不需要互斥体,也可能不需要携带任何东西,工作人员将只需要将所有结果放入中间层,然后可以正确地路由响应。如果冲突ID可能在一段时间内到达,您甚至可以在此处轻松添加缓存。

编辑:
我有一个梦想,上面的代码导致了死锁。如果您一次发送大量请求并阻塞了worker通道,那么这将是一个严重的问题–这个中间层例程正worker <- task等待工作人员完成,但是所有工作人员都可能无法发送到响应通道(因为我们的例程可以收集)。可播放的证明。

可以考虑在通道中添加一些缓冲区,但这不是一个合适的解决方案(除非您可以通过这种方式设计系统,使缓冲区永远不会填满)。有几种方法可以解决此问题;例如,您可以运行一个单独的例程来收集响应,但是随后您需要active使用互斥量保护地图。可行
您还可以worker <- task选择一个选项,该选项将尝试将任务发送给工作人员,接收新任务(如果没有要发送的内容)或收集响应。可以利用以下事实:零通道从未准备好进行通信(被选择忽略),因此您可以在一个选择中选择接收和发送任务。例:

go func() {
    var next Task // received task which needs to be passed to a worker
    in := queue // incoming channel (new tasks) -- active
    var out chan Task // outgoing channel (to workers) -- inactive
    for {
        select {
        case t := <-in:
            next = t // store task, so we can pass to worker
            in, out = nil, worker // deactivate incoming channel, activate outgoing
        case out <- next:
            in, out = queue, nil // deactivate outgoing channel, activate incoming
        case r := <-response:
            collect <- r
        }
    }
}()

2020-07-02