Python ipaddr 模块,IPv6Network() 实例源码

我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用ipaddr.IPv6Network()

项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def get_sre(net, target_prefix_len):
        """Calculate first and last /target_prefix_len subnets from net

        Returns: first, last, cnt (all integers)
        """

        assert isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network))
        assert target_prefix_len <= 64, "Max target prefix length is 64"
        assert net.prefixlen <= target_prefix_len, \
            ("Prefix length ({}) must be <= of the target prefix "
             "length ({}): {}".format(net.prefixlen, target_prefix_len, net))

        first = UniqueSmallestRoutableEntriesMonitor.get_first(net)

        tot_len = 64 if net.version == 6 else 32

        # first <= 2^63 -1 to avoid overflows
        assert first <= 9223372036854775807, \
            "Only prefixes <= 7fff:ffff:ffff:ffff::/64 can be processed"

        diff_len = target_prefix_len - net.prefixlen

        last = first | ((2**diff_len - 1) << tot_len - target_prefix_len)

        return first, last, 2**diff_len
项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def get_net(net):
        if isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network)):
            return net
        return ipaddr.IPNetwork(net)
项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def get_first(net):
        assert isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network))
        if net.version == 6:
            return int(net.network) >> 64
        else:
            return int(net.network)
项目:sonic-mgmt    作者:Azure    | 项目源码 | 文件源码
def default(self, obj):
        if isinstance(obj,
                      (ipaddress.IPv4Network, ipaddress.IPv6Network, ipaddress.IPv4Address, ipaddress.IPv6Address)):
            return str(obj)
        return json.JSONEncoder.default(self, obj)
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def __init__(self):
        self.ipv4_networks = []
        for ip in WHATSAPP_IPV4.split("\n"):
            try:
                self.ipv4_networks.append(ipaddr.IPv4Network(ip))
            except Exception:
                log.err("IP is wrong")
                log.msg(ip)
        self.ipv6_networks = map(ipaddr.IPv6Network,
                                 WHATSAPP_IPV6.split("\n"))
项目:noc    作者:onfsdn    | 项目源码 | 文件源码
def ipv6_link_mcast_from_ucast(ucast):
        link_mcast_prefix = ipaddr.IPv6Network('ff02::1:ff00:0/104')
        mcast_bytes = ipaddr.Bytes(
            link_mcast_prefix.packed[:13] + ucast.packed[-3:])
        link_mcast = ipaddr.IPv6Address(mcast_bytes)
        return link_mcast
项目:sonic-buildimage    作者:Azure    | 项目源码 | 文件源码
def default(self, obj):
        if isinstance(obj, (
            ipaddress.IPv4Network, ipaddress.IPv6Network,
            ipaddress.IPv4Address, ipaddress.IPv6Address
            )):
            return str(obj)
        return json.JSONEncoder.default(self, obj)
项目:rest_api    作者:opentargets    | 项目源码 | 文件源码
def get(self):
        esstore = current_app.extensions['es_access_store']
        mpstore = current_app.extensions['mp_access_store']
        args = self.parser.parse_args()
        event = args['event'][:120]
        auth_token=request.headers.get('Auth-Token')
        ip_resolver = current_app.config['IP_RESOLVER']
        ip = RateLimiter.get_remote_addr()
        ip_net = IPNetwork(ip)
        resolved_org = ip_resolver['default']
        for net, org in ip_resolver.items():
            if isinstance(net, (IPv4Network, IPv6Network)):
                if net.overlaps(ip_net):
                    resolved_org = org
                    break
        data = dict(ip=ip,
                    org=resolved_org,
                    host=request.host,
                    timestamp=datetime.now(),
                    event=event)
        if auth_token:
            payload = TokenAuthentication.get_payload_from_token(auth_token)
            data['app_name'] = payload['app_name']
        # esstore.store_event(data)
        mpstore.store_event(data)
        data['timestamp']= str(data['timestamp'])
        return CTTVResponse.OK(SimpleResult(None, data=data))
项目:usres_monitor    作者:pierky    | 项目源码 | 文件源码
def add_random_net(ip_ver, target_prefix_len):

    def dump_vars():
        print("target_prefix_len", target_prefix_len)
        print("prefix_len", prefix_len)
        print("max_prefix_len", max_prefix_len)
        print("max_range_len", max_range_len)
        print("max_range", max_range)
        print("rand", rand)
        print("net_id", net_id)
        print("ip", ip)
        print("net", net)
        print("str(net.ip)", str(net.ip))
        print("str(net.network)", str(net.network))

    if ip_ver == 4:
        prefix_len = random.randint(8, target_prefix_len-1)
        max_range_len = prefix_len
        max_prefix_len = 32
        ip_class = ipaddr.IPv4Address
        net_class = ipaddr.IPv4Network
    else:
        prefix_len = random.randint(19, target_prefix_len-1)
        max_range_len = prefix_len - 1 # to avoid overflow
        max_prefix_len = 64
        ip_class = ipaddr.IPv6Address
        net_class = ipaddr.IPv6Network

    max_range = 2**max_range_len - 1
    rand = random.randint(0, max_range)
    net_id = rand << max_prefix_len - prefix_len
    if ip_ver == 6:
        net_id = net_id << 64
    ip = ip_class(net_id)
    net = net_class("{}/{}".format(str(ip), prefix_len))

    try:
        assert str(net.ip) == str(net.network)
    except:
        dump_vars()
        raise

    try:
        usres_monitor.add_net(net)
    except USRESMonitorException as e:
        if "it was already in the db" in str(e):
            return "dup", net
    except:
        dump_vars()

    return "ok", net