我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用django.core.files.storage.get_storage_class()。
def backup_media():
    """Pack the files reported by ``explore_storage`` into a gzipped tarball.

    The archive is written into a spooled temporary file obtained from
    ``utils.create_spooled_temporary_file()``.

    Returns:
        A ``(outputfile, filename)`` tuple: the archive file object rewound
        to offset 0, and the archive name produced by
        ``utils.filename_generate``.
    """
    extension = "tar.gz"
    filename = utils.filename_generate(extension, content_type='media')

    # Create tarball
    media_storage = get_storage_class()()
    outputfile = utils.create_spooled_temporary_file()
    tar_file = tarfile.open(name=filename, fileobj=outputfile, mode='w:gz')
    try:
        for media_filename in explore_storage(media_storage):
            tarinfo = tarfile.TarInfo(media_filename)
            media_file = media_storage.open(media_filename)
            try:
                tarinfo.size = len(media_file)
                tar_file.addfile(tarinfo, media_file)
            finally:
                # Release each source handle once it has been copied, so a
                # large storage does not exhaust file descriptors.
                media_file.close()
    finally:
        # Close the TAR for writing, even when a member could not be added.
        tar_file.close()

    # Store backup
    outputfile.seek(0)
    return outputfile, filename
def test_dl_no_xls(self):
    """Delete the form's stored XLS file, then request ``xls_export``.

    The form is flagged ``shared_data`` first. After the XLS file has been
    removed from storage, the request made through ``self.anon`` is expected
    to be answered with a 404.
    """
    self._create_xform()
    self.xform.shared_data = True
    self.xform.save()

    storage = get_storage_class()()
    xls_path = self.xform.xls.name
    self.assertEqual(storage.exists(xls_path), True)
    storage.delete(xls_path)
    self.assertEqual(storage.exists(xls_path), False)

    url_kwargs = {
        'username': self.user.username,
        'id_string': self.xform.id_string,
    }
    response = self.anon.get(reverse('xls_export', kwargs=url_kwargs))
    self.assertEqual(response.status_code, 404)
def test_dotted_fields_csv_export_output(self):
    """Compare the CSV export of ``userone_with_dot_name_fields`` to its fixture.

    Publishes the XLS fixture, makes one submission from the XML fixture,
    generates a CSV export and checks that the stored export matches the
    expected CSV fixture.
    """
    fixture_dir = os.path.join(
        os.path.dirname(__file__), 'fixtures', 'userone')

    self._publish_xls_file_and_set_xform(
        os.path.join(fixture_dir, 'userone_with_dot_name_fields.xls'))
    self._make_submission(
        os.path.join(fixture_dir, 'userone_with_dot_name_fields.xml'),
        forced_submission_time=self._submission_time)

    # test csv
    export = generate_export(Export.CSV_EXPORT, 'csv', self.user.username,
                             'userone')
    storage = get_storage_class()()
    self.assertTrue(storage.exists(export.filepath))
    ext = os.path.splitext(export.filename)[1]
    self.assertEqual(ext, '.csv')

    expected_path = os.path.join(
        fixture_dir, 'userone_with_dot_name_fields.csv')
    with open(expected_path) as f1:
        with storage.open(export.filepath) as f2:
            expected_content = f1.read()
            actual_content = f2.read()
            # assertEqual replaces the deprecated assertEquals alias.
            self.assertEqual(actual_content, expected_content)
def get_media_file_response(metadata):
    """Return the HTTP response for a metadata entry.

    When ``metadata.data_file`` is falsy, redirect to ``metadata.data_value``.
    Otherwise serve the stored file through
    ``response_with_mimetype_and_name``, or answer "not found" when the
    default storage does not hold the file.
    """
    if not metadata.data_file:
        return HttpResponseRedirect(metadata.data_value)

    file_path = metadata.data_file.name
    # Keep only the last path component, then split off the extension.
    base_name = file_path.rsplit('/', 1)[-1]
    filename, extension = os.path.splitext(base_name)
    extension = extension.strip('.')

    dfs = get_storage_class()()
    if not dfs.exists(file_path):
        return HttpResponseNotFound()

    return response_with_mimetype_and_name(
        metadata.data_file_type, filename, extension=extension,
        show_date=False, file_path=file_path, full_mime=True)
def test_create_export(self):
    """Generate XLS and CSV exports and check they land in storage.

    Also checks that passing the id of an existing ``Export`` to
    ``generate_export`` yields an export with that same id.
    """
    self._publish_transportation_form_and_submit_instance()
    storage = get_storage_class()()

    # test xls, then csv
    cases = (
        (Export.XLS_EXPORT, 'xls', '.xls'),
        (Export.CSV_EXPORT, 'csv', '.csv'),
    )
    for export_type, extension, expected_ext in cases:
        export = generate_export(export_type, extension,
                                 self.user.username, self.xform.id_string)
        self.assertTrue(storage.exists(export.filepath))
        ext = os.path.splitext(export.filename)[1]
        self.assertEqual(ext, expected_ext)

    # test xls with existing export_id
    existing_export = Export.objects.create(
        xform=self.xform, export_type=Export.XLS_EXPORT)
    export = generate_export(Export.XLS_EXPORT, 'xls', self.user.username,
                             self.xform.id_string, existing_export.id)
    self.assertEqual(existing_export.id, export.id)
def test_graceful_exit_on_export_delete_if_file_doesnt_exist(self):
    """Deleting an export whose file is already gone still redirects (302)."""
    self._publish_transportation_form()
    self._submit_transport_instance()
    export = generate_export(
        Export.XLS_EXPORT, 'xls', self.user.username, self.xform.id_string)
    storage = get_storage_class()()

    # Remove the file behind the export record.
    storage.delete(export.filepath)
    self.assertFalse(storage.exists(export.filepath))

    # Blank the file fields, as they would be for an incomplete export.
    export.filename = None
    export.filedir = None
    export.save()

    # Deleting the export record should attempt to delete the file as well.
    url_kwargs = {
        'username': self.user.username,
        'id_string': self.xform.id_string,
        'export_type': 'xls'
    }
    delete_url = reverse(delete_export, kwargs=url_kwargs)
    response = self.client.post(delete_url, {'export_id': export.id})
    self.assertEqual(response.status_code, 302)
def create_attachments_zipfile(attachments, temporary_file=None):
    """Write the media files of ``attachments`` into an uncompressed zip.

    Args:
        attachments: iterable of objects exposing ``media_file.name``.
        temporary_file: file object to write the archive into; a new
            ``NamedTemporaryFile`` is created when none is given.

    Returns:
        The file object holding the archive, rewound to offset 0.

    Attachments missing from the default storage are skipped. A failure
    while adding a single file is passed to ``report_exception`` and does
    not abort the archive.
    """
    if not temporary_file:
        temporary_file = NamedTemporaryFile()

    storage = get_storage_class()()
    with zipfile.ZipFile(temporary_file, 'w', zipfile.ZIP_STORED,
                         allowZip64=True) as zip_file:
        for attachment in attachments:
            if storage.exists(attachment.media_file.name):
                try:
                    with storage.open(attachment.media_file.name,
                                      'rb') as source_file:
                        zip_file.writestr(attachment.media_file.name,
                                          source_file.read())
                # "except ... as" parses on Python 2.6+ and Python 3; the
                # comma form is a syntax error on Python 3.
                except Exception as e:
                    report_exception(
                        "Error adding file \"{}\" to archive.".format(
                            attachment.media_file.name), e)

    # Be kind; rewind.
    temporary_file.seek(0)
    return temporary_file
def test_csv_export_output(self):
    """Compare the CSV export of ``tutorial_w_repeats`` to its fixture.

    Publishes the XLS fixture, makes one submission from the XML fixture,
    generates a CSV export and checks that the stored export matches
    ``tutorial_w_repeats.csv`` in ``self.fixture_dir``.
    """
    path = os.path.join(self.fixture_dir, 'tutorial_w_repeats.xls')
    self._publish_xls_file_and_set_xform(path)
    path = os.path.join(self.fixture_dir, 'tutorial_w_repeats.xml')
    self._make_submission(
        path, forced_submission_time=self._submission_time)

    # test csv
    export = generate_export(Export.CSV_EXPORT, 'csv', self.user.username,
                             'tutorial_w_repeats')
    storage = get_storage_class()()
    self.assertTrue(storage.exists(export.filepath))
    ext = os.path.splitext(export.filename)[1]
    self.assertEqual(ext, '.csv')

    expected_path = os.path.join(self.fixture_dir, 'tutorial_w_repeats.csv')
    with open(expected_path) as f1:
        with storage.open(export.filepath) as f2:
            expected_content = f1.read()
            actual_content = f2.read()
            # assertEqual replaces the deprecated assertEquals alias.
            self.assertEqual(actual_content, expected_content)
def _setup(self):
    """Instantiate the class named by ``settings.STATICFILES_STORAGE``.

    The instance is stored on ``self._wrapped``.
    """
    storage_cls = get_storage_class(settings.STATICFILES_STORAGE)
    self._wrapped = storage_cls()
def __init__(self, storage=None, **kwargs):
    """Initialise the view and attach a storage instance.

    Args:
        storage: optional callable returning a storage object; the default
            storage class is used when it is ``None``.
        **kwargs: forwarded unchanged to the parent initialiser.
    """
    super(FileView, self).__init__(**kwargs)
    storage_factory = get_storage_class() if storage is None else storage
    self.storage = storage_factory()
def response_with_mimetype_and_name(
        mimetype, name, extension=None, show_date=True, file_path=None,
        use_local_filesystem=False, full_mime=False):
    """Build a download response carrying a ``Content-Disposition`` header.

    Args:
        mimetype: full MIME type when ``full_mime`` is true, otherwise the
            subtype that gets prefixed with ``application/``.
        name: passed to ``disposition_ext_and_date`` for the disposition.
        extension: passed to ``disposition_ext_and_date``; defaults to the
            ``mimetype`` value as given by the caller.
        show_date: passed to ``disposition_ext_and_date``.
        file_path: file to stream; when falsy an empty ``HttpResponse`` with
            the computed content type is returned.
        use_local_filesystem: read ``file_path`` with the built-in ``open``
            instead of the default storage.
        full_mime: see ``mimetype``.

    Returns:
        A streaming response for the file, an ``HttpResponseNotFound`` when
        reading raises ``IOError``, or a plain ``HttpResponse``.
    """
    if extension is None:
        extension = mimetype
    if not full_mime:
        mimetype = "application/%s" % mimetype

    if file_path:
        try:
            if not use_local_filesystem:
                default_storage = get_storage_class()()
                wrapper = FileWrapper(default_storage.open(file_path))
                response = StreamingHttpResponse(
                    wrapper, content_type=mimetype)
                response['Content-Length'] = default_storage.size(file_path)
            else:
                # Binary mode keeps the streamed bytes consistent with the
                # byte count from os.path.getsize; text mode can translate
                # newlines or decode the content on some platforms.
                wrapper = FileWrapper(open(file_path, 'rb'))
                response = StreamingHttpResponse(
                    wrapper, content_type=mimetype)
                response['Content-Length'] = os.path.getsize(file_path)
        except IOError:
            response = HttpResponseNotFound(
                _(u"The requested file could not be found."))
    else:
        response = HttpResponse(content_type=mimetype)

    response['Content-Disposition'] = disposition_ext_and_date(
        name, extension, show_date)
    return response
def download_metadata(request, username, id_string, data_id):
    """Serve the file attached to a ``MetaData`` row.

    Access is allowed when ``username`` equals the requesting user's
    username or the form is ``shared``; otherwise a 403 response is
    returned. A download that is served is recorded through ``audit_log``.
    When the default storage does not hold the file, a 404 response is
    returned.
    """
    xform = get_object_or_404(XForm,
                              user__username__iexact=username,
                              id_string__exact=id_string)
    owner = xform.user

    if username != request.user.username and not xform.shared:
        return HttpResponseForbidden(_(u'Permission denied.'))

    # NOTE(review): the row is looked up by primary key only; nothing here
    # ties it to ``xform`` -- confirm this is intended.
    data = get_object_or_404(MetaData, pk=data_id)
    file_path = data.data_file.name
    base_name = file_path.rsplit('/', 1)[-1]
    filename, extension = os.path.splitext(base_name)
    extension = extension.strip('.')

    dfs = get_storage_class()()
    if not dfs.exists(file_path):
        return HttpResponseNotFound()

    audit = {
        'xform': xform.id_string
    }
    message = _("Document '%(filename)s' for '%(id_string)s' downloaded.") % {
        'id_string': xform.id_string,
        'filename': "%s.%s" % (filename, extension)
    }
    audit_log(Actions.FORM_UPDATED, request.user, owner, message, audit,
              request)

    return response_with_mimetype_and_name(
        data.data_file_type, filename, extension=extension, show_date=False,
        file_path=file_path)
def test_csv_nested_repeat_output(self):
    """Check xpaths and CSV export output for the ``double_repeat`` form.

    Publishes the XLS fixture, makes one submission, checks the xpaths the
    data dictionary reports for two repeat iterations, then compares the
    generated CSV export with ``export.csv`` in ``self.fixture_dir``.
    """
    path = os.path.join(self.fixture_dir, 'double_repeat.xls')
    self._publish_xls_file(path)
    path = os.path.join(self.fixture_dir, 'instance.xml')
    self._make_submission(
        path, forced_submission_time=self._submission_time)
    self.maxDiff = None

    dd = DataDictionary.objects.all()[0]
    xpaths = [
        u'/double_repeat/bed_net[1]/member[1]/name',
        u'/double_repeat/bed_net[1]/member[2]/name',
        u'/double_repeat/bed_net[2]/member[1]/name',
        u'/double_repeat/bed_net[2]/member[2]/name',
        u'/double_repeat/meta/instanceID'
    ]
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(dd.xpaths(repeat_iterations=2), xpaths)

    # test csv
    export = generate_export(Export.CSV_EXPORT, 'csv', self.user.username,
                             'double_repeat')
    storage = get_storage_class()()
    self.assertTrue(storage.exists(export.filepath))
    ext = os.path.splitext(export.filename)[1]
    self.assertEqual(ext, '.csv')

    with open(os.path.join(self.fixture_dir, 'export.csv')) as f1:
        with storage.open(export.filepath) as f2:
            expected_content = f1.read()
            actual_content = f2.read()
            self.assertEqual(actual_content, expected_content)
def export_delete_callback(sender, **kwargs):
    """Delete the stored file of the export passed as ``kwargs['instance']``.

    Nothing is deleted when the export has no ``filepath`` or when the
    default storage does not hold that path.
    """
    export = kwargs['instance']
    storage = get_storage_class()()
    if not export.filepath:
        return
    if storage.exists(export.filepath):
        storage.delete(export.filepath)
def full_filepath(self):
    """Return a local filesystem path for ``self.filepath``.

    Returns:
        ``None`` when ``self.filepath`` is falsy. Otherwise the path given
        by the default storage, or, when the storage raises
        ``NotImplementedError`` from ``path()``, the name of a temporary
        file holding a copy of the content. That temporary file is created
        with ``delete=False``, so removing it is left to the caller.
    """
    if self.filepath:
        default_storage = get_storage_class()()
        try:
            return default_storage.path(self.filepath)
        except NotImplementedError:
            # The storage has no local path (the original comment mentions
            # S3), so copy the content into a local temporary file.
            name, ext = os.path.splitext(self.filepath)
            tmp = NamedTemporaryFile(suffix=ext, delete=False)
            try:
                # The context manager closes the storage handle, which the
                # previous code left open.
                with default_storage.open(self.filepath) as f:
                    tmp.write(f.read())
            finally:
                tmp.close()
            return tmp.name
    return None
def test_delete_file_on_export_delete(self):
    """Deleting an export object also removes its file from storage."""
    self._publish_transportation_form()
    self._submit_transport_instance()

    xls_export = generate_export(
        Export.XLS_EXPORT, 'xls', self.user.username, self.xform.id_string)
    default_storage = get_storage_class()()
    self.assertTrue(default_storage.exists(xls_export.filepath))

    # Remove the export object; its file should disappear with it.
    xls_export.delete()
    self.assertFalse(default_storage.exists(xls_export.filepath))
def _get_csv_data(self, filepath):
    """Return the first data row of the stored CSV at ``filepath``.

    The row is the mapping produced by ``csv.DictReader``. The iterator
    raises ``StopIteration`` when the file has no data row.
    """
    storage = get_storage_class()()
    # The context manager closes the file even when reading fails.
    with storage.open(filepath) as csv_file:
        reader = csv.DictReader(csv_file)
        # The built-in next() works on Python 2.6+ and Python 3, unlike
        # the Python 2-only reader.next() method.
        return next(reader)
def _get_xls_data(self, filepath):
    """Return the first data row of the "transportation" sheet as a dict.

    Keys come from row 0 of the sheet and values from row 1. The sheet is
    asserted to contain more than one row.
    """
    storage = get_storage_class()()
    with storage.open(filepath) as xls_file:
        workbook = open_workbook(file_contents=xls_file.read())

    sheet = workbook.sheet_by_name("transportation")
    self.assertTrue(sheet.nrows > 1)
    header_row = sheet.row_values(0)
    first_row = sheet.row_values(1)
    return dict(zip(header_row, first_row))
def _xls_file_io(self):
    '''
    Fetch the form's XLS file from storage; use sparingly.

    Returns the result of ``convert_csv_to_xls`` when the stored name ends
    in ``.csv``, otherwise a ``StringIO`` over the stored content. Returns
    ``None`` when the name is empty or the file is not in storage.
    '''
    stored_name = self.xls.name
    storage = get_storage_class()()
    if stored_name == '' or not storage.exists(stored_name):
        return None

    with storage.open(stored_name) as stored_file:
        if stored_name.endswith('.csv'):
            return convert_csv_to_xls(stored_file.read())
        return StringIO(stored_file.read())
def to_xlsform(self):
    '''
    Produce an XLS-format XLSForm copy of this form.

    Returns the result of ``convert_csv_to_xls`` when the stored name ends
    in ``.csv``, otherwise a ``BytesIO`` over the stored content. Returns
    ``None`` when the name is empty or the file is not in storage.
    '''
    stored_name = self.xls.name
    storage = get_storage_class()()
    if stored_name == '' or not storage.exists(stored_name):
        return None

    with storage.open(stored_name) as stored_file:
        if stored_name.endswith('.csv'):
            return convert_csv_to_xls(stored_file.read())
        return io.BytesIO(stored_file.read())
def handle(self, *args, **kwargs):
    """Copy attachment and XForm files from the local filesystem to S3.

    Exits with status 1 when the storage backends cannot be instantiated,
    or when the default storage is not the S3 storage class. For every
    ``Attachment`` and ``XForm`` object, a file that exists locally and is
    not yet at its upload path on S3 is re-saved through its file field.
    Every other object is reported with the values of the checks.
    """
    try:
        fs = get_storage_class(
            'django.core.files.storage.FileSystemStorage')()
        s3 = get_storage_class('storages.backends.s3boto.S3BotoStorage')()
    # Exception, not a bare except, so KeyboardInterrupt and SystemExit
    # are not mistaken for a missing library.
    except Exception:
        # print(...) with one argument runs the same on Python 2 and 3.
        print(_(u"Missing necessary libraries. Try running: pip install -r"
                "requirements/s3.pip"))
        sys.exit(1)

    default_storage = get_storage_class()()
    if default_storage.__class__ != s3.__class__:
        print(_(u"You must first set your default storage to s3 in your "
                "local_settings.py file."))
        sys.exit(1)

    classes_to_move = [
        (Attachment, 'media_file', attachment_upload_to),
        (XForm, 'xls', xform_upload_to),
    ]

    for cls, file_field, upload_to in classes_to_move:
        print(_("Moving %(class)ss to s3...") % {'class': cls.__name__})
        for i in cls.objects.all():
            f = getattr(i, file_field)
            old_filename = f.name
            if f.name and fs.exists(f.name) and not s3.exists(
                    upload_to(i, f.name)):
                local_path = fs.path(f.name)
                # Close each source file after the upload so a long run
                # does not accumulate open handles.
                with fs.open(local_path) as local_file:
                    f.save(local_path, local_file)
                print(_("\t+ '%(fname)s'\n\t---> '%(url)s'")
                      % {'fname': fs.path(old_filename), 'url': f.url})
            else:
                print("\t- (f.name=%s, fs.exists(f.name)=%s, not s3.exist"
                      "s(upload_to(i, f.name))=%s)" % (
                          f.name, fs.exists(f.name),
                          not s3.exists(upload_to(i, f.name))))
def generate_attachments_zip_export(
        export_type, extension, username, id_string, export_id=None,
        filter_query=None):
    """Zip a form's attachments, store the archive and record the export.

    The archive is saved to the default storage under
    ``<username>/exports/<id_string>/<export_type>/``, with a name built
    from ``id_string`` and the current timestamp. The ``Export`` with id
    ``export_id`` is updated when one is given; otherwise a new one is
    created. ``filter_query`` is accepted but not used here.

    Returns:
        The saved ``Export``, marked ``Export.SUCCESSFUL``.
    """
    xform = XForm.objects.get(user__username=username, id_string=id_string)
    attachments = Attachment.objects.filter(instance__xform=xform)

    timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    basename = "%s_%s" % (id_string, timestamp)
    filename = basename + "." + extension
    file_path = os.path.join(
        username, 'exports', id_string, export_type, filename)

    with NamedTemporaryFile('wb+', prefix='media_zip_export_',
                            suffix='.zip') as temporary_file:
        create_attachments_zipfile(attachments, temporary_file=temporary_file)
        storage = get_storage_class()()
        export_filename = storage.save(
            file_path, File(temporary_file, file_path))

    dir_name, basename = os.path.split(export_filename)

    # Reuse the requested export record, or create a fresh one.
    export = (Export.objects.get(id=export_id) if export_id
              else Export.objects.create(xform=xform,
                                         export_type=export_type))

    export.filedir = dir_name
    export.filename = basename
    export.internal_status = Export.SUCCESSFUL
    export.save()
    return export
def generate_kml_export(
        export_type, extension, username, id_string, export_id=None,
        filter_query=None):
    """Render a form's KML, store it and record the export.

    The ``survey.kml`` template is rendered with the result of
    ``kml_export_data`` and saved to the default storage under
    ``<username>/exports/<id_string>/<export_type>/``. The ``Export`` with
    id ``export_id`` is updated when one is given; otherwise a new one is
    created. ``filter_query`` is accepted but not used here.

    Returns:
        The saved ``Export``, marked ``Export.SUCCESSFUL``.
    """
    user = User.objects.get(username=username)
    xform = XForm.objects.get(user__username=username, id_string=id_string)
    response = render_to_response(
        'survey.kml', {'data': kml_export_data(id_string, user)})

    basename = "%s_%s" % (id_string,
                          datetime.now().strftime("%Y_%m_%d_%H_%M_%S"))
    filename = basename + "." + extension
    file_path = os.path.join(
        username, 'exports', id_string, export_type, filename)

    storage = get_storage_class()()
    # The context manager closes (and removes) the temporary file even
    # when storage.save raises.
    with NamedTemporaryFile(suffix=extension) as temp_file:
        temp_file.write(response.content)
        temp_file.seek(0)
        export_filename = storage.save(
            file_path, File(temp_file, file_path))

    dir_name, basename = os.path.split(export_filename)

    # get or create export object
    if export_id:
        export = Export.objects.get(id=export_id)
    else:
        export = Export.objects.create(xform=xform, export_type=export_type)

    export.filedir = dir_name
    export.filename = basename
    export.internal_status = Export.SUCCESSFUL
    export.save()
    return export
def image_urls(instance):
    """Return one URL per attachment of ``instance``.

    The URL of the 'medium' thumbnail is used when that thumbnail exists
    in the default storage; otherwise the attachment's own media URL.
    """
    storage = get_storage_class()()
    thumb_suffix = settings.THUMB_CONF['medium']['suffix']
    result = []
    for attachment in instance.attachments.all():
        if not storage.exists(
                get_path(attachment.media_file.name, thumb_suffix)):
            result.append(attachment.media_file.url)
            continue
        result.append(
            storage.url(get_path(attachment.media_file.name, thumb_suffix)))
    return result
def resize(filename):
    """Download ``filename`` from its storage URL and save its thumbnails.

    One thumbnail is saved per key in ``settings.THUMB_ORDER``, using the
    size and suffix from ``settings.THUMB_CONF``. Nothing happens when the
    download does not answer with HTTP 200.
    """
    default_storage = get_storage_class()()
    path = default_storage.url(filename)
    req = requests.get(path)
    if req.status_code == 200:
        im = StringIO(req.content)
        image = Image.open(im)
        conf = settings.THUMB_CONF
        # A plain loop: the calls are made for their side effects, so no
        # throwaway list of results is built.
        for key in settings.THUMB_ORDER:
            _save_thumbnails(
                image, filename, conf[key]['size'], conf[key]['suffix'])
def resize_local_env(filename):
    """Open ``filename`` from its local storage path and save its thumbnails.

    One thumbnail is saved per key in ``settings.THUMB_ORDER``, using the
    size and suffix from ``settings.THUMB_CONF``. The local path, not the
    storage name, is what gets passed to ``_save_thumbnails``.
    """
    default_storage = get_storage_class()()
    path = default_storage.path(filename)
    image = Image.open(path)
    conf = settings.THUMB_CONF
    # A plain loop: the calls are made for their side effects, so no
    # throwaway list of results is built.
    for key in settings.THUMB_ORDER:
        _save_thumbnails(
            image, path, conf[key]['size'], conf[key]['suffix'])
def image_url(attachment, suffix):
    '''Return the URL of an attachment image at the requested size.

    Args:
        attachment: object exposing ``media_file.url`` and
            ``media_file.name``.
        suffix: ``'original'`` or a key of ``settings.THUMB_CONF``
            (e.g. large, medium, small).

    Returns:
        The original URL when ``suffix`` is ``'original'`` or is not a key
        of ``settings.THUMB_CONF``. ``None`` when the original file is not
        in the default storage. Otherwise the thumbnail URL, generating the
        thumbnail first when it is missing or empty. If the thumbnail is
        still unavailable after one generation attempt, the original URL is
        returned.
    '''
    url = attachment.media_file.url
    if suffix == 'original':
        return url

    default_storage = get_storage_class()()
    fs = get_storage_class('django.core.files.storage.FileSystemStorage')()

    if suffix in settings.THUMB_CONF:
        size = settings.THUMB_CONF[suffix]['suffix']
        filename = attachment.media_file.name
        if not default_storage.exists(filename):
            return None

        def _thumbnail_ready():
            # A thumbnail counts only when it exists and is non-empty.
            return (default_storage.exists(get_path(filename, size)) and
                    default_storage.size(get_path(filename, size)) > 0)

        ready = _thumbnail_ready()
        if not ready:
            if default_storage.__class__ != fs.__class__:
                resize(filename)
            else:
                resize_local_env(filename)
            # Check once more instead of recursing: a failed generation
            # previously recursed until the interpreter's recursion limit.
            ready = _thumbnail_ready()
        if ready:
            url = default_storage.url(get_path(filename, size))
    return url
def _setup(self):
    """Instantiate the static-files storage and keep it on ``_wrapped``.

    ``settings.STATICFILES_STORAGE`` is used when defined; otherwise
    ``default_storage`` is passed to ``get_storage_class``.
    """
    storage_setting = getattr(settings, 'STATICFILES_STORAGE',
                              default_storage)
    storage_cls = get_storage_class(storage_setting)
    self._wrapped = storage_cls()
def _setup(self):
    """Instantiate the class named by ``settings.THUMBNAIL_DEFAULT_STORAGE``.

    The instance is stored on ``self._wrapped``.
    """
    storage_cls = get_storage_class(settings.THUMBNAIL_DEFAULT_STORAGE)
    self._wrapped = storage_cls()
def _setup(self):
    """Instantiate ``GzipCompressorFileStorage`` and keep it on ``_wrapped``."""
    storage_cls = get_storage_class(
        'compressor.storage.GzipCompressorFileStorage')
    self._wrapped = storage_cls()
def _setup(self):
    """Instantiate the class named by ``settings.COMPRESS_STORAGE``.

    The instance is stored on ``self._wrapped``.
    """
    storage_cls = get_storage_class(settings.COMPRESS_STORAGE)
    self._wrapped = storage_cls()