我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.stream_with_context()。
def _finalize(self, response): if self.stream and not is_json_response(response): text = flask.stream_with_context( stream_response(response)) else: text = response.text final_response = flask.Response( text, response.status_code, headers=self._prepare_headers(response.headers, fix_case=True) ) LOG.info(format_for_log(title='Response from proxy', status_code=final_response.status_code, headers=dict(final_response.headers))) return final_response
def content_item(id): if request.method == 'GET': content = model.getContent(id) return jsonld_response(json.dumps(content)) if request.method == 'POST': abort(400) if request.method == 'PUT': force = request.headers['Content-Type'].startswith('application/ld+json') data = request.get_json(force=force) if data is None: abort(400) status_code,data,contentType = model.updateContent(id,data); if status_code==200: return Response(stream_with_context(data),content_type = contentType) else: abort(status_code) if request.method == 'DELETE': status = model.deleteContent(id) return Response(status=status)
def content_item_resource_upload(id,property): #print(request.headers['Content-Type']) #print(request.files) file = request.files['file'] #print(file.filename) #print(file.content_type) #print(file.content_length) uploadContentType = file.content_type if file.content_type.startswith("text/") and file.content_type.find("charset=")<0: uploadContentType = file.content_type+"; charset=UTF-8" uploadDir = app.config['UPLOAD_STAGING'] if 'UPLOAD_STAGING' in app.config else 'tmp' os.makedirs(uploadDir,exist_ok=True) staged = os.path.join(uploadDir, file.filename) file.save(staged) status = 500 responseJSON = None contentType = None with open(staged,"rb") as data: status,responseJSON,contentType = model.uploadContentResource(id,property,file.filename,uploadContentType,os.path.getsize(staged),data) os.unlink(staged) if status==200 or status==201: return Response(stream_with_context(responseJSON),status=status,content_type = contentType) else: return Response(status=status)
def stream_gen(f, flag=False): def decorator(*args, **kwargs): def SEE(ctx): for i in ctx: if flag: data = to_json( { 'status': exceptions.OK, 'msg': None, 'data': i, } ) else: data = str(i) ev = ServerSentEvent(data) yield ev.encode() rf = f(*args, **kwargs) if isinstance(rf, types.GeneratorType): return Response(stream_with_context(SEE(rf)), mimetype="text/event-stream") else: raise exceptions.InternalError return decorator
def test_streaming_with_context(self): app = flask.Flask(__name__) app.testing = True @app.route('/') def index(): def generate(): yield 'Hello ' yield flask.request.args['name'] yield '!' return flask.Response(flask.stream_with_context(generate())) c = app.test_client() rv = c.get('/?name=World') self.assertEqual(rv.data, b'Hello World!')
def test_streaming_with_context_as_decorator(self): app = flask.Flask(__name__) app.testing = True @app.route('/') def index(): @flask.stream_with_context def generate(): yield 'Hello ' yield flask.request.args['name'] yield '!' return flask.Response(generate()) c = app.test_client() rv = c.get('/?name=World') self.assertEqual(rv.data, b'Hello World!')
def test_streaming_with_context_and_custom_close(self): app = flask.Flask(__name__) app.testing = True called = [] class Wrapper(object): def __init__(self, gen): self._gen = gen def __iter__(self): return self def close(self): called.append(42) def __next__(self): return next(self._gen) next = __next__ @app.route('/') def index(): def generate(): yield 'Hello ' yield flask.request.args['name'] yield '!' return flask.Response(flask.stream_with_context( Wrapper(generate()))) c = app.test_client() rv = c.get('/?name=World') self.assertEqual(rv.data, b'Hello World!') self.assertEqual(called, [42])
def home(name,path): proxies = app.config.get('PROXIES') if proxies is None: abort(404) base = proxies.get(name) if base is None: abort(404) req = requests.get(base + path, stream = True) return Response(stream_with_context(req.iter_content()), content_type = req.headers['content-type'])
def entryMedia(date,time,path): data = getPond(current_app.config) uri = current_app.config['CACHE']['base'] + date + '/' + path resource = data.getResource(uri) if resource[0] == Pond.ResourceType.uri: return redirect(resource[1], code=resource[2]) elif resource[0] == Pond.ResourceType.stream: return Response(stream_with_context(resource[1]), content_type = resource[2]) else: return send_file(resource[1])
def streaming_response(iterations): import time def generate(): yield "<html><body><h1>streaming response</h1>" for i in range(iterations): yield "<span>%s</span>\n" % i time.sleep(0.01) yield "</body></html>" return Response(stream_with_context(generate()), mimetype="text/html")