我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用codecs.getincrementalencoder()。
def test_stateful(self): # jisx0213 encoder is stateful for a few codepoints. eg) # U+00E6 => A9DC # U+00E6 U+0300 => ABC4 # U+0300 => ABDC encoder = codecs.getincrementalencoder('jisx0213')() self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4') self.assertEqual(encoder.encode('\u00e6'), b'') self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4') self.assertEqual(encoder.encode('\u00e6', True), b'\xa9\xdc') self.assertEqual(encoder.reset(), None) self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc') self.assertEqual(encoder.encode('\u00e6'), b'') self.assertEqual(encoder.encode('', True), b'\xa9\xdc') self.assertEqual(encoder.encode('', True), b'')
def test_all(self): api = ( "encode", "decode", "register", "CodecInfo", "Codec", "IncrementalEncoder", "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup", "getencoder", "getdecoder", "getincrementalencoder", "getincrementaldecoder", "getreader", "getwriter", "register_error", "lookup_error", "strict_errors", "replace_errors", "ignore_errors", "xmlcharrefreplace_errors", "backslashreplace_errors", "open", "EncodedFile", "iterencode", "iterdecode", "BOM", "BOM_BE", "BOM_LE", "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE", "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE", "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE", # Undocumented "StreamReaderWriter", "StreamRecoder", ) self.assertEqual(sorted(api), sorted(codecs.__all__)) for api in codecs.__all__: getattr(codecs, api)
def test_stateful(self): # jisx0213 encoder is stateful for a few code points. eg) # U+00E6 => A9DC # U+00E6 U+0300 => ABC4 # U+0300 => ABDC encoder = codecs.getincrementalencoder('jisx0213')() self.assertEqual(encoder.encode(u'\u00e6\u0300'), '\xab\xc4') self.assertEqual(encoder.encode(u'\u00e6'), '') self.assertEqual(encoder.encode(u'\u0300'), '\xab\xc4') self.assertEqual(encoder.encode(u'\u00e6', True), '\xa9\xdc') self.assertEqual(encoder.reset(), None) self.assertEqual(encoder.encode(u'\u0300'), '\xab\xdc') self.assertEqual(encoder.encode(u'\u00e6'), '') self.assertEqual(encoder.encode('', True), '\xa9\xdc') self.assertEqual(encoder.encode('', True), '')
def test_stateful(self): # jisx0213 encoder is stateful for a few codepoints. eg) # U+00E6 => A9DC # U+00E6 U+0300 => ABC4 # U+0300 => ABDC encoder = codecs.getincrementalencoder('jisx0213')() self.assertEqual(encoder.encode(u'\u00e6\u0300'), '\xab\xc4') self.assertEqual(encoder.encode(u'\u00e6'), '') self.assertEqual(encoder.encode(u'\u0300'), '\xab\xc4') self.assertEqual(encoder.encode(u'\u00e6', True), '\xa9\xdc') self.assertEqual(encoder.reset(), None) self.assertEqual(encoder.encode(u'\u0300'), '\xab\xdc') self.assertEqual(encoder.encode(u'\u00e6'), '') self.assertEqual(encoder.encode('', True), '\xa9\xdc') self.assertEqual(encoder.encode('', True), '')
def test_all(self): api = ( "encode", "decode", "register", "CodecInfo", "Codec", "IncrementalEncoder", "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup", "getencoder", "getdecoder", "getincrementalencoder", "getincrementaldecoder", "getreader", "getwriter", "register_error", "lookup_error", "strict_errors", "replace_errors", "ignore_errors", "xmlcharrefreplace_errors", "backslashreplace_errors", "open", "EncodedFile", "iterencode", "iterdecode", "BOM", "BOM_BE", "BOM_LE", "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE", "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE", "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE", # Undocumented "StreamReaderWriter", "StreamRecoder", ) self.assertCountEqual(api, codecs.__all__) for api in codecs.__all__: getattr(codecs, api)
def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds): self.queue = cStringIO.StringIO() self.writer = csv.writer(self.queue, dialect=dialect, **kwds) self.stream = f self.encoder = codecs.getincrementalencoder(encoding)()
def _get_encoder(self): make_encoder = codecs.getincrementalencoder(self._encoding) self._encoder = make_encoder(self._errors) return self._encoder
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds): """Init method.""" # Redirect output to a queue self.queue = cStringIO.StringIO() self.writer = csv.writer(self.queue, dialect=dialect, **kwds) self.stream = f self.encoder = codecs.getincrementalencoder(encoding)()
def __init__(self, f, encoding, **kwds): # Redirect output to a queue import cStringIO self.queue = cStringIO.StringIO() self.writer = csv.writer(self.queue, **kwds) self.stream = f self.encoding = encoding self.encoder = codecs.getincrementalencoder(self.encoding)()
def set_tx_encoding(self, encoding, errors='replace'): self.output_encoding = encoding self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs): """ Instantiates the UnicodeWriter instance :param f: File like object to write CSV data to :param dialect: The dialect for the CSV :param encoding: The CSV encoding :param kwargs: Keyword args """ self.queue = cStringIO.StringIO() self.writer = csv.writer(self.queue, dialect=dialect, **kwargs) self.stream = f self.encoder = codecs.getincrementalencoder(encoding)()
def __init__(self, f, fieldnames, dialect=csv.excel, encoding="utf-8", newfile=True, **kwds): # Redirect output to a queue self.queue = cStringIO.StringIO() self.writer = csv.DictWriter(self.queue, fieldnames, dialect=dialect, **kwds) self.stream = f self.encoder = codecs.getincrementalencoder(encoding)() if newfile: self.writebom()
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds): # Redirect output to a queue self.queue = cStringIO.StringIO() self.writer = csv.writer(self.queue, dialect=dialect, **kwds) self.stream = f self.encoder = codecs.getincrementalencoder(encoding)()
def set_tx_encoding(self, encoding, errors='replace'): """set encoding for transmitted data""" self.output_encoding = encoding self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
def check_state_handling_encode(self, encoding, u, s): for i in range(len(u)+1): d = codecs.getincrementalencoder(encoding)() part1 = d.encode(u[:i]) state = d.getstate() d = codecs.getincrementalencoder(encoding)() d.setstate(state) part2 = d.encode(u[i:], True) self.assertEqual(s, part1+part2)
def test_incremental_encode(self): self.assertEqual( b"".join(codecs.iterencode("python.org", "idna")), b"python.org" ) self.assertEqual( b"".join(codecs.iterencode("python.org.", "idna")), b"python.org." ) self.assertEqual( b"".join(codecs.iterencode("pyth\xf6n.org.", "idna")), b"xn--pythn-mua.org." ) self.assertEqual( b"".join(codecs.iterencode("pyth\xf6n.org.", "idna")), b"xn--pythn-mua.org." ) encoder = codecs.getincrementalencoder("idna")() self.assertEqual(encoder.encode("\xe4x"), b"") self.assertEqual(encoder.encode("ample.org"), b"xn--xample-9ta.") self.assertEqual(encoder.encode("", True), b"org") encoder.reset() self.assertEqual(encoder.encode("\xe4x"), b"") self.assertEqual(encoder.encode("ample.org."), b"xn--xample-9ta.org.") self.assertEqual(encoder.encode("", True), b"")
def check_newline_decoding(self, decoder, encoding): result = [] if encoding is not None: encoder = codecs.getincrementalencoder(encoding)() def _decode_bytewise(s): # Decode one byte at a time for b in encoder.encode(s): result.append(decoder.decode(bytes([b]))) else: encoder = None def _decode_bytewise(s): # Decode one char at a time for c in s: result.append(decoder.decode(c)) self.assertEqual(decoder.newlines, None) _decode_bytewise("abc\n\r") self.assertEqual(decoder.newlines, '\n') _decode_bytewise("\nabc") self.assertEqual(decoder.newlines, ('\n', '\r\n')) _decode_bytewise("abc\r") self.assertEqual(decoder.newlines, ('\n', '\r\n')) _decode_bytewise("abc") self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) _decode_bytewise("abc\r") self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") decoder.reset() input = "abc" if encoder is not None: encoder.reset() input = encoder.encode(input) self.assertEqual(decoder.decode(input), "abc") self.assertEqual(decoder.newlines, None)
def test_stateless(self): # cp949 encoder isn't stateful at all. encoder = codecs.getincrementalencoder('cp949')() self.assertEqual(encoder.encode('\ud30c\uc774\uc36c \ub9c8\uc744'), b'\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb') self.assertEqual(encoder.reset(), None) self.assertEqual(encoder.encode('\u2606\u223c\u2606', True), b'\xa1\xd9\xa1\xad\xa1\xd9') self.assertEqual(encoder.reset(), None) self.assertEqual(encoder.encode('', True), b'') self.assertEqual(encoder.encode('', False), b'') self.assertEqual(encoder.reset(), None)
def test_issue5640(self): encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace') self.assertEqual(encoder.encode('\xff'), b'\\xff') self.assertEqual(encoder.encode('\n'), b'\n')
def test_incrementalencoder(self): encoder = codecs.getincrementalencoder(self.encoding)() output = b''.join( encoder.encode(char) for char in self.text) self.assertEqual(output, self.expected)
def test_incrementalencoder_final(self): encoder = codecs.getincrementalencoder(self.encoding)() last_index = len(self.text) - 1 output = b''.join( encoder.encode(char, index == last_index) for index, char in enumerate(self.text)) self.assertEqual(output, self.expected_reset)
def __init__(self, f, dialect=csv.excel, encoding="utf-8", errors='replace', **kwds): # Redirect output to a queue self.queue = cStringIO.StringIO() self.writer = csv.writer(self.queue, dialect=dialect, **kwds) self.stream = f encoder_cls = codecs.getincrementalencoder(encoding) self.encoder = encoder_cls(errors=errors)
def test_incremental_encode(self): self.assertEqual( "".join(codecs.iterencode(u"python.org", "idna")), "python.org" ) self.assertEqual( "".join(codecs.iterencode(u"python.org.", "idna")), "python.org." ) self.assertEqual( "".join(codecs.iterencode(u"pyth\xf6n.org.", "idna")), "xn--pythn-mua.org." ) self.assertEqual( "".join(codecs.iterencode(u"pyth\xf6n.org.", "idna")), "xn--pythn-mua.org." ) encoder = codecs.getincrementalencoder("idna")() self.assertEqual(encoder.encode(u"\xe4x"), "") self.assertEqual(encoder.encode(u"ample.org"), "xn--xample-9ta.") self.assertEqual(encoder.encode(u"", True), "org") encoder.reset() self.assertEqual(encoder.encode(u"\xe4x"), "") self.assertEqual(encoder.encode(u"ample.org."), "xn--xample-9ta.org.") self.assertEqual(encoder.encode(u"", True), "")
def check_newline_decoding(self, decoder, encoding): result = [] if encoding is not None: encoder = codecs.getincrementalencoder(encoding)() def _decode_bytewise(s): # Decode one byte at a time for b in encoder.encode(s): result.append(decoder.decode(b)) else: encoder = None def _decode_bytewise(s): # Decode one char at a time for c in s: result.append(decoder.decode(c)) self.assertEqual(decoder.newlines, None) _decode_bytewise("abc\n\r") self.assertEqual(decoder.newlines, '\n') _decode_bytewise("\nabc") self.assertEqual(decoder.newlines, ('\n', '\r\n')) _decode_bytewise("abc\r") self.assertEqual(decoder.newlines, ('\n', '\r\n')) _decode_bytewise("abc") self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) _decode_bytewise("abc\r") self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") decoder.reset() input = "abc" if encoder is not None: encoder.reset() input = encoder.encode(input) self.assertEqual(decoder.decode(input), "abc") self.assertEqual(decoder.newlines, None)
def test_stateless(self): # cp949 encoder isn't stateful at all. encoder = codecs.getincrementalencoder('cp949')() self.assertEqual(encoder.encode(u'\ud30c\uc774\uc36c \ub9c8\uc744'), '\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb') self.assertEqual(encoder.reset(), None) self.assertEqual(encoder.encode(u'\u2606\u223c\u2606', True), '\xa1\xd9\xa1\xad\xa1\xd9') self.assertEqual(encoder.reset(), None) self.assertEqual(encoder.encode(u'', True), '') self.assertEqual(encoder.encode(u'', False), '') self.assertEqual(encoder.reset(), None)
def test_issue5640(self): encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace') self.assertEqual(encoder.encode(u'\xff'), b'\\xff') self.assertEqual(encoder.encode(u'\n'), b'\n')
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds): # Redirect output to a queue self.queue = io.StringIO() self.writer = csv.writer(self.queue, dialect=dialect, **kwds) self.stream = f self.encoder = codecs.getincrementalencoder(encoding)()