我刚刚开始使用Joblib模块,并且试图了解Parallel函数的工作方式。下面是并行化导致更长运行时间的示例,但我不明白为什么。我在1 cpu上的运行时间为51秒,而2 cpu上的运行时间为217秒。
我的假设是并行运行循环会将列表a和b复制到每个处理器。然后将item_n分配给一个cpu,将item_n + 1分配给另一个cpu,执行该函数,然后将结果写回到一个列表中(按顺序)。然后抓住接下来的2个项目,依此类推。我显然缺少了一些东西。
这是一个不好的例子还是使用joblib?我只是简单地将代码的结构错误了吗?
这是示例:
import numpy as np from matplotlib.path import Path from joblib import Parallel, delayed ## Create pairs of points for line segments a = zip(np.random.rand(5000,2),np.random.rand(5000,2)) b = zip(np.random.rand(300,2),np.random.rand(300,2)) ## Check if one line segment contains another. def check_paths(path, paths): for other_path in paths: res='no cross' chck = Path(other_path) if chck.contains_path(path)==1: res= 'cross' break return res res = Parallel(n_jobs=2) (delayed(check_paths) (Path(points), a) for points in b)
简而言之:我无法重现您的问题。如果您使用的是Windows,则应在主循环中使用保护器:的文档joblib.Parallel。我看到的唯一问题是大量的数据复制开销,但是您的数字似乎是不现实的。
joblib.Parallel
总的来说,这是我对您的代码的计时:
在我的i7 3770k(4核,8线程)上,对于不同的产品,我得到以下结果n_jobs:
n_jobs
For-loop: Finished in 33.8521318436 sec n_jobs=1: Finished in 33.5527760983 sec n_jobs=2: Finished in 18.9543449879 sec n_jobs=3: Finished in 13.4856410027 sec n_jobs=4: Finished in 15.0832719803 sec n_jobs=5: Finished in 14.7227740288 sec n_jobs=6: Finished in 15.6106669903 sec
因此,使用多个过程会有所收获。但是,尽管我有四个核心,但增益在三个过程中已经达到饱和。因此,我想执行时间实际上是受内存访问而不是处理器时间限制的。
您应该注意到,每个单个循环条目的参数都被复制到执行它的进程中。这意味着您需要a为中的每个元素进行复制b。那是无效的。因此,改为访问global a。(Parallel将派生该进程,将所有全局变量复制到新产生的进程,因此a可以访问)。这给了我以下代码(joblib建议的文档带有定时和主循环保护:
a
b
Parallel
joblib
import numpy as np from matplotlib.path import Path from joblib import Parallel, delayed import time import sys ## Check if one line segment contains another. def check_paths(path): for other_path in a: res='no cross' chck = Path(other_path) if chck.contains_path(path)==1: res= 'cross' break return res if __name__ == '__main__': ## Create pairs of points for line segments a = zip(np.random.rand(5000,2),np.random.rand(5000,2)) b = zip(np.random.rand(300,2),np.random.rand(300,2)) now = time.time() if len(sys.argv) >= 2: res = Parallel(n_jobs=int(sys.argv[1])) (delayed(check_paths) (Path(points)) for points in b) else: res = [check_paths(Path(points)) for points in b] print "Finished in", time.time()-now , "sec"
计时结果:
n_jobs=1: Finished in 34.2845709324 sec n_jobs=2: Finished in 16.6254048347 sec n_jobs=3: Finished in 11.219119072 sec n_jobs=4: Finished in 8.61683392525 sec n_jobs=5: Finished in 8.51907801628 sec n_jobs=6: Finished in 8.21842098236 sec n_jobs=7: Finished in 8.21816396713 sec n_jobs=8: Finished in 7.81841087341 sec
饱和度现在稍微移到了n_jobs=4预期值。
n_jobs=4
check_paths进行了几个可以轻松消除的冗余计算。首先,对于other_paths=a该行中的所有元素,Path(...)在每次调用中都会执行。预先计算。其次res='no cross',虽然每个循环只能更改一次(紧接着是断点并返回),但每次循环都会写入该字符串。将线移到循环的前面。然后,代码如下所示:
check_paths
other_paths=a
Path(...)
res='no cross'
import numpy as np from matplotlib.path import Path from joblib import Parallel, delayed import time import sys ## Check if one line segment contains another. def check_paths(path): #global a #print(path, a[:10]) res='no cross' for other_path in a: if other_path.contains_path(path)==1: res= 'cross' break return res if __name__ == '__main__': ## Create pairs of points for line segments a = zip(np.random.rand(5000,2),np.random.rand(5000,2)) a = [Path(x) for x in a] b = zip(np.random.rand(300,2),np.random.rand(300,2)) now = time.time() if len(sys.argv) >= 2: res = Parallel(n_jobs=int(sys.argv[1])) (delayed(check_paths) (Path(points)) for points in b) else: res = [check_paths(Path(points)) for points in b] print "Finished in", time.time()-now , "sec"
有时间安排:
n_jobs=1: Finished in 5.33742594719 sec n_jobs=2: Finished in 2.70858597755 sec n_jobs=3: Finished in 1.80810618401 sec n_jobs=4: Finished in 1.40814709663 sec n_jobs=5: Finished in 1.50854086876 sec n_jobs=6: Finished in 1.50901818275 sec n_jobs=7: Finished in 1.51030707359 sec n_jobs=8: Finished in 1.51062297821 sec
尽管我没有真正遵循它的目的,因为它与您的问题无关,但是您代码上的一个副节点contains_path只会返回True if this path completely contains the given path.(请参阅文档)。因此,no cross给定随机输入,您的函数基本上总是返回。
contains_path
True
if this path completely contains the given path.
no cross