我们从 Python 开源项目中提取了以下 14 个代码示例,用于说明如何使用 netaddr.iter_iprange()。
def load_network_ips(inventory):
    """Collect the available IPs of every network listed in an inventory.

    Each entry of a network's ``'available-ips'`` list is either a single
    address, which is kept as given, or two addresses separated by
    whitespace, which are expanded through ``netaddr.iter_iprange`` into
    every address of that range.

    :param inventory: mapping with a ``'networks'`` key whose value maps
        network names to per-network dicts.
    :returns: dict mapping network name to its list of IPs. Networks that
        have no ``'available-ips'`` key are omitted.
    """
    available_network_ips = {}
    # .items() instead of the Python 2-only .iteritems(), so this runs on
    # both Python 2 and Python 3.
    for net_name, net in inventory['networks'].items():
        if 'available-ips' not in net:
            continue
        ip_list_out = []
        for ip in net['available-ips']:
            if ' ' in ip:
                # "<first> <last>" describes a range of addresses.
                ip_range = ip.split()
                ip_list_out.extend(
                    netaddr.iter_iprange(ip_range[0], ip_range[1]))
            else:
                ip_list_out.append(ip)
        available_network_ips[net_name] = ip_list_out
    return available_network_ips
def test_iprange_boundaries():
    """iter_iprange must yield both endpoints and every address between."""
    # Plain IPv4 range: 192.0.2.0 through 192.0.2.7.
    actual_v4 = list(iter_iprange('192.0.2.0', '192.0.2.7'))
    expected_v4 = [IPAddress('192.0.2.%d' % host) for host in range(8)]
    assert actual_v4 == expected_v4

    # The same eight hosts written as IPv4-mapped IPv6 addresses.
    actual_mapped = list(iter_iprange('::ffff:192.0.2.0', '::ffff:192.0.2.7'))
    expected_mapped = [
        IPAddress('::ffff:192.0.2.%d' % host) for host in range(8)
    ]
    assert actual_mapped == expected_mapped
def get_ip_range(start, end):
    """Return every address from ``start`` to ``end`` as a list of strings.

    The addresses come from ``iter_iprange(start, end, step=1)`` and each
    one is converted with ``str()``.

    :param start: first address of the range.
    :param end: last address of the range.
    :returns: list of address strings in iteration order.
    """
    # Iterating directly replaces the manual ``generator.next()`` loop,
    # which only works on Python 2 (Python 3 iterators have no .next()).
    return [str(ip) for ip in iter_iprange(start, end, step=1)]
def test_ip_range():
    """Expand 192.0.2.1-192.0.2.14 and check the addresses and merged CIDRs."""
    addresses = list(iter_iprange('192.0.2.1', '192.0.2.14'))

    assert len(addresses) == 14

    # Hosts .1 through .14, in order.
    expected_addresses = [
        IPAddress('192.0.2.%d' % host) for host in range(1, 15)
    ]
    assert addresses == expected_addresses

    # The 14 addresses should collapse into these six CIDR blocks.
    merged = cidr_merge(addresses)
    expected_cidrs = [
        IPNetwork(cidr)
        for cidr in (
            '192.0.2.1/32',
            '192.0.2.2/31',
            '192.0.2.4/30',
            '192.0.2.8/30',
            '192.0.2.12/31',
            '192.0.2.14/32',
        )
    ]
    assert merged == expected_cidrs
def gen_mako_macro():
    """Return the text of a Mako ``<% %>`` Python block that defines ``gen_paths``.

    The embedded code creates one iterator ``it`` over the addresses from
    100.0.0.0 to 160.0.0.0 and a helper ``gen_paths(num)`` that takes the
    next ``num`` addresses from that iterator and formats each one as
    ``'<ip>/32'``. Because ``it`` is created once in the block, repeated
    ``gen_paths`` calls keep consuming the same iterator.

    :returns: the template fragment as a string.
    """
    # The embedded Python is laid out one statement per line: it has to be
    # valid Python source, which it is not when collapsed onto a single line.
    return '''<%
import netaddr
from itertools import islice

it = netaddr.iter_iprange('100.0.0.0','160.0.0.0')

def gen_paths(num):
    return list('{0}/32'.format(ip) for ip in islice(it, num))
%>
'''
def get_ip_range(oms_spec, key):
    """Build an address iterator from a ``"<start>-<end>"`` entry of a spec.

    :param oms_spec: mapping that may contain ``key``.
    :param key: name of the entry holding the dash-separated range.
    :returns: the ``iter_iprange(start, end)`` iterator, or ``None`` when
        ``key`` is not present in ``oms_spec``.
    """
    if key not in oms_spec:
        return None
    # Exactly one '-' is expected; anything else fails the unpacking.
    first, last = oms_spec[key].split('-')
    return iter_iprange(first, last)
def get_subnet(self, start='10.10.1.0', end='10.10.255.0', step=256):
    """Endlessly yield ``(counter, address)`` pairs, cycling over an IP range.

    Addresses are taken from ``netaddr.iter_iprange(start, end, step=step)``.
    When the range is exhausted it is restarted from ``start``; the counter
    keeps increasing across restarts.

    :param start: first address of the range.
    :param end: last address of the range.
    :param step: increment between consecutive addresses.
    :returns: generator of ``(int, str)`` tuples; the counter starts at 1.
    """
    subnet_gen = netaddr.iter_iprange(start, end, step=step)
    i = 1
    while True:
        # Advance the iterator under the lock, but yield only after the
        # lock is released: yielding inside the ``with`` block would keep
        # self.lock held for as long as the generator stays suspended.
        with self.lock:
            # next() builtin instead of the Python 2-only .next() method.
            address = next(subnet_gen, None)
            if address is None:
                # Range exhausted: start over from the beginning.
                subnet_gen = netaddr.iter_iprange(start, end, step=step)
                address = next(subnet_gen, None)
                if address is None:
                    # The range is empty; end the generator cleanly rather
                    # than letting StopIteration escape from inside it.
                    return
            subnet = str(address)
        yield (i, subnet)
        i += 1
def valid_hosts(formcls, field):
    """Validate a list of IPs (IPv4) or hostnames using python stdlib.

    This is more robust than the WTForm version as it also considers
    hostnames.

    Comma separated values:
        e.g. '10.7.223.101,10.7.12.0'
    Space separated values:
        e.g. '10.7.223.101 10.7.223.102'
    Ranges:
        e.g. '10.7.223.200-10.7.224.10'
    Hostnames:
        e.g. foo.x.y.com, baz.bar.z.com

    :param formcls (object): The form class.
    :param field: The form field; its ``data`` attribute holds the hosts.
    :raises ValueError: if the range is malformed, or any entry is neither
        a valid IP nor a valid hostname.
    """
    # The dots are escaped so that only a dotted numeric prefix selects the
    # range branch; an unescaped '.' would match any character.
    ip_re = re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')
    data = field.data
    if ',' in data:
        # Strip each item so "a, b" (comma followed by a space, as in the
        # hostname example above) is accepted.
        ips = [ip.strip() for ip in data.split(',') if ip.strip()]
    elif ' ' in data:
        ips = [ip for ip in data.split(' ') if ip]
    elif '-' in data and ip_re.match(data):
        try:
            start, end = data.split('-')
            ips = [str(ip) for ip in iter_iprange(start, end)]
        except ValueError:
            raise ValueError(
                'Invalid range specified. Format should be: '
                'XXX.XXX.XXX.XXX-XXX.XXX.XXX.XXX '
                '(e.g. 10.7.223.200-10.7.224.10)')
        except AddrFormatError as e:
            raise ValueError(e)
    else:
        # Just use the single ip
        ips = [data]

    # If any fails conversion, it is invalid.
    for ip in ips:
        # Entries that are not IPs only need to be valid hostnames.
        if not is_ip(ip):
            if not is_hostname(ip):
                raise ValueError('Invalid hostname: "{}"'.format(ip))
            continue
        try:
            socket.inet_aton(ip)
        except socket.error:
            raise ValueError('Invalid IP: {}'.format(ip))
def _create_lports(self, lswitch, lport_create_args=None, lport_amount=1):
    """Create ``lport_amount`` logical ports on ``lswitch`` through ovn-nbctl.

    Commands are issued in batch mode and flushed every ``batch`` ports,
    where ``batch`` is read from ``lport_create_args`` and defaults to
    ``lport_amount``. When the switch has a ``"cidr"`` entry, each port is
    assigned the next address of that network, starting at the network's
    ``ip``; otherwise the IP part of the port address is an empty string.

    :param lswitch: dict describing the logical switch; ``"name"`` is
        required and ``"cidr"`` is optional.
    :param lport_create_args: optional dict of creation options (only
        ``"batch"`` is read here).
    :param lport_amount: number of ports to create.
    :returns: list of the values returned by ``lswitch_port_add``.
    :raises exceptions.InvalidConfigException: if the switch network is too
        small to hold ``lport_amount`` addresses.
    """
    # The old default was a list, which has no .get(); treat any empty or
    # missing value as "no options".
    lport_create_args = lport_create_args or {}

    LOG.info("create %d lports on lswitch %s", lport_amount, lswitch["name"])

    self.RESOURCE_NAME_FORMAT = "lport_XXXXXX_XXXXXX"

    batch = lport_create_args.get("batch", lport_amount)

    LOG.info("Create lports method: %s", self.install_method)
    install_method = self.install_method

    network_cidr = lswitch.get("cidr", None)
    ip_addrs = None
    if network_cidr:
        end_ip = network_cidr.ip + lport_amount
        if end_ip not in network_cidr:
            message = _("Network %s's size is not big enough for %d lports.")
            raise exceptions.InvalidConfigException(
                message % (network_cidr, lport_amount))

        ip_addrs = netaddr.iter_iprange(network_cidr.ip, network_cidr.last)

    ovn_nbctl = self.controller_client("ovn-nbctl")
    ovn_nbctl.set_sandbox("controller-sandbox", install_method)
    ovn_nbctl.enable_batch_mode()

    # Base MAC: the first two hex digits of each task-uuid group, with the
    # lowest bit of the first octet cleared (& 254) and everything from the
    # fourth octet on replaced by three '00' octets.
    base_mac = [i[:2] for i in self.task["uuid"].split('-')]
    base_mac[0] = str(hex(int(base_mac[0], 16) & 254))
    base_mac[3:] = ['00'] * 3

    flush_count = batch
    lports = []
    for _unused in range(lport_amount):
        name = self.generate_random_name()
        lport = ovn_nbctl.lswitch_port_add(lswitch["name"], name)

        # next() builtin instead of the Python 2-only .next() method.
        ip = str(next(ip_addrs)) if ip_addrs is not None else ""
        mac = utils.get_random_mac(base_mac)

        ovn_nbctl.lport_set_addresses(name, [mac, ip])
        ovn_nbctl.lport_set_port_security(name, mac)

        lports.append(lport)

        flush_count -= 1
        if flush_count < 1:
            ovn_nbctl.flush()
            flush_count = batch

    ovn_nbctl.flush()  # ensure all commands be run
    ovn_nbctl.enable_batch_mode(False)
    return lports
def main():
    """Distribute ``num_ip`` addresses of a network over emulation hosts.

    Starting at the address given in ``start_cidr``, every one of the
    ``num_ip`` addresses is assigned the index of an emulation host, in
    equal consecutive shares; addresses left over by the integer division
    go to the last host. The module then reports, per address, the host
    index and the address, and, per host, its index, its number of
    sandboxes and its first address.
    """
    module = AnsibleModule(
        argument_spec=dict(
            start_cidr=dict(required=True),
            num_emulation_hosts=dict(required=False, default="null"),
            num_ip=dict(required=True)
        ),
        supports_check_mode=True
    )

    start_cidr = module.params['start_cidr']

    # Convert once, and report a clean module failure instead of a traceback
    # when a value is not numeric (e.g. the "null" default).
    try:
        num_emulation_hosts = int(module.params['num_emulation_hosts'])
        num_ip = int(module.params['num_ip'])
    except (TypeError, ValueError):
        module.fail_json(
            msg="num_ip and num_emulation_hosts must be integers")
    if num_emulation_hosts < 1:
        module.fail_json(msg="num_emulation_hosts must be at least 1")

    sandbox_cidr = netaddr.IPNetwork(start_cidr)
    sandbox_hosts = netaddr.iter_iprange(sandbox_cidr.ip, sandbox_cidr.last)

    ip_data = t_ip_data()

    # Floor division keeps these values integers on Python 3 as well; they
    # are used as indexes and as range() bounds below.
    chassis_per_host = num_ip // num_emulation_hosts
    if num_ip > 0 and chassis_per_host == 0:
        # Fewer addresses than hosts would divide by zero below.
        module.fail_json(
            msg="num_ip must not be smaller than num_emulation_hosts")

    overflow = 0
    for i in range(num_ip):
        index = i // chassis_per_host
        if index >= num_emulation_hosts:
            # Remainder addresses are all assigned to the last host.
            index = num_emulation_hosts - 1
            overflow += 1
        ip_data.index.append(index)
        ip_data.ip_list.append(str(next(sandbox_hosts)))

    farm_data = t_farm_data()
    # Walk the range again to find the first address of each host's share.
    sandbox_hosts = netaddr.iter_iprange(sandbox_cidr.ip, sandbox_cidr.last)
    for i in range(num_emulation_hosts):
        farm_data.farm_index.append(i)
        num_sandbox = chassis_per_host
        if i == num_emulation_hosts - 1:
            num_sandbox = chassis_per_host + overflow
        farm_data.num_sandbox_farm.append(num_sandbox)
        farm_data.start_cidr_farm.append(str(next(sandbox_hosts)))
        # Skip the remaining addresses of this host's share.
        for _unused in range(num_sandbox - 1):
            next(sandbox_hosts)

    module.exit_json(changed=True,
                     ip_index=ip_data.index,
                     ip_index_list=str(ip_data.ip_list),
                     prefixlen=str(sandbox_cidr.prefixlen),
                     farm_index=farm_data.farm_index,
                     num_sandbox_farm=farm_data.num_sandbox_farm,
                     start_cidr_farm=farm_data.start_cidr_farm)


# import module snippets