我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用ipaddr.IPNetwork()。
def __init__(self, data # Data suitable for this class ): valid, message = data_is_valid(data) if not valid: raise ValueError("Invalid data: %s" % message) self.cidrs = [] for iface in netifaces.interfaces(): ifaddrs = netifaces.ifaddresses(iface) if netifaces.AF_INET in ifaddrs: for ifaddr in ifaddrs[netifaces.AF_INET]: if 'addr' in ifaddr: self.cidrs.append(ipaddr.IPNetwork(ifaddr['addr'])) if netifaces.AF_INET6 in ifaddrs: for ifaddr in ifaddrs[netifaces.AF_INET6]: if 'addr' in ifaddr: #add v6 but remove stuff like %eth0 that gets thrown on end of some addrs self.cidrs.append(ipaddr.IPNetwork(ifaddr['addr'].split('%')[0]))
def evaluate(self, hints # Information used for doing identification ): """Given a set of hints, evaluate this identifier and return True if an identification is made. """ try: ip = ipaddr.IPNetwork(hints['requester']) except KeyError: return False # TODO: Find out of there's a more hash-like way to do this # instead of a linear search. This would be great if it # weren't GPL: https://pypi.python.org/pypi/pytricia for cidr in self.cidrs: if ip in cidr: return True return False # A short test program
def __init__(self, data # Data suitable for this class ): valid, message = data_is_valid(data) if not valid: raise ValueError("Invalid data: %s" % message) self.exclude = [ ipaddr.IPNetwork(addr) for addr in data['exclude'] ] try: timeout = pscheduler.iso8601_as_timedelta(data['timeout']) self.timeout = pscheduler.timedelta_as_seconds(timeout) except KeyError: self.timeout = 2 try: self.fail_result = data['fail-result'] except KeyError: self.fail_result = False self.resolver = dns.resolver.Resolver() self.resolver.timeout = self.timeout self.resolver.lifetime = self.timeout
def Validate(self, value, unused_key=None): """Validates a subnet.""" if value is None: raise validation.MissingAttribute('subnet must be specified') if not isinstance(value, basestring): raise validation.ValidationError('subnet must be a string, not \'%r\'' % type(value)) try: ipaddr.IPNetwork(value) except ValueError: raise validation.ValidationError('%s is not a valid IPv4 or IPv6 subnet' % value) parts = value.split('/') if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]): raise validation.ValidationError('Prefix length of subnet %s must be an ' 'integer (quad-dotted masks are not ' 'supported)' % value) return value
def test_duplicate(net_str, net2_str=None, dup_found=True): usres_monitor.add_net(ipaddr.IPNetwork(net_str)) try: usres_monitor.add_net( ipaddr.IPNetwork(net2_str if net2_str else net_str) ) except USRESMonitorException as e: if "it was already in the db" in str(e): test_outcome("test_duplicate", "{} and {}".format( net_str, net2_str if net2_str else net_str), "OK" if dup_found else "FAIL") if not dup_found: raise AssertionError("Duplicate found") return test_outcome("test_duplicate", "{} and {}".format( net_str, net2_str if net2_str else net_str), "FAIL" if dup_found else "OK") if dup_found: raise AssertionError("Duplicate not found")
def read_testbed_topo(self): with open(self.testbed_filename) as f: topo = csv.DictReader(f) for line in topo: tb_prop = {} name = '' for key in line: if ('uniq-name' in key or 'conf-name' in key) and '#' in line[key]: ### skip comment line continue elif 'uniq-name' in key or 'conf-name' in key: name = line[key] elif 'ptf_ip' in key and line[key]: ptfaddress = ipaddress.IPNetwork(line[key]) tb_prop['ptf_ip'] = str(ptfaddress.ip) tb_prop['ptf_netmask'] = str(ptfaddress.netmask) else: tb_prop[key] = line[key] if name: self.testbed_topo[name] = tb_prop return
def is_valid_ip(ip_address, project): """ Verify that an IP address is not being blacklisted for the given project. """ blacklist = project.get_option('sentry:blacklisted_ips') if not blacklist: return True ip_network = IPNetwork(ip_address) for addr in blacklist: # We want to error fast if it's an exact match if ip_address == addr: return False # Check to make sure it's actually a range before # attempting to see if we're within that range if '/' in addr and ip_network in IPNetwork(addr): return False return True # #845, add for server name filter, by hzwangzhiwei @20160803
def parse_device_desc_xml(filename): root = ET.parse(filename).getroot() (lo_prefix, mgmt_prefix, hostname, hwsku, d_type) = parse_device(root) results = {} results['DEVICE_METADATA'] = {'localhost': { 'hostname': hostname, 'hwsku': hwsku, }} results['LOOPBACK_INTERFACE'] = {('lo', lo_prefix): {}} mgmt_intf = {} mgmtipn = ipaddress.IPNetwork(mgmt_prefix) gwaddr = ipaddress.IPAddress(int(mgmtipn.network) + 1) results['MGMT_INTERFACE'] = {('eth0', mgmt_prefix): {'gwaddr': gwaddr}} return results
def __init__(self, data # Data suitable for this class ): valid, message = data_is_valid(data) if not valid: raise ValueError("Invalid data: %s" % message) self.cidrs = [] for cidr in data['cidrs']: self.cidrs.append(ipaddr.IPNetwork(cidr))
def net_is_valid(self, net): """ Returns true if the given string is a valid IP network (v4 or v6). """ try: ipaddr.IPNetwork(net) return True except ValueError: return False #}}} #{{{ IP type
def get_network(self, net_type): """ Returns a network (as a string) that includes the private/public IPs of all nodes in the config. Raises an EXAConfError if an invalid IP is found or the IP of at least one node is not part of the network defined by the first node section. This function assumes that all nodes have an entry for the requested network type. The calling function has to check if the network type is actually present (private / public). """ network = "" for section in self.config.sections: if self.is_node(section): node_sec = self.config[section] node_network = node_sec.get(net_type) if not node_network or node_network == "": raise EXAConfError("Network type '%s' is missing in section '%s'!" % (net_type, section)) node_ip = node_network.split("/")[0].strip() # check if the extracted IP is valid if not self.ip_is_valid(node_ip): raise EXAConfError("IP %s in section '%s' is invalid!" % (node_ip, section)) # first node : choose the private net as the cluster network (and make it a 'real' network) if network == "": subnet = ipaddr.IPNetwork(node_network) network = "%s/%s" % (str(subnet.network), str(subnet.prefixlen)) # other nodes : check if their IP is part of the chosen net elif ipaddr.IPAddress(node_ip) not in ipaddr.IPNetwork(network): raise EXAConfError("IP %s is not part of network %s!" % (node_ip, network)) return network #}}} #{{{ Get private network
def _ValidateEntry(self, entry): if not entry.subnet: return MISSING_SUBNET try: ipaddr.IPNetwork(entry.subnet) except ValueError: return BAD_IPV_SUBNET % entry.subnet parts = entry.subnet.split('/') if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]): return BAD_PREFIX_LENGTH % entry.subnet
def is_private_network(cidr): return ipaddr.IPNetwork(cidr).is_private
def test_min_max(net_str, target_prefix_len, exp_first, exp_last, shoud_fail=False): failed = False try: net = ipaddr.IPNetwork(net_str) # assert net_str.lower() == "{}/{}".format(exp_first, net.prefixlen), \ # ("String representations of original net and first expected " # "prefix don't match.") min_int, max_int, _ = usres_monitor.get_sre(net, target_prefix_len) first = usres_monitor.get_ip_repr(net.version, min_int) last = usres_monitor.get_ip_repr(net.version, max_int) res = "{}/{}".format(first, target_prefix_len) exp = "{}/{}".format(exp_first.lower(), target_prefix_len) assert res == exp, \ ("First expected prefix doesn't match: {} vs {}".format( res, exp) ) res = "{}/{}".format(last, target_prefix_len) exp = "{}/{}".format(exp_last.lower(), target_prefix_len) assert res == exp, \ ("Last expected prefix doesn't match: {} vs {}".format( res, exp) ) except AssertionError: failed = True if shoud_fail: pass else: raise assert shoud_fail == failed test_outcome("get_sre", "{} in /{}".format(net_str, target_prefix_len), "OK{}".format(" (failed as expected)" if shoud_fail else ""))
def get_net(net): if isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network)): return net return ipaddr.IPNetwork(net)
def load_cluster_networks(self): self.cluster_net = None self.public_net = None osd = self.get_alive_osd() if osd is not None: cluster_net_str = osd.config.get('cluster_network') if cluster_net_str is not None and cluster_net_str != "": self.cluster_net = IPNetwork(cluster_net_str) public_net_str = osd.config.get('public_network', None) if public_net_str is not None and public_net_str != "": self.public_net = IPNetwork(public_net_str)
def test_0020_select_subnet_for_ip(self): subnets = dormitory.Subnet.q.order_by(dormitory.Subnet.gateway).all() for subnet in subnets: for ip in ipaddr.IPNetwork(subnet.address).iterhosts(): selected = select_subnet_for_ip(ip.compressed, subnets) self.assertEqual(subnet, selected)
def netmask(self): net = ipaddr.IPNetwork(self.address) return str(net.netmask)
def ip_version(self): return ipaddr.IPNetwork(self.address).version
def _ip_subnet_valid(self, ip, subnet): return ipaddr.IPAddress(ip) in ipaddr.IPNetwork(subnet.address)
def cni_ipam(host_cidrv4: str, host_gateway: str): """ With the class variables provide a way to generate a static host-local ipam :param host_cidrv4: :param host_gateway: :return: dict """ host = ipaddr.IPNetwork(host_cidrv4) subnet = host ip_cut = int(host.ip.__str__().split(".")[-1]) if ConfigSyncSchedules.sub_ips: sub = ipaddr.IPNetwork(host.network).ip + (ip_cut * ConfigSyncSchedules.sub_ips) host = ipaddr.IPNetwork(sub) range_start = host.ip + ConfigSyncSchedules.skip_ips range_end = range_start + ConfigSyncSchedules.range_nb_ips ipam = { "type": "host-local", "subnet": "%s/%s" % (subnet.network.__str__(), subnet.prefixlen), "rangeStart": range_start.__str__(), "rangeEnd": range_end.__str__(), "gateway": host_gateway, "routes": [ {"dst": "%s/32" % EC.perennial_local_host_ip, "gw": ipaddr.IPNetwork(host_cidrv4).ip.__str__()}, {"dst": "0.0.0.0/0"}, ], "dataDir": "/var/lib/cni/networks" } return ipam
def __init__(self, ip): if ip_library == "ipaddr": self.obj = ipaddr.IPNetwork(ip) self.ip = str(self.obj.ip) else: self.obj = ipaddress.ip_network(ip) self.ip = str(self.obj.network_address) self.prefixlen = self.obj.prefixlen self.max_prefixlen = self.obj.max_prefixlen self.version = self.obj.version
def is_valid_url(url): """ Tests a URL to ensure it doesn't appear to be a blacklisted IP range. """ parsed = urlparse(url) if not parsed.hostname: return False server_hostname = get_server_hostname() if parsed.hostname == server_hostname: return True try: ip_address = socket.gethostbyname(parsed.hostname) except socket.gaierror: return False if ip_address == server_hostname: return True ip_network = IPNetwork(ip_address) for addr in DISALLOWED_IPS: if ip_network in addr: return False return True
def __init__(self, vid, conf=None): if conf is None: conf = {} self.vid = vid self.tagged = [] self.untagged = [] self.name = conf.setdefault('name', str(vid)) self.description = conf.setdefault('description', self.name) self.controller_ips = conf.setdefault('controller_ips', []) if self.controller_ips: self.controller_ips = [ ipaddr.IPNetwork(ip) for ip in self.controller_ips] self.unicast_flood = conf.setdefault('unicast_flood', True) self.routes = conf.setdefault('routes', {}) self.ipv4_routes = {} self.ipv6_routes = {} if self.routes: self.routes = [route['route'] for route in self.routes] for route in self.routes: ip_gw = ipaddr.IPAddress(route['ip_gw']) ip_dst = ipaddr.IPNetwork(route['ip_dst']) assert(ip_gw.version == ip_dst.version) if ip_gw.version == 4: self.ipv4_routes[ip_dst] = ip_gw else: self.ipv6_routes[ip_dst] = ip_gw self.arp_cache = {} self.nd_cache = {} self.max_hosts = conf.setdefault('max_hosts', None) self.host_cache = {}
def get(self): esstore = current_app.extensions['es_access_store'] mpstore = current_app.extensions['mp_access_store'] args = self.parser.parse_args() event = args['event'][:120] auth_token=request.headers.get('Auth-Token') ip_resolver = current_app.config['IP_RESOLVER'] ip = RateLimiter.get_remote_addr() ip_net = IPNetwork(ip) resolved_org = ip_resolver['default'] for net, org in ip_resolver.items(): if isinstance(net, (IPv4Network, IPv6Network)): if net.overlaps(ip_net): resolved_org = org break data = dict(ip=ip, org=resolved_org, host=request.host, timestamp=datetime.now(), event=event) if auth_token: payload = TokenAuthentication.get_payload_from_token(auth_token) data['app_name'] = payload['app_name'] # esstore.store_event(data) mpstore.store_event(data) data['timestamp']= str(data['timestamp']) return CTTVResponse.OK(SimpleResult(None, data=data))
def parse_graph(self): """ Parse the xml graph file """ deviceinfo = {} deviceroot = self.root.find(self.pngtag).find('Devices') devices = deviceroot.findall('Device') if devices is not None: for dev in devices: hostname = dev.attrib['Hostname'] if hostname is not None: deviceinfo[hostname] = {} hwsku = dev.attrib['HwSku'] devtype = dev.attrib['Type'] deviceinfo[hostname]['HwSku'] = hwsku deviceinfo[hostname]['Type'] = devtype self.links[hostname] = {} devicel2info = {} devicel3s = self.root.find(self.dpgtag).findall('DevicesL3Info') devicel2s = self.root.find(self.dpgtag).findall('DevicesL2Info') if devicel2s is not None: for l2info in devicel2s: hostname = l2info.attrib['Hostname'] if hostname is not None: devicel2info[hostname] = {} vlans = l2info.findall('InterfaceVlan') for vlan in vlans: portname = vlan.attrib['portname'] portmode = vlan.attrib['mode'] portvlanid = vlan.attrib['vlanids'] portvlanlist = self.port_vlanlist(portvlanid) devicel2info[hostname][portname] = {'mode': portmode, 'vlanids': portvlanid, 'vlanlist': portvlanlist} if devicel3s is not None: for l3info in devicel3s: hostname = l3info.attrib['Hostname'] if hostname is not None: management_ip = l3info.find('ManagementIPInterface').attrib['Prefix'] deviceinfo[hostname]['ManagementIp'] = management_ip mgmtip = ipaddress.IPNetwork(management_ip) deviceinfo[hostname]['mgmtip'] = str(mgmtip.ip) management_gw = str(mgmtip.network+1) deviceinfo[hostname]['ManagementGw'] = management_gw allinks = self.root.find(self.pngtag).find('DeviceInterfaceLinks').findall('DeviceInterfaceLink') if allinks is not None: for link in allinks: start_dev = link.attrib['StartDevice'] end_dev = link.attrib['EndDevice'] if start_dev: self.links[start_dev][link.attrib['StartPort']] = {'peerdevice':link.attrib['EndDevice'], 'peerport': link.attrib['EndPort']} if end_dev: self.links[end_dev][link.attrib['EndPort']] = {'peerdevice': link.attrib['StartDevice'], 'peerport': link.attrib['StartPort']} self.devices = deviceinfo self.vlanport = devicel2info
def add_controller_ips(self, controller_ips, vlan): ofmsgs = [] for controller_ip in controller_ips: controller_ip_host = ipaddr.IPNetwork( '/'.join((str(controller_ip.ip), str(controller_ip.max_prefixlen)))) if controller_ip_host.version == 4: ofmsgs.append(self.valve_flowcontroller( self.dp.eth_src_table, self.valve_in_match( eth_type=ether.ETH_TYPE_ARP, nw_dst=controller_ip_host, vlan=vlan), priority=self.dp.highest_priority)) ofmsgs.append(self.valve_flowcontroller( self.dp.eth_src_table, self.valve_in_match( eth_type=ether.ETH_TYPE_IP, eth_dst=self.FAUCET_MAC, vlan=vlan, nw_proto=inet.IPPROTO_ICMP, nw_src=controller_ip, nw_dst=controller_ip_host), priority=self.dp.highest_priority)) else: ofmsgs.append(self.valve_flowcontroller( self.dp.eth_src_table, self.valve_in_match( eth_type=ether.ETH_TYPE_IPV6, vlan=vlan, nw_proto=inet.IPPROTO_ICMPV6, ipv6_nd_target=controller_ip_host, icmpv6_type=icmpv6.ND_NEIGHBOR_SOLICIT), priority=self.dp.highest_priority)) ofmsgs.append(self.valve_flowcontroller( self.dp.eth_src_table, self.valve_in_match( eth_type=ether.ETH_TYPE_IPV6, eth_dst=self.FAUCET_MAC, vlan=vlan, nw_proto=inet.IPPROTO_ICMPV6, icmpv6_type=icmpv6.ND_NEIGHBOR_ADVERT), priority=self.dp.highest_priority)) ofmsgs.append(self.valve_flowcontroller( self.dp.eth_src_table, self.valve_in_match( eth_type=ether.ETH_TYPE_IPV6, vlan=vlan, nw_proto=inet.IPPROTO_ICMPV6, nw_dst=controller_ip_host, icmpv6_type=icmpv6.ICMPV6_ECHO_REQUEST), priority=self.dp.highest_priority)) return ofmsgs