我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用flask.request.stream()。
def changenames(): result = BytesIO() zipf = zipfile.ZipFile(result, "w") i=0 for desc, font in _unpack(request.stream): i += 1 print('#',i,'got oldname', font['name'].getDebugName(6)) changefont(desc, font) filename = desc['filename'] print('changed', filename) # write the font file to the zip fontIO = BytesIO() font.save(fontIO) fontData = fontIO.getvalue() zipf.writestr(filename, fontData) zipf.close() data = result.getvalue() response = make_response(data) response.headers['Content-Type'] = 'application/octet-stream' response.headers['Content-Disposition'] = 'attachment; filename=fonts-with-changed-names.zip' return response
def post_file() -> JSONResponse[str]: """Temporarily store some data on the server. .. :quickref: File; Safe a file temporarily on the server. .. note:: The posted data will be removed after 60 seconds. :returns: A response with the JSON serialized name of the file as content and return code 201. :raises APIException: If the request is bigger than the maximum upload size. (REQUEST_TOO_LARGE) :raises PermissionException: If there is no logged in user. (NOT_LOGGED_IN) """ if ( request.content_length and request.content_length > app.config['MAX_UPLOAD_SIZE']): raise APIException( 'Uploaded file is too big.', 'Request is bigger than maximum upload size of {}.'.format( app.config['MAX_UPLOAD_SIZE'] ), APICodes.REQUEST_TOO_LARGE, 400 ) path, name = psef.files.random_file_path('MIRROR_UPLOAD_DIR') FileStorage(request.stream).save(path) return jsonify(name, status_code=201)
def _unpack(stream): # L = unsignedlong 4 bytes while True: head = stream.read(8) if not head: break jsonlen, fontlen = struct.unpack('II', head) desc = json.loads(stream.read(jsonlen).decode('utf-8')) font = TTFont(BytesIO(stream.read(fontlen))) yield (desc, font)
def content_item_resource(id,resource): if request.method == 'GET': wrap = request.args.get('wrap') status_code,data,contentType = model.getContentResource(id,resource); if status_code==200: if contentType.startswith("text/html") and wrap is not None: blob = io.BytesIO() for chunk in data: blob.write(chunk) content = blob.getvalue().decode("utf-8").strip() if not content.startswith('<!DOCTYPE'): editorConfig = app.config.get('EDITOR_CONFIG') header = '' bodyStart = '' bodyEnd = '' if editorConfig is not None and wrap=='preview': wheader = editorConfig.get('wrap-header') pheader = editorConfig.get('preview-wrap-header') if pheader is not None: header = pheader elif wheader is not None: header = wheader wbody = editorConfig.get('wrap-body') pbody = editorConfig.get('preview-body-main') if pbody is not None: bodyStart = pbody[0] bodyEnd = pbody[1] elif wbody is not None: bodyStart = wbody[0] bodyEnd = wbody[1] elif editorConfig is not None and wrap=='formatted': wheader = editorConfig.get('wrap-header') if wheader is not None: header = wheader wbody = editorConfig.get('wrap-body') if wbody is not None: bodyStart = wbody[0] bodyEnd = wbody[1] content = """ <!DOCTYPE html> <html> <head><title>""" + resource + '</title>' + header + """ </head> <body> """ + bodyStart + content + bodyEnd + '</body></html>' return Response(stream_with_context(content),content_type = contentType) else: return Response(stream_with_context(data),content_type = contentType) else: abort(status_code) if request.method == 'PUT': status_code,data,contentType = model.updateContentResource(id,resource,request.headers['Content-Type'],request.stream); if status_code==200 or status_code==201: return Response(stream_with_context(data),status=status_code,content_type = contentType) else: return Response(status=status) if request.method == 'DELETE': status = model.deleteContentResource(id,resource) return Response(status=status)