我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用ipaddr.IPAddress()。
def isIPAddress(ip, compressed=True): """Check if an arbitrary string is an IP address, and that it's valid. :type ip: basestring or int :param ip: The IP address to check. :param boolean compressed: If True, return a string representing the compressed form of the address. Otherwise, return an :class:`ipaddr.IPAddress` instance. :rtype: A :class:`ipaddr.IPAddress`, or a string, or False :returns: The IP, as a string or a class, if it passed the checks. Otherwise, returns False. """ try: ip = ipaddr.IPAddress(ip) except ValueError: return False else: if isValidIP(ip): if compressed: return ip.compressed else: return ip return False
def isIPv(version, ip): """Check if **ip** is a certain **version** (IPv4 or IPv6). .. warning: Do *not* put any calls to the logging module in this function, or else an infinite recursion will occur when the call is made, due the the log :class:`~logging.Filter`s in :mod:`~bridgedb.safelog` using this function to validate matches from the regular expression for IP addresses. :param integer version: The IPv[4|6] version to check; must be either ``4`` or ``6``. Any other value will be silently changed to ``4``. :param ip: The IP address to check. May be an any type which :class:`ipaddr.IPAddress` will accept. :rtype: boolean :returns: ``True``, if the address is an IPv4 address. """ try: ipaddr.IPAddress(ip, version=version) except (ipaddr.AddressValueError, Exception): return False else: return True return False
def parse_device_desc_xml(filename): root = ET.parse(filename).getroot() (lo_prefix, mgmt_prefix, hostname, hwsku, d_type) = parse_device(root) results = {} results['DEVICE_METADATA'] = {'localhost': { 'hostname': hostname, 'hwsku': hwsku, }} results['LOOPBACK_INTERFACE'] = {('lo', lo_prefix): {}} mgmt_intf = {} mgmtipn = ipaddress.IPNetwork(mgmt_prefix) gwaddr = ipaddress.IPAddress(int(mgmtipn.network) + 1) results['MGMT_INTERFACE'] = {('eth0', mgmt_prefix): {'gwaddr': gwaddr}} return results
def ip_is_valid(self, ip): """ Returns true if the given string is a valid IP address (v4 or v6). """ try: ipaddr.IPAddress(ip) return True except ValueError: return False #}}} #{{{ Check if network is valid
def get_network(self, net_type): """ Returns a network (as a string) that includes the private/public IPs of all nodes in the config. Raises an EXAConfError if an invalid IP is found or the IP of at least one node is not part of the network defined by the first node section. This function assumes that all nodes have an entry for the requested network type. The calling function has to check if the network type is actually present (private / public). """ network = "" for section in self.config.sections: if self.is_node(section): node_sec = self.config[section] node_network = node_sec.get(net_type) if not node_network or node_network == "": raise EXAConfError("Network type '%s' is missing in section '%s'!" % (net_type, section)) node_ip = node_network.split("/")[0].strip() # check if the extracted IP is valid if not self.ip_is_valid(node_ip): raise EXAConfError("IP %s in section '%s' is invalid!" % (node_ip, section)) # first node : choose the private net as the cluster network (and make it a 'real' network) if network == "": subnet = ipaddr.IPNetwork(node_network) network = "%s/%s" % (str(subnet.network), str(subnet.prefixlen)) # other nodes : check if their IP is part of the chosen net elif ipaddr.IPAddress(node_ip) not in ipaddr.IPNetwork(network): raise EXAConfError("IP %s is not part of network %s!" % (node_ip, network)) return network #}}} #{{{ Get private network
def get_ip_repr(ip_ver, net_int): return ipaddr.IPAddress(net_int if ip_ver == 4 else net_int << 64)
def analy_req(address): mainDomain = conf_read('maindomain') address = address[:-len(mainDomain) - 1] payload = conf_read('payload') encoding = conf_read('encoding') record = address try: if encoding == 'int': record = ipaddr.IPAddress(int(address)).__str__() elif encoding == 'hex': try: address = address.decode('hex') if ipaddr.IPAddress(address).version == 4: record = address elif conf_read('type') == 'AAAA' and ipaddr.IPAddress(address).version == 6: record = address else: pass except: pass # elif False not in map(lambda x:x in map(lambda x:chr(x),range(97,108)),list(address)): elif encoding == 'en': record = numToEnToNum(address) elif payload != 'None' and payload.find(mainDomain) == -1: # record = payload + "www.google.com" record = payload + mainDomain except Exception,e: print '[!] Subdomain Invalid {}'.format(e) finally: return record
def ipListBuild(address): print '1. Single IP Covert For En\n2. Build IP List' opt_req = raw_input("[+] [1 By Default/2]") or '1' if opt_req == '1': print numToEnToNum(address) exit() conf_main = conf_read('maindomain')[:-1] seg_len = raw_input("[+] Please Input Segment Length [24 By Default]") or 24 encode_req = raw_input("[+] Please Input Encoding ['ipv4' By Default]") mainDomain = raw_input("[+] Please Input Server Root Address [{} By Default]".format(conf_main)) or conf_main segment = eval("ipaddr.IPv4Network('{}/{}').iterhosts()".format(address, int(seg_len))) save_file = "{}_{}_{}.txt".format(time.strftime("%Y%m%d%X", time.localtime()).replace(':', ''), mainDomain.replace('.','_'),(encode_req if encode_req else 'ipv4')) results = [] try: if encode_req == '': results += ["{}.{}".format(str(i),mainDomain) for i in list(segment)] elif encode_req == 'en': results += ["{}.{}".format(numToEnToNum(str(i)),mainDomain) for i in list(segment)] elif encode_req == 'int': results += ["{}.{}".format(int(ipaddr.IPAddress(str(i))),mainDomain) for i in list(segment)] elif encode_req == 'hex': results += ["{}.{}".format(str(i).encode('hex'),mainDomain) for i in list(segment)] else: pass f = open(save_file,'a') [f.write(i+'\n') for i in results] f.close() print '[+] Stored in the {}'.format(save_file) except Exception,e: print e exit()
def __init__(self, max_length=50, *args, **kwargs): if not ip_address: raise ImproperlyConfigured( "'ipaddr' package is required to use 'IPAddressType' " "in python 2" ) super(IPAddressType, self).__init__(*args, **kwargs) self.impl = types.Unicode(max_length)
def process_result_value(self, value, dialect): return ip_address(value) if value else None
def _coerce(self, value): return ip_address(value) if value else None
def _ip_subnet_valid(self, ip, subnet): return ipaddr.IPAddress(ip) in ipaddr.IPNetwork(subnet.address)
def select_subnet_for_ip(ip, subnets): for subnet in subnets: if ipaddr.IPAddress(ip) in ipaddr.IPv4Network(subnet.address): return subnet
def __init__(self, server_address, handler, *args, **kwargs): self.session = kwargs.pop("session") (address, _) = server_address version = ipaddr.IPAddress(address).version if version == 4: self.address_family = socket.AF_INET elif version == 6: self.address_family = socket.AF_INET6 BaseHTTPServer.HTTPServer.__init__( self, server_address, handler, *args, **kwargs)
def buildSocket(self, addr): global s ip = ipaddr.IPAddress(addr) ## learn if we're IPv4 or IPv6 if ip.version == 4: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) elif ip.version == 6: s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) return s
def contains(self, ip_address): ip = ipaddr.IPAddress(ip_address) if isinstance(ip, ipaddr.IPv4Address): networks = self.ipv4_networks elif isinstance(ip, ipaddr.IPv6Address): networks = self.ipv6_networks else: raise RuntimeError("Should never happen") for network in networks: if network.Contains(ip): return True return False
def getAddresses(): from scapy.all import get_if_addr, get_if_list from ipaddr import IPAddress addresses = set() for i in get_if_list(): try: addresses.add(get_if_addr(i)) except: pass if '0.0.0.0' in addresses: addresses.remove('0.0.0.0') return [IPAddress(addr) for addr in addresses]
def __init__(self, ip): if ip_library == "ipaddr": self.obj = ipaddr.IPAddress(ip) self.ip = str(self.obj) else: self.obj = ipaddress.ip_address(ip) self.ip = str(self.obj.compressed) self.version = self.obj.version
def __init__(self, vid, conf=None): if conf is None: conf = {} self.vid = vid self.tagged = [] self.untagged = [] self.name = conf.setdefault('name', str(vid)) self.description = conf.setdefault('description', self.name) self.controller_ips = conf.setdefault('controller_ips', []) if self.controller_ips: self.controller_ips = [ ipaddr.IPNetwork(ip) for ip in self.controller_ips] self.unicast_flood = conf.setdefault('unicast_flood', True) self.routes = conf.setdefault('routes', {}) self.ipv4_routes = {} self.ipv6_routes = {} if self.routes: self.routes = [route['route'] for route in self.routes] for route in self.routes: ip_gw = ipaddr.IPAddress(route['ip_gw']) ip_dst = ipaddr.IPNetwork(route['ip_dst']) assert(ip_gw.version == ip_dst.version) if ip_gw.version == 4: self.ipv4_routes[ip_dst] = ip_gw else: self.ipv6_routes[ip_dst] = ip_gw self.arp_cache = {} self.nd_cache = {} self.max_hosts = conf.setdefault('max_hosts', None) self.host_cache = {}
def evaluate(self, hints # Information used for doing identification ): """Given a set of hints, evaluate this identifier and return True if an identification is made. """ try: ip = hints['requester'] except KeyError: return False octets = dns.reversename.from_address(ip)[0:-3] host = '.'.join(octets) host += '.v4.fullbogons.cymru.com' if len(octets) == 4 \ else '.v6.fullbogons.cymru.com' # The query for this is always for an 'A' record, even for # IPv6 hosts. try: resolved = self.resolver.query(host, 'A')[0] except dns.resolver.NXDOMAIN: return False except (dns.exception.Timeout, dns.resolver.NoAnswer, dns.resolver.NoNameservers): return self.fail_result # The query will return 127.0.0.2 if it's in the bogon list. # See http://www.team-cymru.org/bogon-reference-dns.html. if str(resolved) != '127.0.0.2': return False # At this point, we have a bogon. Filter out exclusions. net_ip = ipaddr.IPAddress(ip) for exclude in self.exclude: if net_ip in exclude: return False # Not excluded; must be a legit bogon. return True # A short test program
def evaluate(self, hints # Information used for doing identification ): """Given a set of hints, evaluate this identifier and return True if an identification is made. """ try: ip = hints['requester'] except KeyError: return False addr = ipaddr.IPAddress(ip) ip_reverse = dns.reversename.from_address(ip) # Resolve to a FQDN try: reverse = str(self.resolver.query(ip_reverse, 'PTR')[0]) except (dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoAnswer, dns.resolver.NoNameservers): return False # Resolve the FQDN back to an IP and see if they match. This # prevents someone in control over their reverse resolution # from claiming they're someone they're not. # TODO: Check against _all_ returned IPs record = 'A' if addr.version == 4 else 'AAAA' try: forwards = self.resolver.query(reverse, record) except (dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.resolver.NoAnswer, dns.resolver.NoNameservers): return False if ip not in [ str(f) for f in forwards ]: return False # Try to match with and without the dot at the end. for reverse_candidate in [ reverse, reverse.rstrip('.') ]: if self.matcher.matches(reverse_candidate): return True # No match, no dice. return False # A short test program
def _pass_hostinfo(self, entry): s = None if entry['host'] not in self._state: s = { 'vulnerabilities': [], 'exempt_vulnerabilities': [], 'ports': set(), 'hostname': None, 'ipaddress': None, 'os': None, 'credentialed_checks': False } else: s = self._state[entry['host']] # if the hostname has not been set yet, just default it to the key/target # value if s['hostname'] == None: s['hostname'] = entry['host'] thishostinfo = self._hostinfo_locator(entry) # attempt to determine the ip address; if our target is an ip just use that, # otherwise try to locate the ip address using the supplementary host info try: ipaddr.IPAddress(entry['host']) s['ipaddress'] = entry['host'] except: if thishostinfo != None: s['ipaddress'] = thishostinfo['host-ip'] if thishostinfo != None and 'operating-system' in thishostinfo: s['os'] = thishostinfo['operating-system'] # attempt to extract kernel hostname if 'output of \"uname -a\" is' in entry['output']: unamestr = entry['output'].replace('\n', ' ') m = re.search('output of "uname -a" is : Linux (\S+) ', unamestr) if m != None: s['hostname'] = m.group(1) elif '= Computer name' in entry['output']: cnamestr = entry['output'].replace('\n', ' ') m = re.search('(\S+)\s+= Computer name', cnamestr) if m != None: s['hostname'] = m.group(1) # flip credentialed checks if we find plugin output indicating the scan # included successfully used credentials if 'Credentialed checks : yes' in entry['output']: s['credentialed_checks'] = True self._state[entry['host']] = s
def load_hosts(self): for host_name in self.storage.hosts[2]: stor_node = self.storage.get("hosts/" + host_name, expected_format=None) host = Host(host_name) self.hosts[host.name] = host lshw_xml = stor_node.get('lshw', expected_format='xml') if lshw_xml is None: host.hw_info = None else: try: host.hw_info = get_hw_info(lshw_xml) except: host.hw_info = None info = self.parse_meminfo(stor_node.get('meminfo')) host.mem_total = info['MemTotal'] host.mem_free = info['MemFree'] host.swap_total = info['SwapTotal'] host.swap_free = info['SwapFree'] loadavg = stor_node.get('loadavg') host.load_5m = None if loadavg is None else float(loadavg.strip().split()[1]) ipa = self.storage.get('hosts/%s/ipa' % host.name) ip_rr_s = r"\d+:\s+(?P<adapter>.*?)\s+inet\s+(?P<ip>\d+\.\d+\.\d+\.\d+)/(?P<size>\d+)" info = collections.defaultdict(lambda: []) for line in ipa.split("\n"): match = re.match(ip_rr_s, line) if match is not None: info[match.group('adapter')].append( (IPAddress(match.group('ip')), int(match.group('size')))) for adapter, ips_with_sizes in info.items(): for ip, sz in ips_with_sizes: if self.public_net is not None and ip in self.public_net: host.public_net = NetworkAdapter(adapter, ip) if self.cluster_net is not None and ip in self.cluster_net: host.cluster_net = NetworkAdapter(adapter, ip) interfaces = getattr(self.jstorage.hosts, host_name).interfaces for name, adapter_dct in interfaces.items(): adapter_dct = adapter_dct.copy() dev = adapter_dct.pop('dev') adapter = NetworkAdapter(dev, None) adapter.__dict__.update(adapter_dct) host.net_adapters[dev] = adapter net_stats = self.get_node_net_stats(host.name) perf_adapters = [host.cluster_net, host.public_net] + list(host.net_adapters.values()) for net in perf_adapters: if net is not None and net.name is not None: net.perf_stats = net_stats.get(net.name) host.uptime = float(stor_node.get('uptime').split()[0])