我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用netaddr.IPRange()。
def test_iprange_slicing(): iprange = IPRange('192.0.2.1', '192.0.2.254') assert list(iprange[0:3]) == [ IPAddress('192.0.2.1'), IPAddress('192.0.2.2'), IPAddress('192.0.2.3'), ] assert list(iprange[0:10:2]) == [ IPAddress('192.0.2.1'), IPAddress('192.0.2.3'), IPAddress('192.0.2.5'), IPAddress('192.0.2.7'), IPAddress('192.0.2.9'), ] assert list(iprange[0:1024:512]) == [IPAddress('192.0.2.1')]
def test_iprange_pickling_v6(): iprange = IPRange('::ffff:192.0.2.1', '::ffff:192.0.2.254') assert iprange == IPRange('::ffff:192.0.2.1', '::ffff:192.0.2.254') assert iprange.first == 281473902969345 assert iprange.last == 281473902969598 assert iprange.version == 6 buf = pickle.dumps(iprange) iprange2 = pickle.loads(buf) assert iprange2 == iprange assert iprange2.first == 281473902969345 assert iprange2.last == 281473902969598 assert iprange2.version == 6
def pop_ip(env=None): """Picks an ip from env['provider_net']. It will first take ips in the extra_ips if possible. extra_ips is a list of isolated ips whereas ips described by the [provider_net.start, provider.end] range is a continuous list of ips. """ # Construct the pool of ips extra_ips = env['provider_net'].get('extra_ips', []) if len(extra_ips) > 0: ip = extra_ips.pop() env['provider_net']['extra_ips'] = extra_ips return ip ips = list(IPRange(env['provider_net']['start'], env['provider_net']['end'])) # Get the next ip ip = str(ips.pop()) # Remove this ip from the env env['provider_net']['end'] = str(ips.pop()) return ip
def _build_excludes(self): self._validate_allocation_pools() subnet_net = netaddr.IPNetwork(self._subnet_cidr) version = subnet_net.version cidrset = netaddr.IPSet( netaddr.IPRange( netaddr.IPAddress(subnet_net.first, version=version), netaddr.IPAddress(subnet_net.last, version=version)).cidrs()) if isinstance(self._alloc_pools, list): for p in self._alloc_pools: start = netaddr.IPAddress(p["start"]) end = netaddr.IPAddress(p["end"]) cidrset -= netaddr.IPSet(netaddr.IPRange( netaddr.IPAddress(start), netaddr.IPAddress(end)).cidrs()) elif self._alloc_pools is None: # Empty list is completely unallocatable, None is fully # allocatable cidrset = netaddr.IPSet() for p in self._policies: cidrset.add(netaddr.IPNetwork(p)) self._exclude_cidrs = cidrset
def parsenoproxy(noproxy): global NOPROXY nops = [i.strip() for i in noproxy.split(",")] for nop in nops: if not nop: continue try: if "-" in nop: spl = nop.split("-", 1) ipns = netaddr.IPRange(spl[0], spl[1]) elif "*" in nop: ipns = netaddr.IPGlob(nop) else: ipns = netaddr.IPNetwork(nop) NOPROXY.add(ipns) except: print("Bad noproxy IP definition") sys.exit()
def _get_ip_addresses(self,input_data): ip_addresses = [] if "-" in input_data: input_data_splitted = input_data.split('-') first_ip_address = input_data_splitted[0] first_ip_address_splitted = first_ip_address.split('.') second_ip_address = '%s.%s.%s.%s'%(first_ip_address_splitted[0],first_ip_address_splitted[1],first_ip_address_splitted[2],input_data_splitted[1]) ip_addresses = IPRange(first_ip_address,second_ip_address) elif "," in input_data: ip_addresses = input_data.split(',') else: ip_addresses = IPNetwork(input_data) return ip_addresses # Description: Process the terminal arguments # Return: (void)
def __init__(self, blacklist_string): """ Initialize a IPBlacklistEntry object based on the blacklist_string argument. :param blacklist_string: The string to build an IPBlacklistEntry object from. :return: None """ self._ip_string = blacklist_string[:blacklist_string.find(",")].strip() self._range_name = blacklist_string[blacklist_string.find(",") + 1:].strip() if "-" in self._ip_string: range_start = self._ip_string[:self._ip_string.find("-")].strip() range_end = self._ip_string[self._ip_string.find("-") + 1:].strip() self._ip_range = IPRange(range_start, range_end) else: self._ip_range = IPNetwork(self._ip_string) # Static Methods # Class Methods # Public Methods
def main(instances, docker_bridge, docker_container, fixture_root, fixture_name): try: addrs = netifaces.ifaddresses(docker_bridge) except ValueError: click.secho('It appears {0} is not a valid newtork interface on this system.'.format( docker_bridge), fg='red') sys.exit(1) try: docker_bridge_addr = IPAddress(addrs[netifaces.AF_INET][0]['addr']) except IndexError: click.secho('It appears {0} does not have an address at this time.'.format(docker_bridge), fg='red') sys.exit(1) network = list(IPRange(docker_bridge_addr + 1, docker_bridge_addr + 1 + instances - 1)) if os.path.exists(os.path.join(fixture_root, fixture_name)): click.secho('[ERROR] ', fg='red', nl=False) click.secho('A fixture named {0} already exists.'.format(fixture_name)) sys.exit(1) fixture_ctx = FixtureContext(fixture_root, fixture_name) for instance in range(0, len(network)): # TODO(sholsapp): We might want to specify different containers for # different instances one day. instance_ctx = InstanceContext( fixture_ctx.fixture_root, instance, network, docker_container) click.secho('Creating instance {0} at {1}... '.format( instance_ctx.instance, instance_ctx.node_root), nl=False) fixture_ctx.instances.append(instance_ctx) click.secho('[GOOD]', fg='green') fixture_ctx.render()
def get_range(self, targets): if targets is None: return None try: target_list = [] for target in targets.split(','): if '/' in target: target_list.extend(list(IPNetwork(target))) elif '-' in target: start_addr = IPAddress(target.split('-')[0]) try: end_addr = IPAddress(target.split('-')[1]) ip_range = IPRange(start_addr, end_addr) except AddrFormatError: end_addr = list(start_addr.words) end_addr[-1] = target.split('-')[1] end_addr = IPAddress('.'.join(map(str, end_addr))) ip_range = IPRange(start_addr, end_addr) target_list.extend(list(ip_range)) else: target_list.append(IPAddress(target)) return target_list except AddrFormatError: sys.exit("Specified an invalid IP address/range/network as target")
def _in_network(value, p_value, exact_match=False): """ """ # 'any' address is an automatic match if we are exact if exact_match and 'any' in p_value: return True addresses = [IPRange(a.split('-')[0], a.split('-')[1]) if '-' in a else IPNetwork(a) for a in value if is_ipv4(a)] fqdns = [a for a in value if not is_ipv4(a)] p_addresses = [IPRange(a.split('-')[0], a.split('-')[1]) if '-' in a else IPNetwork(a) for a in p_value if is_ipv4(a)] p_fqdns = [a for a in p_value if not is_ipv4(a)] # network containment implies exact match... i think? for a in addresses: addr_result = any(a == b or a in b for b in p_addresses) if not addr_result: return False # now match the fqdns if exact_match: fqdn_result = set(fqdns) == set(p_fqdns) else: fqdn_result = set(p_fqdns).issubset(set(fqdns)) return fqdn_result
def test_ipset_basic_api(): range1 = IPRange('192.0.2.1', '192.0.2.15') ip_list = [ IPAddress('192.0.2.1'), '192.0.2.2/31', IPNetwork('192.0.2.4/31'), IPAddress('192.0.2.6'), IPAddress('192.0.2.7'), '192.0.2.8', '192.0.2.9', IPAddress('192.0.2.10'), IPAddress('192.0.2.11'), IPNetwork('192.0.2.12/30'), ] set1 = IPSet(range1.cidrs()) set2 = IPSet(ip_list) assert set2 == IPSet([ '192.0.2.1/32', '192.0.2.2/31', '192.0.2.4/30', '192.0.2.8/29', ]) assert set1 == set2 assert set2.pop() in set1 assert set1 != set2
def test_ipset_constructor(): assert IPSet(['192.0.2.0']) == IPSet(['192.0.2.0/32']) assert IPSet([IPAddress('192.0.2.0')]) == IPSet(['192.0.2.0/32']) assert IPSet([IPNetwork('192.0.2.0')]) == IPSet(['192.0.2.0/32']) assert IPSet(IPNetwork('1234::/32')) == IPSet(['1234::/32']) assert IPSet([IPNetwork('192.0.2.0/24')]) == IPSet(['192.0.2.0/24']) assert IPSet(IPSet(['192.0.2.0/32'])) == IPSet(['192.0.2.0/32']) assert IPSet(IPRange("10.0.0.0", "10.0.1.31")) == IPSet(['10.0.0.0/24', '10.0.1.0/27']) assert IPSet(IPRange('0.0.0.0', '255.255.255.255')) == IPSet(['0.0.0.0/0'])
def test_ipset_member_insertion_and_deletion(): s1 = IPSet() s1.add('192.0.2.0') assert s1 == IPSet(['192.0.2.0/32']) s1.remove('192.0.2.0') assert s1 == IPSet([]) s1.add(IPRange("10.0.0.0", "10.0.0.255")) assert s1 == IPSet(['10.0.0.0/24']) s1.remove(IPRange("10.0.0.128", "10.10.10.10")) assert s1 == IPSet(['10.0.0.0/25'])
def test_ipset_membership(): iprange = IPRange('192.0.1.255', '192.0.2.16') assert iprange.cidrs() == [ IPNetwork('192.0.1.255/32'), IPNetwork('192.0.2.0/28'), IPNetwork('192.0.2.16/32'), ] ipset = IPSet(['192.0.2.0/28']) assert [(str(ip), ip in ipset) for ip in iprange] == [ ('192.0.1.255', False), ('192.0.2.0', True), ('192.0.2.1', True), ('192.0.2.2', True), ('192.0.2.3', True), ('192.0.2.4', True), ('192.0.2.5', True), ('192.0.2.6', True), ('192.0.2.7', True), ('192.0.2.8', True), ('192.0.2.9', True), ('192.0.2.10', True), ('192.0.2.11', True), ('192.0.2.12', True), ('192.0.2.13', True), ('192.0.2.14', True), ('192.0.2.15', True), ('192.0.2.16', False), ]
def test_ipset_clear(): ipset = IPSet(['10.0.0.0/16']) ipset.update(IPRange('10.1.0.0', '10.1.255.255')) assert ipset == IPSet(['10.0.0.0/15']) ipset.clear() assert ipset == IPSet([])
def test_converting_ipsets_to_ipranges(): assert list(IPSet().iter_ipranges()) == [] assert list(IPSet([IPAddress('10.0.0.1')]).iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.1')] assert list(IPSet([IPAddress('10.0.0.1'), IPAddress('10.0.0.2')]).iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.2')]
def test_len_on_ipset_failure_with_large_ipv6_addresses(): s1 = IPSet(IPRange(IPAddress("::0"), IPAddress(_sys_maxint, 6))) with pytest.raises(IndexError): len(s1) s2 = IPSet(IPRange(IPAddress("::0"), IPAddress(_sys_maxint - 1, 6))) assert len(s2) == _sys_maxint
def test_ipset_ipv4_and_ipv4_separation(): assert list(IPSet([IPAddress(1, 4), IPAddress(1, 6)]).iter_ipranges()) == [IPRange('0.0.0.1', '0.0.0.1'), IPRange('::1', '::1')]
def test_iprange_boolean_evaluation(): assert bool(IPRange('0.0.0.0', '255.255.255.255')) assert bool(IPRange('0.0.0.0', '0.0.0.0'))
def test_iprange_indexing(): iprange = IPRange('192.0.2.1', '192.0.2.254') assert len(iprange) == 254 assert iprange.first == 3221225985 assert iprange.last == 3221226238 assert iprange[0] == IPAddress('192.0.2.1') assert iprange[-1] == IPAddress('192.0.2.254') with pytest.raises(IndexError): iprange[512]
def test_iprange_ipv6_unsupported_slicing(): with pytest.raises(TypeError): IPRange('::ffff:192.0.2.1', '::ffff:192.0.2.254')[0:10:2]
def test_iprange_membership(): assert IPRange('192.0.2.5', '192.0.2.10') in IPRange('192.0.2.1', '192.0.2.254') assert IPRange('fe80::1', 'fe80::fffe') in IPRange('fe80::', 'fe80::ffff:ffff:ffff:ffff') assert IPRange('192.0.2.5', '192.0.2.10') not in IPRange('::', '::255.255.255.255')
def test_more_iprange_sorting(): ipranges = (IPRange('192.0.2.40', '192.0.2.50'), IPRange('192.0.2.20', '192.0.2.30'), IPRange('192.0.2.1', '192.0.2.254'),) assert sorted(ipranges) == [IPRange('192.0.2.1', '192.0.2.254'), IPRange('192.0.2.20', '192.0.2.30'), IPRange('192.0.2.40', '192.0.2.50')] ipranges = list(ipranges) ipranges.append(IPRange('192.0.2.45', '192.0.2.49')) assert sorted(ipranges) == [ IPRange('192.0.2.1', '192.0.2.254'), IPRange('192.0.2.20', '192.0.2.30'), IPRange('192.0.2.40', '192.0.2.50'), IPRange('192.0.2.45', '192.0.2.49'), ]
def test_iprange_info_and_properties(): iprange = IPRange('192.0.2.1', '192.0.2.254') assert eval(str(iprange.info)) == { 'IPv4': [{ 'date': '1993-05', 'designation': 'Administered by ARIN', 'prefix': '192/8', 'status': 'Legacy', 'whois': 'whois.arin.net'}] } assert iprange.is_reserved() assert iprange.version == 4
def test_iprange_invalid_len_and_alternative(): range1 = IPRange(IPAddress("::0"), IPAddress(_sys_maxint, 6)) with pytest.raises(IndexError): len(range1) range2 = IPRange(IPAddress("::0"), IPAddress(_sys_maxint - 1, 6)) assert len(range2) == _sys_maxint
def test_iprange_pickling_v4(): iprange = IPRange('192.0.2.1', '192.0.2.254') assert iprange == IPRange('192.0.2.1', '192.0.2.254') assert iprange.first == 3221225985 assert iprange.last == 3221226238 assert iprange.version == 4 buf = pickle.dumps(iprange) iprange2 = pickle.loads(buf) assert iprange2 == iprange assert id(iprange2) != id(iprange) assert iprange2.first == 3221225985 assert iprange2.last == 3221226238 assert iprange2.version == 4
def resource_setup(cls): super(PortsRbacTest, cls).resource_setup() # Create a network and subnet. cls.network = cls.create_network() cls.cidr = netaddr.IPNetwork(CONF.network.project_network_cidr) cls.subnet = cls.create_subnet(cls.network, cidr=cls.cidr, mask_bits=24) cls.ip_range = netaddr.IPRange( cls.subnet['allocation_pools'][0]['start'], cls.subnet['allocation_pools'][0]['end']) cls.port = cls.create_port(cls.network) ipaddr = cls.port['fixed_ips'][0]['ip_address'] cls.port_ip_address = ipaddr cls.port_mac_address = cls.port['mac_address']
def resource_setup(cls): super(RouterRbacTest, cls).resource_setup() # Create a network with external gateway so that # ``external_gateway_info`` policies can be validated. post_body = {'router:external': True} cls.network = cls.create_network(**post_body) cls.subnet = cls.create_subnet(cls.network) cls.ip_range = netaddr.IPRange( cls.subnet['allocation_pools'][0]['start'], cls.subnet['allocation_pools'][0]['end']) cls.router = cls.create_router()
def _pool_is_growing(original_pool, new_pool): # create IPSet for original pool ori_set = netaddr.IPSet() for rng in original_pool._alloc_pools: ori_set.add(netaddr.IPRange(rng['start'], rng['end'])) # create IPSet for net pool new_set = netaddr.IPSet() for rng in new_pool._alloc_pools: new_set.add(netaddr.IPRange(rng['start'], rng['end'])) # we are growing the original set is not a superset of the new set return not ori_set.issuperset(new_set)
def test_update_allocation_pools(self): cidr = "192.168.1.0/24" ip_network = netaddr.IPNetwork(cidr) network = dict(name="public", tenant_id="fake", network_plugin="BASE") network = {"network": network} subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2, cidr=cidr, first_ip=ip_network.first, last_ip=ip_network.last, ip_policy=None, tenant_id="fake") subnet = {"subnet": subnet} with self._stubs(network, subnet) as (net, sub1): subnet = subnet_api.get_subnet(self.context, 1) start_pools = subnet['allocation_pools'] new_pools = [ [dict(start='192.168.1.10', end='192.168.1.50')], [dict(start='192.168.1.5', end='192.168.1.25')], [dict(start='192.168.1.50', end='192.168.1.51')], [dict(start='192.168.1.50', end='192.168.1.51'), dict(start='192.168.1.100', end='192.168.1.250')], [dict(start='192.168.1.50', end='192.168.1.51')], start_pools, ] prev_pool = start_pools for pool in new_pools: subnet_update = {"subnet": dict(allocation_pools=pool)} subnet = subnet_api.update_subnet(self.context, 1, subnet_update) self.assertNotEqual(prev_pool, subnet['allocation_pools']) self.assertEqual(pool, subnet['allocation_pools']) policies = policy_api.get_ip_policies(self.context) self.assertEqual(1, len(policies)) policy = policies[0] ip_set = netaddr.IPSet() for ip in policy['exclude']: ip_set.add(netaddr.IPNetwork(ip)) for extent in pool: for ip in netaddr.IPRange(extent['start'], extent['end']): self.assertFalse(ip in ip_set) prev_pool = pool
def test_allow_allocation_pool_growth(self): CONF.set_override('allow_allocation_pool_growth', True, 'QUARK') cidr = "192.168.1.0/24" ip_network = netaddr.IPNetwork(cidr) network = dict(name="public", tenant_id="fake", network_plugin="BASE") network = {"network": network} pool = [dict(start='192.168.1.15', end='192.168.1.30')] subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2, cidr=cidr, first_ip=ip_network.first, last_ip=ip_network.last, ip_policy=None, allocation_pools=pool, tenant_id="fake") subnet = {"subnet": subnet} with self._stubs(network, subnet) as (net, sub1): subnet = subnet_api.get_subnet(self.context, 1) start_pools = subnet['allocation_pools'] new_pool = [dict(start='192.168.1.10', end='192.168.1.50')] subnet_update = {"subnet": dict(allocation_pools=new_pool)} subnet = subnet_api.update_subnet(self.context, 1, subnet_update) self.assertNotEqual(start_pools, subnet['allocation_pools']) self.assertEqual(new_pool, subnet['allocation_pools']) policies = policy_api.get_ip_policies(self.context) self.assertEqual(1, len(policies)) policy = policies[0] ip_set = netaddr.IPSet() for ip in policy['exclude']: ip_set.add(netaddr.IPNetwork(ip)) for extent in new_pool: for ip in netaddr.IPRange(extent['start'], extent['end']): self.assertFalse(ip in ip_set) start_ip_set = netaddr.IPSet() for rng in start_pools: start_ip_set.add(netaddr.IPRange(rng['start'], rng['end'])) new_ip_set = netaddr.IPSet() for rng in subnet['allocation_pools']: new_ip_set.add(netaddr.IPRange(rng['start'], rng['end'])) self.assertTrue(start_ip_set | new_ip_set != start_ip_set)
def test_do_not_allow_allocation_pool_growth(self): CONF.set_override('allow_allocation_pool_growth', False, 'QUARK') cidr = "192.168.1.0/24" ip_network = netaddr.IPNetwork(cidr) network = dict(name="public", tenant_id="fake", network_plugin="BASE") network = {"network": network} pool = [dict(start='192.168.1.15', end='192.168.1.30')] subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2, cidr=cidr, first_ip=ip_network.first, last_ip=ip_network.last, ip_policy=None, allocation_pools=pool, tenant_id="fake") subnet = {"subnet": subnet} with self._stubs(network, subnet) as (net, sub1): subnet = subnet_api.get_subnet(self.context, 1) start_pools = subnet['allocation_pools'] new_pool = [dict(start='192.168.1.10', end='192.168.1.50')] start_ip_set = netaddr.IPSet() for rng in start_pools: start_ip_set.add(netaddr.IPRange(rng['start'], rng['end'])) new_ip_set = netaddr.IPSet() for rng in new_pool: new_ip_set.add(netaddr.IPRange(rng['start'], rng['end'])) self.assertTrue(start_ip_set | new_ip_set != start_ip_set) subnet_update = {"subnet": dict(allocation_pools=new_pool)} with self.assertRaises(n_exc.BadRequest): subnet = subnet_api.update_subnet(self.context, 1, subnet_update)