小编典典

Process.join()和队列不适用于大量

python

我试图分裂循环即

N = 1000000
for i in xrange(N):
    #do something

使用multiprocessing.Process,它对于较小的N值效果很好。当我使用较大的N值时,就会出现问题。在p.join()之前或期间发生了一些奇怪的事情,并且程序没有响应。如果我将print
i放在函数f的定义中,而不是q.put(i),则一切正常。

我将不胜感激任何帮助。这是代码。

from multiprocessing import Process, Queue

def f(q,nMin, nMax): # function for multiprocessing
    for i in xrange(nMin,nMax):
        q.put(i)

if __name__ == '__main__':

    nEntries = 1000000

    nCpu = 10
    nEventsPerCpu = nEntries/nCpu
    processes = []

    q = Queue()

    for i in xrange(nCpu):
        processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) )

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print q.qsize()

阅读 219

收藏
2020-12-20

共1个答案

小编典典

您正在尝试无限制地增加队列,并且您正在加入一个等待队列中有空间的子进程,因此您的主进程被拖延以等待该进程完成,而永远不会。

如果在连接之前将数据从队列中拉出,它将可以正常工作。

您可以使用的一种技术是这样的:

while 1:
    running = any(p.is_alive() for p in processes)
    while not queue.empty():
       process_queue_data()
    if not running:
        break

根据文档,p.is_alive()应该执行隐式连接,但它似乎还暗示最佳实践可能是在此之后在所有线程上显式执行连接。

编辑:尽管这很清楚,但可能不是全部。如何使其性能更好取决于任务和计算机的具体情况(通常,一次不应该一次创建那么多进程,除非某些进程将在I / O上被阻塞)。

除了将进程数量减少到CPU数量之外,一些简单的修复方法也可以使其更快一些(再次取决于具体情况),如下所示:

liveprocs = list(processes)
while liveprocs:
    try:
        while 1:
            process_queue_data(q.get(False))
    except Queue.Empty:
        pass

    time.sleep(0.5)    # Give tasks a chance to put more data in
    if not q.empty():
        continue
    liveprocs = [p for p in liveprocs if p.is_alive()]
2020-12-20