我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用ipaddr.IPv4Address()。
def ip_type(self, ip):
    """Classify ``ip`` by IP version.

    Returns 4 when ``ip`` is accepted as an IPv4 address, 6 when it is
    accepted as an IPv6 address, and 0 when both parsers reject it.
    """
    parsers = ((4, ipaddr.IPv4Address), (6, ipaddr.IPv6Address))
    for version, parser in parsers:
        try:
            parser(ip)
        except ipaddr.AddressValueError:
            # Not this family; try the next one.
            continue
        return version
    return 0
#}}}

#{{{ Has private network
def default(self, obj):
    """Encode ``ipaddress`` address and network objects as strings.

    Any other object is handed to ``json.JSONEncoder.default``.
    """
    ip_types = (
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
    )
    if not isinstance(obj, ip_types):
        return json.JSONEncoder.default(self, obj)
    return str(obj)
def contains(self, ip_address):
    """Report whether ``ip_address`` lies inside one of the stored networks.

    The address is parsed with ``ipaddr.IPAddress`` and then checked against
    ``self.ipv4_networks`` or ``self.ipv6_networks`` depending on the type of
    the parsed object.

    Raises ``RuntimeError`` if the parsed object is neither an IPv4 nor an
    IPv6 address.
    """
    parsed = ipaddr.IPAddress(ip_address)
    if isinstance(parsed, ipaddr.IPv4Address):
        candidates = self.ipv4_networks
    elif isinstance(parsed, ipaddr.IPv6Address):
        candidates = self.ipv6_networks
    else:
        raise RuntimeError("Should never happen")
    # Stops at the first network that reports the address as a member.
    return any(candidate.Contains(parsed) for candidate in candidates)
def in_private_ip_space(address):
    """Tell whether an IPv4 address is private or loopback.

    ``address`` is parsed with ``IPv4Address``; whatever that constructor
    raises for an unparsable value propagates to the caller.
    """
    parsed = IPv4Address(address)
    return bool(parsed.is_private or parsed.is_loopback)
def is_private_address(address, only_loopback=False):
    """
    Checks whether an address is loopback/private, or a local hostname.

    The value is first tried as an IPv4 address, then as an IPv6 address.
    If neither parser accepts it, it is treated as a hostname, and only
    ``localhost`` and names ending in ``.local`` count as private (this
    hostname check does not look at ``only_loopback``).

    :param address: an IP address or a hostname
    :param only_loopback: when true, an IP address only matches if it is a
        loopback address; private ranges are not considered
    :return: True if the IP address or host is in private space
    """
    parsed = None
    for ip_class in (IPv4Address, IPv6Address):
        try:
            parsed = ip_class(address)
        except AddressValueError:
            continue
        break

    if parsed is None:
        # Not an IP literal: fall back to the hostname rules.
        return address == "localhost" or address.endswith(".local")

    if only_loopback:
        return bool(parsed.is_loopback)
    return bool(parsed.is_loopback or parsed.is_private)
def control_plane_arp_handler(self, in_port, vlan, eth_src, arp_pkt):
    """Handle an ARP packet and return the resulting list of messages.

    * ARP request: build an ARP reply whose source MAC is
      ``self.FAUCET_MAC`` and whose source/destination IPs are the request's
      destination/source IPs, and queue it via ``self.valve_packetout``.
    * ARP reply: for every entry in ``vlan.ipv4_routes`` whose gateway equals
      the reply's source IP, add the messages produced by
      ``self.add_resolved_route``.

    Any other opcode yields an empty list.
    """
    ofmsgs = []

    if arp_pkt.opcode == arp.ARP_REQUEST:
        reply_pkt = self.build_ethernet_pkt(
            eth_src, in_port, vlan, ether.ETH_TYPE_ARP)
        arp_reply = arp.arp(
            opcode=arp.ARP_REPLY,
            src_mac=self.FAUCET_MAC,
            src_ip=arp_pkt.dst_ip,
            dst_mac=eth_src,
            dst_ip=arp_pkt.src_ip)
        reply_pkt.add_protocol(arp_reply)
        reply_pkt.serialize()
        ofmsgs.append(self.valve_packetout(in_port, reply_pkt.data))
        # The reply's src/dst are the request's dst/src, so this logs the
        # requested IP first and the requester's IP second.
        self.logger.info(
            'Responded to ARP request for %s from %s',
            arp_reply.src_ip, arp_reply.dst_ip)
        return ofmsgs

    if arp_pkt.opcode == arp.ARP_REPLY:
        resolved_ip_gw = ipaddr.IPv4Address(arp_pkt.src_ip)
        for ip_dst, ip_gw in vlan.ipv4_routes.iteritems():
            if not ip_gw == resolved_ip_gw:
                continue
            self.logger.info(
                'ARP response %s for %s', eth_src, resolved_ip_gw)
            resolved_msgs = self.add_resolved_route(
                ether.ETH_TYPE_IP, vlan, vlan.arp_cache,
                ip_gw, ip_dst, eth_src)
            ofmsgs.extend(resolved_msgs)

    return ofmsgs
def handle_control_plane(self, in_port, vlan, eth_src, eth_dst, pkt):
    """Dispatch a packet to the ARP, ICMP or ICMPv6 control-plane handler.

    Only packets whose destination MAC is ``self.FAUCET_MAC`` or is not
    unicast (per ``mac_addr_is_unicast``) are considered. The first protocol
    found in ``pkt``, checked in the order ARP, IPv4, IPv6, decides the
    handler:

    * ARP: handled when it is a request accepted by ``self.to_faucet_ip``,
      or a reply sent to ``self.FAUCET_MAC``.
    * IPv4 / IPv6: handled only when the packet also carries ICMP / ICMPv6
      and ``self.to_faucet_ip`` accepts the source/destination pair.

    Returns the list of messages produced by the chosen handler, or an empty
    list when nothing applies.
    """
    flowmods = []

    for_control_plane = (
        eth_dst == self.FAUCET_MAC or not mac_addr_is_unicast(eth_dst))
    if not for_control_plane:
        return flowmods

    arp_pkt = pkt.get_protocol(arp.arp)
    ipv4_pkt = pkt.get_protocol(ipv4.ipv4)
    ipv6_pkt = pkt.get_protocol(ipv6.ipv6)

    if arp_pkt is not None:
        src_ip = ipaddr.IPv4Address(arp_pkt.src_ip)
        dst_ip = ipaddr.IPv4Address(arp_pkt.dst_ip)
        # Both accepted cases go to the same handler.
        wants_arp_handling = (
            (arp_pkt.opcode == arp.ARP_REQUEST and
             self.to_faucet_ip(vlan, src_ip, dst_ip)) or
            (arp_pkt.opcode == arp.ARP_REPLY and
             eth_dst == self.FAUCET_MAC))
        if wants_arp_handling:
            flowmods.extend(self.control_plane_arp_handler(
                in_port, vlan, eth_src, arp_pkt))
        return flowmods

    if ipv4_pkt is not None:
        icmp_pkt = pkt.get_protocol(icmp.icmp)
        if icmp_pkt is None:
            return flowmods
        src_ip = ipaddr.IPv4Address(ipv4_pkt.src)
        dst_ip = ipaddr.IPv4Address(ipv4_pkt.dst)
        if self.to_faucet_ip(vlan, src_ip, dst_ip):
            flowmods.extend(self.control_plane_icmp_handler(
                in_port, vlan, eth_src, ipv4_pkt, icmp_pkt))
        return flowmods

    if ipv6_pkt is not None:
        icmpv6_pkt = pkt.get_protocol(icmpv6.icmpv6)
        if icmpv6_pkt is None:
            return flowmods
        src_ip = ipaddr.IPv6Address(ipv6_pkt.src)
        dst_ip = ipaddr.IPv6Address(ipv6_pkt.dst)
        if self.to_faucet_ip(vlan, src_ip, dst_ip):
            flowmods.extend(self.control_plane_icmpv6_handler(
                in_port, vlan, eth_src, ipv6_pkt, icmpv6_pkt))

    return flowmods
def default(self, obj):
    """Return the string form of ``ipaddress`` addresses and networks.

    Objects of any other type are delegated to
    ``json.JSONEncoder.default``.
    """
    is_ip_object = isinstance(obj, (
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
    ))
    return str(obj) if is_ip_object else json.JSONEncoder.default(self, obj)
def add_random_net(ip_ver, target_prefix_len):
    """Generate a random network and register it with ``usres_monitor``.

    A prefix length shorter than ``target_prefix_len`` is drawn at random
    (from 8 upward for IPv4, from 19 upward for IPv6), a random network id
    with that many significant bits is built, and the resulting network is
    passed to ``usres_monitor.add_net``.

    :param ip_ver: 4 selects IPv4; any other value selects IPv6
    :param target_prefix_len: exclusive upper bound for the generated
        prefix length
    :return: ``("dup", net)`` when ``add_net`` raises a
        ``USRESMonitorException`` whose message contains
        "it was already in the db"; ``("ok", net)`` otherwise
    :raises AssertionError: if the generated address has host bits set,
        i.e. ``net.ip`` differs from ``net.network``
    """

    def dump_vars():
        # Print every intermediate value to help diagnose a failure.
        print("target_prefix_len", target_prefix_len)
        print("prefix_len", prefix_len)
        print("max_prefix_len", max_prefix_len)
        print("max_range_len", max_range_len)
        print("max_range", max_range)
        print("rand", rand)
        print("net_id", net_id)
        print("ip", ip)
        print("net", net)
        print("str(net.ip)", str(net.ip))
        print("str(net.network)", str(net.network))

    if ip_ver == 4:
        prefix_len = random.randint(8, target_prefix_len - 1)
        max_range_len = prefix_len
        max_prefix_len = 32
        ip_class = ipaddr.IPv4Address
        net_class = ipaddr.IPv4Network
    else:
        prefix_len = random.randint(19, target_prefix_len - 1)
        max_range_len = prefix_len - 1  # to avoid overflow
        max_prefix_len = 64
        ip_class = ipaddr.IPv6Address
        net_class = ipaddr.IPv6Network

    max_range = 2 ** max_range_len - 1
    rand = random.randint(0, max_range)
    # Shift the random value so its bits occupy the network part only.
    net_id = rand << (max_prefix_len - prefix_len)
    if ip_ver == 6:
        # The IPv6 id was built within the upper 64 bits; move it there.
        net_id = net_id << 64

    ip = ip_class(net_id)
    net = net_class("{}/{}".format(str(ip), prefix_len))

    try:
        assert str(net.ip) == str(net.network)
    except Exception:
        dump_vars()
        raise

    try:
        usres_monitor.add_net(net)
    except USRESMonitorException as e:
        if "it was already in the db" in str(e):
            return "dup", net
        # NOTE(review): any other USRESMonitorException is ignored here and
        # the call still reports "ok" -- confirm this is intended.
    except Exception:
        # Best-effort: report the state and carry on. Catching Exception
        # rather than everything lets KeyboardInterrupt/SystemExit through.
        dump_vars()

    return "ok", net