I want to use multiprocessing.Pool, but multiprocessing.Pool cannot abort a task after a timeout. I found a solution and modified it a bit.
from multiprocessing import util, Pool, TimeoutError
from multiprocessing.dummy import Pool as ThreadPool
import threading
import sys
from functools import partial
import time


def worker(y):
    print("worker sleep {} sec, thread: {}".format(y, threading.current_thread()))
    start = time.time()
    while True:
        if time.time() - start >= y:
            break
        time.sleep(0.5)
        # show work progress
        print(y)
    return y


def collect_my_result(result):
    print("Got result {}".format(result))


def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        # Wait timeout seconds for func to complete.
        out = res.get(timeout)
    except TimeoutError:
        print("Aborting due to timeout {}".format(args[0]))
        # kill the worker itself on TimeoutError
        sys.exit(1)
    else:
        return out


def empty_func():
    pass


if __name__ == "__main__":
    TIMEOUT = 4
    util.log_to_stderr(util.DEBUG)
    pool = Pool(processes=4)
    # k - time for the job to sleep
    featureClass = [(k,) for k in range(20, 0, -1)]  # list of arguments
    for f in featureClass:
        # check for an available worker
        pool.apply(empty_func)
        # run the job with a timeout
        abortable_func = partial(abortable_worker, worker, timeout=TIMEOUT)
        pool.apply_async(abortable_func, args=f, callback=collect_my_result)
    time.sleep(TIMEOUT)
    pool.terminate()
    print("exit")
The main modification is using sys.exit(1) to exit the worker process. It kills the worker process and its worker thread, but I'm not sure this solution is good. What potential problems could I run into when a process is terminated while it still has a running job?
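As an aside, a small experiment (my own illustrative snippet, not from the original post) shows what actually happens when a task calls sys.exit(1): the worker process dies, its pending result never completes, but the Pool transparently replaces the dead worker and keeps serving new tasks:

```python
import sys
import time
from multiprocessing import Pool


def suicidal(x):
    # Simulate the abort strategy from the question: the worker
    # kills its own process instead of returning a result.
    sys.exit(1)


def normal(x):
    return x * 2


if __name__ == "__main__":
    pool = Pool(processes=2)
    # The async result for the dying task never completes...
    doomed = pool.apply_async(suicidal, (1,))
    time.sleep(1)  # give the pool time to notice and replace the dead worker
    # ...but the pool keeps working: the dead slot was repopulated.
    ok = pool.apply_async(normal, (21,))
    print(ok.get(timeout=5))  # → 42
    pool.terminate()
    pool.join()
```

Note that `doomed.get()` would block forever here, which is one concrete hazard of this approach: callers waiting on the aborted task are never notified.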
There is no implicit risk in stopping a running job; the OS will take care of correctly terminating the process.
If your job is writing to files, you might end up with plenty of truncated files on your disk.
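One common way to guard against truncated files (a generic sketch of mine, not part of the answer) is to write to a temporary file and atomically rename it into place, so a killed process leaves either the old file or the new one, never a half-written mix:

```python
import os
import tempfile


def atomic_write(path, data):
    # Write to a temp file in the same directory, then atomically
    # rename it over the target with os.replace(). If the process
    # is killed mid-write, the target is never left truncated.
    dir_name = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=dir_name)
    try:
        with os.fdopen(fd, "w") as f:
            f.write(data)
        os.replace(tmp_path, path)  # atomic on both POSIX and Windows
    except BaseException:
        os.unlink(tmp_path)
        raise
```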
Minor issues could arise as well if you are writing to a database or are connected to some remote process.
Nevertheless, the Python standard Pool does not support timeouts, and terminating processes abruptly might lead to weird behaviour within your applications.
The Pebble processing Pool does support timing-out tasks.
from pebble import process, TimeoutError

with process.Pool() as pool:
    task = pool.schedule(function, args=[1, 2], timeout=5)

    try:
        result = task.get()
    except TimeoutError:
        print("Task: %s took more than 5 seconds to complete" % task)