我们从Python开源项目中，提取了以下5个代码示例，用于说明如何使用asyncio.LifoQueue()。
async def using_queues():
    """Demonstrate the asyncio queue classes.

    Exercises both the non-blocking (``put_nowait`` / ``get_nowait``) and
    the awaitable (``put`` / ``get``) API on a FIFO queue, then creates a
    priority queue and a LIFO queue.

    The function is a coroutine: its body uses ``await``, which is a
    syntax error inside a plain ``def``.
    """
    q = asyncio.Queue()

    # Non-blocking put, then an awaited get of the item just queued.
    q.put_nowait('Hello')
    await q.get()

    # Awaited put, then a non-blocking get of the item just queued.
    await q.put('world')
    q.get_nowait()

    # Created only to show the other queue classes; not used further.
    pq = asyncio.PriorityQueue()
    stack = asyncio.LifoQueue()
def test_order(self): q = asyncio.LifoQueue(loop=self.loop) for i in [1, 3, 2]: q.put_nowait(i) items = [q.get_nowait() for _ in range(3)] self.assertEqual([2, 3, 1], items)
def __init__(self, *connect_args,
             min_size,
             max_size,
             max_queries,
             max_inactive_connection_lifetime,
             setup,
             init,
             loop,
             connection_class,
             **connect_kwargs):
    """Validate the pool limits and pre-create all connection holders.

    Args:
        *connect_args: Positional arguments forwarded to every
            ``PoolConnectionHolder``.
        min_size: Lower pool bound; must be >= 0 and <= ``max_size``.
        max_size: Upper pool bound; must be > 0.  This many holders are
            created and queued up front.
        max_queries: Must be > 0; forwarded to every holder.
        max_inactive_connection_lifetime: Must be >= 0; forwarded to every
            holder as ``max_inactive_time``.
        setup: Forwarded to every holder.
        init: Forwarded to every holder.
        loop: Event loop to store on the pool; when ``None`` the result of
            ``asyncio.get_event_loop()`` is used.
        connection_class: Stored on the pool as ``_connection_class``.
        **connect_kwargs: Keyword arguments forwarded to every holder.

    Raises:
        ValueError: If any of the size/limit arguments is out of range.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    self._loop = loop

    if max_size <= 0:
        raise ValueError('max_size is expected to be greater than zero')

    if min_size < 0:
        raise ValueError(
            'min_size is expected to be greater or equal to zero')

    if min_size > max_size:
        raise ValueError('min_size is greater than max_size')

    if max_queries <= 0:
        raise ValueError('max_queries is expected to be greater than zero')

    if max_inactive_connection_lifetime < 0:
        raise ValueError(
            'max_inactive_connection_lifetime is expected to be greater '
            'or equal to zero')

    self._minsize = min_size
    self._maxsize = max_size

    self._holders = []
    self._initialized = False

    # asyncio queues stopped accepting ``loop`` in Python 3.10 (TypeError).
    # Try the explicit loop first so older interpreters keep binding the
    # queue to the requested loop, then fall back to the loop-less form.
    try:
        self._queue = asyncio.LifoQueue(maxsize=self._maxsize,
                                        loop=self._loop)
    except TypeError:
        self._queue = asyncio.LifoQueue(maxsize=self._maxsize)

    self._working_addr = None
    self._working_config = None
    self._working_params = None

    self._connection_class = connection_class

    self._closed = False

    # Fill the queue to capacity: one holder per allowed connection.
    for _ in range(max_size):
        ch = PoolConnectionHolder(
            self,
            connect_args=connect_args,
            connect_kwargs=connect_kwargs,
            max_queries=max_queries,
            max_inactive_time=max_inactive_connection_lifetime,
            setup=setup,
            init=init)

        self._holders.append(ch)
        self._queue.put_nowait(ch)