我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用codecs.getencoder()。
def __init__(self, stream, fieldnames, encoding='utf-8', **kwds):
    """Initializer.

    For UTF-8, ASCII, Latin-1 and cp1252 the DictWriter writes straight to
    ``stream``. For any other encoding the DictWriter writes to an in-memory
    queue, and ``self.stream`` wraps ``stream`` in that codec's StreamWriter.

    Args:
      stream: Stream to write to.
      fieldnames: Fieldnames to pass to the DictWriter.
      encoding: Desired encoding.
      kwds: Additional arguments to pass to the DictWriter.
    """
    stream_writer = codecs.getwriter(encoding)
    direct_output = (
        stream_writer is encodings.utf_8.StreamWriter or
        stream_writer is encodings.ascii.StreamWriter or
        stream_writer is encodings.latin_1.StreamWriter or
        stream_writer is encodings.cp1252.StreamWriter)
    self.no_recoding = direct_output

    if direct_output:
        self.encoder = codecs.getencoder(encoding)
        self.writer = csv.DictWriter(stream, fieldnames, **kwds)
        return

    # Rows are staged in a queue and the target stream is wrapped in the
    # requested codec's writer.
    self.encoder = codecs.getencoder('utf-8')
    self.queue = cStringIO.StringIO()
    self.writer = csv.DictWriter(self.queue, fieldnames, **kwds)
    self.stream = stream_writer(stream)
def test_all(self): api = ( "encode", "decode", "register", "CodecInfo", "Codec", "IncrementalEncoder", "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup", "getencoder", "getdecoder", "getincrementalencoder", "getincrementaldecoder", "getreader", "getwriter", "register_error", "lookup_error", "strict_errors", "replace_errors", "ignore_errors", "xmlcharrefreplace_errors", "backslashreplace_errors", "open", "EncodedFile", "iterencode", "iterdecode", "BOM", "BOM_BE", "BOM_LE", "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE", "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE", "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE", # Undocumented "StreamReaderWriter", "StreamRecoder", ) self.assertEqual(sorted(api), sorted(codecs.__all__)) for api in codecs.__all__: getattr(codecs, api)
def get_tlsa_records(resolver, name):
    """Extract all TLSA records for a given name.

    Args:
        resolver: Object whose ``resolve(name, rrtype=...)`` call returns a
            ``(status, result)`` pair.
        name: Name to query for TLSA records.

    Returns:
        A set of ``TLSARecord`` objects, empty when the result carries no
        data. ``None`` when the resolver reports a non-zero status.
    """
    logging.debug("searching for TLSA record on %s", name)
    status, reply = resolver.resolve(name, rrtype=RR_TYPE_TLSA)
    if status != 0:
        # The error text used to be computed and thrown away; log it so a
        # failed lookup is visible. Presumably ub_strerror() returns the
        # resolver's message for the status code -- confirm against its API.
        logging.error("TLSA lookup for %s failed: %s",
                      name, ub_strerror(status))
        return None
    if reply.data is None:
        logging.warning("No TLSA record returned")
        return set()

    records = set()
    for record in reply.data.data:
        # The first three items are usage, selector and matching type;
        # everything after them is the payload.
        usage = ord(record[0])
        selector = ord(record[1])
        matching = ord(record[2])
        payload = record[3:]
        records.add(TLSARecord(usage, selector, matching, payload))
    return records
def testInvalidUtf8(self):
    """Check when the readability/utf8 warning is reported.

    Each input is decoded with the 'replace' error handler, split into
    lines and run through cpplint. The warning must then appear exactly
    once or not at all.
    """
    utf8_warning = ('Line contains invalid UTF-8'
                    ' (or Unicode replacement character).'
                    ' [readability/utf8] [5]')

    def check(raw_bytes, has_invalid_utf8):
        collector = ErrorCollector(self.assert_)
        source_lines = unicode(raw_bytes, 'utf8', 'replace').split('\n')
        cpplint.ProcessFileData('foo.cc', 'cc', source_lines, collector)
        # The warning appears only once.
        occurrences = collector.Results().count(utf8_warning)
        self.assertEquals(int(has_invalid_utf8), occurrences)

    cases = (
        ('Hello world\n', False),
        ('\xe9\x8e\xbd\n', False),
        ('\xe9x\x8e\xbd\n', True),
        # This is the encoding of the replacement character itself (which
        # you can see by evaluating codecs.getencoder('utf8')(u'\ufffd')).
        ('\xef\xbf\xbd\n', True),
    )
    for raw_bytes, expect_warning in cases:
        check(raw_bytes, expect_warning)
def test_all(self): api = ( "encode", "decode", "register", "CodecInfo", "Codec", "IncrementalEncoder", "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup", "getencoder", "getdecoder", "getincrementalencoder", "getincrementaldecoder", "getreader", "getwriter", "register_error", "lookup_error", "strict_errors", "replace_errors", "ignore_errors", "xmlcharrefreplace_errors", "backslashreplace_errors", "open", "EncodedFile", "iterencode", "iterdecode", "BOM", "BOM_BE", "BOM_LE", "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE", "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE", "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE", # Undocumented "StreamReaderWriter", "StreamRecoder", ) self.assertCountEqual(api, codecs.__all__) for api in codecs.__all__: getattr(codecs, api)
def bind_processor(self, dialect):
    """Return a callable applied to bound values, or None if none is needed.

    - No unicode conversion requested by the type or the dialect: None.
    - Dialect supports unicode binds and conversion is not forced: a
      pass-through that warns on binary values when
      ``_warn_on_bytestring`` is set, otherwise None.
    - Otherwise: a callable that encodes text values with the dialect's
      encoding and optionally warns on other non-None values.
    """
    if not (self.convert_unicode or dialect.convert_unicode):
        return None

    if dialect.supports_unicode_binds and \
            self.convert_unicode != 'force':
        if not self._warn_on_bytestring:
            return None

        def warn_only(value):
            if isinstance(value, util.binary_type):
                util.warn_limited(
                    "Unicode type received non-unicode "
                    "bind param value %r.",
                    (util.ellipses_string(value),))
            return value
        return warn_only

    encode = codecs.getencoder(dialect.encoding)
    should_warn = self._warn_on_bytestring

    def encode_text(value):
        if isinstance(value, util.text_type):
            encoded, _consumed = encode(value, self.unicode_error)
            return encoded
        if should_warn and value is not None:
            util.warn_limited(
                "Unicode type received non-unicode bind "
                "param value %r.",
                (util.ellipses_string(value),))
        return value
    return encode_text
def quote_value(self, value):
    """Return ``value`` rendered as a SQL literal string.

    The value is first passed through ``sqlite3.adapt``; if that raises
    ``ProgrammingError`` it is used unchanged. Raises ``ValueError`` for
    types that cannot be quoted.
    """
    # The backend "mostly works" without this function and there are use
    # cases for compiling Python without the sqlite3 libraries (e.g.
    # security hardening).
    import sqlite3
    try:
        value = sqlite3.adapt(value)
    except sqlite3.ProgrammingError:
        pass

    # Manual emulation of SQLite parameter quoting
    if isinstance(value, bool):
        # Checked before the integer types because bool is a subclass of int.
        return str(int(value))
    if isinstance(value, (Decimal, float)):
        return str(value)
    if isinstance(value, six.integer_types):
        return str(value)
    if isinstance(value, six.string_types):
        escaped = six.text_type(value).replace("'", "''")
        return "'%s'" % escaped
    if value is None:
        return "NULL"
    if isinstance(value, (bytes, bytearray, six.memoryview)):
        # Bytes are only allowed for BLOB fields, encoded as string
        # literals containing hexadecimal data and preceded by a single "X"
        # character:
        # value = b'\x01\x02' => value_hex = b'0102' => return X'0102'
        raw = bytes(value)
        hex_bytes, _length = codecs.getencoder('hex_codec')(raw)
        # Use 'ascii' encoding for b'01' => '01', no need to use force_text here.
        return "X'%s'" % hex_bytes.decode('ascii')
    raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))
def _bytes_as_hex_str(cls, b):
    """Return the hex encoding of ``b`` decoded to text with ``cls.DNS_CTYPE``."""
    hex_bytes, _consumed = codecs.getencoder('hex_codec')(b)
    return hex_bytes.decode(cls.DNS_CTYPE)
def iter(self, fields, files):
    """
    Generate the pieces of a multipart/form-data body, one at a time.

    Text pieces are yielded as the ``(bytes, length)`` tuples returned by
    the UTF-8 encoder. File payloads are yielded as
    ``(filedata, len(filedata))``.

    :param fields: sequence of (name, value) elements for regular form fields
    :param files: sequence of (name, filename, contenttype, filedata) elements for data to be uploaded as files
    :return: generator over the body pieces
    """
    to_utf8 = codecs.getencoder('utf-8')

    for field_name, field_value in fields:
        field_name = self.u(field_name)
        yield to_utf8('--{}\r\n'.format(self.boundary))
        disposition = self.u('Content-Disposition: form-data; name="{}"\r\n')
        yield to_utf8(disposition.format(field_name))
        yield to_utf8('\r\n')
        if isinstance(field_value, (int, float)):
            field_value = str(field_value)
        yield to_utf8(self.u(field_value))
        yield to_utf8('\r\n')

    for field_name, filename, contenttype, payload in files:
        field_name = self.u(field_name)
        filename = self.u(filename)
        yield to_utf8('--{}\r\n'.format(self.boundary))
        disposition = self.u('Content-Disposition: form-data; name="{}"; filename="{}"\r\n')
        yield to_utf8(disposition.format(field_name, filename))
        # Fall back to a guess from the file name, then to a generic type.
        mime = (contenttype
                or mimetypes.guess_type(filename)[0]
                or 'application/octet-stream')
        yield to_utf8('Content-Type: {}\r\n'.format(mime))
        yield to_utf8('Content-Transfer-Encoding: binary\r\n')
        yield to_utf8('\r\n')
        yield (payload, len(payload))
        yield to_utf8('\r\n')

    yield to_utf8('--{}--\r\n'.format(self.boundary))
def hexencode(b):
    """Return the hexadecimal encoding of ``b``.

    On Python 3 and later the ``hex_codec`` encoder is used. On older
    interpreters the ``'hex'`` string encoding is used.
    """
    if sys.version_info[0] >= 3:
        import codecs
        hex_value, _consumed = codecs.getencoder('hex_codec')(b)
        return hex_value
    return b.encode('hex')
def _bytes_to_hex(bs):
    """Return the output of the ``hex`` codec's encoder applied to ``bs``."""
    encode_hex = codecs.getencoder('hex')
    hex_value, _consumed = encode_hex(bs)
    return hex_value
def bind_processor(self, dialect):
    """Return a callable applied to bound values, or None if none is needed.

    - No unicode conversion requested by the type or the dialect: None.
    - Dialect supports unicode binds and conversion is not forced: a
      pass-through that warns on binary values when
      ``_warn_on_bytestring`` is set, otherwise None.
    - Otherwise: a callable that encodes text values with the dialect's
      encoding and optionally warns on other non-None values.
    """
    if not (self.convert_unicode or dialect.convert_unicode):
        return None

    if dialect.supports_unicode_binds and \
            self.convert_unicode != 'force':
        if not self._warn_on_bytestring:
            return None

        def process(value):
            if isinstance(value, util.binary_type):
                # The two literal fragments previously joined without a
                # space, producing "non-unicodebind" in the warning text.
                util.warn("Unicode type received non-unicode "
                          "bind param value.")
            return value
        return process

    encoder = codecs.getencoder(dialect.encoding)
    warn_on_bytestring = self._warn_on_bytestring

    def process(value):
        if isinstance(value, util.text_type):
            return encoder(value, self.unicode_error)[0]
        elif warn_on_bytestring and value is not None:
            util.warn("Unicode type received non-unicode bind "
                      "param value")
        return value
    return process
def test_encode_length(self): # Issue 3739 encoder = codecs.getencoder("unicode_internal") self.assertEqual(encoder("a")[1], 1) self.assertEqual(encoder("\xe9\u0142")[1], 2) self.assertEqual(codecs.escape_encode(br'\x00')[1], 4) # From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
def test_getencoder(self): self.assertRaises(TypeError, codecs.getencoder) self.assertRaises(LookupError, codecs.getencoder, "__spam__")
def test_bad_encode_args(self):
    """Each listed encoder must raise TypeError when called with no arguments."""
    for codec_name in all_unicode_encodings:
        self.assertRaises(TypeError, codecs.getencoder(codec_name))
def test_basics(self):
    """Round-trip all 256 byte values through each bytes-transform codec."""
    payload = bytes(range(256))
    for codec_name in bytes_transform_encodings:
        # generic codecs interface
        encoded, consumed = codecs.getencoder(codec_name)(payload)
        self.assertEqual(consumed, len(payload))
        decoded, consumed = codecs.getdecoder(codec_name)(encoded)
        self.assertEqual(consumed, len(encoded))
        self.assertEqual(decoded, payload)
def bind_processor(self, dialect):
    """Return a callable applied to bound values, or None if none is needed.

    - No unicode conversion requested by the type or the dialect: None.
    - Dialect supports unicode binds and conversion is not forced: a
      pass-through that warns on ``str`` values when
      ``_warn_on_bytestring`` is set, otherwise None.
    - Otherwise: a callable that encodes ``unicode`` values with the
      dialect's encoding and optionally warns on other non-None values.
    """
    if self.convert_unicode or dialect.convert_unicode:
        if dialect.supports_unicode_binds and \
                self.convert_unicode != 'force':
            if self._warn_on_bytestring:
                def process(value):
                    # NOTE(review): the Py3K / Py2K / end Py2K comments
                    # below look like markers for a source-conversion
                    # step -- keep their layout intact; confirm.
                    # Py3K
                    #if isinstance(value, bytes):
                    # Py2K
                    if isinstance(value, str):
                    # end Py2K
                        util.warn("Unicode type received non-unicode bind "
                                  "param value.")
                    return value
                return process
            else:
                return None
        else:
            # Encoding path: look up the encoder once, outside the
            # per-value callable.
            encoder = codecs.getencoder(dialect.encoding)
            warn_on_bytestring = self._warn_on_bytestring

            def process(value):
                if isinstance(value, unicode):
                    # The encoder returns (encoded, length); keep the data.
                    return encoder(value, self.unicode_error)[0]
                elif warn_on_bytestring and value is not None:
                    util.warn("Unicode type received non-unicode bind "
                              "param value")
                return value
            return process
    else:
        return None
def __init__(self, output_encoding='utf-8', input_encoding='utf-8',
             fallback_encoding='iso-8859-1'):
    """Look up the codec functions and create empty buffers.

    Stores the encoder for ``output_encoding`` and the decoders for
    ``input_encoding`` and ``fallback_encoding``, then sets up empty input
    and output byte buffers and marks the object as not closed.
    """
    # codecs.lookup(name).encode / .decode are the same callables that
    # getencoder() / getdecoder() return.
    self.output_codec = codecs.lookup(output_encoding).encode
    self.input_decoder = codecs.lookup(input_encoding).decode
    self.fallback_decoder = codecs.lookup(fallback_encoding).decode

    self._input_buffer = bytearray()
    self._output_buffer = bytearray()
    self._closed = False
def quote_value(self, value):
    """Return ``value`` rendered as a SQL literal string.

    The value is first passed through ``_sqlite3.adapt``; if that raises
    ``ProgrammingError`` it is used unchanged. Raises ``ValueError`` for
    types that cannot be quoted.
    """
    # The backend "mostly works" without this function and there are use
    # cases for compiling Python without the sqlite3 libraries (e.g.
    # security hardening).
    import _sqlite3
    try:
        value = _sqlite3.adapt(value)
    except _sqlite3.ProgrammingError:
        pass

    # Manual emulation of SQLite parameter quoting
    if isinstance(value, bool):
        # Checked before the integer types because bool is a subclass of int.
        return str(int(value))
    if isinstance(value, (Decimal, float)):
        return str(value)
    if isinstance(value, six.integer_types):
        return str(value)
    if isinstance(value, six.string_types):
        text = six.text_type(value)
        return "'%s'" % text.replace("'", "''")
    if value is None:
        return "NULL"
    if not isinstance(value, (bytes, bytearray, six.memoryview)):
        raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))

    # Bytes are only allowed for BLOB fields, encoded as string
    # literals containing hexadecimal data and preceded by a single "X"
    # character:
    # value = b'\x01\x02' => value_hex = b'0102' => return X'0102'
    to_hex = codecs.getencoder('hex_codec')
    hex_bytes, _length = to_hex(bytes(value))
    # Use 'ascii' encoding for b'01' => '01', no need to use force_text here.
    return "X'%s'" % hex_bytes.decode('ascii')
def generate_session_id():
    """Generates a random session ID for Steam.

    Returns
    -------
    str
        The hex encoding of 12 random bytes (24 hexadecimal characters).
    """
    random_bytes = Random.get_random_bytes(12)
    hex_bytes, _consumed = codecs.getencoder('hex')(random_bytes)
    return hex_bytes.decode('utf-8')
def test_encode_length(self): # Issue 3739 encoder = codecs.getencoder("unicode_internal") self.assertEqual(encoder(u"a")[1], 1) self.assertEqual(encoder(u"\xe9\u0142")[1], 2) encoder = codecs.getencoder("string-escape") self.assertEqual(encoder(r'\x00')[1], 4) # From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
def test_basics(self):
    """Round-trip a short ASCII string through each string codec."""
    text = "abc123"
    for codec_name in all_string_encodings:
        encoded, consumed = codecs.getencoder(codec_name)(text)
        self.assertEqual(consumed, len(text))
        decoded, consumed = codecs.getdecoder(codec_name)(encoded)
        mismatch = "%r != %r (encoding=%r)" % (decoded, text, codec_name)
        self.assertEqual(decoded, text, mismatch)
def iter(self, fields, files):
    """
    Generate the pieces of a multipart/form-data body, one at a time.

    fields is a sequence of (name, value) elements for regular form fields.
    files is a sequence of (name, filename, file-type) elements for data to be uploaded as files

    Text pieces are yielded as the ``(bytes, length)`` tuples returned by
    the UTF-8 encoder. Each file object is read in full inside a ``with``
    block and yielded as ``(content, len(content))``.
    """
    to_utf8 = codecs.getencoder('utf-8')

    for field_name, field_value in fields:
        field_name = self.u(field_name)
        yield to_utf8('--{}\r\n'.format(self.boundary))
        disposition = self.u('Content-Disposition: form-data; name="{}"\r\n')
        yield to_utf8(disposition.format(field_name))
        yield to_utf8('\r\n')
        if isinstance(field_value, (int, float)):
            field_value = str(field_value)
        yield to_utf8(self.u(field_value))
        yield to_utf8('\r\n')

    for field_name, filename, fd in files:
        field_name = self.u(field_name)
        filename = self.u(filename)
        yield to_utf8('--{}\r\n'.format(self.boundary))
        disposition = self.u('Content-Disposition: form-data; name="{}"; filename="{}"\r\n')
        yield to_utf8(disposition.format(field_name, filename))
        # Guess the type from the file name, else use a generic type.
        mime = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        yield to_utf8('Content-Type: {}\r\n'.format(mime))
        yield to_utf8('\r\n')
        with fd:
            content = fd.read()
            yield (content, len(content))
        yield to_utf8('\r\n')

    yield to_utf8('--{}--\r\n'.format(self.boundary))
def quote_value(self, value):
    """Return ``value`` rendered as a SQL literal string.

    The value is first passed through ``_sqlite3.adapt``; if that raises
    ``ProgrammingError`` it is used unchanged. Raises ``ValueError`` for
    types that cannot be quoted.
    """
    try:
        value = _sqlite3.adapt(value)
    except _sqlite3.ProgrammingError:
        pass

    # Manual emulation of SQLite parameter quoting
    if isinstance(value, bool):
        # Checked before the integer types because bool is a subclass of int.
        return str(int(value))
    if isinstance(value, (Decimal, float)):
        return str(value)
    if isinstance(value, six.integer_types):
        return str(value)
    if isinstance(value, six.string_types):
        doubled_quotes = six.text_type(value).replace("'", "''")
        return "'%s'" % doubled_quotes
    if value is None:
        return "NULL"
    if isinstance(value, (bytes, bytearray, six.memoryview)):
        # Bytes are only allowed for BLOB fields, encoded as string
        # literals containing hexadecimal data and preceded by a single "X"
        # character:
        # value = b'\x01\x02' => value_hex = b'0102' => return X'0102'
        blob = bytes(value)
        hex_bytes = codecs.getencoder('hex_codec')(blob)[0]
        # Use 'ascii' encoding for b'01' => '01', no need to use force_text here.
        return "X'%s'" % hex_bytes.decode('ascii')
    raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))
def test_encode_length(self):
    """Check the length element returned by two encoders.

    The checks run under ``support.check_warnings`` with a filter for the
    unicode_internal deprecation message.
    """
    deprecation_filter = ('unicode_internal codec has been '
                          'deprecated', DeprecationWarning)
    with support.check_warnings(deprecation_filter):
        # Issue 3739
        encode_internal = codecs.getencoder("unicode_internal")
        for text, expected_length in (("a", 1), ("\xe9\u0142", 2)):
            self.assertEqual(encode_internal(text)[1], expected_length)

        _escaped, consumed = codecs.escape_encode(br'\x00')
        self.assertEqual(consumed, 4)

# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
def test_bad_encode_args(self):
    """Each listed encoder must raise TypeError when called with no arguments."""
    for codec_name in all_unicode_encodings:
        encode = codecs.getencoder(codec_name)
        with support.check_warnings():
            # unicode-internal has been deprecated
            self.assertRaises(TypeError, encode)
def quote_value(self, value):
    """Return ``value`` rendered as a SQL literal string.

    When ``sqlite3`` can be imported, the value is first passed through
    ``sqlite3.adapt``. If the import fails or the adaptation raises
    ``ProgrammingError``, the value is used unchanged. Raises
    ``ValueError`` for types that cannot be quoted.
    """
    # The backend "mostly works" without this function and there are use
    # cases for compiling Python without the sqlite3 libraries (e.g.
    # security hardening).
    try:
        import sqlite3
        value = sqlite3.adapt(value)
    except ImportError:
        pass
    except sqlite3.ProgrammingError:
        pass

    # Manual emulation of SQLite parameter quoting
    if isinstance(value, bool):
        # Checked before the integer types because bool is a subclass of int.
        return str(int(value))
    if isinstance(value, (Decimal, float)):
        return str(value)
    if isinstance(value, six.integer_types):
        return str(value)
    if isinstance(value, six.string_types):
        return "'%s'" % six.text_type(value).replace("'", "''")
    if value is None:
        return "NULL"
    if not isinstance(value, (bytes, bytearray, six.memoryview)):
        raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))

    # Bytes are only allowed for BLOB fields, encoded as string
    # literals containing hexadecimal data and preceded by a single "X"
    # character:
    # value = b'\x01\x02' => value_hex = b'0102' => return X'0102'
    hex_bytes, _length = codecs.getencoder('hex_codec')(bytes(value))
    # Use 'ascii' encoding for b'01' => '01', no need to use force_text here.
    return "X'%s'" % hex_bytes.decode('ascii')
def __repr__(self):
    """Return ``<TLSA usage selector matching payload-hex>`` for this record."""
    to_hex = codecs.getencoder('hex')
    fields = (
        self._usage,
        self._selector,
        self._matching,
        to_hex(self._payload)[0].decode(),
    )
    return '<TLSA %d %d %d %s>' % fields
def codepage(self, codepage):
    """Switch the table to ``codepage`` and write the header back out.

    Raises ``TypeError`` if ``codepage`` is not a ``CodePage`` and
    ``DbfError`` if the table status is not ``READ_WRITE``. Otherwise the
    header's codepage is set from ``codepage.code``, the decoder and
    encoder are replaced with the codec functions for ``codepage.name``,
    and ``_update_disk(headeronly=True)`` is called.
    """
    if not isinstance(codepage, CodePage):
        raise TypeError("codepage should be a CodePage, not a %r" % type(codepage))

    table_meta = self._meta
    if table_meta.status != READ_WRITE:
        raise DbfError('%s not in read/write mode, unable to change codepage' % table_meta.filename)

    table_meta.header.codepage(codepage.code)
    # codecs.lookup(name).decode / .encode are the same callables that
    # getdecoder() / getencoder() return.
    table_meta.decoder = codecs.lookup(codepage.name).decode
    table_meta.encoder = codecs.lookup(codepage.name).encode
    self._update_disk(headeronly=True)