我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.core.files.storage.FileSystemStorage()。
def setUpModule():
    """
    Prepare module-level fixtures; called by the test runner.

    Returns immediately when ``django`` is falsy. Otherwise it snapshots
    ``sys.path``, ``sys.modules`` and ``os.environ`` into the global
    ``context``. If ``init_django()`` returns a truthy value it then sets up
    the Django test environment, creates the test database (its name is
    stored under ``context['DB_NAME']``) and binds the global ``storage`` to
    a ``FileSystemStorage`` rooted in a fresh temporary directory.
    """
    global context, models, storage

    if not django:
        return

    # Copies, so the values captured here are not affected by later changes.
    snapshot = {
        'sys.path': list(sys.path),
        'sys.modules': dict(sys.modules),
        'os.environ': os.environ.copy(),
    }
    context = snapshot

    if not init_django():
        return

    # Imported late: these modules are only touched once init_django()
    # has reported success.
    from django.core.files.storage import FileSystemStorage
    from django_app.adapters import models  # noqa

    setup_test_environment()
    context['DB_NAME'] = create_test_db(verbosity=0, autoclobber=True)

    temp_root = mkdtemp()
    storage = FileSystemStorage(temp_root)
def upload_avatar(request):
    """
    Store an uploaded avatar for the requesting user, then redirect.

    The upload is read from ``request.FILES['user_avatar']``. It is only
    stored for a POST request whose file name ends in ``.jpg``, ``.jpeg`` or
    ``.png`` (case-insensitive). The file is always saved as
    ``<username>.jpg``, replacing any existing file of that name, and the
    session key ``'user_avatar'`` is refreshed from the user's account.

    Every path ends with a redirect to ``'account_settings'``.
    """
    # NOTE(review): the nesting of the final redirect was reconstructed from
    # a whitespace-collapsed source; it is assumed to run on every path.
    user = request.user
    uploaded = request.FILES.get('user_avatar', None)

    if request.method != 'POST' or not uploaded:
        return redirect('account_settings')

    extension = os.path.splitext(uploaded.name)[1]
    if extension.lower() not in ['.jpg', '.jpeg', '.png']:
        return redirect('account_settings')

    target_name = user.username + '.jpg'
    storage = FileSystemStorage()
    # Remove the previous avatar first so save() reuses the exact name.
    if storage.exists(target_name):
        storage.delete(target_name)
    storage.save(target_name, uploaded)

    account = Account.get_by_user(user=user)
    request.session['user_avatar'] = account.user_avatar
    request.session.save()

    return redirect('account_settings')
def post(self, request):
    """
    Handle submission of the "ask a question" form.

    On a valid form a ``Question`` is built from the cleaned ``content``,
    the optional uploaded ``picture``, the requesting user's ``Profile`` and
    the ``Topic`` whose id is posted as ``topic``; it is saved and the
    client is redirected to ``"/"``. On an invalid form the
    ``question/ask.html`` template is re-rendered with the bound form and
    all topics.
    """
    form = self.form_class(request.POST)

    if not form.is_valid():
        topics = Topic.objects.all()
        return render(request, "question/ask.html",
                      {"form": form, "topics": topics})

    question = Question()
    question.content = form.cleaned_data['content']

    uploaded = request.FILES.get('picture')
    if uploaded:
        storage = FileSystemStorage()
        # save() returns the name actually used by the storage backend.
        question.picture = storage.save('questions/question.jpg', uploaded)

    question.profile = Profile.objects.get(user=request.user)
    topic_id = int(request.POST.get('topic'))
    question.topic = Topic.objects.get(id=topic_id)
    question.save()
    return redirect("/")
def post(self, request, id):
    """
    Handle submission of an answer to the question with primary key ``id``.

    On a valid form an ``Answer`` is created for the requesting user's
    ``Profile`` with the cleaned ``content`` and the optional uploaded
    ``picture``; it is attached to the question, saved, and the client is
    redirected to ``'question:question'``. On an invalid form the
    ``question/answer.html`` template is re-rendered with the bound form and
    the question.
    """
    question = Question.objects.get(id=id)
    form = self.form_class(request.POST)

    if not form.is_valid():
        context = {
            "form": form,
            "question": question
        }
        return render(request, "question/answer.html", context)

    author = Profile.objects.get(user=request.user)
    answer = Answer(profile=author)
    answer.content = form.cleaned_data['content']

    uploaded = request.FILES.get('picture')
    if uploaded:
        storage = FileSystemStorage()
        # save() returns the name actually used by the storage backend.
        answer.picture = storage.save('answers/answer.jpg', uploaded)

    answer.question = question
    answer.save()
    return redirect('question:question', id=id)
def import_dataset(request):
    """
    Start a dataset import in a separate process from POSTed parameters.

    The POST items are flattened into a plain dict (a list value is replaced
    by its first element) and extended with an import ``directory`` and the
    ``elastic_url``. Responds with status 403 when
    ``DocumentStorer.exists`` reports a match for those parameters.

    For formats other than ``postgres``, ``mongodb`` and ``elastic`` the
    data must come from an uploaded ``file`` (saved into the import
    directory, its path passed on as ``file_path``) or from a ``url``
    parameter; when neither is present the response body is ``'failed'``.

    Otherwise ``_import_dataset`` is launched via ``Process`` and an empty
    ``HttpResponse`` is returned without waiting for it.
    """
    parameters = {}
    for key, value in request.POST.items():
        parameters[key] = value[0] if isinstance(value, list) else value

    parameters['directory'] = prepare_import_directory(IMPORTER_DIRECTORY)
    parameters['elastic_url'] = es_url

    if DocumentStorer.exists(**parameters):
        return HttpResponse('Index and mapping exist', status=403)

    database_formats = {'postgres', 'mongodb', 'elastic'}
    if parameters['format'] not in database_formats:
        if 'file' in request.FILES:
            uploaded = request.FILES['file']
            storage = FileSystemStorage(location=parameters['directory'])
            stored_name = storage.save(uploaded.name, uploaded)
            parameters['file_path'] = storage.path(stored_name)
        elif 'url' not in parameters:
            return HttpResponse('failed')

    worker = Process(target=_import_dataset, args=(parameters,))
    worker.start()
    return HttpResponse()
def post(self, request, *args, **kwargs):
    """
    Store an uploaded file and queue an asynchronous scan of it.

    The file from ``request.FILES['upload_from_local']`` is saved under
    ``media/AnonymousUser/`` when ``str(request.user)`` is
    ``'AnonymousUser'``, otherwise under ``media/user/<user>/``. A scan id
    is created, ``apply_scan_async`` is queued with the stored path, and the
    client is redirected to ``/resultscan/<scan_id>``.

    NOTE(review): no response is built in the visible code when the form is
    invalid, so the method falls through and returns ``None`` — confirm
    whether an error response is intended.
    """
    # Local import so this block does not depend on a module-level
    # ``import os`` being present in the file.
    import os

    form = self.form_class(request.POST, request.FILES)

    if form.is_valid():
        if (str(request.user) == 'AnonymousUser'):
            path = 'media/AnonymousUser/'
            user = None
        else:
            path = 'media/user/' + str(request.user) + '/'
            user = request.user

        # Create the target directory in-process instead of spawning
        # ``mkdir -p``: no dependency on an external binary, and a failure
        # raises OSError instead of being silently ignored.
        os.makedirs(path, exist_ok=True)

        f = request.FILES['upload_from_local']
        fs = FileSystemStorage(path)
        # save() may alter the name to avoid clashes; use what it returns.
        filename = fs.save(f.name, f)

        path = path + str(filename)
        scan_directory = filename
        url = fs.url(filename)
        scan_start_time = timezone.now()

        scan_id = create_scan_id(user, url, scan_directory, scan_start_time)
        apply_scan_async.delay(path, scan_id)

        return HttpResponseRedirect('/resultscan/' + str(scan_id))
def upload_file(request):
    """
    Save an uploaded audio file and render the feature page for it.

    On a POST carrying a ``music-input`` file, the file is stored under
    ``analyser/uploaded_files/``, its tags are read with
    ``id3tags.get_tags``, ``convert_to_wav`` is called on it, and
    ``analyser/feature_home.html`` is rendered with the stored file name and
    the artist/title/album tags.

    Any other request (including a POST without the file field) renders
    ``analyser/index.html``.
    """
    # TODO: Validate form on client side
    # .get() instead of indexing: a POST without the 'music-input' field
    # falls through to the index page rather than raising a KeyError.
    audio_file = request.FILES.get('music-input')

    if request.method == 'POST' and audio_file:
        fs = FileSystemStorage()
        filename = fs.save('analyser/uploaded_files/' + audio_file.name, audio_file)
        logging.error('FILENAME:' + filename)

        tags = id3tags.get_tags(filename)
        convert_to_wav(filename)

        return render(request, 'analyser/feature_home.html', {
            'uploaded_filename': filename,
            'show_loading_animation': True,
            'artist': tags['artist'],
            'title': tags['title'],
            'album': tags['album'],
        })

    return render(request, 'analyser/index.html')
def __init__(self, apps=None, *args, **kwargs):
    """
    Build the finder's locations and storages from ``STATICFILES_DIRS``.

    Each setting entry is either a plain root path or a ``(prefix, root)``
    pair. Duplicate ``(prefix, root)`` pairs are kept only once, and one
    ``FileSystemStorage`` is created per location.

    Raises ``ImproperlyConfigured`` when the setting is not a list/tuple or
    when one of its roots resolves to ``STATIC_ROOT``.
    """
    # List of locations with static files
    self.locations = []
    # Maps dir paths to an appropriate storage instance
    self.storages = SortedDict()

    configured_dirs = settings.STATICFILES_DIRS
    if not isinstance(configured_dirs, (list, tuple)):
        raise ImproperlyConfigured(
            "Your STATICFILES_DIRS setting is not a tuple or list; "
            "perhaps you forgot a trailing comma?")

    for entry in configured_dirs:
        if isinstance(entry, (list, tuple)):
            prefix, root = entry
        else:
            prefix, root = '', entry

        if os.path.abspath(settings.STATIC_ROOT) == os.path.abspath(root):
            raise ImproperlyConfigured(
                "The STATICFILES_DIRS setting should "
                "not contain the STATIC_ROOT setting")

        location = (prefix, root)
        if location not in self.locations:
            self.locations.append(location)

    for prefix, root in self.locations:
        storage = FileSystemStorage(location=root)
        storage.prefix = prefix
        self.storages[root] = storage

    super(FileSystemFinder, self).__init__(*args, **kwargs)
def post(self, request, *args, **kwargs):
    """
    Store an uploaded log file and import its lines as ``ParsedLog`` rows.

    The file from ``request.FILES['myfile']`` is saved through the default
    ``FileSystemStorage`` and then re-read from the start. Each line is
    passed to ``parse_line``; lines for which it returns ``None`` are
    skipped. The resulting objects are inserted with one ``bulk_create``
    call and ``account/main.html`` is rendered.
    """
    myfile = request.FILES['myfile']
    fs = FileSystemStorage()
    fs.save(myfile.name, myfile)

    # Saving consumed the upload; rewind before reading it for parsing.
    myfile.seek(0)
    log_file = myfile.read()

    parsed_logs = []
    # Create ParsedLog object for each line in log file
    for line in log_file.splitlines():
        tokens = parse_line(line)
        # parse_line returns None if regex fails to match
        if tokens is not None:
            parsed_logs.append(ParsedLog(
                owner=request.user,
                ip_address=tokens[0],
                rfc_id=tokens[1],
                user_id=tokens[2],
                date_time=tokens[3],
                request_line=tokens[4],
                http_status=tokens[5],
                num_bytes=tokens[6],
            ))

    # Bulk insert into database
    ParsedLog.objects.bulk_create(parsed_logs)

    return render(request, 'account/main.html', {'error': "Upload Successful"})
def is_local_storage(self):
    """Return True when ``self.storage`` is a ``FileSystemStorage`` instance."""
    backend = self.storage
    return isinstance(backend, FileSystemStorage)
def __init__(self, location=None, base_url=None, *args, **kwargs):
    """
    Initialise the storage, defaulting to the static-files settings.

    ``location`` falls back to ``settings.STATIC_ROOT`` and ``base_url`` to
    ``settings.STATIC_URL`` when they are ``None``. ``check_settings`` is
    called with the resolved ``base_url`` before the parent initialiser
    runs.
    """
    location = settings.STATIC_ROOT if location is None else location
    base_url = settings.STATIC_URL if base_url is None else base_url
    check_settings(base_url)

    super(StaticFilesStorage, self).__init__(location, base_url,
                                             *args, **kwargs)

    # FileSystemStorage fallbacks to MEDIA_ROOT when location
    # is empty, so we restore the empty value.
    if not location:
        self.base_location = None
        self.location = None
def user_avatar(self):
    """
    Return the URL path of this user's avatar image.

    Returns ``/static/avatar/<username>.jpg`` when a file named
    ``<username>.jpg`` exists in the default ``FileSystemStorage``,
    otherwise the default profile picture path.
    """
    storage = FileSystemStorage()
    avatar_name = self.user.username + '.jpg'

    if not storage.exists(avatar_name):
        return '/static/img/profile/default.jpg'
    return '/static/avatar/' + self.user.username + '.jpg'
def post(self, request):
    """
    Update the requesting user's profile from the submitted form.

    On a valid form the ``education``, ``profession`` and ``employment``
    fields are copied onto the user's ``Profile``. If an ``avatar`` file was
    uploaded it is first saved as a temporary ``cache/avatar.jpg``, cropped
    to a centred square, stored as ``avatar/avatar.jpg`` and the temporary
    file is removed. The profile is then saved and the client redirected to
    ``account:profile``.

    On an invalid form ``account/edit.html`` is re-rendered with the bound
    form.
    """
    form = self.form_class(request.POST)

    if form.is_valid():
        profile = Profile.objects.get(user=request.user)
        profile.education = form.cleaned_data['education']
        profile.profession = form.cleaned_data['profession']
        profile.employment = form.cleaned_data['employment']

        avatar = request.FILES.get('avatar')
        if avatar is not None:
            fs = FileSystemStorage()
            source_file = fs.save('cache/avatar.jpg', avatar)
            # NOTE(review): the 'media/' prefix presumably mirrors the
            # storage root (MEDIA_ROOT) — confirm against settings.
            img = Image.open('media/' + source_file)

            # Crop box for the largest centred square: trim the longer
            # dimension equally on both sides.
            width, height = img.size
            if width >= height:
                upper_x = int((width / 2) - (height / 2))
                upper_y = 0
                lower_x = int((width / 2) + (height / 2))
                lower_y = height
            else:
                upper_x = 0
                upper_y = int((height / 2) - (width / 2))
                lower_x = width
                lower_y = int((height / 2) + (width / 2))
            box = (upper_x, upper_y, lower_x, lower_y)

            img = img.crop(box)
            img.save('media/'+source_file)

            # Context manager guarantees the handle is closed before the
            # temporary file is removed (the handle used to be leaked, and
            # removing an open file fails on some platforms).
            with open('media/'+source_file, 'rb') as cropped:
                profile.avatar = fs.save('avatar/avatar.jpg', cropped)
            os.remove('media/'+source_file)

        profile.save()
        return redirect("account:profile", username=request.user.username)
    else:
        return render(request, "account/edit.html", {"form": form})
def simple_upload(request):
    """
    Save an uploaded file and show its URL.

    On a POST carrying a ``myfile`` upload, the file is stored through the
    default ``FileSystemStorage`` and ``core/simple_upload.html`` is
    rendered with ``uploaded_file_url``. Any other request (including a
    POST without the file field) renders the same template without context.
    """
    # .get() instead of indexing: a POST without the 'myfile' field shows
    # the empty form again rather than raising a KeyError.
    myfile = request.FILES.get('myfile')

    if request.method == 'POST' and myfile:
        fs = FileSystemStorage()
        # save() may alter the name to avoid clashes; use what it returns.
        filename = fs.save(myfile.name, myfile)
        uploaded_file_url = fs.url(filename)
        return render(request, 'core/simple_upload.html', {
            'uploaded_file_url': uploaded_file_url
        })

    return render(request, 'core/simple_upload.html')
def test_post_signed_url_where_not_supported(monkeypatch):
    """``signed_url`` answers 404 / "Not found" with a FileSystemStorage backend."""
    monkeypatch.setattr(views, 'default_storage', FileSystemStorage())

    request = HttpRequest()
    request.method = 'POST'
    request.POST = {'key': 'file.txt'}

    response = views.signed_url(request)
    body = json.loads(response.content.decode('utf-8'))

    assert response.status_code == 404
    assert body['error'] == "Not found"
def __init__(self, app_names=None, *args, **kwargs):
    """
    Register the ``static`` directory of every module in ``self.dependencies``.

    Each dependency is imported and ``<module dir>/static`` is recorded as a
    location with an empty prefix; one ``FileSystemStorage`` is then created
    per location.
    """
    self.locations = []
    self.storages = collections.OrderedDict()

    for dependency in self.dependencies:
        module = __import__(dependency)
        module_dir = os.path.dirname(module.__file__)
        self.locations.append(('', '{0}/static'.format(module_dir)))

    for prefix, root in self.locations:
        storage = FileSystemStorage(location=root)
        storage.prefix = prefix
        self.storages[root] = storage

    # Starts the MRO lookup after FileSystemFinder, so that class's own
    # __init__ is not run.
    super(FileSystemFinder, self).__init__(*args, **kwargs)
def export_download(request, username, id_string, export_type, filename):
    """
    Serve a previously generated export of a form.

    Looks up the owner (case-insensitive ``username``), the owner's
    ``XForm`` with ``id_string`` and the ``Export`` named ``filename``; any
    missing object produces a 404 via ``get_object_or_404``. Returns
    ``HttpResponseForbidden`` when ``has_permission`` denies access.

    Google-doc and external exports that have an ``export_url`` are answered
    with a redirect to that URL. Otherwise the download is recorded with
    ``audit_log``. If the default storage is not a ``FileSystemStorage`` the
    client is redirected to the storage URL of the export file; otherwise
    the file is returned through ``response_with_mimetype_and_name``.
    """
    owner = get_object_or_404(User, username__iexact=username)
    xform = get_object_or_404(XForm, id_string__exact=id_string, user=owner)
    helper_auth_helper(request)

    if not has_permission(xform, owner, request):
        return HttpResponseForbidden(_(u'Not shared.'))

    # find the export entry in the db
    export = get_object_or_404(Export, xform=xform, filename=filename)

    is_remote_type = (export_type == Export.GDOC_EXPORT or
                      export_type == Export.EXTERNAL_EXPORT)
    if is_remote_type and export.export_url is not None:
        return HttpResponseRedirect(export.export_url)

    ext, mime_type = export_def_from_filename(export.filename)

    audit_data = {
        "xform": xform.id_string,
        "export_type": export.export_type
    }
    audit_message = _("Downloaded %(export_type)s export '%(filename)s' "
                      "on '%(id_string)s'.") % {
        'export_type': export.export_type.upper(),
        'filename': export.filename,
        'id_string': xform.id_string,
    }
    audit_log(Actions.EXPORT_DOWNLOADED, request.user, owner,
              audit_message, audit_data, request)

    # NOTE(review): id_string is not read again in the visible code, so
    # this reset appears to have no effect — confirm before removing.
    if request.GET.get('raw'):
        id_string = None

    storage_backend = get_storage_class()()
    if not isinstance(storage_backend, FileSystemStorage):
        return HttpResponseRedirect(storage_backend.url(export.filepath))

    basename = os.path.splitext(export.filename)[0]
    return response_with_mimetype_and_name(
        mime_type,
        name=basename,
        extension=ext,
        file_path=export.filepath,
        show_date=False)
def __init__(self, location=None, base_url=None, file_permissions_mode=None,
             directory_permissions_mode=None):
    """
    Initialise a storage bound to one existing file.

    ``location`` must be the path of an existing regular file. It is split
    into its directory (``self._file_path``) and file name
    (``self._file_name``), and the directory is handed to the parent
    initialiser as the storage location.

    Raises ``IOError`` when ``location`` is ``None`` or does not point to an
    existing file.
    """
    self._file_path = ''
    self._file_name = ''

    # The explicit None check makes the default argument fail with the same
    # IOError as any other bad path; os.path.isfile(None) would raise a
    # TypeError instead.
    if location is None or not os.path.isfile(location):
        # This class won't work if `location` doesn't point to an actual file
        raise IOError('Path should point to an existing file')

    # Separate the path from the filename
    self._file_path, self._file_name = os.path.split(location)

    # Pass in the folder so this acts like a typical FileSystemStorage that needs a path
    super().__init__(
        self._file_path,
        base_url,
        file_permissions_mode,
        directory_permissions_mode
    )
def handle(self, *args, **options):
    """
    Write the generated JS for ``options['locale']`` to ``choices-<locale>.js``.

    The file is written into the directory returned by
    ``self.get_location()``, replacing any existing file of that name. A
    confirmation line is written to stdout only when the first command-line
    argument is ``collectstatic_js_choices``.
    """
    locale = options['locale']
    location = self.get_location()
    js_filename = 'choices-{}.js'.format(locale)

    storage = FileSystemStorage(location=location)
    # Delete first so save() reuses the exact file name.
    if storage.exists(js_filename):
        storage.delete(js_filename)

    js_source = generate_js(locale)
    storage.save(js_filename, ContentFile(js_source))

    invoked_directly = (len(sys.argv) > 1 and
                        sys.argv[1] in ['collectstatic_js_choices'])
    if invoked_directly:
        self.stdout.write('{0} file written to {1}'.format(js_filename, location))
def test_iiif_image_api_storage(self, _getter):
    # Checks that an image generated once is afterwards served from storage:
    # the first request generates it, the second (equivalent) request is
    # redirected to the canonical URL and makes no calls on the image.

    # Enable file storage engine for this test
    from icekit.plugins.iiif import views
    temp_dir = tempfile.gettempdir()
    views.iiif_storage = FileSystemStorage(location=temp_dir)

    # Generate image at 10% size, manually specified
    canonical_args = [self.ik_image.pk, 'full', '20,', '0', 'default', 'jpg']
    canonical_url = reverse('iiif_image_api', args=canonical_args)
    image = self.mock_image(return_from=_getter)
    response = self.app.get(canonical_url, user=self.superuser)

    expected_calls = [
        call.resize((20, 30)),
        call.resize().convert('RGB'),
        call.resize().convert().save(ANY, format='jpeg')
    ]
    self.assertEqual(image.mock_calls, expected_calls)
    self.assertEqual(200, response.status_code)
    self.FileResponse.assert_called_with(ANY, content_type='image/jpeg')

    # Generate image at 10% size using pct:10, confirm we get redirected
    # to the expected canonical URL path and that image loaded from storage
    # instead of generated
    image = self.mock_image(return_from=_getter)
    pct_args = [self.ik_image.pk, 'full', 'pct:10', '0', 'default', 'jpg']
    pct_url = reverse('iiif_image_api', args=pct_args)
    response = self.app.get(pct_url, user=self.superuser)

    # Redirected to canonical URL?
    self.assertEqual(302, response.status_code)
    self.assertTrue(response['location'].endswith(canonical_url))

    response = response.follow()
    self.assertEqual(200, response.status_code)
    self.assertEqual(image.mock_calls, [])  # No calls on image
    self.FileResponse.assert_called_with(ANY, content_type='image/jpeg')
def list(self, ignore_patterns):
    """
    Return ``(filename, storage)`` pairs for every configured logo path.

    Entries of ``logo_paths`` with an empty path are skipped. Each storage
    is a ``FileSystemStorage`` rooted at the file's directory, with its
    ``prefix`` attribute set to the entry's key. ``ignore_patterns`` is not
    used.
    """
    found = []
    for prefix, path in logo_paths.items():
        if path:
            basedir, filename = os.path.split(path)
            storage = FileSystemStorage(location=basedir)
            storage.prefix = prefix
            found.append((filename, storage))
    return found