我写了一个程序,可以总结如下:
def loadHugeData(): #load it return data def processHugeData(data, res_queue): for item in data: #process it res_queue.put(result) res_queue.put("END") def writeOutput(outFile, res_queue): with open(outFile, 'w') as f res=res_queue.get() while res!='END': f.write(res) res=res_queue.get() res_queue = multiprocessing.Queue() if __name__ == '__main__': data=loadHugeData() p = multiprocessing.Process(target=writeOutput, args=(outFile, res_queue)) p.start() processHugeData(data, res_queue) p.join()
实际代码(尤其是writeOutput())要复杂得多。writeOutput()仅使用将其当作参数的这些值(意味着它不引用data)
writeOutput()
data
基本上,它将巨大的数据集加载到内存中并进行处理。输出的写操作委托给一个子进程(它实际上写到多个文件中,这需要很多时间)。因此,每次处理一个数据项时,它都会通过res_queue发送到子流程,然后该子流程根据需要将结果写入文件中。
子流程不需要访问,读取或修改loadHugeData()以任何方式加载的数据。子流程只需要使用主流程通过它发送的内容res_queue。这导致我遇到了问题。
loadHugeData()
res_queue
在我看来,子流程将其保存在庞大数据集的副本上(使用时检查内存使用情况top)。这是真的?如果是这样,那我该如何避免id(本质上使用双倍内存)?
top
我正在使用Python 2.6,程序正在Linux上运行。
该multiprocessing模块有效地基于fork系统调用,该系统调用创建当前进程的副本。由于您要在之前fork(或创建multiprocessing.Process)加载大量数据,因此子进程将继承数据的副本。
multiprocessing
fork
multiprocessing.Process
但是,如果您所运行的操作系统实现了COW(写时复制),那么除非您在父进程或子进程(父进程和子进程)中都修改了数据,否则物理内存中实际上只有一个副本。将 共享 相同的物理内存页面,尽管它们位于不同的虚拟地址空间中);即使这样,也只会为更改分配额外的内存(以pagesize增量为单位)。
pagesize
您可以通过multiprocessing.Process在加载大量数据之前进行调用来避免这种情况。这样,当您在父级中加载数据时,其他内存分配将不会反映在子级进程中。