我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.AF_INET6。
def __init__(self, node, platform='', cpus=0, memory=0, disk=0): if node.find('*') < 0: try: info = socket.getaddrinfo(node, None)[0] ip_addr = info[4][0] if info[0] == socket.AF_INET6: ip_addr = re.sub(r'^0*', '', ip_addr) ip_addr = re.sub(r':0*', ':', ip_addr) ip_addr = re.sub(r'::+', '::', ip_addr) node = ip_addr except: node = '' if node: self.ip_rex = node.replace('.', '\\.').replace('*', '.*') else: logger.warning('node "%s" is invalid', node) self.ip_rex = '' self.platform = platform.lower() self.cpus = cpus self.memory = memory self.disk = disk
def __init__(self, address): self.log = logging.getLogger(self.server_logger) self.socket = None if ":" in address[0]: self.address_family = socket.AF_INET6 else: self.address_family = socket.AF_INET self.log.debug("Listening on %s", address) super(_SpoonMixIn, self).__init__(address, self.handler_klass, bind_and_activate=False) self.load_config() self._setup_socket() # Finally, set signals if self.signal_reload is not None: signal.signal(self.signal_reload, self.reload_handler) if self.signal_shutdown is not None: signal.signal(self.signal_shutdown, self.shutdown_handler)
def api_has_bwctl(host, timeout=5, bind=None): """ Determine if a host is running the BWCTL daemon """ # Null implies localhost if host is None: host = "localhost" # HACK: BWTCLBC # If the environment says to bind to a certain address, do it. if bind is None: bind = os.environ.get('PSCHEDULER_LEAD_BIND_HACK', None) for family in [socket.AF_INET, socket.AF_INET6]: try: with closing(socket.socket(family, socket.SOCK_STREAM)) as sock: if bind is not None: sock.bind((bind, 0)) sock.settimeout(timeout) return sock.connect_ex((host, 4823)) == 0 except socket.error: pass return False
def source_interface(address, port=80, ip_version=None): """Figure out what local interface is being used to reach an address. Returns a tuple of (address, interface_name) """ sock = socket.socket( socket.AF_INET6 if ip_version == 6 else socket.AF_INET, socket.SOCK_DGRAM) try: sock.connect((address, port)) except socket.error: return (None, None) interface_address = sock.getsockname()[0] sock.close() interface_name = address_interface(interface_address, ip_version=ip_version) if interface_name: return (interface_address, interface_name) return (None, None)
def inet_pton(address_family, ip_string): if address_family == socket.AF_INET: return socket.inet_aton(ip_string) addr = sockaddr() addr.sa_family = address_family addr_size = ctypes.c_int(ctypes.sizeof(addr)) if WSAStringToAddressA( ip_string, address_family, None, ctypes.byref(addr), ctypes.byref(addr_size) ) != 0: raise socket.error(ctypes.FormatError()) if address_family == socket.AF_INET6: return ctypes.string_at(addr.ipv6_addr, 16) raise socket.error('unknown address family')
def select_ip_version(host, port): """Returns AF_INET4 or AF_INET6 depending on where to connect to.""" # disabled due to problems with current ipv6 implementations # and various operating systems. Probably this code also is # not supposed to work, but I can't come up with any other # ways to implement this. # try: # info = socket.getaddrinfo(host, port, socket.AF_UNSPEC, # socket.SOCK_STREAM, 0, # socket.AI_PASSIVE) # if info: # return info[0][0] # except socket.gaierror: # pass if ':' in host and hasattr(socket, 'AF_INET6'): return socket.AF_INET6 return socket.AF_INET
def _socket_bind_addr(self, sock, af): bind_addr = '' if self._bind and af == socket.AF_INET: bind_addr = self._bind elif self._bindv6 and af == socket.AF_INET6: bind_addr = self._bindv6 else: bind_addr = self._accept_address[0] bind_addr = bind_addr.replace("::ffff:", "") if bind_addr in self._ignore_bind_list: bind_addr = None if bind_addr: local_addrs = socket.getaddrinfo(bind_addr, 0, 0, socket.SOCK_STREAM, socket.SOL_TCP) if local_addrs[0][0] == af: logging.debug("bind %s" % (bind_addr,)) try: sock.bind((bind_addr, 0)) except Exception as e: logging.warn("bind %s fail" % (bind_addr,))
def _socket_bind_addr(self, sock, af): bind_addr = '' if self._bind and af == socket.AF_INET: bind_addr = self._bind elif self._bindv6 and af == socket.AF_INET6: bind_addr = self._bindv6 bind_addr = bind_addr.replace("::ffff:", "") if bind_addr in self._ignore_bind_list: bind_addr = None if bind_addr: local_addrs = socket.getaddrinfo(bind_addr, 0, 0, socket.SOCK_DGRAM, socket.SOL_UDP) if local_addrs[0][0] == af: logging.debug("bind %s" % (bind_addr,)) try: sock.bind((bind_addr, 0)) except Exception as e: logging.warn("bind %s fail" % (bind_addr,))
def test_ipv6(self): try: [sock] = bind_sockets(None, '::1', family=socket.AF_INET6) port = sock.getsockname()[1] self.http_server.add_socket(sock) except socket.gaierror as e: if e.args[0] == socket.EAI_ADDRFAMILY: # python supports ipv6, but it's not configured on the network # interface, so skip this test. return raise url = '%s://[::1]:%d/hello' % (self.get_protocol(), port) # ipv6 is currently enabled by default but can be disabled self.http_client.fetch(url, self.stop, allow_ipv6=False) response = self.wait() self.assertEqual(response.code, 599) self.http_client.fetch(url, self.stop) response = self.wait() self.assertEqual(response.body, b"Hello world!")
def __init__(self, stream, address, protocol): self.address = address # Save the socket's address family now so we know how to # interpret self.address even after the stream is closed # and its socket attribute replaced with None. if stream.socket is not None: self.address_family = stream.socket.family else: self.address_family = None # In HTTPServerRequest we want an IP, not a full socket address. if (self.address_family in (socket.AF_INET, socket.AF_INET6) and address is not None): self.remote_ip = address[0] else: # Unix (or other) socket; fake the remote address. self.remote_ip = '0.0.0.0' if protocol: self.protocol = protocol elif isinstance(stream, iostream.SSLIOStream): self.protocol = "https" else: self.protocol = "http" self._orig_remote_ip = self.remote_ip self._orig_protocol = self.protocol
def postprocess(self, name, val): if name == 'GetSettings': if 'ssid' in val.get('802-11-wireless', {}): val['802-11-wireless']['ssid'] = fixups.ssid_to_python(val['802-11-wireless']['ssid']) for key in val: val_ = val[key] if 'mac-address' in val_: val_['mac-address'] = fixups.mac_to_python(val_['mac-address']) if 'cloned-mac-address' in val_: val_['cloned-mac-address'] = fixups.mac_to_python(val_['cloned-mac-address']) if 'bssid' in val_: val_['bssid'] = fixups.mac_to_python(val_['bssid']) if 'ipv4' in val: val['ipv4']['addresses'] = [fixups.addrconf_to_python(addr, socket.AF_INET) for addr in val['ipv4']['addresses']] val['ipv4']['routes'] = [fixups.route_to_python(route, socket.AF_INET) for route in val['ipv4']['routes']] val['ipv4']['dns'] = [fixups.addr_to_python(addr, socket.AF_INET) for addr in val['ipv4']['dns']] if 'ipv6' in val: val['ipv6']['addresses'] = [fixups.addrconf_to_python(addr, socket.AF_INET6) for addr in val['ipv6']['addresses']] val['ipv6']['routes'] = [fixups.route_to_python(route, socket.AF_INET6) for route in val['ipv6']['routes']] val['ipv6']['dns'] = [fixups.addr_to_python(addr, socket.AF_INET6) for addr in val['ipv6']['dns']] return val
def request(self, check_config): domain = check_config.get('domain') if check_config.get('ip_version') == 6: af = socket.AF_INET6 elif check_config.get('ip_version') == 4: af = socket.AF_INET else: af = 0 try: result = socket.getaddrinfo(domain, 0, af) except Exception as e: return Plugin.STATUS_CRIT, '{}'.format(e) ips = [ip[4][0] for ip in result] return ( Plugin.STATUS_OK, 'Domain was resolved with {}'.format(', '.join(ips)) )
def neighsol(addr, src, iface, timeout=1, chainCC=0): """ Sends an ICMPv6 Neighbor Solicitation message to get the MAC address of the neighbor with specified IPv6 address addr. 'src' address is used as source of the message. Message is sent on iface. By default, timeout waiting for an answer is 1 second. If no answer is gathered, None is returned. Else, the answer is returned (ethernet frame). """ nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr)) d = inet_ntop(socket.AF_INET6, nsma) dm = in6_getnsmac(nsma) p = Ether(dst=dm)/IPv6(dst=d, src=src, hlim=255) p /= ICMPv6ND_NS(tgt=addr) p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface)) res = srp1(p,type=ETH_P_IPV6, iface=iface, timeout=1, verbose=0, chainCC=chainCC) return res
def getfield(self, pkt, s): c = l = None if self.length_from is not None: l = self.length_from(pkt) elif self.count_from is not None: c = self.count_from(pkt) lst = [] ret = "" remain = s if l is not None: remain,ret = s[:l],s[l:] while remain: if c is not None: if c <= 0: break c -= 1 addr = inet_ntop(socket.AF_INET6, remain[:16]) lst.append(addr) remain = remain[16:] return remain+ret,lst
def i2m(self, pkt, x): l = pkt.len if x is None: x = "::" if l is None: l = 1 x = inet_pton(socket.AF_INET6, x) if l is None: return x if l in [0, 1]: return "" if l in [2, 3]: return x[:8*(l-1)] return x + '\x00'*8*(l-3)
def h2i(self, pkt, x): if x is tuple and type(x[0]) is int: return x val = None try: # Try IPv6 inet_pton(socket.AF_INET6, x) val = (0, x) except: try: # Try IPv4 inet_pton(socket.AF_INET, x) val = (2, x) except: # Try DNS if x is None: x = "" x = names2dnsrepr(x) val = (1, x) return val
def _resolve_one(self, ip): """ overloaded version to provide a Whois resolution on the embedded IPv4 address if the address is 6to4 or Teredo. Otherwise, the native IPv6 address is passed. """ if in6_isaddr6to4(ip): # for 6to4, use embedded @ tmp = inet_pton(socket.AF_INET6, ip) addr = inet_ntop(socket.AF_INET, tmp[2:6]) elif in6_isaddrTeredo(ip): # for Teredo, use mapped address addr = teredoAddrExtractInfo(ip)[2] else: addr = ip _, asn, desc = AS_resolver_riswhois._resolve_one(self, addr) return ip,asn,desc
def ifchange(self, iff, addr): the_addr, the_plen = (addr.split("/")+["128"])[:2] the_plen = int(the_plen) naddr = inet_pton(socket.AF_INET6, the_addr) nmask = in6_cidr2mask(the_plen) the_net = inet_ntop(socket.AF_INET6, in6_and(nmask,naddr)) for i in range(len(self.routes)): net,plen,gw,iface,addr = self.routes[i] if iface != iff: continue if gw == '::': self.routes[i] = (the_net,the_plen,gw,iface,the_addr) else: self.routes[i] = (net,the_plen,gw,iface,the_addr) self.invalidate_cache() ip6_neigh_cache.flush()
def ifadd(self, iff, addr): """ Add an interface 'iff' with provided address into routing table. Ex: ifadd('eth0', '2001:bd8:cafe:1::1/64') will add following entry into Scapy6 internal routing table: Destination Next Hop iface Def src @ 2001:bd8:cafe:1::/64 :: eth0 2001:bd8:cafe:1::1 prefix length value can be omitted. In that case, a value of 128 will be used. """ addr, plen = (addr.split("/")+["128"])[:2] addr = in6_ptop(addr) plen = int(plen) naddr = inet_pton(socket.AF_INET6, addr) nmask = in6_cidr2mask(plen) prefix = inet_ntop(socket.AF_INET6, in6_and(nmask,naddr)) self.invalidate_cache() self.routes.append((prefix,plen,'::',iff,[addr]))
def inet_pton(family, text): """Convert the textual form of a network address into its binary form. @param family: the address family @type family: int @param text: the textual address @type text: string @raises NotImplementedError: the address family specified is not implemented. @rtype: string """ if family == AF_INET: return dns.ipv4.inet_aton(text) elif family == AF_INET6: return dns.ipv6.inet_aton(text) else: raise NotImplementedError
def inet_ntop(family, address): """Convert the binary form of a network address into its textual form. @param family: the address family @type family: int @param address: the binary address @type address: string @raises NotImplementedError: the address family specified is not implemented. @rtype: string """ if family == AF_INET: return dns.ipv4.inet_ntoa(address) elif family == AF_INET6: return dns.ipv6.inet_ntoa(address) else: raise NotImplementedError
def af_for_address(text): """Determine the address family of a textual-form network address. @param text: the textual address @type text: string @raises ValueError: the address family cannot be determined from the input. @rtype: int """ try: dns.ipv4.inet_aton(text) return AF_INET except: try: dns.ipv6.inet_aton(text) return AF_INET6 except: raise ValueError
def _gethostbyaddr(ip): try: dns.ipv6.inet_aton(ip) sockaddr = (ip, 80, 0, 0) family = socket.AF_INET6 except: sockaddr = (ip, 80) family = socket.AF_INET (name, port) = _getnameinfo(sockaddr, socket.NI_NAMEREQD) aliases = [] addresses = [] tuples = _getaddrinfo(name, 0, family, socket.SOCK_STREAM, socket.SOL_TCP, socket.AI_CANONNAME) canonical = tuples[0][3] for item in tuples: addresses.append(item[4][0]) # XXX we just ignore aliases return (canonical, aliases, addresses)
def testNToP(self): from twisted.python.compat import inet_ntop f = lambda a: inet_ntop(socket.AF_INET6, a) g = lambda a: inet_ntop(socket.AF_INET, a) self.assertEquals('::', f('\x00' * 16)) self.assertEquals('::1', f('\x00' * 15 + '\x01')) self.assertEquals( 'aef:b01:506:1001:ffff:9997:55:170', f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')) self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00')) self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55')) self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff')) self.assertEquals('100::', f('\x01' + '\x00' * 15)) self.assertEquals('100::1', f('\x01' + '\x00' * 14 + '\x01'))
def m2i(self, pkt, s): family = None if pkt.type == 1: # A family = socket.AF_INET elif pkt.type == 12: # PTR s = DNSgetstr(s, 0)[0] elif pkt.type == 16: # TXT ret_s = "" tmp_s = s # RDATA contains a list of strings, each are prepended with # a byte containing the size of the following string. while tmp_s: tmp_len = struct.unpack("!B", tmp_s[0])[0] + 1 if tmp_len > len(tmp_s): warning("DNS RR TXT prematured end of character-string (size=%i, remaining bytes=%i)" % (tmp_len, len(tmp_s))) ret_s += tmp_s[1:tmp_len] tmp_s = tmp_s[tmp_len:] s = ret_s elif pkt.type == 28: # AAAA family = socket.AF_INET6 if family is not None: s = inet_ntop(family, s) return s
def i2m(self, pkt, s): if pkt.type == 1: # A if s: s = inet_aton(s) elif pkt.type in [2,3,4,5]: # NS, MD, MF, CNAME s = "".join(map(lambda x: chr(len(x))+x, s.split("."))) if ord(s[-1]): s += "\x00" elif pkt.type == 16: # TXT if s: ret_s = "" # The initial string must be splitted into a list of strings # prepended with theirs sizes. while len(s) >= 255: ret_s += "\xff" + s[:255] s = s[255:] # The remaining string is less than 255 bytes long if len(s): ret_s += struct.pack("!B", len(s)) + s s = ret_s elif pkt.type == 28: # AAAA if s: s = inet_pton(socket.AF_INET6, s) return s
def hashret(self): # we filter on peer address field return inet_pton(socket.AF_INET6, self.peeraddr) ##################################################################### # sent between Relay Agents and Servers # Normalement, doit inclure une option "Relay Message Option" # peut en inclure d'autres. # Les valeurs des champs hop-count, link-addr et peer-addr # sont copiees du messsage Forward associe. POur le suivi de session. # Pour le moment, comme decrit dans le commentaire, le hashret # se limite au contenu du champ peer address. # Voir section 7.2 de la 3315. # Relay-Reply Message # - sent by servers to relay agents # - if the solicit message was received in a Relay-Forward message, # the server constructs a relay-reply message with the Advertise # message in the payload of a relay-message. cf page 37/101. Envoie de # ce message en unicast au relay-agent. utilisation de l'adresse ip # presente en ip source du paquet recu
def af_for_address(text): """Determine the address family of a textual-form network address. @param text: the textual address @type text: string @raises ValueError: the address family cannot be determined from the input. @rtype: int """ try: junk = dns.ipv4.inet_aton(text) return AF_INET except: try: junk = dns.ipv6.inet_aton(text) return AF_INET6 except: raise ValueError
def _gethostbyaddr(ip): try: addr = dns.ipv6.inet_aton(ip) sockaddr = (ip, 80, 0, 0) family = socket.AF_INET6 except: sockaddr = (ip, 80) family = socket.AF_INET (name, port) = _getnameinfo(sockaddr, socket.NI_NAMEREQD) aliases = [] addresses = [] tuples = _getaddrinfo(name, 0, family, socket.SOCK_STREAM, socket.SOL_TCP, socket.AI_CANONNAME) canonical = tuples[0][3] for item in tuples: addresses.append(item[4][0]) # XXX we just ignore aliases return (canonical, aliases, addresses)
def _read_socks5_address(self): atyp = yield self.read_bytes(1) if atyp == b"\x01": data = yield self.read_bytes(4) addr = socket.inet_ntoa(data) elif atyp == b"\x03": length = yield self.read_bytes(1) addr = yield self.read_bytes(length) elif atyp == b"\x04": data = yield self.read_bytes(16) addr = socket.inet_ntop(socket.AF_INET6, data) else: raise GeneralProxyError("SOCKS5 proxy server sent invalid data") data = yield self.read_bytes(2) port = struct.unpack(">H", data)[0] raise gen.Return((addr, port))
def test_sock_connect_address(self): # In debug mode, sock_connect() must ensure that the address is already # resolved (call _check_resolved_address()) self.loop.set_debug(True) addresses = [(socket.AF_INET, ('www.python.org', 80))] if support.IPV6_ENABLED: addresses.extend(( (socket.AF_INET6, ('www.python.org', 80)), (socket.AF_INET6, ('www.python.org', 80, 0, 0)), )) for family, address in addresses: for sock_type in (socket.SOCK_STREAM, socket.SOCK_DGRAM): sock = socket.socket(family, sock_type) with sock: sock.setblocking(False) connect = self.loop.sock_connect(sock, address) with self.assertRaises(ValueError) as cm: self.loop.run_until_complete(connect) self.assertIn('address must be resolved', str(cm.exception))
def makesock(self): if self.config["ipv6"]: family = socket.AF_INET6 else: family = socket.AF_INET if self.proxy is not None: proxy_serv, proxy_port = self.proxy.split(":") self.sock = socks.socket(family, socket.SOCK_STREAM) self.sock.setblocking(0) self.sock.setproxy(socks.PROXY_TYPE_SOCKS5, proxy_serv, proxy_port) else: self.sock = socket.socket(family, socket.SOCK_STREAM) self.sock.settimeout(self.timeout) if self.vhost is not None: self.sock.bind((self.vhost, 0)) if self.config["ssl"]: self.sock = ssl.wrap_socket(self.sock)