Python netaddr 模块,IPSet() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用netaddr.IPSet()

项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_unions_intersections_differences():
    """Exercise |, & and ^ on alternating /28 slices of 192.0.2.0/24."""
    whole = IPSet(['192.0.2.0/24'])
    # Every second /28 of the /24, starting with the first one.
    evens = IPSet(list(IPNetwork('192.0.2.0/24').subnet(28))[::2])

    expected_evens = IPSet([
        '192.0.2.0/28', '192.0.2.32/28', '192.0.2.64/28',
        '192.0.2.96/28', '192.0.2.128/28', '192.0.2.160/28',
        '192.0.2.192/28', '192.0.2.224/28',
    ])
    expected_odds = IPSet([
        '192.0.2.16/28', '192.0.2.48/28', '192.0.2.80/28',
        '192.0.2.112/28', '192.0.2.144/28', '192.0.2.176/28',
        '192.0.2.208/28', '192.0.2.240/28',
    ])

    assert evens == expected_evens
    # Intersecting with the enclosing /24 leaves the slices untouched.
    assert whole & evens == expected_evens

    # The symmetric difference with the /24 yields the remaining slices.
    odds = whole ^ evens
    assert odds == expected_odds

    assert evens | odds == whole
    assert evens & odds == IPSet([])
    assert evens ^ odds == whole
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_combined_ipv4_and_ipv6_ipsets():
    """Set operators work on IPSets that hold both IPv4 and IPv6 members."""
    left = IPSet(['192.0.2.0', '::192.0.2.0', '192.0.2.2', '::192.0.2.2'])
    right = IPSet(['192.0.2.2', '::192.0.2.2', '192.0.2.4', '::192.0.2.4'])

    # The union must be the same whichever operand comes first.
    for merged in (left | right, right | left):
        assert merged == IPSet([
            '192.0.2.0/32', '192.0.2.2/32', '192.0.2.4/32',
            '::192.0.2.0/128', '::192.0.2.2/128', '::192.0.2.4/128',
        ])

    shared = IPSet(['192.0.2.2/32', '::192.0.2.2/128'])
    only_left = IPSet(['192.0.2.0/32', '::192.0.2.0/128'])
    only_right = IPSet(['192.0.2.4/32', '::192.0.2.4/128'])

    assert left & right == shared
    assert left - right == only_left
    assert right - left == only_right
    assert left ^ right == IPSet(['192.0.2.0/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.4/128'])
项目:regional-ip-addresses    作者:x1angli    | 项目源码 | 文件源码
def parse_table(self):
        """Parse the cached APNIC table into the in-wall IPSet.

        Reads ``self.cache_apnic``, keeps the ``apnic|CN|ipv4`` records and
        stores the result in ``self.ipset_inwall`` (an IPSet) and
        ``self.cidrs_inwall`` (the list of its CIDR blocks).
        """
        logging.info("Start parsing IP table(s)")

        with open(self.cache_apnic, 'r') as f:
            records = f.readlines()

        cidr_blocks = []
        for record in records:
            if not record.startswith('apnic|CN|ipv4'):
                continue
            fields = record.rstrip().split('|')
            # A record must have exactly seven '|'-separated fields.
            apnic, country, v4v6, prefix, count_of_addr, date, status = fields
            if v4v6 != 'ipv4' or country != 'CN':
                continue
            # NOTE(review): binary_log is defined elsewhere; presumably it
            # returns log2 of the address count -- confirm.
            prefix_len = 32 - binary_log(int(count_of_addr))
            cidr_blocks.append(prefix + '/' + str(prefix_len))

        self.ipset_inwall = IPSet(cidr_blocks)
        self.cidrs_inwall = list(self.ipset_inwall.iter_cidrs())

        logging.info("Finished parsing in-wall IP table(s). Total: %i CIDR blocks.", len(self.cidrs_inwall), )
项目:quark    作者:openstack    | 项目源码 | 文件源码
def ip_policy_update(context, ip_policy, **ip_policy_dict):
    """Apply ``ip_policy_dict`` to ``ip_policy`` and add it to the session.

    When a non-empty ``exclude`` list is passed, the policy's excluded CIDRs
    are replaced by one ``IPPolicyCIDR`` per entry and the ``size`` field is
    set to the size of the combined IPSet. Returns the updated policy.
    """
    requested_excludes = ip_policy_dict.pop("exclude", [])
    if requested_excludes:
        ip_policy["exclude"] = []
        excluded_set = netaddr.IPSet()
        for cidr in requested_excludes:
            # first/last are taken from the IPv6 form of the network.
            as_v6 = netaddr.IPNetwork(cidr).ipv6()
            cidr_model = models.IPPolicyCIDR(cidr=cidr,
                                             first_ip=as_v6.first,
                                             last_ip=as_v6.last)
            ip_policy["exclude"].append(cidr_model)
            excluded_set.add(cidr)
        ip_policy_dict["size"] = excluded_set.size

    ip_policy.update(ip_policy_dict)
    context.session.add(ip_policy)
    return ip_policy
项目:quark    作者:openstack    | 项目源码 | 文件源码
def _build_excludes(self):
        """Compute ``self._exclude_cidrs`` from the subnet, pools and policies.

        Starts from the whole subnet. If ``self._alloc_pools`` is a list, each
        pool range is subtracted; if it is ``None`` the set is reset to empty.
        Every entry of ``self._policies`` is then added.
        """
        self._validate_allocation_pools()
        subnet_net = netaddr.IPNetwork(self._subnet_cidr)
        ip_version = subnet_net.version
        first_ip = netaddr.IPAddress(subnet_net.first, version=ip_version)
        last_ip = netaddr.IPAddress(subnet_net.last, version=ip_version)
        excluded = netaddr.IPSet(netaddr.IPRange(first_ip, last_ip).cidrs())

        if self._alloc_pools is None:
            # Empty list is completely unallocatable, None is fully
            # allocatable
            excluded = netaddr.IPSet()
        elif isinstance(self._alloc_pools, list):
            for pool in self._alloc_pools:
                pool_range = netaddr.IPRange(
                    netaddr.IPAddress(pool["start"]),
                    netaddr.IPAddress(pool["end"]))
                excluded -= netaddr.IPSet(pool_range.cidrs())

        for policy_cidr in self._policies:
            excluded.add(netaddr.IPNetwork(policy_cidr))

        self._exclude_cidrs = excluded
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_create_locks_lock_holder_exists(self):
        """create_locks leaves a single holder when one already exists."""
        net = db_api.network_create(self.context)
        locked_address = db_api.ip_address_create(
            self.context,
            address=netaddr.IPAddress("192.168.10.1"),
            network=net)
        # Pre-create the lock holder that create_locks would otherwise add.
        db_api.lock_holder_create(
            self.context, locked_address,
            name=null_routes.LOCK_NAME, type="ip_address")
        self.context.session.flush()

        subnet_addresses = netaddr.IPSet(netaddr.IPNetwork(self.sub_cidr))
        null_routes.create_locks(self.context, [net.id], subnet_addresses)

        holders = db_api.lock_holder_find(
            self.context,
            lock_id=locked_address.lock_id,
            name=null_routes.LOCK_NAME,
            scope=db_api.ALL)
        self.assertEqual(len(holders), 1)
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_ipset_unions_intersections_differences():
    """Exercise |, & and ^ on alternating /28 slices of 192.0.2.0/24."""
    whole = IPSet(['192.0.2.0/24'])
    # Every second /28 of the /24, starting with the first one.
    evens = IPSet(list(IPNetwork('192.0.2.0/24').subnet(28))[::2])

    expected_evens = IPSet([
        '192.0.2.0/28', '192.0.2.32/28', '192.0.2.64/28',
        '192.0.2.96/28', '192.0.2.128/28', '192.0.2.160/28',
        '192.0.2.192/28', '192.0.2.224/28',
    ])
    expected_odds = IPSet([
        '192.0.2.16/28', '192.0.2.48/28', '192.0.2.80/28',
        '192.0.2.112/28', '192.0.2.144/28', '192.0.2.176/28',
        '192.0.2.208/28', '192.0.2.240/28',
    ])

    assert evens == expected_evens
    # Intersecting with the enclosing /24 leaves the slices untouched.
    assert whole & evens == expected_evens

    # The symmetric difference with the /24 yields the remaining slices.
    odds = whole ^ evens
    assert odds == expected_odds

    assert evens | odds == whole
    assert evens & odds == IPSet([])
    assert evens ^ odds == whole
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_combined_ipv4_and_ipv6_ipsets():
    """Set operators work on IPSets that hold both IPv4 and IPv6 members."""
    left = IPSet(['192.0.2.0', '::192.0.2.0', '192.0.2.2', '::192.0.2.2'])
    right = IPSet(['192.0.2.2', '::192.0.2.2', '192.0.2.4', '::192.0.2.4'])

    # The union must be the same whichever operand comes first.
    for merged in (left | right, right | left):
        assert merged == IPSet([
            '192.0.2.0/32', '192.0.2.2/32', '192.0.2.4/32',
            '::192.0.2.0/128', '::192.0.2.2/128', '::192.0.2.4/128',
        ])

    shared = IPSet(['192.0.2.2/32', '::192.0.2.2/128'])
    only_left = IPSet(['192.0.2.0/32', '::192.0.2.0/128'])
    only_right = IPSet(['192.0.2.4/32', '::192.0.2.4/128'])

    assert left & right == shared
    assert left - right == only_left
    assert right - left == only_right
    assert left ^ right == IPSet(['192.0.2.0/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.4/128'])
项目:ceph-iscsi-config    作者:ceph    | 项目源码 | 文件源码
def get_ip_address(iscsi_network):
    """
    Return an IP address assigned to the running host that matches the given
    subnet address. This IP becomes the portal IP for the target portal group
    :param iscsi_network: cidr network address
    :return: IP address, or '' if the host does not have an interface on the
    required subnet
    """

    subnet = netaddr.IPSet([iscsi_network])

    for local_ip in ipv4_address():
        # Test membership against the IPSet directly instead of expanding the
        # subnet into a list of strings: that list has one entry per address
        # in the network, and its loop variable used to shadow the result.
        if netaddr.IPAddress(local_ip) in subnet:
            return local_ip

    return ''
项目:os-services    作者:open-power-ref-design-toolkit    | 项目源码 | 文件源码
def find_external_network(self, ext_floating_ip, networks):
        """Find external network based on the network address that matches
           external-floating-ipaddr.
           If not found, return the management network interface.

           :param ext_floating_ip: address to look up, or 'N/A'
           :param networks: dict mapping network name to its details dict;
                            only entries that have an 'addr' key are matched

           Returns: (network name, network details), or None when
                    ext_floating_ip is 'N/A'
        """
        if ext_floating_ip == 'N/A':
            return None

        # .items() works on both Python 2 and 3; .iteritems() is Python 2 only.
        for net, net_details in networks.items():
            if 'addr' not in net_details:
                continue
            # Check if matching
            if (netaddr.IPAddress(ext_floating_ip) in
                    netaddr.IPSet([net_details['addr']])):
                return net, net_details

        return 'openstack-mgmt', networks.get('openstack-mgmt', None)
项目:vCloudDirectorManager    作者:blackms    | 项目源码 | 文件源码
def calculate_free_ips(global_variable):
    """Rebuild ``global_variable.free_ips`` from ``global_variable.used_ips``.

    Each used IP is assigned to its enclosing /24. For every such network the
    free list is the /24 minus the used addresses and minus the addresses
    ending in .0 through .10. ``free_ip_lock`` is set while the calculation
    runs and is always cleared afterwards, even if an exception is raised.

    :param global_variable: object exposing ``used_ips`` (an iterable of
        iterables of dotted-quad strings), ``app.logger``, ``free_ips`` and
        ``free_ip_lock``
    :return: True
    """
    import itertools
    from netaddr import IPSet, IPAddress
    global_variable.free_ip_lock = True
    try:
        global_variable.app.logger.debug('Calculating free ips...')
        networks = {}
        for ip in itertools.chain.from_iterable(global_variable.used_ips):
            prefix = '.'.join(ip.split('.')[0:3])
            net = '{network}.0/24'.format(network=prefix)
            if net not in networks:
                free = IPSet([net])
                # Drop the .0-.10 addresses once per network rather than once
                # per used IP; removing them again would be a no-op.
                for last_oct in range(0, 11):
                    free.remove(
                        IPAddress('{network}.{last_oct}'.format(network=prefix,
                                                                last_oct=last_oct))
                    )
                networks[net] = free
            # IPSet.remove() does nothing when the address is already absent.
            networks[net].remove(IPAddress(ip))
        global_variable.free_ips = {key: list(value) for key, value in networks.items()}
    finally:
        # Never leave the flag set if the calculation fails part-way.
        global_variable.free_ip_lock = False
    return True
项目:scripts    作者:vulnersCom    | 项目源码 | 文件源码
def getRadarAs(asNumber):
    """Print the merged IPv4 prefixes that Qrator Radar lists for an AS.

    Pages through the Radar prefixes API, collects every text node that looks
    like an IPv4 CIDR, merges the networks and prints one CIDR per line (or a
    hint when nothing was found).

    :param asNumber: AS identifier interpolated into the API URL
    """
    prefixPattern = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d+?$")
    urlTemplate = "https://radar.qrator.net/api/prefixes/%s?tab_id=current&page=%s"

    def fetchPage(pageNumber):
        return requests.get(urlTemplate % (asNumber, pageNumber)).json()

    def prefixesIn(response):
        soup = bs4.BeautifulSoup(response.get('page'), "html.parser")
        return set("%s" % a for a in soup.find_all(text=prefixPattern))

    radarResponse = fetchPage(1)
    totalPrefixes = int(radarResponse.get('total'))
    networkRawSet = prefixesIn(radarResponse)

    # Page 1 is already consumed, so continue with page 2.
    nextPage = 2
    while len(networkRawSet) < totalPrefixes:
        newPrefixes = prefixesIn(fetchPage(nextPage)) - networkRawSet
        if not newPrefixes:
            # The advertised total may never be reached (e.g. entries the
            # IPv4 pattern does not match); stop instead of looping forever.
            break
        networkRawSet |= newPrefixes
        nextPage += 1

    # IPSet already stores its members as merged CIDRs; iterating the set
    # itself would enumerate every single address.
    networkSet = netaddr.IPSet([netaddr.IPNetwork(item) for item in networkRawSet])
    mergedNetworks = list(networkSet.iter_cidrs())
    if not mergedNetworks:
        print("Nothing found. Wrong AS number?")
    else:
        print("\n".join(["%s" % network for network in mergedNetworks]))
项目:netbox    作者:digitalocean    | 项目源码 | 文件源码
def get_available_ips(self):
        """
        Return all available IPs within this prefix as an IPSet.

        Child IPs are always excluded; for non-pool prefixes the first and
        last address of the prefix are excluded as well.
        """
        whole_prefix = netaddr.IPSet(self.prefix)
        taken = netaddr.IPSet([ip.address.ip for ip in self.get_child_ips()])
        free = whole_prefix - taken

        if self.is_pool:
            return free

        # Remove unusable IPs from non-pool prefixes
        boundaries = netaddr.IPSet([
            netaddr.IPAddress(self.prefix.first),
            netaddr.IPAddress(self.prefix.last),
        ])
        return free - boundaries
项目:aCloudGuru-Event-Driven-Security    作者:mikegchambers    | 项目源码 | 文件源码
def test_ipset_unions_intersections_differences():
    """Exercise |, & and ^ on alternating /28 slices of 192.0.2.0/24."""
    whole = IPSet(['192.0.2.0/24'])
    # Every second /28 of the /24, starting with the first one.
    evens = IPSet(list(IPNetwork('192.0.2.0/24').subnet(28))[::2])

    expected_evens = IPSet([
        '192.0.2.0/28', '192.0.2.32/28', '192.0.2.64/28',
        '192.0.2.96/28', '192.0.2.128/28', '192.0.2.160/28',
        '192.0.2.192/28', '192.0.2.224/28',
    ])
    expected_odds = IPSet([
        '192.0.2.16/28', '192.0.2.48/28', '192.0.2.80/28',
        '192.0.2.112/28', '192.0.2.144/28', '192.0.2.176/28',
        '192.0.2.208/28', '192.0.2.240/28',
    ])

    assert evens == expected_evens
    # Intersecting with the enclosing /24 leaves the slices untouched.
    assert whole & evens == expected_evens

    # The symmetric difference with the /24 yields the remaining slices.
    odds = whole ^ evens
    assert odds == expected_odds

    assert evens | odds == whole
    assert evens & odds == IPSet([])
    assert evens ^ odds == whole
项目:aCloudGuru-Event-Driven-Security    作者:mikegchambers    | 项目源码 | 文件源码
def test_combined_ipv4_and_ipv6_ipsets():
    """Set operators work on IPSets that hold both IPv4 and IPv6 members."""
    left = IPSet(['192.0.2.0', '::192.0.2.0', '192.0.2.2', '::192.0.2.2'])
    right = IPSet(['192.0.2.2', '::192.0.2.2', '192.0.2.4', '::192.0.2.4'])

    # The union must be the same whichever operand comes first.
    for merged in (left | right, right | left):
        assert merged == IPSet([
            '192.0.2.0/32', '192.0.2.2/32', '192.0.2.4/32',
            '::192.0.2.0/128', '::192.0.2.2/128', '::192.0.2.4/128',
        ])

    shared = IPSet(['192.0.2.2/32', '::192.0.2.2/128'])
    only_left = IPSet(['192.0.2.0/32', '::192.0.2.0/128'])
    only_right = IPSet(['192.0.2.4/32', '::192.0.2.4/128'])

    assert left & right == shared
    assert left - right == only_left
    assert right - left == only_right
    assert left ^ right == IPSet(['192.0.2.0/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.4/128'])
项目:edx-configuration    作者:kola-er    | 项目源码 | 文件源码
def tags_for_hostname(hostname, mapping):
    """Derive tags for an ``ip-A-B-C-D`` style hostname from ``mapping``.

    :param hostname: host name; anything not starting with ``ip-`` yields {}
    :param mapping: dict with ``CIDR_FIRST_OCTET`` (string),
        ``CIDR_SECOND_OCTET`` (second octet -> tag dict) and ``CIDR_REST``
        (CIDR remainder -> tag dict)
    :return: dict of tags merged from the second-octet entry and from every
        ``CIDR_REST`` entry whose CIDR contains the host address
    """
    logging.debug("Hostname is %s", hostname)
    prefix = 'ip-'
    if not hostname.startswith(prefix):
        return {}

    # Slice the prefix off; str.lstrip('ip-') would strip any leading run of
    # the characters 'i', 'p' and '-' rather than the literal prefix.
    octets = hostname[len(prefix):].split('-')
    tags = {}

    # Update with env and deployment info
    tags.update(mapping['CIDR_SECOND_OCTET'][octets[1]])

    ip_addr = netaddr.IPAddress(".".join(octets))
    for key, value in mapping['CIDR_REST'].items():
        cidr = ".".join([
            mapping['CIDR_FIRST_OCTET'],
            octets[1],
            key])

        if ip_addr in netaddr.IPSet([cidr]):
            tags.update(value)

    return tags
项目:vmware-nsxlib    作者:openstack    | 项目源码 | 文件源码
def _generate_ranges(self, cidr, gateway_ip):
        """Create list of ranges from the given cidr.

        Ignore the gateway_ip, if defined. Each range is returned as a dict
        with string "start" and "end" addresses.
        """
        usable = netaddr.IPSet(netaddr.IPNetwork(cidr))
        if gateway_ip:
            usable.remove(gateway_ip)

        ranges = []
        for ip_range in usable.iter_ipranges():
            ranges.append({"start": str(ip_range[0]),
                           "end": str(ip_range[-1])})
        return ranges
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_basic_api():
    """A mixed list of addresses/networks compacts to the same set as a range."""
    mixed_members = [
        IPAddress('192.0.2.1'),
        '192.0.2.2/31',
        IPNetwork('192.0.2.4/31'),
        IPAddress('192.0.2.6'),
        IPAddress('192.0.2.7'),
        '192.0.2.8',
        '192.0.2.9',
        IPAddress('192.0.2.10'),
        IPAddress('192.0.2.11'),
        IPNetwork('192.0.2.12/30'),
    ]

    from_range = IPSet(IPRange('192.0.2.1', '192.0.2.15').cidrs())
    from_list = IPSet(mixed_members)

    compacted = IPSet([
        '192.0.2.1/32',
        '192.0.2.2/31',
        '192.0.2.4/30',
        '192.0.2.8/29',
    ])
    assert from_list == compacted

    assert from_range == from_list
    # pop() removes a member from from_list, so the sets differ afterwards.
    assert from_list.pop() in from_range
    assert from_range != from_list
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_empty():
    """An IPSet built without arguments equals one built from an empty list."""
    nothing = IPSet([])
    assert IPSet() == IPSet([])
    assert IPSet([]) == nothing
    assert len(nothing) == 0
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_constructor():
    """IPSet accepts strings, addresses, networks, ranges and other IPSets."""
    # (constructed set, expected equivalent) pairs.
    cases = [
        (IPSet(['192.0.2.0']), IPSet(['192.0.2.0/32'])),
        (IPSet([IPAddress('192.0.2.0')]), IPSet(['192.0.2.0/32'])),
        (IPSet([IPNetwork('192.0.2.0')]), IPSet(['192.0.2.0/32'])),
        (IPSet(IPNetwork('1234::/32')), IPSet(['1234::/32'])),
        (IPSet([IPNetwork('192.0.2.0/24')]), IPSet(['192.0.2.0/24'])),
        (IPSet(IPSet(['192.0.2.0/32'])), IPSet(['192.0.2.0/32'])),
        (IPSet(IPRange("10.0.0.0", "10.0.1.31")), IPSet(['10.0.0.0/24', '10.0.1.0/27'])),
        (IPSet(IPRange('0.0.0.0', '255.255.255.255')), IPSet(['0.0.0.0/0'])),
    ]
    for built, expected in cases:
        assert built == expected
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_iteration():
    """Iterating an IPSet yields every address, IPv4 before IPv6, in order."""
    expected_v4 = [IPAddress('192.0.2.%d' % host) for host in range(16)]
    expected_v6 = [IPAddress('::192.0.2.%d' % host) for host in range(16)]

    members = list(IPSet(['192.0.2.0/28', '::192.0.2.0/124']))
    assert members == expected_v4 + expected_v6
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_member_insertion_and_deletion():
    """add() and remove() accept single addresses as well as IPRanges."""
    working = IPSet()

    # Single address round trip.
    working.add('192.0.2.0')
    assert working == IPSet(['192.0.2.0/32'])
    working.remove('192.0.2.0')
    assert working == IPSet([])

    # Range insertion, then removal of a range that only partly overlaps.
    working.add(IPRange("10.0.0.0", "10.0.0.255"))
    assert working == IPSet(['10.0.0.0/24'])
    working.remove(IPRange("10.0.0.128", "10.10.10.10"))
    assert working == IPSet(['10.0.0.0/25'])
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_membership_largest():
    """The all-IPv4 set contains every IPv4 address/network and no IPv6."""
    ipset = IPSet(['0.0.0.0/0'])

    assert IPAddress("10.0.0.1") in ipset
    # Both ends of the IPv4 space. The upper bound used to be written as the
    # three-octet "255.255.255", which is not the largest address.
    assert IPAddress("0.0.0.0") in ipset
    assert IPAddress("255.255.255.255") in ipset
    assert IPNetwork("10.0.0.0/24") in ipset
    assert IPAddress("::1") not in ipset
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_set_membership_smallest():
    """A single-host set contains only that host and its /32 network."""
    single_host = IPSet(["10.0.0.42/32"])

    for member in (IPAddress("10.0.0.42"), IPNetwork("10.0.0.42/32")):
        assert member in single_host

    # Neighbouring addresses and the enclosing /31 are not members.
    for outsider in (IPAddress("10.0.0.41"),
                     IPAddress("10.0.0.43"),
                     IPNetwork("10.0.0.42/31")):
        assert outsider not in single_host
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_unions():
    """Unions of adjacent or overlapping members are compacted into CIDRs."""
    assert IPSet(['192.0.2.0']) == IPSet(['192.0.2.0/32'])

    first_two = IPSet(['192.0.2.0']) | IPSet(['192.0.2.1'])
    assert first_two == IPSet(['192.0.2.0/31'])

    # (third operand, expected union with the first two hosts) pairs.
    cases = [
        (IPSet(['192.0.2.3']), IPSet(['192.0.2.0/31', '192.0.2.3/32'])),
        (IPSet(['192.0.2.3/30']), IPSet(['192.0.2.0/30'])),
        (IPSet(['192.0.2.3/31']), IPSet(['192.0.2.0/30'])),
    ]
    for extra, expected in cases:
        assert IPSet(['192.0.2.0']) | IPSet(['192.0.2.1']) | extra == expected

    three_nets = IPSet(['192.0.2.0/24']) | IPSet(['192.0.3.0/24']) | IPSet(['192.0.4.0/24'])
    assert three_nets == IPSet(['192.0.2.0/23', '192.0.4.0/24'])
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_updates():
    """update() merges another IPSet or a list of CIDR strings in place."""
    target = IPSet(['192.0.2.0/25'])

    # Merging the upper half completes the /24.
    target.update(IPSet(['192.0.2.128/25']))
    assert target == IPSet(['192.0.2.0/24'])

    # Adding the three sibling /24s completes the /22.
    sibling_nets = ['192.0.0.0/24', '192.0.1.0/24', '192.0.3.0/24']
    target.update(sibling_nets)
    assert target == IPSet(['192.0.0.0/22'])
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_clear():
    """clear() empties a populated IPSet."""
    populated = IPSet(['10.0.0.0/16'])
    adjacent_range = IPRange('10.1.0.0', '10.1.255.255')
    populated.update(adjacent_range)
    assert populated == IPSet(['10.0.0.0/15'])

    populated.clear()
    assert populated == IPSet([])
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_with_iprange():
    """iprange(), iscontiguous() and iter_ipranges() on several set shapes."""
    # Two adjacent /25 networks form one contiguous range.
    two_halves = IPSet(['10.0.0.0/25', '10.0.0.128/25'])
    assert two_halves.iprange() == IPRange('10.0.0.0', '10.0.0.255')
    assert two_halves.iscontiguous()

    # Removing one address from the middle splits the set in two.
    two_halves.remove('10.0.0.16')
    punctured = IPSet([
        '10.0.0.0/28', '10.0.0.17/32', '10.0.0.18/31',
        '10.0.0.20/30', '10.0.0.24/29', '10.0.0.32/27',
        '10.0.0.64/26', '10.0.0.128/25',
    ])
    assert two_halves == punctured
    assert not two_halves.iscontiguous()

    # A non-contiguous set cannot be expressed as a single range.
    with pytest.raises(ValueError):
        two_halves.iprange()

    pieces = list(two_halves.iter_ipranges())
    assert pieces == [
        IPRange('10.0.0.0', '10.0.0.15'),
        IPRange('10.0.0.17', '10.0.0.255'),
    ]

    all_v4 = IPSet(['0.0.0.0/0'])
    assert all_v4.iscontiguous()
    assert all_v4.iprange() == IPRange('0.0.0.0', '255.255.255.255')

    # The empty set counts as contiguous and has no range.
    empty = IPSet()
    assert empty.iscontiguous()
    assert empty.iprange() is None

    unaligned = IPSet(IPRange('10.0.0.0', '10.0.0.8'))
    assert unaligned.iscontiguous()
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_pickling():
    """An IPSet survives a pickle round trip unchanged."""
    original = IPSet(['10.0.0.0/16', 'fe80::/64'])
    restored = pickle.loads(pickle.dumps(original))
    assert original == restored
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_comparison():
    """Ordering operators on a /2 set and the /3 set with the same base."""
    wider = IPSet(['fc00::/2'])
    narrower = IPSet(['fc00::/3'])

    assert wider > narrower
    assert not wider < narrower
    assert wider != narrower
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_operations_with_combined_ipv4_and_ipv6():
    """Truthiness, &, - and ^ on IPv4, IPv6 and mixed IPSets."""
    s1 = IPSet(['192.0.2.0', '::192.0.2.0', '192.0.2.2', '::192.0.2.2'])
    s2 = IPSet(['192.0.2.2', '::192.0.2.2', '192.0.2.4', '::192.0.2.4'])
    s3 = IPSet(['0.0.0.1', '10.0.0.64/30', '255.255.255.1'])
    s4 = IPSet(['10.0.0.64', '10.0.0.66'])
    s4b = IPSet(['10.0.0.64', '10.0.0.66', '111.111.111.111'])
    s5 = IPSet(['10.0.0.65', '10.0.0.67'])
    s6 = IPSet(['2405:8100::/32'])

    # Expected values shared by several assertions below.
    evens_of_block = IPSet(['10.0.0.64/32', '10.0.0.66/32'])
    odds_of_block = IPSet(['10.0.0.65/32', '10.0.0.67/32'])
    s3_without_evens = IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '255.255.255.1/32'])
    s3_without_odds = IPSet(['0.0.0.1/32', '10.0.0.64/32', '10.0.0.66/32', '255.255.255.1/32'])

    assert bool(s6)
    assert not bool(IPSet())

    #   set intersection
    assert s2 & s1 == IPSet(['192.0.2.2/32', '::192.0.2.2/128'])
    assert s3 & s4 == evens_of_block
    assert s4 & s3 == evens_of_block
    assert s3 & s5 == odds_of_block
    assert s5 & s3 == odds_of_block

    #   set difference
    assert s3 - s4 == s3_without_evens
    assert s4 - s3 == IPSet([])
    assert s3 - s4b == s3_without_evens
    assert s3 - s5 == s3_without_odds
    assert s5 - s3 == IPSet([])

    #   set symmetric difference
    assert s2 ^ s1 == IPSet(['192.0.2.0/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.4/128'])
    assert IPSet([]) ^ IPSet([]) == IPSet([])
    assert IPSet(['0.0.0.1/32']) ^ IPSet([]) == IPSet(['0.0.0.1/32'])
    assert IPSet(['0.0.0.1/32']) ^ IPSet(['0.0.0.1/32']) == IPSet([])
    assert s3 ^ s4 == s3_without_evens
    assert s4 ^ s3 == s3_without_evens
    assert s3 ^ s4b == IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '111.111.111.111/32', '255.255.255.1/32'])
    assert s3 ^ s5 == s3_without_odds
    assert s5 ^ s3 == s3_without_odds
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_converting_ipsets_to_ipranges():
    """iter_ipranges() yields nothing, a one-address range, or a merged range."""
    assert list(IPSet().iter_ipranges()) == []

    one_host = IPSet([IPAddress('10.0.0.1')])
    assert list(one_host.iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.1')]

    # Adjacent hosts collapse into a single range.
    two_hosts = IPSet([IPAddress('10.0.0.1'), IPAddress('10.0.0.2')])
    assert list(two_hosts.iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.2')]
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_len_on_ipset_failure_with_large_ipv6_addresses():
    """len() raises IndexError once the set holds more than _sys_maxint items."""
    lowest = IPAddress("::0")

    # Addresses 0.._sys_maxint inclusive: one more than len() can report.
    too_big = IPSet(IPRange(lowest, IPAddress(_sys_maxint, 6)))
    with pytest.raises(IndexError):
        len(too_big)

    # Addresses 0.._sys_maxint - 1 inclusive: exactly _sys_maxint members.
    just_fits = IPSet(IPRange(IPAddress("::0"), IPAddress(_sys_maxint - 1, 6)))
    assert len(just_fits) == _sys_maxint
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_ipv4_and_ipv4_separation():
    """The same integer as IPv4 and as IPv6 stays in two separate ranges."""
    both_versions = IPSet([IPAddress(1, 4), IPAddress(1, 6)])
    ranges = list(both_versions.iter_ipranges())
    assert ranges == [IPRange('0.0.0.1', '0.0.0.1'), IPRange('::1', '::1')]
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_exceptions():
    """hash() on an IPSet and update() with an int both raise TypeError."""
    single = IPSet(['10.0.0.1'])

    #   IPSet objects are not hashable.
    with pytest.raises(TypeError):
        hash(single)

    #   Bad update argument type.
    bad_argument = 42
    with pytest.raises(TypeError):
        single.update(bad_argument)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_converts_to_cidr_networks_v4():
    """IPv4 networks given with host bits set are stored as true CIDRs."""
    nets = IPSet(IPNetwork('10.1.2.3/8'))
    nets.add(IPNetwork('192.168.1.2/16'))

    stored = list(nets.iter_cidrs())
    assert stored == [
        IPNetwork('10.0.0.0/8'),
        IPNetwork('192.168.0.0/16'),
    ]
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipset_converts_to_cidr_networks_v6():
    """IPv6 networks given with host bits set are stored as true CIDRs."""
    nets = IPSet(IPNetwork('fe80::4242/64'))
    nets.add(IPNetwork('fe90::4343/64'))

    stored = list(nets.iter_cidrs())
    assert stored == [
        IPNetwork('fe80::/64'),
        IPNetwork('fe90::/64'),
    ]
项目:sender_policy_flattener    作者:cetanu    | 项目源码 | 文件源码
def ips_to_spf_strings(ips):
    """Turn a list of IPs/CIDRs and other tokens into SPF mechanism strings.

    Entries that do not parse as an IPNetwork are removed from ``ips`` (the
    caller's list is modified in place) and appended unchanged to the result.
    The remaining entries are merged through an IPSet and emitted as
    ``ip4:``/``ip6:`` strings, with a ``/32`` suffix dropped for IPv4.
    """
    other_tokens = []
    for candidate in ips:
        try:
            IPNetwork(candidate)
        except AddrFormatError:
            other_tokens.append(candidate)

    # Strip the non-IP tokens from the caller's list before merging.
    for token in other_tokens:
        ips.remove(token)

    spf_strings = []
    for cidr in IPSet(ips).iter_cidrs():
        text = str(cidr)
        if ':' in text:
            spf_strings.append('ip6:' + text)
        else:
            spf_strings.append('ip4:' + text.replace('/32', ''))
    return spf_strings + other_tokens
项目:restconf-examples    作者:CiscoDevNet    | 项目源码 | 文件源码
def random_address(base):
    """Return a random address based on a base prefix.

    The prefix's network and broadcast addresses are never chosen; the
    result is formatted as ``address/prefixlen``.
    """
    prefix = netaddr.IPNetwork(base)
    candidates = netaddr.IPSet(prefix)
    candidates.remove(prefix.network)
    candidates.remove(prefix.broadcast)
    chosen = random.choice(list(candidates))
    return '%s/%s' % (chosen, prefix.prefixlen)
项目:cidr-tools    作者:silverwind    | 项目源码 | 文件源码
def ipset(nets):
  """Split ``nets`` into an IPv4 IPSet and an IPv6 IPSet.

  Returns the tuple ``(v4nets, v6nets)``.
  """
  v4nets, v6nets = netaddr.IPSet(), netaddr.IPSet()
  for net in nets:
    network = netaddr.IPNetwork(net)
    # The string form of an IPNetwork is "address/prefixlen".
    address, prefix_len = str(network).split("/")
    if netaddr.valid_ipv4(address) and int(prefix_len) <= 32:
      v4nets.add(network)
    elif netaddr.valid_ipv6(address) and int(prefix_len) <= 128:
      v6nets.add(network)
  return v4nets, v6nets
项目:regional-ip-addresses    作者:x1angli    | 项目源码 | 文件源码
def populate_reserved(self):
        """Load ``self.reserved_ip`` into ``self.ipset_reserved``.

        Lines starting with '#' and lines that are blank after stripping are
        ignored; every other line becomes one IPSet entry.
        """
        with open(self.reserved_ip, 'r') as f:
            raw_lines = f.readlines()

        # The comment check is applied before whitespace is stripped.
        stripped = [raw.strip() for raw in raw_lines
                    if not raw.startswith('#')]
        entries = [entry for entry in stripped if len(entry) > 0]

        self.ipset_reserved = IPSet(entries)
项目:regional-ip-addresses    作者:x1angli    | 项目源码 | 文件源码
def derive_outwall(self):
        """
        Derive the out-wall set: the whole IPv4 space minus the in-wall set
        and minus the reserved set. The result is stored in
        ``self.ipset_outwall`` and its CIDR list in ``self.cidrs_outwall``.
        See: http://www.tcpipguide.com/free/t_IPReservedPrivateandLoopbackAddresses-3.htm
        """

        everything = IPSet(['0.0.0.0/0'])
        self.ipset_outwall = everything - self.ipset_inwall - self.ipset_reserved
        self.cidrs_outwall = list(self.ipset_outwall.iter_cidrs())

        block_count = len(self.cidrs_outwall)
        logging.info("Finished deriving out-wall IP table(s). Total: %i CIDR blocks.", block_count)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def _validate_subnet_cidr(context, network_id, new_subnet_cidr):
    """Validate the CIDR for a subnet.

    Does nothing when overlapping IPs are allowed by configuration.
    Otherwise raises BadRequest when the CIDR cannot be turned into an IPSet
    (TypeError), and InvalidInput when it overlaps the CIDR of any
    non-shared subnet already found for this network.
    """
    if neutron_cfg.cfg.CONF.allow_overlapping_ips:
        return

    try:
        requested = netaddr.IPSet([new_subnet_cidr])
    except TypeError:
        LOG.exception("Invalid or missing cidr: %s" % new_subnet_cidr)
        raise n_exc.BadRequest(resource="subnet",
                               msg="Invalid or missing cidr")

    # Using admin context here, in case we actually share networks later
    existing_subnets = db_api.subnet_find(context=context.elevated(),
                                          network_id=network_id,
                                          shared=[False])

    for subnet in existing_subnets:
        # A non-empty intersection means the two CIDRs overlap.
        if not (netaddr.IPSet([subnet.cidr]) & requested):
            continue

        # don't give out details of the overlapping subnet
        err_msg = (_("Requested subnet with cidr: %(cidr)s for "
                     "network: %(network_id)s overlaps with another "
                     "subnet") %
                   {'cidr': new_subnet_cidr,
                    'network_id': network_id})
        LOG.error(_("Validation for CIDR: %(new_cidr)s failed - "
                    "overlaps with subnet %(subnet_id)s "
                    "(CIDR: %(cidr)s)"),
                  {'new_cidr': new_subnet_cidr,
                   'subnet_id': subnet.id,
                   'cidr': subnet.cidr})
        raise n_exc.InvalidInput(error_message=err_msg)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def _pool_is_growing(original_pool, new_pool):
    """Return True when ``new_pool`` covers addresses ``original_pool`` lacks.

    Both arguments expose ``_alloc_pools``, an iterable of dicts with
    ``start`` and ``end`` keys.
    """
    def _as_ipset(pool):
        # Collect every start/end range of the pool into one IPSet.
        addresses = netaddr.IPSet()
        for rng in pool._alloc_pools:
            addresses.add(netaddr.IPRange(rng['start'], rng['end']))
        return addresses

    original_addresses = _as_ipset(original_pool)
    new_addresses = _as_ipset(new_pool)

    # The pool is growing if the original set is not a superset of the new one.
    return not original_addresses.issuperset(new_addresses)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def ensure_default_policy(cidrs, subnets):
    """Append each subnet's network and broadcast host CIDR to ``cidrs``.

    ``cidrs`` is modified in place. A host CIDR is skipped when it is already
    covered by the CIDRs originally passed in or already present in the list.
    """
    # Snapshot of the policy as it was on entry; not refreshed while appending.
    policy_cidrs = netaddr.IPSet(cidrs)
    for subnet in subnets:
        subnet_cidr = netaddr.IPNetwork(subnet["cidr"])
        boundary_ips = (subnet_cidr.network, subnet_cidr.broadcast)
        host_prefix = '32' if subnet_cidr.version == 4 else '128'
        for boundary_ip in boundary_ips:
            cidr = "%s/%s" % (boundary_ip, host_prefix)
            if netaddr.IPNetwork(cidr) in policy_cidrs or cidr in cidrs:
                continue
            cidrs.append(cidr)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def allocation_pools(self):
        """Return the allocation pools for this subnet.

        A truthy ``_allocation_pool_cache`` value is decoded from JSON and
        returned. Otherwise the pools are computed from the subnet CIDR minus
        the CIDRs of its IP policy (if it has one).
        """
        cached = self.get("_allocation_pool_cache")
        if cached:
            return json.loads(cached)

        policy = self["ip_policy"]
        excluded = policy.get_cidrs_ip_set() if policy else netaddr.IPSet([])

        whole_subnet = netaddr.IPSet([netaddr.IPNetwork(self["cidr"])])
        return _pools_from_cidr(whole_subnet - excluded)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def get_cidrs_ip_set(self):
        """Return an IPSet built from the ``cidr`` of every excluded entry."""
        excluded_entries = self.get("exclude", [])
        return netaddr.IPSet([entry.cidr for entry in excluded_entries])
项目:quark    作者:openstack    | 项目源码 | 文件源码
def upgrade():
    """Populate ``quark_ip_policy.size`` from the policy's CIDR rows."""
    ip_policy = table('quark_ip_policy',
                      column('id', sa.String(length=36)),
                      column('size', INET()))
    ip_policy_cidrs = table('quark_ip_policy_cidrs',
                            column('ip_policy_id', sa.String(length=36)),
                            column('cidr', sa.String(length=64)))
    connection = op.get_bind()

    # 1. Retrieve all ip_policy_cidr rows.
    query = select([ip_policy_cidrs.c.ip_policy_id, ip_policy_cidrs.c.cidr])
    rows = connection.execute(query).fetchall()

    # 2. Determine IPSet for each IP Policy.
    sets_by_policy = {}
    for ip_policy_id, cidr in rows:
        sets_by_policy.setdefault(ip_policy_id, netaddr.IPSet()).add(cidr)

    # 3. Populate size for each IP Policy.
    for ip_policy_id, policy_set in sets_by_policy.items():
        statement = ip_policy.update().values(
            size=policy_set.size).where(
                ip_policy.c.id == ip_policy_id)
        connection.execute(statement)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_allow_allocation_pool_growth(self):
        """Updating a subnet with a larger pool succeeds when growth is allowed.

        Sets ``allow_allocation_pool_growth`` to True, updates the subnet's
        allocation pool from .15-.30 to .10-.50 and checks that the new pool
        is returned, that no address of the new pool is excluded by the single
        IP policy, and that the new pool adds addresses to the original one.
        """
        CONF.set_override('allow_allocation_pool_growth', True, 'QUARK')
        cidr = "192.168.1.0/24"
        ip_network = netaddr.IPNetwork(cidr)
        network = dict(name="public", tenant_id="fake", network_plugin="BASE")
        network = {"network": network}
        pool = [dict(start='192.168.1.15', end='192.168.1.30')]
        subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2,
                      cidr=cidr, first_ip=ip_network.first,
                      last_ip=ip_network.last, ip_policy=None,
                      allocation_pools=pool, tenant_id="fake")
        subnet = {"subnet": subnet}
        with self._stubs(network, subnet) as (net, sub1):
            subnet = subnet_api.get_subnet(self.context, 1)
            start_pools = subnet['allocation_pools']
            new_pool = [dict(start='192.168.1.10', end='192.168.1.50')]

            subnet_update = {"subnet": dict(allocation_pools=new_pool)}
            subnet = subnet_api.update_subnet(self.context, 1,
                                              subnet_update)
            # The update must have replaced the pools with the new one.
            self.assertNotEqual(start_pools, subnet['allocation_pools'])
            self.assertEqual(new_pool, subnet['allocation_pools'])
            policies = policy_api.get_ip_policies(self.context)
            self.assertEqual(1, len(policies))
            policy = policies[0]
            # Combine every excluded CIDR of the policy into one set.
            ip_set = netaddr.IPSet()
            for ip in policy['exclude']:
                ip_set.add(netaddr.IPNetwork(ip))
            # No address of the new pool may be excluded by the policy.
            for extent in new_pool:
                for ip in netaddr.IPRange(extent['start'], extent['end']):
                    self.assertFalse(ip in ip_set)

            start_ip_set = netaddr.IPSet()
            for rng in start_pools:
                start_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            new_ip_set = netaddr.IPSet()
            for rng in subnet['allocation_pools']:
                new_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            # The union differs from the original set, i.e. the pool grew.
            self.assertTrue(start_ip_set | new_ip_set != start_ip_set)
项目:quark    作者:openstack    | 项目源码 | 文件源码
def test_do_not_allow_allocation_pool_growth(self):
        """Updating a subnet with a larger pool fails when growth is disabled.

        Sets ``allow_allocation_pool_growth`` to False, confirms that the
        .10-.50 pool adds addresses to the original .15-.30 pool, and expects
        ``update_subnet`` to raise BadRequest.
        """
        CONF.set_override('allow_allocation_pool_growth', False, 'QUARK')
        cidr = "192.168.1.0/24"
        ip_network = netaddr.IPNetwork(cidr)
        network = dict(name="public", tenant_id="fake", network_plugin="BASE")
        network = {"network": network}
        pool = [dict(start='192.168.1.15', end='192.168.1.30')]
        subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2,
                      cidr=cidr, first_ip=ip_network.first,
                      last_ip=ip_network.last, ip_policy=None,
                      allocation_pools=pool, tenant_id="fake")
        subnet = {"subnet": subnet}
        with self._stubs(network, subnet) as (net, sub1):
            subnet = subnet_api.get_subnet(self.context, 1)
            start_pools = subnet['allocation_pools']
            new_pool = [dict(start='192.168.1.10', end='192.168.1.50')]

            start_ip_set = netaddr.IPSet()
            for rng in start_pools:
                start_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            new_ip_set = netaddr.IPSet()
            for rng in new_pool:
                new_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            # Sanity check: the requested pool really is larger than the
            # original one, so the update below is a growth attempt.
            self.assertTrue(start_ip_set | new_ip_set != start_ip_set)

            subnet_update = {"subnet": dict(allocation_pools=new_pool)}
            with self.assertRaises(n_exc.BadRequest):
                subnet = subnet_api.update_subnet(self.context, 1,
                                                  subnet_update)