我有以下代码利用多处理程序来遍历大列表并找到匹配项。在任何一个进程中找到匹配项后,如何使所有进程停止?我已经看到了一些示例,但是我似乎都不适合我在这里所做的事情。
#!/usr/bin/env python3.5 import sys, itertools, multiprocessing, functools alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;" num_parts = 4 part_size = len(alphabet) // num_parts def do_job(first_bits): for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)): # CHECK FOR MATCH HERE print(''.join(x)) # EXIT ALL PROCESSES IF MATCH FOUND if __name__ == '__main__': pool = multiprocessing.Pool(processes=4) results = [] for i in range(num_parts): if i == num_parts - 1: first_bit = alphabet[part_size * i :] else: first_bit = alphabet[part_size * i : part_size * (i+1)] pool.apply_async(do_job, (first_bit,)) pool.close() pool.join()
谢谢你的时间。
更新1:
我已经实现了@ShadowRanger的出色方法中建议的更改,并且几乎可以按照我想要的方式工作。因此,我添加了一些日志记录以指示进度,并在其中放置一个“测试”键以进行匹配。我希望能够独立于num_parts来增加/减少iNumberOfProcessors。在这个阶段,当我让它们都达到4时,一切都按预期工作,则4个进程开始旋转(控制台额外增加了一个)。当我将iNumberOfProcessors更改为6时,有6个进程加速旋转,但只有其中一个具有任何CPU使用率。因此看来2是空闲的。在上面的解决方案中,我能够在不增加num_parts的情况下设置更高的内核数,并且所有进程都将被使用。
我不确定如何重构这种新方法来为我提供相同的功能。您能否看一下并为我提供一些重构的方向,以便能够彼此独立地设置iNumberOfProcessors和num_parts并仍然使用所有进程?
这是更新的代码:
#!/usr/bin/env python3.5 import sys, itertools, multiprocessing, functools alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;" num_parts = 4 part_size = len(alphabet) // num_parts iProgressInterval = 10000 iNumberOfProcessors = 6 def do_job(first_bits): iAttemptNumber = 0 iLastProgressUpdate = 0 for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)): sKey = ''.join(x) iAttemptNumber = iAttemptNumber + 1 if iLastProgressUpdate + iProgressInterval <= iAttemptNumber: iLastProgressUpdate = iLastProgressUpdate + iProgressInterval print("Attempt#:", iAttemptNumber, "Key:", sKey) if sKey == 'test': print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey) return True def get_part(i): if i == num_parts - 1: first_bit = alphabet[part_size * i :] else: first_bit = alphabet[part_size * i : part_size * (i+1)] return first_bit if __name__ == '__main__': # with statement with Py3 multiprocessing.Pool terminates when block exits with multiprocessing.Pool(processes = iNumberOfProcessors) as pool: # Don't need special case for final block; slices can for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))): if gotmatch: break else: print("No matches found")
更新2:
好的,这是我尝试@noxdafox建议的尝试。根据他提供的建议,我整理了以下内容。不幸的是,当我运行它时,我得到了错误:
…第322行,在apply_async中引发ValueError(“ Pool not running”)。
谁能给我一些有关如何使它工作的指导。
基本上,问题是我的第一次尝试是进行多处理,但不支持在找到匹配项后取消所有进程。
我的第二次尝试(基于@ShadowRanger的建议)解决了该问题,但是破坏了能够独立扩展进程数和num_parts大小的功能,这是我的第一次尝试。
我的第三次尝试(基于@noxdafox的建议)抛出上面概述的错误。
如果有人可以给我一些指导,说明如何维护我的第一次尝试的功能(能够独立扩展进程的数量和num_parts的大小),并添加一旦发现匹配项便取消所有进程的功能,将不胜感激。
感谢您的时间。
这是基于@noxdafox建议的第三次尝试的代码:
#!/usr/bin/env python3.5 import sys, itertools, multiprocessing, functools alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;" num_parts = 4 part_size = len(alphabet) // num_parts iProgressInterval = 10000 iNumberOfProcessors = 4 def find_match(first_bits): iAttemptNumber = 0 iLastProgressUpdate = 0 for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)): sKey = ''.join(x) iAttemptNumber = iAttemptNumber + 1 if iLastProgressUpdate + iProgressInterval <= iAttemptNumber: iLastProgressUpdate = iLastProgressUpdate + iProgressInterval print("Attempt#:", iAttemptNumber, "Key:", sKey) if sKey == 'test': print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey) return True def get_part(i): if i == num_parts - 1: first_bit = alphabet[part_size * i :] else: first_bit = alphabet[part_size * i : part_size * (i+1)] return first_bit def grouper(iterable, n, fillvalue=None): args = [iter(iterable)] * n return itertools.zip_longest(*args, fillvalue=fillvalue) class Worker(): def __init__(self, workers): self.workers = workers def callback(self, result): if result: self.pool.terminate() def do_job(self): print(self.workers) pool = multiprocessing.Pool(processes=self.workers) for part in grouper(alphabet, part_size): pool.apply_async(do_job, (part,), callback=self.callback) pool.close() pool.join() print("All Jobs Queued") if __name__ == '__main__': w = Worker(4) w.do_job()
您可以检查该问题,以查看解决您的问题的实现示例。
这也适用于current.futures池。
只需将map方法替换为,apply_async并在调用者的列表上进行迭代即可。
map
apply_async
这样的事情。
for part in grouper(alphabet, part_size): pool.apply_async(do_job, part, callback=self.callback)