Python django.core.files.uploadedfile 模块,UploadedFile() 实例源码

我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用django.core.files.uploadedfile.UploadedFile()

项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def test_resized_images_updated(self):
        """
        Thumbnails should be regenerated when an image is already present and
        the profile is saved with update_image=True.
        """
        assert self.profile.image
        assert self.profile.image_small
        assert self.profile.image_medium

        # create a dummy image file in memory for upload
        image_file = BytesIO()
        image = Image.new('RGBA', size=(50, 50), color=(256, 0, 0))
        image.save(image_file, 'png')
        image_file.seek(0)
        # Snapshot the encoded bytes up front: getvalue() ignores the stream
        # position, whereas read() after save() only returns whatever lies
        # past the point where save() stopped reading.
        image_file_bytes = image_file.getvalue()

        self.profile.image = UploadedFile(image_file, "filename.png", "image/png", len(image_file_bytes))
        self.profile.save(update_image=True)
        assert self.profile.image_small.file.read() != image_file_bytes
        assert self.profile.image_medium.file.read() != image_file_bytes
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def test_case_study_create_api_success(
    mock_create_case_study, supplier_case_study_end_to_end, sso_user,
    all_case_study_data, api_response_200
):
    """Running the case study flow redirects and calls the create mock once."""
    mock_create_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end()

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')

    # django converts uploaded files to UploadedFile, so the image fields are
    # matched with ANY and the recorded call is compared by hand rather than
    # with `assert_called_once_with`.
    expected_data = dict(all_case_study_data)
    for image_field in ('image_one', 'image_two', 'image_three'):
        expected_data[image_field] = ANY
    expected_call = call(
        data=expected_data,
        sso_session_id=sso_user.session_id,
    )

    assert mock_create_case_study.call_count == 1
    assert mock_create_case_study.call_args == expected_call
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def test_case_study_update_api_success(
    mock_update_case_study, supplier_case_study_end_to_end, sso_user,
    all_case_study_data, api_response_200
):
    """Running the flow for case study '1' redirects and calls the update mock once."""
    mock_update_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end(case_study_id='1')

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')

    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky -- the image fields are matched with ANY.
    expected_data = dict(all_case_study_data)
    expected_data.update(image_one=ANY, image_two=ANY, image_three=ANY)
    mock_update_case_study.assert_called_once_with(
        case_study_id='1',
        sso_session_id=sso_user.session_id,
        data=expected_data,
    )
项目:pretalx    作者:pretalx    | 项目源码 | 文件源码
def _save_to_answer(self, field, answer, value):
        """
        Copy the cleaned form ``value`` onto ``answer`` according to the type
        of ``field`` and log the change.

        Choice fields store their selection in ``answer.options``, file fields
        store the upload in ``answer.answer_file``, and every other field
        stores ``value`` directly in ``answer.answer``.
        """
        # Decided up front, before any save() below can assign answer.pk.
        # NOTE(review): there is no separator before the verb, so the logged
        # action reads 'pretalx.submission.answerupdate' / '...answercreate'
        # -- confirm this is the intended action name.
        verb = 'update' if answer.pk else 'create'
        action = 'pretalx.submission.answer' + verb

        def reset_options():
            # Already-saved answers drop their current options; unsaved ones
            # are saved instead.
            if answer.pk:
                answer.options.clear()
            else:
                answer.save()

        if isinstance(field, forms.ModelMultipleChoiceField):
            answstr = ', '.join(str(option) for option in value)
            reset_options()
            answer.answer = answstr
            answer.options.add(*value)
        elif isinstance(field, forms.ModelChoiceField):
            reset_options()
            answer.options.add(value)
            answer.answer = value.answer
        elif isinstance(field, forms.FileField):
            if isinstance(value, UploadedFile):
                answer.answer_file.save(value.name, value)
                answer.answer = 'file://' + value.name
            # The log entry records the stored answer string, not the file object.
            value = answer.answer
        else:
            answer.answer = value
        answer.log_action(action, person=self.request_user, data={'answer': value})
项目:deb-python-django-formtools    作者:openstack    | 项目源码 | 文件源码
def get_step_files(self, step):
        """
        Return a dict mapping field names to UploadedFile objects for ``step``,
        or None when nothing is stored for it.

        UploadedFile instances are cached in ``self._files`` per (step, field),
        so a stored file is only opened the first time it is requested.

        Raises NoFileStorageConfigured if files are stored for the step but no
        ``file_storage`` is set.
        """
        stored = self.data[self.step_files_key].get(step, {})

        if stored and not self.file_storage:
            raise NoFileStorageConfigured(
                "You need to define 'file_storage' in your "
                "wizard view in order to handle file uploads.")

        files = {}
        for field, field_dict in six.iteritems(stored):
            # Work on a copy so popping tmp_name leaves the stored data intact.
            upload_kwargs = field_dict.copy()
            tmp_name = upload_kwargs.pop('tmp_name')
            cache_key = (step, field)
            if cache_key not in self._files:
                self._files[cache_key] = UploadedFile(
                    file=self.file_storage.open(tmp_name), **upload_kwargs)
            files[field] = self._files[cache_key]
        return files if files else None
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def setUp(self):
        """
        Publish the transportation form, submit an instance with an attachment,
        upload a screenshot as form metadata and build the BriefcaseClient
        used by the tests.
        """
        TestBase.setUp(self)
        self._publish_transportation_form()
        self._submit_transport_instance_w_attachment()
        src = os.path.join(self.this_directory, "fixtures",
                           "transportation", "screenshot.png")
        count = MetaData.objects.count()
        # Binary mode keeps the PNG bytes untouched, and the context manager
        # closes the handle once the upload has been stored.
        with open(src, 'rb') as screenshot:
            uf = UploadedFile(file=screenshot, content_type='image/png')
            MetaData.media_upload(self.xform, uf)
        self.assertEqual(MetaData.objects.count(), count + 1)
        url = urljoin(
            self.base_url,
            reverse(profile, kwargs={'username': self.user.username})
        )
        self._logout()
        self._create_user_and_login('deno', 'deno')
        self.bc = BriefcaseClient(
            username='bob', password='bob',
            url=url,
            user=self.user
        )
项目:ecs    作者:ecs-org    | 项目源码 | 文件源码
def clean_file(self):
        """
        Validate the uploaded ``file`` as a PDF and return the output of
        ``decrypt_pdf`` wrapped in a new UploadedFile named 'upload.pdf'.

        Raises ValidationError when no file was given, when the first four
        bytes differ from PDF_MAGIC, or when ``decrypt_pdf`` raises ValueError.
        """
        pdf = self.cleaned_data['file']
        if not pdf:
            raise ValidationError(_('no file'))

        # pdf magic check
        header = pdf.read(4)
        if header != PDF_MAGIC:
            raise ValidationError(_('This file is not a PDF document.'))
        # rewind so the sanitizer sees the file from its first byte
        pdf.seek(0)

        # sanitization
        try:
            sanitized = decrypt_pdf(pdf)
        except ValueError:
            raise ValidationError(_('The PDF-File seems to broken. For more Information click on the question mark in the sidebar.'))
        else:
            sanitized.seek(0)
            return UploadedFile(sanitized, content_type='application/pdf', name='upload.pdf')
项目:pretix-passbook    作者:pretix    | 项目源码 | 文件源码
def clean(self, value, *args, **kwargs):
        """
        Run the parent validation and re-encode an uploaded image as PNG.

        Values that are not an UploadedFile are returned as the parent
        ``clean`` produced them; so is the upload itself when Pillow cannot be
        imported.

        Returns a SimpleUploadedFile named 'picture.png' with content type
        'image/png'.

        Raises ValidationError if the upload cannot be converted.
        """
        value = super().clean(value, *args, **kwargs)
        if not isinstance(value, UploadedFile):
            return value

        try:
            from PIL import Image
        except ImportError:
            return value
        import io

        value.open('rb')
        value.seek(0)
        # Encode into memory instead of a named temporary file: no disk round
        # trip and no reliance on reopening a temp file by name.
        png_buffer = io.BytesIO()
        try:
            with Image.open(value) as im:
                im.save(png_buffer, format='PNG')
        except IOError:
            logger.exception('Could not convert image to PNG.')
            raise ValidationError(
                _('The file you uploaded could not be converted to PNG format.')
            )

        # 'image/png' is the valid MIME type (previously 'image png').
        return SimpleUploadedFile('picture.png', png_buffer.getvalue(), 'image/png')
项目:django-private-storage    作者:edoburu    | 项目源码 | 文件源码
def clean(self, *args, **kwargs):
        """
        Run the parent validation, then check newly uploaded files against
        ``self.content_types`` and ``self.max_file_size`` when those are set.

        Raises ValidationError for a disallowed content type or an oversized
        file. Returns the value produced by the parent ``clean``.
        """
        data = super(PrivateFileField, self).clean(*args, **kwargs)
        upload = data.file
        if not isinstance(upload, UploadedFile):
            # content_type is only available for uploaded files, not for
            # files which are already stored in the model.
            return data

        content_type = upload.content_type
        if self.content_types and content_type not in self.content_types:
            logger.debug('Rejected uploaded file type: %s', content_type)
            raise ValidationError(self.error_messages['invalid_file_type'])

        if self.max_file_size and upload.size > self.max_file_size:
            message = self.error_messages['file_too_large'].format(
                max_size=filesizeformat(self.max_file_size),
                size=filesizeformat(upload.size)
            )
            raise ValidationError(message)

        return data
项目:pretix-espass    作者:esPass    | 项目源码 | 文件源码
def clean(self, value, *args, **kwargs):
        """
        Run the parent validation and re-encode an uploaded image as PNG.

        Values that are not an UploadedFile are returned as the parent
        ``clean`` produced them; so is the upload itself when Pillow cannot be
        imported.

        Returns a SimpleUploadedFile named 'picture.png' with content type
        'image/png'.

        Raises ValidationError if the upload cannot be converted.
        """
        value = super().clean(value, *args, **kwargs)
        if not isinstance(value, UploadedFile):
            return value

        try:
            from PIL import Image
        except ImportError:
            return value
        import io

        value.open('rb')
        value.seek(0)
        # Encode into memory instead of a named temporary file: no disk round
        # trip and no reliance on reopening a temp file by name.
        png_buffer = io.BytesIO()
        try:
            with Image.open(value) as im:
                im.save(png_buffer, format='PNG')
        except IOError:
            logger.exception('Could not convert image to PNG.')
            raise ValidationError(
                _('The file you uploaded could not be converted to PNG format.')
            )

        # 'image/png' is the valid MIME type (previously 'image png').
        return SimpleUploadedFile('picture.png', png_buffer.getvalue(), 'image/png')
项目:FormShare    作者:qlands    | 项目源码 | 文件源码
def setUp(self):
        """
        Publish the transportation form, submit an instance with an attachment,
        upload a screenshot as form metadata and build the BriefcaseClient
        used by the tests.
        """
        TestBase.setUp(self)
        self._publish_transportation_form()
        self._submit_transport_instance_w_attachment()
        src = os.path.join(self.this_directory, "fixtures",
                           "transportation", "screenshot.png")
        count = MetaData.objects.count()
        # Binary mode keeps the PNG bytes untouched, and the context manager
        # closes the handle once the upload has been stored.
        with open(src, 'rb') as screenshot:
            uf = UploadedFile(file=screenshot, content_type='image/png')
            MetaData.media_upload(self.xform, uf)
        self.assertEqual(MetaData.objects.count(), count + 1)
        url = urljoin(
            self.base_url,
            reverse(profile, kwargs={'username': self.user.username})
        )
        self._logout()
        self._create_user_and_login('deno', 'deno')
        self.bc = BriefcaseClient(
            username='bob', password='bob',
            url=url,
            user=self.user
        )
项目:lepo    作者:akx    | 项目源码 | 文件源码
def test_files(rf):
    """A file posted to /upload comes back from read_parameters as an UploadedFile."""
    upload = ContentFile(b'foo', name='foo.txt')
    request = rf.post('/upload', {'file': upload})
    operation = router.get_path('/upload').get_operation('post')
    request.api_info = APIInfo(operation)

    parameters = read_parameters(request, {})

    assert isinstance(parameters['file'], UploadedFile)
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def setUp(self):
        """Save the test profile with a generated 50x50 PNG as its image."""
        super().setUp()

        # build a dummy PNG entirely in memory to act as the upload
        buffer = BytesIO()
        Image.new('RGBA', size=(50, 50), color=(256, 0, 0)).save(buffer, 'png')
        buffer.seek(0)

        upload = UploadedFile(buffer, "filename.png", "image/png", len(buffer.getvalue()))
        self.profile.image = upload
        self.profile.save(update_image=True)
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def image_one():
    """Return an UploadedFile named 'one.png' wrapping create_test_image('png')."""
    upload_kwargs = {
        'file': create_test_image('png'),
        'name': 'one.png',
        'content_type': 'image/png',
        'size': 100,
    }
    return UploadedFile(**upload_kwargs)
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def image_two():
    """Return an UploadedFile wrapping create_test_image('png')."""
    # NOTE(review): the name is 'one.png', same as image_one -- presumably a
    # copy-paste leftover; confirm before relying on the file name.
    upload_kwargs = {
        'file': create_test_image('png'),
        'name': 'one.png',
        'content_type': 'image/png',
        'size': 100,
    }
    return UploadedFile(**upload_kwargs)
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def image_three():
    """Return an UploadedFile wrapping create_test_image('png')."""
    # NOTE(review): the name is 'one.png', same as image_one -- presumably a
    # copy-paste leftover; confirm before relying on the file name.
    upload_kwargs = {
        'file': create_test_image('png'),
        'name': 'one.png',
        'content_type': 'image/png',
        'size': 100,
    }
    return UploadedFile(**upload_kwargs)
项目:multiuploader    作者:vinaypost    | 项目源码 | 文件源码
def form_valid(self, form):
        """
        Store the uploaded file and respond with a JSON description of it.

        The file is read from ``request.FILES['file']`` and saved through
        ``self.model``. A thumbnail is attempted using the optional POST
        parameters ``size`` (default '180x80') and ``quality`` (default 50);
        if that fails the error is logged and ``thumbnailUrl`` stays empty.
        """
        log.info('received POST to main multiuploader view')
        file_obj = self.request.FILES['file']
        wrapped_file = UploadedFile(file_obj)
        filename = wrapped_file.name
        file_size = wrapped_file.file.size
        log.info('Got file: "%s"' % filename)

        fl = self.model(filename=filename, file=file_obj)
        fl.save()
        log.info('File saving done')

        size = self.request.POST.get('size') or '180x80'
        quality = self.request.POST.get('quality') or 50
        try:
            thumb_url = get_thumbnail(fl.file, size, quality=quality).url
        except Exception as e:
            # best effort: the upload itself succeeded, only the thumbnail is missing
            log.error(e)
            thumb_url = ""

        # generating json response array
        file_entry = {
            "id": str(fl.id),
            "name": filename,
            "size": file_size,
            'type': file_obj.content_type,
            "url": reverse('multiuploader', args=[fl.pk]),
            "thumbnailUrl": thumb_url,
            "deleteUrl": reverse('multiuploader', args=[fl.pk]),
            "deleteType": "DELETE",
        }
        return JsonResponse({"files": [file_entry]})
项目:django-admin-rq    作者:Proper-Job    | 项目源码 | 文件源码
def serialize_form(self, form):
        """
        Turn the cleaned data of ``form`` into a list of plain dicts, one per
        field, in the order of ``form.fields``.

        Fields without an entry in ``form.cleaned_data`` are skipped. Model
        instances are replaced by a prefixed 'app_label.model:pk' string, and
        uploaded files are written to ``_fs`` and replaced by their path, with
        the file name as ``display_value``.
        """
        data = []
        for field_name, field in form.fields.items():
            if field_name not in form.cleaned_data:
                continue

            form_value = form.cleaned_data[field_name]
            display_value = None
            if isinstance(form_value, models.Model):
                ctype = ContentType.objects.get_for_model(form_value)
                form_value = '{0}{1}.{2}:{3}'.format(
                    _CONTENT_TYPE_PREFIX,
                    ctype.app_label,
                    ctype.model,
                    form_value.pk
                )
            elif isinstance(form_value, UploadedFile):
                file_name = _fs.get_available_name(form_value.name)
                file_path = _fs.path(file_name)
                # copy the upload chunk by chunk
                with open(file_path, 'wb+') as destination:
                    for chunk in form_value.chunks():
                        destination.write(chunk)
                form_value, display_value = file_path, file_name

            label = force_text(field.label) if field.label else None
            data.append({
                'name': field_name,
                'label': label,
                'value': form_value,
                'display_value': display_value,
            })
        return data
项目:beg-django-e-commerce    作者:Apress    | 项目源码 | 文件源码
def clean(self):
        """
        Default an empty ``name`` to the name of the uploaded file.

        When ``name`` is missing or empty and ``file`` is an UploadedFile, the
        file's name is used. Otherwise the ``name`` entry is removed from
        ``cleaned_data``.

        Returns the (possibly modified) ``cleaned_data`` dict.
        """
        uploaded = self.cleaned_data.get('file')
        if not self.cleaned_data.get('name'):
            if isinstance(uploaded, UploadedFile):
                self.cleaned_data['name'] = uploaded.name
            else:
                # The key may be absent altogether, so remove it without
                # assuming it exists (a plain `del` raised KeyError here).
                self.cleaned_data.pop('name', None)
        return self.cleaned_data
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def test_metadata_file_hash(self):
        """Uploading a media file creates a MetaData record with a hash longer than 16 characters."""
        self._publish_transportation_form()
        src = os.path.join(self.this_directory, "fixtures",
                           "transportation", "screenshot.png")
        count = MetaData.objects.count()
        # Binary mode keeps the PNG bytes untouched, and the context manager
        # closes the handle once the upload has been stored.
        with open(src, 'rb') as screenshot:
            uf = UploadedFile(file=screenshot, content_type='image/png')
            MetaData.media_upload(self.xform, uf)
        # assert successful insert of new metadata record
        self.assertEqual(MetaData.objects.count(), count + 1)
        md = MetaData.objects.get(xform=self.xform,
                                  data_value='screenshot.png')
        # assert checksum string has been generated, hash length > 16
        self.assertTrue(len(md.hash) > 16)
项目:pretix-passbook    作者:pretix    | 项目源码 | 文件源码
def clean(self, value, *args, **kwargs):
        """
        Run the parent validation and normalise an uploaded certificate to PEM.

        Uploads that already look like a PEM certificate are passed on with
        their content unchanged; anything else is converted from DER to PEM
        with the ``openssl x509`` command line tool. Values that are not an
        UploadedFile are returned as the parent ``clean`` produced them.

        Returns a SimpleUploadedFile named 'cert.pem'.

        Raises ValidationError if openssl cannot convert the upload.
        """
        value = super().clean(value, *args, **kwargs)
        if not isinstance(value, UploadedFile):
            return value

        value.open('rb')
        value.seek(0)
        content = value.read()
        # A PEM certificate carries both armor lines; require the END marker
        # too instead of testing the BEGIN marker twice.
        if content.startswith(b'-----BEGIN CERTIFICATE-----') and b'-----END CERTIFICATE-----' in content:
            return SimpleUploadedFile('cert.pem', content, 'text/plain')

        openssl_cmd = [
            'openssl',
            'x509',
            '-inform',
            'DER',
            '-outform',
            'PEM',
        ]
        process = subprocess.Popen(
            openssl_cmd,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stdin=subprocess.PIPE,
        )
        # Let communicate() feed and close stdin while it drains the output
        # pipes; a separate stdin.write() can block on large input.
        pem, error = process.communicate(input=content)
        if process.returncode != 0:
            logger.info('Trying to convert a DER to PEM failed: {}'.format(error))
            raise ValidationError(
                _('This does not look like a X509 certificate in either PEM or DER format'),
            )

        return SimpleUploadedFile('cert.pem', pem, 'text/plain')
项目:YouPBX    作者:JoneXiong    | 项目源码 | 文件源码
def get_step_files(self, step):
        """
        Return a dict mapping field names to UploadedFile objects for ``step``,
        or None when nothing is stored for it.

        Raises NoFileStorageConfigured if files are stored for the step but no
        ``file_storage`` is set.
        """
        stored = self.data[self.step_files_key].get(step, {})

        if stored and not self.file_storage:
            raise NoFileStorageConfigured(
                    "You need to define 'file_storage' in your "
                    "wizard view in order to handle file uploads.")

        files = {}
        for field, field_dict in six.iteritems(stored):
            # Work on a copy so popping tmp_name leaves the stored data intact.
            upload_kwargs = field_dict.copy()
            tmp_name = upload_kwargs.pop('tmp_name')
            handle = self.file_storage.open(tmp_name)
            files[field] = UploadedFile(file=handle, **upload_kwargs)
        if not files:
            return None
        return files
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def make_value_from_form(self, value):
    """Convert a form value to a property value.

    For an UploadedFile instance the file content is read (once, then kept
    in self.form_value) and returned as a db.Blob. Any other value is
    handed to the parent implementation.
    """
    is_upload = have_uploadedfile and isinstance(value, uploadedfile.UploadedFile)
    if not is_upload:
      return super(BlobProperty, self).make_value_from_form(value)
    if not self.form_value:
      # Keep the content so later calls do not read the upload again.
      self.form_value = value.read()
    return db.Blob(self.form_value)
项目:tissuelab    作者:VirtualPlants    | 项目源码 | 文件源码
def get_step_files(self, step):
        """
        Return a dict mapping field names to UploadedFile objects for ``step``,
        or None when nothing is stored for it.

        Raises NoFileStorageConfigured if files are stored for the step but no
        ``file_storage`` is set.
        """
        stored = self.data[self.step_files_key].get(step, {})

        if stored and not self.file_storage:
            raise NoFileStorageConfigured(
                    "You need to define 'file_storage' in your "
                    "wizard view in order to handle file uploads.")

        files = {}
        for field, field_dict in six.iteritems(stored):
            # Work on a copy so popping tmp_name leaves the stored data intact.
            upload_kwargs = field_dict.copy()
            tmp_name = upload_kwargs.pop('tmp_name')
            handle = self.file_storage.open(tmp_name)
            files[field] = UploadedFile(file=handle, **upload_kwargs)
        if not files:
            return None
        return files
项目:issue-reporting    作者:6aika    | 项目源码 | 文件源码
def post_create_issue(self, request, issue, data):
        """
        Create an IssueMedia record for every file uploaded under 'media'.

        Raises ValidationError for a file larger than 100 MiB.
        """
        from issues_media.models import IssueMedia

        size_limit = 100 * 1024 * 1024
        for upload in request.FILES.getlist('media', ()):
            assert isinstance(upload, UploadedFile)
            if upload.size > size_limit:
                raise ValidationError("File %s is too large" % upload)
            # TODO: Add mimetype validation somewhere around here
            IssueMedia.objects.create(issue=issue, file=upload)
项目:FormShare    作者:qlands    | 项目源码 | 文件源码
def test_metadata_file_hash(self):
        """Uploading a media file creates a MetaData record with a hash longer than 16 characters."""
        self._publish_transportation_form()
        src = os.path.join(self.this_directory, "fixtures",
                           "transportation", "screenshot.png")
        count = MetaData.objects.count()
        # Binary mode keeps the PNG bytes untouched, and the context manager
        # closes the handle once the upload has been stored.
        with open(src, 'rb') as screenshot:
            uf = UploadedFile(file=screenshot, content_type='image/png')
            MetaData.media_upload(self.xform, uf)
        # assert successful insert of new metadata record
        self.assertEqual(MetaData.objects.count(), count + 1)
        md = MetaData.objects.get(xform=self.xform,
                                  data_value='screenshot.png')
        # assert checksum string has been generated, hash length > 16
        self.assertTrue(len(md.hash) > 16)
项目:django-hierarkey    作者:raphaelm    | 项目源码 | 文件源码
def save(self) -> None:
        """
        Write every changed value of the form to the settings store ``self._s``.

        Newly uploaded files replace the previously stored file, untouched
        files are skipped, emptied file fields and None values remove the
        setting, and any other value is stored only if it differs from the
        current one.
        """
        def delete_stored_file(key):
            # Remove the file currently stored under ``key`` from the default
            # storage, if there is one; a failed delete is only logged.
            stored = self._s.get(key, as_type=File)
            if stored:
                try:
                    default_storage.delete(stored.name)
                except OSError:  # pragma: no cover
                    logger.error('Deleting file %s failed.' % stored.name)

        for name, field in self.fields.items():
            value = self.cleaned_data[name]

            if isinstance(value, UploadedFile):
                # new upload: drop the old file, then store the new one
                delete_stored_file(name)
                newname = default_storage.save(self.get_new_filename(value.name), value)
                value._name = newname
                self._s.set(name, value)
                continue

            if isinstance(value, File):
                # file is unchanged
                continue

            if isinstance(field, forms.FileField):
                # file is deleted
                delete_stored_file(name)
                del self._s[name]
            elif value is None:
                del self._s[name]
            elif self._s.get(name, as_type=type(value)) != value:
                self._s.set(name, value)
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def value_from_datadict(self, data: Dict[str, str], files: Dict[str, UploadedFile], name: str) -> str:
        """Return the values the parent widget extracts for ``name``, joined with commas."""
        return ",".join(super().value_from_datadict(data, files, name))
项目:iguana    作者:iguana-project    | 项目源码 | 文件源码
def test_upload_avatar_inTemp(self):
        """Posting an avatar to the edit-profile view yields a response containing the upload."""
        new_dict = default_dict2.copy()
        # The context manager closes the image even when an assertion below
        # fails, which a trailing close() call would not.
        with open(image_path, "rb") as avatar:
            new_dict['avatar'] = avatar
            avatar2 = UploadedFile(avatar, name=avatar.name, content_type="image/png", size=os.path.getsize(image_path))
            response = self.client.post(reverse('user_profile:edit_profile', kwargs={"username": user_name}),
                                        new_dict, follow=True)
            self.assertContains(response, avatar2)
            self.delete_image_test()

    # helper function
项目:django-cloudinary-storage    作者:klis87    | 项目源码 | 文件源码
def _save(self, name, content):
        """
        Upload ``content`` under the normalised, prefixed ``name`` and return
        the 'public_id' from the upload response.
        """
        full_name = self._prepend_prefix(self._normalise_name(name))
        upload = UploadedFile(content, full_name)
        return self._upload(full_name, upload)['public_id']
项目:tn2    作者:hsoft    | 项目源码 | 文件源码
def validate_and_resize_image(self, image_uploaded_file):
        """
        Re-encode an uploaded image, shrinking it to fit within 630x630.

        Values that are not an UploadedFile are returned unchanged. Otherwise
        the image is oriented according to its EXIF data, reduced with
        ``thumbnail`` when either side exceeds 630 pixels, and saved in its
        original format.

        Returns a SimpleUploadedFile with the original name and content type.

        Raises forms.ValidationError when the image cannot be read or written.
        """
        def cant_read():
            raise forms.ValidationError(
                "Impossible de lire l'image. Veuillez contacter les administrateurs de T&N"
            )

        if not isinstance(image_uploaded_file, UploadedFile):
            return image_uploaded_file
        result_bytes = io.BytesIO()
        try:
            with Image.open(image_uploaded_file) as image:
                # Ensure that our image has the proper orientation according to its EXIF data. For
                # this, we piggy-back on easy_thumbnails's own utility functions.
                new_image = exif_orientation(image)
                w, h = new_image.size
                if w > 630 or h > 630:
                    new_image.thumbnail((630, 630))
                try:
                    new_image.save(result_bytes, format=image.format)
                except OSError:
                    # could be a "cannot write mode P as JPEG" situation.
                    # let's try http://stackoverflow.com/a/21669827
                    # Discard anything the failed attempt already wrote so the
                    # retry starts from an empty buffer.
                    result_bytes.seek(0)
                    result_bytes.truncate()
                    try:
                        new_image.convert('RGB').save(result_bytes, format=image.format)
                    except OSError:
                        # Oh, screw that.
                        cant_read()
        except (FileNotFoundError, OSError):
            # Can't read the image, unset it
            cant_read()

        return SimpleUploadedFile(
            name=image_uploaded_file.name,
            content=result_bytes.getvalue(),
            content_type=image_uploaded_file.content_type,
        )