Python ipaddr 模块,IPv4Address() 实例源码

我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用ipaddr.IPv4Address()

项目:docker-db    作者:EXASOL    | 项目源码 | 文件源码
def ip_type(self, ip):
        """
        Classify a string as an IPv4 or IPv6 address.

        Returns 4 when `ip` parses as an IPv4 address, 6 when it parses as
        an IPv6 address, and 0 when both parsers reject it.
        """
        # IPv4 is tried first, then IPv6; the first parser that accepts wins.
        parsers = ((4, ipaddr.IPv4Address), (6, ipaddr.IPv6Address))
        for version, parse in parsers:
            try:
                parse(ip)
            except ipaddr.AddressValueError:
                continue
            return version
        return 0
#}}}            

#{{{ Has private network
项目:sonic-mgmt    作者:Azure    | 项目源码 | 文件源码
def default(self, obj):
        """
        JSON fallback serializer for `ipaddress` objects.

        IPv4/IPv6 addresses and networks are rendered with `str()`; every
        other object is handed to `json.JSONEncoder.default`.
        """
        stringified_types = (
            ipaddress.IPv4Network,
            ipaddress.IPv6Network,
            ipaddress.IPv4Address,
            ipaddress.IPv6Address,
        )
        if not isinstance(obj, stringified_types):
            return json.JSONEncoder.default(self, obj)
        return str(obj)
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def contains(self, ip_address):
        """
        Report whether `ip_address` falls inside one of the stored networks.

        The address is parsed with `ipaddr.IPAddress`; IPv4 addresses are
        checked against `self.ipv4_networks` and IPv6 addresses against
        `self.ipv6_networks`.

        Raises RuntimeError if the parsed address is neither IPv4 nor IPv6.
        """
        parsed = ipaddr.IPAddress(ip_address)
        if isinstance(parsed, ipaddr.IPv4Address):
            candidates = self.ipv4_networks
        elif isinstance(parsed, ipaddr.IPv6Address):
            candidates = self.ipv6_networks
        else:
            raise RuntimeError("Should never happen")
        # Stops at the first network that contains the address.
        return any(net.Contains(parsed) for net in candidates)
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def in_private_ip_space(address):
    """
    Return True when the IPv4 `address` is flagged as private or loopback.

    The address is parsed with `IPv4Address`, so any error that constructor
    raises for invalid input propagates to the caller.
    """
    parsed = IPv4Address(address)
    if parsed.is_private:
        return True
    return bool(parsed.is_loopback)
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def is_private_address(address, only_loopback=False):
    """
    Decide whether an IP address or hostname counts as private.

    `address` is first parsed as IPv4, then as IPv6. When neither parser
    accepts it, it is treated as a hostname: "localhost" and names ending
    in ".local" are private, everything else is not. The hostname result
    does not depend on `only_loopback`.

    :param address: an IP address or a hostname
    :param only_loopback: when True, only the loopback flag of a parsed
        IP address is considered; the private flag is ignored
    :return: True if the IP address or host is in private space
    """
    parsed = None
    for parse in (IPv4Address, IPv6Address):
        try:
            parsed = parse(address)
        except AddressValueError:
            continue
        break

    if parsed is None:
        # Not an IP literal: fall back to the hostname rules.
        return address == "localhost" or address.endswith(".local")

    if parsed.is_loopback:
        return True
    if only_loopback:
        return False
    return bool(parsed.is_private)
项目:noc    作者:onfsdn    | 项目源码 | 文件源码
def control_plane_arp_handler(self, in_port, vlan, eth_src, arp_pkt):
        """
        Build the messages needed to react to an ARP packet.

        For an ARP request, an ARP reply sourced from `self.FAUCET_MAC` is
        built, with the request's source/destination IPs swapped, and sent
        back out of `in_port` via `self.valve_packetout`.

        For an ARP reply, every route in `vlan.ipv4_routes` whose gateway
        equals the reply's source IP is resolved through
        `self.add_resolved_route`.

        Returns the list of messages produced (empty for any other opcode).
        """
        opcode = arp_pkt.opcode

        if opcode == arp.ARP_REQUEST:
            reply_frame = self.build_ethernet_pkt(
                eth_src, in_port, vlan, ether.ETH_TYPE_ARP)
            # Answer on behalf of the requested IP: swap source and target.
            arp_reply = arp.arp(
                opcode=arp.ARP_REPLY,
                src_mac=self.FAUCET_MAC,
                src_ip=arp_pkt.dst_ip,
                dst_mac=eth_src,
                dst_ip=arp_pkt.src_ip)
            reply_frame.add_protocol(arp_reply)
            reply_frame.serialize()
            packet_out = self.valve_packetout(in_port, reply_frame.data)
            # The reply's src_ip is the address that was asked for and its
            # dst_ip is the host that asked.
            self.logger.info('Responded to ARP request for %s from %s',
                             arp_reply.src_ip, arp_reply.dst_ip)
            return [packet_out]

        resolved_msgs = []
        if opcode == arp.ARP_REPLY:
            resolved_ip_gw = ipaddr.IPv4Address(arp_pkt.src_ip)
            for route_dst, route_gw in vlan.ipv4_routes.iteritems():
                if route_gw == resolved_ip_gw:
                    self.logger.info('ARP response %s for %s',
                                     eth_src, resolved_ip_gw)
                    route_msgs = self.add_resolved_route(
                        ether.ETH_TYPE_IP, vlan, vlan.arp_cache,
                        route_gw, route_dst, eth_src)
                    resolved_msgs.extend(route_msgs)
        return resolved_msgs
项目:noc    作者:onfsdn    | 项目源码 | 文件源码
def handle_control_plane(self, in_port, vlan, eth_src, eth_dst, pkt):
        """
        Dispatch a packet to the ARP, ICMP or ICMPv6 control-plane handler.

        Only frames addressed to `self.FAUCET_MAC`, or to a non-unicast MAC,
        are considered. ARP requests, ICMP and ICMPv6 packets are handled
        only when `self.to_faucet_ip` accepts the source/destination pair;
        ARP replies are handled only when sent to `self.FAUCET_MAC`.

        Returns the list of messages produced by the chosen handler, or an
        empty list when nothing applies.
        """
        flowmods = []

        # Unicast frames for some other MAC are not control-plane traffic.
        if eth_dst != self.FAUCET_MAC and mac_addr_is_unicast(eth_dst):
            return flowmods

        arp_pkt = pkt.get_protocol(arp.arp)
        ipv4_pkt = pkt.get_protocol(ipv4.ipv4)
        ipv6_pkt = pkt.get_protocol(ipv6.ipv6)

        if arp_pkt is not None:
            src_ip = ipaddr.IPv4Address(arp_pkt.src_ip)
            dst_ip = ipaddr.IPv4Address(arp_pkt.dst_ip)
            # Both accepted cases go to the same handler; the request check
            # is evaluated first, the reply check only if it fails.
            if ((arp_pkt.opcode == arp.ARP_REQUEST and
                 self.to_faucet_ip(vlan, src_ip, dst_ip)) or
                    (arp_pkt.opcode == arp.ARP_REPLY and
                     eth_dst == self.FAUCET_MAC)):
                arp_msgs = self.control_plane_arp_handler(
                    in_port, vlan, eth_src, arp_pkt)
                flowmods.extend(arp_msgs)
        elif ipv4_pkt is not None:
            icmp_pkt = pkt.get_protocol(icmp.icmp)
            if icmp_pkt is not None:
                src_ip = ipaddr.IPv4Address(ipv4_pkt.src)
                dst_ip = ipaddr.IPv4Address(ipv4_pkt.dst)
                if self.to_faucet_ip(vlan, src_ip, dst_ip):
                    icmp_msgs = self.control_plane_icmp_handler(
                        in_port, vlan, eth_src, ipv4_pkt, icmp_pkt)
                    flowmods.extend(icmp_msgs)
        elif ipv6_pkt is not None:
            icmpv6_pkt = pkt.get_protocol(icmpv6.icmpv6)
            if icmpv6_pkt is not None:
                src_ip = ipaddr.IPv6Address(ipv6_pkt.src)
                dst_ip = ipaddr.IPv6Address(ipv6_pkt.dst)
                if self.to_faucet_ip(vlan, src_ip, dst_ip):
                    icmpv6_msgs = self.control_plane_icmpv6_handler(
                        in_port, vlan, eth_src, ipv6_pkt, icmpv6_pkt)
                    flowmods.extend(icmpv6_msgs)

        return flowmods
项目:sonic-buildimage    作者:Azure    | 项目源码 | 文件源码
def default(self, obj):
        """
        Serialize `ipaddress` addresses and networks as plain strings.

        Objects of any other type are delegated to
        `json.JSONEncoder.default`.
        """
        ip_types = (
            ipaddress.IPv4Address, ipaddress.IPv6Address,
            ipaddress.IPv4Network, ipaddress.IPv6Network,
        )
        return (str(obj) if isinstance(obj, ip_types)
                else json.JSONEncoder.default(self, obj))
项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def add_random_net(ip_ver, target_prefix_len):
    """
    Build a random network and register it with `usres_monitor.add_net`.

    The prefix length is drawn at random so that it is strictly shorter
    than `target_prefix_len` (minimum 8 for IPv4, 19 for IPv6). A random
    network id of that length is then shifted into place; for IPv6 the id
    is built inside the upper 64 bits of the address.

    :param ip_ver: 4 for IPv4; any other value selects IPv6
    :param target_prefix_len: exclusive upper bound for the prefix length
    :return: ("ok", net) when the network was added, ("dup", net) when
        `add_net` reported that it was already in the db
    :raises AssertionError: if the generated address is not the network
        address of the generated network
    :raises USRESMonitorException: for any `add_net` failure other than a
        duplicate
    """

    def dump_vars():
        # Diagnostic dump of the values that produced a failing network.
        print("target_prefix_len", target_prefix_len)
        print("prefix_len", prefix_len)
        print("max_prefix_len", max_prefix_len)
        print("max_range_len", max_range_len)
        print("max_range", max_range)
        print("rand", rand)
        print("net_id", net_id)
        print("ip", ip)
        print("net", net)
        print("str(net.ip)", str(net.ip))
        print("str(net.network)", str(net.network))

    if ip_ver == 4:
        prefix_len = random.randint(8, target_prefix_len-1)
        max_range_len = prefix_len
        max_prefix_len = 32
        ip_class = ipaddr.IPv4Address
        net_class = ipaddr.IPv4Network
    else:
        prefix_len = random.randint(19, target_prefix_len-1)
        max_range_len = prefix_len - 1 # to avoid overflow
        max_prefix_len = 64
        ip_class = ipaddr.IPv6Address
        net_class = ipaddr.IPv6Network

    max_range = 2**max_range_len - 1
    rand = random.randint(0, max_range)
    # Subtraction binds tighter than <<; the parentheses only make the
    # intended shift amount explicit.
    net_id = rand << (max_prefix_len - prefix_len)
    if ip_ver == 6:
        # Move the 64-bit network id into the upper half of the address.
        net_id = net_id << 64
    ip = ip_class(net_id)
    net = net_class("{}/{}".format(str(ip), prefix_len))

    # Explicit check instead of `assert`, so it still runs under `python -O`.
    if str(net.ip) != str(net.network):
        dump_vars()
        raise AssertionError(
            "generated address {} is not the network address of {}".format(
                ip, net))

    try:
        usres_monitor.add_net(net)
    except USRESMonitorException as e:
        if "it was already in the db" in str(e):
            return "dup", net
        # Any other monitor error is a real failure: report it instead of
        # falling through and claiming the network was added.
        dump_vars()
        raise
    except Exception:
        # Dump the inputs for debugging, then let the error propagate so
        # that a failed insert is never reported as "ok".
        dump_vars()
        raise

    return "ok", net