Python codecs 模块,getencoder() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用codecs.getencoder()

项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def __init__(self, stream, fieldnames, encoding='utf-8', **kwds):
    """Initializer.

    Args:
      stream: Stream to write to.
      fieldnames: Fieldnames to pass to the DictWriter.
      encoding: Desired encoding.
      kwds: Additional arguments to pass to the DictWriter.
    """
    stream_writer = codecs.getwriter(encoding)

    # For these four codecs the DictWriter is attached straight to
    # ``stream``; every other codec is staged through an in-memory queue
    # and ``stream`` is wrapped with the codec's stream writer.
    direct = (stream_writer is encodings.utf_8.StreamWriter or
              stream_writer is encodings.ascii.StreamWriter or
              stream_writer is encodings.latin_1.StreamWriter or
              stream_writer is encodings.cp1252.StreamWriter)
    self.no_recoding = direct
    if direct:
        self.encoder = codecs.getencoder(encoding)
        self.writer = csv.DictWriter(stream, fieldnames, **kwds)
        return

    self.encoder = codecs.getencoder('utf-8')
    self.queue = cStringIO.StringIO()
    self.writer = csv.DictWriter(self.queue, fieldnames, **kwds)
    self.stream = stream_writer(stream)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_all(self):
        """Compare ``codecs.__all__`` against the expected list of names."""
        expected_names = (
            "encode", "decode", "register", "CodecInfo", "Codec",
            "IncrementalEncoder", "IncrementalDecoder",
            "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder",
            "getincrementalencoder", "getincrementaldecoder",
            "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile", "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE", "BOM_UTF8",
            "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertEqual(sorted(expected_names), sorted(codecs.__all__))
        # Every exported name must also resolve to a module attribute.
        for exported in codecs.__all__:
            getattr(codecs, exported)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_all(self):
        """Compare ``codecs.__all__`` against the expected list of names."""
        expected_names = (
            "encode", "decode", "register", "CodecInfo", "Codec",
            "IncrementalEncoder", "IncrementalDecoder",
            "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder",
            "getincrementalencoder", "getincrementaldecoder",
            "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile", "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE", "BOM_UTF8",
            "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertEqual(sorted(expected_names), sorted(codecs.__all__))
        # Every exported name must also resolve to a module attribute.
        for exported in codecs.__all__:
            getattr(codecs, exported)
项目:dane-monitoring-plugins    作者:siccegge    | 项目源码 | 文件源码
def get_tlsa_records(resolver, name):
    """Extract all TLSA records for a given name.

    Args:
        resolver: object whose ``resolve(name, rrtype=...)`` call returns a
            ``(status, result)`` pair.
        name: the name to look up.

    Returns:
        A set of ``TLSARecord`` objects (empty when the answer carries no
        data), or ``None`` when the resolver reports a non-zero status.
    """
    logging.debug("searching for TLSA record on %s", name)
    status, answer = resolver.resolve(name, rrtype=RR_TYPE_TLSA)
    if status != 0:
        # Report the resolver's error text instead of silently dropping it.
        logging.error("TLSA lookup for %s failed: %s",
                      name, ub_strerror(status))
        return None

    if answer.data is None:
        logging.warning("No TLSA record returned")
        return set()

    records = set()
    for record in answer.data.data:
        # The first three elements are the usage, selector and matching
        # fields; everything after them is the associated data.
        # NOTE(review): ord() on single elements assumes each record is a
        # Python 2 style byte string -- confirm against the resolver output.
        usage = ord(record[0])
        selector = ord(record[1])
        matching = ord(record[2])
        records.add(TLSARecord(usage, selector, matching, record[3:]))

    return records
项目:OpenSky_BL    作者:fishpepper    | 项目源码 | 文件源码
def testInvalidUtf8(self):
    """Check when the invalid-UTF-8 lint message is reported for raw input."""
    expected_message = (
        'Line contains invalid UTF-8'
        ' (or Unicode replacement character).'
        '  [readability/utf8] [5]')

    def check(raw_bytes, expect_warning):
      collector = ErrorCollector(self.assert_)
      decoded_lines = unicode(raw_bytes, 'utf8', 'replace').split('\n')
      cpplint.ProcessFileData('foo.cc', 'cc', decoded_lines, collector)
      # The warning appears only once.
      self.assertEquals(
          int(expect_warning),
          collector.Results().count(expected_message))

    check('Hello world\n', False)
    check('\xe9\x8e\xbd\n', False)
    check('\xe9x\x8e\xbd\n', True)
    # This is the encoding of the replacement character itself (which
    # you can see by evaluating codecs.getencoder('utf8')(u'\ufffd')).
    check('\xef\xbf\xbd\n', True)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_all(self):
        """Compare ``codecs.__all__`` against the expected list of names."""
        expected_names = (
            "encode", "decode", "register", "CodecInfo", "Codec",
            "IncrementalEncoder", "IncrementalDecoder",
            "StreamReader", "StreamWriter", "lookup",
            "getencoder", "getdecoder",
            "getincrementalencoder", "getincrementaldecoder",
            "getreader", "getwriter",
            "register_error", "lookup_error",
            "strict_errors", "replace_errors", "ignore_errors",
            "xmlcharrefreplace_errors", "backslashreplace_errors",
            "open", "EncodedFile", "iterencode", "iterdecode",
            "BOM", "BOM_BE", "BOM_LE", "BOM_UTF8",
            "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
            "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
            "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
            "StreamReaderWriter", "StreamRecoder",
        )
        self.assertCountEqual(expected_names, codecs.__all__)
        # Every exported name must also resolve to a module attribute.
        for exported in codecs.__all__:
            getattr(codecs, exported)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-value processor for ``dialect``, or ``None``.

        ``None`` is returned when neither this type nor the dialect asks for
        unicode conversion.  When the dialect supports unicode binds and
        conversion is not forced, values pass through unchanged and a
        processor is returned only if byte strings should trigger a warning.
        Otherwise the processor encodes text values with the dialect's
        encoding.
        """
        if not (self.convert_unicode or dialect.convert_unicode):
            return None

        if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
            if not self._warn_on_bytestring:
                return None

            def process(value):
                # Pass-through; only warns about byte strings.
                if isinstance(value, util.binary_type):
                    util.warn_limited(
                        "Unicode type received non-unicode "
                        "bind param value %r.",
                        (util.ellipses_string(value),))
                return value
            return process

        encode = codecs.getencoder(dialect.encoding)
        warn_on_bytestring = self._warn_on_bytestring

        def process(value):
            if isinstance(value, util.text_type):
                encoded, _consumed = encode(value, self.unicode_error)
                return encoded
            if warn_on_bytestring and value is not None:
                util.warn_limited(
                    "Unicode type received non-unicode bind "
                    "param value %r.",
                    (util.ellipses_string(value),))
            return value
        return process
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def quote_value(self, value):
        """Render ``value`` as a literal suitable for inline SQLite SQL text.

        Booleans become ``0``/``1``, numbers their ``str()`` form, strings
        are single-quoted with embedded quotes doubled, ``None`` becomes
        ``NULL`` and binary values become ``X'<hex>'`` literals.

        Raises:
            ValueError: if the value is of none of the handled types.
        """
        # The backend "mostly works" without this function and there are use
        # cases for compiling Python without the sqlite3 libraries (e.g.
        # security hardening).
        import sqlite3
        try:
            value = sqlite3.adapt(value)
        except sqlite3.ProgrammingError:
            pass

        # Manual emulation of SQLite parameter quoting.  The bool check has
        # to come before the integer check.
        if isinstance(value, bool):
            return str(int(value))
        if isinstance(value, (Decimal, float)):
            return str(value)
        if isinstance(value, six.integer_types):
            return str(value)
        if isinstance(value, six.string_types):
            return "'%s'" % six.text_type(value).replace("\'", "\'\'")
        if value is None:
            return "NULL"
        if isinstance(value, (bytes, bytearray, six.memoryview)):
            # Bytes are only allowed for BLOB fields, encoded as string
            # literals containing hexadecimal data and preceded by a single "X"
            # character:
            # value = b'\x01\x02' => hex_bytes = b'0102' => return X'0102'
            hex_bytes = codecs.getencoder('hex_codec')(bytes(value))[0]
            # Use 'ascii' encoding for b'01' => '01', no need to use force_text here.
            return "X'%s'" % hex_bytes.decode('ascii')
        raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))
项目:ripe-atlas-scripts    作者:farrokhi    | 项目源码 | 文件源码
def _bytes_as_hex_str(cls, b):
        """Hex-encode ``b`` and decode the result using ``cls.DNS_CTYPE``."""
        hex_bytes, _consumed = codecs.getencoder('hex_codec')(b)
        return hex_bytes.decode(cls.DNS_CTYPE)
项目:instagram_private_api    作者:ping    | 项目源码 | 文件源码
def iter(self, fields, files):
        """
        Generate the pieces of a multipart form body, one chunk at a time.

        Text pieces are yielded as the ``(bytes, length)`` tuple returned by
        the UTF-8 encoder; file data is yielded as ``(filedata, len(filedata))``.

        :param fields: sequence of (name, value) elements for regular form fields
        :param files: sequence of (name, filename, contenttype, filedata) elements for data to be uploaded as files
        :return: generator of 2-tuples
        """
        to_utf8 = codecs.getencoder('utf-8')
        for (name, content) in fields:
            name = self.u(name)
            yield to_utf8('--{}\r\n'.format(self.boundary))
            disposition = self.u('Content-Disposition: form-data; name="{}"\r\n')
            yield to_utf8(disposition.format(name))
            yield to_utf8('\r\n')
            # Numbers are stringified before being converted by self.u().
            if isinstance(content, (int, float)):
                content = str(content)
            yield to_utf8(self.u(content))
            yield to_utf8('\r\n')
        for (name, file_name, mime_type, payload) in files:
            name = self.u(name)
            file_name = self.u(file_name)
            yield to_utf8('--{}\r\n'.format(self.boundary))
            disposition = self.u('Content-Disposition: form-data; name="{}"; filename="{}"\r\n')
            yield to_utf8(disposition.format(name, file_name))
            # An explicit content type wins, then a guess from the file name.
            resolved_type = (mime_type or mimetypes.guess_type(file_name)[0]
                             or 'application/octet-stream')
            yield to_utf8('Content-Type: {}\r\n'.format(resolved_type))
            yield to_utf8('Content-Transfer-Encoding: binary\r\n')
            yield to_utf8('\r\n')
            yield (payload, len(payload))
            yield to_utf8('\r\n')
        yield to_utf8('--{}--\r\n'.format(self.boundary))
项目:instagram_private_api    作者:ping    | 项目源码 | 文件源码
def iter(self, fields, files):
        """
        Generate the pieces of a multipart form body, one chunk at a time.

        Text pieces are yielded as the ``(bytes, length)`` tuple returned by
        the UTF-8 encoder; file data is yielded as ``(filedata, len(filedata))``.

        :param fields: sequence of (name, value) elements for regular form fields
        :param files: sequence of (name, filename, contenttype, filedata) elements for data to be uploaded as files
        :return: generator of 2-tuples
        """
        to_utf8 = codecs.getencoder('utf-8')
        for (name, content) in fields:
            name = self.u(name)
            yield to_utf8('--{}\r\n'.format(self.boundary))
            disposition = self.u('Content-Disposition: form-data; name="{}"\r\n')
            yield to_utf8(disposition.format(name))
            yield to_utf8('\r\n')
            # Numbers are stringified before being converted by self.u().
            if isinstance(content, (int, float)):
                content = str(content)
            yield to_utf8(self.u(content))
            yield to_utf8('\r\n')
        for (name, file_name, mime_type, payload) in files:
            name = self.u(name)
            file_name = self.u(file_name)
            yield to_utf8('--{}\r\n'.format(self.boundary))
            disposition = self.u('Content-Disposition: form-data; name="{}"; filename="{}"\r\n')
            yield to_utf8(disposition.format(name, file_name))
            # An explicit content type wins, then a guess from the file name.
            resolved_type = (mime_type or mimetypes.guess_type(file_name)[0]
                             or 'application/octet-stream')
            yield to_utf8('Content-Type: {}\r\n'.format(resolved_type))
            yield to_utf8('Content-Transfer-Encoding: binary\r\n')
            yield to_utf8('\r\n')
            yield (payload, len(payload))
            yield to_utf8('\r\n')
        yield to_utf8('--{}--\r\n'.format(self.boundary))
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-value processor for ``dialect``, or ``None``.

        ``None`` is returned when neither this type nor the dialect asks for
        unicode conversion.  When the dialect supports unicode binds and
        conversion is not forced, values pass through unchanged and a
        processor is returned only if byte strings should trigger a warning.
        Otherwise the processor encodes text values with the dialect's
        encoding.
        """
        if not (self.convert_unicode or dialect.convert_unicode):
            return None

        if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
            if not self._warn_on_bytestring:
                return None

            def process(value):
                # Pass-through; only warns about byte strings.
                if isinstance(value, util.binary_type):
                    util.warn_limited(
                        "Unicode type received non-unicode "
                        "bind param value %r.",
                        (util.ellipses_string(value),))
                return value
            return process

        encode = codecs.getencoder(dialect.encoding)
        warn_on_bytestring = self._warn_on_bytestring

        def process(value):
            if isinstance(value, util.text_type):
                encoded, _consumed = encode(value, self.unicode_error)
                return encoded
            if warn_on_bytestring and value is not None:
                util.warn_limited(
                    "Unicode type received non-unicode bind "
                    "param value %r.",
                    (util.ellipses_string(value),))
            return value
        return process
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def hexencode(b):
    """Return the hexadecimal encoding of ``b``.

    On Python 3 and later the ``hex_codec`` encoder is used; on earlier
    versions the value's own ``encode('hex')`` method is called.
    """
    if sys.version_info[0] >= 3:
        import codecs
        to_hex = codecs.getencoder('hex_codec')
        encoded, _consumed = to_hex(b)
        return encoded
    return b.encode('hex')
项目:cassandra-migrate    作者:Cobliteam    | 项目源码 | 文件源码
def _bytes_to_hex(bs):
        """Return ``bs`` encoded with the ``hex`` codec."""
        to_hex = codecs.getencoder('hex')
        encoded, _consumed = to_hex(bs)
        return encoded
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-value processor for ``dialect``, or ``None``.

        ``None`` is returned when neither this type nor the dialect asks for
        unicode conversion.  When the dialect supports unicode binds and
        conversion is not forced, values pass through unchanged and a
        processor is returned only if byte strings should trigger a warning.
        Otherwise the processor encodes text values with the dialect's
        encoding.
        """
        if not (self.convert_unicode or dialect.convert_unicode):
            return None

        if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
            if not self._warn_on_bytestring:
                return None

            def process(value):
                if isinstance(value, util.binary_type):
                    # The adjacent literals previously joined without a
                    # space, producing "non-unicodebind".
                    util.warn("Unicode type received non-unicode "
                              "bind param value.")
                return value
            return process

        encoder = codecs.getencoder(dialect.encoding)
        warn_on_bytestring = self._warn_on_bytestring

        def process(value):
            if isinstance(value, util.text_type):
                return encoder(value, self.unicode_error)[0]
            elif warn_on_bytestring and value is not None:
                util.warn("Unicode type received non-unicode bind "
                          "param value")
            return value
        return process
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-value processor for ``dialect``, or ``None``.

        ``None`` is returned when neither this type nor the dialect asks for
        unicode conversion.  When the dialect supports unicode binds and
        conversion is not forced, values pass through unchanged and a
        processor is returned only if byte strings should trigger a warning.
        Otherwise the processor encodes text values with the dialect's
        encoding.
        """
        if not (self.convert_unicode or dialect.convert_unicode):
            return None

        if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
            if not self._warn_on_bytestring:
                return None

            def process(value):
                if isinstance(value, util.binary_type):
                    # The adjacent literals previously joined without a
                    # space, producing "non-unicodebind".
                    util.warn("Unicode type received non-unicode "
                              "bind param value.")
                return value
            return process

        encoder = codecs.getencoder(dialect.encoding)
        warn_on_bytestring = self._warn_on_bytestring

        def process(value):
            if isinstance(value, util.text_type):
                return encoder(value, self.unicode_error)[0]
            elif warn_on_bytestring and value is not None:
                util.warn("Unicode type received non-unicode bind "
                          "param value")
            return value
        return process
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_encode_length(self):
        """Check the length element returned by encoders (Issue 3739)."""
        internal_encoder = codecs.getencoder("unicode_internal")
        _encoded, consumed = internal_encoder("a")
        self.assertEqual(consumed, 1)
        _encoded, consumed = internal_encoder("\xe9\u0142")
        self.assertEqual(consumed, 2)

        _escaped, consumed = codecs.escape_encode(br'\x00')
        self.assertEqual(consumed, 4)

# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_getencoder(self):
        """getencoder() needs an argument and rejects unknown codec names."""
        with self.assertRaises(TypeError):
            codecs.getencoder()
        with self.assertRaises(LookupError):
            codecs.getencoder("__spam__")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_bad_encode_args(self):
        """Calling any listed encoder without arguments raises TypeError."""
        for codec_name in all_unicode_encodings:
            encode = codecs.getencoder(codec_name)
            with self.assertRaises(TypeError):
                encode()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_basics(self):
        """Round-trip all 256 byte values through each listed codec."""
        payload = bytes(range(256))
        for codec_name in bytes_transform_encodings:
            # generic codecs interface
            encode = codecs.getencoder(codec_name)
            encoded, consumed = encode(payload)
            self.assertEqual(consumed, len(payload))
            decode = codecs.getdecoder(codec_name)
            decoded, consumed = decode(encoded)
            self.assertEqual(consumed, len(encoded))
            self.assertEqual(decoded, payload)
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-value processor for ``dialect``, or ``None``.

        ``None`` is returned when neither this type nor the dialect asks for
        unicode conversion.  When the dialect supports unicode binds and
        conversion is not forced, values pass through unchanged and a
        processor is returned only if byte strings should trigger a warning.
        Otherwise the processor encodes text values with the dialect's
        encoding.
        """
        if not (self.convert_unicode or dialect.convert_unicode):
            return None

        if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
            if not self._warn_on_bytestring:
                return None

            def process(value):
                # Pass-through; only warns about byte strings.
                if isinstance(value, util.binary_type):
                    util.warn_limited(
                        "Unicode type received non-unicode "
                        "bind param value %r.",
                        (util.ellipses_string(value),))
                return value
            return process

        encode = codecs.getencoder(dialect.encoding)
        warn_on_bytestring = self._warn_on_bytestring

        def process(value):
            if isinstance(value, util.text_type):
                encoded, _consumed = encode(value, self.unicode_error)
                return encoded
            if warn_on_bytestring and value is not None:
                util.warn_limited(
                    "Unicode type received non-unicode bind "
                    "param value %r.",
                    (util.ellipses_string(value),))
            return value
        return process
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-value processor for ``dialect``, or ``None``.

        ``None`` is returned when neither this type nor the dialect asks for
        unicode conversion.  When the dialect supports unicode binds and
        conversion is not forced, values pass through unchanged and a
        processor is returned only if byte strings should trigger a warning.
        Otherwise the processor encodes text values with the dialect's
        encoding.
        """
        if not (self.convert_unicode or dialect.convert_unicode):
            return None

        if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
            if not self._warn_on_bytestring:
                return None

            def process(value):
                # Pass-through; only warns about byte strings.
                if isinstance(value, util.binary_type):
                    util.warn_limited(
                        "Unicode type received non-unicode "
                        "bind param value %r.",
                        (util.ellipses_string(value),))
                return value
            return process

        encode = codecs.getencoder(dialect.encoding)
        warn_on_bytestring = self._warn_on_bytestring

        def process(value):
            if isinstance(value, util.text_type):
                encoded, _consumed = encode(value, self.unicode_error)
                return encoded
            if warn_on_bytestring and value is not None:
                util.warn_limited(
                    "Unicode type received non-unicode bind "
                    "param value %r.",
                    (util.ellipses_string(value),))
            return value
        return process
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-value processor for ``dialect``, or ``None``.

        ``None`` is returned when neither this type nor the dialect asks for
        unicode conversion, or when the dialect supports unicode binds
        (conversion not forced) and byte-string warnings are disabled.
        Otherwise a one-argument callable is returned.
        """
        if self.convert_unicode or dialect.convert_unicode:
            if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
                # Values are passed through unchanged on this path; a
                # processor exists only to warn about byte strings.
                if self._warn_on_bytestring:
                    def process(value):
                        # NOTE(review): the Py3K/Py2K comments below look
                        # like source-conversion markers -- keep them and
                        # the statement layout intact.
                        # Py3K
                        #if isinstance(value, bytes):
                        # Py2K
                        if isinstance(value, str):
                        # end Py2K
                            util.warn("Unicode type received non-unicode bind "
                                      "param value.")
                        return value
                    return process
                else:
                    return None
            else:
                # Encode unicode values with the dialect's encoding, using
                # this type's ``unicode_error`` as the codec error mode.
                encoder = codecs.getencoder(dialect.encoding)
                warn_on_bytestring = self._warn_on_bytestring

                def process(value):
                    if isinstance(value, unicode):
                        return encoder(value, self.unicode_error)[0]
                    elif warn_on_bytestring and value is not None:
                        util.warn("Unicode type received non-unicode bind "
                                  "param value")
                    # Non-unicode values (including None) are returned as-is.
                    return value
            return process
        else:
            return None
项目:ircproto    作者:agronholm    | 项目源码 | 文件源码
def __init__(self, output_encoding='utf-8', input_encoding='utf-8',
                 fallback_encoding='iso-8859-1'):
        """Look up the codecs and set up empty buffers.

        :param output_encoding: name of the codec stored as ``output_codec``
        :param input_encoding: name of the codec stored as ``input_decoder``
        :param fallback_encoding: name of the codec stored as
            ``fallback_decoder``
        """
        find_encoder, find_decoder = codecs.getencoder, codecs.getdecoder
        self.output_codec = find_encoder(output_encoding)
        self.input_decoder = find_decoder(input_encoding)
        self.fallback_decoder = find_decoder(fallback_encoding)
        self._input_buffer, self._output_buffer = bytearray(), bytearray()
        self._closed = False
项目:DjangoBlog    作者:0daybug    | 项目源码 | 文件源码
def quote_value(self, value):
        """Render ``value`` as a literal suitable for inline SQLite SQL text.

        Booleans become ``0``/``1``, numbers their ``str()`` form, strings
        are single-quoted with embedded quotes doubled, ``None`` becomes
        ``NULL`` and binary values become ``X'<hex>'`` literals.

        Raises:
            ValueError: if the value is of none of the handled types.
        """
        # The backend "mostly works" without this function and there are use
        # cases for compiling Python without the sqlite3 libraries (e.g.
        # security hardening).
        import _sqlite3
        try:
            value = _sqlite3.adapt(value)
        except _sqlite3.ProgrammingError:
            pass

        # Manual emulation of SQLite parameter quoting.  The bool check has
        # to come before the integer check.
        if isinstance(value, bool):
            return str(int(value))
        if isinstance(value, (Decimal, float)):
            return str(value)
        if isinstance(value, six.integer_types):
            return str(value)
        if isinstance(value, six.string_types):
            return "'%s'" % six.text_type(value).replace("\'", "\'\'")
        if value is None:
            return "NULL"
        if isinstance(value, (bytes, bytearray, six.memoryview)):
            # Bytes are only allowed for BLOB fields, encoded as string
            # literals containing hexadecimal data and preceded by a single "X"
            # character:
            # value = b'\x01\x02' => hex_bytes = b'0102' => return X'0102'
            hex_bytes = codecs.getencoder('hex_codec')(bytes(value))[0]
            # Use 'ascii' encoding for b'01' => '01', no need to use force_text here.
            return "X'%s'" % hex_bytes.decode('ascii')
        raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-value processor for ``dialect``, or ``None``.

        ``None`` is returned when neither this type nor the dialect asks for
        unicode conversion.  When the dialect supports unicode binds and
        conversion is not forced, values pass through unchanged and a
        processor is returned only if byte strings should trigger a warning.
        Otherwise the processor encodes text values with the dialect's
        encoding.
        """
        if not (self.convert_unicode or dialect.convert_unicode):
            return None

        if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
            if not self._warn_on_bytestring:
                return None

            def process(value):
                # Pass-through; only warns about byte strings.
                if isinstance(value, util.binary_type):
                    util.warn_limited(
                        "Unicode type received non-unicode "
                        "bind param value %r.",
                        (util.ellipses_string(value),))
                return value
            return process

        encode = codecs.getencoder(dialect.encoding)
        warn_on_bytestring = self._warn_on_bytestring

        def process(value):
            if isinstance(value, util.text_type):
                encoded, _consumed = encode(value, self.unicode_error)
                return encoded
            if warn_on_bytestring and value is not None:
                util.warn_limited(
                    "Unicode type received non-unicode bind "
                    "param value %r.",
                    (util.ellipses_string(value),))
            return value
        return process
项目:PySteam    作者:russelg    | 项目源码 | 文件源码
def generate_session_id():
    """Generates a random session ID for Steam.

    Returns
    -------
    str
        The hex encoding of 12 random bytes.
    """
    to_hex = codecs.getencoder('hex')
    hex_bytes, _consumed = to_hex(Random.get_random_bytes(12))
    return hex_bytes.decode('utf-8')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_encode_length(self):
        """Check the length element returned by encoders (Issue 3739)."""
        internal_encoder = codecs.getencoder("unicode_internal")
        _encoded, consumed = internal_encoder(u"a")
        self.assertEqual(consumed, 1)
        _encoded, consumed = internal_encoder(u"\xe9\u0142")
        self.assertEqual(consumed, 2)

        escape_encoder = codecs.getencoder("string-escape")
        _encoded, consumed = escape_encoder(r'\x00')
        self.assertEqual(consumed, 4)

# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_getencoder(self):
        """getencoder() needs an argument and rejects unknown codec names."""
        with self.assertRaises(TypeError):
            codecs.getencoder()
        with self.assertRaises(LookupError):
            codecs.getencoder("__spam__")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_bad_encode_args(self):
        """Calling any listed encoder without arguments raises TypeError."""
        for codec_name in all_unicode_encodings:
            encode = codecs.getencoder(codec_name)
            with self.assertRaises(TypeError):
                encode()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_basics(self):
        """Round-trip a short string through each listed string codec."""
        text = "abc123"
        for codec_name in all_string_encodings:
            encode = codecs.getencoder(codec_name)
            encoded, consumed = encode(text)
            self.assertEqual(consumed, len(text))
            decode = codecs.getdecoder(codec_name)
            decoded, consumed = decode(encoded)
            self.assertEqual(decoded, text, "%r != %r (encoding=%r)" % (decoded, text, codec_name))
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_encode_length(self):
        """Check the length element returned by encoders (Issue 3739)."""
        internal_encoder = codecs.getencoder("unicode_internal")
        _encoded, consumed = internal_encoder(u"a")
        self.assertEqual(consumed, 1)
        _encoded, consumed = internal_encoder(u"\xe9\u0142")
        self.assertEqual(consumed, 2)

        escape_encoder = codecs.getencoder("string-escape")
        _encoded, consumed = escape_encoder(r'\x00')
        self.assertEqual(consumed, 4)

# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_getencoder(self):
        """getencoder() needs an argument and rejects unknown codec names."""
        with self.assertRaises(TypeError):
            codecs.getencoder()
        with self.assertRaises(LookupError):
            codecs.getencoder("__spam__")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_bad_encode_args(self):
        """Calling any listed encoder without arguments raises TypeError."""
        for codec_name in all_unicode_encodings:
            encode = codecs.getencoder(codec_name)
            with self.assertRaises(TypeError):
                encode()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_basics(self):
        """Round-trip a short string through each listed string codec."""
        text = "abc123"
        for codec_name in all_string_encodings:
            encode = codecs.getencoder(codec_name)
            encoded, consumed = encode(text)
            self.assertEqual(consumed, len(text))
            decode = codecs.getdecoder(codec_name)
            decoded, consumed = decode(encoded)
            self.assertEqual(decoded, text, "%r != %r (encoding=%r)" % (decoded, text, codec_name))
项目:StoolDesign    作者:brianekins    | 项目源码 | 文件源码
def iter(self, fields, files):
        """
        fields is a sequence of (name, value) elements for regular form fields.
        files is a sequence of (name, filename, file-type) elements for data to be uploaded as files
        Yield the body chunk by chunk: text pieces as the (bytes, length)
        tuple returned by the UTF-8 encoder, file contents as (data, len(data)).
        """
        to_utf8 = codecs.getencoder('utf-8')
        for (name, content) in fields:
            name = self.u(name)
            yield to_utf8('--{}\r\n'.format(self.boundary))
            disposition = self.u('Content-Disposition: form-data; name="{}"\r\n')
            yield to_utf8(disposition.format(name))
            yield to_utf8('\r\n')
            # Numbers are stringified before being converted by self.u().
            if isinstance(content, (int, float)):
                content = str(content)
            yield to_utf8(self.u(content))
            yield to_utf8('\r\n')
        for (name, file_name, handle) in files:
            name = self.u(name)
            file_name = self.u(file_name)
            yield to_utf8('--{}\r\n'.format(self.boundary))
            disposition = self.u('Content-Disposition: form-data; name="{}"; filename="{}"\r\n')
            yield to_utf8(disposition.format(name, file_name))
            guessed_type = mimetypes.guess_type(file_name)[0] or 'application/octet-stream'
            yield to_utf8('Content-Type: {}\r\n'.format(guessed_type))
            yield to_utf8('\r\n')
            # The handle is read in full and stays open until the consumer
            # asks for the chunk after the file contents.
            with handle:
                contents = handle.read()
                yield (contents, len(contents))
            yield to_utf8('\r\n')
        yield to_utf8('--{}--\r\n'.format(self.boundary))
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-value processor for ``dialect``, or ``None``.

        ``None`` is returned when neither this type nor the dialect asks for
        unicode conversion.  When the dialect supports unicode binds and
        conversion is not forced, values pass through unchanged and a
        processor is returned only if byte strings should trigger a warning.
        Otherwise the processor encodes text values with the dialect's
        encoding.
        """
        if not (self.convert_unicode or dialect.convert_unicode):
            return None

        if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
            if not self._warn_on_bytestring:
                return None

            def process(value):
                # Pass-through; only warns about byte strings.
                if isinstance(value, util.binary_type):
                    util.warn_limited(
                        "Unicode type received non-unicode "
                        "bind param value %r.",
                        (util.ellipses_string(value),))
                return value
            return process

        encode = codecs.getencoder(dialect.encoding)
        warn_on_bytestring = self._warn_on_bytestring

        def process(value):
            if isinstance(value, util.text_type):
                encoded, _consumed = encode(value, self.unicode_error)
                return encoded
            if warn_on_bytestring and value is not None:
                util.warn_limited(
                    "Unicode type received non-unicode bind "
                    "param value %r.",
                    (util.ellipses_string(value),))
            return value
        return process
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-value processor for ``dialect``, or ``None``.

        ``None`` is returned when neither this type nor the dialect asks for
        unicode conversion.  When the dialect supports unicode binds and
        conversion is not forced, values pass through unchanged and a
        processor is returned only if byte strings should trigger a warning.
        Otherwise the processor encodes text values with the dialect's
        encoding.
        """
        if not (self.convert_unicode or dialect.convert_unicode):
            return None

        if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
            if not self._warn_on_bytestring:
                return None

            def process(value):
                # Pass-through; only warns about byte strings.
                if isinstance(value, util.binary_type):
                    util.warn_limited(
                        "Unicode type received non-unicode "
                        "bind param value %r.",
                        (util.ellipses_string(value),))
                return value
            return process

        encode = codecs.getencoder(dialect.encoding)
        warn_on_bytestring = self._warn_on_bytestring

        def process(value):
            if isinstance(value, util.text_type):
                encoded, _consumed = encode(value, self.unicode_error)
                return encoded
            if warn_on_bytestring and value is not None:
                util.warn_limited(
                    "Unicode type received non-unicode bind "
                    "param value %r.",
                    (util.ellipses_string(value),))
            return value
        return process
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def quote_value(self, value):
        """Render ``value`` as a SQL literal, emulating SQLite parameter quoting.

        The value is first passed through ``_sqlite3.adapt``; if that raises
        ``ProgrammingError`` the value is used as given.  Booleans are
        rendered as ``0``/``1``, ``Decimal``/``float``/integer values via
        ``str()``, strings are wrapped in single quotes with embedded single
        quotes doubled, ``None`` becomes ``NULL`` and binary values become an
        ``X'..'`` hexadecimal literal.

        Raises:
            ValueError: if the value matches none of the handled types.
        """
        try:
            value = _sqlite3.adapt(value)
        except _sqlite3.ProgrammingError:
            # Adaptation failed; carry on with the value unchanged.
            pass

        # Manual emulation of SQLite parameter quoting
        if isinstance(value, bool):
            return str(int(value))
        if isinstance(value, (Decimal, float)):
            return str(value)
        if isinstance(value, six.integer_types):
            return str(value)
        if isinstance(value, six.string_types):
            escaped = six.text_type(value).replace("\'", "\'\'")
            return "'%s'" % escaped
        if value is None:
            return "NULL"
        if isinstance(value, (bytes, bytearray, six.memoryview)):
            # Bytes are only allowed for BLOB fields, encoded as string
            # literals containing hexadecimal data and preceded by a single "X"
            # character:
            # value = b'\x01\x02' => hex_bytes = b'0102' => return X'0102'
            raw = bytes(value)
            hex_bytes, _consumed = codecs.getencoder('hex_codec')(raw)
            # Use 'ascii' encoding for b'01' => '01', no need to use force_text here.
            return "X'%s'" % hex_bytes.decode('ascii')
        raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Build the per-value bind processor for ``dialect``; may be None.

        Outcomes, in the order they are checked:

        * neither ``self.convert_unicode`` nor ``dialect.convert_unicode`` is
          set: ``None``;
        * the dialect supports unicode binds and conversion is not
          ``'force'``: a warn-only pass-through callable when
          ``self._warn_on_bytestring`` is set, otherwise ``None``;
        * anything else: a callable that encodes ``util.text_type`` values
          using the codec for ``dialect.encoding``.
        """
        if not (self.convert_unicode or dialect.convert_unicode):
            return None

        if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
            if not self._warn_on_bytestring:
                return None

            def process(value):
                # The value itself is never altered here.
                if isinstance(value, util.binary_type):
                    util.warn_limited(
                        "Unicode type received non-unicode "
                        "bind param value %r.",
                        (util.ellipses_string(value),))
                return value
            return process

        encoder = codecs.getencoder(dialect.encoding)
        warn_on_bytestring = self._warn_on_bytestring

        def process(value):
            if isinstance(value, util.text_type):
                # self.unicode_error is read at call time, once per value.
                encoded, _consumed = encoder(value, self.unicode_error)
                return encoded
            if warn_on_bytestring and value is not None:
                util.warn_limited(
                    "Unicode type received non-unicode bind "
                    "param value %r.",
                    (util.ellipses_string(value),))
            return value
        return process
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_encode_length(self):
        # Checks the "length consumed" element returned by the
        # unicode_internal encoder and by codecs.escape_encode.
        deprecation = ('unicode_internal codec has been '
                       'deprecated', DeprecationWarning)
        with support.check_warnings(deprecation):
            # Issue 3739
            encode = codecs.getencoder("unicode_internal")
            for text, consumed in (("a", 1), ("\xe9\u0142", 2)):
                self.assertEqual(encode(text)[1], consumed)

            self.assertEqual(codecs.escape_encode(br'\x00')[1], 4)

# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_getencoder(self):
        # Calling without the encoding argument must raise TypeError.
        with self.assertRaises(TypeError):
            codecs.getencoder()
        # A name with no matching codec must raise LookupError.
        with self.assertRaises(LookupError):
            codecs.getencoder("__spam__")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_bad_encode_args(self):
        # Every encoder in all_unicode_encodings must raise TypeError when it
        # is called with no arguments.
        for codec_name in all_unicode_encodings:
            encode = codecs.getencoder(codec_name)
            with support.check_warnings():
                # unicode-internal has been deprecated
                with self.assertRaises(TypeError):
                    encode()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_basics(self):
        # Round-trips all 256 byte values through each codec listed in
        # bytes_transform_encodings and checks the reported lengths.
        payload = bytes(range(256))
        for codec_name in bytes_transform_encodings:
            # generic codecs interface
            encode = codecs.getencoder(codec_name)
            encoded, consumed = encode(payload)
            self.assertEqual(consumed, len(payload))

            decode = codecs.getdecoder(codec_name)
            decoded, consumed = decode(encoded)
            self.assertEqual(consumed, len(encoded))
            self.assertEqual(decoded, payload)
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-value processor for ``dialect``, or None if unneeded.

        No processor is produced unless ``self.convert_unicode`` or
        ``dialect.convert_unicode`` is set.  If the dialect supports unicode
        binds and conversion is not ``'force'``, the only possible processor
        is one that warns on ``util.binary_type`` values (when
        ``self._warn_on_bytestring`` is set) and returns values unchanged.
        Otherwise the processor encodes ``util.text_type`` values through
        ``codecs.getencoder(dialect.encoding)``.
        """
        wants_conversion = self.convert_unicode or dialect.convert_unicode
        if not wants_conversion:
            return None

        if dialect.supports_unicode_binds and self.convert_unicode != 'force':
            if self._warn_on_bytestring:
                def process(value):
                    if isinstance(value, util.binary_type):
                        util.warn_limited(
                            "Unicode type received non-unicode "
                            "bind param value %r.",
                            (util.ellipses_string(value),))
                    return value
                return process
            return None

        # Resolve the codec once, outside the closure.
        encoder = codecs.getencoder(dialect.encoding)
        warn_on_bytestring = self._warn_on_bytestring

        def process(value):
            if isinstance(value, util.text_type):
                return encoder(value, self.unicode_error)[0]
            if warn_on_bytestring and value is not None:
                # Non-text, non-None value: report it, then pass it through.
                util.warn_limited(
                    "Unicode type received non-unicode bind "
                    "param value %r.",
                    (util.ellipses_string(value),))
            return value
        return process
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def quote_value(self, value):
        """Render ``value`` as a SQL literal, emulating SQLite parameter quoting.

        ``sqlite3`` is imported inside the function; if the import fails, or
        ``sqlite3.adapt`` raises ``ProgrammingError``, the value is quoted as
        given.  Booleans are rendered as ``0``/``1``,
        ``Decimal``/``float``/integer values via ``str()``, strings are
        wrapped in single quotes with embedded single quotes doubled,
        ``None`` becomes ``NULL`` and binary values become an ``X'..'``
        hexadecimal literal.

        Raises:
            ValueError: if the value matches none of the handled types.
        """
        # The backend "mostly works" without this function and there are use
        # cases for compiling Python without the sqlite3 libraries (e.g.
        # security hardening).
        try:
            import sqlite3
            value = sqlite3.adapt(value)
        except ImportError:
            pass
        except sqlite3.ProgrammingError:
            # Only evaluated when the import succeeded, so the name is bound.
            pass

        # Manual emulation of SQLite parameter quoting
        if isinstance(value, bool):
            return str(int(value))
        if isinstance(value, (Decimal, float)):
            return str(value)
        if isinstance(value, six.integer_types):
            return str(value)
        if isinstance(value, six.string_types):
            doubled = six.text_type(value).replace("\'", "\'\'")
            return "'%s'" % doubled
        if value is None:
            return "NULL"
        if isinstance(value, (bytes, bytearray, six.memoryview)):
            # Bytes are only allowed for BLOB fields, encoded as string
            # literals containing hexadecimal data and preceded by a single "X"
            # character:
            # value = b'\x01\x02' => hex_bytes = b'0102' => return X'0102'
            raw = bytes(value)
            to_hex = codecs.getencoder('hex_codec')
            hex_bytes = to_hex(raw)[0]
            # Use 'ascii' encoding for b'01' => '01', no need to use force_text here.
            return "X'%s'" % hex_bytes.decode('ascii')
        raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))
项目:dane-monitoring-plugins    作者:siccegge    | 项目源码 | 文件源码
def __repr__(self):
        """Return ``<TLSA usage selector matching payload>``, payload hex-encoded."""
        to_hex = codecs.getencoder('hex')
        fields = (
            self._usage,
            self._selector,
            self._matching,
            # The encoder returns (bytes, length); decode the bytes to text.
            to_hex(self._payload)[0].decode(),
        )
        return '<TLSA %d %d %d %s>' % fields
项目:dbf    作者:wenzhihong2003    | 项目源码 | 文件源码
def codepage(self, codepage):
        """Change the code page recorded in ``self._meta``.

        The header is given ``codepage.code``, the stored decoder and encoder
        are replaced with the codecs named ``codepage.name``, and
        ``self._update_disk(headeronly=True)`` is then called.

        Raises:
            TypeError: ``codepage`` is not a ``CodePage`` instance.
            DbfError: ``self._meta.status`` is not ``READ_WRITE``.
        """
        if not isinstance(codepage, CodePage):
            raise TypeError("codepage should be a CodePage, not a %r" % type(codepage))

        table_meta = self._meta
        if table_meta.status != READ_WRITE:
            raise DbfError(
                '%s not in read/write mode, unable to change codepage'
                % table_meta.filename
            )

        table_meta.header.codepage(codepage.code)
        # Decoder and encoder are looked up under the same codec name.
        table_meta.decoder = codecs.getdecoder(codepage.name)
        table_meta.encoder = codecs.getencoder(codepage.name)
        self._update_disk(headeronly=True)
项目:dbf    作者:wenzhihong2003    | 项目源码 | 文件源码
def codepage(self, codepage):
        """Install a new code page on ``self._meta``.

        Validates the argument type and the read/write status first, then
        passes ``codepage.code`` to the header, swaps in the decoder and
        encoder for ``codepage.name``, and calls
        ``self._update_disk(headeronly=True)``.

        Raises:
            TypeError: ``codepage`` is not a ``CodePage`` instance.
            DbfError: ``self._meta.status`` is not ``READ_WRITE``.
        """
        if not isinstance(codepage, CodePage):
            raise TypeError("codepage should be a CodePage, not a %r" % type(codepage))

        info = self._meta
        if info.status != READ_WRITE:
            raise DbfError(
                '%s not in read/write mode, unable to change codepage'
                % info.filename
            )

        # Header first, then the codec callables kept on the meta object.
        info.header.codepage(codepage.code)
        info.decoder = codecs.getdecoder(codepage.name)
        info.encoder = codecs.getencoder(codepage.name)
        self._update_disk(headeronly=True)
项目:dbf    作者:wenzhihong2003    | 项目源码 | 文件源码
def codepage(self, codepage):
        """Replace the code page held by ``self._meta``.

        After the type and read/write checks pass, ``codepage.code`` is
        handed to the header, the decoder and encoder on ``self._meta`` are
        rebuilt from ``codepage.name``, and
        ``self._update_disk(headeronly=True)`` is called.

        Raises:
            TypeError: ``codepage`` is not a ``CodePage`` instance.
            DbfError: ``self._meta.status`` is not ``READ_WRITE``.
        """
        if not isinstance(codepage, CodePage):
            raise TypeError("codepage should be a CodePage, not a %r" % type(codepage))

        current = self._meta
        if current.status != READ_WRITE:
            message = '%s not in read/write mode, unable to change codepage' % current.filename
            raise DbfError(message)

        current.header.codepage(codepage.code)
        current.decoder = codecs.getdecoder(codepage.name)
        current.encoder = codecs.getencoder(codepage.name)
        # Called last, after the header and both codec callables are updated.
        self._update_disk(headeronly=True)