我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用netaddr.IPSet()。
def test_ipset_unions_intersections_differences(): adj_cidrs = list(IPNetwork('192.0.2.0/24').subnet(28)) even_cidrs = adj_cidrs[::2] evens = IPSet(even_cidrs) assert evens == IPSet([ '192.0.2.0/28', '192.0.2.32/28', '192.0.2.64/28', '192.0.2.96/28', '192.0.2.128/28', '192.0.2.160/28', '192.0.2.192/28', '192.0.2.224/28', ]) assert IPSet(['192.0.2.0/24']) & evens == IPSet([ '192.0.2.0/28', '192.0.2.32/28', '192.0.2.64/28', '192.0.2.96/28', '192.0.2.128/28', '192.0.2.160/28', '192.0.2.192/28', '192.0.2.224/28']) odds = IPSet(['192.0.2.0/24']) ^ evens assert odds == IPSet([ '192.0.2.16/28', '192.0.2.48/28', '192.0.2.80/28', '192.0.2.112/28', '192.0.2.144/28', '192.0.2.176/28', '192.0.2.208/28', '192.0.2.240/28']) assert evens | odds == IPSet(['192.0.2.0/24']) assert evens & odds == IPSet([]) assert evens ^ odds == IPSet(['192.0.2.0/24'])
def test_combined_ipv4_and_ipv6_ipsets(): s1 = IPSet(['192.0.2.0', '::192.0.2.0', '192.0.2.2', '::192.0.2.2']) s2 = IPSet(['192.0.2.2', '::192.0.2.2', '192.0.2.4', '::192.0.2.4']) assert s1 | s2 == IPSet([ '192.0.2.0/32', '192.0.2.2/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.2/128', '::192.0.2.4/128', ]) assert s2 | s1 == IPSet([ '192.0.2.0/32', '192.0.2.2/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.2/128', '::192.0.2.4/128', ]) assert s1 & s2 == IPSet(['192.0.2.2/32', '::192.0.2.2/128']) assert s1 - s2 == IPSet(['192.0.2.0/32', '::192.0.2.0/128']) assert s2 - s1 == IPSet(['192.0.2.4/32', '::192.0.2.4/128']) assert s1 ^ s2 == IPSet(['192.0.2.0/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.4/128'])
def parse_table(self): logging.info("Start parsing IP table(s)") with open(self.cache_apnic, 'r') as f: lines = f.readlines() ip_list = [] for line in lines: if line.startswith('apnic|CN|ipv4'): line = line.rstrip() apnic, country, v4v6, prefix, count_of_addr, date, status = line.split('|') if v4v6 == 'ipv4' and country == 'CN': decimal = 32 - binary_log(int(count_of_addr)) cidr_addr = prefix + '/' + str(decimal) ip_list.append(cidr_addr) self.ipset_inwall = IPSet(ip_list) self.cidrs_inwall = list(self.ipset_inwall.iter_cidrs()) logging.info("Finished parsing in-wall IP table(s). Total: %i CIDR blocks.", len(self.cidrs_inwall), )
def ip_policy_update(context, ip_policy, **ip_policy_dict): exclude = ip_policy_dict.pop("exclude", []) if exclude: ip_policy["exclude"] = [] ip_set = netaddr.IPSet() for excluded_cidr in exclude: cidr_net = netaddr.IPNetwork(excluded_cidr).ipv6() ip_policy["exclude"].append( models.IPPolicyCIDR(cidr=excluded_cidr, first_ip=cidr_net.first, last_ip=cidr_net.last)) ip_set.add(excluded_cidr) ip_policy_dict["size"] = ip_set.size ip_policy.update(ip_policy_dict) context.session.add(ip_policy) return ip_policy
def _build_excludes(self): self._validate_allocation_pools() subnet_net = netaddr.IPNetwork(self._subnet_cidr) version = subnet_net.version cidrset = netaddr.IPSet( netaddr.IPRange( netaddr.IPAddress(subnet_net.first, version=version), netaddr.IPAddress(subnet_net.last, version=version)).cidrs()) if isinstance(self._alloc_pools, list): for p in self._alloc_pools: start = netaddr.IPAddress(p["start"]) end = netaddr.IPAddress(p["end"]) cidrset -= netaddr.IPSet(netaddr.IPRange( netaddr.IPAddress(start), netaddr.IPAddress(end)).cidrs()) elif self._alloc_pools is None: # Empty list is completely unallocatable, None is fully # allocatable cidrset = netaddr.IPSet() for p in self._policies: cidrset.add(netaddr.IPNetwork(p)) self._exclude_cidrs = cidrset
def test_create_locks_lock_holder_exists(self): network = db_api.network_create(self.context) address_model = db_api.ip_address_create( self.context, address=netaddr.IPAddress("192.168.10.1"), network=network) db_api.lock_holder_create( self.context, address_model, name=null_routes.LOCK_NAME, type="ip_address") self.context.session.flush() addresses = netaddr.IPSet(netaddr.IPNetwork(self.sub_cidr)) null_routes.create_locks(self.context, [network.id], addresses) lock_holders = db_api.lock_holder_find( self.context, lock_id=address_model.lock_id, name=null_routes.LOCK_NAME, scope=db_api.ALL) self.assertEqual(len(lock_holders), 1)
def get_ip_address(iscsi_network): """ Return an IP address assigned to the running host that matches the given subnet address. This IP becomes the portal IP for the target portal group :param iscsi_network: cidr network address :return: IP address, or '' if the host does not have an interface on the required subnet """ ip = '' subnet = netaddr.IPSet([iscsi_network]) target_ip_range = [str(ip) for ip in subnet] # list where each element # is an ip address for local_ip in ipv4_address(): if local_ip in target_ip_range: ip = local_ip break return ip
def find_external_network(self, ext_floating_ip, networks): """Find external network based on the network address that matches external-floating-ipaddr. If not found, return the management network interface. Returns: network name, network details """ if ext_floating_ip == 'N/A': return None for net, net_details in networks.iteritems(): if 'addr' in net_details: # Check if matching if (netaddr.IPAddress(ext_floating_ip) in netaddr.IPSet([net_details['addr']])): return net, net_details return 'openstack-mgmt', networks.get('openstack-mgmt', None)
def calculate_free_ips(global_variable): import itertools from netaddr import IPSet, IPAddress global_variable.free_ip_lock = True global_variable.app.logger.debug('Calculating free ips...') networks = {} for ip in list(itertools.chain(*[ip_ for ip_ in global_variable.used_ips])): net = '{network}.0/24'.format(network='.'.join(ip.split('.')[0:3])) if net not in networks.keys(): networks[net] = IPSet([net]) if IPAddress(ip) in networks[net]: networks[net].remove(IPAddress(ip)) for x in xrange(0, 11): networks[net].remove( IPAddress( '{network}.{last_oct}'.format(network='.'.join(ip.split('.')[0:3]), last_oct=x) ) ) global_variable.free_ips = {key: [x for x in value] for key, value in networks.items()} global_variable.free_ip_lock = False return True
def getRadarAs(asNumber): radarResponse = requests.get("https://radar.qrator.net/api/prefixes/%s?tab_id=current&page=1" % asNumber).json() totalPrefixes = int(radarResponse.get('total')) initalPageSoup = bs4.BeautifulSoup(radarResponse.get('page'), "html.parser") networkRawSet = set() for a in initalPageSoup.find_all(text=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d+?$")): networkRawSet.add("%s" % a) startPage = 1 while len(networkRawSet) < totalPrefixes: radarResponse = requests.get("https://radar.qrator.net/api/prefixes/%s?tab_id=current&page=%s" % (asNumber, startPage)).json() pageSoup = bs4.BeautifulSoup(radarResponse.get('page'), "html.parser") for a in pageSoup.find_all(text=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d+?$")): networkRawSet.add("%s" % a) startPage += 1 # Now minimize this shit networkSet = netaddr.IPSet([netaddr.IPNetwork(item) for item in networkRawSet]) mergedNetworks = netaddr.cidr_merge(networkSet) if not mergedNetworks: print("Nothing found. Wrong AS number?") else: print("\n".join(["%s" % network for network in mergedNetworks]))
def get_available_ips(self): """ Return all available IPs within this prefix as an IPSet. """ prefix = netaddr.IPSet(self.prefix) child_ips = netaddr.IPSet([ip.address.ip for ip in self.get_child_ips()]) available_ips = prefix - child_ips # Remove unusable IPs from non-pool prefixes if not self.is_pool: available_ips -= netaddr.IPSet([ netaddr.IPAddress(self.prefix.first), netaddr.IPAddress(self.prefix.last), ]) return available_ips
def tags_for_hostname(hostname, mapping): logging.debug("Hostname is {}".format(hostname)) if not hostname.startswith('ip-'): return {} octets = hostname.lstrip('ip-').split('-') tags = {} # Update with env and deployment info tags.update(mapping['CIDR_SECOND_OCTET'][octets[1]]) ip_addr = netaddr.IPAddress(".".join(octets)) for key, value in mapping['CIDR_REST'].items(): cidr = ".".join([ mapping['CIDR_FIRST_OCTET'], octets[1], key]) cidrset = netaddr.IPSet([cidr]) if ip_addr in cidrset: tags.update(value) return tags
def _generate_ranges(self, cidr, gateway_ip): """Create list of ranges from the given cidr. Ignore the gateway_ip, if defined """ ip_set = netaddr.IPSet(netaddr.IPNetwork(cidr)) if gateway_ip: ip_set.remove(gateway_ip) return [{"start": str(r[0]), "end": str(r[-1])} for r in ip_set.iter_ipranges()]
def test_ipset_basic_api(): range1 = IPRange('192.0.2.1', '192.0.2.15') ip_list = [ IPAddress('192.0.2.1'), '192.0.2.2/31', IPNetwork('192.0.2.4/31'), IPAddress('192.0.2.6'), IPAddress('192.0.2.7'), '192.0.2.8', '192.0.2.9', IPAddress('192.0.2.10'), IPAddress('192.0.2.11'), IPNetwork('192.0.2.12/30'), ] set1 = IPSet(range1.cidrs()) set2 = IPSet(ip_list) assert set2 == IPSet([ '192.0.2.1/32', '192.0.2.2/31', '192.0.2.4/30', '192.0.2.8/29', ]) assert set1 == set2 assert set2.pop() in set1 assert set1 != set2
def test_ipset_empty(): assert IPSet() == IPSet([]) empty_set = IPSet([]) assert IPSet([]) == empty_set assert len(empty_set) == 0
def test_ipset_constructor(): assert IPSet(['192.0.2.0']) == IPSet(['192.0.2.0/32']) assert IPSet([IPAddress('192.0.2.0')]) == IPSet(['192.0.2.0/32']) assert IPSet([IPNetwork('192.0.2.0')]) == IPSet(['192.0.2.0/32']) assert IPSet(IPNetwork('1234::/32')) == IPSet(['1234::/32']) assert IPSet([IPNetwork('192.0.2.0/24')]) == IPSet(['192.0.2.0/24']) assert IPSet(IPSet(['192.0.2.0/32'])) == IPSet(['192.0.2.0/32']) assert IPSet(IPRange("10.0.0.0", "10.0.1.31")) == IPSet(['10.0.0.0/24', '10.0.1.0/27']) assert IPSet(IPRange('0.0.0.0', '255.255.255.255')) == IPSet(['0.0.0.0/0'])
def test_ipset_iteration(): assert list(IPSet(['192.0.2.0/28', '::192.0.2.0/124'])) == [ IPAddress('192.0.2.0'), IPAddress('192.0.2.1'), IPAddress('192.0.2.2'), IPAddress('192.0.2.3'), IPAddress('192.0.2.4'), IPAddress('192.0.2.5'), IPAddress('192.0.2.6'), IPAddress('192.0.2.7'), IPAddress('192.0.2.8'), IPAddress('192.0.2.9'), IPAddress('192.0.2.10'), IPAddress('192.0.2.11'), IPAddress('192.0.2.12'), IPAddress('192.0.2.13'), IPAddress('192.0.2.14'), IPAddress('192.0.2.15'), IPAddress('::192.0.2.0'), IPAddress('::192.0.2.1'), IPAddress('::192.0.2.2'), IPAddress('::192.0.2.3'), IPAddress('::192.0.2.4'), IPAddress('::192.0.2.5'), IPAddress('::192.0.2.6'), IPAddress('::192.0.2.7'), IPAddress('::192.0.2.8'), IPAddress('::192.0.2.9'), IPAddress('::192.0.2.10'), IPAddress('::192.0.2.11'), IPAddress('::192.0.2.12'), IPAddress('::192.0.2.13'), IPAddress('::192.0.2.14'), IPAddress('::192.0.2.15'), ]
def test_ipset_member_insertion_and_deletion(): s1 = IPSet() s1.add('192.0.2.0') assert s1 == IPSet(['192.0.2.0/32']) s1.remove('192.0.2.0') assert s1 == IPSet([]) s1.add(IPRange("10.0.0.0", "10.0.0.255")) assert s1 == IPSet(['10.0.0.0/24']) s1.remove(IPRange("10.0.0.128", "10.10.10.10")) assert s1 == IPSet(['10.0.0.0/25'])
def test_ipset_membership_largest(): ipset = IPSet(['0.0.0.0/0']) assert IPAddress("10.0.0.1") in ipset assert IPAddress("0.0.0.0") in ipset assert IPAddress("255.255.255") in ipset assert IPNetwork("10.0.0.0/24") in ipset assert IPAddress("::1") not in ipset
def test_set_membership_smallest(): ipset = IPSet(["10.0.0.42/32"]) assert IPAddress("10.0.0.42") in ipset assert IPNetwork("10.0.0.42/32") in ipset assert IPAddress("10.0.0.41") not in ipset assert IPAddress("10.0.0.43") not in ipset assert IPNetwork("10.0.0.42/31") not in ipset
def test_ipset_unions(): assert IPSet(['192.0.2.0']) == IPSet(['192.0.2.0/32']) assert IPSet(['192.0.2.0']) | IPSet(['192.0.2.1']) == IPSet(['192.0.2.0/31']) assert IPSet(['192.0.2.0']) | IPSet(['192.0.2.1']) | IPSet(['192.0.2.3']) == IPSet(['192.0.2.0/31', '192.0.2.3/32']) assert IPSet(['192.0.2.0']) | IPSet(['192.0.2.1']) | IPSet(['192.0.2.3/30']) == IPSet(['192.0.2.0/30']) assert IPSet(['192.0.2.0']) | IPSet(['192.0.2.1']) | IPSet(['192.0.2.3/31']) == IPSet(['192.0.2.0/30']) assert IPSet(['192.0.2.0/24']) | IPSet(['192.0.3.0/24']) | IPSet(['192.0.4.0/24']) == IPSet(['192.0.2.0/23', '192.0.4.0/24'])
def test_ipset_updates(): s1 = IPSet(['192.0.2.0/25']) s2 = IPSet(['192.0.2.128/25']) s1.update(s2) assert s1 == IPSet(['192.0.2.0/24']) s1.update(['192.0.0.0/24', '192.0.1.0/24', '192.0.3.0/24']) assert s1 == IPSet(['192.0.0.0/22'])
def test_ipset_clear(): ipset = IPSet(['10.0.0.0/16']) ipset.update(IPRange('10.1.0.0', '10.1.255.255')) assert ipset == IPSet(['10.0.0.0/15']) ipset.clear() assert ipset == IPSet([])
def test_ipset_with_iprange(): s1 = IPSet(['10.0.0.0/25', '10.0.0.128/25']) assert s1.iprange() == IPRange('10.0.0.0', '10.0.0.255') assert s1.iscontiguous() s1.remove('10.0.0.16') assert s1 == IPSet([ '10.0.0.0/28', '10.0.0.17/32', '10.0.0.18/31', '10.0.0.20/30', '10.0.0.24/29', '10.0.0.32/27', '10.0.0.64/26', '10.0.0.128/25', ]) assert not s1.iscontiguous() with pytest.raises(ValueError): s1.iprange() assert list(s1.iter_ipranges()) == [ IPRange('10.0.0.0', '10.0.0.15'), IPRange('10.0.0.17', '10.0.0.255'), ] s2 = IPSet(['0.0.0.0/0']) assert s2.iscontiguous() assert s2.iprange() == IPRange('0.0.0.0', '255.255.255.255') # s3 = IPSet() assert s3.iscontiguous() assert s3.iprange() is None s4 = IPSet(IPRange('10.0.0.0', '10.0.0.8')) assert s4.iscontiguous()
def test_ipset_pickling(): ip_data = IPSet(['10.0.0.0/16', 'fe80::/64']) buf = pickle.dumps(ip_data) ip_data_unpickled = pickle.loads(buf) assert ip_data == ip_data_unpickled
def test_ipset_comparison(): s1 = IPSet(['fc00::/2']) s2 = IPSet(['fc00::/3']) assert s1 > s2 assert not s1 < s2 assert s1 != s2
def test_ipset_operations_with_combined_ipv4_and_ipv6(): s1 = IPSet(['192.0.2.0', '::192.0.2.0', '192.0.2.2', '::192.0.2.2']) s2 = IPSet(['192.0.2.2', '::192.0.2.2', '192.0.2.4', '::192.0.2.4']) s3 = IPSet(['0.0.0.1', '10.0.0.64/30', '255.255.255.1']) s4 = IPSet(['10.0.0.64', '10.0.0.66']) s4b = IPSet(['10.0.0.64', '10.0.0.66', '111.111.111.111']) s5 = IPSet(['10.0.0.65', '10.0.0.67']) s6 = IPSet(['2405:8100::/32']) assert bool(s6) assert not bool(IPSet()) # set intersection assert s2 & s1 == IPSet(['192.0.2.2/32', '::192.0.2.2/128']) assert s3 & s4 == IPSet(['10.0.0.64/32', '10.0.0.66/32']) assert s4 & s3 == IPSet(['10.0.0.64/32', '10.0.0.66/32']) assert s3 & s5 == IPSet(['10.0.0.65/32', '10.0.0.67/32']) assert s5 & s3 == IPSet(['10.0.0.65/32', '10.0.0.67/32']) # set difference assert s3 - s4 == IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '255.255.255.1/32']) assert s4 - s3 == IPSet([]) assert s3 - s4b == IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '255.255.255.1/32']) assert s3 - s5 == IPSet(['0.0.0.1/32', '10.0.0.64/32', '10.0.0.66/32', '255.255.255.1/32']) assert s5 - s3 == IPSet([]) # set symmetric difference assert s2 ^ s1 == IPSet(['192.0.2.0/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.4/128']) assert IPSet([]) ^ IPSet([]) == IPSet([]) assert IPSet(['0.0.0.1/32']) ^ IPSet([]) == IPSet(['0.0.0.1/32']) assert IPSet(['0.0.0.1/32']) ^ IPSet(['0.0.0.1/32']) == IPSet([]) assert s3 ^ s4 == IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '255.255.255.1/32']) assert s4 ^ s3 == IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '255.255.255.1/32']) assert s3 ^ s4b == IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '111.111.111.111/32', '255.255.255.1/32']) assert s3 ^ s5 == IPSet(['0.0.0.1/32', '10.0.0.64/32', '10.0.0.66/32', '255.255.255.1/32']) assert s5 ^ s3 == IPSet(['0.0.0.1/32', '10.0.0.64/32', '10.0.0.66/32', '255.255.255.1/32'])
def test_converting_ipsets_to_ipranges(): assert list(IPSet().iter_ipranges()) == [] assert list(IPSet([IPAddress('10.0.0.1')]).iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.1')] assert list(IPSet([IPAddress('10.0.0.1'), IPAddress('10.0.0.2')]).iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.2')]
def test_len_on_ipset_failure_with_large_ipv6_addresses(): s1 = IPSet(IPRange(IPAddress("::0"), IPAddress(_sys_maxint, 6))) with pytest.raises(IndexError): len(s1) s2 = IPSet(IPRange(IPAddress("::0"), IPAddress(_sys_maxint - 1, 6))) assert len(s2) == _sys_maxint
def test_ipset_ipv4_and_ipv4_separation(): assert list(IPSet([IPAddress(1, 4), IPAddress(1, 6)]).iter_ipranges()) == [IPRange('0.0.0.1', '0.0.0.1'), IPRange('::1', '::1')]
def test_ipset_exceptions(): s1 = IPSet(['10.0.0.1']) # IPSet objects are not hashable. with pytest.raises(TypeError): hash(s1) # Bad update argument type. with pytest.raises(TypeError): s1.update(42)
def test_ipset_converts_to_cidr_networks_v4(): s1 = IPSet(IPNetwork('10.1.2.3/8')) s1.add(IPNetwork('192.168.1.2/16')) assert list(s1.iter_cidrs()) == [ IPNetwork('10.0.0.0/8'), IPNetwork('192.168.0.0/16'), ]
def test_ipset_converts_to_cidr_networks_v6(): s1 = IPSet(IPNetwork('fe80::4242/64')) s1.add(IPNetwork('fe90::4343/64')) assert list(s1.iter_cidrs()) == [ IPNetwork('fe80::/64'), IPNetwork('fe90::/64'), ]
def ips_to_spf_strings(ips): other_tokens = list() for index, ip in enumerate(ips): try: IPNetwork(ip) except AddrFormatError: other_tokens.append(ip) for token in other_tokens: ips.remove(token) ips = [str(i) for i in IPSet(ips).iter_cidrs()] ips = ['ip6:' + ip if ':' in ip else 'ip4:' + ip.replace('/32', '') for ip in ips] return ips + other_tokens
def random_address(base): """Return a random address based on a base prefix.""" prefix = netaddr.IPNetwork(base) addresses = netaddr.IPSet(prefix) for address in [prefix.network, prefix.broadcast]: addresses.remove(address) return str(random.choice(list(addresses))) + '/' + str(prefix.prefixlen)
def ipset(nets): v4nets = netaddr.IPSet() v6nets = netaddr.IPSet() for net in nets: ipNetwork = netaddr.IPNetwork(net) parts = str(ipNetwork).split("/") ip = parts[0] mask = parts[1] if netaddr.valid_ipv4(ip) and int(mask) <= 32: v4nets.add(ipNetwork) elif netaddr.valid_ipv6(ip) and int(mask) <= 128: v6nets.add(ipNetwork) return v4nets, v6nets
def populate_reserved(self): with open(self.reserved_ip, 'r') as f: lines = f.readlines() ip_list = [] for line in lines: if not line.startswith('#'): line = line.strip() if len(line) > 0: ip_list.append(line) self.ipset_reserved = IPSet(ip_list)
def derive_outwall(self): """ This would not only inverse the set with the "big one", it would also exclude See: http://www.tcpipguide.com/free/t_IPReservedPrivateandLoopbackAddresses-3.htm """ self.ipset_outwall = IPSet(['0.0.0.0/0']) - self.ipset_inwall - self.ipset_reserved self.cidrs_outwall = list(self.ipset_outwall.iter_cidrs()) logging.info("Finished deriving out-wall IP table(s). Total: %i CIDR blocks.", len(self.cidrs_outwall), )
def _validate_subnet_cidr(context, network_id, new_subnet_cidr): """Validate the CIDR for a subnet. Verifies the specified CIDR does not overlap with the ones defined for the other subnets specified for this network, or with any other CIDR if overlapping IPs are disabled. """ if neutron_cfg.cfg.CONF.allow_overlapping_ips: return try: new_subnet_ipset = netaddr.IPSet([new_subnet_cidr]) except TypeError: LOG.exception("Invalid or missing cidr: %s" % new_subnet_cidr) raise n_exc.BadRequest(resource="subnet", msg="Invalid or missing cidr") filters = { 'network_id': network_id, 'shared': [False] } # Using admin context here, in case we actually share networks later subnet_list = db_api.subnet_find(context=context.elevated(), **filters) for subnet in subnet_list: if (netaddr.IPSet([subnet.cidr]) & new_subnet_ipset): # don't give out details of the overlapping subnet err_msg = (_("Requested subnet with cidr: %(cidr)s for " "network: %(network_id)s overlaps with another " "subnet") % {'cidr': new_subnet_cidr, 'network_id': network_id}) LOG.error(_("Validation for CIDR: %(new_cidr)s failed - " "overlaps with subnet %(subnet_id)s " "(CIDR: %(cidr)s)"), {'new_cidr': new_subnet_cidr, 'subnet_id': subnet.id, 'cidr': subnet.cidr}) raise n_exc.InvalidInput(error_message=err_msg)
def _pool_is_growing(original_pool, new_pool): # create IPSet for original pool ori_set = netaddr.IPSet() for rng in original_pool._alloc_pools: ori_set.add(netaddr.IPRange(rng['start'], rng['end'])) # create IPSet for net pool new_set = netaddr.IPSet() for rng in new_pool._alloc_pools: new_set.add(netaddr.IPRange(rng['start'], rng['end'])) # we are growing the original set is not a superset of the new set return not ori_set.issuperset(new_set)
def ensure_default_policy(cidrs, subnets): policy_cidrs = netaddr.IPSet(cidrs) for subnet in subnets: subnet_cidr = netaddr.IPNetwork(subnet["cidr"]) network_ip = subnet_cidr.network broadcast_ip = subnet_cidr.broadcast prefix_len = '32' if subnet_cidr.version == 4 else '128' default_policy_cidrs = ["%s/%s" % (network_ip, prefix_len), "%s/%s" % (broadcast_ip, prefix_len)] for cidr in default_policy_cidrs: if (netaddr.IPNetwork(cidr) not in policy_cidrs and cidr not in cidrs): cidrs.append(cidr)
def allocation_pools(self): _cache = self.get("_allocation_pool_cache") if _cache: pools = json.loads(_cache) return pools else: if self["ip_policy"]: ip_policy_cidrs = self["ip_policy"].get_cidrs_ip_set() else: ip_policy_cidrs = netaddr.IPSet([]) cidr = netaddr.IPSet([netaddr.IPNetwork(self["cidr"])]) allocatable = cidr - ip_policy_cidrs pools = _pools_from_cidr(allocatable) return pools
def get_cidrs_ip_set(self): ip_policies = self.get("exclude", []) ip_policy_cidrs = [ip_policy.cidr for ip_policy in ip_policies] return netaddr.IPSet(ip_policy_cidrs)
def upgrade(): ip_policy = table('quark_ip_policy', column('id', sa.String(length=36)), column('size', INET())) ip_policy_cidrs = table('quark_ip_policy_cidrs', column('ip_policy_id', sa.String(length=36)), column('cidr', sa.String(length=64))) connection = op.get_bind() # 1. Retrieve all ip_policy_cidr rows. results = connection.execute( select([ip_policy_cidrs.c.ip_policy_id, ip_policy_cidrs.c.cidr]) ).fetchall() # 2. Determine IPSet for each IP Policy. ipp = dict() for ip_policy_id, cidr in results: if ip_policy_id not in ipp: ipp[ip_policy_id] = netaddr.IPSet() ipp[ip_policy_id].add(cidr) # 3. Populate size for each IP Policy. for ip_policy_id in ipp: connection.execute(ip_policy.update().values( size=ipp[ip_policy_id].size).where( ip_policy.c.id == ip_policy_id))
def test_allow_allocation_pool_growth(self): CONF.set_override('allow_allocation_pool_growth', True, 'QUARK') cidr = "192.168.1.0/24" ip_network = netaddr.IPNetwork(cidr) network = dict(name="public", tenant_id="fake", network_plugin="BASE") network = {"network": network} pool = [dict(start='192.168.1.15', end='192.168.1.30')] subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2, cidr=cidr, first_ip=ip_network.first, last_ip=ip_network.last, ip_policy=None, allocation_pools=pool, tenant_id="fake") subnet = {"subnet": subnet} with self._stubs(network, subnet) as (net, sub1): subnet = subnet_api.get_subnet(self.context, 1) start_pools = subnet['allocation_pools'] new_pool = [dict(start='192.168.1.10', end='192.168.1.50')] subnet_update = {"subnet": dict(allocation_pools=new_pool)} subnet = subnet_api.update_subnet(self.context, 1, subnet_update) self.assertNotEqual(start_pools, subnet['allocation_pools']) self.assertEqual(new_pool, subnet['allocation_pools']) policies = policy_api.get_ip_policies(self.context) self.assertEqual(1, len(policies)) policy = policies[0] ip_set = netaddr.IPSet() for ip in policy['exclude']: ip_set.add(netaddr.IPNetwork(ip)) for extent in new_pool: for ip in netaddr.IPRange(extent['start'], extent['end']): self.assertFalse(ip in ip_set) start_ip_set = netaddr.IPSet() for rng in start_pools: start_ip_set.add(netaddr.IPRange(rng['start'], rng['end'])) new_ip_set = netaddr.IPSet() for rng in subnet['allocation_pools']: new_ip_set.add(netaddr.IPRange(rng['start'], rng['end'])) self.assertTrue(start_ip_set | new_ip_set != start_ip_set)
def test_do_not_allow_allocation_pool_growth(self): CONF.set_override('allow_allocation_pool_growth', False, 'QUARK') cidr = "192.168.1.0/24" ip_network = netaddr.IPNetwork(cidr) network = dict(name="public", tenant_id="fake", network_plugin="BASE") network = {"network": network} pool = [dict(start='192.168.1.15', end='192.168.1.30')] subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2, cidr=cidr, first_ip=ip_network.first, last_ip=ip_network.last, ip_policy=None, allocation_pools=pool, tenant_id="fake") subnet = {"subnet": subnet} with self._stubs(network, subnet) as (net, sub1): subnet = subnet_api.get_subnet(self.context, 1) start_pools = subnet['allocation_pools'] new_pool = [dict(start='192.168.1.10', end='192.168.1.50')] start_ip_set = netaddr.IPSet() for rng in start_pools: start_ip_set.add(netaddr.IPRange(rng['start'], rng['end'])) new_ip_set = netaddr.IPSet() for rng in new_pool: new_ip_set.add(netaddr.IPRange(rng['start'], rng['end'])) self.assertTrue(start_ip_set | new_ip_set != start_ip_set) subnet_update = {"subnet": dict(allocation_pools=new_pool)} with self.assertRaises(n_exc.BadRequest): subnet = subnet_api.update_subnet(self.context, 1, subnet_update)