我们从 Python 开源项目中提取了以下 37 个代码示例，用于说明如何使用 encodings.idna 模块。
def test_nameprep(self):
    """Run the ``nameprep_tests`` vectors through ``encodings.idna.nameprep``.

    Each vector is an ``(orig, prepped)`` pair of UTF-8 encoded byte
    strings.  A vector whose ``orig`` is ``None`` is skipped; a vector whose
    ``prepped`` is ``None`` must make ``nameprep`` raise ``UnicodeError``.
    Any failure of the equality check is re-raised as
    ``support.TestFailed`` carrying the 1-based vector number.
    """
    from encodings.idna import nameprep

    def from_utf8(raw):
        # The Unicode strings are given in UTF-8; "surrogatepass" lets
        # encoded surrogates through instead of rejecting them.
        return str(raw, "utf-8", "surrogatepass")

    for number, (orig, prepped) in enumerate(nameprep_tests, 1):
        if orig is None:
            # Skipped
            continue
        source = from_utf8(orig)
        if prepped is None:
            # Input contains prohibited characters
            self.assertRaises(UnicodeError, nameprep, source)
            continue
        expected = from_utf8(prepped)
        try:
            self.assertEqual(nameprep(source), expected)
        except Exception as e:
            raise support.TestFailed("Test 3.%d: %s" % (number, str(e)))
def __call__(self, value):
    """Validate ``value`` as an e-mail address.

    Returns ``(value, None)`` when the address is accepted and
    ``(value, translate(self.error_message))`` otherwise.  The part after
    the last ``'@'`` is the domain; if it does not match
    ``self.domain_regex`` as given, its IDNA-encoded form is tried as well.
    ``self.banned`` / ``self.forced`` (when set) are matched against the
    domain as originally supplied.
    """
    def rejected():
        # Built lazily so translate() only runs on a failure path.
        return (value, translate(self.error_message))

    is_text = isinstance(value, (basestring, unicodeT))
    if not is_text or not value or '@' not in value:
        return rejected()

    body, domain = value.rsplit('@', 1)
    try:
        body_hit = self.body_regex.match(body)
        domain_hit = self.domain_regex.match(domain)
        if not domain_hit:
            # check for Internationalized Domain Names
            # see https://docs.python.org/2/library/codecs.html#module-encodings.idna
            ascii_domain = to_unicode(domain).encode('idna').decode('ascii')
            domain_hit = self.domain_regex.match(ascii_domain)
        match = (body_hit is not None) and (domain_hit is not None)
    except (TypeError, UnicodeError):
        # Value may not be a string where we can look for matches.
        # Example: we're calling ANY_OF formatter and IS_EMAIL is asked to validate a date.
        match = None

    if not match:
        return rejected()
    if self.banned and self.banned.match(domain):
        return rejected()
    if self.forced and not self.forced.match(domain):
        return rejected()
    return (value, None)
def _write_SOCKS5_address(self, addr, file):
    """
    Write the host and port to ``file`` packed for the SOCKS5 protocol, and
    return the (possibly locally resolved) address as a tuple object.

    Args:
        addr: ``(host, port)`` pair to encode.
        file: writable binary file-like object receiving the packed bytes.

    Returns:
        ``(host, port)``.  ``host`` is replaced by the dotted-quad result of
        the local DNS lookup when the name is resolved locally; otherwise it
        is returned unchanged.

    Raises:
        struct.error: if the IDNA-encoded host name is longer than 255
            bytes and therefore does not fit the one-byte length prefix.
    """
    host, port = addr
    _, _, _, rdns, _, _ = self.proxy

    if ":" in host:
        # A colon is taken to mean an IPv6 literal (address type 0x04).
        addr_bytes = socket.inet_pton(socket.AF_INET6, host)
        file.write(b"\x04" + addr_bytes)
    elif check_ip_valid(host):
        # IPv4 literal (address type 0x01).
        addr_bytes = socket.inet_pton(socket.AF_INET, host)
        file.write(b"\x01" + addr_bytes)
    elif rdns:
        # Resolve remotely: send the name itself (address type 0x03).
        host_bytes = host.encode('idna')
        # The length prefix must be exactly one raw byte.  Packing it with
        # struct avoids chr(n).encode(), which yields two UTF-8 bytes for
        # n >= 128 on Python 3 and raises on Python 2.
        file.write(b"\x03" + struct.pack("B", len(host_bytes)) + host_bytes)
    else:
        # Resolve locally and send the resulting IPv4 address.
        addr_bytes = socket.inet_aton(socket.gethostbyname(host))
        file.write(b"\x01" + addr_bytes)
        host = socket.inet_ntoa(addr_bytes)

    # Port is always written as a big-endian unsigned 16-bit integer.
    file.write(struct.pack(">H", port))
    return host, port
def test_builtin_decode(self):
    """``str(data, "idna")`` decodes plain and punycode names, with and
    without a trailing dot."""
    cases = (
        (b"python.org", "python.org"),
        (b"python.org.", "python.org."),
        (b"xn--pythn-mua.org", "pyth\xf6n.org"),
        (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
    )
    for encoded, expected in cases:
        self.assertEqual(str(encoded, "idna"), expected)
def test_builtin_encode(self):
    """``text.encode("idna")`` encodes plain and non-ASCII names, with and
    without a trailing dot."""
    cases = (
        ("python.org", b"python.org"),
        ("python.org.", b"python.org."),
        ("pyth\xf6n.org", b"xn--pythn-mua.org"),
        ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
    )
    for text, expected in cases:
        self.assertEqual(text.encode("idna"), expected)
def test_stream(self):
    """After the whole input has been read, a further ``read()`` on an
    "idna" stream reader returns the empty string."""
    reader_factory = codecs.getreader("idna")
    reader = reader_factory(io.BytesIO(b"abc"))
    # Consume the three available characters first.
    reader.read(3)
    self.assertEqual(reader.read(), "")
def test_incremental_decode(self):
    """Check incremental decoding with the "idna" codec.

    First, each sample is fed one byte at a time through
    ``codecs.iterdecode``.  The samples mirror ``test_builtin_decode``:
    plain and punycode names, each with and without a trailing dot.  (The
    punycode case without a trailing dot replaces a duplicated
    trailing-dot assertion.)

    Second, an explicit incremental decoder is fed multi-byte chunks to
    check when output is released.
    """
    cases = (
        (b"python.org", "python.org"),
        (b"python.org.", "python.org."),
        (b"xn--pythn-mua.org", "pyth\xf6n.org"),
        (b"xn--pythn-mua.org.", "pyth\xf6n.org."),
    )
    for encoded, expected in cases:
        single_bytes = (bytes([c]) for c in encoded)
        self.assertEqual(
            "".join(codecs.iterdecode(single_bytes, "idna")),
            expected
        )

    decoder = codecs.getincrementaldecoder("idna")()

    # Name without a trailing dot: nothing is emitted for a label until
    # its terminating dot arrives, and the last label only comes out on
    # the final call.
    self.assertEqual(decoder.decode(b"xn--xam"), "")
    self.assertEqual(decoder.decode(b"ple-9ta.o"), "\xe4xample.")
    self.assertEqual(decoder.decode(b"rg"), "")
    self.assertEqual(decoder.decode(b"", True), "org")

    decoder.reset()

    # Name with a trailing dot: the last label is emitted as soon as the
    # dot is seen, leaving nothing for the final call.
    self.assertEqual(decoder.decode(b"xn--xam"), "")
    self.assertEqual(decoder.decode(b"ple-9ta.o"), "\xe4xample.")
    self.assertEqual(decoder.decode(b"rg."), "org.")
    self.assertEqual(decoder.decode(b"", True), "")
def test_errors(self):
    """Only supports "strict" error handler"""
    text, raw = "python.org", b"python.org"

    # "strict" has to work in both directions without raising.
    text.encode("idna", "strict")
    raw.decode("idna", "strict")

    # Every other handler is expected to raise.
    unsupported = ("ignore", "replace", "backslashreplace", "surrogateescape")
    for handler in unsupported:
        self.assertRaises(Exception, text.encode, "idna", handler)
        self.assertRaises(Exception, raw.decode, "idna", handler)
def test_basics_capi(self):
    """Round-trip ``"abc123"`` through incremental coders fetched via the
    C API, for every encoding not listed in
    ``broken_unicode_with_stateful``.

    The text is encoded one character at a time and decoded one byte at a
    time, first with default error handling and then (except for "idna"
    and "mbcs") with the "ignore" handler.
    """
    from _testcapi import codec_incrementalencoder, codec_incrementaldecoder
    s = "abc123"  # all codecs should be able to encode these

    # NOTE(review): the source was whitespace-flattened; the second check
    # is assumed to sit inside the "not broken" guard -- confirm.
    for encoding in all_unicode_encodings:
        if encoding in broken_unicode_with_stateful:
            continue
        note = "encoding=%r" % encoding

        # check incremental decoder/encoder (fetched via the C API)
        try:
            cencoder = codec_incrementalencoder(encoding)
        except LookupError:  # no IncrementalEncoder
            pass
        else:
            # check C API
            encoded = b"".join(cencoder.encode(ch) for ch in s)
            encoded += cencoder.encode("", True)
            cdecoder = codec_incrementaldecoder(encoding)
            decoded = "".join(cdecoder.decode(bytes([b])) for b in encoded)
            decoded += cdecoder.decode(b"", True)
            self.assertEqual(decoded, s, note)

        if encoding in ("idna", "mbcs"):
            continue

        # check incremental decoder/encoder with errors argument
        try:
            cencoder = codec_incrementalencoder(encoding, "ignore")
        except LookupError:  # no IncrementalEncoder
            pass
        else:
            encoded = b"".join(cencoder.encode(ch) for ch in s)
            cdecoder = codec_incrementaldecoder(encoding, "ignore")
            decoded = "".join(cdecoder.decode(bytes([b])) for b in encoded)
            self.assertEqual(decoded, s, note)
def test_seek(self):
    """A stream reader must return the complete text again after each
    ``seek(0, 0)``."""
    # all codecs should be able to encode these
    s = "%s\n%s\n" % (100*"abc123", 100*"def456")
    for encoding in all_unicode_encodings:
        # "idna" is skipped -- FIXME: See SF bug #1163178
        if encoding == "idna" or encoding in broken_unicode_with_stateful:
            continue
        make_reader = codecs.getreader(encoding)
        reader = make_reader(io.BytesIO(s.encode(encoding)))
        for _ in range(5):
            # Test that calling seek resets the internal codec state and buffers
            reader.seek(0, 0)
            self.assertEqual(s, reader.read())
def test_bad_decode_args(self):
    """Every decoder must raise ``TypeError`` when called with no
    arguments; all but "idna" and "punycode" must also raise it for an
    ``int`` argument."""
    int_check_exempt = ("idna", "punycode")
    for encoding in all_unicode_encodings:
        decode = codecs.getdecoder(encoding)
        self.assertRaises(TypeError, decode)
        if encoding in int_check_exempt:
            continue
        self.assertRaises(TypeError, decode, 42)
def test_basics_capi(self):
    """Round-trip ``"abc123"`` through incremental coders fetched via the
    C API, for every encoding not listed in ``broken_incremental_coders``.

    The text is encoded one character at a time and decoded one byte at a
    time, first with default error handling and then (except for "idna"
    and "mbcs") with the "ignore" handler.
    """
    from _testcapi import codec_incrementalencoder, codec_incrementaldecoder
    s = "abc123"  # all codecs should be able to encode these

    # NOTE(review): the source was whitespace-flattened; the second check
    # is assumed to sit inside the "not broken" guard -- confirm.
    for encoding in all_unicode_encodings:
        if encoding in broken_incremental_coders:
            continue
        note = "encoding=%r" % encoding

        # check incremental decoder/encoder (fetched via the C API)
        try:
            cencoder = codec_incrementalencoder(encoding)
        except LookupError:  # no IncrementalEncoder
            pass
        else:
            # check C API
            encoded = b"".join(cencoder.encode(ch) for ch in s)
            encoded += cencoder.encode("", True)
            cdecoder = codec_incrementaldecoder(encoding)
            decoded = "".join(cdecoder.decode(bytes([b])) for b in encoded)
            decoded += cdecoder.decode(b"", True)
            self.assertEqual(decoded, s, note)

        if encoding in ("idna", "mbcs"):
            continue

        # check incremental decoder/encoder with errors argument
        try:
            cencoder = codec_incrementalencoder(encoding, "ignore")
        except LookupError:  # no IncrementalEncoder
            pass
        else:
            encoded = b"".join(cencoder.encode(ch) for ch in s)
            cdecoder = codec_incrementaldecoder(encoding, "ignore")
            decoded = "".join(cdecoder.decode(bytes([b])) for b in encoded)
            self.assertEqual(decoded, s, note)
def test_seek(self):
    """A stream reader must return the complete text again after each
    ``seek(0, 0)``."""
    # all codecs should be able to encode these
    s = "%s\n%s\n" % (100*"abc123", 100*"def456")
    for encoding in all_unicode_encodings:
        # "idna" is skipped -- FIXME: See SF bug #1163178
        if encoding == "idna" or encoding in broken_unicode_with_streams:
            continue
        make_reader = codecs.getreader(encoding)
        reader = make_reader(io.BytesIO(s.encode(encoding)))
        for _ in range(5):
            # Test that calling seek resets the internal codec state and buffers
            reader.seek(0, 0)
            self.assertEqual(s, reader.read())
def unicode_to_ascii_authority(authority):
    """
    Follows the steps in RFC 3490, Section 4 to convert a unicode authority
    string into its ASCII equivalent.
    For example, u'www.Alliancefran\xe7aise.nu' will be converted into
    'www.xn--alliancefranaise-npb.nu'

    Args:
        authority: unicode string, the URL authority component to convert,
                   e.g. u'www.Alliancefran\xe7aise.nu'

    Returns:
        string: the US-ASCII character equivalent to the inputed authority,
                e.g. 'www.xn--alliancefranaise-npb.nu'

    Raises:
        Exception: if the function is not able to convert the inputed
                   authority

    @author: Jonathan Benn
    """
    import encodings.idna

    # RFC 3490, Section 4, Step 1
    # The encodings.idna Python module assumes that AllowUnassigned == True

    # RFC 3490, Section 4, Step 2
    labels = label_split_regex.split(authority)

    # RFC 3490, Section 4, Step 3
    # The encodings.idna Python module assumes that UseSTD3ASCIIRules == False

    # RFC 3490, Section 4, Step 4
    # We use the ToASCII operation because we are about to put the authority
    # into an IDN-unaware slot.  encodings.idna.ToASCII does not accept an
    # empty string, but empty labels have to pass through untouched so that
    # the URL is not modified.
    ascii_labels = [
        to_native(encodings.idna.ToASCII(label)) if label else ''
        for label in labels
    ]

    # RFC 3490, Section 4, Step 5: rejoin the labels with U+002E (full stop)
    return str(unichr(0x002E).join(ascii_labels))
def _negotiate_SOCKS4(self, dest_addr, dest_port):
    """
    Negotiates a connection through a SOCKS4 server.

    Sends the connect request for ``dest_addr``/``dest_port``, reads the
    8-byte reply and, on success, records ``self.proxy_sockname`` and
    ``self.proxy_peername``.

    Raises:
        GeneralProxyError: the reply does not start with a zero byte.
        SOCKS4Error: the reply status is anything other than 0x5A.
    """
    _, _, _, rdns, username, _ = self.proxy

    writer = self.makefile("wb")
    reader = self.makefile("rb", 0)  # buffering=0 renamed in Python 3
    try:
        # Check if the destination address provided is an IP address
        try:
            addr_bytes = socket.inet_aton(dest_addr)
            remote_resolve = False
        except socket.error:
            # It's a DNS name. Check where it should be resolved.
            remote_resolve = bool(rdns)
            if remote_resolve:
                # Placeholder address; the name itself follows the request.
                addr_bytes = b"\x00\x00\x00\x01"
            else:
                addr_bytes = socket.inet_aton(socket.gethostbyname(dest_addr))

        # Construct the request packet
        writer.write(struct.pack(">BBH", 0x04, 0x01, dest_port))
        writer.write(addr_bytes)

        # The username parameter is considered userid for SOCKS4
        if username:
            writer.write(username)
        writer.write(b"\x00")

        # DNS name if remote resolving is required
        # NOTE: This is actually an extension to the SOCKS4 protocol
        # called SOCKS4A and may not be supported in all cases.
        if remote_resolve:
            writer.write(dest_addr.encode('idna') + b"\x00")
        writer.flush()

        # Get the response from the server
        reply = self._readall(reader, 8)
        if reply[0:1] != b"\x00":
            # Bad data
            raise GeneralProxyError("SOCKS4 proxy server sent invalid data")

        status = ord(reply[1:2])
        if status != 0x5A:
            # Connection failed: server returned an error
            error = SOCKS4_ERRORS.get(status, "Unknown error")
            raise SOCKS4Error("{0:#04x}: {1}".format(status, error))

        # Get the bound address/port
        bound_host = socket.inet_ntoa(reply[4:])
        (bound_port,) = struct.unpack(">H", reply[2:4])
        self.proxy_sockname = (bound_host, bound_port)

        peer_host = socket.inet_ntoa(addr_bytes) if remote_resolve else dest_addr
        self.proxy_peername = peer_host, dest_port
    finally:
        reader.close()
        writer.close()
def _negotiate_HTTP(self, dest_addr, dest_port):
    """
    Negotiates a connection through an HTTP server.
    NOTE: This currently only supports HTTP CONNECT-style proxies.

    Sends a ``CONNECT`` request for ``dest_addr``/``dest_port``, reads the
    status line of the reply and, on a 200 status, records
    ``self.proxy_sockname`` and ``self.proxy_peername``.

    Raises:
        GeneralProxyError: the connection was closed before a status line
            arrived, the status line cannot be split into three parts, or
            it does not start with ``HTTP/``.
        HTTPError: the status code is not an integer or is not 200.
    """
    _, _, _, rdns, _, _ = self.proxy

    # If we need to resolve locally, we do this now
    addr = dest_addr if rdns else socket.gethostbyname(dest_addr)

    self.sendall(b"CONNECT " + addr.encode('idna') + b":" + str(dest_port).encode() +
                 b" HTTP/1.1\r\n" + b"Host: " + dest_addr.encode('idna') + b"\r\n\r\n")

    # We just need the first line to check if the connection was successful
    fobj = self.makefile()
    try:
        status_line = fobj.readline()
    finally:
        # Close the file object even if reading fails, so it is not leaked.
        fobj.close()

    if not status_line:
        raise GeneralProxyError("Connection closed unexpectedly")

    try:
        proto, status_code, status_msg = status_line.split(" ", 2)
    except ValueError:
        raise GeneralProxyError("HTTP proxy server sent invalid response")

    if not proto.startswith("HTTP/"):
        raise GeneralProxyError("Proxy server does not appear to be an HTTP proxy")

    try:
        status_code = int(status_code)
    except ValueError:
        raise HTTPError("HTTP proxy server did not return a valid HTTP status")

    if status_code != 200:
        error = "{0}: {1}".format(status_code, status_msg)
        if status_code in (400, 403, 405):
            # It's likely that the HTTP proxy server does not support the CONNECT tunneling method
            error += ("\n[*] Note: The HTTP proxy server may not be supported by PySocks"
                      " (must be a CONNECT tunnel proxy)")
        raise HTTPError(error)

    self.proxy_sockname = (b"0.0.0.0", 0)
    self.proxy_peername = addr, dest_port
def valid_url(self, value):
    """Validate ``value`` as a URL.

    Args:
        value: the URL string, matched against ``self.URL_REGEX``.

    Returns:
        ``False`` when the URL is rejected; otherwise the ``groupdict()``
        of the regex match.  For host names (as opposed to IPv4/IPv6
        literals) the dict additionally carries ``'hostn_enc'``: the
        ASCII form of the host (IDNA-encoded if it was not plain ASCII)
        with one trailing dot removed.

    A host name is rejected when it is missing or empty, cannot be
    IDNA-encoded, is longer than 253 characters, has a label that is
    empty, longer than 63 characters or starts/ends with ``'-'``, or --
    when ``self.fqdn`` is set -- has a single label or a last label that
    does not match ``self.TLD_REGEX``.
    """
    match = self.URL_REGEX.match(value)
    if not match:
        return False
    url = match.groupdict()

    if url['scheme'].lower() not in self.schemes:
        return False

    if url['host6']:
        return url if IPv6Type.valid_ip(url['host6']) else False
    if url['host4']:
        return url

    raw_host = url['hostn']
    if not raw_host:
        # No host captured at all; indexing into it below would fail.
        return False

    try:
        hostname = raw_host.encode('ascii').decode('ascii')
    except UnicodeError:
        # Not plain ASCII: fall back to the IDNA (punycode) form.
        try:
            hostname = raw_host.encode('idna').decode('ascii')
        except UnicodeError:
            return False

    # A single trailing dot is allowed and not counted.
    if hostname.endswith('.'):
        hostname = hostname[:-1]
    if len(hostname) > 253:
        return False

    labels = hostname.split('.')
    for label in labels:
        if not 0 < len(label) < 64:
            return False
        if '-' in (label[0], label[-1]):
            return False

    if self.fqdn:
        if len(labels) == 1 or not self.TLD_REGEX.match(labels[-1]):
            return False

    url['hostn_enc'] = hostname
    return url