我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用ipaddr.IPv6Address()。
def ip_type(self, ip):
    """Report which IP version, if any, the value `ip` parses as.

    The value is tried as an IPv4 address first and as an IPv6 address
    second.

    Returns:
        4 for a valid IPv4 address, 6 for a valid IPv6 address, and 0 when
        both parsers reject the value with ``ipaddr.AddressValueError``.
    """
    parsers = ((4, ipaddr.IPv4Address), (6, ipaddr.IPv6Address))
    for version, address_class in parsers:
        try:
            address_class(ip)
        except ipaddr.AddressValueError:
            # Not this family; fall through to the next parser.
            continue
        return version
    return 0
#}}}
#{{{ Has private network
def test_0025_aaaa_record_with_ttl(self):
    """Check the forward and reverse entries of an AAAA record with a TTL.

    The expected reverse name is built here exactly as the record is
    expected to build it: one hexadecimal label per byte of the packed
    address, in reversed byte order.
    """
    # NOTE(review): `!= None` looks like a query-builder expression, where
    # `is not None` would not work -- confirm before "fixing" it.
    has_ttl = AAAARecord.time_to_live != None  # noqa: E711
    record = AAAARecord.q.filter(has_ttl).first()

    forward = record.gen_entry
    forward_expected = u"%s %s IN AAAA %s" % (
        record.name, record.time_to_live, record.address.address)
    self.assertEqual(forward, forward_expected)

    reverse = record.gen_reverse_entry
    packed = ipaddr.IPv6Address(record.address.address).packed
    labels = ["%x" % ord(octet) for octet in reversed(packed)]
    reverse_expected = u"%s.ip6.arpa. %s IN PTR %s" % (
        ".".join(labels), record.time_to_live, record.name)
    self.assertEqual(reverse, reverse_expected)
def ipv6Addr_to_bytes(addr):
    """Convert a textual IPv6 address into a list of its byte values.

    Args:
        addr: the address to convert; it must contain a ':' character.

    Returns:
        A list of integers, one per byte of the packed address.

    Raises:
        CLI_FormatExploreError: if `addr` contains no ':'.
        UIn_BadIPv6Error: if `addr` cannot be parsed or converted.
    """
    from ipaddr import IPv6Address

    if ':' not in addr:
        raise CLI_FormatExploreError()

    try:
        # bytearray() yields integers on both Python 2 (where `packed` is a
        # str) and Python 3 (where it is bytes); ord() on each item only
        # worked on Python 2.
        return list(bytearray(IPv6Address(addr).packed))
    except Exception:
        # `Exception` rather than a bare except, so KeyboardInterrupt and
        # SystemExit are no longer reported as a malformed address.
        raise UIn_BadIPv6Error()
def default(self, obj):
    """Serialise ``ipaddress`` address and network objects as strings.

    Any other object is handed to ``json.JSONEncoder.default``.
    """
    ip_types = (
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
    )
    if not isinstance(obj, ip_types):
        return json.JSONEncoder.default(self, obj)
    return str(obj)
def test_0020_aaaa_record_without_ttl(self):
    """Check the forward and reverse entries of an AAAA record with no TTL.

    The expected reverse name uses one hexadecimal label per byte of the
    packed address, in reversed byte order.
    """
    # NOTE(review): `== None` looks like a query-builder expression, where
    # `is None` would not work -- confirm before "fixing" it.
    lacks_ttl = AAAARecord.time_to_live == None  # noqa: E711
    record = AAAARecord.q.filter(lacks_ttl).first()

    actual_entry = record.gen_entry
    wanted_entry = u"%s IN AAAA %s" % (record.name, record.address.address)
    self.assertEqual(actual_entry, wanted_entry)

    actual_reverse = record.gen_reverse_entry
    address = ipaddr.IPv6Address(record.address.address)
    reversed_name = ".".join(
        ["%x" % ord(byte) for byte in reversed(address.packed)])
    wanted_reverse = u"%s.ip6.arpa. IN PTR %s" % (reversed_name, record.name)
    self.assertEqual(actual_reverse, wanted_reverse)
def gen_reverse_entry(self):
    """Build the ip6.arpa PTR line for this record.

    The owner name is made of one hexadecimal label per byte of the packed
    IPv6 address, in reversed byte order. The TTL column is emitted only
    when ``self.time_to_live`` is truthy.

    NOTE(review): labels are generated per byte, whereas ip6.arpa names are
    normally written one label per nibble -- confirm this is intended.
    """
    packed = ipaddr.IPv6Address(self.address.address).packed
    reversed_address = ".".join(
        "%x" % ord(octet) for octet in reversed(packed))

    if self.time_to_live:
        return u"%s.ip6.arpa. %s IN PTR %s" % (
            reversed_address, self.time_to_live, self.name)
    return u"%s.ip6.arpa. IN PTR %s" % (reversed_address, self.name)
def contains(self, ip_address):
    """Tell whether `ip_address` lies inside any of the stored networks.

    The address is parsed with ``ipaddr.IPAddress`` and checked only
    against the networks of its own family (``self.ipv4_networks`` or
    ``self.ipv6_networks``).

    Raises:
        RuntimeError: if the parsed address is neither IPv4 nor IPv6.
    """
    ip = ipaddr.IPAddress(ip_address)

    if isinstance(ip, ipaddr.IPv4Address):
        candidates = self.ipv4_networks
    elif isinstance(ip, ipaddr.IPv6Address):
        candidates = self.ipv6_networks
    else:
        raise RuntimeError("Should never happen")

    return any(network.Contains(ip) for network in candidates)
def is_private_address(address, only_loopback=False):
    """Decide whether `address` designates a private or local endpoint.

    The value is parsed as an IPv4 address and, failing that, as an IPv6
    address. When neither parse succeeds it is treated as a hostname:
    ``localhost`` and any name ending in ``.local`` count as private,
    regardless of `only_loopback`.

    :param address: an IP address or a hostname
    :param only_loopback: when true, an IP address qualifies only if it is a
        loopback address; otherwise private addresses qualify as well
    :return: True if the IP address or host is considered private
    """
    parsed = None
    for address_class in (IPv4Address, IPv6Address):
        try:
            parsed = address_class(address)
        except AddressValueError:
            continue
        break

    if parsed is None:
        # Not an IP literal: fall back to the hostname rules.
        return address == "localhost" or address.endswith(".local")

    if only_loopback:
        return bool(parsed.is_loopback)
    return bool(parsed.is_loopback or parsed.is_private)
def handle_control_plane(self, in_port, vlan, eth_src, eth_dst, pkt):
    """Pass a control-plane packet to the matching protocol handler.

    Only frames whose destination is ``self.FAUCET_MAC`` or a non-unicast
    MAC are examined. The first protocol found in `pkt`, in the order ARP,
    IPv4, IPv6, decides the handler:

    * ARP: handled when it is a request for which ``to_faucet_ip`` is true,
      or a reply sent to ``self.FAUCET_MAC``.
    * IPv4: handled when it carries ICMP and ``to_faucet_ip`` is true.
    * IPv6: handled when it carries ICMPv6 and ``to_faucet_ip`` is true.

    Returns:
        A list holding whatever the chosen handler returned, or an empty
        list when nothing was handled.
    """
    replies = []

    for_faucet = eth_dst == self.FAUCET_MAC
    if not for_faucet and mac_addr_is_unicast(eth_dst):
        return replies

    arp_pkt = pkt.get_protocol(arp.arp)
    ipv4_pkt = pkt.get_protocol(ipv4.ipv4)
    ipv6_pkt = pkt.get_protocol(ipv6.ipv6)

    if arp_pkt is not None:
        src_ip = ipaddr.IPv4Address(arp_pkt.src_ip)
        dst_ip = ipaddr.IPv4Address(arp_pkt.dst_ip)
        wanted = (
            (arp_pkt.opcode == arp.ARP_REQUEST and
             self.to_faucet_ip(vlan, src_ip, dst_ip)) or
            (arp_pkt.opcode == arp.ARP_REPLY and for_faucet))
        if wanted:
            replies.extend(self.control_plane_arp_handler(
                in_port, vlan, eth_src, arp_pkt))
        return replies

    if ipv4_pkt is not None:
        icmp_pkt = pkt.get_protocol(icmp.icmp)
        if icmp_pkt is None:
            return replies
        src_ip = ipaddr.IPv4Address(ipv4_pkt.src)
        dst_ip = ipaddr.IPv4Address(ipv4_pkt.dst)
        if self.to_faucet_ip(vlan, src_ip, dst_ip):
            replies.extend(self.control_plane_icmp_handler(
                in_port, vlan, eth_src, ipv4_pkt, icmp_pkt))
        return replies

    if ipv6_pkt is not None:
        icmpv6_pkt = pkt.get_protocol(icmpv6.icmpv6)
        if icmpv6_pkt is None:
            return replies
        src_ip = ipaddr.IPv6Address(ipv6_pkt.src)
        dst_ip = ipaddr.IPv6Address(ipv6_pkt.dst)
        if self.to_faucet_ip(vlan, src_ip, dst_ip):
            replies.extend(self.control_plane_icmpv6_handler(
                in_port, vlan, eth_src, ipv6_pkt, icmpv6_pkt))

    return replies
def ipv6_link_mcast_from_ucast(ucast):
    """Derive a link-scope multicast address from a unicast address.

    The result is the first 13 bytes of the ``ff02::1:ff00:0/104`` prefix
    followed by the last 3 bytes of `ucast`.

    Args:
        ucast: an address object exposing a ``packed`` byte string.

    Returns:
        The resulting ``ipaddr.IPv6Address``.
    """
    prefix_bytes = ipaddr.IPv6Network('ff02::1:ff00:0/104').packed[:13]
    low_bytes = ucast.packed[-3:]
    return ipaddr.IPv6Address(ipaddr.Bytes(prefix_bytes + low_bytes))
def default(self, obj):
    """Encode ``ipaddress`` networks and addresses as their string form.

    Objects of any other type are delegated to
    ``json.JSONEncoder.default``.
    """
    for ip_class in (ipaddress.IPv4Network, ipaddress.IPv6Network,
                     ipaddress.IPv4Address, ipaddress.IPv6Address):
        if isinstance(obj, ip_class):
            return str(obj)
    return json.JSONEncoder.default(self, obj)
def add_random_net(ip_ver, target_prefix_len):
    """Create a random network and add it to ``usres_monitor``.

    A prefix length shorter than `target_prefix_len` is drawn at random (at
    least 8 when `ip_ver` is 4, at least 19 otherwise), then a random
    network of that length is built and passed to
    ``usres_monitor.add_net``.

    Returns:
        ``("dup", net)`` when ``add_net`` raises a ``USRESMonitorException``
        whose message contains "it was already in the db"; ``("ok", net)``
        in every other case.

    NOTE(review): any other error raised by ``add_net`` is only dumped (or
    silently ignored for other ``USRESMonitorException`` messages) and the
    function still reports "ok" -- confirm this is intended.
    """
    def dump_vars():
        # Debug aid: print every intermediate value of the enclosing call.
        print("target_prefix_len", target_prefix_len)
        print("prefix_len", prefix_len)
        print("max_prefix_len", max_prefix_len)
        print("max_range_len", max_range_len)
        print("max_range", max_range)
        print("rand", rand)
        print("net_id", net_id)
        print("ip", ip)
        print("net", net)
        print("str(net.ip)", str(net.ip))
        print("str(net.network)", str(net.network))

    is_v4 = ip_ver == 4
    if is_v4:
        shortest_len, max_prefix_len = 8, 32
        ip_class, net_class = ipaddr.IPv4Address, ipaddr.IPv4Network
    else:
        shortest_len, max_prefix_len = 19, 64
        ip_class, net_class = ipaddr.IPv6Address, ipaddr.IPv6Network

    prefix_len = random.randint(shortest_len, target_prefix_len - 1)
    # One bit fewer outside the IPv4 case, to avoid overflow.
    max_range_len = prefix_len if is_v4 else prefix_len - 1

    max_range = 2 ** max_range_len - 1
    rand = random.randint(0, max_range)

    # Move the random value into the upper bits of the address.
    net_id = rand << (max_prefix_len - prefix_len)
    if ip_ver == 6:
        net_id <<= 64

    ip = ip_class(net_id)
    net = net_class("{}/{}".format(str(ip), prefix_len))

    try:
        assert str(net.ip) == str(net.network)
    except:  # noqa: E722 - dump state for any failure, then re-raise
        dump_vars()
        raise

    try:
        usres_monitor.add_net(net)
    except USRESMonitorException as e:
        if "it was already in the db" in str(e):
            return "dup", net
    except:  # noqa: E722 - see NOTE(review) in the docstring
        dump_vars()

    return "ok", net
def control_plane_icmpv6_handler(self, in_port, vlan, eth_src,
                                 ipv6_pkt, icmpv6_pkt):
    """React to an ICMPv6 packet, dispatching on its message type.

    * Neighbor solicitation: send back a neighbor advertisement for the
      solicited address, carrying ``self.FAUCET_MAC`` as link-layer option.
    * Neighbor advertisement: for every route in ``vlan.ipv6_routes`` whose
      gateway equals the advertised address, log the resolution and add
      the result of ``add_resolved_route``.
    * Echo request: send back an echo reply with the same id, sequence
      number and data.

    Returns:
        The list of collected results; empty for any other message type.
    """
    results = []
    pkt = self.build_ethernet_pkt(
        eth_src, in_port, vlan, ether.ETH_TYPE_IPV6)
    msg_type = icmpv6_pkt.type_

    if msg_type == icmpv6.ND_NEIGHBOR_SOLICIT:
        is_solicit = True
        reply_src = icmpv6_pkt.data.dst
    elif msg_type == icmpv6.ND_NEIGHBOR_ADVERT:
        resolved_ip_gw = ipaddr.IPv6Address(icmpv6_pkt.data.dst)
        matching_routes = (
            (ip_dst, ip_gw)
            for ip_dst, ip_gw in vlan.ipv6_routes.iteritems()
            if ip_gw == resolved_ip_gw)
        for ip_dst, ip_gw in matching_routes:
            self.logger.info(
                'ND response %s for %s', eth_src, resolved_ip_gw)
            results.extend(self.add_resolved_route(
                ether.ETH_TYPE_IPV6, vlan, vlan.nd_cache,
                ip_gw, ip_dst, eth_src))
        return results
    elif msg_type == icmpv6.ICMPV6_ECHO_REQUEST:
        is_solicit = False
        reply_src = ipv6_pkt.dst
    else:
        return results

    # Both reply kinds share the same IPv6 header and the same send path.
    pkt.add_protocol(ipv6.ipv6(
        src=reply_src,
        dst=ipv6_pkt.src,
        nxt=inet.IPPROTO_ICMPV6,
        hop_limit=ipv6_pkt.hop_limit))

    if is_solicit:
        icmpv6_reply = icmpv6.icmpv6(
            type_=icmpv6.ND_NEIGHBOR_ADVERT,
            data=icmpv6.nd_neighbor(
                dst=reply_src,
                option=icmpv6.nd_option_tla(hw_src=self.FAUCET_MAC),
                res=7))
    else:
        icmpv6_reply = icmpv6.icmpv6(
            type_=icmpv6.ICMPV6_ECHO_REPLY,
            data=icmpv6.echo(
                id_=icmpv6_pkt.data.id,
                seq=icmpv6_pkt.data.seq,
                data=icmpv6_pkt.data.data))
    pkt.add_protocol(icmpv6_reply)

    pkt.serialize()
    results.append(self.valve_packetout(in_port, pkt.data))
    return results