我试图分裂循环即
N = 1000000 for i in xrange(N): #do something
使用multiprocessing.Process,它对于较小的N值效果很好。当我使用较大的N值时,就会出现问题。在p.join()之前或期间发生了一些奇怪的事情,并且程序没有响应。如果我将print i放在函数f的定义中,而不是q.put(i),则一切正常。
我将不胜感激任何帮助。这是代码。
from multiprocessing import Process, Queue def f(q,nMin, nMax): # function for multiprocessing for i in xrange(nMin,nMax): q.put(i) if __name__ == '__main__': nEntries = 1000000 nCpu = 10 nEventsPerCpu = nEntries/nCpu processes = [] q = Queue() for i in xrange(nCpu): processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) ) for p in processes: p.start() for p in processes: p.join() print q.qsize()
您正在尝试无限制地增加队列,并且您正在加入一个等待队列中有空间的子进程,因此您的主进程被拖延以等待该进程完成,而永远不会。
如果在连接之前将数据从队列中拉出,它将可以正常工作。
您可以使用的一种技术是这样的:
while 1: running = any(p.is_alive() for p in processes) while not queue.empty(): process_queue_data() if not running: break
根据文档,p.is_alive()应该执行隐式连接,但它似乎还暗示最佳实践可能是在此之后在所有线程上显式执行连接。
编辑:尽管这很清楚,但可能不是全部。如何使其性能更好取决于任务和计算机的具体情况(通常,一次不应该一次创建那么多进程,除非某些进程将在I / O上被阻塞)。
除了将进程数量减少到CPU数量之外,一些简单的修复方法也可以使其更快一些(再次取决于具体情况),如下所示:
liveprocs = list(processes) while liveprocs: try: while 1: process_queue_data(q.get(False)) except Queue.Empty: pass time.sleep(0.5) # Give tasks a chance to put more data in if not q.empty(): continue liveprocs = [p for p in liveprocs if p.is_alive()]