The following 44 code examples, extracted from open-source Python projects, illustrate how to use django.http.StreamingHttpResponse().
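Before the collected examples, here is a minimal sketch of the pattern they all share: pass an iterator (usually a generator) to StreamingHttpResponse so Django sends the body in chunks instead of building it in memory. The view name and generator below are illustrative only, not taken from any of the projects.

from django.http import StreamingHttpResponse

def stream_lines(request):
    # A generator yields one chunk at a time; Django writes each chunk to the
    # client as it is produced instead of buffering the whole response body.
    def generate():
        for i in range(10):
            yield "line %d\n" % i

    return StreamingHttpResponse(generate(), content_type='text/plain')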
def get(self, request, *args, **kwargs):
    post = get_object_or_404(self.get_queryset(), pk=self.kwargs['pk'])
    if request.user.is_superuser or request.user.has_perm('archives.change_post') or post.author_id == request.user.id:
        pass
    elif post.visible == 'private' or post.visible == 'sell' and not post.buyers.filter(id=request.user.id).exists():
        raise Http404
    chunk_size = 8192
    response = StreamingHttpResponse(FileWrapper(open(post.attachment.path, 'rb'), chunk_size),
                                     content_type='application/octet-stream')
    response['Content-Length'] = post.attachment.size
    filename = post.attachment_filename if post.attachment_filename else 'attachment'
    response["Content-Disposition"] = \
        "attachment; " \
        "filename={ascii_filename};" \
        "filename*=UTF-8''{utf_filename}".format(
            ascii_filename=quote(filename),
            utf_filename=quote(filename)
        )
    return response
def download_log(request):
    if request.method == 'GET':
        log_path = request.GET.get('log_path')
        log_name = request.GET.get('log_name')
        print('log_path:', log_path, 'log_name:', log_name)
        # name of the zip archive to build
        zip_file_name = log_name + '.zip'
        # write the archive under the master log directory's tmp/ folder
        zip_dir = dao_config.log_dir_master + 'tmp/' + zip_file_name
        archive = zipfile.ZipFile(zip_dir, 'w', zipfile.ZIP_DEFLATED)
        # add the requested log file to the archive
        archive.write(log_path)
        # flush and close the archive
        archive.close()
        print(zip_dir)
        if os.path.isfile(zip_dir):
            response = StreamingHttpResponse(readFile(zip_dir))
            response['Content-Type'] = 'application/octet-stream'
            response['Content-Disposition'] = 'attachment;filename="{0}"'.format(zip_file_name)
            return response
        else:
            return HttpResponse('File not found')
def export_to_debug_html_response(filename, headers, rows):
    """Returns a downloadable StreamingHttpResponse using an HTML payload for debugging"""

    def output_generator():
        # Note the use of bytestrings to avoid unnecessary Unicode-bytes cycles:
        yield b'<!DOCTYPE html><html>'
        yield b'<head><meta charset="utf-8"><title>TABULAR DEBUG</title>'
        yield b'<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.5/css/bootstrap.min.css">'
        yield b'</head>'
        yield b'<body class="container-fluid"><div class="table-responsive"><table class="table table-striped">'
        yield b'<thead><tr><th>'
        yield b'</th><th>'.join(convert_value_to_unicode(i).encode('utf-8') for i in headers)
        yield b'</th></tr></thead>'
        yield b'<tbody>'
        for row in rows:
            values = map(convert_value_to_unicode, row)
            values = [i.encode('utf-8').replace(b'\n', b'<br>') for i in values]
            yield b'<tr><td>%s</td></tr>' % b'</td><td>'.join(values)
        yield b'</tbody>'
        yield b'</table></div></body></html>'

    return StreamingHttpResponse(output_generator(),
                                 content_type='text/html; charset=UTF-8')
def get(self, request, container, object_name):
    """Get the object contents."""
    obj = api.swift.swift_get_object(
        request,
        container,
        object_name
    )

    # Add the original file extension back on if it wasn't preserved in the
    # name given to the object.
    filename = object_name.rsplit(api.swift.FOLDER_DELIMITER)[-1]
    if not os.path.splitext(obj.name)[1] and obj.orig_name:
        name, ext = os.path.splitext(obj.orig_name)
        filename = "%s%s" % (filename, ext)
    response = StreamingHttpResponse(obj.data)
    safe = filename.replace(",", "")
    if six.PY2:
        safe = safe.encode('utf-8')
    response['Content-Disposition'] = 'attachment; filename="%s"' % safe
    response['Content-Type'] = 'application/octet-stream'
    response['Content-Length'] = obj.bytes
    return response
def object_download(request, container_name, object_path):
    try:
        obj = api.swift.swift_get_object(request, container_name, object_path,
                                         resp_chunk_size=swift.CHUNK_SIZE)
    except Exception:
        redirect = reverse("horizon:project:containers:index")
        exceptions.handle(request,
                          _("Unable to retrieve object."),
                          redirect=redirect)
    # Add the original file extension back on if it wasn't preserved in the
    # name given to the object.
    filename = object_path.rsplit(swift.FOLDER_DELIMITER)[-1]
    if not os.path.splitext(obj.name)[1] and obj.orig_name:
        name, ext = os.path.splitext(obj.orig_name)
        filename = "%s%s" % (filename, ext)
    response = http.StreamingHttpResponse(obj.data)
    safe_name = filename.replace(",", "")
    if six.PY2:
        safe_name = safe_name.encode('utf-8')
    response['Content-Disposition'] = 'attachment; filename="%s"' % safe_name
    response['Content-Type'] = 'application/octet-stream'
    response['Content-Length'] = obj.bytes
    return response
def export_protocol(request):
    if request.method == 'GET':
        if 'id' in request.GET:
            protocol_text = build_plain_protocol(request, request.GET['id'])
            if protocol_text == 1:
                return error('Cannot find the protocol.')
            elif protocol_text == 2:
                return error('You are not owner of the protocol.')
            else:
                from django.http import StreamingHttpResponse
                response = StreamingHttpResponse(protocol_text)
                response['Content-Type'] = 'application/octet-stream'
                response['Content-Disposition'] = 'attachment;filename="{0}"'.format(
                    request.user.username + str(request.GET['id']) + '.txt')
                return response
        else:
            return error('Unknown parameter.')
    else:
        return error('Method error.')
def render_csv_response(queryset, filename=None, add_datestamp=False, **kwargs):
    """
    entry function, making a CSV streaming http response, take a queryset
    """
    if filename:
        filename = clean_filename(filename)
        if add_datestamp:
            filename = attach_datestamp(filename)
    else:
        filename = generate_filename(queryset, add_datestamp)
    response_args = {'content_type': 'text/csv'}

    response = StreamingHttpResponse(
        _iter_csv(queryset, Echo(), **kwargs), **response_args)

    # support chinese filename
    response['Content-Disposition'] = b'attachment; filename=%s;' % filename.encode(encoding='utf-8')
    response['Cache-Control'] = 'no-cache'

    return response
def render_csv_response(self, queryset):
    """
    making a CSV streaming http response, take a queryset
    """
    if self.filename:
        filename = clean_filename(self.filename)
        if self.add_datestamp:
            filename = attach_datestamp(filename)
    else:
        filename = generate_filename(queryset, self.add_datestamp)
    response_args = {'content_type': 'text/csv'}

    response = StreamingHttpResponse(
        self._iter_csv(queryset, Echo()), **response_args)

    # support chinese filename
    response['Content-Disposition'] = b'attachment; filename=%s;' % filename.encode(encoding='utf-8')
    response['Cache-Control'] = 'no-cache'

    return response
def export_matched_data(request):
    search_id = request.GET['search_id']
    inclusive_metaquery = json.loads(request.GET['inclusive_grammar'])

    ds = Datasets().activate_dataset(request.session)

    component_query = ElasticGrammarQuery(inclusive_metaquery, None).generate()

    es_m = ds.build_manager(ES_Manager)

    if search_id == '-1':  # Full search
        es_m.combined_query = component_query
    else:
        saved_query = json.loads(Search.objects.get(pk=search_id).query)
        es_m.load_combined_query(saved_query)
        es_m.merge_combined_query_with_query_dict(component_query)

    inclusive_instructions = generate_instructions(inclusive_metaquery)

    response = StreamingHttpResponse(
        get_all_matched_rows(es_m.combined_query['main'], request, inclusive_instructions),
        content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename="%s"' % ('extracted.csv')

    return response
def full_memory_dump_file(request, analysis_number):
    file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(analysis_number), "memory.dmp")
    filename = None  # avoid UnboundLocalError when neither dump exists
    if os.path.exists(file_path):
        filename = os.path.basename(file_path)
    else:
        file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(analysis_number), "memory.dmp.zip")
        if os.path.exists(file_path):
            filename = os.path.basename(file_path)

    if filename:
        content_type = "application/octet-stream"
        response = StreamingHttpResponse(FileWrapper(open(file_path), 8192),
                                         content_type=content_type)
        response['Content-Length'] = os.path.getsize(file_path)
        response['Content-Disposition'] = "attachment; filename=%s" % filename
        return response
    else:
        return render(request, "error.html", {"error": "File not found"})
def full_memory_dump_strings(request, analysis_number):
    file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(analysis_number), "memory.dmp.strings")
    filename = None
    if os.path.exists(file_path):
        filename = os.path.basename(file_path)
    else:
        file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(analysis_number), "memory.dmp.strings.zip")
        if os.path.exists(file_path):
            filename = os.path.basename(file_path)

    if filename:
        content_type = "application/octet-stream"
        response = StreamingHttpResponse(FileWrapper(open(file_path), 8192),
                                         content_type=content_type)
        response['Content-Length'] = os.path.getsize(file_path)
        response['Content-Disposition'] = "attachment; filename=%s" % filename
        return response
    else:
        return render(request, "error.html", {"error": "File not found"})
def test_export_csv_using_generator(self):
    headers = ['A Number', 'Status']

    def my_generator():
        for i in range(0, 1000):
            yield (i, u'\N{WARNING SIGN}')

    resp = export_to_csv_response('numbers.csv', headers, my_generator())
    self.assertIsInstance(resp, StreamingHttpResponse)
    self.assertEqual("attachment; filename*=UTF-8''numbers.csv",
                     resp['Content-Disposition'])

    # exhaust the iterator:
    content = list(i.decode('utf-8') for i in resp.streaming_content)

    # We should have one header row + 1000 content rows:
    self.assertEqual(len(content), 1001)

    self.assertEqual(content[0], u'A Number,Status\r\n')
    self.assertEqual(content[-1], u'999,\u26a0\r\n')
def export_to_csv_response(filename, headers, rows):
    """Returns a downloadable StreamingHttpResponse using a CSV payload generated
    from headers and rows"""
    pseudo_buffer = Echo()
    writer = csv.writer(pseudo_buffer)

    def row_generator():
        yield map(convert_value_to_unicode, headers)

        for row in rows:
            yield map(convert_value_to_unicode, row)

    if sys.version_info < (3, 0):
        # On Python 2, csv.writer unconfigurably encodes unicode instances as ASCII
        # so we need to convert them to UTF-8:
        row_generator = force_utf8_encoding(row_generator)

    # This works because csv.writer.writerow calls the underlying file-like .write method
    # *and* returns the result. We cannot use the same approach for Excel because xlsxwriter
    # doesn't have a way to emit chunks from ZipFile and StreamingHttpResponse does not
    # offer a file-like handle.
    return StreamingHttpResponse((writer.writerow(row) for row in row_generator()),
                                 content_type='text/csv; charset=utf-8')
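Several of the CSV examples above pass an Echo() (or EchoFile()) object to csv.writer without showing the class itself. A minimal sketch of that pseudo-buffer pattern, as described in Django's streaming-CSV documentation, looks like the following; the class name simply mirrors the examples' usage and the view is illustrative, not taken from the original projects.

import csv
from django.http import StreamingHttpResponse

class Echo:
    """Pseudo file-like object: write() returns the value instead of storing it,
    so csv.writer hands each formatted row straight back to the caller."""
    def write(self, value):
        return value

def stream_csv(request):
    rows = ([i, i * 2] for i in range(100000))
    writer = csv.writer(Echo())
    response = StreamingHttpResponse((writer.writerow(row) for row in rows),
                                     content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename="rows.csv"'
    return response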
def salt_file_download(request, file_name):
    import sys
    reload(sys)
    # Python 2 defaults to ASCII, so non-ASCII filenames raise encoding errors;
    # reload(sys) is required before setdefaultencoding becomes available.
    sys.setdefaultencoding('utf-8')

    def file_iterator(file, chunk_size=512):
        with open(file) as f:
            while True:
                c = f.read(chunk_size)
                if c:
                    yield c
                else:
                    break

    # stream the file back as a download
    response = StreamingHttpResponse(file_iterator(file_name))
    response['Content-Type'] = 'application/octet-stream'
    response['Content-Disposition'] = 'attachment;filename="{0}"'.format(file_name)
    return response
def _streamed_log_response(file_path, offset, as_attachment):
    mimetype = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
    try:
        content_len = os.path.getsize(file_path) - offset
    except OSError:
        content_len = 0

    # use _stream_file() instead of passing file object in order to improve performance
    response = StreamingHttpResponse(_stream_file(file_path, offset),
                                     content_type=mimetype)
    response["Content-Length"] = content_len

    if as_attachment:
        # set filename to be real filesystem name
        response['Content-Disposition'] = 'attachment; filename=%s' % os.path.basename(file_path)

    return response
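The _stream_file() helper above is referenced but not shown. A typical chunked reader for this role would look roughly like the sketch below; the chunk size and function body are assumptions for illustration, not the original implementation.

def _stream_file(file_path, offset, chunk_size=64 * 1024):
    # Hypothetical sketch: seek to the requested offset and yield fixed-size
    # chunks so StreamingHttpResponse never holds the whole file in memory.
    with open(file_path, 'rb') as f:
        f.seek(offset)
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk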
def test_search_with_format(self, mock_essearch, mock_dt):
    """ Searching with format """
    for k, v in FORMAT_CONTENT_TYPE_MAP.iteritems():
        url = reverse('complaint_search:search')
        params = {"format": k}
        mock_essearch.return_value = 'OK'
        mock_dt.now.return_value = datetime(2017, 1, 1, 12, 0)
        response = self.client.get(url, params)
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertIn(v, response.get('Content-Type'))
        self.assertEqual(
            response.get('Content-Disposition'),
            'attachment; filename="complaints-2017-01-01_12_00.{}"'.format(k))
        self.assertTrue(isinstance(response, StreamingHttpResponse))

    mock_essearch.has_calls(
        [mock.call(format=k) for k in FORMAT_CONTENT_TYPE_MAP], any_order=True)
    self.assertEqual(len(FORMAT_CONTENT_TYPE_MAP), mock_essearch.call_count)
def test_search_with_export_anon_rate_throttle(self, mock_essearch):
    url = reverse('complaint_search:search')
    mock_essearch.return_value = 'OK'

    SearchAnonRateThrottle.rate = self.orig_search_anon_rate
    ExportUIRateThrottle.rate = self.orig_export_ui_rate
    ExportAnonRateThrottle.rate = self.orig_export_anon_rate
    limit = int(self.orig_export_anon_rate.split('/')[0])
    for i in range(limit):
        response = self.client.get(url, {"format": "csv"})
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertTrue(isinstance(response, StreamingHttpResponse))

    response = self.client.get(url, {"format": "csv"})
    self.assertEqual(response.status_code, status.HTTP_429_TOO_MANY_REQUESTS)
    self.assertIsNotNone(response.data.get('detail'))
    self.assertIn("Request was throttled", response.data.get('detail'))
    self.assertEqual(limit, mock_essearch.call_count)
    self.assertEqual(2, limit)
def test_search_with_export_ui_rate_throttle(self, mock_essearch):
    url = reverse('complaint_search:search')
    mock_essearch.return_value = 'OK'

    SearchAnonRateThrottle.rate = self.orig_search_anon_rate
    ExportUIRateThrottle.rate = self.orig_export_ui_rate
    ExportAnonRateThrottle.rate = self.orig_export_anon_rate
    limit = int(self.orig_export_ui_rate.split('/')[0])
    for _ in range(limit):
        response = self.client.get(url, {"format": "csv"}, HTTP_REFERER=_CCDB_UI_URL)
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertTrue(isinstance(response, StreamingHttpResponse))

    response = self.client.get(url, {"format": "csv"}, HTTP_REFERER=_CCDB_UI_URL)
    self.assertEqual(response.status_code, status.HTTP_429_TOO_MANY_REQUESTS)
    self.assertIsNotNone(response.data.get('detail'))
    self.assertIn("Request was throttled", response.data.get('detail'))
    self.assertEqual(limit, mock_essearch.call_count)
    self.assertEqual(6, limit)
def build_json_protocol(protocol):
    import json
    """
    response = StreamingHttpResponse(json.dumps(protocol))
    response['Content-Type'] = 'application/octet-stream'
    response['Content-Disposition'] = 'attachment;filename="{0}"'.format(protocol['name'] + '.txt')
    return response
    """
    return json.dumps(protocol)
def host_stats(request):
    return StreamingHttpResponse(stream_host_stats())
def container_stats(request, container_id):
    container = Container.objects.get_container(container_id, request.user)
    if container:
        return StreamingHttpResponse(stream_response_generator(container))
    return render(request, 'no_access.html')
def upload(request):
    success = None
    if request.method == 'POST' and 'file' in request.FILES:
        if request.FILES['file'].content_type == 'text/plain':
            if process_xbe_info(
                    request.FILES['file'].read().decode(errors='ignore'),
                    request.FILES['file'].name,
                    request.user.pk):
                success = 'Successfully processed 1 file.'
            else:
                success = 'Nothing new.'
        elif zipfile.is_zipfile(request.FILES['file']):
            return StreamingHttpResponse(process_zip(request.FILES['file'], process_xbe_info, request))
        elif Xbe.is_xbe(request.FILES['file']):
            xbe = Xbe(xbe_file=request.FILES['file'])
            if process_xbe_info(
                    xbe.get_dump(),
                    request.FILES['file'].name,
                    request.user.pk,
                    signature_valid=xbe.valid_signature,
                    signature_hash=None if xbe.valid_signature else xbe.calculate_hash()
            ):
                success = 'Successfully processed 1 file.'
            else:
                success = 'Nothing new.'
    return render(request, "home.html", {'upload_success': success})
def get_django_response(proxy_response):
    """Create an appropriate response based on the Content-Length of the
    proxy_response. If the content is bigger than MIN_STREAMING_LENGTH, which
    is defined in utils.py, a django.http.StreamingHttpResponse will be
    created, otherwise a django.http.HttpResponse will be created instead.

    :param proxy_response: An instance of urllib3.response.HTTPResponse used
                           to create the appropriate response
    :returns: An appropriate response based on the proxy_response content-length
    """
    status = proxy_response.status
    headers = proxy_response.headers

    content_type = headers.get('Content-Type')

    logger.debug('Proxy response headers: %s', headers)
    logger.debug('Content-Type: %s', content_type)

    if should_stream(proxy_response):
        logger.info('Content-Length is bigger than %s', DEFAULT_AMT)
        response = StreamingHttpResponse(proxy_response.stream(DEFAULT_AMT),
                                         status=status,
                                         content_type=content_type)
    else:
        content = proxy_response.data or b''
        response = HttpResponse(content, status=status,
                                content_type=content_type)

    logger.info('Normalizing response headers')
    set_response_headers(response, headers)

    logger.debug('Response headers: %s', getattr(response, '_headers'))

    cookies = proxy_response.headers.getlist('set-cookie')
    logger.info('Checking for invalid cookies')
    for cookie_string in cookies:
        cookie_dict = cookie_from_string(cookie_string)
        # if cookie is invalid cookie_dict will be None
        if cookie_dict:
            response.set_cookie(**cookie_dict)

    logger.debug('Response cookies: %s', response.cookies)

    return response
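The should_stream() helper and MIN_STREAMING_LENGTH constant referenced above are not shown in this example. The sketch below is only an assumption about how such a length check is typically written, offered to make the branching logic concrete; the threshold value and function body are hypothetical.

MIN_STREAMING_LENGTH = 4 * 1024  # hypothetical threshold in bytes

def should_stream(proxy_response):
    # Hypothetical sketch: stream when the upstream response declares no
    # Content-Length, or declares one larger than the threshold.
    content_length = proxy_response.headers.get('Content-Length')
    if content_length is None:
        return True
    try:
        return int(content_length) > MIN_STREAMING_LENGTH
    except ValueError:
        return True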
def get(self, request, uid, version, format=None):
    fs = get_fs(request.user)

    # Find requested file.
    try:
        file = UserFile.objects.get(uid=uid, user=request.user)
    except UserFile.DoesNotExist:
        raise exceptions.NotFound(uid)

    # Find requested version.
    try:
        version = file.file.versions.get(uid=version)
    except Version.DoesNotExist:
        raise exceptions.NotFound(version)

    # Prepare response.
    response = StreamingHttpResponse(
        fs.download(file.path, file=file, version=version),
        content_type=version.mime)

    # Adjust headers
    content_disposition = 'filename="%s"' % file.name
    if 'download' in request.GET:
        content_disposition = 'attachment; %s' % content_disposition
    response['Content-Disposition'] = content_disposition

    # Send the file.
    return response
def get(self, request, path, version, format=None):
    fs = get_fs(request.user)

    # Find requested file.
    try:
        file = fs.info(path)
    except PathNotFoundError:
        raise exceptions.NotFound(path)

    # Find requested version.
    try:
        version = file.file.versions.get(uid=version)
    except Version.DoesNotExist:
        raise exceptions.NotFound(version)

    # Prepare response.
    try:
        response = StreamingHttpResponse(
            fs.download(path, file=file, version=version),
            content_type=version.mime)
    except PathNotFoundError:
        raise exceptions.NotFound(path)

    # Adjust headers.
    content_disposition = 'filename="%s"' % file.name
    if 'download' in request.GET:
        content_disposition = 'attachment; %s' % content_disposition
    response['Content-Disposition'] = content_disposition

    # Send the file.
    return response
def search(request):
    try:
        processed_request = RestProcessor().process_searcher(request)
    except Exception as processing_error:
        return StreamingHttpResponse([json.dumps({'error': str(processing_error)})])

    results = Searcher(es_url).search(processed_request)
    return StreamingHttpResponse(process_stream(results), content_type='application/json')
def export_pages(request):
    es_params = {entry['name']: entry['value'] for entry in json.loads(request.GET['args'])}

    if es_params['num_examples'] == '*':
        response = StreamingHttpResponse(get_all_rows(es_params, request), content_type='text/csv')
    else:
        response = StreamingHttpResponse(get_rows(es_params, request), content_type='text/csv')

    response['Content-Disposition'] = 'attachment; filename="%s"' % (es_params['filename'])

    return response
def file_download(request, query_type, object_id):
    """
    return a file from the gridfs by id

    :param request:
    :param query_type:
    :param object_id:
    :return:
    """
    if 'auth' in config:
        if config['auth']['enable'].lower() == 'true' and not request.user.is_authenticated:
            return HttpResponse('Auth Required.')

    if query_type == 'file':
        file_object = db.get_filebyid(object_id)
        file_name = '{0}.bin'.format(file_object.filename)
        response = StreamingHttpResponse((chunk for chunk in file_object),
                                         content_type='application/octet-stream')
        response['Content-Disposition'] = 'attachment; filename="{0}"'.format(file_name)
        return response

    if query_type == 'plugin':
        plugin_object = db.get_pluginbyid(object_id)
        file_name = '{0}.csv'.format(plugin_object['plugin_name'])
        plugin_data = plugin_object['plugin_output']
        file_data = ""
        file_data += ",".join(plugin_data['columns'])
        file_data += "\n"
        for row in plugin_data['rows']:
            for item in row:
                file_data += "{0},".format(item)
            # rstrip returns a new string, so assign it back to drop the trailing comma
            file_data = file_data.rstrip(',')
            file_data += "\n"
        response = HttpResponse(file_data, content_type='application/octet-stream')
        response['Content-Disposition'] = 'attachment; filename="{0}"'.format(file_name)
        return response
def tasks_fullmemory(request, task_id):
    if request.method != "GET":
        resp = {"error": True, "error_value": "Method not allowed"}
        return jsonize(resp, response=True)

    if not apiconf.taskfullmemory.get("enabled"):
        resp = {"error": True, "error_value": "Full memory download API is disabled"}
        return jsonize(resp, response=True)

    check = validate_task(task_id)
    if check["error"]:
        return jsonize(check, response=True)

    file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task_id), "memory.dmp")
    filename = None  # avoid UnboundLocalError when neither dump exists
    if os.path.exists(file_path):
        filename = os.path.basename(file_path)
    else:
        file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task_id), "memory.dmp.zip")
        if os.path.exists(file_path):
            filename = os.path.basename(file_path)

    if filename:
        content_type = "application/octet-stream"
        chunk_size = 8192
        response = StreamingHttpResponse(FileWrapper(open(file_path), chunk_size),
                                         content_type=content_type)
        response['Content-Length'] = os.path.getsize(file_path)
        response['Content-Disposition'] = "attachment; filename=%s" % filename
        return response
    else:
        resp = {"error": True,
                "error_value": "Memory dump not found for task " + task_id}
        return jsonize(resp, response=True)
def get_files(request, stype, value):
    if request.method != "GET":
        resp = {"error": True, "error_value": "Method not allowed"}
        return jsonize(resp, response=True)

    if not apiconf.sampledl.get("enabled"):
        resp = {"error": True, "error_value": "Sample download API is disabled"}
        return jsonize(resp, response=True)

    if stype == "md5":
        file_hash = db.find_sample(md5=value).to_dict()["sha256"]
    elif stype == "sha1":
        file_hash = db.find_sample(sha1=value).to_dict()["sha256"]
    elif stype == "task":
        check = validate_task(value)
        if check["error"]:
            return jsonize(check, response=True)
        sid = db.view_task(value).to_dict()["sample_id"]
        file_hash = db.view_sample(sid).to_dict()["sha256"]
    elif stype == "sha256":
        file_hash = value

    sample = os.path.join(CUCKOO_ROOT, "storage", "binaries", file_hash)
    if os.path.exists(sample):
        mime = "application/octet-stream"
        fname = "%s.bin" % file_hash
        resp = StreamingHttpResponse(FileWrapper(open(sample), 8096), content_type=mime)
        resp["Content-Length"] = os.path.getsize(sample)
        resp["Content-Disposition"] = "attachment; filename=" + fname
        return resp
    else:
        resp = {"error": True, "error_value": "Sample %s was not found" % file_hash}
        # return the error dict, not the bare hash
        return jsonize(resp, response=True)
def export_to_excel_response(filename, headers, rows):
    """Returns a downloadable HttpResponse using an XLSX payload generated from
    headers and rows"""
    # See http://technet.microsoft.com/en-us/library/ee309278%28office.12%29.aspx
    content_type = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'

    # This cannot be a StreamingHttpResponse because XLSX files are .zip format and
    # the Python ZipFile library doesn't offer a generator form (which would also
    # not be called per-row but per-chunk)
    resp = HttpResponse(content_type=content_type)

    workbook = xlsxwriter.Workbook(resp, {'constant_memory': True,
                                          'in_memory': True,
                                          'default_date_format': 'yyyy-mm-dd'})

    date_format = workbook.add_format({'num_format': 'yyyy-mm-dd'})

    worksheet = workbook.add_worksheet()

    for y, row in enumerate(chain((headers, ), rows)):
        for x, col in enumerate(row):
            if isinstance(col, datetime.datetime):
                # xlsxwriter cannot handle timezones:
                worksheet.write_datetime(y, x, col.replace(tzinfo=None), date_format)
            elif isinstance(col, datetime.date):
                worksheet.write_datetime(y, x, col, date_format)
            else:
                worksheet.write(y, x, force_text(col, strings_only=True))

    workbook.close()

    return resp
def get_cache_package(request, filetype):
    enforce_tile_secret_auth(request)

    filename = os.path.join(settings.CACHE_ROOT, 'package.' + filetype)

    f = open(filename, 'rb')
    f.seek(0, os.SEEK_END)
    size = f.tell()
    f.seek(0)

    content_type = 'application/' + {'tar': 'x-tar',
                                     'tar.gz': 'gzip',
                                     'tar.xz': 'x-xz'}[filetype]

    response = StreamingHttpResponse(FileWrapper(f), content_type=content_type)
    response['Content-Length'] = size
    return response
def get(self, request):
    test_case_id = request.GET.get("test_case_id", None)
    if not test_case_id:
        return error_response(u"Invalid parameter")

    # Reject ids containing "./", "../" etc. to prevent path traversal
    if not re.compile(r"^[0-9a-zA-Z]+$").match(test_case_id):
        return error_response(u"Invalid parameter")

    try:
        # A super admin may download any test case; a regular admin may only
        # download test cases of contest problems they created themselves
        if request.user.admin_type != SUPER_ADMIN:
            ContestProblem.objects.get(test_case_id=test_case_id, created_by=request.user)

        test_case_dir = os.path.join(settings.TEST_CASE_DIR, test_case_id)
        if not os.path.exists(test_case_dir):
            return error_response(u"Test case does not exist")

        # Pack the test case directory into "test_case-" + test_case_id + ".zip"
        test_case_zip = os.path.join("/tmp", "test_case-" + test_case_id + ".zip")
        zf = zipfile.ZipFile(test_case_zip, "w", zipfile.ZIP_DEFLATED)
        for filename in os.listdir(test_case_dir):
            # Only include legally named test case files and skip symlinks
            if self._is_legal_test_case_file_name(filename) and not os.path.islink(os.path.join(test_case_dir, filename)):
                zf.write(os.path.join(test_case_dir, filename), filename)
        zf.close()

        # Stream the archive back as a download
        response = StreamingHttpResponse(self.file_iterator(test_case_zip))
        response['Content-Type'] = 'application/octet-stream'
        response['Content-Disposition'] = 'attachment;filename=test_case-%s.zip' % test_case_id
        return response
    except ContestProblem.DoesNotExist:
        return error_response(u"Problem does not exist")
def serve(self, rendition):
    # Open and serve the file
    rendition.file.open('rb')
    image_format = imghdr.what(rendition.file)
    return StreamingHttpResponse(FileWrapper(rendition.file),
                                 content_type='image/' + image_format)
def test_download_returns_streaming_response(self):
    product, resource = self.make_usable_product_boot_resource()
    _, _, os, arch, subarch, series = product.split(':')
    resource_set = resource.get_latest_complete_set()
    version = resource_set.version
    resource_file = resource_set.files.order_by('?')[0]
    filename = resource_file.filename

    response = self.get_file_client(
        os, arch, subarch, series, version, filename)

    self.assertIsInstance(response, StreamingHttpResponse)
def render_to_response(self, context):
    buffer = EchoFile()
    writer = csv.writer(buffer)
    rows = self.get_rows()
    response = StreamingHttpResponse((writer.writerow(row) for row in rows),
                                     content_type="text/csv")
    response['Content-Disposition'] = 'attachment; filename="{}"'.format(self.get_filename())
    return response
def process_response(self, res):
    data = res.get(SETTINGS.DATA, None)
    if isinstance(res, dict):
        # dict result: normalize it and wrap it in a JSON response
        if SETTINGS.DATA_STYLE == 'dict':
            if data is not None:
                if isinstance(res[SETTINGS.DATA], (list, dict)) and len(res[SETTINGS.DATA]) == 0:
                    res[SETTINGS.DATA] = None
                elif isinstance(res[SETTINGS.DATA], list):
                    res[SETTINGS.DATA] = {
                        SETTINGS.RESULT: res[SETTINGS.DATA],
                        SETTINGS.COUNT: len(res[SETTINGS.DATA])
                    }
                elif isinstance(res[SETTINGS.DATA].get(SETTINGS.RESULT, None), (list, dict)) and len(res[SETTINGS.DATA][SETTINGS.RESULT]) == 0:
                    res[SETTINGS.DATA][SETTINGS.RESULT] = None

            if data is not None and len(data) > 0:
                if self.method == 'get':
                    path = '/'
                    if SETTINGS.RESULT in res[SETTINGS.DATA]:
                        has_result_field = True
                    else:
                        has_result_field = False
                else:
                    path = None
                    has_result_field = None
                self.process_keys(res, path, has_result_field)

        # additional data
        additional_data = getattr(self, 'additional_data', None)
        if isinstance(additional_data, dict):
            for key, value in additional_data.items():
                res[SETTINGS.DATA][self.python_to_java(key, self.omit_underlines)] = value

        # process json response class
        json_response_class = getattr(SETTINGS, 'JSON_RESPONSE_CLASS', None)
        if json_response_class == 'rest_framework.response.Response':
            res = Response(res)
        elif json_response_class == 'django.http.JsonResponse':
            res = JsonResponse(res, json_dumps_params={"indent": 2})
        else:
            raise Exception('JSON_RESPONSE_CLASS in the benchmark_settings is not defined or not correct. The value of it should be "rest_framework.response.Response", or "django.http.JsonResponse"')

    if isinstance(res, (StreamingHttpResponse, django.http.response.HttpResponse)):
        # already a (streaming) HTTP response: return it unchanged
        return res

    raise Exception('unknown response type: %s' % type(res))
def serve(request, document_id, document_filename):
    Document = get_document_model()
    doc = get_object_or_404(Document, id=document_id)

    # Send document_served signal
    document_served.send(sender=Document, instance=doc, request=request)

    try:
        local_path = doc.file.path
    except NotImplementedError:
        local_path = None

    if local_path:
        # Use wagtail.utils.sendfile to serve the file;
        # this provides support for mimetypes, if-modified-since and django-sendfile backends
        if hasattr(settings, 'SENDFILE_BACKEND'):
            return sendfile(request, local_path, attachment=True, attachment_filename=doc.filename)
        else:
            # Fallback to streaming backend if user hasn't specified SENDFILE_BACKEND
            return sendfile(
                request,
                local_path,
                attachment=True,
                attachment_filename=doc.filename,
                backend=sendfile_streaming_backend.sendfile
            )
    else:
        # We are using a storage backend which does not expose filesystem paths
        # (e.g. storages.backends.s3boto.S3BotoStorage).
        # Fall back on pre-sendfile behaviour of reading the file content and serving it
        # as a StreamingHttpResponse

        wrapper = FileWrapper(doc.file)
        response = StreamingHttpResponse(wrapper, content_type='application/octet-stream')

        try:
            response['Content-Disposition'] = 'attachment; filename=%s' % doc.filename
        except BadHeaderError:
            # Unicode filenames can fail on Django <1.8, Python 2 due to
            # https://code.djangoproject.com/ticket/20889 - try with an ASCIIfied version of the name
            response['Content-Disposition'] = 'attachment; filename=%s' % unidecode(doc.filename)

        # FIXME: storage backends are not guaranteed to implement 'size'
        response['Content-Length'] = doc.file.size

        return response
def search(request):
    fixed_qparam = request.query_params
    data = _parse_query_params(request.query_params)

    # Add format to data
    format = request.accepted_renderer.format
    if format and format in EXPORT_FORMATS:
        data['format'] = format
    else:
        data['format'] = 'default'

    serializer = SearchInputSerializer(data=data)

    if not serializer.is_valid():
        return Response(
            serializer.errors, status=status.HTTP_400_BAD_REQUEST
        )

    results = es_interface.search(
        agg_exclude=AGG_EXCLUDE_FIELDS, **serializer.validated_data)

    headers = _buildHeaders()

    if format not in EXPORT_FORMATS:
        return Response(results, headers=headers)

    # If format is in export formats, update its attachment response
    # with a filename
    response = StreamingHttpResponse(
        streaming_content=results,
        content_type=FORMAT_CONTENT_TYPE_MAP[format]
    )
    filename = 'complaints-{}.{}'.format(
        datetime.now().strftime('%Y-%m-%d_%H_%M'), format
    )
    headerTemplate = 'attachment; filename="{}"'
    response['Content-Disposition'] = headerTemplate.format(filename)
    for header in headers:
        response[header] = headers[header]

    return response