The following 50 code examples, extracted from open-source Python projects, illustrate how to use tornado.testing.ExpectLog().
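Before the project examples, here is a minimal standalone sketch (written for this page, not taken from any of the projects below; the logger name "demo" and the log messages are illustrative). ExpectLog is a context manager: it asserts that a message matching the given regex is logged on the given logger while the block runs (unless required=False), and it suppresses the matched message from the test output. The regex is matched from the start of the message, which is why many of the examples below begin their patterns with '.*'.

import logging
import unittest

from tornado.testing import ExpectLog


class ExpectLogBasics(unittest.TestCase):
    def test_expected_warning(self):
        logger = logging.getLogger("demo")  # hypothetical logger name
        # The block must emit a message matching the pattern, or
        # ExpectLog raises when the block exits; the matched message
        # is filtered out of the test output.
        with ExpectLog(logger, "something went wrong"):
            logger.warning("something went wrong: %s", "details")

    def test_optional_message(self):
        logger = logging.getLogger("demo")
        # required=False tolerates a matching message without demanding
        # one, which is useful when log output varies by platform
        # (compare test_connection_refused below).
        with ExpectLog(logger, ".*", required=False):
            pass
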
def test_body_size_override_reset(self):
    # The max_body_size override is reset between requests.
    stream = IOStream(socket.socket())
    try:
        yield stream.connect(('127.0.0.1', self.get_http_port()))
        # Use a raw stream so we can make sure it's all on one connection.
        stream.write(b'PUT /streaming?expected_size=10240 HTTP/1.1\r\n'
                     b'Content-Length: 10240\r\n\r\n')
        stream.write(b'a' * 10240)
        headers, response = yield gen.Task(read_stream_body, stream)
        self.assertEqual(response, b'10240')
        # Without the ?expected_size parameter, we get the old default value
        stream.write(b'PUT /streaming HTTP/1.1\r\n'
                     b'Content-Length: 10240\r\n\r\n')
        with ExpectLog(gen_log, '.*Content-Length too long'):
            data = yield stream.read_until_close()
        self.assertEqual(data, b'')
    finally:
        stream.close()

def test_multi_exceptions(self):
    with ExpectLog(app_log, "Multiple exceptions in yield list"):
        with self.assertRaises(RuntimeError) as cm:
            yield gen.Multi([self.async_exception(RuntimeError("error 1")),
                             self.async_exception(RuntimeError("error 2"))])
    self.assertEqual(str(cm.exception), "error 1")

    # With only one exception, no error is logged.
    with self.assertRaises(RuntimeError):
        yield gen.Multi([self.async_exception(RuntimeError("error 1")),
                         self.async_future(2)])

    # Exception logging may be explicitly quieted.
    with self.assertRaises(RuntimeError):
        yield gen.Multi([self.async_exception(RuntimeError("error 1")),
                         self.async_exception(RuntimeError("error 2"))],
                        quiet_exceptions=RuntimeError)

def test_multi_future_exceptions(self):
    with ExpectLog(app_log, "Multiple exceptions in yield list"):
        with self.assertRaises(RuntimeError) as cm:
            yield [self.async_exception(RuntimeError("error 1")),
                   self.async_exception(RuntimeError("error 2"))]
    self.assertEqual(str(cm.exception), "error 1")

    # With only one exception, no error is logged.
    with self.assertRaises(RuntimeError):
        yield [self.async_exception(RuntimeError("error 1")),
               self.async_future(2)]

    # Exception logging may be explicitly quieted.
    with self.assertRaises(RuntimeError):
        yield gen.multi_future(
            [self.async_exception(RuntimeError("error 1")),
             self.async_exception(RuntimeError("error 2"))],
            quiet_exceptions=RuntimeError)

def test_cross_user(self):
    token2 = self.get_token()
    # Each token can be used to authenticate its own request.
    for token in (self.xsrf_token, token2):
        response = self.fetch(
            "/", method="POST",
            body=urllib_parse.urlencode(dict(_xsrf=token)),
            headers=self.cookie_headers(token))
        self.assertEqual(response.code, 200)
    # Sending one in the cookie and the other in the body is not allowed.
    for cookie_token, body_token in ((self.xsrf_token, token2),
                                     (token2, self.xsrf_token)):
        with ExpectLog(gen_log, '.*XSRF cookie does not match POST'):
            response = self.fetch(
                "/", method="POST",
                body=urllib_parse.urlencode(dict(_xsrf=body_token)),
                headers=self.cookie_headers(cookie_token))
            self.assertEqual(response.code, 403)

def test_connection_refused(self):
    # When a connection is refused, the connect callback should not
    # be run.  (The kqueue IOLoop used to behave differently from the
    # epoll IOLoop in this respect)
    cleanup_func, port = refusing_port()
    self.addCleanup(cleanup_func)
    stream = IOStream(socket.socket(), self.io_loop)
    self.connect_called = False

    def connect_callback():
        self.connect_called = True
        self.stop()
    stream.set_close_callback(self.stop)
    # log messages vary by platform and ioloop implementation
    with ExpectLog(gen_log, ".*", required=False):
        stream.connect(("127.0.0.1", port), connect_callback)
        self.wait()
    self.assertFalse(self.connect_called)
    self.assertTrue(isinstance(stream.error, socket.error), stream.error)
    if sys.platform != 'cygwin':
        _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,)
        if hasattr(errno, "WSAECONNREFUSED"):
            _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,)
        # cygwin's errnos don't match those used on native windows python
        self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED)

def test_read_callback_error(self):
    # Test that IOStream sets its exc_info when a read callback throws
    server, client = self.make_iostream_pair()
    try:
        server.set_close_callback(self.stop)
        with ExpectLog(
            app_log, "(Uncaught exception|Exception in callback)"
        ):
            # Clear ExceptionStackContext so IOStream catches error
            with NullContext():
                server.read_bytes(1, callback=lambda data: 1 / 0)
            client.write(b"1")
            self.wait()
        self.assertTrue(isinstance(server.error, ZeroDivisionError))
    finally:
        server.close()
        client.close()

def test_async_read_error_logging(self):
    # Socket errors on asynchronous reads should be logged (but only
    # once).
    server, client = self.make_iostream_pair()
    server.set_close_callback(self.stop)
    try:
        # Start a read that will be fulfilled asynchronously.
        server.read_bytes(1, lambda data: None)
        client.write(b'a')
        # Stub out read_from_fd to make it fail.

        def fake_read_from_fd():
            os.close(server.socket.fileno())
            server.__class__.read_from_fd(server)
        server.read_from_fd = fake_read_from_fd
        # This log message is from _handle_read (not read_from_fd).
        with ExpectLog(gen_log, "error on read"):
            self.wait()
    finally:
        server.close()
        client.close()

def test_read_until_regex_max_bytes_inline(self):
    server, client = self.make_iostream_pair()
    client.set_close_callback(lambda: self.stop("closed"))
    try:
        # Similar to the error case in the previous test, but the
        # server writes first so client reads are satisfied
        # inline.  For consistency with the out-of-line case, we
        # do not raise the error synchronously.
        server.write(b"123456")
        with ExpectLog(gen_log, "Unsatisfiable read"):
            client.read_until_regex(b"def", self.stop, max_bytes=5)
            data = self.wait()
        self.assertEqual(data, "closed")
    finally:
        server.close()
        client.close()

def test_handle_stream_coroutine_logging(self):
    # handle_stream may be a coroutine and any exception in its
    # Future will be logged.
    class TestServer(TCPServer):
        @gen.coroutine
        def handle_stream(self, stream, address):
            yield gen.moment
            stream.close()
            1 / 0

    server = client = None
    try:
        sock, port = bind_unused_port()
        with NullContext():
            server = TestServer()
            server.add_socket(sock)
        client = IOStream(socket.socket())
        with ExpectLog(app_log, "Exception in callback"):
            yield client.connect(('localhost', port))
            yield client.read_until_close()
            yield gen.moment
    finally:
        if server is not None:
            server.stop()
        if client is not None:
            client.close()

def test_multiple_errors(self):
    def fail(message):
        raise Exception(message)
    self.io_loop.add_callback(lambda: fail("error one"))
    self.io_loop.add_callback(lambda: fail("error two"))
    # The first error gets raised; the second gets logged.
    with ExpectLog(app_log, "multiple unhandled exceptions"):
        with self.assertRaises(Exception) as cm:
            self.wait()
    self.assertEqual(str(cm.exception), "error one")

def test_error_in_on_message(self):
    ws = yield self.ws_connect('/error_in_on_message')
    ws.write_message('hello')
    with ExpectLog(app_log, "Uncaught exception"):
        response = yield ws.read_message()
    self.assertIs(response, None)
    yield self.close(ws)

def test_websocket_network_fail(self):
    sock, port = bind_unused_port()
    sock.close()
    with self.assertRaises(IOError):
        with ExpectLog(gen_log, ".*"):
            yield websocket_connect(
                'ws://127.0.0.1:%d/' % port,
                io_loop=self.io_loop,
                connect_timeout=3600)

def test_missing_headers(self):
    data = b'''\
--1234

Foo
--1234--'''.replace(b"\n", b"\r\n")
    args = {}
    files = {}
    with ExpectLog(gen_log, "multipart/form-data missing headers"):
        parse_multipart_form_data(b"1234", data, args, files)
    self.assertEqual(files, {})

def test_line_does_not_end_with_correct_line_break(self):
    data = b'''\
--1234
Content-Disposition: form-data; name="files"; filename="ab.txt"

Foo--1234--'''.replace(b"\n", b"\r\n")
    args = {}
    files = {}
    with ExpectLog(gen_log, "Invalid multipart/form-data"):
        parse_multipart_form_data(b"1234", data, args, files)
    self.assertEqual(files, {})

def test_content_disposition_header_without_name_parameter(self):
    data = b"""\
--1234
Content-Disposition: form-data; filename="ab.txt"

Foo
--1234--""".replace(b"\n", b"\r\n")
    args = {}
    files = {}
    with ExpectLog(gen_log, "multipart/form-data value missing name"):
        parse_multipart_form_data(b"1234", data, args, files)
    self.assertEqual(files, {})

def test_stack_context(self):
    with ExpectLog(app_log, "Uncaught exception GET /"):
        self.http_client.fetch(self.get_url('/'), self.handle_response)
        self.wait()
    self.assertEqual(self.response.code, 500)
    self.assertTrue(b'got expected exception' in self.response.body)

def test_exception_logging(self):
    """Uncaught exceptions get logged by the IOLoop."""
    # Use a NullContext to keep the exception from being caught by
    # AsyncTestCase.
    with NullContext():
        self.io_loop.add_callback(lambda: 1 / 0)
        self.io_loop.add_callback(self.stop)
        with ExpectLog(app_log, "Exception in callback"):
            self.wait()

def test_exception_logging_future(self):
    """The IOLoop examines exceptions from Futures and logs them."""
    with NullContext():
        @gen.coroutine
        def callback():
            self.io_loop.add_callback(self.stop)
            1 / 0
        self.io_loop.add_callback(callback)
        with ExpectLog(app_log, "Exception in callback"):
            self.wait()

def test_spawn_callback(self):
    # An added callback runs in the test's stack_context, so will be
    # re-raised in wait().
    self.io_loop.add_callback(lambda: 1 / 0)
    with self.assertRaises(ZeroDivisionError):
        self.wait()
    # A spawned callback is run directly on the IOLoop, so it will be
    # logged without stopping the test.
    self.io_loop.spawn_callback(lambda: 1 / 0)
    self.io_loop.add_callback(self.stop)
    with ExpectLog(app_log, "Exception in callback"):
        self.wait()

def test_non_ssl_request(self):
    # Make sure the server closes the connection when it gets a non-ssl
    # connection, rather than waiting for a timeout or otherwise
    # misbehaving.
    with ExpectLog(gen_log, '(SSL Error|uncaught exception)'):
        with ExpectLog(gen_log, 'Uncaught exception', required=False):
            self.http_client.fetch(
                self.get_url("/").replace('https:', 'http:'),
                self.stop,
                request_timeout=3600,
                connect_timeout=3600)
            response = self.wait()
    self.assertEqual(response.code, 599)

def test_error_logging(self):
    # No stack traces are logged for SSL errors.
    with ExpectLog(gen_log, 'SSL Error') as expect_log:
        self.http_client.fetch(
            self.get_url("/").replace("https:", "http:"),
            self.stop)
        response = self.wait()
    self.assertEqual(response.code, 599)
    self.assertFalse(expect_log.logged_stack)

# Python's SSL implementation differs significantly between versions.
# For example, SSLv3 and TLSv1 throw an exception if you try to read
# from the socket before the handshake is complete, but the default
# of SSLv23 allows it.

def test_malformed_body(self):
    # parse_qs is pretty forgiving, but it will fail on python 3
    # if the data is not utf8.  On python 2 parse_qs will work,
    # but then the recursive_unicode call in EchoHandler will
    # fail.
    if str is bytes:
        return
    with ExpectLog(gen_log, 'Invalid x-www-form-urlencoded body'):
        response = self.fetch(
            '/echo', method="POST",
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
            body=b'\xe9')
    self.assertEqual(200, response.code)
    self.assertEqual(b'{}', response.body)

def test_malformed_first_line(self):
    with ExpectLog(gen_log, '.*Malformed HTTP request line'):
        self.stream.write(b'asdf\r\n\r\n')
        # TODO: need an async version of ExpectLog so we don't need
        # hard-coded timeouts here.
        self.io_loop.add_timeout(datetime.timedelta(seconds=0.01),
                                 self.stop)
        self.wait()

def test_unix_socket_bad_request(self):
    # Unix sockets don't have remote addresses so they just return an
    # empty string.
    with ExpectLog(gen_log, "Malformed HTTP message from"):
        self.stream.write(b"garbage\r\n\r\n")
        self.stream.read_until_close(self.stop)
        response = self.wait()
    self.assertEqual(response, b"")

def test_gzip_unsupported(self):
    # Gzip support is opt-in; without it the server fails to parse
    # the body (but parsing form bodies is currently just a log message,
    # not a fatal error).
    with ExpectLog(gen_log, "Unsupported Content-Encoding"):
        response = self.post_gzip('foo=bar')
    self.assertEquals(json_decode(response.body), {})

def test_large_headers(self):
    with ExpectLog(gen_log, "Unsatisfiable read", required=False):
        response = self.fetch("/", headers={'X-Filler': 'a' * 1000})
    # 431 is "Request Header Fields Too Large", defined in RFC
    # 6585. However, many implementations just close the
    # connection in this case, resulting in a 599.
    self.assertIn(response.code, (431, 599))

def test_large_body_buffered(self):
    with ExpectLog(gen_log, '.*Content-Length too long'):
        response = self.fetch('/buffered', method='PUT', body=b'a' * 10240)
    self.assertEqual(response.code, 599)

def test_large_body_buffered_chunked(self):
    with ExpectLog(gen_log, '.*chunked body too large'):
        response = self.fetch('/buffered', method='PUT',
                              body_producer=lambda write: write(b'a' * 10240))
    self.assertEqual(response.code, 599)

def test_large_body_streaming_chunked(self):
    with ExpectLog(gen_log, '.*chunked body too large'):
        response = self.fetch('/streaming', method='PUT',
                              body_producer=lambda write: write(b'a' * 10240))
    self.assertEqual(response.code, 599)

def test_timeout(self):
    stream = IOStream(socket.socket())
    try:
        yield stream.connect(('127.0.0.1', self.get_http_port()))
        # Use a raw stream because AsyncHTTPClient won't let us read a
        # response without finishing a body.
        stream.write(b'PUT /streaming?body_timeout=0.1 HTTP/1.0\r\n'
                     b'Content-Length: 42\r\n\r\n')
        with ExpectLog(gen_log, 'Timeout reading body'):
            response = yield stream.read_until_close()
        self.assertEqual(response, b'')
    finally:
        stream.close()

def test_coroutine_exception_handler(self):
    # Make sure we get an error and not a timeout
    with ExpectLog(app_log, "Uncaught exception GET /coroutine_exception"):
        response = self.fetch('/coroutine_exception')
    self.assertEqual(500, response.code)

def test_cookie_tampering_future_timestamp(self):
    handler = CookieTestRequestHandler()
    # this string base64-encodes to '12345678'
    handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'),
                              version=1)
    cookie = handler._cookies['foo']
    match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
    self.assertTrue(match)
    timestamp = match.group(1)
    sig = match.group(2)
    self.assertEqual(
        _create_signature_v1(handler.application.settings["cookie_secret"],
                             'foo', '12345678', timestamp),
        sig)
    # shifting digits from payload to timestamp doesn't alter signature
    # (this is not desirable behavior, just confirming that that's how it
    # works)
    self.assertEqual(
        _create_signature_v1(handler.application.settings["cookie_secret"],
                             'foo', '1234', b'5678' + timestamp),
        sig)
    # tamper with the cookie
    handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
        to_basestring(timestamp), to_basestring(sig)))
    # it gets rejected
    with ExpectLog(gen_log, "Cookie timestamp in future"):
        self.assertTrue(
            handler.get_secure_cookie('foo', min_version=1) is None)

def test_error(self):
    # Percent signs (encoded as %25) should not mess up printf-style
    # messages in logs
    with ExpectLog(gen_log, ".*Invalid unicode"):
        self.fetch("/group/?arg=%25%e9")

def test_decode_argument_invalid_unicode(self):
    # test that invalid unicode in URLs causes 400, not 500
    with ExpectLog(gen_log, ".*Invalid unicode.*"):
        response = self.fetch("/typecheck/invalid%FF")
        self.assertEqual(response.code, 400)
        response = self.fetch("/typecheck/invalid?foo=%FF")
        self.assertEqual(response.code, 400)

def test_default(self):
    with ExpectLog(app_log, "Uncaught exception"):
        response = self.fetch("/default")
        self.assertEqual(response.code, 500)
        self.assertTrue(b"500: Internal Server Error" in response.body)

        response = self.fetch("/default?status=503")
        self.assertEqual(response.code, 503)
        self.assertTrue(b"503: Service Unavailable" in response.body)