我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用falcon.testing()。
def test_netloc_from_env(self, protocol): port = 9000 host = 'example.org' env = testing.create_environ( protocol=protocol, host=host, port=port, app=self.app, path='/hello', query_string=self.qs, headers=self.headers) req = Request(env) assert req.port == port assert req.netloc == '{0}:{1}'.format(host, port)
def test_raise_status_in_process_resource(self): """ Make sure we can raise status from middleware process resource """ class TestMiddleware: def process_resource(self, req, resp, resource, params): raise HTTPStatus(falcon.HTTP_200, headers={'X-Failed': 'False'}, body='Pass') app = falcon.API(middleware=TestMiddleware()) app.add_route('/status', TestHookResource()) client = testing.TestClient(app) response = client.simulate_request(path='/status', method='GET') assert response.status == falcon.HTTP_200 assert response.headers['x-failed'] == 'False' assert response.text == 'Pass'
def test_raise_status_runs_process_response(self): """ Make sure process_response still runs """ class TestMiddleware: def process_response(self, req, resp, response): resp.status = falcon.HTTP_200 resp.set_header('X-Failed', 'False') resp.body = 'Pass' app = falcon.API(middleware=TestMiddleware()) app.add_route('/status', TestHookResource()) client = testing.TestClient(app) response = client.simulate_request(path='/status', method='GET') assert response.status == falcon.HTTP_200 assert response.headers['x-failed'] == 'False' assert response.text == 'Pass'
def test_log_get_request(self): """Test that Log middleware is executed""" global context app = falcon.API(middleware=[RequestTimeMiddleware()]) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) response = client.simulate_request(path=TEST_ROUTE) assert _EXPECTED_BODY == response.json assert response.status == falcon.HTTP_200 assert 'start_time' in context assert 'mid_time' in context assert 'end_time' in context assert context['mid_time'] >= context['start_time'], \ 'process_resource not executed after request' assert context['end_time'] >= context['start_time'], \ 'process_response not executed after request' assert context['req_succeeded']
def test_generate_trans_id_and_time_with_request(self): global context app = falcon.API(middleware=[TransactionIdMiddleware(), RequestTimeMiddleware()]) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) response = client.simulate_request(path=TEST_ROUTE) assert _EXPECTED_BODY == response.json assert response.status == falcon.HTTP_200 assert 'transaction_id' in context assert 'unique-req-id' == context['transaction_id'] assert 'start_time' in context assert 'mid_time' in context assert 'end_time' in context assert context['mid_time'] >= context['start_time'], \ 'process_resource not executed after request' assert context['end_time'] >= context['start_time'], \ 'process_response not executed after request'
def test_middleware_execution_order(self): global context app = falcon.API(middleware=[ExecutedFirstMiddleware(), ExecutedLastMiddleware()]) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) response = client.simulate_request(path=TEST_ROUTE) assert _EXPECTED_BODY == response.json assert response.status == falcon.HTTP_200 # as the method registration is in a list, the order also is # tested expectedExecutedMethods = [ 'ExecutedFirstMiddleware.process_request', 'ExecutedLastMiddleware.process_request', 'ExecutedFirstMiddleware.process_resource', 'ExecutedLastMiddleware.process_resource', 'ExecutedLastMiddleware.process_response', 'ExecutedFirstMiddleware.process_response' ] assert expectedExecutedMethods == context['executed_methods']
def test_independent_middleware_execution_order(self): global context app = falcon.API(independent_middleware=True, middleware=[ExecutedFirstMiddleware(), ExecutedLastMiddleware()]) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) response = client.simulate_request(path=TEST_ROUTE) assert _EXPECTED_BODY == response.json assert response.status == falcon.HTTP_200 # as the method registration is in a list, the order also is # tested expectedExecutedMethods = [ 'ExecutedFirstMiddleware.process_request', 'ExecutedLastMiddleware.process_request', 'ExecutedFirstMiddleware.process_resource', 'ExecutedLastMiddleware.process_resource', 'ExecutedLastMiddleware.process_response', 'ExecutedFirstMiddleware.process_response' ] assert expectedExecutedMethods == context['executed_methods']
def test_inner_mw_throw_exception(self): """Test that error in inner middleware leaves""" global context class RaiseErrorMiddleware(object): def process_request(self, req, resp): raise Exception('Always fail') app = falcon.API(middleware=[TransactionIdMiddleware(), RequestTimeMiddleware(), RaiseErrorMiddleware()]) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) with pytest.raises(Exception): client.simulate_request(path=TEST_ROUTE) # RequestTimeMiddleware process_response should be executed assert 'transaction_id' in context assert 'start_time' in context assert 'mid_time' not in context assert 'end_time' in context
def test_can_access_resource_params(self): """Test that params can be accessed from within process_resource""" global context class Resource: def on_get(self, req, resp, **params): resp.body = json.dumps(params) app = falcon.API(middleware=AccessParamsMiddleware()) app.add_route('/path/{id}', Resource()) client = testing.TestClient(app) response = client.simulate_request(path='/path/22') assert 'params' in context assert context['params'] assert context['params']['id'] == '22' assert response.json == {'added': True, 'id': '22'}
def test_http_status_raised_from_error_handler(self): mw = CaptureResponseMiddleware() app = falcon.API(middleware=mw) app.add_route('/', MiddlewareClassResource()) client = testing.TestClient(app) def _http_error_handler(error, req, resp, params): raise falcon.HTTPStatus(falcon.HTTP_201) # NOTE(kgriffs): This will take precedence over the default # handler for facon.HTTPError. app.add_error_handler(falcon.HTTPError, _http_error_handler) response = client.simulate_request(path='/', method='POST') assert response.status == falcon.HTTP_201 assert mw.resp.status == response.status
def test_response_base_case(client): result = client.simulate_get('/') cookie = result.cookies['foo'] assert cookie.name == 'foo' assert cookie.value == 'bar' assert cookie.domain == 'example.com' assert cookie.http_only # NOTE(kgriffs): Explicitly test for None to ensure # falcon.testing.Cookie is returning exactly what we # expect. Apps using falcon.testing.Cookie can be a # bit more cavalier if they wish. assert cookie.max_age is None assert cookie.expires is None assert cookie.path == '/' assert cookie.secure
def test_read_body(self, client, resource): client.app.add_route('/', resource) expected_body = testing.rand_string(SIZE_1_KB / 2, SIZE_1_KB) expected_len = len(expected_body) headers = {'Content-Length': str(expected_len)} client.simulate_request(path='/', body=expected_body, headers=headers) content_len = resource.req.get_header('content-length') assert content_len == str(expected_len) stream = self._get_wrapped_stream(resource.req) actual_body = stream.read() assert actual_body == expected_body.encode('utf-8') stream.seek(0, 2) assert stream.tell() == expected_len assert stream.tell() == expected_len
def setup_method(self, method): self.qs = 'marker=deadbeef&limit=10' self.headers = { 'Content-Type': 'text/plain', 'Content-Length': '4829', 'Authorization': '' } self.app = '/test' self.path = '/hello' self.relative_uri = self.path + '?' + self.qs self.req = Request(testing.create_environ( app=self.app, port=8080, path='/hello', query_string=self.qs, headers=self.headers)) self.req_noqs = Request(testing.create_environ( app=self.app, path='/hello', headers=self.headers))
def test_range_unit(self): headers = {'Range': 'bytes=10-'} req = Request(testing.create_environ(headers=headers)) assert req.range == (10, -1) assert req.range_unit == 'bytes' headers = {'Range': 'items=10-'} req = Request(testing.create_environ(headers=headers)) assert req.range == (10, -1) assert req.range_unit == 'items' headers = {'Range': ''} req = Request(testing.create_environ(headers=headers)) with pytest.raises(falcon.HTTPInvalidHeader): req.range_unit req = Request(testing.create_environ()) assert req.range_unit is None
def test_attribute_headers(self): hash = 'fa0d1a60ef6616bb28038515c8ea4cb2' auth = 'HMAC_SHA1 c590afa9bb59191ffab30f223791e82d3fd3e3af' agent = 'testing/1.0.1' default_agent = 'curl/7.24.0 (x86_64-apple-darwin12.0)' referer = 'https://www.google.com/' self._test_attribute_header('Accept', 'x-falcon', 'accept', default='*/*') self._test_attribute_header('Authorization', auth, 'auth') self._test_attribute_header('Content-Type', 'text/plain', 'content_type') self._test_attribute_header('Expect', '100-continue', 'expect') self._test_attribute_header('If-Match', hash, 'if_match') self._test_attribute_header('If-None-Match', hash, 'if_none_match') self._test_attribute_header('If-Range', hash, 'if_range') self._test_attribute_header('User-Agent', agent, 'user_agent', default=default_agent) self._test_attribute_header('Referer', referer, 'referer')
def test_scheme_http(self, protocol, set_forwarded_proto): scheme = 'http' forwarded_scheme = 'HttPs' headers = dict(self.headers) if set_forwarded_proto: headers['X-Forwarded-Proto'] = forwarded_scheme req = Request(testing.create_environ( protocol=protocol, scheme=scheme, app=self.app, path='/hello', query_string=self.qs, headers=headers)) assert req.scheme == scheme assert req.port == 80 if set_forwarded_proto: assert req.forwarded_scheme == forwarded_scheme.lower() else: assert req.forwarded_scheme == scheme
def test_raise_status_in_before_hook(self): """ Make sure we get the 200 raised by before hook """ app = falcon.API() app.add_route('/status', TestStatusResource()) client = testing.TestClient(app) response = client.simulate_request(path='/status', method='GET') assert response.status == falcon.HTTP_200 assert response.headers['x-failed'] == 'False' assert response.text == 'Pass'
def test_raise_status_in_responder(self): """ Make sure we get the 200 raised by responder """ app = falcon.API() app.add_route('/status', TestStatusResource()) client = testing.TestClient(app) response = client.simulate_request(path='/status', method='POST') assert response.status == falcon.HTTP_200 assert response.headers['x-failed'] == 'False' assert response.text == 'Pass'
def test_raise_status_runs_after_hooks(self): """ Make sure after hooks still run """ app = falcon.API() app.add_route('/status', TestStatusResource()) client = testing.TestClient(app) response = client.simulate_request(path='/status', method='PUT') assert response.status == falcon.HTTP_200 assert response.headers['x-failed'] == 'False' assert response.text == 'Pass'
def test_raise_status_survives_after_hooks(self): """ Make sure after hook doesn't overwrite our status """ app = falcon.API() app.add_route('/status', TestStatusResource()) client = testing.TestClient(app) response = client.simulate_request(path='/status', method='DELETE') assert response.status == falcon.HTTP_200 assert response.headers['x-failed'] == 'False' assert response.text == 'Pass'
def test_raise_status_empty_body(self): """ Make sure passing None to body results in empty body """ app = falcon.API() app.add_route('/status', TestStatusResource()) client = testing.TestClient(app) response = client.simulate_request(path='/status', method='PATCH') assert response.text == ''
def client(): app = falcon.API() app.req_options.auto_parse_form_urlencoded = True return testing.TestClient(app)
def test_dont_auto_parse_by_default(self): app = falcon.API() resource = testing.SimpleTestResource() app.add_route('/', resource) client = testing.TestClient(app) headers = {'Content-Type': 'application/x-www-form-urlencoded'} client.simulate_request(path='/', body='q=42', headers=headers) req = resource.captured_req assert req.get_param('q') is None
def client(): app = falcon.API() return testing.TestClient(app)
def test_skip_process_resource(self): global context app = falcon.API(middleware=[RequestTimeMiddleware()]) app.add_route('/', MiddlewareClassResource()) client = testing.TestClient(app) response = client.simulate_request(path='/404') assert response.status == falcon.HTTP_404 assert 'start_time' in context assert 'mid_time' not in context assert 'end_time' in context assert not context['req_succeeded']
def test_response_middleware_raises_exception(self): """Test that error in response middleware is propagated up""" class RaiseErrorMiddleware(object): def process_response(self, req, resp, resource): raise Exception('Always fail') app = falcon.API(middleware=[RaiseErrorMiddleware()]) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) with pytest.raises(Exception): client.simulate_request(path=TEST_ROUTE)
def test_generate_trans_id_with_request(self): """Test that TransactionIdmiddleware is executed""" global context app = falcon.API(middleware=TransactionIdMiddleware()) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) response = client.simulate_request(path=TEST_ROUTE) assert _EXPECTED_BODY == response.json assert response.status == falcon.HTTP_200 assert 'transaction_id' in context assert 'unique-req-id' == context['transaction_id']
def test_multiple_reponse_mw_throw_exception(self): """Test that error in inner middleware leaves""" global context context['req_succeeded'] = [] class RaiseStatusMiddleware(object): def process_response(self, req, resp, resource): raise falcon.HTTPStatus(falcon.HTTP_201) class RaiseErrorMiddleware(object): def process_response(self, req, resp, resource): raise falcon.HTTPError(falcon.HTTP_748) class ProcessResponseMiddleware(object): def process_response(self, req, resp, resource, req_succeeded): context['executed_methods'].append('process_response') context['req_succeeded'].append(req_succeeded) app = falcon.API(middleware=[ProcessResponseMiddleware(), RaiseErrorMiddleware(), ProcessResponseMiddleware(), RaiseStatusMiddleware(), ProcessResponseMiddleware()]) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) response = client.simulate_request(path=TEST_ROUTE) assert response.status == falcon.HTTP_748 expected_methods = ['process_response'] * 3 assert context['executed_methods'] == expected_methods assert context['req_succeeded'] == [True, False, False]
def test_inner_mw_with_ex_handler_throw_exception(self): """Test that error in inner middleware leaves""" global context class RaiseErrorMiddleware(object): def process_request(self, req, resp, resource): raise Exception('Always fail') app = falcon.API(middleware=[TransactionIdMiddleware(), RequestTimeMiddleware(), RaiseErrorMiddleware()]) def handler(ex, req, resp, params): context['error_handler'] = True app.add_error_handler(Exception, handler) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) client.simulate_request(path=TEST_ROUTE) # RequestTimeMiddleware process_response should be executed assert 'transaction_id' in context assert 'start_time' in context assert 'mid_time' not in context assert 'end_time' in context assert 'error_handler' in context
def test_order_mw_executed_when_exception_in_resp(self): """Test that error in inner middleware leaves""" global context class RaiseErrorMiddleware(object): def process_response(self, req, resp, resource): raise Exception('Always fail') app = falcon.API(middleware=[ExecutedFirstMiddleware(), RaiseErrorMiddleware(), ExecutedLastMiddleware()]) def handler(ex, req, resp, params): pass app.add_error_handler(Exception, handler) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) client.simulate_request(path=TEST_ROUTE) # Any mw is executed now... expectedExecutedMethods = [ 'ExecutedFirstMiddleware.process_request', 'ExecutedLastMiddleware.process_request', 'ExecutedFirstMiddleware.process_resource', 'ExecutedLastMiddleware.process_resource', 'ExecutedLastMiddleware.process_response', 'ExecutedFirstMiddleware.process_response' ] assert expectedExecutedMethods == context['executed_methods']
def test_order_independent_mw_executed_when_exception_in_resp(self): """Test that error in inner middleware leaves""" global context class RaiseErrorMiddleware(object): def process_response(self, req, resp, resource): raise Exception('Always fail') app = falcon.API(independent_middleware=True, middleware=[ExecutedFirstMiddleware(), RaiseErrorMiddleware(), ExecutedLastMiddleware()]) def handler(ex, req, resp, params): pass app.add_error_handler(Exception, handler) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) client.simulate_request(path=TEST_ROUTE) # Any mw is executed now... expectedExecutedMethods = [ 'ExecutedFirstMiddleware.process_request', 'ExecutedLastMiddleware.process_request', 'ExecutedFirstMiddleware.process_resource', 'ExecutedLastMiddleware.process_resource', 'ExecutedLastMiddleware.process_response', 'ExecutedFirstMiddleware.process_response' ] assert expectedExecutedMethods == context['executed_methods']
def test_order_mw_executed_when_exception_in_req(self): """Test that error in inner middleware leaves""" global context class RaiseErrorMiddleware(object): def process_request(self, req, resp): raise Exception('Always fail') app = falcon.API(middleware=[ExecutedFirstMiddleware(), RaiseErrorMiddleware(), ExecutedLastMiddleware()]) def handler(ex, req, resp, params): pass app.add_error_handler(Exception, handler) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) client.simulate_request(path=TEST_ROUTE) # Any mw is executed now... expectedExecutedMethods = [ 'ExecutedFirstMiddleware.process_request', 'ExecutedFirstMiddleware.process_response' ] assert expectedExecutedMethods == context['executed_methods']
def test_order_independent_mw_executed_when_exception_in_req(self): """Test that error in inner middleware leaves""" global context class RaiseErrorMiddleware(object): def process_request(self, req, resp): raise Exception('Always fail') app = falcon.API(independent_middleware=True, middleware=[ExecutedFirstMiddleware(), RaiseErrorMiddleware(), ExecutedLastMiddleware()]) def handler(ex, req, resp, params): pass app.add_error_handler(Exception, handler) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) client.simulate_request(path=TEST_ROUTE) # All response middleware still executed... expectedExecutedMethods = [ 'ExecutedFirstMiddleware.process_request', 'ExecutedLastMiddleware.process_response', 'ExecutedFirstMiddleware.process_response' ] assert expectedExecutedMethods == context['executed_methods']
def test_order_mw_executed_when_exception_in_rsrc(self): """Test that error in inner middleware leaves""" global context class RaiseErrorMiddleware(object): def process_resource(self, req, resp, resource): raise Exception('Always fail') app = falcon.API(middleware=[ExecutedFirstMiddleware(), RaiseErrorMiddleware(), ExecutedLastMiddleware()]) def handler(ex, req, resp, params): pass app.add_error_handler(Exception, handler) app.add_route(TEST_ROUTE, MiddlewareClassResource()) client = testing.TestClient(app) client.simulate_request(path=TEST_ROUTE) # Any mw is executed now... expectedExecutedMethods = [ 'ExecutedFirstMiddleware.process_request', 'ExecutedLastMiddleware.process_request', 'ExecutedFirstMiddleware.process_resource', 'ExecutedLastMiddleware.process_response', 'ExecutedFirstMiddleware.process_response' ] assert expectedExecutedMethods == context['executed_methods']
def test_base_path_is_removed_before_routing(self): """Test that RemoveBasePathMiddleware is executed before routing""" app = falcon.API(middleware=RemoveBasePathMiddleware()) # We dont include /base_path as it will be removed in middleware app.add_route('/sub_path', MiddlewareClassResource()) client = testing.TestClient(app) response = client.simulate_request(path='/base_path/sub_path') assert _EXPECTED_BODY == response.json assert response.status == falcon.HTTP_200 response = client.simulate_request(path='/base_pathIncorrect/sub_path') assert response.status == falcon.HTTP_404
def test_error_composed_before_resp_middleware_called(self): mw = CaptureResponseMiddleware() app = falcon.API(middleware=mw) app.add_route('/', MiddlewareClassResource()) client = testing.TestClient(app) response = client.simulate_request(path='/', method='POST') assert response.status == falcon.HTTP_403 assert mw.resp.status == response.status composed_body = json.loads(mw.resp.body) assert composed_body['title'] == response.status assert not mw.req_succeeded # NOTE(kgriffs): Sanity-check the other params passed to # process_response() assert isinstance(mw.req, falcon.Request) assert isinstance(mw.resource, MiddlewareClassResource)
def test_srmock(self): mock = testing.StartResponseMock() mock(falcon.HTTP_200, ()) assert mock.status == falcon.HTTP_200 assert mock.exc_info is None mock = testing.StartResponseMock() exc_info = sys.exc_info() mock(falcon.HTTP_200, (), exc_info) assert mock.exc_info == exc_info
def client(): app = falcon.API() app.add_route('/', CookieResource()) app.add_route('/test-convert', CookieResourceMaxAgeFloatString()) return testing.TestClient(app) # ===================================================================== # Response # =====================================================================
def test_request_cookie_parsing(): # testing with a github-ish set of cookies headers = [ ( 'Cookie', """ logged_in=no;_gh_sess=eyJzZXXzaW9uX2lkIjoiN2; tz=Europe/Berlin; _ga=GA1.2.332347814.1422308165; _gat=1; _octo=GH1.1.201722077.1422308165 """ ), ] environ = testing.create_environ(headers=headers) req = falcon.Request(environ) assert req.cookies['logged_in'] == 'no' assert req.cookies['tz'] == 'Europe/Berlin' assert req.cookies['_octo'] == 'GH1.1.201722077.1422308165' assert 'logged_in' in req.cookies assert '_gh_sess' in req.cookies assert 'tz' in req.cookies assert '_ga' in req.cookies assert '_gat' in req.cookies assert '_octo' in req.cookies
def client(): app = falcon.API() resource = RedirectingResource() app.add_route('/', resource) return testing.TestClient(app)
def resource(): return testing.TestResource()
def test_bounded_stream_property_empty_body(self): """Test that we can get a bounded stream outside of wsgiref.""" environ = testing.create_environ() req = falcon.Request(environ) bounded_stream = req.bounded_stream # NOTE(kgriffs): Verify that we aren't creating a new object # each time the property is called. Also ensures branch # coverage of the property implementation. assert bounded_stream is req.bounded_stream data = bounded_stream.read() assert len(data) == 0
def test_request_repr(self): environ = testing.create_environ() req = falcon.Request(environ) _repr = '<%s: %s %r>' % (req.__class__.__name__, req.method, req.url) assert req.__repr__() == _repr
def client(): app = falcon.API() resource = FaultyResource() app.add_route('/fail', resource) return testing.TestClient(app)
def client(resource): app = falcon.API() app.add_route('/', resource) return testing.TestClient(app)
def test_post_read_bounded_stream(self): body = testing.rand_string(_SIZE_1_KB / 2, _SIZE_1_KB) resp = requests.post(_SERVER_BASE_URL + 'bucket', data=body) assert resp.status_code == 200 assert resp.text == body
def client(): return testing.TestClient(falcon.API()) # NOTE(kgriffs): Concept from Gunicorn's source (wsgi.py)
def test_env_headers_list_of_tuples(self): env = testing.create_environ(headers=[('User-Agent', 'Falcon-Test')]) assert env['HTTP_USER_AGENT'] == 'Falcon-Test'
def test_root_route(self, client): doc = {u'message': u'Hello world!'} resource = testing.SimpleTestResource(json=doc) client.app.add_route('/', resource) result = client.simulate_get() assert result.json == doc
def client(): app = falcon.API() app.add_route('/stonewall', Stonewall()) resource_things = ThingsResource() app.add_route('/things', resource_things) app.add_route('/things/{id}/stuff/{sid}', resource_things) resource_misc = MiscResource() app.add_route('/misc', resource_misc) resource_get_with_faulty_put = GetWithFaultyPutResource() app.add_route('/get_with_param/{param}', resource_get_with_faulty_put) return testing.TestClient(app)