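I have an executable that I need to run frequently with different parameters. To do this, I wrote a small Python (2.7) wrapper using the multiprocessing module, following the pattern given here.
My code looks like this: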
    try:
        logging.info("starting pool runs")
        pool.map(run_nlin, params)
        pool.close()
    except KeyboardInterrupt:
        logging.info("^C pressed")
        pool.terminate()
    except Exception, e:
        # %s placeholder added so the exception is actually formatted into the message
        logging.info("exception caught: %s", e)
        pool.terminate()
    finally:
        time.sleep(5)
        pool.join()
        logging.info("done")
Here is my worker function:
    class KeyboardInterruptError(Exception):
        pass

    def run_nlin((path_config, path_log, path_nlin, update_method)):
        try:
            with open(path_log, "w") as log_:
                cmdline = [path_nlin, path_config]
                if update_method:
                    cmdline += [update_method, ]
                sp.call(cmdline, stdout=log_, stderr=log_)
        except KeyboardInterrupt:
            time.sleep(5)
            raise KeyboardInterruptError()
        except:
            raise
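path_config is the path to the configuration file for the binary program; among other things, it contains the date for which the program should run.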
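When I start the wrapper, everything looks fine. However, when I press ^C, the wrapper script seems to launch another numproc processes from the pool before terminating. For example, when I start the script for days 1-10, the ps aux output shows two instances of the binary program running (usually for days 1 and 3). When I then press ^C, the wrapper script exits and the binary programs for days 1 and 3 are gone, but new instances of the binary program are now running for days 5 and 7.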
So it looks to me as if the Pool launches another numproc processes before it finally dies.
Any ideas what is happening here, and what I can do about it?
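On this page, Jesse Noller, author of the multiprocessing module, shows that the correct way to handle KeyboardInterrupt is to have the subprocesses return rather than raise an exception. This allows the main process to terminate the pool.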
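As a rough illustration of that advice, here is a minimal sketch of how the question's run_nlin might return instead of raising. It assumes sp is the subprocess module, and the tuple is unpacked inside the function body so the sketch also runs on Python 3; returning None on interrupt is an illustrative choice, not something taken from the linked page. Note that this alone does not stop tasks that pool.map has already queued.

```python
import subprocess as sp


def run_nlin(args):
    # Unpack inside the body; tuple parameters in the signature are Python 2 only.
    path_config, path_log, path_nlin, update_method = args
    try:
        with open(path_log, "w") as log_:
            cmdline = [path_nlin, path_config]
            if update_method:
                cmdline.append(update_method)
            # Return the exit status so the parent can inspect it.
            return sp.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        # Return quietly instead of raising, so the exception never has to
        # travel back to the parent process through the pool.
        return None
```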
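However, as the code below shows, the main process does not reach the except KeyboardInterrupt block until after all the tasks generated by pool.map have run. This is why (I believe) you are seeing additional calls to your worker function, run_nlin, after pressing Ctrl-C.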
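One possible workaround is to have every worker function test whether a multiprocessing.Event has been set. If the event has been set, the worker bails out early; otherwise, it goes ahead with the long calculation.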
    import logging
    import multiprocessing as mp
    import time

    logger = mp.log_to_stderr(logging.WARNING)

    def worker(x):
        try:
            if not terminating.is_set():
                logger.warn("Running worker({x!r})".format(x = x))
                time.sleep(3)
            else:
                logger.warn("got the message... we're terminating!")
        except KeyboardInterrupt:
            logger.warn("terminating is set")
            terminating.set()
        return x

    def initializer(terminating_):
        # This places terminating in the global namespace of the worker subprocesses.
        # This allows the worker function to access `terminating` even though it is
        # not passed as an argument to the function.
        global terminating
        terminating = terminating_

    def main():
        terminating = mp.Event()
        result = []
        pool = mp.Pool(initializer=initializer, initargs=(terminating, ))
        params = range(12)
        try:
            logger.warn("starting pool runs")
            result = pool.map(worker, params)
            pool.close()
        except KeyboardInterrupt:
            logger.warn("^C pressed")
            pool.terminate()
        finally:
            pool.join()
            logger.warn('done: {r}'.format(r = result))

    if __name__ == '__main__':
        main()
Running the script yields:
    % test.py
    [WARNING/MainProcess] starting pool runs
    [WARNING/PoolWorker-1] Running worker(0)
    [WARNING/PoolWorker-2] Running worker(1)
    [WARNING/PoolWorker-3] Running worker(2)
    [WARNING/PoolWorker-4] Running worker(3)
Here Ctrl-C is pressed; each of the workers sets the terminating event. It really only needs to be set once, but this works despite the small inefficiency.
      C-c C-c[WARNING/PoolWorker-4] terminating is set
    [WARNING/PoolWorker-2] terminating is set
    [WARNING/PoolWorker-3] terminating is set
    [WARNING/PoolWorker-1] terminating is set
Now all the other tasks queued by pool.map are run:
    [WARNING/PoolWorker-4] got the message... we're terminating!
    [WARNING/PoolWorker-2] got the message... we're terminating!
    [WARNING/PoolWorker-1] got the message... we're terminating!
    [WARNING/PoolWorker-2] got the message... we're terminating!
    [WARNING/PoolWorker-4] got the message... we're terminating!
    [WARNING/PoolWorker-2] got the message... we're terminating!
    [WARNING/PoolWorker-1] got the message... we're terminating!
    [WARNING/PoolWorker-3] got the message... we're terminating!
Finally, the main process reaches the except KeyboardInterrupt block.
    [WARNING/MainProcess] ^C pressed
    [WARNING/MainProcess] done: []
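To connect this back to the question, the same Event check could be applied to run_nlin roughly as follows. This is only a sketch under assumptions: run_all and its processes argument are hypothetical names introduced here, sp is assumed to be the subprocess module, and the parameter tuple is unpacked inside the function so the code also runs on Python 3.

```python
import multiprocessing as mp
import subprocess as sp

terminating = None  # replaced in each worker process by initializer()


def initializer(terminating_):
    # Make the shared Event visible to run_nlin inside the worker process.
    global terminating
    terminating = terminating_


def run_nlin(args):
    path_config, path_log, path_nlin, update_method = args
    # Queued tasks that start after Ctrl-C return at once, before the log
    # file is opened or the binary is launched.
    if terminating is not None and terminating.is_set():
        return None
    try:
        with open(path_log, "w") as log_:
            cmdline = [path_nlin, path_config]
            if update_method:
                cmdline.append(update_method)
            return sp.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        if terminating is not None:
            terminating.set()
        return None


def run_all(params, processes=None):
    # Hypothetical driver: builds the pool with the shared Event and maps
    # run_nlin over the parameter tuples.
    event = mp.Event()
    pool = mp.Pool(processes, initializer=initializer, initargs=(event,))
    results = []
    try:
        results = pool.map(run_nlin, params)
        pool.close()
    except KeyboardInterrupt:
        pool.terminate()
    finally:
        pool.join()
    return results
```

With this arrangement the remaining tasks are still handed to workers after Ctrl-C, as in the output above, but each one should return immediately instead of starting a new instance of the binary.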