我正在使用以下代码将CSV文件拆分为多个块(从此处获取)
def worker(chunk): print len(chunk) def keyfunc(row): return row[0] def main(): pool = mp.Pool() largefile = 'Counseling.csv' num_chunks = 10 start_time = time.time() results = [] with open(largefile) as f: reader = csv.reader(f) reader.next() chunks = itertools.groupby(reader, keyfunc) while True: # make a list of num_chunks chunks groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)] if groups: result = pool.map(worker, groups) results.extend(result) else: break pool.close() pool.join()
但是,无论我选择使用多少块,似乎块的数量始终保持恒定。例如,无论选择1个块还是10个块,在处理示例文件时总会得到此输出。理想情况下,我想对文件进行分块,以使其平均分配。
请注意,我正在分块的实际文件超过了1300万行,这就是为什么我要逐块处理它的原因。那是必须的!
6 7 1 ... 1 1 94 --- 0.101687192917 seconds ---
根据评论,我们希望每个进程可以处理10000行的块。这并不是很难做到的。请参阅iter/islice下面的食谱。但是,使用问题
iter/islice
pool.map(worker, ten_thousand_row_chunks)
是 pool.map 将尝试把 所有块 的任务队列一次。如果这需要的内存超过可用内存,那么您将获得一个 MemoryError。(注意:pool.imap遇到相同的问题)
pool.map
MemoryError
pool.imap
因此,我们需要对pool.map每个块的各个部分进行迭代调用。
import itertools as IT import multiprocessing as mp import csv def worker(chunk): return len(chunk) def main(): # num_procs is the number of workers in the pool num_procs = mp.cpu_count() # chunksize is the number of lines in a chunk chunksize = 10**5 pool = mp.Pool(num_procs) largefile = 'Counseling.csv' results = [] with open(largefile, 'rb') as f: reader = csv.reader(f) for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []): chunk = iter(chunk) pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), [])) result = pool.map(worker, pieces) results.extend(result) print(results) pool.close() pool.join() main()
每个文件chunk最多包含chunksize*num_procs文件中的几行。这些数据足以使池中的所有工作人员都能工作,但又不会太大,以至于引起MemoryError-如果设置的chunksize值没有太大。
chunk
chunksize*num_procs
chunksize
每个chunk然后被分解成块,其中每个块由多达的 chunksize行从该文件。然后将这些片段发送到pool.map。
如何iter(lambda: list(IT.islice(iterator, chunksize)), [])工作:
iter(lambda: list(IT.islice(iterator, chunksize)), [])
这是将迭代器分为长度为chunksize的块的惯用法。让我们看一下它如何工作的例子:
In [111]: iterator = iter(range(10))
请注意,每次IT.islice(iterator, 3)调用时,都会从迭代器中切出一个新的3个项目块:
IT.islice(iterator, 3)
In [112]: list(IT.islice(iterator, 3)) Out[112]: [0, 1, 2] In [113]: list(IT.islice(iterator, 3)) Out[113]: [3, 4, 5] In [114]: list(IT.islice(iterator, 3)) Out[114]: [6, 7, 8]
当迭代器中剩余的项目少于3个时,仅返回剩余的内容:
In [115]: list(IT.islice(iterator, 3)) Out[115]: [9]
如果再次调用它,则会得到一个空列表:
In [116]: list(IT.islice(iterable, 3)) Out[116]: []
lambda: list(IT.islice(iterator, chunksize))是一个list(IT.islice(iterator, chunksize))在调用时返回的函数。这是“单线”,相当于
lambda: list(IT.islice(iterator, chunksize))
list(IT.islice(iterator, chunksize))
def func(): return list(IT.islice(iterator, chunksize))
最后,iter(callable, sentinel)返回另一个迭代器。该迭代器产生的值是可调用函数返回的值。它会不断产生值,直到可调用对象返回等于前哨值的值为止。所以
iter(callable, sentinel)
将继续返回值,list(IT.islice(iterator, chunksize))直到该值是空列表为止:
In [121]: iterator = iter(range(10)) In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), [])) Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]