我正在尝试使用多重处理来并行化应用程序,该处理程序会处理一个非常大的csv文件(64MB至500MB),逐行执行一些工作,然后输出一个固定大小的小文件。
目前,我正在执行list(file_obj),不幸的是,它已完全加载到内存中(我认为),然后我将该列表分成了n个部分,n是我要运行的进程数。然后,我pool.map()在分类列表上执行。
list(file_obj)
pool.map()
与单线程,仅打开文件并迭代的方法相比,这似乎具有非常非常糟糕的运行时。有人可以提出更好的解决方案吗?
此外,我需要按组处理文件中的行,以保留特定列的值。这些行组本身可以拆分,但是该列中的任何组都不能包含多个值。
list(file_obj)大的时候可能需要很多内存fileobj。我们可以通过使用itertools根据需要拉出几行代码来减少内存需求。
fileobj
特别是,我们可以使用
reader = csv.reader(f) chunks = itertools.groupby(reader, keyfunc)
将文件拆分为可处理的块,以及
groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)] result = pool.map(worker, groups)
使多处理池一次处理多个num_chunks块。
num_chunks
这样,我们大约只需要足够的内存即可在内存中保存几个(num_chunks)块,而不是整个文件。
import multiprocessing as mp import itertools import time import csv def worker(chunk): # `chunk` will be a list of CSV rows all with the same name column # replace this with your real computation # print(chunk) return len(chunk) def keyfunc(row): # `row` is one row of the CSV file. # replace this with the name column. return row[0] def main(): pool = mp.Pool() largefile = 'test.dat' num_chunks = 10 results = [] with open(largefile) as f: reader = csv.reader(f) chunks = itertools.groupby(reader, keyfunc) while True: # make a list of num_chunks chunks groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)] if groups: result = pool.map(worker, groups) results.extend(result) else: break pool.close() pool.join() print(results) if __name__ == '__main__': main()