我正在使用multiprocessingpython库生成4个Process()对象以并行化cpu密集任务。任务(灵感和代码从这个伟大的文章)是计算的主要因素列表中的每个整数。
multiprocessing
Process()
main.py:
import random import multiprocessing import sys num_inputs = 4000 num_procs = 4 proc_inputs = num_inputs/num_procs input_list = [int(1000*random.random()) for i in xrange(num_inputs)] output_queue = multiprocessing.Queue() procs = [] for p_i in xrange(num_procs): print "Process [%d]"%p_i proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)] print " - num inputs: [%d]"%len(proc_list) # Using target=worker1 HANGS on join p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue)) # Using target=worker2 RETURNS with success #p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue)) procs.append(p) p.start() for p in jobs: print "joining ", p, output_queue.qsize(), output_queue.full() p.join() print "joined ", p, output_queue.qsize(), output_queue.full() print "Processing complete." ret_vals = [] while output_queue.empty() == False: ret_vals.append(output_queue.get()) print len(ret_vals) print sys.getsizeof(ret_vals)
观察:
worker1
.join()
worker2
这对我来说非常混乱,因为worker1和之间的唯一区别worker2(见下文)是前者在中插入单个列表,Queue而后者为每个过程插入一个列表列表。
Queue
为什么使用worker1和不使用worker2目标都有死锁?两种(或两者都不)都不能超过“多处理队列”的最大大小限制为32767吗?
worker1 vs worker2:
def worker1(proc_num, proc_list, output_queue): '''worker function which deadlocks''' for num in proc_list: output_queue.put(factorize_naive(num)) def worker2(proc_num, proc_list, output_queue): '''worker function that works''' workers_stuff = [] for num in proc_list: workers_stuff.append(factorize_naive(num)) output_queue.put(workers_stuff)
关于SO有 很多 类似的问题,但是我相信这些问题的核心显然与所有这些问题截然不同。
文档警告此:
警告:如上所述,如果子进程已将项目放入队列中(并且未使用JoinableQueue.cancel_join_thread),则该进程将不会终止,直到所有缓冲的项目都已刷新到管道中为止。 这意味着,如果您尝试加入该进程,则可能会陷入僵局,除非您确定已放入队列中的所有项目都已消耗完。同样,如果子进程是非守护进程,则当父进程尝试加入其所有非守护进程子进程时,其父进程可能会在退出时挂起。
警告:如上所述,如果子进程已将项目放入队列中(并且未使用JoinableQueue.cancel_join_thread),则该进程将不会终止,直到所有缓冲的项目都已刷新到管道中为止。
这意味着,如果您尝试加入该进程,则可能会陷入僵局,除非您确定已放入队列中的所有项目都已消耗完。同样,如果子进程是非守护进程,则当父进程尝试加入其所有非守护进程子进程时,其父进程可能会在退出时挂起。
尽管aQueue似乎是不受限制的,但在后台隐藏的情况下,已排队的项目在内存中进行了缓冲,以避免进程间管道过载。在刷新这些内存缓冲区之前,进程无法正常结束。你worker1()放多了很多项目 在 排队比你的worker2(),而这一切就是这么简单。注意,在实现诉诸于内存缓冲之前可以排队的项目数量未定义:它在OS和Python版本之间可能会有所不同。
worker1()
worker2()
正如文档所建议的那样,避免这种情况的正常方法是 在 尝试处理 之前 将.get()所有项目 从 队列中 移出 。正如您所发现的,是否有 必 要这样做取决于未定义的方式,取决于每个工作进程将多少个项目放入队列中。 .join()
.get()