Python django.utils.six 模块,BytesIO() 实例源码

我们从Python开源项目中,提取了以下43个代码示例,用于说明如何使用django.utils.six.BytesIO()

项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def _load_stream(self):
        """
        Choose the stream from which the request body will be read.

        ``self._stream`` becomes ``None`` when the declared content length is
        zero, absent or not an integer; the wrapped request itself when its
        ``_read_started`` flag is false; otherwise an in-memory stream over
        ``self.body``.
        """
        request = self._request
        headers = request.META
        try:
            fallback = headers.get('HTTP_CONTENT_LENGTH', 0)
            length = int(headers.get('CONTENT_LENGTH', fallback))
        except (ValueError, TypeError):
            # An unparsable length header is treated the same as "no body".
            length = 0

        if not length:
            stream = None
        elif request._read_started:
            stream = six.BytesIO(self.body)
        else:
            stream = request
        self._stream = stream
项目:django-sspanel    作者:Ehco1996    | 项目源码 | 文件源码
def get_ssr_qrcode(request, node_id):
    """Return the node's SSR link for the current user as a PNG QR code.

    When the user's level is lower than the node's level, a plain-text
    response is returned instead of the image.
    """
    user = request.user
    ss_user = user.ss_user
    node = Node.objects.get(node_id=node_id)

    # Users may only fetch codes for nodes at or below their own level.
    if user.level < node.level:
        return HttpResponse('?????????????????????')

    png = BytesIO()
    qrcode.make(node.get_ssr_link(ss_user)).save(png)
    return HttpResponse(png.getvalue(), content_type="image/png")
项目:django-sspanel    作者:Ehco1996    | 项目源码 | 文件源码
def get_ss_qrcode(request, node_id):
    """Return the node's SS link for the current user as a PNG QR code.

    When the user's level is lower than the node's level, a plain-text
    response is returned instead of the image.
    """
    user = request.user
    ss_user = user.ss_user
    node = Node.objects.get(node_id=node_id)

    # Users may only fetch codes for nodes at or below their own level.
    if user.level < node.level:
        return HttpResponse('?????????????????????')

    png = BytesIO()
    qrcode.make(node.get_ss_link(ss_user)).save(png)
    return HttpResponse(png.getvalue(), content_type="image/png")
项目:django-sspanel    作者:Ehco1996    | 项目源码 | 文件源码
def gen_face_pay_qrcode(request):
    """Return the URL stored under session key ``code_url`` as a PNG QR code.

    Creates an ``AlipayRequest`` row from the current user and the session
    keys ``out_trade_no`` and ``amount``, then removes ``code_url`` and
    ``amount`` from the session before rendering the image.

    Any ordinary failure (for example a missing session key) produces a
    plain ``'wrong request'`` response instead of an error page.
    """
    try:
        session = request.session
        url = session.get('code_url', '')
        # Persist the payment request; the created object itself is not needed.
        AlipayRequest.objects.create(username=request.user,
                                     info_code=session['out_trade_no'],
                                     amount=session['amount'],)
        # Drop the one-shot values so the code cannot be generated twice.
        del session['code_url']
        del session['amount']

        img = qrcode.make(url)
        buf = BytesIO()
        img.save(buf)
        return HttpResponse(buf.getvalue(), content_type="image/png")
    except Exception:
        # Still best-effort, but unlike a bare ``except:`` this no longer
        # swallows SystemExit, KeyboardInterrupt or GeneratorExit.
        return HttpResponse('wrong request')
项目:jianshu-api    作者:strugglingyouth    | 项目源码 | 文件源码
def _load_stream(self):
        """
        Choose the stream from which the request body will be read.

        ``self._stream`` becomes ``None`` when the declared content length is
        zero, absent or not an integer; the wrapped request itself when it has
        a ``read`` attribute; otherwise an in-memory stream over
        ``self.raw_post_data``.
        """
        request = self._request
        headers = request.META
        try:
            fallback = headers.get('HTTP_CONTENT_LENGTH', 0)
            length = int(headers.get('CONTENT_LENGTH', fallback))
        except (ValueError, TypeError):
            # An unparsable length header is treated the same as "no body".
            length = 0

        if not length:
            self._stream = None
            return
        if hasattr(request, 'read'):
            self._stream = request
            return
        self._stream = six.BytesIO(self.raw_post_data)
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def _load_stream(self):
        """
        Decide where the request body should be read from.

        Stores the result in ``self._stream``: ``None`` for a zero, missing or
        non-integer content length; the wrapped request when it exposes a
        ``read`` attribute; otherwise a byte stream built from
        ``self.raw_post_data``.
        """
        wrapped = self._request
        environ = wrapped.META
        try:
            http_length = environ.get('HTTP_CONTENT_LENGTH', 0)
            size = int(environ.get('CONTENT_LENGTH', http_length))
        except (ValueError, TypeError):
            # Malformed header values count as an empty body.
            size = 0

        if size == 0:
            chosen = None
        elif not hasattr(wrapped, 'read'):
            chosen = six.BytesIO(self.raw_post_data)
        else:
            chosen = wrapped
        self._stream = chosen
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def test_runs_operations(self):
        """Running a two-operation filter calls the operation's run() twice."""
        calls = Mock()

        # A real function (not the Mock itself) so its signature exposes the
        # ``env`` parameter.
        def run(willow, image, env):
            calls(willow, image, env)

        self.operation_instance.run = run

        two_step_filter = Filter(spec='operation1|operation2')
        source_image = Image.objects.create(
            title="Test image",
            file=get_test_image_file(),
        )
        two_step_filter.run(source_image, BytesIO())

        self.assertEqual(calls.call_count, 2)
项目:django-logpipe    作者:thelabnyc    | 项目源码 | 文件源码
def parse(data):
    """Split ``data`` into a format code and body, then parse the body.

    The payload is split once on ``_delim``; the leading code selects an
    entry in ``_formats`` whose ``'parser'`` receives the body as a byte
    stream. Raises ``UnknownFormatError`` when the code is not registered.
    """
    payload = _bytes(data)
    code, body = payload.split(_delim, 1)
    if code in _formats:
        parser = _formats[code]['parser']
        return parser.parse(BytesIO(body))
    raise UnknownFormatError('Could not find parser for format %s' % code.decode())
项目:drf-batch-requests    作者:roman-karpovich    | 项目源码 | 文件源码
def __init__(self, request, request_data):
        """
        Build a sub-request from one entry of a batch.

        ``request_data`` supplies the optional ``name`` and
        ``omit_response_on_success`` values plus the required ``_body``,
        ``method`` and ``relative_url`` keys; ``META`` and ``COOKIES`` are
        taken from the enclosing ``request``.
        """
        super(BatchRequest, self).__init__()
        self.name = request_data.get('name')
        self.omit_response_on_success = request_data.get('omit_response_on_success', False)

        # The body arrives as text; expose it as an unread byte stream.
        encoded_body = request_data['_body'].encode('utf-8')
        self._stream = BytesIO(encoded_body)
        self._read_started = False

        self.method = request_data['method']
        relative_url = request_data['relative_url']
        self.path_info = relative_url
        self.path = relative_url
        self.META = request.META
        self.COOKIES = request.COOKIES
        query_string = urlparse(self.path_info).query
        self.GET = parse_qs(query_string)
项目:DjangoBlog    作者:0daybug    | 项目源码 | 文件源码
def as_bytes(self, unixfrom=False, linesep='\n'):
            """Serialize the whole message to bytes.

            When ``unixfrom`` is true the Unix From_ envelope header is
            included; ``linesep`` is the line separator used in the output.

            Unlike the default as_bytes(), lines starting with 'From ' are
            left untouched (no '>' mangling). See bug #13433 for details.
            """
            buffer = six.BytesIO()
            writer = generator.BytesGenerator(buffer, mangle_from_=False)
            writer.flatten(self, unixfrom=unixfrom, linesep=linesep)
            return buffer.getvalue()
项目:django_ecommerce    作者:rick-ey    | 项目源码 | 文件源码
def test_json_to_StatusReport(self):
        """Parsed JSON fed to the serializer with an instance saves that same status."""
        rendered = JSONRenderer().render(self.expected_dict)
        data = JSONParser().parse(BytesIO(rendered))

        serializer = StatusReportSerializer(self.new_status, data=data)
        self.assertTrue(serializer.is_valid())

        saved = serializer.save()
        self.assertEqual(self.new_status, saved)
        # Compare field by field so a mismatch points at the exact attribute.
        for field in ('id', 'status', 'when', 'user'):
            self.assertEqual(getattr(self.new_status, field), getattr(saved, field))
项目:django_ecommerce    作者:rick-ey    | 项目源码 | 文件源码
def test_json_to_new_StatusReport(self):
        """Parsed JSON fed to the serializer without an instance saves a matching status."""
        rendered = JSONRenderer().render(self.expected_dict)
        data = JSONParser().parse(BytesIO(rendered))

        serializer = StatusReportSerializer(data=data)
        self.assertTrue(serializer.is_valid())

        saved = serializer.save()
        expected = self.new_status
        self.assertEqual(expected.status, saved.status)
        self.assertIsNotNone(saved.when)
        self.assertEqual(expected.user, saved.user)
项目:trydjango18    作者:lucifer-yqh    | 项目源码 | 文件源码
def as_bytes(self, unixfrom=False, linesep='\n'):
            """Return the complete formatted message as a bytes object.

            Pass ``unixfrom=True`` to include the Unix From_ envelope header;
            ``linesep`` sets the line separator used when flattening.

            This replaces the default as_bytes() so that lines beginning with
            'From ' are not mangled. See bug #13433 for details.
            """
            out = six.BytesIO()
            generator.BytesGenerator(out, mangle_from_=False).flatten(
                self, unixfrom=unixfrom, linesep=linesep)
            return out.getvalue()
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def as_bytes(self, unixfrom=False, linesep='\n'):
            """Flatten this message into bytes.

            ``unixfrom`` controls whether the Unix From_ envelope header is
            emitted; ``linesep`` is the separator placed between lines.

            Overrides the default as_bytes() so lines that begin with 'From '
            are written unchanged. See bug #13433 for details.
            """
            sink = six.BytesIO()
            flattener = generator.BytesGenerator(sink, mangle_from_=False)
            flattener.flatten(self, unixfrom=unixfrom, linesep=linesep)
            return sink.getvalue()
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def from_native(self, data):
        """
        Validate that uploaded file data is an image the imaging library can
        open, and return the parent field's converted value.

        Returns ``None`` when the parent conversion yields ``None``. Raises
        ``ValidationError`` with the ``invalid_image`` message when the data
        cannot be verified as an image.
        """
        uploaded = super(ImageField, self).from_native(data)
        if uploaded is None:
            return None

        from api.compat import Image
        assert Image is not None, 'Either Pillow or PIL must be installed for ImageField support.'

        # The imaging library needs a path or a file object: prefer the
        # on-disk temporary file, otherwise buffer the content in memory.
        if hasattr(data, 'temporary_file_path'):
            image_source = data.temporary_file_path()
        elif hasattr(data, 'read'):
            image_source = six.BytesIO(data.read())
        else:
            image_source = six.BytesIO(data['content'])

        try:
            # verify() is used rather than load(): load() would catch a
            # truncated JPEG but reads the whole image into memory, which is
            # a DoS vector (see #3848 and #18520). verify() must be called
            # immediately after the constructor.
            Image.open(image_source).verify()
        except ImportError:
            # Under PyPy the package may import while the underlying
            # _imaging C module is missing; let that error propagate.
            raise
        except Exception:  # not recognized as an image
            raise ValidationError(self.error_messages['invalid_image'])

        can_rewind = hasattr(uploaded, 'seek') and callable(uploaded.seek)
        if can_rewind:
            uploaded.seek(0)
        return uploaded
项目:django-rest-messaging    作者:raphaelgyory    | 项目源码 | 文件源码
def parse_json_response(json):
    """Render ``json`` with JSONRenderer and parse the bytes back with JSONParser."""
    payload = BytesIO(JSONRenderer().render(json))
    parser = JSONParser()
    return parser.parse(payload)
项目:django-icekit    作者:ic-labs    | 项目源码 | 文件源码
def _get_image_or_404(identifier, load_image=False):
    """
    Look up the ICEkitImage whose id is `identifier`.

    Returns a ``(record, image)`` pair. ``image`` is ``None`` unless
    `load_image` is true, in which case the stored file is opened, fully
    loaded and passed through EXIF orientation handling.

    For now `identifier` is a raw image ID; it may become more complex later.
    """
    ik_image = get_object_or_404(ICEkitImage, id=identifier)
    if load_image:
        # Loading sequence adapted from easythumbnail's `pil_image`.
        raw_bytes = ik_image.image.read()
        loaded = Image.open(BytesIO(raw_bytes))
        try:
            # "Image file truncated" can be raised for images that are still
            # mostly valid, so the first failure is ignored.
            loaded.load()
        except IOError:
            pass
        # The second load surfaces any other problem with the contents.
        loaded.load()
        # Support EXIF orientation data.
        loaded = et_utils.exif_orientation(loaded)
    else:
        loaded = None

    return ik_image, loaded
项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def save_image(image, destination=None, filename=None, **options):
    """
    Write a PIL image to ``destination`` and return the destination.

    A new in-memory buffer is used when ``destination`` is ``None``. The
    output format is looked up from the extension of ``filename`` and falls
    back to JPEG. Extra keyword ``options`` are forwarded to ``image.save``.
    """
    target = BytesIO() if destination is None else destination
    extension = os.path.splitext(filename or '')[1].lower()
    # Ensure plugins are fully loaded so that Image.EXTENSION is populated.
    Image.init()
    image_format = Image.EXTENSION.get(extension, 'JPEG')
    if image_format in ('JPEG', 'WEBP'):
        options.setdefault('quality', 85)

    needs_plain_save = True
    if image_format == 'JPEG':
        threshold = settings.THUMBNAIL_PROGRESSIVE
        if threshold and max(image.size) >= settings.THUMBNAIL_PROGRESSIVE:
            options['progressive'] = True
        try:
            image.save(target, format=image_format, optimize=1, **options)
        except IOError:
            # Optimizing can fail for images larger than ImageFile.MAXBLOCK
            # (64k by default) on older PIL; fall through to the plain save.
            # Recent pillow versions rarely hit this.
            pass
        else:
            needs_plain_save = False

    if needs_plain_save:
        image.save(target, format=image_format, **options)
    if hasattr(target, 'seek'):
        target.seek(0)
    return target
项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def setUp(self):
        """Provide a small in-memory byte stream as ``self.source``."""
        contents = six.b('file-contents')
        self.source = BytesIO(contents)
项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def image_from_b64(data):
    """Decode base64 ``data`` and open the resulting bytes with ``Image.open``."""
    raw = base64.b64decode(data)
    stream = BytesIO(raw)
    return Image.open(stream)
项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def test_not_image(self):
        """
        Passing bytes that are not an image raises IOError.
        """
        bogus = BytesIO(six.b('not an image'))
        with self.assertRaises(IOError):
            source_generators.pil_image(bogus)
项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def test_nearly_image(self):
        """
        A truncated image that can still be read does *not* raise; it opens
        with the same dimensions as the intact image.
        """
        original = self.create_image(None, None)
        reference = source_generators.pil_image(original)
        original.seek(0)
        # Drop the last ten bytes to simulate an incomplete file.
        truncated = BytesIO(original.read()[:-10])
        im = source_generators.pil_image(truncated)
        # im will be truncated, but it should be the same dimensions.
        self.assertEqual(im.size, reference.size)
项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def test_switch_off_exif_orientation(self):
        """
        With ``exif_orientation=False`` an image carrying EXIF orientation
        data is left as it is rather than being reoriented.
        """
        reference = image_from_b64(EXIF_REFERENCE)
        encoded = EXIF_ORIENTATION[2]
        # Sanity check: the raw sample differs from the reference image.
        self.assertFalse(near_identical(reference, image_from_b64(encoded)))

        stream = BytesIO(base64.b64decode(encoded))
        untouched = source_generators.pil_image(stream, exif_orientation=False)
        self.assertFalse(
            near_identical(reference, untouched),
            'Image should not have been modified')
项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def create_image(self, storage, filename, size=(800, 600),
                     image_mode='RGB', image_format='JPEG'):
        """
        Create a blank test image and save it through ``storage``.

        Returns whatever ``storage.save`` returns. When ``storage`` is falsy
        (e.g. ``None``), the rewound BytesIO holding the image data is
        returned instead and nothing is stored.
        """
        buf = BytesIO()
        blank = Image.new(image_mode, size)
        blank.save(buf, image_format)
        buf.seek(0)
        if storage:
            return storage.save(filename, ContentFile(buf.read()))
        return buf
项目:gougo    作者:amaozhao    | 项目源码 | 文件源码
def pil_image(source, exif_orientation=True, **options):
    """
    Open ``source`` with PIL and return the loaded image.

    Returns ``None`` when ``source`` is falsy.

    exif_orientation

        When true, the image is passed through ``utils.exif_orientation``
        before being returned, so any required reorientation happens before
        the rest of the processing pipeline.

    """
    if not source:
        return None

    # Copy into a BytesIO first: PIL can struggle with incomplete file-like
    # objects, e.g. storage File objects lacking the tell/seek methods that
    # some image types need.
    buffered = BytesIO(source.read())
    image = Image.open(buffered)

    try:
        # "Image file truncated" can be raised for images that are still
        # mostly valid, so the first failure is ignored.
        image.load()
    except IOError:
        pass
    # The second load surfaces any other problem with the contents.
    image.load()

    return utils.exif_orientation(image) if exif_orientation else image
项目:django-config-models    作者:edx    | 项目源码 | 文件源码
def test_bad_username(self):
        """
        Tests the error handling when the specified user does not exist.
        """
        test_json = textwrap.dedent("""
            {
                "model": "config_models.ExampleDeserializeConfig",
                "data": [{"name": "dino"}]
            }
            """)
        # BytesIO only accepts bytes on Python 3; passing the text fixture
        # directly raises TypeError before deserialize_json is even called.
        stream = BytesIO(test_json.encode('utf-8'))
        # NOTE(review): assertRaisesRegexp is a deprecated alias of
        # assertRaisesRegex (removed in Python 3.12); switch once Python 2
        # support is no longer needed.
        with self.assertRaisesRegexp(Exception, "User matching query does not exist"):
            deserialize_json(stream, "unknown_username")
项目:django-config-models    作者:edx    | 项目源码 | 文件源码
def test_invalid_json(self):
        """
        Tests the error handling when there is invalid JSON.
        """
        # The fixture is deliberately cut off so that it is not valid JSON.
        test_json = textwrap.dedent("""
            {
                "model": "config_models.ExampleDeserializeConfig",
                "data": [{"name": "dino"
            """)
        # BytesIO only accepts bytes on Python 3; passing the text fixture
        # directly raises TypeError before deserialize_json is even called.
        stream = BytesIO(test_json.encode('utf-8'))
        # NOTE(review): assertRaisesRegexp is a deprecated alias of
        # assertRaisesRegex (removed in Python 3.12); switch once Python 2
        # support is no longer needed.
        with self.assertRaisesRegexp(Exception, "JSON parse error"):
            deserialize_json(stream, self.test_username)
项目:django-config-models    作者:edx    | 项目源码 | 文件源码
def test_invalid_model(self):
        """
        Tests the error handling when the configuration model specified does not exist.
        """
        test_json = textwrap.dedent("""
            {
                "model": "xxx.yyy",
                "data":[{"name": "dino"}]
            }
            """)
        # BytesIO only accepts bytes on Python 3; passing the text fixture
        # directly raises TypeError before deserialize_json is even called.
        stream = BytesIO(test_json.encode('utf-8'))
        # NOTE(review): assertRaisesRegexp is a deprecated alias of
        # assertRaisesRegex (removed in Python 3.12); switch once Python 2
        # support is no longer needed.
        with self.assertRaisesRegexp(Exception, "No installed app"):
            deserialize_json(stream, self.test_username)
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def get_test_image_file(filename='test.png', colour='white', size=(640, 480)):
    """Return an ImageFile named ``filename`` wrapping an in-memory solid-colour RGB PNG."""
    buf = BytesIO()
    PIL.Image.new('RGB', size, colour).save(buf, 'PNG')
    return ImageFile(buf, name=filename)
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def get_test_image_file_jpeg(filename='test.jpg', colour='white', size=(640, 480)):
    """Return an ImageFile named ``filename`` wrapping an in-memory solid-colour RGB JPEG."""
    buf = BytesIO()
    PIL.Image.new('RGB', size, colour).save(buf, 'JPEG')
    return ImageFile(buf, name=filename)
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def test_runs_operations_without_env_argument(self):
        """
        Operations whose run() takes no "env" argument still run, with warnings.

        The "env" argument was added in Wagtail 1.5; operations written for
        1.4 must keep working.
        """
        calls = Mock()

        # A real function so that its signature visibly lacks ``env``.
        def run(willow, image):
            calls(willow, image)

        self.operation_instance.run = run

        two_step_filter = Filter(spec='operation1|operation2')
        source_image = Image.objects.create(
            title="Test image",
            file=get_test_image_file(),
        )

        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter('always')

            two_step_filter.run(source_image, BytesIO())

            # Two warnings are expected for the two operations in the spec.
            self.assertEqual(len(caught), 2)
            self.assertIs(caught[0].category, RemovedInWagtail19Warning)

        self.assertEqual(calls.call_count, 2)
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def test_jpeg(self):
        """A ``format-jpeg`` filter yields output whose format_name is 'jpeg'."""
        jpeg_filter = Filter(spec='width-400|format-jpeg')
        source_image = Image.objects.create(
            title="Test image",
            file=get_test_image_file(),
        )
        result = jpeg_filter.run(source_image, BytesIO())

        self.assertEqual(result.format_name, 'jpeg')
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def test_gif(self):
        """A ``format-gif`` filter yields output whose format_name is 'gif'."""
        gif_filter = Filter(spec='width-400|format-gif')
        source_image = Image.objects.create(
            title="Test image",
            file=get_test_image_file(),
        )
        result = gif_filter.run(source_image, BytesIO())

        self.assertEqual(result.format_name, 'gif')
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def test_invalid(self):
        """Running a filter with the spec ``format-foo`` raises InvalidFilterSpecError."""
        bad_filter = Filter(spec='width-400|format-foo')
        source_image = Image.objects.create(
            title="Test image",
            file=get_test_image_file(),
        )
        with self.assertRaises(InvalidFilterSpecError):
            bad_filter.run(source_image, BytesIO())
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def test_default_quality(self):
        """Without a jpegquality operation a JPEG is saved with quality=85."""
        plain_filter = Filter(spec='width-400')
        source_image = Image.objects.create(
            title="Test image",
            file=get_test_image_file_jpeg(),
        )

        output = BytesIO()
        # Patch only around the run so the fixture above is saved for real.
        with patch('PIL.Image.Image.save') as save:
            plain_filter.run(source_image, output)

        expected_options = dict(quality=85, optimize=True, progressive=True)
        save.assert_called_with(output, 'JPEG', **expected_options)
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def test_jpeg_quality_filter(self):
        """A ``jpegquality-40`` operation makes the JPEG save use quality=40."""
        quality_filter = Filter(spec='width-400|jpegquality-40')
        source_image = Image.objects.create(
            title="Test image",
            file=get_test_image_file_jpeg(),
        )

        output = BytesIO()
        # Patch only around the run so the fixture above is saved for real.
        with patch('PIL.Image.Image.save') as save:
            quality_filter.run(source_image, output)

        expected_options = dict(quality=40, optimize=True, progressive=True)
        save.assert_called_with(output, 'JPEG', **expected_options)
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def test_jpeg_quality_filter_invalid(self):
        """A non-numeric jpegquality value raises InvalidFilterSpecError."""
        bad_filter = Filter(spec='width-400|jpegquality-abc')
        source_image = Image.objects.create(
            title="Test image",
            file=get_test_image_file_jpeg(),
        )
        with self.assertRaises(InvalidFilterSpecError):
            bad_filter.run(source_image, BytesIO())
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def test_jpeg_quality_filter_too_big(self):
        """A jpegquality value of 101 raises InvalidFilterSpecError."""
        bad_filter = Filter(spec='width-400|jpegquality-101')
        source_image = Image.objects.create(
            title="Test image",
            file=get_test_image_file_jpeg(),
        )
        with self.assertRaises(InvalidFilterSpecError):
            bad_filter.run(source_image, BytesIO())
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def test_jpeg_quality_setting(self):
        """
        With no jpegquality operation the JPEG save is expected to use quality=50.

        NOTE(review): the value 50 presumably comes from a settings override
        applied outside this snippet -- confirm against the full test class.
        """
        plain_filter = Filter(spec='width-400')
        source_image = Image.objects.create(
            title="Test image",
            file=get_test_image_file_jpeg(),
        )

        output = BytesIO()
        # Patch only around the run so the fixture above is saved for real.
        with patch('PIL.Image.Image.save') as save:
            plain_filter.run(source_image, output)

        expected_options = dict(quality=50, optimize=True, progressive=True)
        save.assert_called_with(output, 'JPEG', **expected_options)
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def test_jpeg_quality_filter_overrides_setting(self):
        """
        A ``jpegquality-40`` operation makes the JPEG save use quality=40.

        NOTE(review): per the test name this should hold even when a quality
        setting is configured; that override is not visible in this snippet.
        """
        quality_filter = Filter(spec='width-400|jpegquality-40')
        source_image = Image.objects.create(
            title="Test image",
            file=get_test_image_file_jpeg(),
        )

        output = BytesIO()
        # Patch only around the run so the fixture above is saved for real.
        with patch('PIL.Image.Image.save') as save:
            quality_filter.run(source_image, output)

        expected_options = dict(quality=40, optimize=True, progressive=True)
        save.assert_called_with(output, 'JPEG', **expected_options)
项目:geekpoint    作者:Lujinghu    | 项目源码 | 文件源码
def as_bytes(self, unixfrom=False, linesep='\n'):
            """Produce the full formatted message as bytes.

            Set ``unixfrom`` to True to include the Unix From_ envelope
            header. ``linesep`` is the line separator used in the result.

            This version overrides the default as_bytes() so that lines that
            begin with 'From ' are not mangled. See bug #13433 for details.
            """
            stream = six.BytesIO()
            gen = generator.BytesGenerator(stream, mangle_from_=False)
            gen.flatten(self, unixfrom=unixfrom, linesep=linesep)
            return stream.getvalue()
项目:django-wechat-api    作者:crazy-canux    | 项目源码 | 文件源码
def as_bytes(self, unixfrom=False, linesep='\n'):
            """Render the entire message, formatted, as bytes.

            If ``unixfrom`` is true the Unix From_ envelope header is written
            first. ``linesep`` chooses the line separator for the output.

            The default as_bytes() is overridden here to avoid mangling lines
            that begin with 'From '. See bug #13433 for details.
            """
            collected = six.BytesIO()
            generator.BytesGenerator(collected, mangle_from_=False).flatten(
                self, unixfrom=unixfrom, linesep=linesep)
            return collected.getvalue()
项目:Wagtail-Image-Folders    作者:anteatersa    | 项目源码 | 文件源码
def get_rendition(self, filter):
        """
        Return the rendition of this image for ``filter``, creating it if needed.

        ``filter`` may be a Filter or a spec string, which is wrapped in a
        Filter. An existing rendition matching the filter spec and cache key
        is reused; otherwise the filter is run into a fresh buffer and the
        result is stored via ``get_or_create``.
        """
        if isinstance(filter, string_types):
            filter = Filter(spec=filter)

        cache_key = filter.get_cache_key(self)
        Rendition = self.get_rendition_model()

        try:
            rendition = self.renditions.get(
                filter_spec=filter.spec,
                focal_point_key=cache_key,
            )
        except Rendition.DoesNotExist:
            # Nothing stored yet: generate the rendition image.
            generated_image = filter.run(self, BytesIO())

            # Base the output name on the source file name without extension.
            source_name = os.path.basename(self.file.name)
            stem = os.path.splitext(source_name)[0]

            # A mapping of image formats to extensions
            format_extensions = {
                'jpeg': '.jpg',
                'png': '.png',
                'gif': '.gif',
            }

            suffix = filter.spec.replace('|', '.') + format_extensions[generated_image.format_name]
            if cache_key:
                suffix = cache_key + '.' + suffix

            # Truncate filename to prevent it going over 60 chars
            output_filename = stem[:(59 - len(suffix))] + '.' + suffix

            # get_or_create rather than create: the returned "created" flag
            # is not used.
            rendition, _ = self.renditions.get_or_create(
                filter_spec=filter.spec,
                focal_point_key=cache_key,
                defaults={'file': File(generated_image.f, name=output_filename)}
            )

        return rendition