The following 48 code examples, extracted from open source Python projects, illustrate how to use asyncio.as_completed().
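Before the project examples, here is a minimal, self-contained sketch of the core pattern they all share: schedule several awaitables, then iterate over asyncio.as_completed() to consume each result as soon as it is ready, in completion order rather than submission order. The fake_download coroutine and its delays are invented purely for illustration:

import asyncio
import random

async def fake_download(name: str) -> str:
    # Hypothetical stand-in for real I/O: sleep for a random interval.
    await asyncio.sleep(random.uniform(0.1, 0.5))
    return name

async def main():
    tasks = [fake_download(f'file-{i}') for i in range(5)]
    # as_completed() yields awaitables in the order the tasks finish,
    # not the order they were created.
    for earliest in asyncio.as_completed(tasks):
        result = await earliest
        print('finished:', result)

asyncio.run(main())

Note that many of the examples below predate Python 3.7: they pass the loop= keyword (removed in Python 3.10) or use @asyncio.coroutine with yield from (removed in Python 3.11).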
@asyncio.coroutine
def watcher(tasks, delay=False):
    res = []
    for t in asyncio.as_completed(tasks):
        r = yield from t
        res.append(r)
        if delay:
            # simulate processing delay
            process_time = random.random() / 10
            yield from asyncio.sleep(process_time)
    #print(res)
    #assert(sorted(res) == res)
    if sorted(res) != res:
        print('FAIL', res)
        print('------------')
    else:
        print('.', end='')
        sys.stdout.flush()
def test_as_completed_with_unused_timeout(self):

    def gen():
        yield
        yield 0
        yield 0.01

    loop = self.new_test_loop(gen)

    a = asyncio.sleep(0.01, 'a', loop=loop)

    @asyncio.coroutine
    def foo():
        for f in asyncio.as_completed([a], timeout=1, loop=loop):
            v = yield from f
            self.assertEqual(v, 'a')

    loop.run_until_complete(asyncio.Task(foo(), loop=loop))
def test_as_completed_reverse_wait(self):

    def gen():
        yield 0
        yield 0.05
        yield 0

    loop = self.new_test_loop(gen)

    a = asyncio.sleep(0.05, 'a', loop=loop)
    b = asyncio.sleep(0.10, 'b', loop=loop)
    fs = {a, b}
    futs = list(asyncio.as_completed(fs, loop=loop))
    self.assertEqual(len(futs), 2)

    x = loop.run_until_complete(futs[1])
    self.assertEqual(x, 'a')
    self.assertAlmostEqual(0.05, loop.time())
    loop.advance_time(0.05)
    y = loop.run_until_complete(futs[0])
    self.assertEqual(y, 'b')
    self.assertAlmostEqual(0.10, loop.time())
def test_as_completed_duplicate_coroutines(self):

    @asyncio.coroutine
    def coro(s):
        return s

    @asyncio.coroutine
    def runner():
        result = []
        c = coro('ham')
        for f in asyncio.as_completed([c, c, coro('spam')],
                                      loop=self.loop):
            result.append((yield from f))
        return result

    fut = asyncio.Task(runner(), loop=self.loop)
    self.loop.run_until_complete(fut)
    result = fut.result()
    self.assertEqual(set(result), {'ham', 'spam'})
    self.assertEqual(len(result), 2)
def test_as_completed_concurrent(self):

    def gen():
        when = yield
        self.assertAlmostEqual(0.05, when)
        when = yield 0
        self.assertAlmostEqual(0.05, when)
        yield 0.05

    loop = self.new_test_loop(gen)

    a = asyncio.sleep(0.05, 'a', loop=loop)
    b = asyncio.sleep(0.05, 'b', loop=loop)
    fs = {a, b}
    futs = list(asyncio.as_completed(fs, loop=loop))
    self.assertEqual(len(futs), 2)
    waiter = asyncio.wait(futs, loop=loop)
    done, pending = loop.run_until_complete(waiter)
    self.assertEqual(set(f.result() for f in done), {'a', 'b'})
async def oc_classify(records, one_codex_api_key, progress=False, stdout=False):
    oc_auth = aiohttp.BasicAuth(one_codex_api_key)
    conn = aiohttp.TCPConnector(limit=10)
    with aiohttp.ClientSession(auth=oc_auth, connector=conn) as oc_session:
        with aiohttp.ClientSession(connector=conn) as ebi_session:
            tasks = [classify_taxify(oc_session, ebi_session, r.id, str(r.seq))
                     for r in records]
            # No async generators in 3.5... :'(
            # return [await f for f in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks))]
            records = []
            for f in tqdm.tqdm(asyncio.as_completed(tasks),
                               disable=not progress,
                               total=len(tasks)):
                response = await f
                record = build_record(response[0], response[1])
                if stdout:
                    print(record.format('fasta'), end='')
                records.append(record)
            return records

# --------------------------------------------------------------------------------------------------
async def fetch_bar():
    day = datetime.datetime.strptime('20100416', '%Y%m%d').replace(tzinfo=pytz.FixedOffset(480))
    end = datetime.datetime.strptime('20160118', '%Y%m%d').replace(tzinfo=pytz.FixedOffset(480))
    tasks = []
    while day <= end:
        tasks.append(is_trading_day(day))
        day += datetime.timedelta(days=1)
    trading_days = []
    for f in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
        rst = await f
        trading_days.append(rst)
    tasks.clear()
    for day, trading in trading_days:
        if trading:
            tasks += [
                asyncio.ensure_future(update_from_shfe(day)),
                asyncio.ensure_future(update_from_dce(day)),
                asyncio.ensure_future(update_from_czce(day)),
                asyncio.ensure_future(update_from_cffex(day)),
            ]
    print('task len=', len(tasks))
    for f in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
        await f
async def clean_daily_bar():
    day = datetime.datetime.strptime('20100416', '%Y%m%d').replace(tzinfo=pytz.FixedOffset(480))
    end = datetime.datetime.strptime('20160118', '%Y%m%d').replace(tzinfo=pytz.FixedOffset(480))
    tasks = []
    while day <= end:
        tasks.append(is_trading_day(day))
        day += datetime.timedelta(days=1)
    trading_days = []
    for f in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
        rst = await f
        trading_days.append(rst)
    tasks.clear()
    for day, trading in trading_days:
        if not trading:
            DailyBar.objects.filter(time=day.date()).delete()
    print('done!')
@asyncio.coroutine
def wait_with_progress(urlList, concurency=30, timeout=120, rawResults=False, cloudflare=False, headers=None):
    sem = asyncio.Semaphore(concurency)
    # Client session worker
    headers = headers or {}
    headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.104 Safari/537.36 vulners.com/bot'})
    if cloudflare:
        sessionClient = CloudflareScraper
    else:
        sessionClient = aiohttp.ClientSession
    urlToResultDict = {}
    with sessionClient(connector=aiohttp.TCPConnector(verify_ssl=False), headers=headers) as session:
        coros = [parseUrl(url=d, semaphore=sem, session=session, timeout=timeout, rawResults=rawResults)
                 for d in urlList]
        for f in tqdm.tqdm(asyncio.as_completed(coros), total=len(coros)):
            result = yield from f
            urlToResultDict.update(result)
    return urlToResultDict
async def gather(self, it, *, return_exceptions: bool = False,
                 show_progress: bool = False, **kwargs) -> list:
    """Execute coroutines concurrently and gather their results.

    This displays a progress bar in the command line when
    `show_progress=True`.
    """
    if not show_progress:
        return await asyncio.gather(*it, loop=self._loop,
                                    return_exceptions=return_exceptions)

    futures = [_intercept_fut(i, f, return_exceptions)
               for i, f in enumerate(it)]
    results = [None] * len(futures)
    with ProgressBar(length=len(futures), **kwargs) as bar:
        for result in asyncio.as_completed(futures, loop=self._loop):
            results.__setitem__(*(await result))
            bar.update(1)
    return results
def tiles_urllib(tile_metas: List[HipsTileMeta],
                 hips_survey: HipsSurveyProperties,
                 progress_bar: bool,
                 n_parallel,
                 timeout: float) -> List[HipsTile]:
    """Fetch HiPS tiles from a remote URL."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_parallel) as executor:
        futures = []
        for meta in tile_metas:
            url = hips_survey.tile_url(meta)
            future = executor.submit(fetch_tile_urllib, url, meta, timeout)
            futures.append(future)

        futures = concurrent.futures.as_completed(futures)
        if progress_bar:
            from tqdm import tqdm
            futures = tqdm(futures, total=len(tile_metas), desc='Fetching tiles')

        tiles = []
        for future in futures:
            tiles.append(future.result())

    return tiles
async def fetch_all_tiles_aiohttp(tile_metas: List[HipsTileMeta],
                                  hips_survey: HipsSurveyProperties,
                                  progress_bar: bool,
                                  n_parallel: int,
                                  timeout: float) -> List[HipsTile]:
    """Fetch HiPS tiles from a remote URL using aiohttp."""
    import aiohttp

    connector = aiohttp.TCPConnector(limit=n_parallel)
    async with aiohttp.ClientSession(connector=connector) as session:
        futures = []
        for meta in tile_metas:
            url = hips_survey.tile_url(meta)
            future = asyncio.ensure_future(fetch_tile_aiohttp(url, meta, session, timeout))
            futures.append(future)

        futures = asyncio.as_completed(futures)
        if progress_bar:
            from tqdm import tqdm
            futures = tqdm(futures, total=len(tile_metas), desc='Fetching tiles')

        tiles = []
        for future in futures:
            tiles.append(await future)

    return tiles
async def main():
    futures = [
        asyncio.ensure_future(delayed_value(1)),
        asyncio.ensure_future(delayed_value(2)),
        asyncio.ensure_future(delayed_value(3)),
        asyncio.ensure_future(delayed_value(4)),
        asyncio.ensure_future(delayed_value(5)),
    ]
    for future in asyncio.as_completed(futures):
        value = await future
        print(value)
async def check_urls(urls, loop):
    tasks = list()
    conn = aiohttp.TCPConnector(limit=100, limit_per_host=2, loop=loop)
    async with aiohttp.ClientSession(connector=conn, read_timeout=300,
                                     conn_timeout=10, loop=loop) as session:
        for metadata in urls:
            task = fetch(metadata, session)
            tasks.append(task)
        responses = dict()
        for f in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks)):
            resource_id, url, err, http_last_modified, hash, force_hash = await f
            responses[resource_id] = (url, err, http_last_modified, hash, force_hash)
    return responses
async def main(num_phases):
    print('starting main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting for phases to complete')
    results = []
    for next_to_complete in asyncio.as_completed(phases):
        answer = await next_to_complete
        print('received answer {!r}'.format(answer))
        results.append(answer)
    print('results: {!r}'.format(results))
    return results
def test_as_completed_with_timeout(self):

    def gen():
        yield
        yield 0
        yield 0
        yield 0.1

    loop = self.new_test_loop(gen)

    a = asyncio.sleep(0.1, 'a', loop=loop)
    b = asyncio.sleep(0.15, 'b', loop=loop)

    @asyncio.coroutine
    def foo():
        values = []
        for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
            if values:
                loop.advance_time(0.02)
            try:
                v = yield from f
                values.append((1, v))
            except asyncio.TimeoutError as exc:
                values.append((2, exc))
        return values

    res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
    self.assertEqual(len(res), 2, res)
    self.assertEqual(res[0], (1, 'a'))
    self.assertEqual(res[1][0], 2)
    self.assertIsInstance(res[1][1], asyncio.TimeoutError)
    self.assertAlmostEqual(0.12, loop.time())

    # move forward to close generator
    loop.advance_time(10)
    loop.run_until_complete(asyncio.wait([a, b], loop=loop))
def test_as_completed_invalid_args(self):
    fut = asyncio.Future(loop=self.loop)

    # as_completed() expects a list of futures, not a future instance
    self.assertRaises(TypeError, self.loop.run_until_complete,
                      asyncio.as_completed(fut, loop=self.loop))
    coro = coroutine_function()
    self.assertRaises(TypeError, self.loop.run_until_complete,
                      asyncio.as_completed(coro, loop=self.loop))
    coro.close()
@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, base_url, semaphore, verbose)
             for cc in sorted(cc_list)]

    to_do_iter = asyncio.as_completed(to_do)
    if not verbose:
        to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
    for future in to_do_iter:
        try:
            res = yield from future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status

        counter[status] += 1

    return counter
@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):  # <1>
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)  # <2>
    to_do = [download_one(cc, base_url, semaphore, verbose)
             for cc in sorted(cc_list)]  # <3>

    to_do_iter = asyncio.as_completed(to_do)  # <4>
    if not verbose:
        to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # <5>
    for future in to_do_iter:  # <6>
        try:
            res = yield from future  # <7>
        except FetchError as exc:  # <8>
            country_code = exc.country_code  # <9>
            try:
                error_msg = exc.__cause__.args[0]  # <10>
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__  # <11>
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status

        counter[status] += 1  # <12>

    return counter  # <13>
async def downloader_coro(cc_list, base_url, verbose, concur_req):  # <1>
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)  # <2>
    to_do = [download_one(cc, base_url, semaphore, verbose)
             for cc in sorted(cc_list)]  # <3>

    to_do_iter = asyncio.as_completed(to_do)  # <4>
    if not verbose:
        to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # <5>
    for future in to_do_iter:  # <6>
        try:
            res = await future  # <7>
        except FetchError as exc:  # <8>
            country_code = exc.country_code  # <9>
            try:
                error_msg = exc.__cause__.args[0]  # <10>
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__  # <11>
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status

        counter[status] += 1  # <12>

    return counter  # <13>
@asyncio.coroutine
def downloader_coro(cc_list):
    to_do = [download_one(cc) for cc in cc_list]
    results = []
    for future in asyncio.as_completed(to_do):
        print(future)
        result = yield from future
        results.append(result)
    return results
@asyncio.coroutine
def process_sources_for_status(client, sources):
    g_status = []
    coroutines = [retrieve_status(client, source) for source in sources]
    for coroutine in asyncio.as_completed(coroutines):
        status = yield from coroutine
        g_status.append(status)
    return sorted(g_status, key=lambda x: x[0].nick)
@asyncio.coroutine
def process_sources_for_file(client, sources, limit, cache=None):
    g_tweets = []
    coroutines = [retrieve_file(client, source, limit, cache) for source in sources]
    for coroutine in asyncio.as_completed(coroutines):
        tweets = yield from coroutine
        g_tweets.extend(tweets)
    return sorted(g_tweets, reverse=True)[:limit]
async def run(self):
    self._schedule_next_call()
    trackings = self._config['trackings']
    async with aiohttp.ClientSession() as session:
        mail = Mailgun(session=session,
                       domain=self._config['mailgun']['domain'],
                       api_key=self._config['mailgun']['api_key'])
        futures = asyncio.as_completed([self._get_tracking(tracking, session=session)
                                        for tracking in trackings])
        mails = []
        with orm.db_session():
            for future in futures:
                try:
                    tracking_id, status = await future
                except PostOfficeError as e:
                    print(e)
                    continue
                print(f'{tracking_id}: {status}')
                tracking = orm.get(t for t in Tracking if t.id == tracking_id)
                if not tracking:
                    tracking = Tracking(id=tracking_id)
                    self._db.commit()
                if tracking.status != status:
                    mails.append(mail.send(
                        from_addr=self._config['mailgun']['from'],
                        to_addrs=self._config['mailgun']['to'],
                        subject=f'Your {self._config["trackings"][tracking_id]} is getting closer',
                        body=f'{tracking_id}: {status}'))
                    tracking.status = status
        if mails:
            await asyncio.wait(mails)
async def __wait_with_progress(self, coros):
    with tqdm.tqdm(
        total=sum([len(feature.scenarios) for feature in self.features]),
        desc=f'Running {len(self.features)} features and '
             f'{sum([len(feature.scenarios) for feature in self.features])} scenarios',
        ncols=100,
        bar_format='{desc}{percentage:3.0f}%|{bar}| {elapsed}'
    ) as progress_bar:
        for future in asyncio.as_completed(coros):
            completed_feature = await future
            progress_bar.update(len(completed_feature.scenarios))
async def wait_with_progress(tasks):
    '''Await tasks while displaying a tqdm progress bar.

    parameters
    ==========
    tasks : list
        tasks to wait for
    '''
    for f in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
        await f
def fetch_and_map(function, data, MAX_CONNECTIONS=4, *args, **kwargs):
    """Asynchronously fetch and apply a function to the content of a
    dictionary of URLs. Results are returned as a dictionary. Args and
    kwargs are passed to the fetch_page() function to set the HTML
    request parameters.

    Input: F = function(), data = {key: (url, params)}
    Returns: {key: F(fetch_url(url))}
    """
    @asyncio.coroutine
    def process(semaphore):
        tasks = [
            fetch_page(key, semaphore, url, params=params, *args, **kwargs)
            for key, (url, params) in data.items()
        ]
        res = {}
        for coroutine in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
            key, val = yield from coroutine
            if val is not None:
                res[key] = function(key, val)
            else:
                logger = logging.getLogger(__name__)
                logger.warning("Could not process {}:{}".format(key, data[key]))
        return res

    # using asyncio.get_event_loop() means it grabs the main event loop, which,
    # when closed, stops all other event loops from working
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    semaphore = asyncio.Semaphore(MAX_CONNECTIONS, loop=loop)
    res = loop.run_until_complete(process(semaphore))
    loop.close()
    return res
def ape_as_completed():
    for p, hs, rs in ape.fire_by_priority('arg1'):
        for f in asyncio.as_completed(rs):
            r = loop.run_until_complete(f)
            yield p, f, r
def as_completed(self):
    # as_completed requires a list of futures, not a generator
    fs = list(self)
    # returns an iterator of futures that will only resolve once
    iterator = asyncio.as_completed(fs)
    # Return a new instance of this class so you can piggyback .gather or .wait and such
    return self.__class__(iterator)
@asyncio.coroutine
def progress(self, coros):
    for obj in tqdm.tqdm(asyncio.as_completed(coros), total=len(coros)):
        yield from obj
def test_as_completed(self):

    def gen():
        yield 0
        yield 0
        yield 0.01
        yield 0

    loop = self.new_test_loop(gen)
    # disable "slow callback" warning
    loop.slow_callback_duration = 1.0
    completed = set()
    time_shifted = False

    @asyncio.coroutine
    def sleeper(dt, x):
        nonlocal time_shifted
        yield from asyncio.sleep(dt, loop=loop)
        completed.add(x)
        if not time_shifted and 'a' in completed and 'b' in completed:
            time_shifted = True
            loop.advance_time(0.14)
        return x

    a = sleeper(0.01, 'a')
    b = sleeper(0.01, 'b')
    c = sleeper(0.15, 'c')

    @asyncio.coroutine
    def foo():
        values = []
        for f in asyncio.as_completed([b, c, a], loop=loop):
            values.append((yield from f))
        return values

    res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
    self.assertAlmostEqual(0.15, loop.time())
    self.assertTrue('a' in res[:2])
    self.assertTrue('b' in res[:2])
    self.assertEqual(res[2], 'c')

    # Doing it again should take no time and exercise a different path.
    res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
    self.assertAlmostEqual(0.15, loop.time())
async def bulk_run(self, device_to_commands, timeout, open_timeout,
                   client_ip, client_port):
    if ((len(device_to_commands) < self.LB_THRESHOLD) and
            (self._bulk_session_count < self.BULK_SESSION_LIMIT)):
        # Run these commands locally.
        self.incrementCounter('bulk_run.local')
        return await self.bulk_run_local(device_to_commands, timeout,
                                         open_timeout, client_ip, client_port)

    async def _remote_task(chunk):
        # Run the chunk of commands on a remote instance
        self.incrementCounter('bulk_run.remote')
        retry_count = 0
        while True:
            try:
                return await self._bulk_run_remote(
                    chunk, timeout, open_timeout, client_ip, client_port)
            except ttypes.InstanceOverloaded as ioe:
                # Instance we ran the call on was overloaded. We can retry
                # the command again, hopefully on a different instance
                self.incrementCounter('bulk_run.remote.overload_error')
                self.logger.error("Instance Overloaded: %d: %s",
                                  retry_count, ioe)
                if retry_count > self.BULK_RETRY_LIMIT:
                    # Fail the calls
                    return self._bulk_failure(chunk, str(ioe))
                # Stagger the retries
                delay = random.uniform(self.BULK_RETRY_DELAY_MIN,
                                       self.BULK_RETRY_DELAY_MAX)
                await asyncio.sleep(delay)
                retry_count += 1

    # Split the request into chunks and run them on remote hosts
    tasks = [_remote_task(chunk)
             for chunk in self._chunked_dict(device_to_commands,
                                             self.LB_THRESHOLD)]
    all_results = {}
    for task in asyncio.as_completed(tasks, loop=self.loop):
        result = await task
        all_results.update(result)
    return all_results