我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.send_file()。
def get_zip(self, project, ty): """Get a ZIP file directly from uploaded directory or generate one on the fly and upload it if not existing.""" filename = self.download_name(project, ty) if not self.zip_existing(project, ty): print "Warning: Generating %s on the fly now!" % filename self._make_zip(project, ty) if isinstance(uploader, local.LocalUploader): filepath = self._download_path(project) res = send_file(filename_or_fp=safe_join(filepath, filename), mimetype='application/octet-stream', as_attachment=True, attachment_filename=filename) # fail safe mode for more encoded filenames. # It seems Flask and Werkzeug do not support RFC 5987 http://greenbytes.de/tech/tc2231/#encoding-2231-char # res.headers['Content-Disposition'] = 'attachment; filename*=%s' % filename return res else: return redirect(url_for('rackspace', filename=filename, container=self._container(project), _external=True))
def collage(): if request.method=='GET': return render_template('collage.html',input=True) elif request.method == 'POST': username = request.form['lastfm_username'] duration = request.form['duration'] print(username) payload = { 'user': username, 'api_key': config.LASTFM_API_KEY, 'method': 'user.gettopalbums', 'period':duration, 'limit':9, 'format':'json' } r = requests.get(config.url, params = payload) filenames = get_album_art(r.text, username) generate_collage(filenames,username) return send_file('static/' + username+'.jpg', mimetype='image/jpg')
def get_image_file(uuid, image_file_path): """GetImageFile (GET /images/:uuid/file) Return the image file. """ encoded_md5 = get_image_file_md5(uuid, image_file_path) if NGINX_ENABLED: res = make_response('') res.headers['Content-Type'] = 'application/octet-stream' res.headers['X-Accel-Redirect'] = '/static/images/%s/file?md5=%s' % (uuid, encoded_md5) else: res = make_response(send_file(image_file_path, add_etags=False, cache_timeout=0)) res.headers['Content-Length'] = get_image_file_size(image_file_path) res.headers['Content-MD5'] = encoded_md5 return res
def get_job_output(job_id, filename, as_attach=False, mimetype=None, tail=None, head=None, job=None): try: output_file = job.relative_path(filename) if tail or head: if tail and head: return "Cannot specify tail AND head", 500 cmd = "head" if head else "tail" count = tail or head p = subprocess.Popen([cmd, "-n", count, output_file], stdout=subprocess.PIPE, stderr=subprocess.PIPE) tail_data, tail_error = p.communicate() resp = make_response(tail_data) if as_attach: resp.headers["Content-Disposition"] = "attachment; filename={}".format(filename) if mimetype: resp.headers["Content-Type"] = mimetype return resp else: return send_file(output_file, as_attachment=as_attach, mimetype=mimetype) except Exception as e: print e return "File Not Found", 404
def send_file(filename, attachment_filename, mimetype, **kwargs): response = flask_send_file(filename, mimetype=mimetype) try: attachment_filename = attachment_filename.encode('latin-1') except UnicodeEncodeError: filenames = { 'filename': unicodedata .normalize('NFKD', attachment_filename) .encode('latin-1', 'ignore'), 'filename*': "UTF-8''{}".format( url_quote(attachment_filename)), } else: filenames = {'filename': attachment_filename} response.headers.set( 'Content-Disposition', 'attachment', **filenames) return response
def test_send_file_xsendfile(self): app = flask.Flask(__name__) app.use_x_sendfile = True with app.test_request_context(): rv = flask.send_file('static/index.html') self.assert_true(rv.direct_passthrough) self.assert_in('x-sendfile', rv.headers) self.assert_equal(rv.headers['x-sendfile'], os.path.join(app.root_path, 'static/index.html')) self.assert_equal(rv.mimetype, 'text/html') rv.close()
def serve_static_files(p, index_on_error=True): """Securely serve static files for the given path using send_file.""" # Determine the canonical path of the file full_path = os.path.realpath(os.path.join(app.static_folder, p)) # We have a problem if either: # - the path is not a sub-path of app.static_folder; or # - the path does not refer to a real file. if (os.path.commonprefix([app.static_folder, full_path]) != app.static_folder or not os.path.isfile(full_path)): file_to_return = app.config.get('STATIC_FILE_ON_404', None) if file_to_return is not None: full_path = os.path.realpath(os.path.join(app.static_folder, file_to_return)) else: return abort(404) return send_file(full_path)
def shrug(): import os, os.path from tools.slack_posting import postImage filename = os.path.join(os.getcwd(), tools.settings.getValue('misc', 'shinoa_shrug')) if request.method == 'GET': return send_file(filename, mimetype='image/{}'.format(filename.split('.')[-1])) elif request.method == 'POST': if (not (isValidCommand(request, 'shrug'))): return "You dun goof'd." postImage(filename, request.form['channel_id'], None, "shrug", "*shrug*") return ""
def pages(page_path): if not validate_custom_page_path(page_path): raise ApiException(error=Error.NOT_ALLOWED, message='The visit of path "{}" is not allowed.'.format(page_path)) rel_url, exists = storage.fix_relative_url('page', page_path) if exists: file_path = rel_url return send_file(file_path) elif rel_url is None: # pragma: no cover, it seems impossible to make this happen, see code of 'fix_relative_url' raise ApiException(error=Error.BAD_PATH, message='The path "{}" cannot be recognized.'.format(page_path)) else: page_d = cache.get('api-handler.' + rel_url) if page_d is not None: return page_d # pragma: no cover, here just get the cached dict page = storage.get_page(rel_url, include_draft=False) if page is None: raise ApiException(error=Error.RESOURCE_NOT_EXISTS) page_d = page.to_dict() del page_d['raw_content'] page_d['content'] = get_parser(page.format).parse_whole(page.raw_content) cache.set('api-handler.' + rel_url, page_d, timeout=2 * 60) return page_d
def report_get(task_id, report_format="json"): task = Task.query.get(task_id) if not task: return json_error(404, "Task not found") if task.status == Task.DELETED: return json_error(404, "Task report has been deleted") if task.status != Task.FINISHED: return json_error(420, "Task not finished yet") report_path = os.path.join(settings.reports_directory, "%d" % task_id, "report.%s" % report_format) if not os.path.isfile(report_path): return json_error(404, "Report format not found") return send_file(report_path)
def pcap_get(task_id): task = Task.query.get(task_id) if not task: return json_error(404, "Task not found") if task.status == Task.DELETED: return json_error(404, "Task files has been deleted") if task.status != Task.FINISHED: return json_error(420, "Task not finished yet") pcap_path = os.path.join(settings.reports_directory, "%s" % task_id, "dump.pcap") if not os.path.isfile(pcap_path): return json_error(404, "Pcap file not found") return send_file(pcap_path)
def shakemap_overlay(shakemap_id): session = Session() shakemap = (session.query(ShakeMap) .filter(ShakeMap.shakemap_id == shakemap_id) .order_by(desc(ShakeMap.shakemap_version)) .limit(1)).first() if shakemap is not None: img = os.path.join(app.config['EARTHQUAKES'], shakemap_id, shakemap_id + '-' + str(shakemap.shakemap_version), 'ii_overlay.png') else: img = app.send_static_file('sc_logo.png') Session.remove() return send_file(img, mimetype='image/gif')
def index(): path = request.args.get('path') frame = request.args.get('frame') id = request.args.get('id') basename = os.path.split(path)[1] video_path = '{}/{}'.format(FILE_DIR, basename) if not os.path.isfile(video_path): blob = bucket.get_blob(path) with open(video_path, 'wb') as f: blob.download_to_file(f) sp.check_call(shlex.split("ffmpeg -y -i {} -vf \"select=eq(n\\,{})\" -frames:v 1 {}/%05d.jpg".format(video_path, frame, FILE_DIR))) blob = bucket.blob('public/thumbnails/tvnews/frame_{}.jpg'.format(id)) img_path = '{}/00001.jpg'.format(FILE_DIR) blob.upload_from_filename(img_path) return send_file(img_path)
def media_file(filename): """ Retrieves a media file. """ outfile = media_storage.get_file(filename) return send_file(outfile,mimetype='image/png')
def showHtml(filename): return send_file(os.path.join(app.config['REPORT'], filename)) # ????????
def download_homework(filename): return send_file(os.path.join(app.config['HOMEWORK'], filename))
def scripts(): return send_file('ui/cached_dist/Bock.js')
def styles(): return send_file('ui/cached_dist/Bock.css')
def fonts(route): return send_file('ui/cached_dist/fonts/{}'.format(route))
def route(route): return send_file('ui/cached_dist/index.html')
def index(): return send_file('ui/cached_dist/index.html')
def test_send_file_regular(self): app = flask.Flask(__name__) with app.test_request_context(): rv = flask.send_file('static/index.html') self.assert_true(rv.direct_passthrough) self.assert_equal(rv.mimetype, 'text/html') with app.open_resource('static/index.html') as f: rv.direct_passthrough = False self.assert_equal(rv.data, f.read()) rv.close()
def test_attachment(self): app = flask.Flask(__name__) with catch_warnings() as captured: with app.test_request_context(): f = open(os.path.join(app.root_path, 'static/index.html')) rv = flask.send_file(f, as_attachment=True) value, options = parse_options_header(rv.headers['Content-Disposition']) self.assert_equal(value, 'attachment') rv.close() # mimetypes + etag self.assert_equal(len(captured), 2) with app.test_request_context(): self.assert_equal(options['filename'], 'index.html') rv = flask.send_file('static/index.html', as_attachment=True) value, options = parse_options_header(rv.headers['Content-Disposition']) self.assert_equal(value, 'attachment') self.assert_equal(options['filename'], 'index.html') rv.close() with app.test_request_context(): rv = flask.send_file(StringIO('Test'), as_attachment=True, attachment_filename='index.txt', add_etags=False) self.assert_equal(rv.mimetype, 'text/plain') value, options = parse_options_header(rv.headers['Content-Disposition']) self.assert_equal(value, 'attachment') self.assert_equal(options['filename'], 'index.txt') rv.close()
def test_static_file(self): app = flask.Flask(__name__) # default cache timeout is 12 hours with app.test_request_context(): # Test with static file handler. rv = app.send_static_file('index.html') cc = parse_cache_control_header(rv.headers['Cache-Control']) self.assert_equal(cc.max_age, 12 * 60 * 60) rv.close() # Test again with direct use of send_file utility. rv = flask.send_file('static/index.html') cc = parse_cache_control_header(rv.headers['Cache-Control']) self.assert_equal(cc.max_age, 12 * 60 * 60) rv.close() app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 3600 with app.test_request_context(): # Test with static file handler. rv = app.send_static_file('index.html') cc = parse_cache_control_header(rv.headers['Cache-Control']) self.assert_equal(cc.max_age, 3600) rv.close() # Test again with direct use of send_file utility. rv = flask.send_file('static/index.html') cc = parse_cache_control_header(rv.headers['Cache-Control']) self.assert_equal(cc.max_age, 3600) rv.close() class StaticFileApp(flask.Flask): def get_send_file_max_age(self, filename): return 10 app = StaticFileApp(__name__) with app.test_request_context(): # Test with static file handler. rv = app.send_static_file('index.html') cc = parse_cache_control_header(rv.headers['Cache-Control']) self.assert_equal(cc.max_age, 10) rv.close() # Test again with direct use of send_file utility. rv = flask.send_file('static/index.html') cc = parse_cache_control_header(rv.headers['Cache-Control']) self.assert_equal(cc.max_age, 10) rv.close()
def document(): url = request.args.get('url') if not url or not os.path.exists(url): return abort(400, "File %s not found " % url) return send_file(url)
def api_request_basic(): """Establishes the endpoint that is used for the basic fetch mode POST body should be json formated. Required field(s): - url : actual url that is requested to be fetched by the minion Optional field(s): - user-agent : user agent to be used during the request of the url if this argument is not provided, then the default user agent in the config file will be used. """ try: data = request.get_json() if data is None: raise BadRequest file_path = run_job(data, "basic") return send_file(file_path) except BadRequest as e: return prepare_400("api_request_basic", str(e)) except ValueError as e: return prepare_400("api_request_basic", str(e)) except Exception as e: print type(e) return prepare_500("api_request_basic", str(e))
def api_request_website(): """Establishes the endpoint that is used for the website fetch mode POST body should be json formated. Required field(s): - url : actual url that is requested to be fetched by the minion Optional field(s): - user-agent : user agent to be used during the request of the url if this argument is not provided, then the default user agent in the config file will be used. """ try: data = request.get_json() if data is None: raise BadRequest file_path = run_job(data, "website") return send_file(file_path) except BadRequest as e: return prepare_400("api_request_website", str(e)) except ValueError as e: return prepare_400("api_request_website", str(e)) except Exception as e: print type(e) return prepare_500("api_request_website", str(e))
def api_basic_results(job_id): """Establishes endpoint for retrieval of historical boomerang results :param job_id: id of the specific job that you wish to look up """ if minion.config['BOOMERANG_STORE_RESULTS']: try: file_path = get_job_results(job_id) return send_file(file_path) except BadRequest as e: return prepare_400("api_basic_results", str(e)) except ValueError: return prepare_404('api_basic_results') except Exception as e: return prepare_500("api_basic_results", str(e)) else: return prepare_400("api_basic_results", error="Config set to not store results")
def GET_v1_compile_job_id_hex(job_id): """Download a compiled firmware """ job = get_job_metadata(job_id) if not job: return error("Compile job not found", 404) if job['result']['firmware']: return send_file(job['result']['firmware'], mimetype='application/octet-stream', as_attachment=True, attachment_filename=job['result']['firmware_filename']) return error("Compile job not finished or other error.", 422)
def GET_v1_compile_job_id_src(job_id): """Download a completed compile job. """ job = get_job_metadata(job_id) if not job: return error("Compile job not found", 404) if job['result']['firmware']: source_zip = qmk_storage.get('%(id)s/%(source_archive)s' % job['result']) return send_file(source_zip, mimetype='application/octet-stream', as_attachment=True, attachment_filename=job['result']['source_archive']) return error("Compile job not finished or other error.", 422)
def cat_picture(): image_name = request.args.get('image_name') image_name = image_name.replace('..', '') return send_file(os.path.join(os.getcwd(), image_name))
def cat_picture(): image_name = request.args.get('image_name') if not '..' in image_name: return 404 return send_file(os.path.join(os.getcwd(), image_name))
def cat_picture(): image_name = request.args.get('image_name') if not image_name: return 404 return send_file(os.path.join(os.getcwd(), image_name))
def api_top_hits(): return send_file(common_filepaths['top-hits-1k'])
def api_family_export_sampleszip(family_id, tlp_level): my_family = api.get_elem_by_type("family", family_id) zpath = api.familycontrol.generate_samples_zip_file(my_family, tlp_level) if zpath is None: return "" return send_file("../" + zpath, as_attachment=True, attachment_filename="export.tar.gz")
def download_family_file(family_id, file_id): """ Family attachment download endpoint. """ attachment = api.get_elem_by_type("family_file", file_id) data_file = attachment.filepath if not os.path.exists(data_file): abort(404) return send_file('../' + data_file, as_attachment=True, attachment_filename=os.path.basename(data_file))
def api_get_sample_file(sid): """ Return the sample binary """ sample = api.get_elem_by_type("sample", sid) data_file = sample.storage_file return send_file('../' + data_file, as_attachment=True, attachment_filename=os.path.basename(data_file))
def cache(filename): if not config.CACHE_IMAGES: return g.plex.get_thumb_data(filename) cache_dir = os.path.join(config.DATA_DIR, "cache") cache_file = os.path.join(cache_dir, filename) if not os.path.exists(cache_file + ".jpg"): if helper.cache_file(filename, g.plex): return send_from_directory(cache_dir, filename + ".jpg") else: return send_file('static/images/poster.png') else: return send_from_directory(cache_dir, filename + ".jpg")
def image_jpg(): myCamera.getImage().save('/home/pi/boot/image.jpg') return send_file('/home/pi/boot/image.jpg', attachment_filename='image.jpg')
def home(): return send_file(os.path.realpath(os.path.join(app.static_folder, 'index.html')))
def font(): return send_file(os.path.realpath(os.path.join('SFPixelate-Bold.ttf'))) #return send_from_directory('/', 'SFPixelate-Bold.ttf')
def index(): return send_file('templates/index.html')
def get_vxstream_download(sha256, eid, ftype): Sample.query.filter_by(sha256=sha256).first_or_404() headers = { 'Accept': 'text/html', 'User-Agent': 'VxStream Sandbox API Client'} params = {'type': ftype, 'environmentId': eid} vx = vxstream.api.get('result/{}'.format(sha256), params=params, headers=headers) if ftype in ['xml', 'html', 'bin', 'pcap']: ftype += '.gz' return send_file(BytesIO(vx), attachment_filename='{}.{}'.format(sha256, ftype), as_attachment=True)
def get_fireeye_download(sha256, eid, ftype): raise ApiException({}, 501) Sample.query.filter_by(sha256=sha256).first_or_404() headers = { 'Accept': 'text/html', 'User-Agent': 'FireEye Sandbox API Client'} params = {'type': ftype, 'environmentId': eid} vx = fireeye.api.get('result/{}'.format(sha256), params=params, headers=headers) if ftype in ['xml', 'html', 'bin', 'pcap']: ftype += '.gz' return send_file(BytesIO(vx), attachment_filename='{}.{}'.format(sha256, ftype), as_attachment=True)
def download_file(file_id): """Download file **Example request**: .. sourcecode:: http GET /api/1.0/files/1 HTTP/1.1 Host: do.cert.europa.eu Accept: application/json **Example response**: .. sourcecode:: http HTTP/1.0 200 OK Content-Type: application/json Content-Disposition: attachment; filename=CIMBL-244-EU.zip Content-Length: 55277 Content-Type: application/zip :param file_id: file's unique ID :reqheader Accept: Content type(s) accepted by the client :resheader Content-Type: this depends on `Accept` header or request :status 200: File found :status 404: Resource not found """ dfile = DeliverableFile.query.filter_by(id=file_id).first_or_404() cfg = current_app.config return send_file(os.path.join(cfg['APP_UPLOADS'], dfile.name), attachment_filename=dfile.name, as_attachment=True)
def get_cp_vxstream_download(sha256, eid, ftype): Sample.query.filter_by(sha256=sha256, user_id=g.user.id).first_or_404() headers = { 'Accept': 'text/html', 'User-Agent': 'VxStream Sandbox API Client'} params = {'type': ftype, 'environmentId': eid} vx = vxstream.api.get('result/{}'.format(sha256), params=params, headers=headers) if ftype in ['xml', 'html', 'bin', 'pcap']: ftype += '.gz' return send_file(BytesIO(vx), attachment_filename='{}.{}'.format(sha256, ftype), as_attachment=True)