我在多处理模块上遇到了麻烦。我正在使用具有其map方法的工作人员池从大量文件中加载数据,对于每个文件,我都使用自定义函数来分析数据。每次处理文件时,我都希望更新一个计数器,以便可以跟踪还有多少文件需要处理。这是示例代码:
def analyze_data( args ): # do something counter += 1 print counter if __name__ == '__main__': list_of_files = os.listdir(some_directory) global counter counter = 0 p = Pool() p.map(analyze_data, list_of_files)
我找不到解决方案。
问题在于该counter变量未在您的进程之间共享:每个单独的进程都在创建它自己的本地实例并对其进行递增。
counter
有关可用于在进程之间共享状态的某些技术,请参阅文档的本部分。在您的情况下,您可能希望Value在工作人员之间共享一个实例
Value
这是示例的工作版本(带有一些虚拟输入数据)。请注意,它使用的是全局值,实际上我会尽量避免使用这些值:
from multiprocessing import Pool, Value from time import sleep counter = None def init(args): ''' store the counter for later use ''' global counter counter = args def analyze_data(args): ''' increment the global counter, do something with the input ''' global counter # += operation is not atomic, so we need to get a lock: with counter.get_lock(): counter.value += 1 print counter.value return args * 10 if __name__ == '__main__': #inputs = os.listdir(some_directory) # # initialize a cross-process counter and the input lists # counter = Value('i', 0) inputs = [1, 2, 3, 4] # # create the pool of workers, ensuring each one receives the counter # as it starts. # p = Pool(initializer = init, initargs = (counter, )) i = p.map_async(analyze_data, inputs, chunksize = 1) i.wait() print i.get()