I have a script that opens each file from a list and then processes the text in that file. I'm using Python's multiprocessing and Pool to try to parallelize this. An abstraction of the script:
import os
from multiprocessing import Pool

results = []

def testFunc(files):
    for file in files:
        print("Working in Process #%d" % os.getpid())
        # This is just an illustration of some logic.
        # This is not what I'm actually doing.
        with open(file) as f:
            for line in f:
                if 'dog' in line:
                    results.append(line)

if __name__ == "__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.apply_async(testFunc, args=(files,))
    results2 = results.get()
When I run this, the printed process ID is the same on every iteration. Basically, what I want to do is take each element of the input list and fork it off to a separate process, but it seems one process is doing all the work.
apply_async hands out a single task to the pool, so the entire files list is processed by one worker. There is also a problem with results: each worker runs in its own process with its own copy of the module globals, so lines appended inside the worker never reach the parent. Have the worker return its matches instead, and either call apply_async once per file with a callback, or use map_async. So perhaps try something like this instead:
import os
import multiprocessing as mp

results = []

def testFunc(file):
    result = []
    print("Working in Process #%d" % os.getpid())
    # This is just an illustration of some logic. This is not what I'm
    # actually doing.
    with open(file, 'r') as f:
        for line in f:
            if 'dog' in line:
                result.append(line)
    return result

def collect_results(result):
    # Runs in the parent process, so appending to the global list works here.
    results.extend(result)

if __name__ == "__main__":
    p = mp.Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    for f in files:
        p.apply_async(testFunc, args=(f, ), callback=collect_results)
    p.close()
    p.join()
    print(results)