小编典典

将大文件中的数据分块进行多处理?

python

我正在尝试使用多重处理来并行化应用程序,该处理程序会处理一个非常大的csv文件(64MB至500MB),逐行执行一些工作,然后输出一个固定大小的小文件。

目前,我正在执行list(file_obj),不幸的是,它已完全加载到内存中(我认为),然后我将该列表分成了n个部分,n是我要运行的进程数。然后,我pool.map()在分类列表上执行。

与单线程,仅打开文件并迭代的方法相比,这似乎具有非常非常糟糕的运行时。有人可以提出更好的解决方案吗?

此外,我需要按组处理文件中的行,以保留特定列的值。这些行组本身可以拆分,但是该列中的任何组都不能包含多个值。


阅读 311

收藏
2020-12-20

共1个答案

小编典典

list(file_obj)大的时候可能需要很多内存fileobj。我们可以通过使用itertools根据需要拉出几行代码来减少内存需求。

特别是,我们可以使用

reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)

将文件拆分为可处理的块,以及

groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)

使多处理池一次处理多个num_chunks块。

这样,我们大约只需要足够的内存即可在内存中保存几个(num_chunks)块,而不是整个文件。


import multiprocessing as mp
import itertools
import time
import csv

def worker(chunk):
    # `chunk` will be a list of CSV rows all with the same name column
    # replace this with your real computation
    # print(chunk)
    return len(chunk)

def keyfunc(row):
    # `row` is one row of the CSV file.
    # replace this with the name column.
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'test.dat'
    num_chunks = 10
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print(results)

if __name__ == '__main__':
    main()
2020-12-20