我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 netaddr.IPNetwork()。
def is_address_in_network(network, address):
    """
    Report whether a single IP address lies inside a CIDR network.

    :param network (str): CIDR presentation format. For example,
        '192.168.1.0/24'.
    :param address: An individual IPv4 or IPv6 address without a net
        mask or subnet prefix. For example, '192.168.1.1'.
    :returns boolean: True when the address is in the network.
    :raises ValueError: if either argument cannot be parsed by netaddr.
    """
    parse_errors = (netaddr.core.AddrFormatError, ValueError)

    try:
        parsed_network = netaddr.IPNetwork(network)
    except parse_errors:
        raise ValueError("Network (%s) is not in CIDR presentation format" %
                         network)

    try:
        parsed_address = netaddr.IPAddress(address)
    except parse_errors:
        raise ValueError("Address (%s) is not in correct presentation format" %
                         address)

    return parsed_address in parsed_network
def _get_ipv6_network_from_address(address): """Get an netaddr.IPNetwork for the given IPv6 address :param address: a dict as returned by netifaces.ifaddresses :returns netaddr.IPNetwork: None if the address is a link local or loopback address """ if address['addr'].startswith('fe80') or address['addr'] == "::1": return None prefix = address['netmask'].split("/") if len(prefix) > 1: netmask = prefix[1] else: netmask = address['netmask'] return netaddr.IPNetwork("%s/%s" % (address['addr'], netmask))
def contains(self, ip_address):
    """Tell whether ip_address falls inside any of the configured CIDRs.

    The candidate is parsed with netaddr.IPNetwork and tested against each
    entry of ``self.cidr``. When ``self.invert`` is truthy the answer is
    negated.
    """
    candidate = netaddr.IPNetwork(ip_address)
    matched = any(candidate in netaddr.IPNetwork(cidr) for cidr in self.cidr)
    return not matched if self.invert else matched
# Test program
def validate(self, value):
    """Validate ``value`` as an IP address, or as an IP network when
    ``self.mask`` is set.

    On success the parsed object is stored on ``self.ip``.

    :raises ValidationError: when the value cannot be parsed, when its IP
        version is not enabled in ``self.version``, or (mask mode) when its
        prefix length is outside the configured bounds.
    """
    super(IPField, self).validate(value)

    # An empty value is fine for an optional field.
    if not value and not self.required:
        return

    try:
        parser = netaddr.IPNetwork if self.mask else netaddr.IPAddress
        self.ip = parser(value)
    except Exception:
        raise ValidationError(self.invalid_format_message)

    v4_allowed = self.version & IPv4 > 0 and self.ip.version == 4
    v6_allowed = self.version & IPv6 > 0 and self.ip.version == 6
    if not (v4_allowed or v6_allowed):
        raise ValidationError(self.invalid_version_message)

    if not self.mask:
        return

    # Pick the upper prefix-length bound that matches the address family.
    if self.ip.version == 4:
        upper_bound = self.max_v4_mask
    elif self.ip.version == 6:
        upper_bound = self.max_v6_mask
    else:
        return

    if not self.min_mask <= self.ip.prefixlen <= upper_bound:
        raise ValidationError(self.invalid_mask_message)
def is_valid_cidr(address):
    """Return True when address is an IPv4/IPv6 network in CIDR form.

    The value must be accepted by netaddr.IPNetwork and must carry an
    explicit, non-empty "/xx" part.
    """
    try:
        # Let netaddr reject anything that is not a usable network string.
        netaddr.IPNetwork(address)
    except (netaddr.core.AddrFormatError, UnboundLocalError):
        # NOTE(MotoKen): UnboundLocalError works around a bug in netaddr
        # 0.7.5 (see detail in https://github.com/drkjam/netaddr/issues/2)
        return False

    # netaddr only partially checks the /xx part, so require it here.
    pieces = address.split('/')
    return len(pieces) > 1 and pieces[1] != ''
def resource_setup(cls):
    """Create the primary tenant's VLAN network and its IPv4 subnet.

    Subnet settings come from ``vlan_subnet_ipv4_dict`` (the test is
    skipped by ``getattr_or_skip_test`` when that is unavailable).
    """
    super(L2GatewayConnectionTest, cls).resource_setup()

    subnet_args = cls.getattr_or_skip_test("vlan_subnet_ipv4_dict")
    if 'mask_bits' in subnet_args:
        subnet_args['mask_bits'] = int(subnet_args['mask_bits'])

    # cidr must be present and is handed on as an IPNetwork object.
    subnet_args['cidr'] = netaddr.IPNetwork(subnet_args['cidr'])

    # Optional start/end keys become a single allocation pool.
    pool_start = subnet_args.pop('start', None)
    pool_end = subnet_args.pop('end', None)
    if pool_start and pool_end:
        subnet_args['allocation_pools'] = [
            {'start': pool_start, 'end': pool_end}]

    cls.network = cls.create_network()

    # baseAdminNetworkTest does not derive ip_version, mask_bits from cidr
    subnet_args['ip_version'] = 4
    if 'mask_bits' not in subnet_args:
        subnet_args['mask_bits'] = subnet_args['cidr'].prefixlen

    cls.subnet = cls.create_subnet(cls.network, **subnet_args)
def _create_subnet_with_last_subnet_block(cls, network, ip_version=4): """Derive last subnet CIDR block from tenant CIDR and create the subnet with that derived CIDR """ if ip_version == 4: cidr = netaddr.IPNetwork(CONF.network.project_network_cidr) mask_bits = CONF.network.project_network_mask_bits elif ip_version == 6: cidr = netaddr.IPNetwork(CONF.network.project_network_v6_cidr) mask_bits = CONF.network.project_network_v6_mask_bits subnet_cidr = list(cidr.subnet(mask_bits))[-1] gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1) body = cls.create_subnet(network, gateway=gateway_ip, cidr=subnet_cidr, mask_bits=mask_bits) return body['subnet']
def resource_setup(cls):
    """
    Setting up the resources for the test.

    Copies the L2 gateway options from CONF.l2gw onto the class and
    derives the subnet mask and IPNetwork object from the subnet CIDR.
    """
    super(L2GatewayScenarioTest, cls).resource_setup()
    l2gw_conf = CONF.l2gw

    # (class attribute, CONF.l2gw option) pairs, assigned in this order:
    # subnet CIDR, VLAN ids used in setups, IPs of predeployed vms.
    option_map = (
        ("SUBNET_1_NETWORK_CIDR", "subnet_1_cidr"),
        ("VLAN_1", "vlan_1"),
        ("VLAN_2", "vlan_2"),
        ("VM_ON_VDS_TZ1_VLAN16_IP", "vm_on_vds_tz1_vlan16_ip"),
        ("VM1_ON_SWITCH_VLAN16", "vm_on_switch_vlan16"),
        ("VM1_ON_VDS_TZ2_VLAN16_IP", "vm_on_vds_tz2_vlan16_ip"),
        ("VM1_ON_VDS_TZ2_VLAN17_IP", "vm_on_vds_tz2_vlan17_ip"),
    )
    for attr_name, option_name in option_map:
        setattr(cls, attr_name, getattr(l2gw_conf, option_name))

    subnet_cidr = cls.SUBNET_1_NETWORK_CIDR
    cls.SUBNET_1_MASK = subnet_cidr.split("/")[1]
    cls.CIDR = netaddr.IPNetwork(subnet_cidr)
def deploy_l2gateway_topology(self):
    """Build the L2 gateway test topology.

    Creates a router, a network with one subnet attached to that router,
    a security group, and two instances on the network.
    """
    router = self.create_topology_router("router_l2gateway")
    # L2gateway network with router
    network = self.create_topology_network("network_l2gateway")

    # cidr must be presented & in IPNetwork structure.
    self.CIDR = netaddr.IPNetwork(self.SUBNET_1_NETWORK_CIDR)
    self.create_topology_subnet(
        "subnet1_l2gateway",
        network,
        cidr=self.CIDR,
        router_id=router["id"],
        mask_bits=int(self.SUBNET_1_MASK))

    secgroup = self.create_topology_security_group()
    secgroups = [{'name': secgroup['name']}]
    for server_name in ("server1_l2gateway", "server2_l2gateway"):
        self.create_topology_instance(
            server_name, [network], security_groups=secgroups)
def setup_link(self, interface, cidr):
    """Setup a link.

    Runs, as root:
        ip addr flush dev <interface>
        ip addr add <cidr> [broadcast <addr>] dev <interface>
        ip link set dev <interface> up

    :param interface: name of the interface to configure.
    :param cidr: address with prefix to assign, e.g. '192.168.1.2/24'.
    :returns: ``agent_utils.make_response`` result; carries only the exit
        code on success, and the first element of the command output as
        ``message`` when either command fails.
    """
    # clear old ipaddr in interface
    agent_utils.execute(['ip', 'addr', 'flush', 'dev', interface], root=True)

    ip = IPNetwork(cidr)
    cmd = ['ip', 'addr', 'add', cidr]
    # netaddr reports no broadcast address for some networks (e.g. /31 and
    # /32); passing the literal string "None" to `ip` would be wrong, so
    # the broadcast argument is only added when there is one.
    if ip.broadcast is not None:
        cmd += ['broadcast', str(ip.broadcast)]
    cmd += ['dev', interface]

    stdcode, stdout = agent_utils.execute(cmd, root=True)
    if stdcode == 0:
        cmd = ['ip', 'link', 'set', 'dev', interface, 'up']
        stdcode, stdout = agent_utils.execute(cmd, root=True)
        if stdcode == 0:
            return agent_utils.make_response(code=stdcode)

    # execute failed.
    # NOTE(review): assumes stdout is a non-empty list here - confirm
    # against agent_utils.execute.
    message = stdout.pop(0)
    return agent_utils.make_response(code=stdcode, message=message)
def obj_to_primitive(obj):
    """Recursively convert an object into plain python values.

    An ObjectListBase becomes a list, a ZunObject becomes a dict of its
    set (or extra) fields, netaddr addresses and networks become strings,
    and anything else is returned unchanged.
    """
    if isinstance(obj, ObjectListBase):
        return [obj_to_primitive(item) for item in obj]

    if isinstance(obj, ZunObject):
        return {
            name: obj_to_primitive(getattr(obj, name))
            for name in obj.obj_fields
            if obj.obj_attr_is_set(name) or name in obj.obj_extra_fields
        }

    if isinstance(obj, (netaddr.IPAddress, netaddr.IPNetwork)):
        return str(obj)

    return obj
def serialize(self, nested=False):
    """Serializes an object

    Collects every attribute named by ``dict(self)`` into a new dict.
    A value that is a ``Record`` is replaced by the result of its own
    ``serialize(nested=True)``; a ``netaddr`` IPNetwork value is replaced
    by its string form.

    :param nested: when true, return ``_get_return(self)`` instead of the
        full attribute dict.
    :returns: dict of values the NetBox API is expecting.
    """
    if nested:
        return _get_return(self)

    serialized = {}
    for field in dict(self):
        value = getattr(self, field)
        if isinstance(value, Record):
            value = value.serialize(nested=True)
        if isinstance(value, netaddr.ip.IPNetwork):
            value = str(value)
        serialized[field] = value
    return serialized
def is_valid_cidr(address):
    """Tell whether address is a well-formed CIDR network string.

    :param address: Value to verify
    :type address: string
    :returns: bool

    .. versionadded:: 3.8
    """
    try:
        # netaddr must be able to parse the value as a network.
        netaddr.IPNetwork(address)
    except (TypeError, netaddr.AddrFormatError):
        return False

    # netaddr only partially verifies the /xx part, so insist on a
    # non-empty segment after the first slash.
    segments = address.split('/')
    has_prefix = len(segments) > 1 and segments[1] != ''
    return has_prefix
def populate_network_variables(inventory, inventory_source):
    """Copy the source's networks into the inventory's global vars.

    A deep copy of ``inventory_source['networks']`` is stored under
    ``inventory['all']['vars']['networks']``. Each network with a truthy
    'addr' additionally gets 'network' and 'netmask' entries, unless its
    prefix length is 1.
    """
    networks = copy.deepcopy(inventory_source['networks'])

    for network in networks.values():
        addr = network.get('addr')
        if not addr:
            continue

        ip = netaddr.IPNetwork(addr)
        if ip.prefixlen == 1:
            # We don't put networks in with prefix length == 1 because
            # the inventory file uses this to note that while the host
            # has an interface connected to this network, that interface
            # does not directly get an IP address and the address goes
            # on a bridge.
            continue

        network['network'] = str(ip.network)
        network['netmask'] = str(ip.netmask)

    inventory['all']['vars']['networks'] = networks
def get_client_ip(self, xid, dhcp_options):
    """Choose the IP address to offer to a DHCP client.

    Sources are tried in this order:

    1. the ('requested_addr', ip) option, looked up at index 2 first and
       then anywhere in ``dhcp_options``;
    2. the address stored for ``xid`` in ``self.dhcp_dic``;
    3. a random address from the /24 network built on ``self.ip_address``.

    :param xid: transaction id used as key into ``self.dhcp_dic``.
    :param dhcp_options: sequence of DHCP options; (name, value) tuples
        are inspected, other entries are ignored.
    :returns: tuple ``(source, ip)`` where source is 'requested',
        'stored' or 'generated'.
    """
    # Fast path: the requested address usually sits at index 2. A short
    # list or an entry that is not a 2-item pair just falls through to
    # the full scan instead of raising.
    try:
        field_name, req_addr = dhcp_options[2]
    except (IndexError, TypeError, ValueError):
        pass
    else:
        if field_name == 'requested_addr':
            return 'requested', req_addr

    for field in dhcp_options:
        # isinstance, not `field is tuple`: the latter compares against
        # the tuple type itself and never matches an option.
        if (isinstance(field, tuple) and len(field) >= 2
                and field[0] == 'requested_addr'):
            return 'requested', field[1]

    if xid in self.dhcp_dic:
        return 'stored', self.dhcp_dic[xid]

    # NOTE(review): every address of the /24 is a candidate, including
    # its first and last address - confirm that is intended.
    net = IPNetwork(self.ip_address + '/24')
    return 'generated', str(random.choice(list(net)))
def test_ipnetwork_cidr_merge():
    """cidr_merge collapses a mix of addresses, networks and strings."""
    v6_addresses = list(IPNetwork('fe80::/120'))
    v4_networks = [
        IPNetwork('192.0.2.0/24'),
        IPNetwork('192.0.4.0/25'),
        IPNetwork('192.0.4.128/25'),
    ]
    v4_strings = [str(ip) for ip in IPNetwork('192.0.3.0/24')]
    ip_list = v6_addresses + v4_networks + v4_strings

    assert len(ip_list) == 515

    expected = [
        IPNetwork('192.0.2.0/23'),
        IPNetwork('192.0.4.0/24'),
        IPNetwork('fe80::/120'),
    ]
    assert cidr_merge(ip_list) == expected
def test_supernetting():
    """supernet(22) of a bare IPv4 address lists every enclosing network
    from /22 down to /31."""
    expected_cidrs = (
        '192.0.0.0/22',
        '192.0.2.0/23',
        '192.0.2.0/24',
        '192.0.2.0/25',
        '192.0.2.64/26',
        '192.0.2.96/27',
        '192.0.2.112/28',
        '192.0.2.112/29',
        '192.0.2.112/30',
        '192.0.2.114/31',
    )
    expected = [IPNetwork(cidr) for cidr in expected_cidrs]

    assert IPNetwork('192.0.2.114').supernet(22) == expected
def test_ipnetwork_sort_order():
    """Sorting the shuffled addresses of a /28 restores numeric order."""
    shuffled = list(IPNetwork('192.0.2.128/28'))
    random.shuffle(shuffled)

    # 192.0.2.128 through 192.0.2.143, in ascending order.
    expected = [IPAddress('192.0.2.%d' % last_octet)
                for last_octet in range(128, 144)]

    assert sorted(shuffled) == expected
def test_ipaddress_and_ipnetwork_canonical_sort_order_by_version():
    """A mixed list of addresses and networks sorts with all IPv4 entries
    before the IPv6 ones."""
    expected = [
        IPAddress('10.0.0.1'),
        IPNetwork('172.24.0.0/12'),
        IPNetwork('192.0.2.0/24'),
        IPNetwork('192.0.2.128/28'),
        IPAddress('192.0.2.130'),
        IPNetwork('192.0.3.0/24'),
        IPAddress('::'),
        IPNetwork('fe80::/64'),
    ]

    ip_list = [
        IPAddress('192.0.2.130'),
        IPNetwork('192.0.2.128/28'),
        IPAddress('::'),
        IPNetwork('192.0.3.0/24'),
        IPNetwork('192.0.2.0/24'),
        IPNetwork('fe80::/64'),
        IPNetwork('172.24/12'),
        IPAddress('10.0.0.1'),
    ]
    random.shuffle(ip_list)
    ip_list.sort()

    assert ip_list == expected
def test_iterhosts_v4():
    """iter_hosts() and plain iteration on small and single-address
    networks."""
    def addresses(*texts):
        # Build the expected list of IPAddress objects from strings.
        return [IPAddress(text) for text in texts]

    slash_29_hosts = list(IPNetwork('192.0.2.0/29').iter_hosts())
    assert slash_29_hosts == addresses(
        '192.0.2.1', '192.0.2.2', '192.0.2.3',
        '192.0.2.4', '192.0.2.5', '192.0.2.6')

    slash_31_pair = addresses('192.168.0.0', '192.168.0.1')
    assert list(IPNetwork("192.168.0.0/31")) == slash_31_pair

    assert list(IPNetwork("1234::/128")) == addresses('1234::')
    assert list(IPNetwork("1234::/128").iter_hosts()) == []

    assert list(IPNetwork("192.168.0.0/31").iter_hosts()) == slash_31_pair
    assert list(IPNetwork("192.168.0.0/32").iter_hosts()) == \
        addresses('192.168.0.0')
def test_advanced_comparisons():
    """Equality and ordering between IPNetwork and IPAddress objects."""
    base_net = IPNetwork('192.0.2.0/24')
    same_block = IPNetwork('192.0.2.112/24')
    next_net = IPNetwork('192.0.3.0/24')

    # Two networks in the same /24 compare equal even though their .ip
    # values differ.
    assert base_net == same_block
    assert base_net.ip != same_block.ip
    assert base_net.ip < same_block.ip
    assert base_net.cidr == same_block.cidr

    assert base_net != next_net
    assert base_net < next_net

    # An address is never equal to a network, only to the network's
    # members.
    host = IPAddress('192.0.2.0')
    single = IPNetwork('192.0.2.0/32')
    assert host != single
    for member in (single[0], single[-1], single.ip):
        assert host == member
def test_ipset_unions_intersections_differences():
    """Set algebra on the even and odd /28 blocks of 192.0.2.0/24."""
    whole = IPSet(['192.0.2.0/24'])

    # /28 blocks start every 16 addresses; even-indexed ones start at
    # 0, 32, ..., 224 and odd-indexed ones at 16, 48, ..., 240.
    even_blocks = ['192.0.2.%d/28' % start for start in range(0, 256, 32)]
    odd_blocks = ['192.0.2.%d/28' % start for start in range(16, 256, 32)]

    adj_cidrs = list(IPNetwork('192.0.2.0/24').subnet(28))
    evens = IPSet(adj_cidrs[::2])
    assert evens == IPSet(even_blocks)
    assert IPSet(['192.0.2.0/24']) & evens == IPSet(even_blocks)

    odds = IPSet(['192.0.2.0/24']) ^ evens
    assert odds == IPSet(odd_blocks)

    assert evens | odds == whole
    assert evens & odds == IPSet([])
    assert evens ^ odds == whole
def render_ios(self, subgroup=False):
    """Render this host group as IOS ``object-group ip address`` config.

    :param subgroup: when true, emit only the member lines (no
        ``no object-group`` / ``object-group`` header and no ``exit``),
        as used when one group is inlined into another.
    :returns: the configuration text.
    """
    chunks = []
    if not subgroup:
        chunks.append('no object-group ip address {}\n'.format(self.name))
        chunks.append('object-group ip address {}'.format(self.name))

    for prefix in self._prefixes:
        if prefix.startswith('@'):
            # "@name" refers to an inline host group; splice in its members.
            group_name = prefix[1:]
            matches = [g for g in self._inline_hostgroups
                       if g.name == group_name]
            chunks.append(matches[0].render_ios(True))
            continue

        ipn = IPNetwork(prefix)
        if ipn.prefixlen == 32:
            chunks.append('\nhost {}'.format(ipn.ip))
        else:
            chunks.append('\n{} {}'.format(ipn.ip, ipn.netmask))

    if not subgroup:
        chunks.append('\nexit')
    return ''.join(chunks)
def extract_ipv6_info():
    """ Extracts IPv6 information

    Prints whether Python has IPv6 support and then, for every interface
    reported by netifaces, the address, version, prefix length, network
    and broadcast of each AF_INET6 address as parsed by netaddr.
    """
    print ("IPv6 support built into Python: %s" %socket.has_ipv6)
    for interface in ni.interfaces():
        all_addresses = ni.ifaddresses(interface)
        print ("Interface %s:" %interface)
        for family, addrs in all_addresses.items():
            if ni.address_families[family] != 'AF_INET6':
                continue
            for entry in addrs:
                # Drop a "%<zone>" suffix whatever the interface is called
                # (eth0, wlan0, enp3s0, ...); netaddr cannot parse it.
                addr = entry['addr'].partition('%')[0]
                try:
                    # Parse once and reuse for every printed property.
                    network = na.IPNetwork(addr)
                    print (" IP Address: %s" %network)
                    print (" IP Version: %s" %network.version)
                    print (" IP Prefix length: %s" %network.prefixlen)
                    print (" Network: %s" %network.network)
                    print (" Broadcast: %s" %network.broadcast)
                except Exception:
                    # Best effort: report and move on to the next address.
                    print ("Skip Non-IPv6 Interface")
def _validate_cidr(network):
    """Raise ValueError unless netaddr can parse network as an IPNetwork.

    :param network: value expected to be in CIDR presentation format.
    :raises ValueError: when netaddr rejects the value.
    """
    try:
        netaddr.IPNetwork(network)
    except (netaddr.core.AddrFormatError, ValueError):
        message = "Network (%s) is not in CIDR presentation format" % network
        raise ValueError(message)
def _get_for_address(address, key):
    """Retrieve an attribute of or the physical interface that
    the IP address provided could be bound to.

    :param address (str): An individual IPv4 or IPv6 address without a net
        mask or subnet prefix. For example, '192.168.1.1'.
    :param key: 'iface' for the physical interface name or an attribute
        of the configured interface, for example 'netmask'.
    :returns str: Requested attribute or None if address is not bindable.
    """
    address = netaddr.IPAddress(address)
    for iface in netifaces.interfaces():
        addresses = netifaces.ifaddresses(iface)

        if address.version == 4 and netifaces.AF_INET in addresses:
            # Only the first IPv4 address of the interface is considered.
            first_v4 = addresses[netifaces.AF_INET][0]
            network = netaddr.IPNetwork(
                "%s/%s" % (first_v4['addr'], first_v4['netmask']))
            if address in network.cidr:
                if key == 'iface':
                    return iface
                return first_v4[key]

        if address.version == 6 and netifaces.AF_INET6 in addresses:
            for addr in addresses[netifaces.AF_INET6]:
                # Link-local addresses are never a match.
                if addr['addr'].startswith('fe80'):
                    continue

                # The netmask may be written as "<mask>/<prefixlen>";
                # appending that verbatim yields "addr/mask/len", which
                # netaddr rejects, so keep only the part after the slash.
                mask_parts = addr['netmask'].split("/")
                if len(mask_parts) > 1:
                    netmask = mask_parts[1]
                else:
                    netmask = addr['netmask']

                network = netaddr.IPNetwork("%s/%s" % (addr['addr'],
                                                       netmask))
                cidr = network.cidr
                if address in cidr:
                    if key == 'iface':
                        return iface
                    elif key == 'netmask' and cidr:
                        # Report the prefix length rather than the raw
                        # netifaces netmask value.
                        return str(cidr).split('/')[1]
                    else:
                        return addr[key]
    return None
def resolve_network_cidr(ip_address):
    '''
    Return the CIDR (as a string) of the network ip_address belongs to,
    using the netmask that get_netmask_for_address reports for it.
    '''
    netmask = get_netmask_for_address(ip_address)
    network = netaddr.IPNetwork("%s/%s" % (ip_address, netmask))
    return str(network.cidr)
def run(self):
    """Validate the target, run a scan against it and report the results.

    The 'data' parameter must be an fqdn or an ip. When
    ``self.allowed_networks`` is set, the target (resolved first for an
    fqdn) must fall inside one of those networks. Failures are reported
    through ``self.error``.
    """
    Analyzer.run(self)

    data = self.getParam('data', None, 'Data is missing')

    if self.data_type not in ('fqdn', 'ip'):
        self.error('Invalid data type')

    if self.allowed_networks is not None:
        if self.data_type == 'fqdn':
            address = IPAddress(socket.gethostbyname(data))
        else:
            try:
                address = IPAddress(data)
            except Exception as e:
                self.error("{}".format(e))
        in_allowed_network = any(
            address in IPNetwork(network)
            for network in self.allowed_networks)
        if not in_allowed_network:
            self.error('Invalid target: not in any allowed network')

    scanner_args = {
        'url': self.url,
        'login': self.login,
        'password': self.password,
    }
    # Without a CA bundle the scanner is told to run in insecure mode.
    if self.ca_bundle is None:
        scanner_args['insecure'] = True
    else:
        scanner_args['ca_bundle'] = self.ca_bundle

    try:
        scanner = ness6rest.Scanner(**scanner_args)
        scanner.policy_set(name=self.policy)
        scanner.scan_add(targets=data, name="cortex scan for " + data)

        self._run_scan(scanner)
        results = self._get_scan_results(scanner)
        self._delete_scan(scanner)
    except Exception as ex:
        self.error('Scanner error: %s' % ex)

    self.report(results)
def _get_for_address(address, key):
    """Look up the interface, or one of its attributes, whose network
    contains the given IP address.

    :param address (str): An individual IPv4 or IPv6 address without a net
        mask or subnet prefix. For example, '192.168.1.1'.
    :param key: 'iface' for the physical interface name or an attribute
        of the configured interface, for example 'netmask'.
    :returns str: Requested attribute or None if address is not bindable.
    """
    wanted = netaddr.IPAddress(address)
    want_iface = key == 'iface'

    for iface in netifaces.interfaces():
        addresses = netifaces.ifaddresses(iface)

        if wanted.version == 4 and netifaces.AF_INET in addresses:
            # Only the first IPv4 address of the interface is considered.
            first_v4 = addresses[netifaces.AF_INET][0]
            v4_network = netaddr.IPNetwork(
                "%s/%s" % (first_v4['addr'], first_v4['netmask']))
            if wanted in v4_network.cidr:
                return iface if want_iface else first_v4[key]

        if wanted.version == 6 and netifaces.AF_INET6 in addresses:
            for entry in addresses[netifaces.AF_INET6]:
                v6_network = _get_ipv6_network_from_address(entry)
                if not v6_network:
                    continue

                cidr = v6_network.cidr
                if wanted not in cidr:
                    continue

                if want_iface:
                    return iface
                if key == 'netmask' and cidr:
                    # Report the prefix length taken from the CIDR string.
                    return str(cidr).split('/')[1]
                return entry[key]

    return None
def _address(addresses, network): """ Return all addresses in the given network Note: list comprehension vs. netaddr vs. simple """ matched = [] for address in addresses: log.debug("_address: ip {} in network {} ".format(address, network)) if IPAddress(address) in IPNetwork(network): matched.append(address) return matched