我在python中遇到以下问题。
我需要并行执行一些计算,这些计算的结果需要按顺序写入文件中。因此,我创建了一个函数,该函数接收multiprocessing.Queue和文件句柄,进行计算并将结果打印到文件中:
multiprocessing.Queue
import multiprocessing from multiprocessing import Process, Queue from mySimulation import doCalculation # doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file def work(queue, fh): while True: try: parameter = queue.get(block = False) result = doCalculation(parameter) print >>fh, string except: break if __name__ == "__main__": nthreads = multiprocessing.cpu_count() fh = open("foo", "w") workQueue = Queue() parList = # list of conditions for which I want to run doCalculation() for x in parList: workQueue.put(x) processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)] for p in processes: p.start() for p in processes: p.join() fh.close()
但是脚本运行后文件最终为空。我试图将worker()函数更改为:
def work(queue, filename): while True: try: fh = open(filename, "a") parameter = queue.get(block = False) result = doCalculation(parameter) print >>fh, string fh.close() except: break
并将文件名作为参数传递。然后它按我的预期工作。当我尝试按顺序执行相同的操作而不进行多处理时,它也可以正常工作。
为什么它在第一个版本中不起作用?我看不到问题。
另外:我可以保证两个进程不会尝试同时写入文件吗?
编辑:
谢谢。我知道了 这是工作版本:
import multiprocessing from multiprocessing import Process, Queue from time import sleep from random import uniform def doCalculation(par): t = uniform(0,2) sleep(t) return par * par # just to simulate some calculation def feed(queue, parlist): for par in parlist: queue.put(par) def calc(queueIn, queueOut): while True: try: par = queueIn.get(block = False) print "dealing with ", par, "" res = doCalculation(par) queueOut.put((par,res)) except: break def write(queue, fname): fhandle = open(fname, "w") while True: try: par, res = queue.get(block = False) print >>fhandle, par, res except: break fhandle.close() if __name__ == "__main__": nthreads = multiprocessing.cpu_count() fname = "foo" workerQueue = Queue() writerQueue = Queue() parlist = [1,2,3,4,5,6,7,8,9,10] feedProc = Process(target = feed , args = (workerQueue, parlist)) calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)] writProc = Process(target = write, args = (writerQueue, fname)) feedProc.start() for p in calcProc: p.start() writProc.start() feedProc.join () for p in calcProc: p.join() writProc.join ()
您确实应该使用两个队列和三种单独的处理。
将内容放入队列#1。
从队列1中取出内容并进行计算,然后将内容放入队列2中。您可以有许多这样的应用程序,因为它们从一个队列中安全地放入另一个队列中。
从队列2中获取内容并将其写入文件。您必须恰好具有这些中的1个,并且不能再有其他。它“拥有”文件,保证原子访问,并绝对确保文件被整洁且一致地写入。