我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用aioredis.Redis()。
def get_redis(address=None, loop=None, recreate=False) -> aioredis.Redis: global _redis address = address or settings.CONFIG['redis'] kwargs = utils.parse_redis_url(address) kwargs['address'] = kwargs.pop('host'), kwargs.pop('port') if not _redis or recreate: _redis = await aioredis.create_reconnecting_redis(loop=loop, **kwargs) return _redis
def set_redis_pool(self, redis_pool: Optional[Redis]): if redis_pool: if isinstance(redis_pool, (ConnectionsPool,)): # If they've passed a raw pool then wrap it up in a Redis object. # aioredis.create_redis_pool() normally does this for us. redis_pool = Redis(redis_pool) if not isinstance(redis_pool, (Redis,)): raise InvalidRedisPool( 'Invalid Redis connection provided: {}. If unsure, use aioredis.create_redis_pool() to ' 'create your redis connection.'.format(redis_pool) ) if not isinstance(redis_pool._pool_or_conn, (ConnectionsPool,)): raise InvalidRedisPool( 'The provided redis connection is backed by a single connection, rather than a ' 'pool of connections. This will lead to lightbus deadlocks and is unsupported. ' 'If unsure, use aioredis.create_redis_pool() to create your redis connection.' ) self._redis_pool = redis_pool
def call_rpc(self, rpc_message: RpcMessage): stream = '{}:stream'.format(rpc_message.api_name) logger.debug( LBullets( L("Enqueuing message {} in Redis stream {}", Bold(rpc_message), Bold(stream)), items=rpc_message.to_dict() ) ) pool = await self.get_redis_pool() with await pool as redis: start_time = time.time() # TODO: MAXLEN await redis.xadd(stream=stream, fields=rpc_message.to_dict()) logger.info(L( "Enqueued message {} in Redis in {} stream {}", Bold(rpc_message), human_time(time.time() - start_time), Bold(stream) ))
def send_result(self, rpc_message: RpcMessage, result_message: ResultMessage, return_path: str): logger.debug(L( "Sending result {} into Redis using return path {}", Bold(result_message), Bold(return_path) )) redis_key = self._parse_return_path(return_path) pool = await self.get_redis_pool() with await pool as redis: start_time = time.time() p = redis.pipeline() p.lpush(redis_key, redis_encode(result_message.result)) # TODO: Make result expiry configurable p.expire(redis_key, timeout=60) await p.execute() logger.debug(L( "? Sent result {} into Redis in {} using return path {}", Bold(result_message), human_time(time.time() - start_time), Bold(return_path) ))
def receive_result(self, rpc_message: RpcMessage, return_path: str) -> ResultMessage: logger.info(L("? Awaiting Redis result for RPC message: {}", Bold(rpc_message))) redis_key = self._parse_return_path(return_path) pool = await self.get_redis_pool() with await pool as redis: start_time = time.time() # TODO: Make timeout configurable _, result = await redis.blpop(redis_key, timeout=5) result = redis_decode(result) logger.info(L( "? Received Redis result in {} for RPC message {}: {}", human_time(time.time() - start_time), rpc_message, Bold(result) )) return result
def get_redis(self) -> Redis: """ Get the redis pool, if a pool is already initialised it's returned, else one is crated. """ async with self._create_pool_lock: if self.redis is None: self.redis = await self.create_redis_pool() return self.redis
def __init__(self, connection: aioredis.Redis): self.connection = connection
def conn(func): @functools.wraps(func) async def wrapper(self, *args, _conn=None, **kwargs): if _conn is None: pool = await self._get_pool() conn_context = await pool with conn_context as _conn: if not AIOREDIS_BEFORE_ONE: _conn = aioredis.Redis(_conn) return await func(self, *args, _conn=_conn, **kwargs) return await func(self, *args, _conn=_conn, **kwargs) return wrapper
def acquire_conn(self): await self._get_pool() conn = await self._pool.acquire() if not AIOREDIS_BEFORE_ONE: conn = aioredis.Redis(conn) return conn
def __init__(self, queue=None): self.queue = queue or list() redis = mock.MagicMock(aioredis.Redis) redis.rpush.side_effect = utils.make_coro(result=lambda key, item: self.queue.append(item)) redis.rpop.side_effect = utils.make_coro(result=lambda key: self.queue.pop()) redis.lpush.side_effect = utils.make_coro(result=lambda key, item: self.queue.insert(0, item)) redis.llen.side_effect = utils.make_coro(result=lambda key: len(self.queue)) super(MockQueue, self).__init__(redis, None)
def __init__(self, redis, key): """ :param aioredis.Redis redis: """ super(RedisQueue, self).__init__() self.redis = redis self.key = key
def __init__(self, *, redis: Redis, max_concurrent_tasks: int=50, shutdown_delay: float=6, timeout_seconds: int=60, burst_mode: bool=True, raise_task_exception: bool=False, semaphore_timeout: float=60) -> None: """ :param redis: redis pool to get connection from to pop items from list, also used to optionally re-enqueue pending jobs on termination :param max_concurrent_tasks: maximum number of jobs which can be execute at the same time by the event loop :param shutdown_delay: number of seconds to wait for tasks to finish :param timeout_seconds: maximum duration of a job, after that the job will be cancelled by the event loop :param burst_mode: break the iter loop as soon as no more jobs are available by adding an sentinel quit queue :param raise_task_exception: whether or not to raise an exception which occurs in a processed task """ self.redis = redis self.loop = redis._pool_or_conn._loop self.max_concurrent_tasks = max_concurrent_tasks self.task_semaphore = asyncio.Semaphore(value=max_concurrent_tasks, loop=self.loop) self.shutdown_delay = max(shutdown_delay, 0.1) self.timeout_seconds = timeout_seconds self.burst_mode = burst_mode self.raise_task_exception = raise_task_exception self.pending_tasks: Set[asyncio.futures.Future] = set() self.task_exception: Exception = None self.semaphore_timeout = semaphore_timeout self.jobs_complete, self.jobs_failed, self.jobs_timed_out = 0, 0, 0 self.running = False self._finish_lock = asyncio.Lock(loop=self.loop)
def create_pool_lenient(settings: RedisSettings, loop: asyncio.AbstractEventLoop, *, _retry: int=0) -> Redis: """ Create a new redis pool, retrying up to conn_retries times if the connection fails. :param settings: RedisSettings instance :param loop: event loop :param _retry: retry attempt, this is set when the method calls itself recursively """ addr = settings.host, settings.port try: pool = await aioredis.create_redis_pool( addr, loop=loop, db=settings.database, password=settings.password, timeout=settings.conn_timeout ) except (ConnectionError, OSError, aioredis.RedisError, asyncio.TimeoutError) as e: if _retry < settings.conn_retries: logger.warning('redis connection error %s %s, %d retries remaining...', e.__class__.__name__, e, settings.conn_retries - _retry) await asyncio.sleep(settings.conn_retry_delay) else: raise else: if _retry > 0: logger.info('redis connection successful') return pool # recursively attempt to create the pool outside the except block to avoid # "During handling of the above exception..." madness return await create_pool_lenient(settings, loop, _retry=_retry + 1)
def __init__(self, *, loop: asyncio.AbstractEventLoop=None, redis_settings: RedisSettings=None, existing_redis: Redis=None) -> None: """ :param loop: asyncio loop to use for the redis pool :param redis_settings: connection settings to use for the pool :param existing_redis: existing pool, if set no new pool is created, instead this one is used """ # the "or getattr(...) or" seems odd but it allows the mixin to work with subclasses which initialise # loop or redis_settings before calling super().__init__ and don't pass those parameters through in kwargs. self.loop = loop or getattr(self, 'loop', None) or asyncio.get_event_loop() self.redis_settings = redis_settings or getattr(self, 'redis_settings', None) or RedisSettings() self.redis = existing_redis self._create_pool_lock = asyncio.Lock(loop=self.loop)
def get_redis_pool(self) -> Redis: if self._redis_pool is None: self._redis_pool = await aioredis.create_redis_pool(**self.connection_kwargs) return self._redis_pool