我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用multiprocessing.pool.apply_async()。
def get_container_id_mapping(pool, compose_cmd): service_names = subprocess.check_output( compose_cmd + ["config", "--services"] ) service_names = service_names.strip().decode("utf-8").split("\n") id_mapping = { name: pool.apply_async(pool_container_id, (name, compose_cmd)) for name in service_names } while not all(future.ready() for future in id_mapping.values()): time.sleep(0.1) for name, future in list(id_mapping.items()): if not future.successful(): raise RuntimeError("Cannot get ID of service {0}".format(name)) id_mapping[name] = future.get() return id_mapping
def timeout(max_timeout): """Timeout decorator, parameter in seconds.""" def timeout_decorator(item): """Wrap the original function.""" @functools.wraps(item) def func_wrapper(*args, **kwargs): """Closure for function.""" pool = multiprocessing.pool.ThreadPool(processes=1) async_result = pool.apply_async(item, args, kwargs) # raises a TimeoutError if execution exceeds max_timeout return async_result.get(max_timeout) return func_wrapper return timeout_decorator
def timeout(max_timeout): """Timeout decorator, parameter in seconds.""" def timeout_decorator(item): """Wrap the original function.""" @functools.wraps(item) def func_wrapper(*args, **kwargs): """Closure for function.""" pool = multiprocessing.pool.ThreadPool(processes=1) async_result = pool.apply_async(item, args, kwargs) # raises a TimeoutError if execution exceeds max_timeout return async_result.get(max_timeout) return func_wrapper return timeout_decorator #??????????????????
def process_main_files(pool, snapshot_dir, compose_cmd, container_ids): pool.apply_async(collect_backup, [snapshot_dir, compose_cmd]) pool.apply_async(collect_docker_info, [snapshot_dir]) pool.apply_async(collect_docker_version, [snapshot_dir]) pool.apply_async( collect_docker_compose_config, [snapshot_dir, compose_cmd]) pool.apply_async(collect_all_logs, [snapshot_dir, compose_cmd]) pool.apply_async(collect_monitoring_results, [snapshot_dir, container_ids["admin"]])
def process_service_files(pool, name, container_id, snapshot_dir, compose_cmd): service_snapshot_dir = os.path.join(snapshot_dir, name) pool.apply_async(collect_service_log, [service_snapshot_dir, name, compose_cmd]) pool.apply_async(collect_service_date, [service_snapshot_dir, name, compose_cmd]) pool.apply_async(collect_service_unix_timestamp, [service_snapshot_dir, name, compose_cmd]) pool.apply_async(collect_service_packages_os, [service_snapshot_dir, name, compose_cmd]) pool.apply_async(collect_service_ps, [service_snapshot_dir, name, compose_cmd]) pool.apply_async(collect_service_docker_inspect, [service_snapshot_dir, name, container_id]) pool.apply_async(collect_service_docker_stats, [service_snapshot_dir, name, container_id]) pool.apply_async(collect_service_config, [service_snapshot_dir, name, container_id]) pool.apply_async(collect_service_git_release, [service_snapshot_dir, name, container_id]) pool.apply_async(collect_service_decapod_release, [service_snapshot_dir, name, container_id]) pool.apply_async(collect_service_packages_npm, [service_snapshot_dir, name, container_id]) pool.apply_async(collect_service_packages_python2, [service_snapshot_dir, name, container_id]) pool.apply_async(collect_service_packages_python3, [service_snapshot_dir, name, container_id]) pool.apply_async(collect_service_ansible_config, [service_snapshot_dir, name, container_id]) pool.apply_async(collect_service_private_key_sha1sum, [service_snapshot_dir, name, compose_cmd])
def timeout(max_timeout): """Timeout decorator, parameter in seconds.""" def timeout_decorator(f): """Wrap the original function.""" @functools.wraps(f) def func_wrapper(self, *args, **kwargs): """Closure for function.""" pool = multiprocessing.pool.ThreadPool(processes=1) async_result = pool.apply_async(f, (self,) + args, kwargs) timeout = kwargs.pop('timeout_max_timeout', max_timeout) or max_timeout # raises a TimeoutError if execution exceeds max_timeout return async_result.get(timeout) return func_wrapper return timeout_decorator
def main(): process_pool_context = multiprocessing.get_context('spawn') pool = multiprocessing.pool.Pool( processes=4, context=process_pool_context, ) pool.apply_async( func=zmq_streamer, ) multiprocessing_manager = multiprocessing.Manager() multiprocessing_queue = multiprocessing_manager.Queue( maxsize=test_queue_size, ) for i in range(test_queue_size): multiprocessing_queue.put(b'1') res = pool.apply_async( func=consume_queue, args=(multiprocessing_queue,), ) res.get() context = zmq.Context() socket = context.socket(zmq.PAIR) res = pool.apply_async( func=consume_zmq_pair, ) time.sleep(1) socket.connect("tcp://localhost:%s" % zmq_port) for i in range(test_queue_size): socket.send(b'1') res.get() socket.close() context = zmq.Context() socket = context.socket(zmq.PUSH) res = pool.apply_async( func=consume_zmq_streamer, ) time.sleep(1) socket.connect("tcp://localhost:%s" % zmq_queue_port_pull) for i in range(test_queue_size): socket.send(b'1') res.wait() socket.close()