我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用werkzeug.wrappers.Response()。
def __call__(self, environ, start_response): self.last_environ = environ if self.enable_binary: mimetype = 'image/jpeg' else: mimetype = 'text/plain' response = Response(u'Hello World ?!', mimetype=mimetype) cookies = [ ('CUSTOMER', 'WILE_E_COYOTE'), ('PART_NUMBER', 'ROCKET_LAUNCHER_0002'), ('LOT_NUMBER', '42') ] for cookie in cookies[:self.cookie_count]: response.set_cookie(cookie[0], cookie[1]) print("application debug #1", file=environ['wsgi.errors']) return response(environ, start_response)
def private(f): """Only allow approved source addresses.""" @functools.wraps(f) def wrapper(self, request): if not request.access_route: # this means something probably bugged in werkzeug, but let's fail # gracefully return Response('no client ip provided', status='403') ip_str = request.access_route[-1] if isinstance(ip_str, bytes): ip_str = ip_str.decode('utf8') ip = ip_address(ip_str) if ip.is_loopback or any(ip in network for network in get_networks()): return f(self, request) else: msg = PRIVATE_BODY_RESPONSE_TEMPLATE.format( ip_str, force_unicode(request.remote_addr), request.headers.get('x-forwarded-for')) return Response(msg, status='403') return wrapper
def __call__(self, environ, start_response): request = Request(environ) if request.path.startswith(self.prefix): method = request.path[len(self.prefix):] if method == '': # no trailing / start_response('302', [('location', self.prefix + '/')]) return '' try: funcname = self.urlmap[method] func = getattr(self, funcname) except (KeyError, AttributeError): response = Response(status=404) else: response = func(request) return response(environ, start_response) else: return self.app(environ, start_response)
def abort(status, *args, **kwargs): ''' Raises an :py:exc:`HTTPException` for the given status code or WSGI application:: abort(404) # 404 Not Found abort(Response('Hello World')) Can be passed a WSGI application or a status code. If a status code is given it's looked up in the list of exceptions and will raise that exception, if passed a WSGI application it will wrap it in a proxy WSGI exception and raise that:: abort(404) abort(Response('Hello World')) ''' return _aborter(status, *args, **kwargs)
def redirect(location, status=302): """ which is seldom used in api server """ from werkzeug.wrappers import Response from werkzeug.urls import iri_to_uri location = iri_to_uri(location, safe_conversion=True) return Response( "<!DOCTYPE html>\ <html>\ <h1>Redirecting...</h1>\ <a href='{0}'>touch this to make manually redirection</a>\ </html>" .format(location), status=status, headers={'Location': location}) # sample # app = create_app() # @app.route('/', 'home') # def home(request): # app.UploadHandlerClass(request).save() # OR give some specific filenames # app.UploadHandlerClass(request, ['image']).save()
def download_xlsx(filename, info): '''Çreate xlsx file for given info and download it ''' output = StringIO() workbook = xlsxwriter.Workbook(output, {'in_memory': True}) worksheet = workbook.add_worksheet() row_num = 0 col_num = 0 for row in info: for grid in row: worksheet.write(row_num, col_num, grid) col_num += 1 col_num = 0 row_num += 1 workbook.close() output.seek(0) headers = Headers() headers.set('Content-Disposition', 'attachment', filename=filename) return Response(output.read(), mimetype='application/vnd.openxmlformats-' 'officedocument.spreadsheetml.sheet', headers=headers)
def flask(body, headers): import flask path = '/hello/<account_id>/test' flask_app = flask.Flask('hello') @flask_app.route(path) def hello(account_id): request = flask.request user_agent = request.headers['User-Agent'] # NOQA limit = request.args.get('limit', '10') # NOQA return flask.Response(body, headers=headers, mimetype='text/plain') return flask_app
def werkzeug(body, headers): import werkzeug.wrappers as werkzeug from werkzeug.routing import Map, Rule path = '/hello/<account_id>/test' url_map = Map([Rule(path, endpoint='hello')]) @werkzeug.Request.application def hello(request): user_agent = request.headers['User-Agent'] # NOQA limit = request.args.get('limit', '10') # NOQA adapter = url_map.bind_to_environ(request.environ) # NOQA endpoint, values = adapter.match() # NOQA aid = values['account_id'] # NOQA return werkzeug.Response(body, headers=headers, mimetype='text/plain') return hello
def __call__(self, environ, start_response): code = self.code content = self.content request = Request(environ) self.requests.append(request) data = request.data if request.content_encoding == 'deflate': data = zlib.decompress(data) data = data.decode(request.charset) if request.content_type == 'application/json': data = json.loads(data) self.payloads.append(data) validator = VALIDATORS.get(request.path, None) if validator and not self.skip_validate: try: validator.validate(data) code = 202 except jsonschema.ValidationError as e: code = 400 content = json.dumps({'status': 'error', 'message': str(e)}) response = Response(status=code) response.headers.clear() response.headers.extend(self.headers) response.data = content return response(environ, start_response)
def test_type_forcing(self): def wsgi_application(environ, start_response): start_response('200 OK', [('Content-Type', 'text/html')]) return ['Hello World!'] base_response = wrappers.BaseResponse('Hello World!', content_type='text/html') class SpecialResponse(wrappers.Response): def foo(self): return 42 # good enough for this simple application, but don't ever use that in # real world examples! fake_env = {} for orig_resp in wsgi_application, base_response: response = SpecialResponse.force_type(orig_resp, fake_env) assert response.__class__ is SpecialResponse self.assert_strict_equal(response.foo(), 42) self.assert_strict_equal(response.get_data(), b'Hello World!') self.assert_equal(response.content_type, 'text/html') # without env, no arbitrary conversion self.assert_raises(TypeError, SpecialResponse.force_type, wsgi_application)
def test_etag_response_mixin_freezing(self): class WithFreeze(wrappers.ETagResponseMixin, wrappers.BaseResponse): pass class WithoutFreeze(wrappers.BaseResponse, wrappers.ETagResponseMixin): pass response = WithFreeze('Hello World') response.freeze() self.assert_strict_equal(response.get_etag(), (text_type(wrappers.generate_etag(b'Hello World')), False)) response = WithoutFreeze('Hello World') response.freeze() self.assert_equal(response.get_etag(), (None, None)) response = wrappers.Response('Hello World') response.freeze() self.assert_equal(response.get_etag(), (None, None))
def test_ranges(self): # basic range stuff req = wrappers.Request.from_values() assert req.range is None req = wrappers.Request.from_values(headers={'Range': 'bytes=0-499'}) self.assert_equal(req.range.ranges, [(0, 500)]) resp = wrappers.Response() resp.content_range = req.range.make_content_range(1000) self.assert_equal(resp.content_range.units, 'bytes') self.assert_equal(resp.content_range.start, 0) self.assert_equal(resp.content_range.stop, 500) self.assert_equal(resp.content_range.length, 1000) self.assert_equal(resp.headers['Content-Range'], 'bytes 0-499/1000') resp.content_range.unset() assert 'Content-Range' not in resp.headers resp.headers['Content-Range'] = 'bytes 0-499/1000' self.assert_equal(resp.content_range.units, 'bytes') self.assert_equal(resp.content_range.start, 0) self.assert_equal(resp.content_range.stop, 500) self.assert_equal(resp.content_range.length, 1000)
def test_dynamic_charset_response_mixin(self): class MyResponse(wrappers.DynamicCharsetResponseMixin, Response): default_charset = 'utf-7' resp = MyResponse(mimetype='text/html') assert resp.charset == 'utf-7' resp.charset = 'utf-8' assert resp.charset == 'utf-8' assert resp.mimetype == 'text/html' assert resp.mimetype_params == {'charset': 'utf-8'} resp.mimetype_params['charset'] = 'iso-8859-15' assert resp.charset == 'iso-8859-15' resp.set_data(u'Hällo Wörld') assert b''.join(resp.iter_encoded()) == \ u'Hällo Wörld'.encode('iso-8859-15') del resp.headers['content-type'] try: resp.charset = 'utf-8' except TypeError as e: pass else: assert False, 'expected type error on charset setting without ct'
def capture(f): def decorator(*args, **options): # ?????? try: # ?????? rep = f(*args, **options) except SYLFkException as e: # ???? SYLFkException ??????????????????????????? ERROR_MAP ???????????????? if e.code in ERROR_MAP and ERROR_MAP[e.code]: # ????????? rep = ERROR_MAP[e.code] # ???????? 100??????????? 500 ????? status = int(e.code) if int(e.code) >= 100 else 500 # ???????????????????????????????????????????? return rep if isinstance(rep, Response) or rep is None else Response(rep(), content_type=content_type, status=status) else: # ????????????? raise e # ??????????? return rep # ????? return decorator
def dispatch_static(self, static_path): # ???????? key = parse_static_key(static_path) # ???????????????????????????????? if os.path.exists(static_path): doc_type = TYPE_MAP.get(key, 'text/plain') with open(static_path, 'rb') as f: rep = f.read() # ???????? return Response(rep, content_type=doc_type) else: # ????????? raise exceptions.PageNotFoundError # ????
def render_json(data): # ???????????? content_type = "text/plain" # ??? Dict ?? List ????????? JSON ???? if isinstance(data, dict) or isinstance(data, list): # ? data ??? JSON ???? data = json.dumps(data) # ??????? JSON ?? content_type = "application/json" # ????????? return Response(data, content_type="%s; charset=UTF-8" % content_type, status=200) # ?????????????????
def test_handle(): s = HostHttpServer(host) c = Client(s, Response) s.start() r = c.get('/') assert r.status == '403 FORBIDDEN' r = c.get('/?ticket=%s' % s.ticket_create()) assert r.status == '200 OK' r = c.get('/foo/bar/?ticket=%s' % s.ticket_create()) assert r.status == '500 INTERNAL SERVER ERROR' s.stop()
def __init__(self, url, service_impl_cls): self.url = url_endswith_slash(url) self.scheme, self.host, self.path, _, _ = urllib.parse.urlsplit( self.url ) def null_app(environ, start_response): start_response('404 Not Found', [('Content-Type', 'text/plain')]) return ['Not Found'] wsgi_app = WsgiApp(service_impl_cls()) if self.path != '/': self.wsgi_app = DispatcherMiddleware( null_app, {self.path[:-1]: wsgi_app} ) else: self.wsgi_app = wsgi_app self.wsgi_test_client = Client(self.wsgi_app, Response)
def raise_exc(cls, code, message): """ Raise an HTTPException with a code and a message sent in a json like { "code": code "message": message } :param cls: HTTPException of the error, for example NotFound, BadRequest, NotAuthorized :param code: A brief message for the exception, like MISSING_PARAMETER :param message: A longer description of the error :return: Nothing, raise the provided exception with the correct response """ response = Response() response.mimetype = "application/json" response.status_code = cls.code response.data = json.dumps({ "code": code, "message": message }) Logger.warning(cls.__name__.upper(), code + ": " + message) raise cls(response=response)
def handle(self, endpoint, route_args, request): """ Handle a request in the derived handler. The request is routed to the correct method using *endpoint* :param endpoint: A string with the name of the class method to call with (route_args, request) as parameters, this method should return a Response or call self.raise_exc. *NOTE*: the method MUST be implemented in the derived class :param route_args: The route parameters, the parameters extracted from the matching route in the URL :param request: The Request object, request.args contains the query parameters of the request :return: Return a Response if the request is successful, an HTTPException if an error occurred """ try: data = BaseHandler._call(self.__getattribute__(endpoint), route_args, request) response = Response() if data is not None: response.code = 200 response.mimetype = "application/json" response.data = json.dumps(data) else: response.code = 204 return response except HTTPException as e: return e
def respond(response_data, *args, **kwargs): """ Create a C{Response}. @param response_data: metadata about the response such as the view to be rendered. @param args: other positional arguments. @param kwargs: other named arguments. @return: a C{Response}; never C{None}. """ if isinstance(response_data, Response): response = response_data else: (view_name, model, status_code) = deconstruct(response_data) responsifier = current_app.responsifier response = responsifier.responsify(model, status_code=status_code, view_name=view_name, *args, **kwargs) response.status_code = status_code return response
def deconstruct_list(response_data, model=None, status_code=OK): view_name = None if not response_data: raise KrumpException( 'Response data is empty; the view name at least is required.') if len(response_data) >= 1: view_name = required_view_name(response_data[0]) if len(response_data) >= 2: model = response_data[1] if len(response_data) >= 3: status_code = response_data[2] if model is None: model = {} if status_code is None: status_code = OK return view_name, model, status_code
def redirect(location, code=302, Response=None): """Returns a response object (a WSGI application) that, if called, redirects the client to the target location. Supported codes are 301, 302, 303, 305, and 307. 300 is not supported because it's not a real redirect and 304 because it's the answer for a request with a request with defined If-Modified-Since headers. .. versionadded:: 0.6 The location can now be a unicode string that is encoded using the :func:`iri_to_uri` function. .. versionadded:: 0.10 The class used for the Response object can now be passed in. :param location: the location the response should redirect to. :param code: the redirect status code. defaults to 302. :param class Response: a Response class to use when instantiating a response. The default is :class:`werkzeug.wrappers.Response` if unspecified. """ if Response is None: from werkzeug.wrappers import Response display_location = escape(location) if isinstance(location, text_type): # Safe conversion is necessary here as we might redirect # to a broken URI scheme (for instance itms-services). from werkzeug.urls import iri_to_uri location = iri_to_uri(location, safe_conversion=True) response = Response( '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n' '<title>Redirecting...</title>\n' '<h1>Redirecting...</h1>\n' '<p>You should be redirected automatically to target URL: ' '<a href="%s">%s</a>. If not click the link.' % (escape(location), display_location), code, mimetype='text/html') response.headers['Location'] = location return response
def get_response(self, environ=None): """Get a response object. If one was passed to the exception it's returned directly. :param environ: the optional environ for the request. This can be used to modify the response depending on how the request looked like. :return: a :class:`Response` object or a subclass thereof. """ if self.response is not None: return self.response if environ is not None: environ = _get_environ(environ) headers = self.get_headers(environ) return Response(self.get_body(environ), self.code, headers)
def protected(self, request): return Response(status=200)
def ok_response(): return Response(str(talisker.revision.get()) + '\n')
def index(self, request): methods = [] item = '<li><a href="{0}"/>{1}</a> - {2}</li>' for url, funcname in self.urlmap.items(): if funcname and url not in self.no_index: func = getattr(self, funcname) methods.append( item.format(self.prefix + url, url, func.__doc__)) return Response( '<ul>' + '\n'.join(methods) + '<ul>', mimetype='text/html')
def test_statsd(self, request): """Increment statsd metric for testing""" statsd = request.environ['statsd'] statsd.incr('test') return Response('Incremented {}.test'.format(statsd._prefix))
def test_prometheus(self, request): """Increment prometheus metric for testing""" if not pkg_is_installed('prometheus-client'): return Response('Not Supported', status=501) if not hasattr(self, 'test_counter'): import prometheus_client self.test_counter = prometheus_client.Counter('test', 'test') self.test_counter.inc() return Response('Incremented test counter')
def info(self, request): return Response('Not Implemented', status=501)
def metrics(self, request): """Endpoint exposing Prometheus metrics""" if not pkg_is_installed('prometheus-client'): return Response('Not Supported', status=501) # Importing this too early would break multiprocess metrics from prometheus_client import ( CONTENT_TYPE_LATEST, CollectorRegistry, REGISTRY, generate_latest, multiprocess, ) if 'prometheus_multiproc_dir' in os.environ: # prometheus_client is running in multiprocess mode. # Use a custom registry, as the global one includes custom # collectors which are not supported in this mode registry = CollectorRegistry() multiprocess.MultiProcessCollector(registry) else: if request.environ.get('wsgi.multiprocess', False): return Response( 'Not Supported: running in multiprocess mode but ' '`prometheus_multiproc_dir` envvar not set', status=501) # prometheus_client is running in single process mode. # Use the global registry (includes CPU and RAM collectors) registry = REGISTRY data = generate_latest(registry) return Response(data, status=200, mimetype=CONTENT_TYPE_LATEST)
def debug_logtree(self, request): import logging_tree tree = logging_tree.format.build_description() return Response(tree)
def __call__(self, environ, start_response): local.request = req = Request(environ) if req.authorization is None: resp = Response('Unauthorized. Please supply authorization.', status=401, headers={ ('WWW-Authenticate', 'Basic Realm="{}"'.format(self.realm)), } ) return resp(environ, start_response) return self.wrapped(environ, start_response)