我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用django.core.files.uploadedfile.UploadedFile()。
def test_resized_images_updated(self):
    """
    Thumbnails should be regenerated when the profile already has an image
    and it is replaced with update_image=True.
    """
    # Precondition: an image and both derived sizes already exist.
    assert self.profile.image
    assert self.profile.image_small
    assert self.profile.image_medium

    # Build a small PNG entirely in memory to serve as the replacement upload.
    buffer = BytesIO()
    Image.new('RGBA', size=(50, 50), color=(256, 0, 0)).save(buffer, 'png')
    buffer.seek(0)
    upload = UploadedFile(buffer, "filename.png", "image/png", len(buffer.getvalue()))

    self.profile.image = upload
    self.profile.save(update_image=True)

    # NOTE(review): read() starts at the buffer's current position, which
    # depends on how much save() consumed -- confirm this really compares
    # against the full PNG payload and not an empty byte string.
    uploaded_bytes = buffer.read()
    for resized in (self.profile.image_small, self.profile.image_medium):
        assert resized.file.read() != uploaded_bytes
def test_case_study_create_api_success(
    mock_create_case_study, supplier_case_study_end_to_end, sso_user,
    all_case_study_data, api_response_200
):
    """Creating a case study redirects to the company page and calls the API once."""
    mock_create_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end()

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')

    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky, so the image fields match anything.
    expected_data = dict(
        all_case_study_data,
        image_one=ANY,
        image_two=ANY,
        image_three=ANY,
    )
    expected_call = call(
        data=expected_data,
        sso_session_id=sso_user.session_id,
    )
    assert mock_create_case_study.call_count == 1
    assert mock_create_case_study.call_args == expected_call
def test_case_study_update_api_success(
    mock_update_case_study, supplier_case_study_end_to_end, sso_user,
    all_case_study_data, api_response_200
):
    """Updating a case study redirects to the company page and calls the API once."""
    mock_update_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end(case_study_id='1')

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')

    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky, so the image fields match anything.
    expected_data = dict(
        all_case_study_data,
        image_one=ANY,
        image_two=ANY,
        image_three=ANY,
    )
    mock_update_case_study.assert_called_once_with(
        data=expected_data,
        case_study_id='1',
        sso_session_id=sso_user.session_id,
    )
def _save_to_answer(self, field, answer, value):
    """Copy a cleaned form value onto ``answer`` and log the change.

    The way the value is stored depends on the form field type:
    multiple-choice and single-choice fields attach option objects, file
    fields store the upload and a ``file://`` reference, and every other
    field stores the value as-is.
    """
    # Decide between "update" and "create" before any branch saves the
    # answer, because saving assigns a primary key.
    suffix = 'update' if answer.pk else 'create'
    action = 'pretalx.submission.answer' + suffix

    if isinstance(field, forms.ModelMultipleChoiceField):
        joined = ', '.join(str(option) for option in value)
        # The answer needs a primary key before options can be attached;
        # an existing answer has its previous options cleared instead.
        if answer.pk:
            answer.options.clear()
        else:
            answer.save()
        answer.answer = joined
        answer.options.add(*value)
    elif isinstance(field, forms.ModelChoiceField):
        if answer.pk:
            answer.options.clear()
        else:
            answer.save()
        answer.options.add(value)
        answer.answer = value.answer
    elif isinstance(field, forms.FileField):
        if isinstance(value, UploadedFile):
            answer.answer_file.save(value.name, value)
            answer.answer = 'file://' + value.name
        # Log the stored answer string rather than the file object.
        value = answer.answer
    else:
        answer.answer = value

    answer.log_action(action, person=self.request_user, data={'answer': value})
def get_step_files(self, step):
    """Return the uploaded files stored for ``step``, or None if there are none.

    Each stored entry is turned into an ``UploadedFile`` backed by the
    wizard's file storage. Instances are cached in ``self._files`` per
    ``(step, field)`` so repeated calls reuse the same object.

    Raises:
        NoFileStorageConfigured: files were stored for the step but the
            wizard has no ``file_storage``.
    """
    stored = self.data[self.step_files_key].get(step, {})

    if stored and not self.file_storage:
        raise NoFileStorageConfigured(
            "You need to define 'file_storage' in your "
            "wizard view in order to handle file uploads.")

    files = {}
    for field, field_dict in six.iteritems(stored):
        # Work on a copy so the stored metadata keeps its 'tmp_name' entry.
        upload_kwargs = field_dict.copy()
        tmp_name = upload_kwargs.pop('tmp_name')
        cache_key = (step, field)
        if cache_key not in self._files:
            self._files[cache_key] = UploadedFile(
                file=self.file_storage.open(tmp_name), **upload_kwargs)
        files[field] = self._files[cache_key]

    if files:
        return files
    return None
def setUp(self):
    """Publish the transportation form, attach a media file and build a client.

    After the base setup a submission with an attachment is created, a
    screenshot is uploaded as form media, and a ``BriefcaseClient`` pointing
    at the first user's profile URL is stored on ``self.bc``.
    """
    TestBase.setUp(self)
    self._publish_transportation_form()
    self._submit_transport_instance_w_attachment()

    src = os.path.join(self.this_directory, "fixtures",
                       "transportation", "screenshot.png")
    count = MetaData.objects.count()
    # PNG data is binary, so open the fixture in 'rb' mode, and use a context
    # manager so the handle is released even if the upload fails.
    with open(src, 'rb') as image_file:
        uf = UploadedFile(file=image_file, content_type='image/png')
        MetaData.media_upload(self.xform, uf)
    self.assertEqual(MetaData.objects.count(), count + 1)

    # Build the profile URL while the first user is still logged in.
    url = urljoin(
        self.base_url,
        reverse(profile, kwargs={'username': self.user.username})
    )
    self._logout()
    self._create_user_and_login('deno', 'deno')
    self.bc = BriefcaseClient(
        username='bob', password='bob',
        url=url,
        user=self.user
    )
def clean_file(self):
    """Validate the uploaded PDF and return a sanitized copy.

    Returns:
        An ``UploadedFile`` named ``upload.pdf`` wrapping the output of
        ``decrypt_pdf``.

    Raises:
        ValidationError: no file was supplied, the file does not start with
            the PDF magic bytes, or ``decrypt_pdf`` raised ``ValueError``.
    """
    pdf = self.cleaned_data['file']
    if not pdf:
        raise ValidationError(_('no file'))

    # pdf magic check: compare the first four bytes, then rewind
    header = pdf.read(4)
    if header != PDF_MAGIC:
        raise ValidationError(_('This file is not a PDF document.'))
    pdf.seek(0)

    # sanitization
    try:
        sanitized = decrypt_pdf(pdf)
    except ValueError:
        raise ValidationError(_('The PDF-File seems to broken. For more Information click on the question mark in the sidebar.'))

    sanitized.seek(0)
    return UploadedFile(sanitized, content_type='application/pdf', name='upload.pdf')
def clean(self, value, *args, **kwargs):
    """Clean the field value and re-encode a fresh upload as PNG.

    Values that are not ``UploadedFile`` instances are returned as cleaned
    by the parent class. If Pillow is not installed the upload is returned
    unconverted.

    Returns:
        A ``SimpleUploadedFile`` named ``picture.png`` holding the PNG
        bytes, or the parent's cleaned value when no conversion applies.

    Raises:
        ValidationError: the upload could not be read or written as an image.
    """
    value = super().clean(value, *args, **kwargs)
    if not isinstance(value, UploadedFile):
        return value

    try:
        from PIL import Image
    except ImportError:
        # Conversion is best-effort: without Pillow keep the original upload.
        return value

    import io

    value.open('rb')
    value.seek(0)
    try:
        # Encode into memory instead of a named temporary file: this avoids
        # disk I/O and reopening a temp file by path.
        png_buffer = io.BytesIO()
        with Image.open(value) as im:
            im.save(png_buffer, format='PNG')
    except IOError:
        logger.exception('Could not convert image to PNG.')
        raise ValidationError(
            _('The file you uploaded could not be converted to PNG format.')
        )
    # 'image/png' is the registered media type for PNG data.
    return SimpleUploadedFile('picture.png', png_buffer.getvalue(), 'image/png')
def clean(self, *args, **kwargs):
    """Clean the value and enforce content-type and size limits on uploads.

    The checks only apply when the underlying file is a fresh
    ``UploadedFile``; files already stored on the model pass through.

    Returns:
        The value returned by the parent ``clean``.

    Raises:
        ValidationError: the upload's content type is not listed in
            ``self.content_types`` or its size exceeds ``self.max_file_size``.
    """
    data = super(PrivateFileField, self).clean(*args, **kwargs)
    if not data:
        # No file is associated with the value (e.g. an optional field left
        # empty), so there is nothing to inspect and ``data.file`` must not
        # be dereferenced.
        return data

    uploaded = data.file
    if isinstance(uploaded, UploadedFile):
        # content_type is only available for uploaded files,
        # and not for files which are already stored in the model.
        content_type = uploaded.content_type

        if self.content_types and content_type not in self.content_types:
            logger.debug('Rejected uploaded file type: %s', content_type)
            raise ValidationError(self.error_messages['invalid_file_type'])

        if self.max_file_size and uploaded.size > self.max_file_size:
            raise ValidationError(self.error_messages['file_too_large'].format(
                max_size=filesizeformat(self.max_file_size),
                size=filesizeformat(uploaded.size)
            ))

    return data
def test_files(rf):
    """A file posted to /upload is exposed to the handler as an UploadedFile."""
    payload = {
        'file': ContentFile(b'foo', name='foo.txt'),
    }
    request = rf.post('/upload', payload)

    operation = router.get_path('/upload').get_operation('post')
    request.api_info = APIInfo(operation)

    parameters = read_parameters(request, {})
    assert isinstance(parameters['file'], UploadedFile)
def setUp(self):
    """Give the test profile an in-memory PNG image before each test."""
    super().setUp()

    # Build a dummy PNG in memory so no fixture file is needed on disk.
    buffer = BytesIO()
    Image.new('RGBA', size=(50, 50), color=(256, 0, 0)).save(buffer, 'png')
    buffer.seek(0)

    upload = UploadedFile(buffer, "filename.png", "image/png", len(buffer.getvalue()))
    self.profile.image = upload
    self.profile.save(update_image=True)
def image_one():
    """Return an UploadedFile wrapping a generated PNG test image."""
    png = create_test_image('png')
    return UploadedFile(file=png, name='one.png', content_type='image/png', size=100)
def image_two():
    """Return an UploadedFile wrapping a generated PNG test image."""
    png = create_test_image('png')
    return UploadedFile(file=png, name='one.png', content_type='image/png', size=100)
def image_three():
    """Return an UploadedFile wrapping a generated PNG test image."""
    png = create_test_image('png')
    return UploadedFile(file=png, name='one.png', content_type='image/png', size=100)
def form_valid(self, form):
    """Store the uploaded file and describe it in a JSON response.

    The file from ``request.FILES['file']`` is saved through ``self.model``
    and a thumbnail is requested for it. Thumbnail generation is
    best-effort: any failure is logged and ``thumbnailUrl`` is left empty.

    Optional POST parameters:
        size: thumbnail geometry string, defaults to ``'180x80'``.
        quality: thumbnail quality as an integer, defaults to ``50``.

    Returns:
        A ``JsonResponse`` with a single-entry ``files`` list.
    """
    log.info('received POST to main multiuploader view')

    file_obj = self.request.FILES['file']
    wrapped_file = UploadedFile(file_obj)
    filename = wrapped_file.name
    file_size = wrapped_file.file.size
    log.info('Got file: "%s"', filename)

    fl = self.model(filename=filename, file=file_obj)
    fl.save()
    log.info('File saving done')

    size = self.request.POST.get('size') or '180x80'
    # POST values arrive as strings while the default is an int; normalise to
    # int so the thumbnailer always receives the same type, and fall back to
    # the default when the client sends something that is not a number.
    try:
        quality = int(self.request.POST.get('quality') or 50)
    except (TypeError, ValueError):
        quality = 50

    thumb_url = ""
    try:
        thumb_url = get_thumbnail(fl.file, size, quality=quality).url
    except Exception as e:
        # A missing thumbnail must not fail the upload itself.
        log.error(e)

    # The same endpoint serves both download and deletion (via DELETE).
    file_url = reverse('multiuploader', args=[fl.pk])

    # generating json response array
    result = {
        "files": [{
            "id": str(fl.id),
            "name": filename,
            "size": file_size,
            'type': file_obj.content_type,
            "url": file_url,
            "thumbnailUrl": thumb_url,
            "deleteUrl": file_url,
            "deleteType": "DELETE",
        }]
    }
    return JsonResponse(result)
def serialize_form(self, form):
    """
    Turn this job's bound form into a list of session-serializable dicts.

    One entry is produced per field present in ``form.cleaned_data``, in the
    order of ``form.fields``. Model instances are encoded as a prefixed
    content-type reference string; uploaded files are written to the ``_fs``
    storage location and replaced by the resulting path.
    """
    data = []
    for field_name, field in form.fields.items():
        if field_name not in form.cleaned_data:
            continue

        form_value = form.cleaned_data[field_name]
        display_value = None

        if isinstance(form_value, models.Model):
            ctype = ContentType.objects.get_for_model(form_value)
            form_value = '{0}{1}.{2}:{3}'.format(
                _CONTENT_TYPE_PREFIX,
                ctype.app_label,
                ctype.model,
                form_value.pk
            )
        elif isinstance(form_value, UploadedFile):
            uploaded = form_value
            file_name = _fs.get_available_name(uploaded.name)
            file_path = _fs.path(file_name)
            # Copy the upload chunk by chunk to its destination on disk.
            with open(file_path, 'wb+') as destination:
                for chunk in uploaded.chunks():
                    destination.write(chunk)
            form_value = file_path
            display_value = file_name

        label = force_text(field.label) if field.label else None
        data.append({
            'name': field_name,
            'label': label,
            'value': form_value,
            'display_value': display_value,
        })
    return data
def clean(self):
    """Default an empty ``name`` to the uploaded file's name.

    When no name was entered and ``file`` is a fresh upload, the upload's
    file name is used. When no name was entered and there is no fresh
    upload, ``name`` is removed from ``cleaned_data``.

    Returns:
        The (possibly modified) ``cleaned_data`` dict.
    """
    file = self.cleaned_data.get('file')
    if not self.cleaned_data.get('name'):
        if isinstance(file, UploadedFile):
            self.cleaned_data['name'] = file.name
        else:
            # 'name' may be absent altogether (it is read with .get() above),
            # so remove it without raising KeyError.
            self.cleaned_data.pop('name', None)
    return self.cleaned_data
def test_metadata_file_hash(self):
    """Uploading form media creates a MetaData record with a checksum."""
    self._publish_transportation_form()
    src = os.path.join(self.this_directory, "fixtures",
                       "transportation", "screenshot.png")
    count = MetaData.objects.count()
    # PNG data is binary, so open the fixture in 'rb' mode, and use a context
    # manager so the handle is released even if the upload fails.
    with open(src, 'rb') as image_file:
        uf = UploadedFile(file=image_file, content_type='image/png')
        MetaData.media_upload(self.xform, uf)

    # assert successful insert of new metadata record
    self.assertEqual(MetaData.objects.count(), count + 1)
    md = MetaData.objects.get(xform=self.xform,
                              data_value='screenshot.png')

    # assert checksum string has been generated, hash length > 16
    self.assertTrue(len(md.hash) > 16)
def clean(self, value, *args, **kwargs):
    """Clean the field value and normalise an uploaded certificate to PEM.

    Uploads that already contain a PEM certificate block are passed through
    unchanged (apart from being renamed). Anything else is treated as DER
    and converted with the ``openssl x509`` command line tool.

    Returns:
        A ``SimpleUploadedFile`` named ``cert.pem``, or the parent's cleaned
        value when it is not a fresh upload.

    Raises:
        ValidationError: the content is neither PEM nor convertible from DER.
    """
    value = super().clean(value, *args, **kwargs)
    if not isinstance(value, UploadedFile):
        return value

    value.open('rb')
    value.seek(0)
    content = value.read()

    # A PEM certificate needs both the opening and the closing marker.
    looks_like_pem = (
        content.startswith(b'-----BEGIN CERTIFICATE-----')
        and b'-----END CERTIFICATE-----' in content
    )
    if looks_like_pem:
        return SimpleUploadedFile('cert.pem', content, 'text/plain')

    openssl_cmd = [
        'openssl',
        'x509',
        '-inform',
        'DER',
        '-outform',
        'PEM',
    ]
    process = subprocess.Popen(
        openssl_cmd,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stdin=subprocess.PIPE,
    )
    # Hand the input to communicate() so stdin is written and closed while
    # the output pipes are drained; a separate stdin.write() can block once
    # a pipe buffer fills up.
    pem, error = process.communicate(input=content)
    if process.returncode != 0:
        logger.info('Trying to convert a DER to PEM failed: {}'.format(error))
        raise ValidationError(
            _('This does not look like a X509 certificate in either PEM or DER format'),
        )
    return SimpleUploadedFile('cert.pem', pem, 'text/plain')
def get_step_files(self, step):
    """Return the uploaded files stored for ``step``, or None if there are none.

    Each stored entry is turned into a new ``UploadedFile`` backed by the
    wizard's file storage.

    Raises:
        NoFileStorageConfigured: files were stored for the step but the
            wizard has no ``file_storage``.
    """
    stored = self.data[self.step_files_key].get(step, {})

    if stored and not self.file_storage:
        raise NoFileStorageConfigured(
            "You need to define 'file_storage' in your "
            "wizard view in order to handle file uploads.")

    files = {}
    for field, field_dict in six.iteritems(stored):
        # Work on a copy so the stored metadata keeps its 'tmp_name' entry.
        upload_kwargs = field_dict.copy()
        tmp_name = upload_kwargs.pop('tmp_name')
        files[field] = UploadedFile(
            file=self.file_storage.open(tmp_name), **upload_kwargs)

    if files:
        return files
    return None
def make_value_from_form(self, value):
    """Convert a form value to a property value.

    For an ``UploadedFile`` the content is read once, remembered on
    ``self.form_value`` and returned wrapped in a ``db.Blob``. Any other
    value is delegated to the parent implementation.
    """
    is_upload = have_uploadedfile and isinstance(value, uploadedfile.UploadedFile)
    if not is_upload:
        return super(BlobProperty, self).make_value_from_form(value)

    # Read the upload only when nothing has been stored yet.
    if not self.form_value:
        self.form_value = value.read()
    return db.Blob(self.form_value)
def post_create_issue(self, request, issue, data):
    """Attach every file posted under ``media`` to the newly created issue.

    Raises:
        ValidationError: one of the files is larger than 100 MiB.
    """
    from issues_media.models import IssueMedia

    max_size = 100 * 1024 * 1024
    uploads = request.FILES.getlist('media', ())
    for upload in uploads:
        assert isinstance(upload, UploadedFile)
        if upload.size > max_size:
            raise ValidationError("File %s is too large" % upload)
        # TODO: Add mimetype validation somewhere around here
        IssueMedia.objects.create(
            issue=issue,
            file=upload
        )
def save(self) -> None:
    """
    Write every changed form value to the settings store ``self._s``.

    File fields get special treatment: a new upload replaces (and deletes)
    the previously stored file, an unchanged file is skipped, and a cleared
    file field deletes the stored file and its setting.
    """
    for name, field in self.fields.items():
        value = self.cleaned_data[name]

        # UploadedFile must be tested before File, its base class.
        if isinstance(value, UploadedFile):
            # Delete old file
            previous = self._s.get(name, as_type=File)
            if previous:
                try:
                    default_storage.delete(previous.name)
                except OSError:  # pragma: no cover
                    logger.error('Deleting file %s failed.' % previous.name)

            # Create new file
            newname = default_storage.save(self.get_new_filename(value.name), value)
            value._name = newname
            self._s.set(name, value)
            continue

        if isinstance(value, File):
            # file is unchanged
            continue

        if isinstance(field, forms.FileField):
            # file is deleted
            previous = self._s.get(name, as_type=File)
            if previous:
                try:
                    default_storage.delete(previous.name)
                except OSError:  # pragma: no cover
                    logger.error('Deleting file %s failed.' % previous.name)
            del self._s[name]
            continue

        if value is None:
            del self._s[name]
            continue

        # Only write values that differ from what is already stored.
        if self._s.get(name, as_type=type(value)) != value:
            self._s.set(name, value)
def value_from_datadict(self, data: Dict[str, str], files: Dict[str, UploadedFile], name: str) -> str:
    """Return the parent widget's values joined into one comma-separated string."""
    return ",".join(super().value_from_datadict(data, files, name))
def test_upload_avatar_inTemp(self):
    """Posting an avatar through the edit-profile view shows it in the response."""
    new_dict = default_dict2.copy()
    # The context manager closes the image even when the request or an
    # assertion fails, so a failing test does not leak the file handle.
    with open(image_path, "rb") as avatar:
        new_dict['avatar'] = avatar
        avatar2 = UploadedFile(avatar, name=avatar.name, content_type="image/png",
                               size=os.path.getsize(image_path))
        response = self.client.post(
            reverse('user_profile:edit_profile', kwargs={"username": user_name}),
            new_dict, follow=True)
        self.assertContains(response, avatar2)
        self.delete_image_test()


# helper function
def _save(self, name, content):
    """Upload ``content`` under the normalised, prefixed name.

    Returns:
        The ``public_id`` entry of the upload response.
    """
    target_name = self._prepend_prefix(self._normalise_name(name))
    upload = UploadedFile(content, target_name)
    response = self._upload(target_name, upload)
    return response['public_id']
def validate_and_resize_image(self, image_uploaded_file):
    """Re-encode a freshly uploaded image, shrinking it to fit 630x630.

    Values that are not ``UploadedFile`` instances are returned untouched.

    Returns:
        A ``SimpleUploadedFile`` with the original name and content type and
        the re-encoded image bytes.

    Raises:
        forms.ValidationError: the image cannot be opened or saved.
    """
    def cant_read():
        raise forms.ValidationError(
            "Impossible de lire l'image. Veuillez contacter les administrateurs de T&N"
        )

    if not isinstance(image_uploaded_file, UploadedFile):
        return image_uploaded_file

    max_side = 630
    output = io.BytesIO()
    try:
        with Image.open(image_uploaded_file) as image:
            # Ensure that our image has the proper orientation according to
            # its EXIF data. For this, we piggy-back on easy_thumbnails's own
            # utility functions.
            oriented = exif_orientation(image)
            width, height = oriented.size
            if max(width, height) > max_side:
                oriented.thumbnail((max_side, max_side))
            try:
                oriented.save(output, format=image.format)
            except OSError:
                # could be a "cannot write mode P as JPEG" situation.
                # let's try http://stackoverflow.com/a/21669827
                try:
                    oriented.convert('RGB').save(output, format=image.format)
                except OSError:
                    # Even the RGB fallback failed: give up on this image.
                    cant_read()
    except OSError:
        # Can't read the image, unset it
        cant_read()

    return SimpleUploadedFile(
        name=image_uploaded_file.name,
        content=output.getvalue(),
        content_type=image_uploaded_file.content_type,
    )