我们从 Python 开源项目中提取了以下 33 个代码示例，用于说明如何使用 asyncio.StreamReaderProtocol()。
def stream_from_fd(fd, loop):
    """Receive a stream reader for a given file descriptor.

    Generator-based coroutine. Wraps ``fd`` in a
    ``UnixFileDescriptorTransport`` feeding a new ``asyncio.StreamReader``
    and waits on the ``waiter`` future handed to that transport.

    Returns the ``(reader, transport)`` pair. If the wait is interrupted
    by any exception, the transport is closed and the exception re-raised.
    """
    stream_reader = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(stream_reader, loop=loop)
    connected = asyncio.futures.Future(loop=loop)
    fd_transport = UnixFileDescriptorTransport(
        loop=loop,
        fileno=fd,
        protocol=reader_protocol,
        waiter=connected,
    )

    try:
        yield from connected
    except BaseException:
        # Whatever interrupted the wait, do not leak the transport.
        fd_transport.close()
        raise

    if loop.get_debug():
        logger.debug(
            "Read fd %r connected: (%r, %r)",
            fd, fd_transport, reader_protocol,
        )
    return stream_reader, fd_transport
async def open_pipe_connection(
    path=None, *, loop=None, limit=DEFAULT_LIMIT, **kwargs
):
    """Connect to a server using a Windows named pipe.

    Declared ``async`` because the body awaits the event loop; a plain
    ``def`` containing ``await`` does not compile.

    :param path: Pipe address. Forward slashes are converted to
        backslashes. Despite the ``None`` default a string is required.
    :param loop: Event loop to use; defaults to ``asyncio.get_event_loop()``.
    :param limit: Buffer limit passed to the ``StreamReader``.
    :param kwargs: Forwarded to ``loop.create_pipe_connection``.
    :return: ``(reader, writer)`` stream pair.
    """
    path = path.replace('/', '\\')
    loop = loop or asyncio.get_event_loop()

    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    transport, _ = await loop.create_pipe_connection(
        lambda: protocol, path, **kwargs
    )
    writer = asyncio.StreamWriter(transport, protocol, reader, loop)
    return reader, writer
def connection_made(self, transport):
    """
    A peer is now connected and we receive an instance
    of the underlying :class:`asyncio.Transport`.

    The :class:`asyncio.StreamReaderProtocol` implementation is invoked
    first so the transport is registered, then the initial HTTP handshake
    is scheduled as a separate task. ``self.terminated`` is attached as
    the done-callback of that task.
    """
    asyncio.StreamReaderProtocol.connection_made(self, transport)

    # Run the handshake concurrently so others can tag along.
    # ``asyncio.async`` is a syntax error on Python 3.7+ (``async`` became
    # a keyword); ``ensure_future`` is its direct replacement.
    handshake = asyncio.ensure_future(self.handle_initial_handshake())
    handshake.add_done_callback(self.terminated)
async def get_reader(self):
    """Return the ``StreamReader`` attached to ``sys.stdin``.

    The reader is created and connected on the first call and cached in
    ``self._reader``; later calls return the cached object without
    touching the event loop.

    Declared ``async`` because connecting the pipe is awaited.
    """
    if self._reader is None:
        # Cache before awaiting so a concurrent caller reuses this reader
        # instead of connecting stdin a second time.
        self._reader = asyncio.StreamReader()
        protocol = asyncio.StreamReaderProtocol(self._reader)
        loop = asyncio.get_event_loop()
        await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    return self._reader
def connect_read_pipe(file):
    """Attach ``file`` to the current event loop as a read pipe.

    Generator-based coroutine. Returns ``(stream_reader, transport)``.
    """
    event_loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader(loop=event_loop)
    transport, _protocol = yield from event_loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader), file
    )
    return reader, transport


#
# Example
#
def task():
    """Spawn a child that writes to an inherited pipe and print what it wrote.

    Generator-based coroutine. Uses the enclosing ``loop`` and ``code``
    names: ``code`` is the child's source and receives the write
    descriptor number as its first argument.
    """
    read_fd, write_fd = os.pipe()
    child_argv = [sys.executable, '-c', code, str(write_fd)]

    read_end = open(read_fd, 'rb', 0)
    stream = asyncio.StreamReader(loop=loop)
    stream_protocol = asyncio.StreamReaderProtocol(stream, loop=loop)
    transport, _ = yield from loop.connect_read_pipe(
        lambda: stream_protocol, read_end)

    # The child inherits the write end through ``pass_fds``.
    child = yield from asyncio.create_subprocess_exec(
        *child_argv, pass_fds={write_fd})
    yield from child.wait()

    # Close our copy of the write end before reading to end-of-file.
    os.close(write_fd)
    payload = yield from stream.read()
    print("read = %r" % payload.decode())
def test_read_all_from_pipe_reader(self):
    # See Tulip issue 168. Derived from the example
    # subprocess_attach_read_pipe.py, but the StreamReader limit is set so
    # that twice the limit is still smaller than the data written. A child
    # watcher must also be attached to the event loop explicitly.
    child_source = (
        "import os, sys\n"
        "fd = int(sys.argv[1])\n"
        "os.write(fd, b'data')\n"
        "os.close(fd)\n"
    )
    read_fd, write_fd = os.pipe()
    child_argv = [sys.executable, '-c', child_source, str(write_fd)]

    read_end = open(read_fd, 'rb', 0)
    reader = asyncio.StreamReader(loop=self.loop, limit=1)
    protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
    connecting = self.loop.connect_read_pipe(lambda: protocol, read_end)
    transport, _ = self.loop.run_until_complete(connecting)

    watcher = asyncio.SafeChildWatcher()
    watcher.attach_loop(self.loop)
    try:
        asyncio.set_child_watcher(watcher)
        spawning = asyncio.create_subprocess_exec(
            *child_argv, pass_fds={write_fd}, loop=self.loop)
        child = self.loop.run_until_complete(spawning)
        self.loop.run_until_complete(child.wait())
    finally:
        asyncio.set_child_watcher(None)

    # Drop our copy of the write end before reading to end-of-file.
    os.close(write_fd)
    received = self.loop.run_until_complete(reader.read(-1))
    self.assertEqual(received, b'data')
def test_streamreader_constructor(self):
    # Tulip issue #184: a StreamReader built without an explicit ``loop``
    # argument must pick up the current event loop.
    self.addCleanup(asyncio.set_event_loop, None)
    asyncio.set_event_loop(self.loop)

    stream_reader = asyncio.StreamReader()

    self.assertIs(stream_reader._loop, self.loop)
def test_streamreaderprotocol_constructor(self):
    # Tulip issue #184: a StreamReaderProtocol built without an explicit
    # ``loop`` argument must pick up the current event loop.
    self.addCleanup(asyncio.set_event_loop, None)
    asyncio.set_event_loop(self.loop)

    fake_reader = mock.Mock()
    reader_protocol = asyncio.StreamReaderProtocol(fake_reader)

    self.assertIs(reader_protocol._loop, self.loop)
async def stdio(loop=None):
    """Set up asyncio stream wrappers for the process's standard streams.

    :param loop: Event loop to attach the pipes to; defaults to
        ``asyncio.get_event_loop()``.
    :return: ``(reader, writer)`` - a ``StreamReader`` fed from
        ``sys.stdin`` and a ``StreamWriter`` writing to file descriptor 1.

    Changes from the previous version:

    * declared ``async`` (the body awaits the event loop);
    * the writer is opened on descriptor 1 (stdout) instead of 0 (stdin),
      which could only be written to when stdin was a terminal;
    * the reader and both protocols are bound to ``loop`` rather than to
      whichever loop happens to be the default.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    reader = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)

    writer_transport, writer_protocol = await loop.connect_write_pipe(
        lambda: FlowControlMixin(loop=loop), os.fdopen(1, 'wb'))
    writer = StreamWriter(writer_transport, writer_protocol, None, loop)

    await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)
    return reader, writer
async def read_pipe(file, size):
    """Read up to ``size`` bytes from the pipe at path ``file``.

    The path is opened read-only and non-blocking, attached to the event
    loop, and read until ``size`` bytes were collected or end-of-file is
    reached.

    :param file: Filesystem path of the pipe to open.
    :param size: Maximum number of bytes to return; values <= 0 read
        nothing.
    :return: The bytes read, possibly fewer than ``size``.

    Declared ``async`` because the body awaits. The pipe and transport
    are released even when connecting or reading raises.
    """
    loop = get_event_loop()
    reader = StreamReader()
    protocol = StreamReaderProtocol(reader)

    pipe = fdopen(os_open(file, O_RDONLY | O_NONBLOCK))
    try:
        transport, _ = await loop.connect_read_pipe(lambda: protocol, pipe)
    except BaseException:
        # No transport took ownership of the file object; close it here.
        pipe.close()
        raise

    chunks = []
    try:
        while size > 0:
            chunk = await reader.read(size)
            if not chunk:  # end-of-file
                break
            chunks.append(chunk)
            size -= len(chunk)
    finally:
        transport.close()
    return b''.join(chunks)
async def stdio(loop=None):
    """Return asyncio ``(reader, writer)`` streams for stdin and stdout.

    :param loop: Event loop to attach the pipes to; defaults to
        ``asyncio.get_event_loop()``.
    :return: A ``StreamReader`` fed from ``sys.stdin`` and a
        ``StreamWriter`` writing to file descriptor 1.

    Declared ``async`` because the body awaits the event loop. The reader
    and both protocols are bound to ``loop`` so that an explicitly passed
    loop is honoured.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    reader = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)

    writer_transport, writer_protocol = await loop.connect_write_pipe(
        lambda: FlowControlMixin(loop=loop), os.fdopen(1, 'wb'))
    writer = StreamWriter(writer_transport, writer_protocol, None, loop)

    await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)
    return reader, writer
async def start_pipe_server(
    client_connected_cb, *, path, loop=None, limit=DEFAULT_LIMIT
):
    """Start listening for connections using Windows named pipes.

    :param client_connected_cb: Callback handed to each
        ``StreamReaderProtocol`` created for an incoming connection.
    :param path: Pipe address. Forward slashes are converted to
        backslashes.
    :param loop: Event loop to use; defaults to ``asyncio.get_event_loop()``.
    :param limit: Buffer limit for each connection's ``StreamReader``.
    :return: The server object returned by ``loop.start_serving_pipe``,
        patched with a ``wait_closed`` method.

    Declared ``async`` because the body awaits the event loop.
    """
    path = path.replace('/', '\\')
    loop = loop or asyncio.get_event_loop()

    def factory():
        reader = asyncio.StreamReader(limit=limit, loop=loop)
        return asyncio.StreamReaderProtocol(
            reader, client_connected_cb, loop=loop,
        )

    server, *_ = await loop.start_serving_pipe(factory, address=path)

    # The returned instance has no ``wait_closed`` method, so add one.
    # ``asyncio.Event`` no longer accepts ``loop=`` (removed in Python
    # 3.10); created inside this coroutine it uses the running loop.
    closed = asyncio.Event()
    original_close = server.close

    def close():
        original_close()
        closed.set()

    server.close = close
    server.wait_closed = closed.wait
    return server
def __init__(self, handler_cls):
    """Initialise the protocol and create its handler.

    The ``StreamReaderProtocol`` base is set up with a new ``StreamReader``
    and ``self._pseudo_connected`` as the client-connected callback.
    ``handler_cls`` is then called with this protocol and the result is
    stored in ``self.ws``.
    """
    stream_reader = asyncio.StreamReader()
    asyncio.StreamReaderProtocol.__init__(
        self, stream_reader, self._pseudo_connected)
    self.ws = handler_cls(self)
def mempipe(loop=None, limit=None):
    """In-memory pipe, returns a ``(reader, writer)`` pair.

    The writer is built on a ``_MemoryTransport`` wrapping the reader.
    A falsy ``loop`` falls back to ``asyncio.get_event_loop()`` and a
    falsy ``limit`` to ``_DEFAULT_LIMIT``.

    .. versionadded:: 0.1
    """
    if not loop:
        loop = asyncio.get_event_loop()
    if not limit:
        limit = _DEFAULT_LIMIT

    reader = asyncio.StreamReader(loop=loop, limit=limit)
    memory_transport = _MemoryTransport(reader)
    reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    writer = asyncio.StreamWriter(
        transport=memory_transport,
        protocol=reader_protocol,
        reader=reader,
        loop=loop,
    )
    return reader, writer
def stdio(loop=None):
    """Set up stdin/stdout stream handlers.

    Generator-based coroutine.

    :param loop: Event loop to attach the pipes to; defaults to
        ``asyncio.get_event_loop()``.
    :return: ``(reader, writer)`` - a ``StreamReader`` fed from
        ``sys.stdin`` and a ``StreamWriter`` writing to file descriptor 1.

    Changes from the previous version:

    * the writer is opened on descriptor 1 (stdout) instead of 0 (stdin),
      which could only be written to when stdin was a terminal;
    * the reader and both protocols are bound to ``loop`` rather than to
      whichever loop happens to be the default.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    reader = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)

    writer_transport, writer_protocol = yield from loop.connect_write_pipe(
        lambda: FlowControlMixin(loop=loop), os.fdopen(1, 'wb'))
    writer = StreamWriter(writer_transport, writer_protocol, None, loop)

    yield from loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)
    return reader, writer
async def get_stdin(loop, exit_callbacks):
    """Return a ``StreamReader`` fed from a duplicate of the stdin descriptor.

    !!!! Super super important !!!!
    Must create stdout ***before*** stdin. Otherwise deadlock.

    :param loop: Event loop the read pipe is attached to.
    :param exit_callbacks: Currently unused by this function.
    :return: Reader receiving the data arriving on stdin.
    :rtype: asyncio.StreamReader

    Declared ``async`` because the body awaits the event loop. The reader
    and protocol are bound to ``loop``, and the duplicated descriptor is
    closed again if attaching it to the loop fails.
    """
    stdin = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(stdin, loop=loop)

    # Work on a duplicate so the transport does not own the real stdin fd.
    stdin_fio = os.fdopen(os.dup(sys.stdin.fileno()), 'rb')
    try:
        await loop.connect_read_pipe(lambda: reader_protocol, stdin_fio)
    except BaseException:
        stdin_fio.close()
        raise
    return stdin
async def stream_as_generator(loop, stream):
    """Asynchronously yield the lines read from ``stream``.

    :param loop: Event loop the stream is attached to as a read pipe.
    :param stream: File-like object accepted by ``loop.connect_read_pipe``.
    :return: Async generator of ``bytes`` lines (line terminator
        included), ending at end-of-file.

    Declared ``async`` because the body both awaits and yields. The
    protocol is bound to ``loop`` like its reader.
    """
    reader = asyncio.StreamReader(loop=loop)
    reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    await loop.connect_read_pipe(lambda: reader_protocol, stream)

    while True:
        line = await reader.readline()
        if not line:  # EOF.
            break
        yield line
def open_serial_connection(**kwargs):
    """A wrapper for create_serial_connection() returning a (reader,
    writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to Serial(). Additional
    optional keyword arguments are loop (to set the event loop instance
    to use) and limit (to set the buffer limit passed to the
    StreamReader).

    This function is a generator-based coroutine.

    Changes from the previous version:

    * ``loop`` and ``limit`` are removed from ``kwargs`` before the rest
      is forwarded; previously ``loop`` was passed twice and ``limit``
      was forwarded along with the Serial() arguments;
    * delegation uses ``yield from`` and the result is returned with
      ``return``. ``raise StopIteration(...)`` inside a generator is
      turned into ``RuntimeError`` by PEP 479 (Python 3.7+).
    """
    loop = kwargs.pop('loop', None)
    if loop is None:
        loop = asyncio.get_event_loop()
    limit = kwargs.pop('limit', asyncio.streams._DEFAULT_LIMIT)

    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    transport, _ = yield from create_serial_connection(
        loop=loop,
        protocol_factory=lambda: protocol,
        **kwargs)
    writer = asyncio.StreamWriter(transport, protocol, reader, loop)
    return reader, writer


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# test
def test_read_all_from_pipe_reader(self):
    # See asyncio issue 168. Derived from the example
    # subprocess_attach_read_pipe.py, but the StreamReader limit is set so
    # that twice the limit is still smaller than the data written. A child
    # watcher must also be attached to the event loop explicitly.
    script_lines = [
        "import os, sys",
        "fd = int(sys.argv[1])",
        "os.write(fd, b'data')",
        "os.close(fd)",
    ]
    script = "\n".join(script_lines) + "\n"

    pipe_read_fd, pipe_write_fd = os.pipe()
    argv = [sys.executable, '-c', script, str(pipe_write_fd)]

    pipe_file = open(pipe_read_fd, 'rb', 0)
    stream = asyncio.StreamReader(loop=self.loop, limit=1)
    stream_protocol = asyncio.StreamReaderProtocol(stream, loop=self.loop)
    transport, _ = self.loop.run_until_complete(
        self.loop.connect_read_pipe(lambda: stream_protocol, pipe_file))

    child_watcher = asyncio.SafeChildWatcher()
    child_watcher.attach_loop(self.loop)
    try:
        asyncio.set_child_watcher(child_watcher)
        start_child = asyncio.create_subprocess_exec(
            *argv, pass_fds={pipe_write_fd}, loop=self.loop)
        process = self.loop.run_until_complete(start_child)
        self.loop.run_until_complete(process.wait())
    finally:
        asyncio.set_child_watcher(None)

    # Drop our copy of the write end before reading to end-of-file.
    os.close(pipe_write_fd)
    data = self.loop.run_until_complete(stream.read(-1))
    self.assertEqual(data, b'data')
def test_streamreader_constructor(self):
    # asyncio issue #184: when no ``loop`` argument is given, the
    # StreamReader constructor must use the current event loop.
    self.addCleanup(asyncio.set_event_loop, None)
    asyncio.set_event_loop(self.loop)

    created = asyncio.StreamReader()

    self.assertIs(created._loop, self.loop)
def test_streamreaderprotocol_constructor(self):
    # asyncio issue #184: when no ``loop`` argument is given, the
    # StreamReaderProtocol constructor must use the current event loop.
    self.addCleanup(asyncio.set_event_loop, None)
    asyncio.set_event_loop(self.loop)

    stub_reader = mock.Mock()
    created = asyncio.StreamReaderProtocol(stub_reader)

    self.assertIs(created._loop, self.loop)
def _test_pipe(self):
    # Generator-based coroutine: round-trips one line through each of
    # five client connections to a named-pipe server.
    pipe_address = r'\\.\pipe\_test_pipe-%s' % os.getpid()

    # Nothing is serving the address yet, so connecting must fail.
    with self.assertRaises(FileNotFoundError):
        yield from self.loop.create_pipe_connection(
            asyncio.Protocol, pipe_address)

    [server] = yield from self.loop.start_serving_pipe(
        UpperProto, pipe_address)
    self.assertIsInstance(server, windows_events.PipeServer)

    connections = []
    for _ in range(5):
        reader = asyncio.StreamReader(loop=self.loop)
        protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
        transport, connected_protocol = (
            yield from self.loop.create_pipe_connection(
                lambda: protocol, pipe_address))
        self.assertIsInstance(transport, asyncio.Transport)
        self.assertEqual(protocol, connected_protocol)
        connections.append((reader, transport))

    # Send every request first, then collect the replies.
    for index, (_, transport) in enumerate(connections):
        transport.write('lower-{}\n'.format(index).encode())

    for index, (reader, transport) in enumerate(connections):
        reply = yield from reader.readline()
        self.assertEqual(reply, 'LOWER-{}\n'.format(index).encode())
        transport.close()

    server.close()

    # Once the server is closed the address must be gone again.
    with self.assertRaises(FileNotFoundError):
        yield from self.loop.create_pipe_connection(
            asyncio.Protocol, pipe_address)

    return 'done'
def _client_connected_cb(self, reader, writer):
    """Store the stream pair passed to the client-connected callback.

    Per the original author this is redundant, because the class
    subclasses ``StreamReaderProtocol``; it only provides the shorter
    attribute names ``_reader`` and ``_writer``.
    """
    self._reader, self._writer = reader, writer
def test_read_all_from_pipe_reader(self):
    # See Tulip issue 168. Derived from the example
    # subprocess_attach_read_pipe.py, but the StreamReader limit is set so
    # that twice the limit is still smaller than the data written. A child
    # watcher must also be attached to the event loop explicitly.
    program = (
        "import os, sys\n"
        "fd = int(sys.argv[1])\n"
        "os.write(fd, b'data')\n"
        "os.close(fd)\n"
    )
    rfd, wfd = os.pipe()
    command = [sys.executable, '-c', program, str(wfd)]

    source = open(rfd, 'rb', 0)
    reader = asyncio.StreamReader(loop=self.loop, limit=1)
    reader_protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
    attach = self.loop.connect_read_pipe(lambda: reader_protocol, source)
    transport, _ = self.loop.run_until_complete(attach)

    watcher = asyncio.SafeChildWatcher()
    watcher.attach_loop(self.loop)
    try:
        asyncio.set_child_watcher(watcher)
        launch = asyncio.create_subprocess_exec(
            *command, pass_fds={wfd}, loop=self.loop)
        proc = self.loop.run_until_complete(launch)
        self.loop.run_until_complete(proc.wait())
    finally:
        asyncio.set_child_watcher(None)

    # Drop our copy of the write end before reading to end-of-file.
    os.close(wfd)
    everything = self.loop.run_until_complete(reader.read(-1))
    self.assertEqual(everything, b'data')
def _test_pipe(self):
    # Generator-based coroutine: round-trips one line through each of
    # five client connections to a named-pipe server.
    ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid()

    # Nothing is serving the address yet, so connecting must fail.
    with self.assertRaises(FileNotFoundError):
        yield from self.loop.create_pipe_connection(
            asyncio.Protocol, ADDRESS)

    [server] = yield from self.loop.start_serving_pipe(
        UpperProto, ADDRESS)
    self.assertIsInstance(server, windows_events.PipeServer)

    clients = []
    for _ in range(5):
        stream_reader = asyncio.StreamReader(loop=self.loop)
        # Bind the protocol to the test loop as well; without ``loop=`` it
        # would look up the default event loop instead of ``self.loop``.
        protocol = asyncio.StreamReaderProtocol(stream_reader,
                                                loop=self.loop)
        trans, proto = yield from self.loop.create_pipe_connection(
            lambda: protocol, ADDRESS)
        self.assertIsInstance(trans, asyncio.Transport)
        self.assertEqual(protocol, proto)
        clients.append((stream_reader, trans))

    # Send every request first, then collect the replies.
    for i, (_, transport) in enumerate(clients):
        transport.write('lower-{}\n'.format(i).encode())

    for i, (reader, transport) in enumerate(clients):
        response = yield from reader.readline()
        self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
        transport.close()

    server.close()

    # Once the server is closed the address must be gone again.
    with self.assertRaises(FileNotFoundError):
        yield from self.loop.create_pipe_connection(
            asyncio.Protocol, ADDRESS)

    return 'done'
async def collect_stats(infile):
    """Read newline-delimited JSON from ``infile`` and return the last object.

    ``infile`` is attached to the current event loop as a read pipe and
    consumed line by line until end-of-file.

    :param infile: File-like object accepted by ``loop.connect_read_pipe``.
    :return: The decoded JSON value of the last line read. ``{}`` is
        returned when no line was read, when the first line is not valid
        JSON, or when the last line is not valid JSON.

    Reading also stops, with the error logged, when a line exceeds the
    reader's buffer limit. Lines are decoded as ASCII; a non-ASCII line
    raises ``UnicodeDecodeError``.

    Declared ``async`` because the body awaits. The transport is closed
    even if an exception escapes the read loop.
    """
    reader = asyncio.StreamReader()
    reader_protocol = asyncio.StreamReaderProtocol(reader)
    transport, _ = await asyncio.get_event_loop().connect_read_pipe(
        lambda: reader_protocol, infile)

    count = 0
    dp = None       # decoded first data point; only validates the stream
    stats = None    # raw bytes of the most recent line
    try:
        while True:
            try:
                line = await reader.readline()
            except ValueError:
                logging.exception('Too large stats object!')
                break
            if line == b'':
                break
            stats = line
            if dp is None:
                try:
                    dp = json.loads(line.decode('ascii'))
                except JSONDecodeError:
                    break
            count += 1
    finally:
        transport.close()

    if dp is None or count == 0:
        return {}
    try:
        return json.loads(stats.decode('ascii'))
    except JSONDecodeError:
        return {}
async def stdin_reader(loop: asyncio.AbstractEventLoop,
                       input_handler: Callable[[str], Awaitable]
                       ) -> None:
    """Feed every line arriving on stdin to ``input_handler``.

    For each line read, ``input_handler(line)`` is scheduled on ``loop``
    as a task. The coroutine finishes when stdin reaches end-of-file.

    :param loop: Event loop used to schedule the handler tasks.
    :param input_handler: Called with each decoded line (line terminator
        included); must return an awaitable.

    Declared ``async`` because the body awaits.
    """
    if sys.platform == 'win32':
        # Windows can't use SelectorEventLoop.connect_read_pipe
        # and ProactorEventLoop.connect_read_pipe apparently
        # doesn't work with sys.* streams or files.
        # http://stackoverflow.com/questions/31510190/aysncio-cannot-read-stdin-on-windows
        #
        # Running polling in an executor (thread) doesn't work properly
        # either, since there is no way to stop the executor
        # (sys.stdin.readline) and make the program terminate.
        # So instead, we spawn a custom daemon thread.
        import threading

        thread_close_evt = asyncio.Event()

        def reader_thread():
            while True:
                try:
                    line = sys.stdin.readline()
                except KeyboardInterrupt:
                    break
                if not line:
                    break
                # Bind ``line`` now: the callback runs later on the loop
                # thread, by which time this thread may already have
                # rebound ``line`` to the next read.
                loop.call_soon_threadsafe(
                    lambda line=line: loop.create_task(input_handler(line)))
            loop.call_soon_threadsafe(thread_close_evt.set)

        threading.Thread(target=reader_thread, daemon=True).start()
        await thread_close_evt.wait()
    else:
        reader = asyncio.StreamReader()
        make_protocol = partial(asyncio.StreamReaderProtocol, reader)
        await loop.connect_read_pipe(make_protocol, sys.stdin)
        while True:
            line_bytes = await reader.readline()
            line = line_bytes.decode(sys.stdin.encoding)
            if not line:
                break
            loop.create_task(input_handler(line))
    print("stdin stream closed")