我有一个要 并行 处理的文件夹名称字典。在每个文件夹,里面是文件名的数组,我想在加工 系列 :
folder_file_dict = { folder_name : { file_names_key : [file_names_array] } }
最终,我将创建一个名为folder_name的文件夹,其中包含名称为的文件len(folder_file_dict[folder_name][file_names_key])。我有这样的方法:
folder_name
len(folder_file_dict[folder_name][file_names_key])
def process_files_in_series(file_names_array, udp_port): for file_name in file_names_array: time_consuming_method(file_name, udp_port) # create "file_name" udp_ports = [123, 456, 789]
请注意time_consuming_method()上面的内容,由于通过UDP端口进行的调用会花费很长时间。我也仅限于在上面的阵列中使用UDP端口。因此,我必须等待time_consuming_methodUDP端口完成操作,然后才能再次使用该UDP端口。这意味着我一次只能len(udp_ports)运行线程。
time_consuming_method()
time_consuming_method
len(udp_ports)
因此,我最终将len(folder_file_dict.keys())通过len(folder_file_dict.keys())调用来创建线程process_files_in_series。我也有MAX_THREAD个计数。我正在尝试使用Queue和Threading模块,但是我不确定我需要哪种设计。如何使用队列和线程以及可能的条件来做到这一点?使用线程池的解决方案也可能会有所帮助。
len(folder_file_dict.keys())
process_files_in_series
Queue
Threading
注意
我没有试图提高读取/写入速度。我正在尝试并行调用time_consuming_methodunder process_files_in_series。创建这些文件只是过程的一部分,而不是速率限制步骤。
另外,我要寻找一个解决方案,使用Queue,Threading以及可能的Condition模块或相关于这些模块什么。线程池解决方案也可能会有所帮助。我不能使用进程,只能使用线程。
Condition
我也在寻找Python 2.7中的解决方案。
使用线程池:
#!/usr/bin/env python2 from multiprocessing.dummy import Pool, Queue # thread pool folder_file_dict = { folder_name: { file_names_key: file_names_array } } def process_files_in_series(file_names_array, udp_port): for file_name in file_names_array: time_consuming_method(file_name, udp_port) # create "file_name" ... def mp_process(filenames): udp_port = free_udp_ports.get() # block until a free udp port is available args = filenames, udp_port try: return args, process_files_in_series(*args), None except Exception as e: return args, None, str(e) finally: free_udp_ports.put_nowait(udp_port) free_udp_ports = Queue() # in general, use initializer to pass it to children for port in udp_ports: free_udp_ports.put_nowait(port) pool = Pool(number_of_concurrent_jobs) # for args, result, error in pool.imap_unordered(mp_process, get_files_arrays()): if error is not None: print args, error
我认为如果不同文件名数组的处理时间可能不同,则不需要将线程数绑定到udp端口数。
如果我folder_file_dict正确理解了结构,则生成文件名数组:
folder_file_dict
def get_files_arrays(folder_file_dict=folder_file_dict): for folder_name_dict in folder_file_dict.itervalues(): for filenames_array in folder_name_dict.itervalues(): yield filenames_array