我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 codecs.register_error()。
def error_handler(error):
    """Error handler for surrogateescape decoding.

    Should be used with an ASCII-compatible encoding (e.g., 'latin-1' or
    'utf-8'). Replaces any invalid byte sequences with surrogate code points.

    As specified in
    https://docs.python.org/2/library/codecs.html#codecs.register_error.

    The handler runs unchanged on Python 2 and Python 3: the offending bytes
    are read through a ``bytearray`` (which yields integers on both) and the
    character constructor is picked at call time.

    Args:
        error: The exception instance passed in by the codec machinery.

    Returns:
        A ``(replacement, position)`` tuple: the text made of one code point
        ``0xDC00 + byte`` per offending byte, and ``error.end`` as the
        position at which decoding resumes.

    Raises:
        The ``error`` object itself when it is not a ``UnicodeDecodeError``
        or when one of the offending bytes is below 128.
    """
    # We can't use this with UnicodeEncodeError; the UTF-8 encoder doesn't
    # raise an error for surrogates. Instead, use encode.
    if not isinstance(error, UnicodeDecodeError):
        raise error

    try:
        to_char = unichr  # Python 2
    except NameError:
        to_char = chr  # Python 3

    escaped = []
    # bytearray gives integer items on Python 2 (str) and Python 3 (bytes).
    for byte in bytearray(error.object[error.start:error.end]):
        if byte < 128:
            # ASCII bytes are never escaped; surface the original error.
            raise error
        escaped.append(to_char(0xdc00 + byte))
    return ''.join(escaped), error.end
def test_xmlcharnamereplace(self): # This time use a named character entity for unencodable # characters, if one is available. def xmlcharnamereplace(exc): if not isinstance(exc, UnicodeEncodeError): raise TypeError("don't know how to handle %r" % exc) l = [] for c in exc.object[exc.start:exc.end]: try: l.append("&%s;" % html.entities.codepoint2name[ord(c)]) except KeyError: l.append("&#%d;" % ord(c)) return ("".join(l), exc.end) codecs.register_error( "test.xmlcharnamereplace", xmlcharnamereplace) sin = "\xab\u211c\xbb = \u2329\u1234\u20ac\u232a" sout = b"«ℜ» = ⟨ሴ€⟩" self.assertEqual(sin.encode("ascii", "test.xmlcharnamereplace"), sout) sout = b"\xabℜ\xbb = ⟨ሴ€⟩" self.assertEqual(sin.encode("latin-1", "test.xmlcharnamereplace"), sout) sout = b"\xabℜ\xbb = ⟨ሴ\xa4⟩" self.assertEqual(sin.encode("iso-8859-15", "test.xmlcharnamereplace"), sout)
def test_decoding_callbacks(self): # This is a test for a decoding callback handler # that allows the decoding of the invalid sequence # "\xc0\x80" and returns "\x00" instead of raising an error. # All other illegal sequences will be handled strictly. def relaxedutf8(exc): if not isinstance(exc, UnicodeDecodeError): raise TypeError("don't know how to handle %r" % exc) if exc.object[exc.start:exc.start+2] == b"\xc0\x80": return ("\x00", exc.start+2) # retry after two bytes else: raise exc codecs.register_error("test.relaxedutf8", relaxedutf8) # all the "\xc0\x80" will be decoded to "\x00" sin = b"a\x00b\xc0\x80c\xc3\xbc\xc0\x80\xc0\x80" sout = "a\x00b\x00c\xfc\x00\x00" self.assertEqual(sin.decode("utf-8", "test.relaxedutf8"), sout) # "\xc0\x81" is not valid and a UnicodeDecodeError will be raised sin = b"\xc0\x80\xc0\x81" self.assertRaises(UnicodeDecodeError, sin.decode, "utf-8", "test.relaxedutf8")
def test_longstrings(self): # test long strings to check for memory overflow problems errors = [ "strict", "ignore", "replace", "xmlcharrefreplace", "backslashreplace"] # register the handlers under different names, # to prevent the codec from recognizing the name for err in errors: codecs.register_error("test." + err, codecs.lookup_error(err)) l = 1000 errors += [ "test." + err for err in errors ] for uni in [ s*l for s in ("x", "\u3042", "a\xe4") ]: for enc in ("ascii", "latin-1", "iso-8859-1", "iso-8859-15", "utf-8", "utf-7", "utf-16", "utf-32"): for err in errors: try: uni.encode(enc, err) except UnicodeError: pass
def test_badhandlerresults(self): results = ( 42, "foo", (1,2,3), ("foo", 1, 3), ("foo", None), ("foo",), ("foo", 1, 3), ("foo", None), ("foo",) ) encs = ("ascii", "latin-1", "iso-8859-1", "iso-8859-15") for res in results: codecs.register_error("test.badhandler", lambda x: res) for enc in encs: self.assertRaises( TypeError, "\u3042".encode, enc, "test.badhandler" ) for (enc, bytes) in ( ("ascii", b"\xff"), ("utf-8", b"\xff"), ("utf-7", b"+x-"), ("unicode-internal", b"\x00"), ): self.assertRaises( TypeError, bytes.decode, enc, "test.badhandler" )
def test_incrementalencoder_error_callback(self): inv = self.unmappedunicode e = self.incrementalencoder() self.assertRaises(UnicodeEncodeError, e.encode, inv, True) e.errors = 'ignore' self.assertEqual(e.encode(inv, True), b'') e.reset() def tempreplace(exc): return ('called', exc.end) codecs.register_error('test.incremental_error_callback', tempreplace) e.errors = 'test.incremental_error_callback' self.assertEqual(e.encode(inv, True), b'called') # again e.errors = 'ignore' self.assertEqual(e.encode(inv, True), b'')
def test_all(self): api = ( "encode", "decode", "register", "CodecInfo", "Codec", "IncrementalEncoder", "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup", "getencoder", "getdecoder", "getincrementalencoder", "getincrementaldecoder", "getreader", "getwriter", "register_error", "lookup_error", "strict_errors", "replace_errors", "ignore_errors", "xmlcharrefreplace_errors", "backslashreplace_errors", "open", "EncodedFile", "iterencode", "iterdecode", "BOM", "BOM_BE", "BOM_LE", "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE", "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE", "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE", # Undocumented "StreamReaderWriter", "StreamRecoder", ) self.assertEqual(sorted(api), sorted(codecs.__all__)) for api in codecs.__all__: getattr(codecs, api)
def test_xmlcharnamereplace(self): # This time use a named character entity for unencodable # characters, if one is available. def xmlcharnamereplace(exc): if not isinstance(exc, UnicodeEncodeError): raise TypeError("don't know how to handle %r" % exc) l = [] for c in exc.object[exc.start:exc.end]: try: l.append(u"&%s;" % htmlentitydefs.codepoint2name[ord(c)]) except KeyError: l.append(u"&#%d;" % ord(c)) return (u"".join(l), exc.end) codecs.register_error( "test.xmlcharnamereplace", xmlcharnamereplace) sin = u"\xab\u211c\xbb = \u2329\u1234\u20ac\u232a" sout = "«ℜ» = ⟨ሴ€⟩" self.assertEqual(sin.encode("ascii", "test.xmlcharnamereplace"), sout) sout = "\xabℜ\xbb = ⟨ሴ€⟩" self.assertEqual(sin.encode("latin-1", "test.xmlcharnamereplace"), sout) sout = "\xabℜ\xbb = ⟨ሴ\xa4⟩" self.assertEqual(sin.encode("iso-8859-15", "test.xmlcharnamereplace"), sout)
def test_decoding_callbacks(self): # This is a test for a decoding callback handler # that allows the decoding of the invalid sequence # "\xc0\x80" and returns "\x00" instead of raising an error. # All other illegal sequences will be handled strictly. def relaxedutf8(exc): if not isinstance(exc, UnicodeDecodeError): raise TypeError("don't know how to handle %r" % exc) if exc.object[exc.start:exc.start+2] == "\xc0\x80": return (u"\x00", exc.start+2) # retry after two bytes else: raise exc codecs.register_error("test.relaxedutf8", relaxedutf8) # all the "\xc0\x80" will be decoded to "\x00" sin = "a\x00b\xc0\x80c\xc3\xbc\xc0\x80\xc0\x80" sout = u"a\x00b\x00c\xfc\x00\x00" self.assertEqual(sin.decode("utf-8", "test.relaxedutf8"), sout) # "\xc0\x81" is not valid and a UnicodeDecodeError will be raised sin = "\xc0\x80\xc0\x81" self.assertRaises(UnicodeDecodeError, sin.decode, "utf-8", "test.relaxedutf8")
def test_longstrings(self): # test long strings to check for memory overflow problems errors = [ "strict", "ignore", "replace", "xmlcharrefreplace", "backslashreplace"] # register the handlers under different names, # to prevent the codec from recognizing the name for err in errors: codecs.register_error("test." + err, codecs.lookup_error(err)) l = 1000 errors += [ "test." + err for err in errors ] for uni in [ s*l for s in (u"x", u"\u3042", u"a\xe4") ]: for enc in ("ascii", "latin-1", "iso-8859-1", "iso-8859-15", "utf-8", "utf-7", "utf-16", "utf-32"): for err in errors: try: uni.encode(enc, err) except UnicodeError: pass
def test_badhandlerresults(self): results = ( 42, u"foo", (1,2,3), (u"foo", 1, 3), (u"foo", None), (u"foo",), ("foo", 1, 3), ("foo", None), ("foo",) ) encs = ("ascii", "latin-1", "iso-8859-1", "iso-8859-15") for res in results: codecs.register_error("test.badhandler", lambda x: res) for enc in encs: self.assertRaises( TypeError, u"\u3042".encode, enc, "test.badhandler" ) for (enc, bytes) in ( ("ascii", "\xff"), ("utf-8", "\xff"), ("utf-7", "+x-"), ("unicode-internal", "\x00"), ): self.assertRaises( TypeError, bytes.decode, enc, "test.badhandler" )
def test_incrementalencoder_error_callback(self): inv = self.unmappedunicode e = self.incrementalencoder() self.assertRaises(UnicodeEncodeError, e.encode, inv, True) e.errors = 'ignore' self.assertEqual(e.encode(inv, True), '') e.reset() def tempreplace(exc): return (u'called', exc.end) codecs.register_error('test.incremental_error_callback', tempreplace) e.errors = 'test.incremental_error_callback' self.assertEqual(e.encode(inv, True), 'called') # again e.errors = 'ignore' self.assertEqual(e.encode(inv, True), '')
def test_all(self): api = ( "encode", "decode", "register", "CodecInfo", "Codec", "IncrementalEncoder", "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup", "getencoder", "getdecoder", "getincrementalencoder", "getincrementaldecoder", "getreader", "getwriter", "register_error", "lookup_error", "strict_errors", "replace_errors", "ignore_errors", "xmlcharrefreplace_errors", "backslashreplace_errors", "open", "EncodedFile", "iterencode", "iterdecode", "BOM", "BOM_BE", "BOM_LE", "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE", "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE", "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE", # Undocumented "StreamReaderWriter", "StreamRecoder", ) self.assertCountEqual(api, codecs.__all__) for api in codecs.__all__: getattr(codecs, api)
def __init__(self, name, base_encoding, mapping):
    """Store the codec configuration and register its error handler.

    Args:
        name: Stored as ``self.name``; used both as the ``CodecInfo`` name
            and as the name under which ``self.error`` is registered.
        base_encoding: Stored as ``self.base_encoding``.
        mapping: Dictionary stored as ``self.mapping``. Its values must
            support ``len()`` and be hashable, since they become the keys
            of ``self.reverse``.
    """
    self.name = name
    self.base_encoding = base_encoding
    self.mapping = mapping

    # Inverse lookup table: mapping value -> mapping key.
    self.reverse = dict((value, key) for key, value in mapping.items())
    # Length of the longest mapping value.
    self.max_len = max(map(len, mapping.values()))

    self.info = codecs.CodecInfo(
        encode=self.encode,
        decode=self.decode,
        name=self.name,
    )
    codecs.register_error(name, self.error)
def encode(self, input, errors='strict'):
    """Encode ``input`` with the base encoding plus ``self.error``.

    Args:
        input: Text to encode.
        errors: Must be ``'strict'``; enforced with an assertion.

    Returns:
        A ``(encoded_bytes, length)`` tuple where ``length`` is the length
        of the original ``input``.
    """
    assert errors == 'strict'

    # Design note carried over from the original author:
    #
    #   return codecs.encode(input, self.base_encoding, self.name), len(input)
    #
    # could have been all that is needed, relying on the registered error
    # handler to replace unencodable characters with the extended byte
    # sequences. However, an encoding error handler is supposed to return
    # a *Unicode* replacement, which then has to be encodable itself. So
    # this loop does what codecs.encode() "should" do: it expects the
    # handler to return bytes and appends them to the output directly.
    #
    # This seems to have been fixed in Python 3.3; that path should be
    # tried first, with this loop kept only as a fallback.
    # https://docs.python.org/3.3/library/codecs.html#codecs.register_error
    consumed = len(input)
    encoded = b''
    remaining = input
    while remaining:
        try:
            chunk = codecs.encode(remaining, self.base_encoding)
        except UnicodeEncodeError as exc:
            # Emit the part before the failure, then the handler's bytes,
            # and continue from the position the handler returned.
            encoded += codecs.encode(remaining[:exc.start], self.base_encoding)
            substitute, resume_at = self.error(exc)
            encoded += substitute
            remaining = remaining[resume_at:]
        else:
            # Everything left was encodable in one go.
            encoded += chunk
            remaining = ''
    return encoded, consumed
def register_strwidth_error(strwidth):
    '''Register a width-aware variant of the ``replace`` encode handler

    Like ``replace``, the new handler substitutes question marks for
    characters that cannot be represented in the requested encoding. Unlike
    ``replace``, it emits as many question marks as the offending text
    occupies display cells: encoding ``…`` (U+2026, HORIZONTAL ELLIPSIS) to
    ``latin1`` yields one question mark, while ``Ａ`` (U+FF21, FULLWIDTH
    LATIN CAPITAL LETTER A) yields two.

    Character width can depend on terminal settings, so no single handler
    fits every case. Each call therefore registers a fresh handler built
    around the given ``strwidth`` function, under a new name derived from
    the module-level counter ``last_swe_idx``.

    :param function strwidth:
        Function that computes the width of a string, measured in the
        display cells the string occupies when displayed.

    :return: Name of the newly registered error handling method.
    '''
    global last_swe_idx
    last_swe_idx += 1
    handler_name = 'powerline_encode_strwidth_error_{0}'.format(last_swe_idx)

    def powerline_encode_strwidth_error(e):
        # Only encoding errors are supported.
        if isinstance(e, UnicodeEncodeError):
            offending = e.object[e.start:e.end]
            return ('?' * strwidth(offending), e.end)
        raise NotImplementedError

    codecs.register_error(handler_name, powerline_encode_strwidth_error)
    return handler_name
def create_fb_format(lines_file, convo_file, outpath):
    """Convert a lines file and a conversations file into dialog text files.

    Writes ``train.txt``, ``valid.txt`` and ``test.txt`` into ``outpath``.
    Each conversation becomes a block of numbered lines, two utterances per
    line separated by a tab, followed by an empty line. Conversations are
    counted from 1: those whose count is a multiple of 10 go to the test
    file, those whose count modulo 10 is 1 go to the validation file, and
    the rest go to the training file.

    Input files are read with the platform default encoding; undecodable
    bytes are dropped. This is requested per file with ``errors='ignore'``
    rather than by re-registering the process-wide ``'strict'`` handler,
    so strict decoding elsewhere in the program is left untouched.

    Args:
        lines_file: Path of the file whose records are separated by
            ``' +++$+++ '``; field 0 is the line id and fields 4 onwards
            are the utterance text.
        convo_file: Path of the file whose space-separated fields from
            index 6 onwards spell a bracketed, comma-separated list of
            quoted line ids.
        outpath: Directory that receives the three output files.

    Raises:
        KeyError: If a conversation references a line id that is missing
            from ``lines_file``.
    """
    print('[building fbformat]')

    # Context managers close every output even when parsing fails midway.
    with open(os.path.join(outpath, 'train.txt'), 'w') as ftrain, \
            open(os.path.join(outpath, 'valid.txt'), 'w') as fvalid, \
            open(os.path.join(outpath, 'test.txt'), 'w') as ftest:

        # Line id -> utterance text, with tabs flattened to spaces because
        # the tab is the separator in the output format.
        lines = {}
        with open(lines_file, 'r', errors='ignore') as f:
            for line in f:
                l = line.split(' +++$+++ ')
                lines[l[0]] = ' '.join(l[4:]).strip('\n').replace('\t', ' ')

        cnt = 0
        with open(convo_file, 'r', errors='ignore') as f:
            for line in f:
                l = line.split(' ')
                convo = ' '.join(l[6:]).strip('\n').strip('[').strip(']')
                c = convo.replace("'", '').replace(' ', '').split(',')

                # Pair consecutive utterances: "<n> <first>\t<second>".
                turns = []
                for index, i in enumerate(range(0, len(c), 2), start=1):
                    turn = str(index) + ' ' + lines[c[i]]
                    if len(c) > i + 1:
                        turn = turn + '\t' + lines[c[i + 1]]
                    turns.append(turn + '\n')
                s = ''.join(turns)

                cnt = cnt + 1
                handle = ftrain
                if (cnt % 10) == 0:
                    handle = ftest
                if (cnt % 10) == 1:
                    handle = fvalid
                handle.write(s + '\n')
def replace_surrogate_encode(mystring):
    """
    Map every escape code point in ``mystring`` back to its original value.

    Returns a (unicode) string, not the more logical bytes, because the
    codecs register_error functionality expects this.

    Each character must lie in U+DC00..U+DCFF and is replaced by the
    character ``code - 0xDC00``. The logic follows Py3.3's Python/codecs.c.
    Note that U+DC00..U+DC7F (which map back to ASCII) are accepted here as
    well; the original author questioned whether that is a good idea.

    Raises ``NotASurrogateError`` for any other character. This covers
    ordinary characters and also U+D800..U+DBFF, for which
    ``code - 0xDC00`` would be negative.
    NOTE(review): presumably the calling error handler turns this into the
    original codec exception - confirm against the caller.
    """
    decoded = []
    for ch in mystring:
        code = ord(ch)
        if not 0xDC00 <= code <= 0xDCFF:
            # Not one of the escape code points this handler understands.
            raise NotASurrogateError
        decoded.append(_unichr(code - 0xDC00))
    return str().join(decoded)
def register_surrogateescape():
    """
    Registers the surrogateescape error handler on Python 2 (only)

    Does nothing when ``PY3`` is true. Otherwise ``surrogateescape_handler``
    is registered under the name ``FS_ERRORS``, unless a handler with that
    name can already be looked up.
    """
    if not PY3:
        try:
            # A successful lookup means a handler is already in place.
            codecs.lookup_error(FS_ERRORS)
        except LookupError:
            codecs.register_error(FS_ERRORS, surrogateescape_handler)
def replace_surrogate_encode(mystring):
    """
    Translate the escape code points of ``mystring`` back to characters.

    Returns a (unicode) string, not the more logical bytes, because the
    codecs register_error functionality expects this.

    Only characters in U+DC00..U+DCFF are accepted; each one becomes the
    character ``code - 0xDC00`` (logic taken from Py3.3's Python/codecs.c).
    The lower half of that range, U+DC00..U+DC7F, maps back to ASCII and is
    still accepted, although the original author doubted that choice.

    Raises ``NotASurrogateError`` for everything else: plain characters and
    U+D800..U+DBFF alike, since subtracting 0xDC00 from the latter would
    give a negative code.
    NOTE(review): the caller is expected to translate this into the
    original codec exception - verify at the call site.
    """
    pieces = []
    for ch in mystring:
        code = ord(ch)
        if code < 0xDC00 or code > 0xDCFF:
            # Outside the escape range: nothing this function can undo.
            raise NotASurrogateError
        pieces.append(_unichr(code - 0xDC00))
    return str().join(pieces)
def register_surrogateescape():
    """
    Registers the surrogateescape error handler on Python 2 (only)

    When ``utils.PY3`` is true the function returns without doing anything.
    Otherwise it registers ``surrogateescape_handler`` under ``FS_ERRORS``
    if looking that name up fails.
    """
    if not utils.PY3:
        try:
            # Leave an already registered handler alone.
            codecs.lookup_error(FS_ERRORS)
        except LookupError:
            codecs.register_error(FS_ERRORS, surrogateescape_handler)
def test_decode_callback(self): if sys.maxunicode > 0xffff: codecs.register_error("UnicodeInternalTest", codecs.ignore_errors) decoder = codecs.getdecoder("unicode_internal") ab = "ab".encode("unicode_internal").decode() ignored = decoder(bytes("%s\x22\x22\x22\x22%s" % (ab[:4], ab[4:]), "ascii"), "UnicodeInternalTest") self.assertEqual(("ab", 12), ignored)
def test_uninamereplace(self): # We're using the names from the unicode database this time, # and we're doing "syntax highlighting" here, i.e. we include # the replaced text in ANSI escape sequences. For this it is # useful that the error handler is not called for every single # unencodable character, but for a complete sequence of # unencodable characters, otherwise we would output many # unnecessary escape sequences. def uninamereplace(exc): if not isinstance(exc, UnicodeEncodeError): raise TypeError("don't know how to handle %r" % exc) l = [] for c in exc.object[exc.start:exc.end]: l.append(unicodedata.name(c, "0x%x" % ord(c))) return ("\033[1m%s\033[0m" % ", ".join(l), exc.end) codecs.register_error( "test.uninamereplace", uninamereplace) sin = "\xac\u1234\u20ac\u8000" sout = b"\033[1mNOT SIGN, ETHIOPIC SYLLABLE SEE, EURO SIGN, CJK UNIFIED IDEOGRAPH-8000\033[0m" self.assertEqual(sin.encode("ascii", "test.uninamereplace"), sout) sout = b"\xac\033[1mETHIOPIC SYLLABLE SEE, EURO SIGN, CJK UNIFIED IDEOGRAPH-8000\033[0m" self.assertEqual(sin.encode("latin-1", "test.uninamereplace"), sout) sout = b"\xac\033[1mETHIOPIC SYLLABLE SEE\033[0m\xa4\033[1mCJK UNIFIED IDEOGRAPH-8000\033[0m" self.assertEqual(sin.encode("iso-8859-15", "test.uninamereplace"), sout)
def test_decodeunicodeinternal(self): self.assertRaises( UnicodeDecodeError, b"\x00\x00\x00\x00\x00".decode, "unicode-internal", ) if sys.maxunicode > 0xffff: def handler_unicodeinternal(exc): if not isinstance(exc, UnicodeDecodeError): raise TypeError("don't know how to handle %r" % exc) return ("\x01", 1) self.assertEqual( b"\x00\x00\x00\x00\x00".decode("unicode-internal", "ignore"), "\u0000" ) self.assertEqual( b"\x00\x00\x00\x00\x00".decode("unicode-internal", "replace"), "\u0000\ufffd" ) codecs.register_error("test.hui", handler_unicodeinternal) self.assertEqual( b"\x00\x00\x00\x00\x00".decode("unicode-internal", "test.hui"), "\u0000\u0001\u0000" )