我有一个CPU密集的Celery任务。我想使用许多EC2实例上的所有处理能力(核心)来更快地完成此工作 ( 我认为是 具有多处理功能的芹菜并行分布式任务)。
我正在尝试更好地理解所有术语, 线程 , 多处理 , 分布式计算 , 分布式并行处理 。
示例任务:
@app.task for item in list_of_millions_of_ids: id = item # do some long complicated equation here very CPU heavy!!!!!!! database.objects(newid=id).save()
使用上面的代码 (如果可能,还提供一个示例), 如何允许通过利用云中所有可用计算机上的所有计算CPU能力来拆分一项任务,从而使用Celery分发此任务?
您的目标是:
芹菜可以很轻松地为您做这两个。首先要了解的是,每个celery worker默认配置为运行与系统上可用CPU内核数量一样多的任务:
并发是用于同时处理您的任务的前叉工作进程的数量,当所有这些工作都忙于工作时,新任务将必须等待其中一项任务完成才能被处理。 缺省的并发数是该机器(包括核心)上的CPU数 ,您可以使用- c选项指定自定义数。没有建议值,因为最佳数量取决于许多因素,但是如果您的任务主要是受I / O约束的,那么您可以尝试增加它,实验表明,很少会增加两倍以上的CPU数量有效,并且有可能降低性能。
并发是用于同时处理您的任务的前叉工作进程的数量,当所有这些工作都忙于工作时,新任务将必须等待其中一项任务完成才能被处理。
缺省的并发数是该机器(包括核心)上的CPU数 ,您可以使用- c选项指定自定义数。没有建议值,因为最佳数量取决于许多因素,但是如果您的任务主要是受I / O约束的,那么您可以尝试增加它,实验表明,很少会增加两倍以上的CPU数量有效,并且有可能降低性能。
这意味着每个任务都不必担心使用多处理/线程来利用多个CPU /内核。相反,芹菜将同时运行足够的任务以使用每个可用的CPU。
有了这一点,下一步就是创建一个任务来处理您的的某些子集list_of_millions_of_ids。您这里有两个选择- 一个是让每个任务处理一个ID,因此您要运行N个任务,其中N == len(list_of_millions_of_ids)。这将确保工作在所有任务中平均分配,因为永远不会出现一个工人提前完成而只是在等待的情况。如果需要工作,可以将ID移出队列。您可以使用芹菜来做到这一点(如John Doe所述)group。
list_of_millions_of_ids
N == len(list_of_millions_of_ids)
group
task.py:
@app.task def process_id(item): id = item #long complicated equation here database.objects(newid=id).save()
并执行任务:
from celery import group from tasks import process_id jobs = group(process_id.s(item) for item in list_of_millions_of_ids) result = jobs.apply_async()
另一种选择是将列表分成较小的部分,然后将这些部分分发给您的工作人员。这种方法冒着浪费一些周期的风险,因为您可能最终会有些工人在等待而其他人仍在工作。但是,芹菜文档指出这种担心通常是没有根据的:
有些人可能会担心将任务分块会导致并行度降低,但是对于繁忙的集群却很少如此,实际上,由于避免了消息传递的开销,这可能会大大提高性能。
因此,由于减少了消息传递开销,您可能会发现对列表进行分块并将分块分配给每个任务的效果更好。您可能还可以通过以下方式来减轻数据库的负担:计算每个ID,将其存储在列表中,然后在完成后将整个列表添加到DB中,而不是一次执行一个ID 。分块方法看起来像这样
@app.task def process_ids(items): for item in items: id = item #long complicated equation here database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.
并开始任务:
from tasks import process_ids jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here. jobs.apply_async()
您可以尝试一下分块大小可以为您带来最佳结果的方法。您希望找到一个最佳的位置,在其中减少消息传递的开销,同时又要保持足够小的大小,以免最终导致工作人员完成工作的速度比另一个工作人员快得多,然后无所事事地等待着。