我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用asyncio.Queue()。
async def main(loop, num_consumers):
    """Run one producer and ``num_consumers`` consumers over a bounded queue.

    Declared ``async`` because the body awaits ``asyncio.wait``.

    :param loop: Event loop used to schedule the producer and consumer tasks.
    :param num_consumers: Number of consumer tasks to start; also used as
        the queue's ``maxsize`` and passed to the producer.
    """
    # Create the queue with a fixed size so the producer
    # will block until the consumers pull some items out.
    q = asyncio.Queue(maxsize=num_consumers)

    # Schedule the consumer tasks.
    consumers = [
        loop.create_task(consumer(i, q))
        for i in range(num_consumers)
    ]

    # Schedule the producer task.
    prod = loop.create_task(producer(q, num_consumers))

    # Wait for all of the coroutines to finish.
    await asyncio.wait(consumers + [prod])
async def _wait(self, entity_type, entity_id, action, predicate=None):
    """
    Block the calling routine until a given action has happened to the
    given entity.

    Declared ``async`` because the body awaits the queue the observer
    callback feeds.

    :param entity_type: The entity's type.
    :param entity_id: The entity's id.
    :param action: the type of action (e.g., 'add', 'change', or 'remove')
    :param predicate: optional callable that must take as an argument a
        delta, and must return a boolean, indicating whether the delta
        contains the specific action we're looking for. For example,
        you might check to see whether a 'change' has a 'completed'
        status. See the _Observer class for details.
    :return: The entry for the observed entity id in the live entity map
        for ``entity_type``, or ``None`` if it is not in the map.
    """
    q = asyncio.Queue(loop=self._connector.loop)

    async def callback(delta, old, new, model):
        # Hand the id of the matching delta to the waiting coroutine.
        await q.put(delta.get_id())

    self.add_observer(callback, entity_type, action, entity_id, predicate)
    entity_id = await q.get()
    # object might not be in the entity_map if we were waiting for a
    # 'remove' action
    return self.state._live_entity_map(entity_type).get(entity_id)
def __init__(self, driver, *args, **kwargs):
    """Set up buffer queues, timing, input and logging state.

    :param driver: Object whose ``width`` and ``height`` are copied; an
        input queue is created when it has a non-None ``input_manager``.
    :param args: Positional arguments forwarded to the parent initializer.
    :param kwargs: Keyword arguments forwarded to the parent initializer.
    """
    # Two bounded queues, each holding at most NUM_BUFFERS items.
    self._avail_q = asyncio.Queue(maxsize=NUM_BUFFERS)
    self._active_q = asyncio.Queue(maxsize=NUM_BUFFERS)

    self.running = False

    self.width = driver.width
    self.height = driver.height

    self._tick = Ticker(1 / DEFAULT_FPS)

    # Input is optional: only wire it up when the driver provides a manager.
    self._input_queue = None
    if getattr(driver, 'input_manager', None) is not None:
        self._input_queue = InputQueue(driver)

    logger_name = 'uchroma.%s.%d' % (self.__class__.__name__, self.zindex)
    self._logger = Log.get(logger_name)

    super(Renderer, self).__init__(*args, **kwargs)
def __init__(self, vk_client, logger=None):
    """Initialize the request-queue state.

    :param vk_client: Client object stored on the instance as ``vk_client``.
    :param logger: Optional logger. When omitted (or falsy), the
        ``"vk_reqque"`` logger obtained from ``logging.getLogger`` is used.
    """
    # Obtain the fallback logger through getLogger() instead of constructing
    # logging.Logger directly: a directly constructed Logger is detached from
    # the logger hierarchy, so application-level handlers and levels would
    # not apply to it.
    self.logger = logger or logging.getLogger("vk_reqque")

    self.vk_client = vk_client

    self.hold = False
    self.release = False
    self.processing = False

    self._requests_done = 0
    self.requests_done_clear_time = 0

    self.queue = asyncio.Queue()
def test_put_cancelled(self):
    """A scheduled ``put`` task finishes once a getter receives its item."""
    loop = self.loop
    q = asyncio.Queue(loop=loop)

    @asyncio.coroutine
    def put_one():
        yield from q.put(1)
        return True

    @asyncio.coroutine
    def get_one():
        item = yield from q.get()
        return item

    put_task = asyncio.Task(put_one(), loop=loop)
    received = loop.run_until_complete(get_one())

    self.assertEqual(1, received)
    self.assertTrue(put_task.done())
    self.assertTrue(put_task.result())
def test_put_cancelled_race(self):
    """Cancelling one blocked putter must not lose another putter's item."""
    loop = self.loop
    queue = asyncio.Queue(loop=loop, maxsize=1)

    first = asyncio.Task(queue.put('a'), loop=loop)
    second = asyncio.Task(queue.put('b'), loop=loop)
    doomed = asyncio.Task(queue.put('X'), loop=loop)

    # With maxsize=1 only the first put can complete straight away.
    test_utils.run_briefly(loop)
    self.assertTrue(first.done())
    self.assertFalse(second.done())

    doomed.cancel()
    test_utils.run_briefly(loop)
    self.assertTrue(doomed.done())

    self.assertEqual(queue.get_nowait(), 'a')
    self.assertEqual(queue.get_nowait(), 'b')

    loop.run_until_complete(second)
def __init__(self, coro, *, loop=None):
    """Initialize the task and its bookkeeping attributes.

    :param coro: Coroutine to wrap; the ``self`` found in its locals (if
        any) becomes ``holder``.
    :param loop: Event loop forwarded to the parent initializer.
    """
    super().__init__(coro, loop=loop)

    # The object the coroutine is bound to, when it is a method.
    holder = inspect.getcoroutinelocals(coro).get('self')
    self.holder = holder
    try:
        self.uid = holder.uid
    except AttributeError:
        # No holder, or a holder without a uid: generate one.
        self.uid = str(uuid4())

    self._broker = get_broker(self._loop)
    self._in_progress = False

    # Attributes that start unset.
    self._template = self._workflow = self._source = None
    self._start = self._end = None
    self._inputs = self._outputs = None

    self._queue = asyncio.Queue(loop=self._loop)
    if self.holder:
        # Share the queue with the holder.
        self.holder.queue = self._queue

    # A 'committed' task is a pending task not suspended
    committed = asyncio.Event()
    committed.set()
    self._committed = committed

    self._timed_out = False
def delete(self, *, if_unused=True, if_empty=True, timeout=None) -> asyncio.Future:
    """ Delete the queue.

    Pending futures tracked by this queue are rejected with a
    ``RuntimeError`` before the delete request is issued.

    :param if_unused: Perform delete only when unused
    :param if_empty: Perform delete only when empty
    :param timeout: execution timeout
    :return: the future created for this request; its ``set_result`` is
        handed to the channel's ``queue_delete`` as the callback
    """
    log.info("Deleting %r", self)

    self._futures.reject_all(RuntimeError("Queue was deleted"))

    result = self._create_future(timeout)
    delete_options = {'if_unused': if_unused, 'if_empty': if_empty}
    self._channel.queue_delete(result.set_result, self.name, **delete_options)

    return result
def test_get_ws_connection_invalid_error(client):
    """Test error opening a websocket connection with an engineIO session."""
    websocket = AsyncMock()
    incoming = asyncio.Queue(loop=client.loop)
    websocket.receive_str = incoming.get

    @asyncio.coroutine
    def fake_send_str(data):
        # Answer the probe normally, then reply to "5" with a malformed frame.
        if data == "2probe":
            yield from incoming.put("3probe")
        elif data == "5":
            yield from incoming.put('44[[[')

    websocket.send_str = fake_send_str
    client._client_session.ws_connect.return_value = websocket

    session_data = {
        "sid": "mock_session_id",
        "pingTimeout": 12.345,
        "pingInterval": 23.456,
    }

    with pytest.raises(exceptions.ProtocolError):
        connecting = client._get_ws_connection(session_data)
        client.loop.run_until_complete(connecting)
def test_ws_loop_error(client):
    """Test websocket loop error message."""
    mock_ws = AsyncMock()
    receive_queue = asyncio.Queue(loop=client.loop)
    mock_ws.receive = receive_queue.get
    client._ws_connection = mock_ws
    client.ws_close = AsyncMock()
    client._handle_event = MagicMock()

    # Queue a single ERROR frame for the loop to consume.
    error_msg = MagicMock()
    error_msg.type = aiohttp.WSMsgType.ERROR
    client.loop.run_until_complete(receive_queue.put(error_msg))

    with pytest.raises(exceptions.TransportError) as exc:
        client.loop.run_until_complete(client._ws_loop())

    close_mock = client.ws_close
    assert close_mock.called
    assert len(close_mock.mock_calls) == 1

    event_mock = client._handle_event
    assert event_mock.called
    assert len(event_mock.mock_calls) == 1
    event_args = event_mock.mock_calls[0][1]
    assert event_args[0] == 'closed'
    assert event_args[1] is None

    assert str(exc.value) == "Websocket error detected. Connection closed."
def __init__(self, loop: asyncio.BaseEventLoop = None, **config):
    """Bind to an event loop, merge configuration and start queue processing.

    :param loop: Event loop to use. When ``None`` the current loop is used,
        and a new one is created and installed if none is available.
    :param config: Overrides applied on top of ``self.DEFAULTS``.
    """
    if loop is None:
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
    # TODO: say in the docs that we take ownership of the loop, we close it
    # ourselves in run()
    self.loop = loop

    settings = dict(self.DEFAULTS, **config)
    self.config = settings
    self.encoding = settings['encoding']
    self.registry = registry.Registry(settings)

    self.queue = asyncio.Queue(loop=loop)
    # Schedule the queue processor on the chosen loop.
    asyncio.ensure_future(self._process_queue(), loop=loop)
def __init__(self):
    """Set up logging, the D-Bus event queue and default subscriptions."""
    self.logger = logging.getLogger('rauc_hawkbit')

    self.dbus_events = asyncio.Queue()
    # handle dbus events in async way
    self.dbus_event_task = asyncio.get_event_loop().create_task(
        self.handle_dbus_event())

    # holds active subscriptions
    self.signal_subscriptions = []
    # ({interface}, {signal}): {callback}
    self.signal_callbacks = {}
    # ({interface}, {property}): {callback}
    self.property_callbacks = {}

    self.system_bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)

    # always subscribe to property changes by default
    self.new_signal_subscription(
        'org.freedesktop.DBus.Properties',
        'PropertiesChanged',
        self.property_changed_callback,
    )
def _send_request(self, method, params=None, is_subscribe=False):
    '''
    Send a new request to the server. Tracks id numbers and the
    future/queue used to deliver responses; the transport serializes
    the message.

    :param method: Remote method name.
    :param params: Positional parameters for the call; defaults to a new
        empty list for each request.
    :param is_subscribe: When true, also register a queue for this method
        in ``self.subscriptions``.
    :return: The response future, or ``(future, queue)`` when
        ``is_subscribe`` is true.
    '''
    # Use a None sentinel rather than a mutable default: the params object
    # is stored in the message (and in ``inflight``), so a shared default
    # list would be referenced by every request.
    if params is None:
        params = []

    # pick a new ID
    self.next_id += 1
    req_id = self.next_id

    msg = {'id': req_id, 'method': method, 'params': params}

    # subscriptions are a Q, normal requests are a future
    wait_q = None
    if is_subscribe:
        wait_q = asyncio.Queue()
        self.subscriptions[method].append(wait_q)

    fut = asyncio.Future(loop=self.loop)
    self.inflight[req_id] = (msg, fut)

    # send it via the transport, which serializes it
    self.protocol.send_data(msg)

    return (fut, wait_q) if is_subscribe else fut
def subscribe(self, method, *params):
    '''
    Issue a remote command that streams events/data back to us.

    The method name is dotted and ends in ``subscribe``, for example:

        server.peers.subscribe

    Any further arguments are passed on positionally.

    Returns a tuple: (Future, asyncio.Queue).
    The future will have the result of the initial
    call, and the queue will receive additional
    responses as they happen.
    '''
    is_dotted = '.' in method
    assert is_dotted
    assert method.endswith('subscribe')

    request = self._send_request(method, params, is_subscribe=True)
    return request
async def _ws_recv_handler(self):
    """Receive websocket messages forever and dispatch each one.

    Declared ``async`` because the body awaits ``self._websocket.recv()``.
    Messages containing a ``'status'`` key are queued on
    ``self._resp_queue``; all others are sent as signals named by their
    ``'event'`` field. Messages that cannot be decoded or lack the
    expected keys are logged and skipped.
    """
    # Given command responses and notifications are all send through the
    # same websocket, separate them here, passing command response thanks
    # to a Queue.
    while True:
        raw = await self._websocket.recv()
        try:
            if isinstance(raw, bytes):
                raw = raw.decode()
            recv = ejson_loads(raw)
            if 'status' in recv:
                # Message response
                self._resp_queue.put_nowait(recv)
            else:
                # Event
                self._signal_ns.signal(recv['event']).send(recv['sender'])
        except (KeyError, TypeError, UnicodeDecodeError, json.JSONDecodeError):
            # Undecodable bytes are just another kind of invalid message;
            # without UnicodeDecodeError here they would end the loop.
            logger.warning('Backend server sent invalid message: %s', raw)
def __init__(self, error_callback=None, loop=None):
    """Initialize the decoder state and its bounded package queue.

    :param error_callback: Function that is called whenever an error
        occurs decoding the data.
    :param loop: Event loop for the queue; when falsy, the loop returned
        by ``event_loop.get()`` is used.
    """
    #: The current state of the decoder. If 0, it's reading the data
    #: length. If 1, it's reading the data.
    self.state = 0

    #: Function that is called whenever an error occurs decoding the data.
    self.error_callback = error_callback

    #: Buffer used to hold the data being read.
    self.buffer = BytesIO()

    #: Number of bytes read in the current block.
    self.block_read = 0

    #: The length of the current package being read.
    self.data_length = 0

    #: Queue holding the packages received.
    queue_loop = loop or event_loop.get()
    self.queue = asyncio.Queue(10, loop=queue_loop)

    # Expose the queue's getters directly on the decoder.
    self.get = self.queue.get
    self.get_nowait = self.queue.get_nowait

    #: If ``True`` this means that the decoder is running in restricted
    #: mode. In the restricted mode the decoder only allows packages big
    #: enough for the handshake packages.
    self.restricted = True
def __init__(self, bot):
    """Prepare encoder state and schedule the encoding task.

    :param bot: Object whose ``loop`` is used to schedule
        ``self.queue_encoder()``.
    """
    super().__init__()
    self.bot = bot

    self.filename = None
    self.converter = None
    self.opus_data = []

    encoder = OpusEncoder(48000, 2)
    self.encoder = encoder
    # Read the frame length (divided by 1000.0) before zeroing it.
    self.delay = encoder.frame_length / 1000.0
    encoder.frame_length = 0

    connected = Event()
    self._connected = connected
    connected.set()

    self.to_encode = asyncio.Queue()
    self.encode_task = bot.loop.create_task(self.queue_encoder())
def start(sync_event_source, loop=None):
    """Create and start the WebSocket server.

    :param sync_event_source: Event source handed to the bridge thread.
    :param loop: Event loop to use; defaults to the current event loop.
    """
    loop = loop or asyncio.get_event_loop()
    event_source = asyncio.Queue(loop=loop)

    # Daemon thread running _multiprocessing_to_asyncio with both sources.
    bridge = threading.Thread(
        target=_multiprocessing_to_asyncio,
        args=(sync_event_source, event_source, loop),
        daemon=True,
    )
    bridge.start()

    app = init_app(event_source, loop=loop)
    ws_settings = config['wsserver']
    aiohttp.web.run_app(app,
                        host=ws_settings['host'],
                        port=ws_settings['port'])
def main():
    """Schedule three producers and three consumers, then run forever.

    The shared queue holds at most one item. The loop is closed when
    ``run_forever`` exits.
    """
    event_loop = asyncio.get_event_loop()
    table = asyncio.Queue(maxsize=1, loop=event_loop)

    try:
        # producer
        for food in ('apple', 'banana', 'candy'):
            event_loop.create_task(put(food, table))

        # consumer
        for person in ('bob', 'john', 'mary'):
            event_loop.create_task(get(person, table))

        # start
        event_loop.run_forever()
    finally:
        event_loop.close()
def on_headers_complete(self):
    """Build the request object once all headers have been parsed.

    Cancels any pending keep-alive timeout and, for stream handlers,
    attaches a queue as ``request.stream`` and starts the handler.
    """
    parser = self.parser
    self.request = self.request_class(
        url_bytes=self.url,
        headers=CIDict(self.headers),
        version=parser.get_http_version(),
        method=parser.get_method().decode(),
        transport=self.transport
    )

    # Remove any existing KeepAlive handler here,
    # It will be recreated if required on the new request.
    keep_alive_handler = self._keep_alive_timeout_handler
    if keep_alive_handler:
        keep_alive_handler.cancel()
        self._keep_alive_timeout_handler = None

    if not self.is_request_stream:
        return

    self._is_stream_handler = self.router.is_stream_handler(self.request)
    # NOTE(review): nesting reconstructed from whitespace-collapsed source;
    # the handler is started here only for stream handlers - confirm.
    if self._is_stream_handler:
        self.request.stream = asyncio.Queue()
        self.execute_request_handler()
def __init__(
        self,
        REMOTE_IP,
        REMOTE_PORT,
        loop: asyncio.AbstractEventLoop = None,
        executor: futures.Executor = None
):
    """Store connection parameters and create the input buffers.

    The annotations name the classes themselves; instantiating them in
    the signature created throwaway objects at definition time and
    produced annotations that were instances rather than types.

    :param REMOTE_IP: Remote address, stored as ``self.REMOTE_IP``.
    :param REMOTE_PORT: Remote port, stored as ``self.REMOTE_PORT``.
    :param loop: Event loop to use; defaults to the current event loop.
    :param executor: Optional executor, stored as ``self.executor``.
    """
    self._input_list = []
    self._input_queue = asyncio.Queue()

    self.loop = loop if loop else asyncio.get_event_loop()
    self.executor = executor

    self.REMOTE_IP = REMOTE_IP
    self.REMOTE_PORT = REMOTE_PORT
def __init__(self, a, b, c):
    """Initialize handler state from the module-level ``node``.

    :param a: First positional argument forwarded to the parent initializer.
    :param b: Second positional argument forwarded to the parent initializer.
    :param c: Third positional argument forwarded to the parent initializer.
    """
    self.node = node
    self.loop = node.loop

    self.protocol_version = "HTTP/1.1"

    self.maalstroom_plugin_used = False
    self.maalstroom_url_prefix = None
    self.maalstroom_url_prefix_str = None

    self.proxy_used = False

    # Inbound data uses an asyncio queue bound to the node's loop;
    # outbound data uses a thread-safe queue.
    self._inq = asyncio.Queue(loop=self.loop)
    self._outq = queue.Queue()

    self._abort_event = threading.Event()

    self._dispatcher = self._create_dispatcher()

    self._maalstroom_http_url_prefix = "http://{}/"
    self._maalstroom_morphis_url_prefix = "morphis://"

    super().__init__(a, b, c)
async def main(loop, stdin_generator, client, skip_existing=True):
    """Feed records from ``stdin_generator`` through a queue to a consumer.

    Declared ``async`` because the body awaits the producer and
    ``queue.join()``.

    :param loop: Event loop passed through to the producer and consumer.
    :param stdin_generator: Source of records handed to ``produce``.
    :param client: Client passed to ``fetch_existing`` and ``consume``.
    :param skip_existing: When true, fetch the existing records first so
        unchanged ones can be skipped.
    """
    existing = {}
    if skip_existing:
        # Fetch the list of records to skip records that exist and haven't changed.
        existing = fetch_existing(client)

    # Start a producer and a consumer with threaded kinto requests.
    queue = asyncio.Queue()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=NB_THREADS)
    # Schedule the consumer
    consumer_coro = consume(loop, queue, executor, client, existing)
    consumer = asyncio.ensure_future(consumer_coro)
    try:
        # Run the producer and wait for completion
        await produce(loop, stdin_generator, queue)
        # Wait until the consumer is done consuming everything.
        await queue.join()
    finally:
        # The consumer is still awaiting for the producer, cancel it.
        # Doing this in ``finally`` also stops it when the producer fails,
        # instead of leaving the task pending.
        consumer.cancel()
def add_client(self, id):
    """
    Subscribe a new client to the message stream identified by 'id'.
    Messages already saved for that stream are queued for the client
    immediately. A stream may have several subscribers.

    :param id: Identifier of required stream of messages
    :return: Returns new asyncio.Queue on which can be wait for by 'yield from' command.
    """
    new_queue = asyncio.Queue()

    # Create the subscriber list on first use, then register the queue.
    if id not in self._clients:
        self._clients[id] = []
    subscribers = self._clients[id]
    subscribers.append(new_queue)

    # if there are already any messages, send them
    if id in self._saved_messages:
        for saved in self._saved_messages[id]:
            new_queue.put_nowait(saved)

    self._logger.debug("client connection: new client '{}' registered".format(id))
    return new_queue
def remove_client(self, id, queue):
    """
    Remove client listening on 'id' message stream with queue 'queue'.
    The queue is removed from the list of subscribers registered for
    'id'. If no such client exists (unknown 'id', or 'queue' not
    registered for it), nothing is done besides logging.

    :param id: Identifier of required stream of messages
    :param queue: Queue associated with client to be removed
    :return: Nothing
    """
    removed = False
    if id in self._clients:
        try:
            self._clients[id].remove(queue)
        except ValueError:
            # The queue was never registered (or was already removed);
            # honour the "nothing is done" contract instead of raising.
            pass
        else:
            removed = True

    if removed:
        self._logger.debug("client connection: client '{}' removed".format(id))
    else:
        self._logger.debug("client connection: client '{}' removing failed - "
                           " not present".format(id))
def __init__(self, basePath, max_tasks=25):
    """Prepare crawler state and seed the queue with the start URL.

    :param basePath: BasePath of url to start crawl, should be root of a
        domain.
    :param max_tasks: Max concurrent tasks.
    """
    self.max_tasks = max_tasks

    # URLs we have already seen
    self.processed = set()

    self.basePath = basePath

    # Only the asyncio-provided event loop is used; there is no fallback
    # to iocp (win32), select or any other kind of event loop.
    loop = asyncio.get_event_loop()
    self.loop = loop

    # The session encapsulates a connection pool.
    self.session = aiohttp.ClientSession(loop=loop)

    # Work queue, seeded with the first url.
    url_queue = Queue(loop=loop)
    self.queue = url_queue
    url_queue.put_nowait(self.basePath)

    # JSON for visualization
    self.data = []
def test_why_are_getters_waiting(self):
    """Producer and consumer exchange five items through a size-1 queue."""
    # From issue #268.

    @asyncio.coroutine
    def drain(queue, count):
        for _ in range(count):
            yield from queue.get()

    @asyncio.coroutine
    def fill(queue, count):
        for value in range(count):
            yield from queue.put(value)

    queue_size = 1
    item_count = 5
    loop = self.loop
    q = asyncio.Queue(queue_size, loop=loop)

    both = asyncio.gather(fill(q, item_count),
                          drain(q, item_count),
                          loop=loop)
    loop.run_until_complete(both)
def test_put_cancelled_race(self):
    """Cancelling one blocked putter must not lose another putter's item."""
    loop = self.loop
    queue = asyncio.Queue(loop=loop, maxsize=1)

    first = asyncio.Task(queue.put('a'), loop=loop)
    second = asyncio.Task(queue.put('b'), loop=loop)
    doomed = asyncio.Task(queue.put('X'), loop=loop)

    # With maxsize=1 only the first put can complete straight away.
    test_utils.run_briefly(loop)
    self.assertTrue(first.done())
    self.assertFalse(second.done())

    doomed.cancel()
    test_utils.run_briefly(loop)
    self.assertTrue(doomed.done())

    self.assertEqual(queue.get_nowait(), 'a')
    # Give the blocked putter one loop iteration to insert its item.
    test_utils.run_briefly(loop)
    self.assertEqual(queue.get_nowait(), 'b')

    loop.run_until_complete(second)
def test_why_are_putters_waiting(self):
    """Four putters on a size-2 queue all finish once a getter drains it."""
    # From issue #265.
    loop = self.loop
    queue = asyncio.Queue(2, loop=loop)

    @asyncio.coroutine
    def putter(item):
        yield from queue.put(item)

    @asyncio.coroutine
    def getter():
        # Yield once, then drain whatever is queued at that point.
        yield
        for _ in range(queue.qsize()):
            queue.get_nowait()

    putters = [putter(item) for item in range(4)]
    loop.run_until_complete(
        asyncio.gather(getter(), *putters, loop=loop))
async def assert_queue_exists(self, queue_name: QueueName) -> QueueDeclareOkParameters:
    """
    Asserts *queue_name* exists. Channel is closed by the server if it
    does not!

    If the queue exists, its parameters are returned. If not, the channel
    is closed and ServerClosedChannel raised.

    Declared ``async`` because the body awaits the declare-ok frame.
    The request is a passive queue declare.

    :param queue_name: Queue name
    :rtype: ~ammoo.wire.frames.method.queue.QueueDeclareOkParameters
    """
    validate_queue_name(queue_name)
    self._check_open()
    self._send_method_frame(self._channel_id, QUEUE_DECLARE_CAM, pack_queue_declare_parameters(
        queue_name=queue_name,
        passive=True,  # only check for existence, do not create
        durable=False,
        exclusive=False,
        auto_delete=False,
        no_wait=False,
        arguments={},
        encoding=self.amqp_encoding,
        rabbitmq=self.rabbitmq
    ))
    frame = await self._wait_for_cam_frame(QUEUE_DECLARE_OK_CAM)
    return frame.parameters
async def delete_queue(self, queue_name: QueueName, *, if_unused: bool=False, if_empty: bool=False) -> MessageCount:
    """
    Delete a queue named *queue_name*. If the queue does not exist, the
    method merely asserts it is not there.

    Declared ``async`` because the body awaits the delete-ok frame.

    :param queue_name: Queue name
    :param bool if_unused: Optional: Only delete queue if it has no consumers.
    :param bool if_empty: Optional: Only delete queue if it has no messages.
    :return: Number of messages in queue before it was deleted
    """
    validate_queue_name(queue_name)
    validate_bool(if_unused, 'if_unused')
    validate_bool(if_empty, 'if_empty')
    self._check_open()
    self._send_method_frame(self._channel_id, QUEUE_DELETE_CAM, pack_queue_delete_parameters(
        queue_name=queue_name,
        if_unused=if_unused,
        if_empty=if_empty,
        no_wait=False,
        encoding=self.amqp_encoding
    ))
    frame = await self._wait_for_cam_frame(QUEUE_DELETE_OK_CAM)
    return frame.parameters
async def purge_queue(self, queue_name: QueueName) -> MessageCount:
    """
    Purges a queue of messages, emptying it.

    Declared ``async`` because the body awaits the purge-ok frame.

    :param str queue_name: Queue name
    :return: Number of messages in queue before it was purged
    """
    validate_queue_name(queue_name)
    self._check_open()
    self._send_method_frame(self._channel_id, QUEUE_PURGE_CAM, pack_queue_purge_parameters(
        queue_name=queue_name,
        no_wait=False,
        encoding=self.amqp_encoding
    ))
    frame = await self._wait_for_cam_frame(QUEUE_PURGE_OK_CAM)
    return frame.parameters
async def get(self, queue_name: QueueName, *, no_ack: bool=False) -> GetMessage:
    """
    Get a message from queue.

    Declared ``async`` because the body awaits the server's reply.

    :param str queue_name: Queue name
    :param bool no_ack: Optional: If True, server does not expect message to be acknowledged or rejected.
    :raises EmptyQueue: If there are no messages in queue, EmptyQueue is raised
    :rtype: GetMessage
    """
    validate_queue_name(queue_name)
    validate_bool(no_ack, 'no_ack')
    self._check_open()
    self._send_method_frame(self._channel_id, BASIC_GET_CAM, pack_basic_get_parameters(
        queue_name=queue_name,
        no_ack=no_ack,
        encoding=self.amqp_encoding
    ))
    value = await self._wait_for_cam_frame(BASIC_GET_OK_CAM)  # also resolved by basic.empty
    if isinstance(value, GetMessage):
        # it's basic.get-ok, a message
        return value
    # got a basic.empty
    raise channel_exceptions.EmptyQueue()
def __init__(self, maxsize=0, *, loop=None):
    """Create a mapping that lazily builds one ``asyncio.Queue`` per key.

    :param maxsize: ``maxsize`` given to every queue that gets created.
    :param loop: Optional event loop forwarded to each queue. It is only
        forwarded when given, because ``asyncio.Queue`` no longer accepts
        a ``loop`` argument (not even ``None``) on Python 3.10+.
    """
    queue_kwargs = {} if loop is None else {'loop': loop}
    self._queues = defaultdict(
        partial(asyncio.Queue, maxsize, **queue_kwargs))
async def using_queues():
    """Demonstrate the basic ``asyncio`` queue operations.

    Declared ``async`` because ``Queue.get`` and ``Queue.put`` are awaited.
    """
    q = asyncio.Queue()

    # Non-blocking put followed by an awaited get.
    q.put_nowait('Hello')
    await q.get()

    # Awaited put followed by a non-blocking get.
    await q.put('world')
    q.get_nowait()

    # The other queue flavours are constructed the same way.
    pq = asyncio.PriorityQueue()
    stack = asyncio.LifoQueue()
def __init__(self, domain, options, queue=None, loop=None, dict_file=None):
    """Set up loop, rate limiting, work queue and resolver, then load names.

    :param domain: Stored as ``self.domain``.
    :param options: Object whose ``rate`` sizes the semaphore.
    :param queue: Optional queue to use; a new ``asyncio.Queue`` otherwise.
    :param loop: Optional event loop; defaults to the current event loop.
    :param dict_file: Optional dictionary file name; defaults to
        ``'subnames.txt'``.
    """
    if loop:
        self.loop = loop
    else:
        self.loop = asyncio.get_event_loop()
    assert self.loop is not None

    self.sem = asyncio.Semaphore(options.rate)
    self.domain = domain
    self.tasks = []

    if not queue:
        queue = asyncio.Queue()
    self.queue = queue

    self.result = []

    if not dict_file:
        dict_file = 'subnames.txt'
    self.dict_file = dict_file

    self.resolver = DNSResolver(loop=self.loop)
    self._load_sub_names()
def __init__(self, client, text_channel):
    """Initialize the audio player state for one text channel.

    :param client: Bot client; its ``loop`` schedules ``player_task``.
    :param text_channel: Channel whose ``server`` is stored alongside it.
    """
    self.bot = client
    self.text_channel = text_channel
    self.server = text_channel.server

    self.queue = asyncio.Queue()
    self.current = None
    self.play_next_song = asyncio.Event()

    # youtube-dl option sets: streaming, downloading and playlist listing.
    self.ytdl_options = {
        "default_search": "auto",
        "noplaylist": True,
        "quiet": True,
        "format": "webm[abr>0]/bestaudio/best",
        "prefer_ffmpeg": True,
    }
    self.ytdl_download_options = {
        "default_search": "auto",
        "noplaylist": True,
        "quiet": True,
        "format": "bestaudio/best",
        "extractaudio": True,
        "outtmpl": "data/audio_cache/%(id)s-%(title)s.%(ext)s",
        "restrictfilenames": True,
    }  # "audioformat": "mp3" ?
    self.ytdl_playlist_options = {
        "default_search": "auto",
        "extract_flat": True,
        "forcejson": True,
        "quiet": True,
        "logger": playlist_logger,
    }

    self.default_volume = 100.0
    self.skip_votes_required = 0
    self.skip_votes = set()

    self.player = self.bot.loop.create_task(self.player_task())

    self.resume_flag = asyncio.Event()
    not_interrupted = asyncio.Event()
    self.not_interrupted = not_interrupted
    not_interrupted.set()

    self.audio_files = os.listdir("data/audio_files/")
    # Keep only the .mp3 / .m4a entries of the library directory.
    self.library_files = list(filter(
        lambda name: name.endswith((".mp3", ".m4a")),
        os.listdir(clients.library_files)))
    self.library_flag = False
    self.radio_flag = False

    self.recognizer = speech_recognition.Recognizer()
    self.listener = None
    self.listen_paused = False
    self.previous_played_time = 0
def queue_embed(self):
    """Build an embed describing what is playing or queued.

    Radio and library modes take precedence; otherwise up to the first
    ten queued streams are listed, followed by a count of the remainder.

    :return: A ``discord.Embed``.
    """
    color = clients.bot_color
    if self.radio_flag:
        return discord.Embed(title = ":radio: Radio is currently on", color = color)
    if self.library_flag:
        return discord.Embed(title = ":notes: Playing songs from my library", color = color)
    if self.queue.qsize() == 0:
        return discord.Embed(title = ":hole: The queue is currently empty", color = color)

    engine = clients.inflect_engine
    lines = []
    for number, stream in enumerate(list(self.queue._queue)[:10], start = 1):
        # Ten has a dedicated emoji name; other numbers are spelled out.
        emoji = "keycap_ten" if number == 10 else engine.number_to_words(number)
        info = stream["info"]
        lines.append(":{}: **[{}]({})** (Added by: {})\n".format(
            emoji,
            info.get("title", "N/A"),
            info.get("webpage_url", "N/A"),
            stream["requester"].display_name))

    if self.queue.qsize() > 10:
        more_songs = self.queue.qsize() - 10
        lines.append(":arrow_right: There {} {} more {} in the queue".format(
            engine.plural("is", more_songs),
            more_songs,
            engine.plural("song", more_songs)))

    return discord.Embed(title = ":musical_score: Queue:",
                         description = "".join(lines),
                         color = color)
def __init__(self, take_ownership=True,  # Tor dies when the Sorter does
             torrc_config=None,
             socks_port=9050, page_load_timeout=20, max_tasks=10,
             db_handler=None):
    """Start tor and prepare the event loop, queue and HTTP session.

    :param take_ownership: Passed to ``launch_tor_with_config``.
    :param torrc_config: Tor configuration dict; defaults to
        ``{"ControlPort": "9051", "CookieAuth": "1"}``. A ``SocksPort``
        entry is added to it.
    :param socks_port: Port passed to ``find_free_port`` to choose the
        SOCKS port.
    :param page_load_timeout: Stored as ``self.page_load_timeout``.
    :param max_tasks: Stored as ``self.max_tasks``.
    :param db_handler: Stored as ``self.db_handler``.
    """
    # Build the default per call: the dict is updated below, so a shared
    # default object would leak one instance's SocksPort into the next.
    if torrc_config is None:
        torrc_config = {"ControlPort": "9051", "CookieAuth": "1"}

    self.logger = setup_logging(_log_dir, "sorter")
    self.db_handler = db_handler

    self.logger.info("Opening event loop for Sorter...")
    self.loop = asyncio.get_event_loop()
    self.max_tasks = max_tasks
    self.logger.info("Creating Sorter queue...")
    self.q = asyncio.Queue()

    # Start tor and create an aiohttp tor connector
    self.torrc_config = torrc_config
    free_port = find_free_port(socks_port)
    self.socks_port = str(free_port)
    self.torrc_config.update({"SocksPort": self.socks_port})
    self.logger.info("Starting tor process with config "
                     "{self.torrc_config}.".format(self=self))
    self.tor_process = launch_tor_with_config(config=self.torrc_config,
                                              take_ownership=take_ownership)
    # Point the proxy at the port tor was configured with, not at the
    # originally requested one (they differ when that port was taken).
    # NOTE(review): assumes find_free_port returns the same kind of value
    # as ``socks_port`` (an int) - confirm.
    onion_proxy = aiosocks.Socks5Addr('127.0.0.1', free_port)
    conn = SocksConnector(proxy=onion_proxy, remote_resolve=True)

    # aiohttp's ClientSession does connection pooling and HTTP keep-alives
    # for us
    self.logger.info("Creating aiohttp ClientSession with our event loop "
                     "and tor proxy connector...")
    self.session = aiohttp.ClientSession(loop=self.loop, connector=conn)

    # Pretend we're Tor Browser in order to get rejected by less sites/WAFs
    u = "Mozilla/5.0 (Windows NT 6.1; rv:45.0) Gecko/20100101 Firefox/45.0"
    self.headers = {'user-agent': u}

    self.page_load_timeout = page_load_timeout
def __init__(self, bot):
    """Initialize the voice state and schedule the audio player task.

    :param bot: Bot object; its ``loop`` schedules
        ``self.audio_player_task()``.
    """
    self.current, self.voice = None, None
    self.bot = bot

    self.play_next_song = asyncio.Event()
    self.songs = asyncio.Queue()

    # a set of user_ids that voted
    self.skip_votes = set()

    schedule = self.bot.loop.create_task
    self.audio_player = schedule(self.audio_player_task())
def __init__(self, driver, expire_time=None):
    """Capture driver references and initialize the input queue state.

    :param driver: Object providing ``logger``, ``input_manager`` and
        ``hardware.key_mapping``.
    :param expire_time: Optional expiry value; ``0`` is stored when it is
        ``None``.
    """
    self._logger = driver.logger
    self._input_manager = driver.input_manager
    self._key_mapping = driver.hardware.key_mapping

    self._expire_time = 0 if expire_time is None else expire_time

    self._attached = False

    self._q = asyncio.Queue()
    self._events = []

    self._keystates = InputQueue.KEY_DOWN
def __init__(self, prefix: str = "aiotasks", loop=None):
    """Initialize the parent and create the topic message queue.

    :param prefix: Forwarded to the parent initializer.
    :param loop: Optional event loop; the current event loop is used for
        subscribers when it is falsy.
    """
    super().__init__(loop=loop, prefix=prefix)

    subscriber_loop = loop or asyncio.get_event_loop()
    self._loop_subscribers = subscriber_loop
    self.topics_messages = asyncio.Queue(loop=self._loop_subscribers)
def __init__(self,
             dsn=None,
             prefix: str = "aiotasks",
             loop=None,
             concurrency: int = 5):
    """Initialize the parent and create the task queue.

    :param dsn: Accepted but not used by this initializer.
    :param prefix: Forwarded to the parent initializer.
    :param loop: Forwarded to the parent initializer.
    :param concurrency: Forwarded to the parent initializer.
    """
    parent_options = dict(loop=loop, prefix=prefix, concurrency=concurrency)
    super().__init__(**parent_options)

    # presumably ``_loop_delay`` is set by the parent initializer; verify.
    self._task_queue = asyncio.Queue(loop=self._loop_delay)
def __init__(self, host: str = CHROME_HOST, port: int = CHROME_PORT, loop=None) -> None:
    """Store connection settings and create the remote debugger client.

    :param host: Host passed to ``ChromeRemoteDebugger``.
    :param port: Port passed to ``ChromeRemoteDebugger``.
    :param loop: Optional event loop, forwarded to the debugger client
        and, when given, to the idle-page queue.
    """
    self.host = host
    self.port = port
    self.loop = loop
    self._rdp = ChromeRemoteDebugger(host, port, loop=loop)
    self._pages = set()
    # Only forward ``loop`` when one was supplied: asyncio.Queue rejects
    # the argument (even ``loop=None``) on Python 3.10+, which would make
    # the default construction fail.
    queue_kwargs = {} if loop is None else {'loop': loop}
    self._idle_pages: asyncio.Queue = asyncio.Queue(**queue_kwargs)
def test_ctor_loop(self):
    """A queue stores exactly the loop object it was constructed with."""
    for given_loop in (mock.Mock(), self.loop):
        queue = asyncio.Queue(loop=given_loop)
        self.assertIs(queue._loop, given_loop)
def test_ctor_noloop(self):
    """Without a ``loop`` argument the queue uses the current event loop."""
    current = self.loop
    asyncio.set_event_loop(current)

    queue = asyncio.Queue()

    self.assertIs(queue._loop, current)
def test_empty(self):
    """``empty()`` tracks whether the queue currently holds any item."""
    queue = asyncio.Queue(loop=self.loop)
    self.assertTrue(queue.empty())

    queue.put_nowait(1)
    self.assertFalse(queue.empty())

    taken = queue.get_nowait()
    self.assertEqual(1, taken)
    self.assertTrue(queue.empty())
def test_full(self):
    """``full()`` is false for a new default queue, true once maxsize is hit."""
    loop = self.loop

    unbounded = asyncio.Queue(loop=loop)
    self.assertFalse(unbounded.full())

    bounded = asyncio.Queue(maxsize=1, loop=loop)
    bounded.put_nowait(1)
    self.assertTrue(bounded.full())