The following 50 code examples, extracted from open-source Python projects, illustrate how to use asyncio.wait().
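Before the project code, here is a minimal, self-contained sketch of the core pattern every example below relies on: wrap coroutines in tasks, hand the task list to asyncio.wait(), and get back a (done, pending) pair of sets. The work() coroutine, the five-task batch, and the 0.3-second timeout are illustrative assumptions for this sketch, not taken from any of the projects.

import asyncio

async def work(n):
    # Stand-in for real work: sleep briefly, then return the input.
    await asyncio.sleep(n / 10)
    return n

async def main():
    # asyncio.wait() expects futures/tasks, so wrap the coroutines explicitly.
    tasks = [asyncio.ensure_future(work(n)) for n in range(5)]
    # Returns two sets: tasks that finished within the timeout, and the rest.
    done, pending = await asyncio.wait(tasks, timeout=0.3)
    print('{} done, {} pending'.format(len(done), len(pending)))
    # Cancel whatever did not finish so the loop can shut down cleanly.
    for task in pending:
        task.cancel()

asyncio.get_event_loop().run_until_complete(main())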
def repos_fetch(config, repos):
    """
    Fetches the list of repositories to the kas_work_dir.
    """
    tasks = []
    for repo in repos:
        if not hasattr(asyncio, 'ensure_future'):
            # pylint: disable=no-member,deprecated-method
            task = asyncio.async(_repo_fetch_async(config, repo))
        else:
            task = asyncio.ensure_future(_repo_fetch_async(config, repo))
        tasks.append(task)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

    for task in tasks:
        if task.result():
            sys.exit(task.result())

async def stop(self):
    if self.renderer.running:
        tasks = []
        if self.task is not None and not self.task.done():
            self.task.cancel()
            tasks.append(self.task)
        if self.waiter is not None and not self.waiter.done():
            self.waiter.cancel()
            tasks.append(self.waiter)

        await self.renderer._stop()

        if len(tasks) > 0:
            await asyncio.wait(tasks, return_when=futures.ALL_COMPLETED)

    self.renderer.finish(self._frame)

async def _dequeue(self, r_idx: int):
    """
    Gather completed layers from the renderers. If nothing is
    available, keep the last layer (in case the renderers are
    producing output at different rates). Yields until at least
    one layer is ready.
    """
    if not self.running or r_idx >= len(self.layers):
        return

    layer = self.layers[r_idx]
    renderer = layer.renderer

    # wait for a buffer
    buf = await renderer._active_q.get()

    # return the old buffer to the renderer
    if layer.active_buf is not None:
        renderer._free_layer(layer.active_buf)

    # put it on the active list
    layer.active_buf = buf

async def _stop(self):
    """
    Stop this AnimationLoop

    Shuts down the loop and triggers cleanup tasks.
    """
    if not self.running:
        return False

    self.running = False

    for layer in self.layers[::-1]:
        await self.remove_layer(layer)

    if self._anim_task is not None and not self._anim_task.done():
        self._anim_task.cancel()
        await asyncio.wait([self._anim_task], return_when=futures.ALL_COMPLETED)

    self._logger.info("AnimationLoop stopped")

async def _close_input_devices(self):
    if not hasattr(self, '_opened') or not self._opened:
        return

    self._opened = False

    for event_device in self._event_devices:
        asyncio.get_event_loop().remove_reader(event_device.fileno())
        event_device.close()

    tasks = []
    for task in self._tasks:
        if not task.done():
            task.cancel()
            tasks.append(task)

    await asyncio.wait(tasks, return_when=futures.ALL_COMPLETED)
    self._event_devices.clear()

async def step(client: object, agents: list, timeout: int, loop: BaseEventLoop):
    tasks = []
    for agent, agent_cfg in agents:
        tags = agent_cfg.tags

        def event_fn(**kwargs):
            if "tags" in kwargs:
                for tag in tags:
                    kwargs["tags"].append(tag)
            else:
                kwargs["tags"] = tags
            if "time" not in kwargs:
                kwargs["time"] = int(time())
            client.event(**kwargs)

        tasks.append(agent.process(event_fn))

    return await asyncio.wait(tasks, timeout=timeout)

def reload(self):
    """Wait for reload events and reload config when event set."""
    while 1:
        try:
            yield from self.reload_event.wait()
            self.log.info("Reloading configuration...")
        except asyncio.CancelledError:
            return
        finally:
            self.reload_event.clear()
        try:
            config = Config(self, self.filename, self.verbose)
            self.log.debug("New config instance %s" % config)
            yield from config.validate()
            self.config = config
            self.log.info("Done")
        except asyncio.CancelledError:
            return
        except Exception:
            self.log.exception("Error loading new config")

async def run_blocking_tasks(executor):
    log = logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('waiting for executor tasks')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('results: {!r}'.format(results))

    log.info('exiting')

async def run_blocking_tasks(executor):
    log = logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('waiting for executor tasks')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('results: {!r}'.format(results))

    log.info('exiting')

# changes from asyncio_executor_thread.py

async def main(num_phases):
    print('starting main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting 0.1 for phases to complete')
    completed, pending = await asyncio.wait(phases, timeout=0.1)
    print('{} completed and {} pending'.format(
        len(completed), len(pending),
    ))
    # Cancel remaining tasks so they do not generate errors
    # as we exit without finishing them.
    if pending:
        print('canceling tasks')
        for t in pending:
            t.cancel()
    print('exiting main')

def test_readline(self):
    # Read one line. 'readline' will need to wait for the data
    # to come from 'cb'
    stream = asyncio.StreamReader(loop=self.loop)
    stream.feed_data(b'chunk1 ')
    read_task = asyncio.Task(stream.readline(), loop=self.loop)

    def cb():
        stream.feed_data(b'chunk2 ')
        stream.feed_data(b'chunk3 ')
        stream.feed_data(b'\n chunk4')
    self.loop.call_soon(cb)

    line = self.loop.run_until_complete(read_task)
    self.assertEqual(b'chunk1 chunk2 chunk3 \n', line)
    self.assertEqual(b' chunk4', stream._buffer)

def test_as_completed_concurrent(self):

    def gen():
        when = yield
        self.assertAlmostEqual(0.05, when)
        when = yield 0
        self.assertAlmostEqual(0.05, when)
        yield 0.05

    loop = self.new_test_loop(gen)

    a = asyncio.sleep(0.05, 'a', loop=loop)
    b = asyncio.sleep(0.05, 'b', loop=loop)
    fs = {a, b}
    futs = list(asyncio.as_completed(fs, loop=loop))
    self.assertEqual(len(futs), 2)
    waiter = asyncio.wait(futs, loop=loop)
    done, pending = loop.run_until_complete(waiter)
    self.assertEqual(set(f.result() for f in done), {'a', 'b'})

async def test_stream_cancel(event_loop):
    async def cancel(task):
        await asyncio.sleep(0.001)
        task.cancel()

    async def test_stream_iterations(stream):
        while True:
            await test_stream_iteration(stream)

    with aiohttp.ClientSession(loop=event_loop) as session:
        client = peony.client.BasePeonyClient("", "", session=session)
        context = peony.stream.StreamResponse(method='GET',
                                              url="http://whatever.com",
                                              client=client)

        with context as stream:
            with patch.object(stream, '_connect',
                              side_effect=stream_content):
                coro = test_stream_iterations(stream)
                task = event_loop.create_task(coro)
                cancel_task = event_loop.create_task(cancel(task))

                with aiohttp.Timeout(1):
                    await asyncio.wait([task, cancel_task])

async def test_large_limit(self, event_loop, test_db):
    await test_db.test_large_limit.create_index([('x', 1)])
    my_str = 'mongomongo' * 1000

    jobs = []
    for i in range(2000):
        doc = {'x': i, 'y': my_str}
        jobs.append(test_db.test_large_limit.insert_one(doc))

    done, _ = await asyncio.wait(jobs, loop=event_loop)
    assert all(ft.exception() is None for ft in done)

    i = 0
    y = 0
    async with test_db.test_large_limit.find(limit=1900).sort([('x', 1)]) as cursor:
        async for doc in cursor:
            i += 1
            y += doc['x']

    assert 1900 == i
    assert (1900 * 1899) / 2 == y

async def daemon():
    try_init_cgroup()

    async with VJ4Session(config['server_url']) as session:
        while True:
            try:
                await session.login_if_needed(config['uname'], config['password'])
                done, pending = await wait([do_judge(session), do_noop(session)],
                                           return_when=FIRST_COMPLETED)
                for task in pending:
                    task.cancel()
                await gather(*done)
            except Exception as e:
                logger.exception(e)
            logger.info('Retrying after %d seconds', RETRY_DELAY_SEC)
            await sleep(RETRY_DELAY_SEC)

def test_pipe_to_log(context, event_loop):
    cmd = r""">&2 echo "foo" && echo "bar" && exit 0"""
    proc = event_loop.run_until_complete(
        asyncio.create_subprocess_exec(
            "bash", "-c", cmd, stdout=PIPE, stderr=PIPE, stdin=None
        )
    )
    tasks = []
    with swlog.get_log_filehandle(context) as log_fh:
        tasks.append(swlog.pipe_to_log(proc.stderr, filehandles=[log_fh]))
        tasks.append(swlog.pipe_to_log(proc.stdout, filehandles=[log_fh]))
        event_loop.run_until_complete(asyncio.wait(tasks))
        event_loop.run_until_complete(proc.wait())
    log_file = swlog.get_log_filename(context)
    assert read(log_file) in ("foo\nbar\n", "bar\nfoo\n")

def launch_second_instances():
    temp_dir = sys.argv[1]
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)

    job1 = subprocess.Popen(
        [sys.executable, __file__,
         os.path.join(temp_dir, "one"),
         os.path.join(temp_dir, "two"),
         os.path.join(temp_dir, "three")],
    )

    loop = asyncio.get_event_loop()
    job2 = asyncio.create_subprocess_exec(
        sys.executable, __file__,
        os.path.join(temp_dir, "four"),
        os.path.join(temp_dir, "five"),
        os.path.join(temp_dir, "six"),
    )
    loop.run_until_complete(job2)
    job1.wait()

async def broadcast(self, event_type, data=None, enable_serial=True):
    for ws in self.connected:
        if enable_serial:
            self.connected[ws]["s"] += 1
            event = self.generate_event(event_type, data,
                                        self.connected[ws]["s"])
        # Some events, like heartbeat, do not need a serial.
        else:
            event = self.generate_event(event_type, data)
        await ws.send(event)
        del event
    # await asyncio.wait([ws.send(event) for ws in self.connected])

# Generate the heartbeat to keep the websocket
# connection alive.
# TODO: close connection if client doesn't reply to heartbeat

def token_sender(self):
    while not self.stopped:
        self.logger.info("Sending keep-alive token to all websockets (every 55 seconds)")
        start = datetime.now()
        tasks = list()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for symbol in self.websockets.keys():
            ws = self.websockets[symbol]["ws"]
            if ws.open:
                tasks.append(
                    ws.send("*" + self.websockets[symbol]["token"]))
        if len(tasks) > 0:
            loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
        self.logger.info(
            "Finished sending tokens. Took %s seconds"
            % (datetime.now() - start).total_seconds()
        )
        time.sleep(55)  # send the token every 55 seconds

def token_renewer(self):
    while not self.stopped:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        tasks = list()
        for symbol in self.websockets.keys():
            ws = self.websockets[symbol]["ws"]
            if ws.open:
                if (
                    datetime.now() - self.websockets[symbol]["renewed"]
                ).total_seconds() > 180:
                    tasks.append(self.renew_token(symbol))
        if len(tasks) > 0:
            loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
        time.sleep(1)

async def read(self, callback: Callable[[bytearray], Any]):
    try:
        for downloader in self._downloaders:
            # Wait until downloader is not in a downloaded/cancelled state.
            async with self._state_condition:
                while downloader.state not in (DOWNLOADED, CANCELLED):
                    await self._state_condition.wait()
                if downloader.state != DOWNLOADED:
                    self._debug('Downloader not in `DOWNLOADED` state, '
                                'but in `{!s}`.'.format(downloader.state))
                    raise CancelledError()
            # Open file and send all its bytes to the callback.
            await read_from_file_by_chunks(downloader.buffer_file_path, callback,
                                           self._chunk_size,
                                           lambda: self._state != CANCELLED,
                                           loop=self._loop)
    except Exception as exc:
        raise ReadError(exc)

def token_sender(self):
    while not self.stopped:
        self.logger.debug("Sending keep-alive token to all websockets (every 55 seconds)")
        start = datetime.now()
        tasks = list()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for symbol in self.websockets.keys():
            ws = self.websockets[symbol]["ws"]
            if ws.open:
                tasks.append(
                    ws.send("*" + self.websockets[symbol]["token"]))
        if len(tasks) > 0:
            loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
        self.logger.debug(
            "Finished sending tokens. Took %s seconds"
            % (datetime.now() - start).total_seconds()
        )
        time.sleep(55)  # send the token every 55 seconds

async def emit(self, event, data, namespace, room=None, skip_sid=None,
               callback=None, **kwargs):
    """Emit a message to a single client, a room, or all the clients
    connected to the namespace.

    Note: this method is a coroutine.
    """
    if namespace not in self.rooms or room not in self.rooms[namespace]:
        return
    tasks = []
    for sid in self.get_participants(namespace, room):
        if sid != skip_sid:
            if callback is not None:
                id = self._generate_ack_id(sid, namespace, callback)
            else:
                id = None
            tasks.append(self.server._emit_internal(sid, event, data,
                                                    namespace, id))
    await asyncio.wait(tasks)

async def trigger(self, data, *args, **kwargs):
    """
    Triggers all activities connected to this port.

    :param data: The data being transmitted.
    :param args: Random stuff
    :param kwargs: More random stuff
    """
    # ctlog.debug("InputPort.trigger({})".format(data))
    # Only transmit the data if there are activities connected to this port.
    if len(self._activities):
        futures = [None] * len(self._activities)
        for i, activity in enumerate(self._activities):
            futures[i] = activity.trigger([self], {self.attribute_name: data},
                                          self.parent_object, *args, **kwargs)
        try:
            # TODO: This will stop calling modules as soon as one raises an
            # exception. Figure out a way to handle exceptions individually
            # for each future.
            await wait_for(shield(wait(futures)), self.time_out)
        except Exception as e:
            print(self.channel_name, self.time_out)
            self.parent_object.root.handle_exception(sys.exc_info())

async def package_handler(self):
    """
    Co-routine that handles the packages coming out of the decoder.
    """
    dlog.debug(self.log_prefix + "Package handler started")
    while self.running:
        decoder_future = asyncio.ensure_future(self.decoder.get())
        await asyncio.wait((decoder_future, self.closing_semaphore.acquire()),
                           return_when=asyncio.FIRST_COMPLETED)
        if decoder_future.done():
            package = decoder_future.result()
            dlog.debug(self.log_prefix + "Received '{}({})'".format(
                self.__command_dictionary[package[0]].func.__name__,
                package[1]
            ))
            await self.__command_dictionary[package[0]](*package[1])
        else:
            break
    dlog.debug(self.log_prefix + "Package handler stopped")

def run(self):
    """
    Run the tester.
    :return:
    """
    print('Tester is starting')
    try:
        count = self.redis.count()
        print('There are', count, 'proxies left to test')
        for i in range(0, count, BATCH_TEST_SIZE):
            start = i
            stop = min(i + BATCH_TEST_SIZE, count)
            print('Testing proxies', start + 1, '-', stop)
            test_proxies = self.redis.batch(start, stop)
            loop = asyncio.get_event_loop()
            tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
            loop.run_until_complete(asyncio.wait(tasks))
            sys.stdout.flush()
            time.sleep(5)
    except Exception as e:
        print('Tester error', e.args)

def websocket_creator(self):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    symbol_list = self.symbols
    # Cut symbol_list so each websocket carries at most 64 queries.
    weight = (len(self.query) + 1) if ('transaction' in self.query) else len(self.query)
    step = int(64 / weight)
    symbol_list_slice = [symbol_list[i: i + step]
                         for i in range(0, len(symbol_list), step)]
    tasks = list()
    for symbol_list in symbol_list_slice:
        qlist = ''
        for symbol in symbol_list:
            qlist = self.generate_qlist(qlist=qlist, symbol=symbol)
        qlist = qlist.lower()
        tasks.append(self.create_ws(qlist, symbol_list=symbol_list))
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

def token_sender(self):
    while True:
        self.logger.info("Sending keep-alive token to all websockets (every 55 seconds)")
        start = datetime.now()
        tasks = list()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for symbol in self.websockets.keys():
            ws = self.websockets[symbol]["ws"]
            if ws.open:
                tasks.append(
                    ws.send("*" + self.websockets[symbol]["token"]))
        if len(tasks) > 0:
            loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
        self.logger.info(
            "Finished sending tokens. Took %s seconds"
            % (datetime.now() - start).total_seconds()
        )
        time.sleep(55)  # send the token every 55 seconds

def token_renewer(self):
    while True:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        tasks = list()
        for symbol in self.websockets.keys():
            ws = self.websockets[symbol]["ws"]
            if ws.open:
                if (
                    datetime.now() - self.websockets[symbol]["renewed"]
                ).total_seconds() > 180:
                    tasks.append(self.renew_token(symbol))
        if len(tasks) > 0:
            loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
        time.sleep(1)

def token_sender(self):
    while True:
        self.logger.info("Sending keep-alive token to all websockets (every 55 seconds)")
        start = datetime.now()
        tasks = list()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for symbol in self.websockets.keys():
            ws = self.websockets[symbol]["ws"]
            if ws.open:
                tasks.append(
                    ws.send("*" + self.websockets[symbol]["token"]))
        if len(tasks) > 0:
            loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
        self.logger.info(
            "Finished sending tokens. Took %s seconds"
            % (datetime.now() - start).total_seconds()
        )
        time.sleep(55)  # send the token every 55 seconds

def token_renewer(self):
    while True:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        tasks = list()
        for symbol in self.websockets.keys():
            ws = self.websockets[symbol]["ws"]
            if ws.open:
                if (
                    datetime.now() - self.websockets[symbol]["renewed"]
                ).total_seconds() > 180:
                    tasks.append(self.renew_token(symbol))
        if len(tasks) > 0:
            loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
        time.sleep(1)
        # gc.collect()

async def convert(self, in_resource, out_resource):
    with in_resource.cache_open('r') as fd:
        parsed = json.load(fd)
    files = parsed['files'].items()

    # Download all files with the same client session
    self._new_aiohttp_client()
    self.file_descriptors = {
        relpath: out_resource.cache_open_as_dir(relpath, 'wb')
        for relpath, file_url in files
    }
    await asyncio.wait([
        self._download_async(file_url, self.file_descriptors[relpath])
        for relpath, file_url in files
    ])
    self._close()

async def _read(self, size=-1):
    remaining_size = size
    end_time = time() + self.read_timeout
    payload = []
    while remaining_size and (time() < end_time):
        remaining_time = end_time - time()
        done, pending = await asyncio.wait([self.reader.read(remaining_size)],
                                           timeout=remaining_time,
                                           loop=self.loop)
        if done:
            chunk = done.pop().result()
            payload.append(chunk)
            remaining_size -= len(chunk)
        if pending:
            pending.pop().cancel()
    if remaining_size:
        raise exc.UnfinishedRead
    return b''.join(payload)

async def close(self, timeout):
    if self.closing:
        return
    if self.pending or (self.pending_specials and self.pending_specials != {None: []}):
        log.warning('Pendings: {}; specials: {}'.format(self.pending, self.pending_specials))
    self.closing = True
    try:
        # await list(pending_with_timeouts)
        self.abort(exception=exc.TimeoutError)
        # wlist = list(self.drain_all_pending())
        # log.warn('Wait for list: {} {}'.format(wlist, self.pending))
        # if len(wlist) > 0:
        #     await asyncio.wait(wlist, timeout=timeout)
    except asyncio.TimeoutError:
        log.warn('ABORT Timeout')
        await self.abort(exception=exc.TimeoutError)
    except Exception as e:
        log.exception('in close: {}'.format(e))
        raise e
    finally:
        log.debug('Closing writer')
        self.writer.close()
        log.debug('Writer closed')

async def test_watch(self):
    data = []
    ready = asyncio.Event()
    test_data = b'test' * 1000

    async def data_callback(d):
        data.append(d)
        ready.set()

    watcher = self.c.recipes.DataWatcher()
    watcher.set_client(self.c)
    watcher.add_callback(self.path, data_callback)
    assert data == []

    await self.c.set_data(self.path, test_data)
    await asyncio.wait([ready.wait()], timeout=0.1)
    assert ready.is_set()
    assert data == [test_data]

async def await_until_closing(self, coro):
    """
    Wait for some task to complete but abort as soon as the instance
    is being closed.

    :param coro: The coroutine or future-like object to wait for.
    """
    wait_task = asyncio.ensure_future(self.wait_closing(), loop=self.loop)
    coro_task = asyncio.ensure_future(coro, loop=self.loop)
    try:
        done, pending = await asyncio.wait(
            [wait_task, coro_task],
            return_when=asyncio.FIRST_COMPLETED,
            loop=self.loop,
        )
    finally:
        wait_task.cancel()
        coro_task.cancel()
    # It could be that the previous instructions cancelled coro_task if it
    # wasn't done yet.
    return await coro_task

async def on_run(self):
    try:
        while True:
            await asyncio.wait_for(
                self.revive_event.wait(),
                timeout=self.timeout,
                loop=self.loop,
            )
            self.revive_event.clear()
    except asyncio.TimeoutError:
        try:
            if asyncio.iscoroutinefunction(self.callback):
                await self.callback()
            else:
                self.callback()
        except Exception:
            logger.exception("Error in timeout callback execution.")

async def on_run(self):
    while True:
        try:
            await asyncio.wait_for(
                self.reset_event.wait(),
                timeout=self.period,
                loop=self.loop,
            )
        except asyncio.TimeoutError:
            try:
                if asyncio.iscoroutinefunction(self.callback):
                    await self.callback()
                else:
                    self.callback()
            except Exception:
                logger.exception("Error in timer callback execution.")
        else:
            self.reset_event.clear()

def run_forever(self, print=print, **kwargs):
    loop = self.loop
    print("======== Running aioworkers ========\n"
          "(Press CTRL+C to quit)")
    try:
        if self.on_startup:
            on_startup = [coro(self) for coro in self.on_startup]
            loop.run_until_complete(asyncio.wait(on_startup))
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        if self.on_shutdown:
            on_shutdown = [coro(self) for coro in self.on_shutdown]
            loop.run_until_complete(asyncio.wait(on_shutdown))
        loop.close()

def play_content(self, content, content_type='audio/mpeg', port=0):
    " play "
    address = _get_ipaddress()

    # create a listening port
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('', port))
    sock.listen(1)
    port = sock.getsockname()[1]
    uri = 'http://{}:{}/dummy.mp3'.format(address, port)

    # http server
    http_server = self._loop.create_server(
        lambda: PlayContentServer(content, content_type, verbose=self._verbose),
        sock=sock)

    # play request
    play_uri = self._loop.create_task(self._play_uri(uri))
    yield from asyncio.wait([http_server, play_uri])

def ssh_cleanup_agent(config):
    """ Removes the identities and stops the ssh-agent instance """
    # remove the identities
    process = Popen(['ssh-add', '-D'], env=config.environ)
    process.wait()
    if process.returncode != 0:
        logging.error('failed to delete SSH identities')

    # stop the ssh-agent
    process = Popen(['ssh-agent', '-k'], env=config.environ)
    process.wait()
    if process.returncode != 0:
        logging.error('failed to stop SSH agent')

async def close(self):
    if not self.ws:
        return
    self.monitor.close_called.set()
    await self._pinger_task.stopped.wait()
    await self._receiver_task.stopped.wait()
    await self.ws.close()
    self.ws = None

async def run_with_interrupt(task, event, loop=None):
    """
    Awaits a task while allowing it to be interrupted by an `asyncio.Event`.

    If the task finishes without the event becoming set, the results of the
    task will be returned. If the event becomes set, the task will be
    cancelled and ``None`` will be returned.

    :param task: Task to run
    :param event: An `asyncio.Event` which, if set, will interrupt `task`
        and cause it to be cancelled.
    :param loop: Optional event loop to use other than the default.
    """
    loop = loop or asyncio.get_event_loop()
    event_task = loop.create_task(event.wait())
    done, pending = await asyncio.wait([task, event_task],
                                       loop=loop,
                                       return_when=asyncio.FIRST_COMPLETED)
    for f in pending:
        f.cancel()
    exception = [f.exception() for f in done
                 if f is not event_task and f.exception()]
    if exception:
        raise exception[0]
    result = [f.result() for f in done if f is not event_task]
    if result:
        return result[0]
    else:
        return None

def run(*steps):
    """
    Helper to run one or more async functions synchronously, with graceful
    handling of SIGINT / Ctrl-C.

    Returns the return value of the last function.
    """
    if not steps:
        return

    task = None
    run._sigint = False  # function attr to allow setting from closure
    loop = asyncio.get_event_loop()

    def abort():
        task.cancel()
        run._sigint = True

    added = False
    try:
        loop.add_signal_handler(signal.SIGINT, abort)
        added = True
    except (ValueError, OSError, RuntimeError) as e:
        # add_signal_handler doesn't work in a thread
        if 'main thread' not in str(e):
            raise
    try:
        for step in steps:
            task = loop.create_task(step)
            loop.run_until_complete(asyncio.wait([task], loop=loop))
            if run._sigint:
                raise KeyboardInterrupt()
            if task.exception():
                raise task.exception()
        return task.result()
    finally:
        if added:
            loop.remove_signal_handler(signal.SIGINT)

def each(iterable, url_map=None, download=None):
    """For each iterable object, map it to a URL and request asynchronously

    :param iterable: an iterable object (ex. list of objects)
    :type iterable: iterable object
    :param url_map: (optional) callable object mapping an object to a url or bundle
    :type url_map: callable object
    :param download: (optional) your own customized download object
    :type download: :class:`AioDownload`
    :return: generator
    """
    download = download or AioDownload()
    url_map = url_map or (lambda x: str(x))

    tasks = []
    for i in iterable:
        bundle = url_map(i)
        if not isinstance(bundle, AioDownloadBundle):
            bundle = AioDownloadBundle(bundle)
        if i != bundle.url:
            bundle.info = i
        tasks.append(download.loop.create_task(download.main(bundle)))

    for task_set in download.loop.run_until_complete(asyncio.wait(tasks)):
        for task in task_set:
            yield task.result()

    download.client.close()

async def runner(argv, timeout=0):
    """
    Run the input command-line executable (specified in a Popen-style list)
    and return its exit code. Optionally specify a timeout. If timeout is 0
    or None, simply wait until the process is done.
    """
    def stringify(xs):
        return map(str, xs)

    argv = list(stringify(argv))
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, shell=False)
    t0 = time.time()
    while True:
        exit_code = proc.poll()
        if exit_code is None:
            # Process is still running
            if timeout > 0 and time.time() - t0 >= timeout:
                proc.kill()
                stdout = proc.stdout.read()
                stderr = proc.stderr.read()
                raise subprocess.TimeoutExpired(cmd=' '.join(argv),
                                                timeout=timeout,
                                                output=stdout,
                                                stderr=stderr)
        else:
            return proc.returncode
        # time.sleep(1) <-- BAD idea
        await asyncio.sleep(.1)

async def runner(argv, timeout=0):
    """
    Run the input command-line executable (specified in a Popen-style list)
    and return its exit code. Optionally specify a timeout. If timeout is 0
    or None, simply wait until the process is done.
    """
    def stringify(xs):
        return map(str, xs)

    argv = list(stringify(argv))
    proc = await asyncio.create_subprocess_exec(*argv,
                                                stdout=asyncio.subprocess.PIPE,
                                                stderr=asyncio.subprocess.PIPE)
    if timeout <= 0:
        timeout = None
    await asyncio.wait_for(proc.wait(), timeout=timeout)
    return proc.returncode

async def handler(websocket, path):
    global connected
    print('{} new connection from {}'.format(time.time(),
                                             websocket.remote_address[0]))
    connected.add(websocket)
    consumer_task = asyncio.ensure_future(consumer_handler(websocket))
    producer_task = asyncio.ensure_future(producer_handler(websocket))
    done, pending = await asyncio.wait(
        [consumer_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()