The following 40 code examples, extracted from open-source Python projects, illustrate how to use asyncio.BaseEventLoop().
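Most of the examples below share the same convention: a function or constructor accepts an optional loop parameter annotated as asyncio.BaseEventLoop and falls back to asyncio.get_event_loop() when none is given. A minimal sketch of that pattern (the Service class and its names are illustrative, not taken from any one project):

import asyncio

class Service:
    def __init__(self, loop: asyncio.BaseEventLoop = None):
        # accept an injected loop (handy for tests), else use the default one
        self._loop = loop or asyncio.get_event_loop()

    def run(self, coro):
        return self._loop.run_until_complete(coro)
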
async def step(client: object,
               agents: list,
               timeout: int,
               loop: BaseEventLoop):
    tasks = []
    for agent, agent_cfg in agents:
        tags = agent_cfg.tags

        def event_fn(**kwargs):
            if "tags" in kwargs:
                for tag in tags:
                    kwargs["tags"].append(tag)
            else:
                kwargs["tags"] = tags
            if "time" not in kwargs:
                kwargs["time"] = int(time())
            client.event(**kwargs)

        tasks.append(agent.process(event_fn))
    return await asyncio.wait(tasks, timeout=timeout)

def __init__(self, loop: asyncio.BaseEventLoop = None, **config):
    if loop is None:
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
    # TODO: say in the docs that we take ownership of the loop, we close it
    # ourselves in run()
    self.loop = loop
    self.config = dict(self.DEFAULTS, **config)
    self.encoding = self.config['encoding']
    self.registry = registry.Registry(self.config)
    self.queue = asyncio.Queue(loop=self.loop)
    asyncio.ensure_future(self._process_queue(), loop=self.loop)

async def _record_perf_async(
        loop: asyncio.BaseEventLoop, event: str, message: str) -> None:
    """Record timing metric async
    :param asyncio.BaseEventLoop loop: event loop
    :param str event: event
    :param str message: message
    """
    if not _RECORD_PERF:
        return
    proc = await asyncio.subprocess.create_subprocess_shell(
        './perf.py cascade {ev} --prefix {pr} --message "{msg}"'.format(
            ev=event, pr=_PREFIX, msg=message), loop=loop)
    await proc.wait()
    if proc.returncode != 0:
        logger.error(
            'could not record perf to storage for event: {}'.format(event))

def run(coro, loop=None):
    """
    Convenient shortcut alias to ``loop.run_until_complete``.

    Arguments:
        coro (coroutine): coroutine object to schedule.
        loop (asyncio.BaseEventLoop): optional event loop to use.
            Defaults to: ``asyncio.get_event_loop()``.

    Returns:
        mixed: returned value by coroutine.

    Usage::

        async def mul_2(num):
            return num * 2

        paco.run(mul_2(4))
        # => 8
    """
    loop = loop or asyncio.get_event_loop()
    return loop.run_until_complete(coro)

def __init__(self, *, config: Config, loop: asyncio.BaseEventLoop=_init_loop(),
             show_progress: bool=True):
    self.config = config
    self.show_progress = show_progress
    db_engine = create_engine(f'sqlite:///{config.config_dir/config.db_name}')
    ModelBase.metadata.create_all(db_engine)
    self.db = sessionmaker(bind=db_engine)()
    self.client = ClientSession(connector=TCPConnector(limit_per_host=10, loop=loop),
                                headers={'User-Agent': _UA_STRING}, loop=loop)
    self.resolvers = {n: r(manager=self)
                      for n, r in BaseResolver.__members__.items()}
    self.runner = _Runner(self)
    self._loop = loop
    self._resolver_lock = asyncio.Lock(loop=loop)
    self._tpes = [ThreadPoolExecutor(max_workers=1),
                  ThreadPoolExecutor(max_workers=1)]

async def main_loop(cfg: Config,
                    logger: Logger,
                    transport_cls: Generic[T],
                    continue_fn: callable,
                    loop: BaseEventLoop):
    riemann = cfg.riemann
    transport = transport_cls(riemann.host, riemann.port)
    client = processor.QClient(transport)
    agents = create_agents(cfg.agents)
    register_augments(client, cfg.augments, logger)
    executor = cfg.executor_class(max_workers=cfg.executors_count)
    loop.set_default_executor(executor)
    init(agents)
    while True:
        ts = time()
        (done, pending) = await step(client,
                                     agents,
                                     timeout=cfg.interval * 1.5,
                                     loop=loop)
        te = time()
        td = te - ts
        instrumentation(client,
                        logger,
                        cfg.interval,
                        td,
                        len(client.queue.events),
                        len(pending))
        await processor.flush(client, transport, logger)
        if continue_fn():
            await asyncio.sleep(cfg.interval - int(td), loop=loop)
        else:
            logger.info("Stopping Oshino")
            break
    client.on_stop()

def test_event_loop_creation(self):
    assert isinstance(create_loop(), asyncio.BaseEventLoop)

def __init__(self, dsn: str, loop: asyncio.BaseEventLoop):
    self.dsn = dsn
    self.loop = loop
    self._launcher_tasks = None
    self._launcher_topics = None

def create_timer(cb: Callable[[float], None], interval: float,
                 delay_policy: TimerDelayPolicy=TimerDelayPolicy.DEFAULT,
                 loop: Optional[asyncio.BaseEventLoop]=None) -> asyncio.Task:
    '''
    Schedule a timer with the given callable and the interval in seconds.
    The interval value is also passed to the callable.
    If the callable takes longer than the timer interval, all accumulated
    callable's tasks will be cancelled when the timer is cancelled.

    Args:
        cb: TODO - fill argument descriptions

    Returns:
        You can stop the timer by cancelling the returned task.
    '''
    if not loop:
        loop = asyncio.get_event_loop()

    async def _timer():
        fired_tasks = []
        try:
            while True:
                if delay_policy == TimerDelayPolicy.CANCEL:
                    for t in fired_tasks:
                        if not t.done():
                            t.cancel()
                            await t
                    fired_tasks.clear()
                else:
                    fired_tasks[:] = [t for t in fired_tasks if not t.done()]
                t = loop.create_task(cb(interval=interval))
                fired_tasks.append(t)
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            for t in fired_tasks:
                t.cancel()
            await asyncio.gather(*fired_tasks)

    return loop.create_task(_timer())

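A possible way to drive this timer (a sketch; `tick` is a made-up callback, and note that `create_timer` invokes it with the interval as a keyword argument):

import asyncio

async def tick(interval: float) -> None:
    print('timer fired, interval =', interval)

async def main():
    timer = create_timer(tick, 1.0)
    await asyncio.sleep(3.5)   # let it fire a few times
    timer.cancel()             # stopping the timer == cancelling its task
    try:
        await timer
    except asyncio.CancelledError:
        pass
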
def __init__(self, source: LIRCClient, loop: asyncio.BaseEventLoop=None):
    # Instance attributes
    self._source = source
    self._loop = loop or asyncio.get_event_loop()
    self._current_command_string = None
    self._current_command_repetition = 0

def __init__(self, *, loop):
    self.loop = loop
    try:
        _BaseEventLoop = asyncio.BaseEventLoop
    except AttributeError:
        _BaseEventLoop = asyncio.AbstractEventLoop
    self.is_asyncio = isinstance(loop, _BaseEventLoop)

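A side note, not from the project above: asyncio.BaseEventLoop is an implementation detail, and alternative loops such as uvloop's subclass only the documented asyncio.AbstractEventLoop ABC, so a check meant to accept any conforming loop is safer against the abstract class:

import asyncio

def is_asyncio_loop(loop) -> bool:
    # True for the stdlib loops and for third-party loops like uvloop's
    return isinstance(loop, asyncio.AbstractEventLoop)
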
def get(debug_enabled=False) -> asyncio.BaseEventLoop:
    """Returns the secondary event loop."""
    global loop
    if loop is None:
        enable_event_loop()
    if debug_enabled:
        loop.set_debug(True)
    return loop

def connect(self, loop, protocol_factory, conn_check):
    '''Connect attempts to open a connection transport to the Cozmo app on a device.

    On opening a transport it will create a protocol from the supplied
    factory and connect it to the transport, returning a (transport, protocol)
    tuple. See :meth:`asyncio.BaseEventLoop.create_connection`
    '''
    raise NotImplementedError

def connect_on_loop(loop, conn_factory=conn.CozmoConnection, connector=None):
    '''Uses the supplied event loop to connect to a device.

    Will run the event loop in the current thread until the
    connection succeeds or fails.

    If you do not want/need to manage your own loop, then use the
    :func:`connect` function to handle setup/teardown and execute
    a user-supplied function.

    Args:
        loop (:class:`asyncio.BaseEventLoop`): The event loop to use to
            connect to Cozmo.
        conn_factory (callable): Override the factory function to generate a
            :class:`cozmo.conn.CozmoConnection` (or subclass) instance.
        connector (:class:`DeviceConnector`): Optional instance of a
            DeviceConnector subclass that handles opening the USB connection
            to a device. By default, it will connect to the first Android or
            iOS device that has the Cozmo app running in SDK mode.

    Returns:
        A :class:`cozmo.conn.CozmoConnection` instance.
    '''
    if connector is None:
        connector = _DEFAULT_CONNECTOR

    factory = functools.partial(conn_factory, loop=loop)

    async def conn_check(coz_conn):
        await coz_conn.wait_for(conn.EvtConnected, timeout=5)

    async def connect():
        return await connector.connect(loop, factory, conn_check)

    transport, coz_conn = loop.run_until_complete(connect())
    return coz_conn

def loop(self):
    ''':class:`asyncio.BaseEventLoop`: loop instance that this object is registered with.'''
    return getattr(self, '_loop', None)

def _renew_blob_lease(
        loop: asyncio.BaseEventLoop,
        blob_client: azureblob.BlockBlobService,
        container_key: str, resource: str, blob_name: str):
    """Renew a storage blob lease
    :param asyncio.BaseEventLoop loop: event loop
    :param azureblob.BlockBlobService blob_client: blob client
    :param str container_key: blob container index into _STORAGE_CONTAINERS
    :param str resource: resource
    :param str blob_name: blob name
    """
    try:
        lease_id = blob_client.renew_blob_lease(
            container_name=_STORAGE_CONTAINERS[container_key],
            blob_name=blob_name,
            lease_id=_BLOB_LEASES[resource],
        )
    except azure.common.AzureException as e:
        logger.exception(e)
        _BLOB_LEASES.pop(resource)
        _CBHANDLES.pop(resource)
    else:
        _BLOB_LEASES[resource] = lease_id
        _CBHANDLES[resource] = loop.call_later(
            15, _renew_blob_lease, loop, blob_client, container_key, resource,
            blob_name)

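The loop.call_later re-arming at the end is the standard way to build a repeating callback without a dedicated coroutine. Stripped to its essence (a sketch with a hypothetical poll function, not code from the project above):

import asyncio

def poll(loop: asyncio.BaseEventLoop) -> None:
    # ... do one unit of periodic work here ...
    loop.call_later(15, poll, loop)  # re-arm for the next tick in 15s

loop = asyncio.get_event_loop()
loop.call_soon(poll, loop)
loop.run_forever()
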
async def _load_and_register_async(
        loop: asyncio.BaseEventLoop,
        table_client: azuretable.TableService,
        nglobalresources: int) -> None:
    """Load and register image
    :param asyncio.BaseEventLoop loop: event loop
    :param azuretable.TableService table_client: table client
    :param int nglobalresources: number of global resources
    """
    global _LR_LOCK_ASYNC
    async with _LR_LOCK_ASYNC:
        for resource in _TORRENTS:
            # if torrent is seeding, load container/file and register
            if (_TORRENTS[resource]['started'] and
                    _TORRENTS[resource]['handle'].is_seed()):
                if (not _TORRENTS[resource]['loaded'] and
                        not _TORRENTS[resource]['loading']):
                    # container load image
                    if is_container_resource(resource):
                        thr = ContainerImageLoadThread(resource)
                        thr.start()
                    else:
                        # TODO "load blob" - move to appropriate path
                        raise NotImplementedError()
                # register to services table
                if (not _TORRENTS[resource]['registered'] and
                        _TORRENTS[resource]['loaded'] and
                        not _TORRENTS[resource]['loading']):
                    _merge_service(
                        table_client, resource, nglobalresources)
                    _TORRENTS[resource]['registered'] = True

async def _get_ipaddress_async(loop: asyncio.BaseEventLoop) -> str:
    """Get IP address
    :param asyncio.BaseEventLoop loop: event loop
    :rtype: str
    :return: ip address
    """
    if _ON_WINDOWS:
        raise NotImplementedError()
    else:
        proc = await asyncio.subprocess.create_subprocess_shell(
            'ip addr list eth0 | grep "inet " | cut -d\' \' -f6 | cut -d/ -f1',
            stdout=asyncio.subprocess.PIPE, loop=loop)
        output = await proc.communicate()
        return output[0].decode('ascii').strip()

def __init__(self, bot_token, app_token=None,
             loop: Optional[asyncio.BaseEventLoop] = None,
             session: aiohttp.ClientSession = None):
    self._bot_token = bot_token
    self._app_token = app_token
    self._token = app_token or bot_token
    self._loop = loop or asyncio.get_event_loop()
    self._session = session or aiohttp.ClientSession(loop=self._loop)

def __init__(self, bot_token, callback, *,
             loop: Optional[asyncio.BaseEventLoop] = None,
             session: aiohttp.ClientSession = None):
    super().__init__(bot_token, loop=loop, session=session)
    self._ws = None
    self._closed = asyncio.Event(loop=self._loop)
    self._callback = callback

def __init__(self, loop: asyncio.BaseEventLoop,
             dispatcher: AbstractDispatcher, *,
             request_timeout: int=15):
    """
    This is a specialized HTTP protocol meant to be a low-latency API
    Gateway. For the HTTP dispatcher this means that data in/out will be
    streamed in each direction; other dispatchers may require full
    request/response bodies to be read into memory.

    :param loop: event loop
    :param dispatcher: dispatcher strategy, should implement the methods
        defined in the AbstractDispatcher
    :param request_timeout: Max length of a request cycle in secs (def: 15s)
    """
    self.dispatcher = dispatcher
    self.loop = loop
    self.request_timeout = request_timeout

    self.parser = None  # httptools.HttpRequestParser
    self.transport = None  # type: Optional[asyncio.Transport]
    self.reader = None  # type: Optional[asyncio.StreamReader]
    self.timeout = None  # type: Optional[Task]

    # request info
    self.url = None  # type: Optional[str]
    self.headers = None  # type: Optional[List[Tuple[bytes, bytes]]]

# ===========================
# asyncio.Protocol callbacks
# ===========================

def loop(self) -> asyncio.BaseEventLoop:
    return self._loop

def setup_test_loop():
    """create and return an asyncio.BaseEventLoop instance.

    The caller should also call teardown_test_loop,
    once they are done with the loop.
    """
    loop = asyncio.get_event_loop()
    # asyncio.set_event_loop(None)
    return loop


# def teardown_test_loop(loop):
#     """teardown and cleanup an event_loop created
#     by setup_test_loop.
#     :param loop: the loop to teardown
#     :type loop: asyncio.BaseEventLoop
#     """
#     is_closed = getattr(loop, 'is_closed')
#     if is_closed is not None:
#         closed = is_closed()
#     else:
#         closed = loop._closed
#     if not closed:
#         loop.call_soon(loop.stop)
#         loop.run_forever()
#         loop.close()
#     gc.collect()
#     asyncio.set_event_loop(None)

@asyncio.coroutine
def repeat(coro, times=1, step=1, limit=1, loop=None):
    """
    Executes the coroutine function the given number of ``times``,
    and accumulates results in order as you would use with ``map``.

    Execution concurrency is configurable using ``limit`` param.

    This function is a coroutine.

    Arguments:
        coro (coroutinefunction): coroutine function to schedule.
        times (int): number of times to execute the coroutine.
        step (int): increment iteration step, as with ``range()``.
        limit (int): concurrency execution limit. Defaults to 1.
        loop (asyncio.BaseEventLoop): optional event loop to use.

    Raises:
        TypeError: if coro is not a coroutine function.

    Returns:
        list: accumulated yielded values returned by coroutine.

    Usage::

        async def mul_2(num):
            return num * 2

        await paco.repeat(mul_2, times=5)
        # => [2, 4, 6, 8, 10]
    """
    assert_corofunction(coro=coro)

    # Iterate and attach coroutine for defer scheduling
    times = max(int(times), 1)
    iterable = range(1, times + 1, step)

    # Run iterable times
    return (yield from map(coro, iterable, limit=limit, loop=loop))

def timeout(coro, timeout=None, loop=None):
    """
    Wraps a given coroutine function, that when executed, if it takes more
    than the given timeout in seconds to execute, it will be canceled and
    raise an `asyncio.TimeoutError`.

    This function is equivalent to Python standard
    `asyncio.wait_for()` function.

    This function can be used as decorator.

    Arguments:
        coro (coroutinefunction|coroutine): coroutine to wrap.
        timeout (int|float): max wait timeout in seconds.
        loop (asyncio.BaseEventLoop): optional event loop to use.

    Raises:
        TypeError: if coro argument is not a coroutine function.

    Returns:
        coroutinefunction: wrapper coroutine function.

    Usage::

        await paco.timeout(coro, timeout=10)
    """
    @asyncio.coroutine
    def _timeout(coro):
        return (yield from asyncio.wait_for(coro, timeout, loop=loop))

    @asyncio.coroutine
    def wrapper(*args, **kw):
        return (yield from _timeout(coro(*args, **kw)))

    return _timeout(coro) if asyncio.iscoroutine(coro) else wrapper

def exception(self, loop: asyncio.BaseEventLoop, context: dict) -> None:
    '''Log unhandled exceptions from anywhere in the event loop.'''
    Log.error('unhandled exception: %s', context['message'])
    Log.error('%s', context)
    if 'exception' in context:
        Log.error(' %s', context['exception'])

def get_app(self, loop):
    """
    This method should be overridden to return the
    aiohttp.web.Application object to test.

    :param loop: the event_loop to use
    :type loop: asyncio.BaseEventLoop
    """
    pass  # pragma: no cover

def setup_test_loop(loop_factory=asyncio.new_event_loop):
    """Create and return an asyncio.BaseEventLoop instance.

    The caller should also call teardown_test_loop,
    once they are done with the loop.
    """
    loop = loop_factory()
    asyncio.set_event_loop(None)
    return loop

def teardown_test_loop(loop):
    """Teardown and cleanup an event_loop created by setup_test_loop.

    :param loop: the loop to teardown
    :type loop: asyncio.BaseEventLoop
    """
    closed = loop.is_closed()
    if not closed:
        loop.call_soon(loop.stop)
        loop.run_forever()
        loop.close()
    gc.collect()
    asyncio.set_event_loop(None)

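These two helpers are meant to be used as a pair; a typical test would look roughly like this (my_coro is a placeholder coroutine, not from the original project):

loop = setup_test_loop()
try:
    result = loop.run_until_complete(my_coro())  # my_coro: placeholder
finally:
    teardown_test_loop(loop)
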
def __init__(self, loop: asyncio.BaseEventLoop = None):
    self._headers = {
        "User-Agent": "Listen (https://github.com/GetRektByMe/Listen)",
        "Content-Type": "application/json"
    }
    self._loop = loop or asyncio.get_event_loop()
    self._ws = None
    self.ws_handler = None

async def get_hist_data(
        req: HistDataReq, broker: object, mysql: dict=None) -> MarketDataBlock:
    """
    Return a MarketDataBlock object containing historical market data for a
    user request. All the involved operations are asynchronously concurrent,
    including downloading data, merging data in memory, and querying and
    saving data with a MySQL database.

    The function will first determine which parts of the requested data
    exist in the MySQL database. The parts of requested data not in the
    database will be automatically downloaded asynchronously, provided a
    broker API service is available. If a database is unavailable, all
    requested data will be downloaded.

    The downloaded data for any single request will be immediately combined
    to a MarketDataBlock object, while other requested data are still being
    downloaded. The downloaded data will also be asynchronously inserted to
    the database.

    :param mysql: {'host': str, 'user': str, 'password': str, 'db': str,
                   'loop': asyncio.BaseEventLoop}
    """
    xchg_tz = await broker.hist_data_req_timezone(req)

    # All data will be downloaded from broker if database is unavailable
    # or requested BarSize not in database.
    if mysql is None or timedur_standardize(req.BarSize)[-1] == 's':
        blk_list = await broker.req_hist_data_async(req)
        blk = blk_list[0]
        blk.tz_convert(xchg_tz)
        return blk

    # init database
    engine = await aio_create_engine(
        host=mysql['host'], user=mysql['user'], password=mysql['password'],
        db=mysql['db'], loop=mysql['loop'])

    # Query database first, and split req for downloading
    (dl_reqs, insert_limit, blk_ret,
     start_dt, end_dt) = await query_hist_data_split_req(req, xchg_tz, engine)
    _logger.debug('blk_ret head:\n%s', blk_ret.df.iloc[:3])
    _logger.debug('start_dt: %s', start_dt)
    _logger.debug('end_dt: %s', end_dt)

    # Download data and insert to db concurrently
    if dl_reqs is not None:
        blk_dl_list = await asyncio.gather(*(
            download_insert_hist_data(req_i, broker, engine, inslim)
            for req_i, inslim in zip(dl_reqs, insert_limit)))
        for blk_dl in blk_dl_list:
            _logger.debug('blk_dl head:\n%s', blk_dl.df.iloc[:3])
            blk_ret.combine(blk_dl)
            _logger.debug('Combined blk_ret head:\n%s', blk_ret.df.iloc[:3])

    # Limit time range according to req
    blk_ret.df = blk_ret.df.loc(axis=0)[:, :, :, start_dt:end_dt]

    # wrap up
    engine.close()
    await engine.wait_closed()
    return blk_ret

def __init__(self, host: str = 'localhost', port: int = 8086,
             username: Optional[str] = None, password: Optional[str] = None,
             db: str = 'testdb', database: Optional[str] = None,
             loop: asyncio.BaseEventLoop = None,
             log_level: int = 30, mode: str = 'async'):
    """
    The AsyncInfluxDBClient object holds information necessary to interact
    with InfluxDB. It is async by default, but can also be used as a
    sync/blocking client and even generate Pandas DataFrames from queries.

    The three main public methods are the three endpoints of the InfluxDB
    API, namely:
    1) AsyncInfluxDBClient.ping
    2) AsyncInfluxDBClient.write
    3) AsyncInfluxDBClient.query

    See each of the above methods documentation for further usage details.

    See also: https://docs.influxdata.com/influxdb/v1.2/tools/api/

    :param host: Hostname to connect to InfluxDB.
    :param port: Port to connect to InfluxDB.
    :param username: Username to use to connect to InfluxDB.
    :param password: User password.
    :param db: Default database to be used by the client.
    :param database: Default database to be used by the client.
        This field is for argument consistency with the official
        InfluxDB Python client.
    :param loop: Event loop used for processing HTTP requests.
    :param log_level: Logging level. The lower the more verbose.
        Defaults to WARNING (30).
    :param mode: Mode in which client should run.
        Available options are: 'async', 'blocking' and 'dataframe'.
        - 'async': Default mode. Each query/request to the backend will
          return a coroutine.
        - 'blocking': Behaves in sync/blocking fashion, similar to the
          official InfluxDB-Python client.
        - 'dataframe': Behaves in a sync/blocking fashion, but parses
          results into Pandas DataFrames. Similar to InfluxDB-Python's
          `DataFrameClient`.
    """
    self._logger = self._make_logger(log_level)
    self._loop = asyncio.get_event_loop() if loop is None else loop
    self._auth = aiohttp.BasicAuth(username, password) if username and password else None
    self._session = aiohttp.ClientSession(loop=self._loop, auth=self._auth)
    self._url = f'http://{host}:{port}/{{endpoint}}'
    self.host = host
    self.port = port
    self.db = database or db
    self.mode = mode
    if mode not in {'async', 'blocking', 'dataframe'}:
        raise ValueError('Invalid mode')

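A hedged usage sketch for the async mode, based only on the three endpoints named in the docstring; the measurement name and line-protocol point are made up:

async def demo():
    client = AsyncInfluxDBClient(db='testdb')
    await client.ping()
    await client.write('cpu_load,host=server01 value=0.64')  # made-up point
    resp = await client.query('SELECT * FROM cpu_load')
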
async def wait_for_first(*futures, discard_remaining=True, loop=None):
    '''Wait for the first of a set of futures to complete.

    Eg::

        event = cozmo.event.wait_for_first(
            coz.world.wait_for_new_cube(),
            playing_anim.wait_for(cozmo.anim.EvtAnimationCompleted)
        )

    If more than one completes during a single event loop run, then if any
    of those results is not an exception, one of them will be selected (at
    random, as determined by ``set.pop``) to be returned, else one of the
    result exceptions will be raised instead.

    Args:
        futures (list of :class:`asyncio.Future`): The futures or coroutines
            to wait on.
        discard_remaining (bool): Cancel or discard the results of the
            futures that did not return first.
        loop (:class:`asyncio.BaseEventLoop`): The event loop to wait on.

    Returns:
        The first result, or raised exception
    '''
    done, pending = await asyncio.wait(futures, loop=loop,
                                       return_when=asyncio.FIRST_COMPLETED)

    # collect the results from all "done" futures; only one will be returned
    result = None
    for fut in done:
        try:
            fut_result = fut.result()
            if result is None or isinstance(result, BaseException):
                result = fut_result
        except Exception as exc:
            if result is None:
                result = exc

    if discard_remaining:
        # cancel the pending futures
        for fut in pending:
            fut.cancel()

    if isinstance(result, BaseException):
        raise result
    return result

def bootstrap_dht_nodes(
        loop: asyncio.BaseEventLoop,
        table_client: azuretable.TableService,
        ipaddress: str,
        num_attempts: int) -> None:
    """Bootstrap DHT router nodes
    :param asyncio.BaseEventLoop loop: event loop
    :param azuretable.TableService table_client: table client
    :param str ipaddress: ip address
    :param int num_attempts: number of attempts
    """
    found_self = False
    dht_nodes = []
    try:
        entities = table_client.query_entities(
            _STORAGE_CONTAINERS['table_dht'],
            filter='PartitionKey eq \'{}\''.format(_PARTITION_KEY))
    except azure.common.AzureMissingResourceHttpError:
        pass
    else:
        for entity in entities:
            dht_nodes.append((entity['RowKey'], entity['Port']))
            if entity['RowKey'] == ipaddress:
                found_self = True
    if not found_self:
        entity = {
            'PartitionKey': _PARTITION_KEY,
            'RowKey': ipaddress,
            'Port': _DEFAULT_PORT_BEGIN,
        }
        table_client.insert_entity(_STORAGE_CONTAINERS['table_dht'], entity)
        dht_nodes.insert(0, (ipaddress, _DEFAULT_PORT_BEGIN))
    # TODO handle vm/ips no longer in pool
    for node in dht_nodes:
        if len(_DHT_ROUTERS) >= 3:
            break
        add_dht_node(node[0], node[1])
    # ensure at least 3 DHT router nodes if possible
    if len(dht_nodes) < 3:
        num_attempts += 1
        if num_attempts < 600:
            delay = 1
        elif num_attempts < 1200:
            delay = 10
        else:
            delay = 30
        loop.call_later(
            delay, bootstrap_dht_nodes, loop, table_client, ipaddress,
            num_attempts)

async def manage_torrents_async(
        loop: asyncio.BaseEventLoop,
        table_client: azuretable.TableService,
        ipaddress: str, nglobalresources: int) -> None:
    """Manage torrents
    :param asyncio.BaseEventLoop loop: event loop
    :param azuretable.TableService table_client: table client
    :param str ipaddress: ip address
    :param int nglobalresources: number of global resources
    """
    global _LR_LOCK_ASYNC, _GR_DONE
    while True:
        # async schedule load and register
        if not _GR_DONE and not _LR_LOCK_ASYNC.locked():
            asyncio.ensure_future(_load_and_register_async(
                loop, table_client, nglobalresources))
        # move pending torrents into torrents
        with _PT_LOCK:
            for pt in _PENDING_TORRENTS:
                _TORRENTS[pt] = _PENDING_TORRENTS[pt]
            _PENDING_TORRENTS.clear()
        # start applicable torrent sessions
        for resource in _TORRENTS:
            if _TORRENTS[resource]['started']:
                # log torrent info
                _log_torrent_info(resource, _TORRENTS[resource]['handle'])
                continue
            seed = _TORRENTS[resource]['seed']
            logger.info(
                ('creating torrent session for {} ipaddress={} '
                 'seed={}').format(resource, ipaddress, seed))
            grtype, image = get_container_image_name_from_resource(resource)
            _TORRENTS[resource]['handle'] = create_torrent_session(
                resource, _TORRENT_DIR, seed)
            await _record_perf_async(
                loop, 'torrent-start', 'grtype={},img={}'.format(
                    grtype, image))
            del image
            # insert torrent into torrentinfo table
            try:
                table_client.insert_entity(
                    _STORAGE_CONTAINERS['table_torrentinfo'],
                    entity=_TORRENTS[resource]['entity'])
            except azure.common.AzureConflictHttpError:
                pass
            # mark torrent as started
            if not _TORRENTS[resource]['started']:
                _TORRENTS[resource]['started'] = True
        # sleep to avoid pinning cpu
        await asyncio.sleep(1)

def distribute_global_resources(
        loop: asyncio.BaseEventLoop,
        blob_client: azureblob.BlockBlobService,
        table_client: azuretable.TableService,
        ipaddress: str) -> None:
    """Distribute global services/resources
    :param asyncio.BaseEventLoop loop: event loop
    :param azureblob.BlockBlobService blob_client: blob client
    :param azuretable.TableService table_client: table client
    :param str ipaddress: ip address
    """
    # set torrent session port listen
    if _ENABLE_P2P:
        global _TORRENT_SESSION
        # create torrent session
        logger.info('creating torrent session on {}:{}'.format(
            ipaddress, _DEFAULT_PORT_BEGIN))
        _TORRENT_SESSION = libtorrent.session()
        _TORRENT_SESSION.listen_on(_DEFAULT_PORT_BEGIN, _DEFAULT_PORT_END)
        _TORRENT_SESSION.stop_lsd()
        _TORRENT_SESSION.stop_upnp()
        _TORRENT_SESSION.stop_natpmp()
        # bootstrap dht nodes
        bootstrap_dht_nodes(loop, table_client, ipaddress, 0)
        _TORRENT_SESSION.start_dht()
    # get globalresources from table
    try:
        entities = table_client.query_entities(
            _STORAGE_CONTAINERS['table_globalresources'],
            filter='PartitionKey eq \'{}\''.format(_PARTITION_KEY))
    except azure.common.AzureMissingResourceHttpError:
        entities = None
    nentities = 0
    # check torrent info table for resource
    if entities is not None:
        for ent in entities:
            nentities += 1
            if _ENABLE_P2P:
                _check_resource_has_torrent(
                    blob_client, table_client, ent['Resource'])
            else:
                _DIRECTDL_QUEUE.put(ent['Resource'])
    if nentities == 0:
        logger.info('no global resources specified')
        return
    # run async func in loop
    loop.run_until_complete(download_monitor_async(
        loop, blob_client, table_client, ipaddress, nentities))

@asyncio.coroutine
def dropwhile(coro, iterable, loop=None):
    """
    Make an iterator that drops elements from the iterable as long as the
    predicate is true; afterwards, returns every element.

    Note, the iterator does not produce any output until the predicate first
    becomes false, so it may have a lengthy start-up time.

    This function is pretty much equivalent to Python standard
    `itertools.dropwhile()`, but designed to be used with async coroutines.

    This function is a coroutine.

    This function can be composed in a pipeline chain with ``|`` operator.

    Arguments:
        coro (coroutine function): coroutine function to call with values
            to reduce.
        iterable (iterable|asynchronousiterable): an iterable collection
            yielding coroutines functions.
        loop (asyncio.BaseEventLoop): optional event loop to use.

    Raises:
        TypeError: if coro argument is not a coroutine function.

    Returns:
        filtered values (list): ordered list of resultant values.

    Usage::

        async def filter(num):
            return num < 4

        await paco.dropwhile(filter, [1, 2, 3, 4, 5, 1])
        # => [4, 5, 1]
    """
    drop = False

    @asyncio.coroutine
    def assert_fn(element):
        nonlocal drop

        if element and not drop:
            return False

        if not element and not drop:
            drop = True

        return True if drop else element

    @asyncio.coroutine
    def filter_fn(element):
        return (yield from coro(element))

    return (yield from filter(filter_fn, iterable,
                              assert_fn=assert_fn, limit=1, loop=loop))

@asyncio.coroutine
def map(coro, iterable, limit=0, loop=None,
        timeout=None, return_exceptions=False, *args, **kw):
    """
    Concurrently maps values yielded from an iterable, passing them into an
    asynchronous coroutine function.

    Mapped values will be returned as a list.
    Items order will be preserved based on origin iterable order.

    Concurrency level can be configurable via ``limit`` param.

    This function is the asynchronous equivalent of the Python built-in
    `map()` function.

    This function is a coroutine.

    This function can be composed in a pipeline chain with ``|`` operator.

    Arguments:
        coro (coroutinefunction): map coroutine function to use.
        iterable (iterable|asynchronousiterable): an iterable collection
            yielding coroutines functions.
        limit (int): max concurrency limit. Use ``0`` for no limit.
        loop (asyncio.BaseEventLoop): optional event loop to use.
        timeout (int|float): timeout can be used to control the maximum
            number of seconds to wait before returning. timeout can be an
            int or float. If timeout is not specified or None, there is no
            limit to the wait time.
        return_exceptions (bool): returns exceptions as valid results.
        *args (mixed): optional variadic arguments to be passed to the
            coroutine map function.

    Returns:
        list: ordered list of values yielded by coroutines

    Usage::

        async def mul_2(num):
            return num * 2

        await paco.map(mul_2, [1, 2, 3, 4, 5])
        # => [2, 4, 6, 8, 10]
    """
    # Call each iterable but collecting yielded values
    return (yield from each(coro, iterable,
                            limit=limit, loop=loop, timeout=timeout,
                            collect=True, return_exceptions=return_exceptions))

def __init__(self, **kwargs):
    ''' config default '''
    self.config = Spider.default_config.copy()
    self.config.update(kwargs.get("config", {}))

    self.loop = kwargs.get("loop", None)
    # if no loop, get one
    if self.loop is None or not isinstance(self.loop, asyncio.BaseEventLoop):
        self.loop = asyncio.get_event_loop()

    '''
    If no session, create a new one.
    Providing a session is convenient when you spider a site that you need
    to log in to: you can just pass a logged-in session. Of course, you can
    instead provide a function which will be called before all spider
    requests to log in.
    '''
    self.session = kwargs.get("session", None)
    if self.session is None or not isinstance(self.session, aiohttp.ClientSession):
        self.session = aiohttp.ClientSession(loop=self.loop)

    '''
    The methods contained here will be called before any requests.
    For example, if the spider needs to log in, you may provide a login
    method. The variable `will_continue` stands for whether this spider
    continues or not after all `before_start_funcs` are called.
    '''
    self.before_start_funcs = []
    self.will_continue = True

    '''
    The methods contained here will be called after all requests.
    For example, if the spider needs to log out, you may provide a logout
    method.
    '''
    self.after_crawl_funcs = []

    ''' spider's logger '''
    self.logger = logging.getLogger(self.__class__.__name__)

    '''
    The reasons that only the spider's download_pending uses TaskQueue are:
    1. TaskQueue is still not stable.
    2. When there are too many requests waiting to be sent, it has to keep
       many contexts for each waiting request, including the method
       request_with_callback. So the request queue still uses asyncio.Queue.
    '''
    self.pending = asyncio.Queue()
    # downloading concurrency should not be too large.
    self.download_pending = TaskQueue(
        maxsize=self.config["download_concurrent"])
    self.visited = set()

    # you cannot call method `start` twice.
    self.running = False

    # active tasks
    self.active = []

async def late_init(self, db: Gino, *, loop=None, options=_options):
    """
    Initialize this application with a database object.

    This method does a few things to setup application for working
    with the database:

    - it enables task local storage;
    - creates a connection pool and binds it to the passed database object;
    - populates :py:attr:`~.db`.

    :param db: the :py:class:`gino.ext.tornado.Gino()` class instance that
        will be used in this application.
    :param loop: io loop that will be used to run the web server, either
        tornado's or asyncio's.
    :param options: a tornado's ``OptionParser()`` instance or any
        dictionary-like object with the database settings. Default is to
        use ``tornado.options.options`` global.
    """
    if loop is None:
        loop = tornado.ioloop.IOLoop.current()
    if isinstance(loop, tornado.platform.asyncio.BaseAsyncIOLoop):
        asyncio_loop = loop.asyncio_loop
    elif isinstance(loop, asyncio.BaseEventLoop):
        asyncio_loop = loop
    else:
        raise RuntimeError('AsyncIOLoop is required to run GINO')

    _enable_task_local(asyncio_loop)

    self.db: Gino = db

    await db.create_pool(
        host=options['db_host'],
        port=options['db_port'],
        user=options['db_user'],
        password=options['db_password'],
        database=options['db_database'],
        min_size=options['db_pool_min_size'],
        max_size=options['db_pool_max_size'],
        max_inactive_connection_lifetime=(
            options['db_pool_max_inactive_conn_lifetime']
        ),
        max_queries=options['db_pool_max_queries'],
        loop=asyncio_loop
    )


# noinspection PyAbstractClass