我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用IPy.IP。
def equals(self, raw_value):
    """Compare the stored IP address with an address or an address class.

    raw_value -- either a specific IP address, or one of the supported
    match types 'private' / 'public'. For a match type the stored
    address is tested for belonging to that class; any other value is
    passed through self.load() and compared for equality.

    Returns False whenever no value is stored.
    """
    if self.value is None:
        return False

    if raw_value == 'private':
        return self.value.iptype() == 'PRIVATE'
    if raw_value == 'public':
        return self.value.iptype() == 'PUBLIC'

    return self.value == self.load(raw_value)
def _parse_heal_info_stats(tree):
    """Build a mapping of "host:path" to pending heal entries per brick.

    :param tree: parsed XML tree; bricks are read from
        "healInfo/bricks/brick", each with a "name" ("host:path") and a
        "numberOfEntries" element.
    :returns: dict mapping "host:path" to the entry count as an int. A
        count that is not a valid integer is reported as 0.

    A brick host given as an IP address is converted to its FQDN through
    a reverse DNS lookup. If the lookup fails the IP address is kept.
    """
    bricks_dict = {}
    for brick in tree.findall("healInfo/bricks/brick"):
        brick_name = brick.find("name").text
        name_parts = brick_name.split(":")
        brick_host = name_parts[0]
        brick_path = name_parts[1]

        # If brick host is returned as an IP convert to FQDN
        from IPy import IP
        from dns import exception, resolver, reversename
        try:
            IP(brick_host)
            addr = reversename.from_address(brick_host)
            # The PTR answer ends with the root dot; strip it.
            brick_host = str(resolver.query(addr, "PTR")[0])[:-1]
        except (ValueError, exception.DNSException):
            # Not an IP address, or the reverse lookup failed: keep the
            # host exactly as reported instead of aborting the parse.
            pass

        try:
            no_of_entries = int(brick.find("numberOfEntries").text)
        except ValueError:
            no_of_entries = 0

        bricks_dict["%s:%s" % (brick_host, brick_path)] = no_of_entries
    return bricks_dict
def generate_target(IPs, ports):
    """Yield "ip:port" target strings for scanning HTTP proxies.

    IPs is either a "start-end" address pair, expanded with int2ips()
    from the integer values of both ends, or any value IP() accepts,
    which is then iterated directly. Every address is combined with
    every entry of ports; integer ports are converted to strings.
    """
    if '-' in IPs:
        pairs = IPs.split('-')
        addresses = int2ips(IP(pairs[0]).int(), IP(pairs[1]).int())
    else:
        addresses = IP(IPs)

    for address in addresses:
        for port in ports:
            port_text = str(port) if isinstance(port, int) else port
            yield ':'.join([str(address), port_text])

#----------------------------------------------------------------------
def runTest(self):
    """Generate targets for a CIDR block and a dash range and validate them.

    Every generated "ip:port" string is printed split into its parts,
    and its address part must be accepted by IPy.IP. A rejected address
    raises ValueError, which fails the test.
    """
    for spec in ('123.1.2.0/24', '123.1.2.3-123.1.3.5'):
        for target in generate_target(spec, PORTS):
            ip_port = target.split(':')
            print(ip_port)
            # Deliberately not wrapped in try/except: swallowing the
            # ValueError would make this test unable to fail.
            IPy.IP(ip_port[0])
def is_ipv4(ip):
    """Report whether *ip* is accepted by IP().

    Params:
        ip: :str: address in general IP notation.

    Returns:
        :type: bool
        :desc: True when IP() parses the value, False when it raises
            ValueError.

    NOTE(review): despite the name, no address-family check is done
    here; whatever IP() accepts counts as valid.
    """
    try:
        IP(ip)
    except ValueError:
        return False
    else:
        return True

#----------------------------------------------------------------------
def getips(self, ip):
    """Expand an address specification into a list of address strings.

    Two notations are understood:

    * a last-octet range such as "192.168.0.1-200", expanded to every
      address from the first to the last value, both inclusive;
    * anything IP() accepts (single address, CIDR network), expanded by
      iterating over the resulting IP object.

    On any parse failure an error is printed with printRed() and the
    process exits.
    """
    try:
        octets = ip.split(".")
        if "-" in octets[3]:
            bounds = octets[3].split("-")
            startnum = int(bounds[0])
            endnum = int(bounds[1])
            # The upper bound belongs to the range ("1-200" includes
            # .200), hence the + 1.
            return ["%s.%s.%s.%s" % (octets[0], octets[1], octets[2], host)
                    for host in range(startnum, endnum + 1)]
        return [str(address) for address in IP(ip)]
    except Exception:
        # Any malformed input ends the program with a hint; unlike a
        # bare except this lets KeyboardInterrupt/SystemExit through.
        printRed("[!] not a valid ip given. you should put ip like 192.168.1.0/24, 192.168.0.0/16,192.168.0.1-200")
        exit()
def checkstate(lat):
    """Print a status line for the latency *lat* and exit with its state.

    The branches are evaluated in this order:

    * negative latency                      -> UNKNOWN
    * either threshold unset (<= 0)         -> OK
    * warning <= lat < critical threshold   -> WARNING
    * lat >= critical threshold             -> CRITICAL
    * anything else                         -> OK

    Thresholds, CITY and WAN_IP are read from module-level names. The
    function never returns; it always ends in sys.exit().
    """
    if lat < 0:
        message = "UNKNOWN: {0}ms, something must have gone wrong or your IP is unpingable".format(lat)
        state = STATE_UNKNOWN
    elif TS_AVGLATW <= 0 or TS_AVGLATC <= 0:
        message = "OK: {0}ms from {1}, no thresholds given or you just want see values first".format(lat,CITY)
        state = STATE_OK
    elif TS_AVGLATW <= lat < TS_AVGLATC:
        message = "WARNING: {0}ms from {1} is more than {2}ms but less than {3}ms for ip address {4}".format(lat,CITY,TS_AVGLATW,TS_AVGLATC,WAN_IP)
        state = STATE_WARNING
    elif lat >= TS_AVGLATC:
        message = "CRITICAL: {0}ms from {1} more than {2}ms for ip address {3}".format(lat,CITY,TS_AVGLATC,WAN_IP)
        state = STATE_CRITICAL
    else:
        message = "OK: {0}ms from {1}".format(lat,CITY)
        state = STATE_OK

    print(message)
    sys.exit(state)
def authorize_middleware(app, check_env_for_admin=_no_admin):
    """Wrap a WSGI app so that only authorized requests reach it.

    A request passes when _check_for_builtin(environ) or
    check_env_for_admin(environ) is true; every other request gets a
    plain-text "403 Forbidden" response and a logged warning.
    """
    def wsgi_app(environ, start_response):
        # deferred.py needs REMOTE_ADDR set to a specific value,
        # so set it here if we're an internal request
        remote_type = IPy.IP(environ['REMOTE_ADDR']).iptype()
        if remote_type == 'PRIVATE' and _check_for_builtin(environ):
            environ['REMOTE_ADDR'] = '0.1.0.2'

        authorized = (_check_for_builtin(environ)
                      or check_env_for_admin(environ))
        if not authorized:
            logging.warning('Failed to authorize request, environment is: %s',
                            environ)
            start_response('403 Forbidden',
                           [('Content-type', 'text/plain')])
            return ['Forbidden']

        return app(environ, start_response)

    return wsgi_app
def get_location_data_for(ip):
    """Return geolocation data for *ip*, using the cache when possible.

    An empty or private address yields an empty dict. Otherwise the
    cache is consulted first; on a miss the data is fetched from
    freegeoip.net, decoded from JSON and stored in the cache. Each step
    is timed through timelog.
    """
    if not ip:
        return {}

    # Check the common private IP spaces (used by Google for sending requests)
    if IPy.IP(ip).iptype() == 'PRIVATE':
        return {}

    start = time.time()
    data = _get_cache(ip)
    timelog.log_time_since('Getting IP Cache', start)

    if not data:
        #TODO: consider using http://geoiplookup.net/ , which might offer better granularity/resolution
        url = 'http://freegeoip.net/json/%s' % ip
        start = time.time()
        response = urllib.urlopen(url)
        try:
            results = response.read()
        finally:
            # Release the connection even when read() fails.
            response.close()
        timelog.log_time_since('Getting IPData', start)
        data = json.loads(results)

        start = time.time()
        _save_cache(ip, data)
        timelog.log_time_since('Saving IPCache', start)

    return data
def isPublic(self, ipaddr):
    """Return True when this host should take the server role.

    That is the case when Dreamr.server is already set, when *ipaddr*
    is classified as PUBLIC by IPy, or when DEBUG_MODE is enabled.
    Otherwise False is returned.
    """
    # Kept as an explicit comparison with True to match the flag exactly.
    if Dreamr.server == True:
        return True

    # Check if host's interface address is public
    if IPy.IP(ipaddr).iptype() == "PUBLIC":
        print("BECOME_SERVER")
        return True

    # Server Debug
    if DEBUG_MODE:
        print("BECOME_SERVER_DEBUG")
        return True

    return False

# SERVER HANDLER THREAD | Fork off a client thread to handle the socket
# This is a connected client or server!
def filter_active_router_addresses(gwportprefixes):
    """Reduce a GwPortPrefix queryset to one router address per prefix.

    The queryset is ordered by prefix, then virtual addresses first,
    then ascending gateway IP, and the first row of each prefix group is
    kept. A virtual address therefore wins over the others, and
    otherwise the lowest IP address is picked.

    :param gwportprefixes: A GwPortPrefix QuerySet.
    :returns: A list of GwPortPrefix objects.
    """
    # It is more or less impossible to get Django's ORM to generate the
    # wonderfully complex SQL needed for this, so we do it by hand.
    ordered = gwportprefixes.order_by('prefix__id', '-virtual', 'gw_ip')

    active = []
    for _prefix_id, addresses in groupby(ordered, attrgetter('prefix_id')):
        active.append(next(addresses))
    return active
def usage(self, request, *args, **kwargs):
    """Return usage for Prefix.pk == pk.

    Responds with the prefix's address capacity, the number of active
    addresses, and the active and allocated ratios relative to that
    capacity. Prefixes on "scope" VLANs do not count as allocated.
    """
    pk = kwargs.pop("pk", None)
    prefix = Prefix.objects.get(pk=pk)
    capacity = IP(prefix.net_address).len()
    active = prefix_collector.collect_active_ip(prefix)

    # calculate allocated ratio
    builder = PrefixQuerysetBuilder().within(prefix.net_address)
    allocated_prefixes = builder.finalize().exclude(vlan__net_type="scope")
    allocated_size = 0
    for allocated_prefix in allocated_prefixes:
        allocated_size += allocated_prefix.get_prefix_size()

    payload = {
        "max_addr": capacity,
        "active_addr": active,
        "usage": 1.0 * active / capacity,
        "allocated": 1.0 * allocated_size / capacity,
        "pk": pk,
    }
    return Response(payload, status=status.HTTP_200_OK)
def filter_full_prefixes(self):
    """Drop single-address prefixes (/32 for IPv4, /128 for IPv6).

    These are mostly noise for network planning. The filtering is
    registered as a post hook, so it runs lazily over the queryset
    passed to the hook.

    Returns:
        self, so that calls can be chained.
    """
    full_length = {4: 32, 6: 128}

    def _filter_full_prefixes(q):
        for prefix in q:
            ip = IP(prefix.net_address)
            version = ip.version()
            if (version in full_length
                    and ip.prefixlen() < full_length[version]):
                yield prefix

    self.post_hooks.append(_filter_full_prefixes)
    return self
def _get_available_subnets(prefix_or_prefixes, used_prefixes):
    """Compute the unused subnets inside one or more base prefixes.

    This is `get_available_subnets` with the used prefixes injected
    explicitly.

    Args:
        prefix_or_prefixes: a single prefix or a list of prefixes
            ("10.0.0.0/8") or IPy.IP objects
        used_prefixes: prefixes that are in use

    Returns:
        A sorted list of the networks left after removing the used
        prefixes, excluding any entry equal to one of the base prefixes.
    """
    if isinstance(prefix_or_prefixes, list):
        prefixes = prefix_or_prefixes
    else:
        prefixes = [prefix_or_prefixes]

    base_prefixes = [str(prefix) for prefix in prefixes]
    available = IPSet([IP(prefix) for prefix in prefixes])

    # remove used prefixes
    available.discard(IPSet([IP(used) for used in used_prefixes]))

    # filter away original prefixes
    remaining = [ip for ip in available if str(ip) not in base_prefixes]
    remaining.sort()
    return remaining
def process_searchform(form):
    """Get search results based on form data.

    Results are limited to identities changed within the last ``days``
    days. An 'ip' search matches a single address exactly, or uses the
    SQL ``<<`` containment operator for a network. Any other search
    type becomes a case-insensitive substring match on that field.
    A status other than 'any' is applied as an additional filter.
    """
    data = form.cleaned_data
    filters = {
        'last_changed__gte': datetime.now() - timedelta(days=data['days']),
    }
    extra = {}

    searchtype = data['searchtype']
    if searchtype != 'ip':
        filters[searchtype + '__icontains'] = data['searchvalue']
    else:
        ip = IP(data['searchvalue'])
        if ip.len() == 1:
            filters['ip'] = str(ip)
        else:
            extra['where'] = ["ip << '%s'" % str(ip)]

    if data['status'] != 'any':
        filters['status'] = data['status']

    return Identity.objects.filter(**filters).extra(**extra)
def __init__(self, host):
    """Initialize from another Host, a hostname or an IP address.

    A Host argument is copied attribute by attribute. For an IP address
    the hostname is looked up and falls back to the address itself; for
    a hostname the IP is looked up and falls back to None.

    Raises ValueError when *host* is empty.
    """
    if isinstance(host, Host):
        self.host = host.host
        self.ip = host.ip
        self.hostname = host.hostname
        return

    if not host:
        raise ValueError("Host argument must be hostname or IP address")

    self.host = host
    if not self.is_ip():
        self.hostname = host
        self.ip = self.get_host_by_name() or None
    else:
        self.ip = host
        self.hostname = self.get_host_by_addr() or host
def get_queryset(self):
    """Select prefixes by scope or IP family and drop the small ones.

    With a ``scope`` request parameter, prefixes within that scope are
    returned ordered by network address. With ``family``, prefixes of
    that address family are returned. Otherwise all prefixes are used.
    Prefixes with fewer than MINIMUMPREFIXLENGTH addresses are removed.
    """
    query = self.request.GET
    if 'scope' in query:
        queryset = (manage.Prefix.objects
                    .within(query.get('scope'))
                    .select_related('vlan')
                    .order_by('net_address'))
    elif query.get('family'):
        queryset = manage.Prefix.objects.extra(
            where=['family(netaddr)=%s'],
            params=[query['family']])
    else:
        queryset = manage.Prefix.objects.all()

    # Filter prefixes that is smaller than minimum prefix length
    results = []
    for prefix in queryset:
        if IP(prefix.net_address).len() >= MINIMUMPREFIXLENGTH:
            results.append(prefix)
    return results
def get(request, prefix):
    """Handle a GET request for the usage of one prefix.

    Responds with HTTP 400 when *prefix* cannot be parsed or holds fewer
    than MINIMUMPREFIXLENGTH addresses. Otherwise the serialized usage
    for the requested time window is returned.
    """
    try:
        ip_prefix = IP(prefix)
    except ValueError:
        return Response("Bad prefix", status=status.HTTP_400_BAD_REQUEST)

    too_small = ip_prefix.len() < MINIMUMPREFIXLENGTH
    if too_small:
        return Response("Prefix is too small",
                        status=status.HTTP_400_BAD_REQUEST)

    starttime, endtime = get_times(request)
    db_prefix = manage.Prefix.objects.get(net_address=prefix)
    usage = prefix_collector.fetch_usage(db_prefix, starttime, endtime)
    return Response(serializers.PrefixUsageSerializer(usage).data)
def __init__(self, prefix, active_addresses, starttime=None, endtime=None):
    """Collect usage figures and related URLs for a prefix.

    :type prefix: manage.Prefix
    :type active_addresses: int
    :type starttime: datetime.datetime
    :type endtime: datetime.datetime

    ``usage`` is the percentage of usable host addresses that are
    active. ``endtime`` is only kept when a ``starttime`` is given.
    """
    self.prefix = prefix.net_address
    self.active_addresses = active_addresses
    self.max_addresses = IP(self.prefix).len()
    self.max_hosts = self.max_addresses - 2

    # Prefixes with one or two addresses leave max_hosts at -1 or 0.
    # Dividing by that would give a negative percentage or raise
    # ZeroDivisionError, so the full address count is used instead.
    if self.max_hosts > 0:
        usable = self.max_hosts
    else:
        usable = self.max_addresses
    self.usage = self.active_addresses / float(usable) * 100

    self.starttime = starttime
    self.net_ident = prefix.vlan.net_ident
    self.vlan_id = prefix.vlan.vlan
    self.endtime = endtime if self.starttime else None
    self.url_machinetracker = reverse(
        'machinetracker-prefixid_search_active', args=[prefix.pk])
    self.url_report = reverse('report-prefix-prefix', args=[prefix.pk])
    self.url_vlan = reverse('vlan-details', args=[prefix.vlan.pk])
def hostname(ip):
    """Reverse-resolve an IP address, memoizing successes globally.

    Successful lookups are stored in the module-level _cached_hostname
    dict; failed lookups are not cached.

    :param ip: An IP address string.
    :returns: A hostname string or a False value if the lookup failed.
    """
    addr = unicode(ip)
    if addr not in _cached_hostname:
        try:
            dns = gethostbyaddr(addr)
        except herror:
            return False
        _cached_hostname[addr] = dns[0]
    return _cached_hostname[addr]
def get_prefix_info(addr):
    """Find the most specific non-scope prefix containing an address.

    Candidates are ordered by descending mask length, so the first row
    is the smallest matching prefix.

    :param addr: An IP address string.
    :returns: A Prefix object or None if no prefixes matched (or the
              database query failed).
    """
    try:
        candidates = Prefix.objects.select_related().extra(
            select={"mask_size": "masklen(netaddr)"},
            where=["%s << netaddr AND nettype <> 'scope'"],
            order_by=["-mask_size"],
            params=[addr],
        )
        return candidates[0]
    except (IndexError, DatabaseError):
        return None
def normalize_ip_to_string(ipaddr):
    """Turn an IP address into a string that sorts correctly as text.

    Useful when a browser sorts addresses as plain strings in
    JavaScript.

    An IPv4 address becomes '4' followed by the dotted quad with every
    octet zero-padded to three digits. An IPv6 address becomes '6'
    followed by its full-size representation. A value that is not a
    valid IP address is returned unchanged.
    """
    try:
        address = IP(ipaddr)
    except ValueError:
        return ipaddr

    if address.version() != 4:
        return '6%s' % address.strFullsize()

    padded = [octet.zfill(3) for octet in str(address).split('.')]
    return '4%s' % '.'.join(padded)
def resolve_ip_and_sysname(name):
    """Look up both the IP address and the sysname for *name*.

    name - an IP address or a hostname/domain name

    A name that is not an IP address is resolved through DNS first.
    The sysname is the reverse lookup of the address, falling back to
    the address itself when that lookup fails.

    Returns:
     - tuple of (IP object, sysname)
    """
    try:
        ip_addr = IP(name)
    except ValueError:
        ip_addr = IP(gethostbyname(name))

    address_text = unicode(ip_addr)
    try:
        sysname = gethostbyaddr(address_text)[0]
    except SocketError:
        sysname = address_text

    return (ip_addr, sysname)
def does_ip_exist(ip_addr, netbox_id=None):
    """Check whether the given IP already exists in the database.

    Parameters:
     * ip_addr   - the IP address to look for.
     * netbox_id - primary key of a netbox that is allowed to hold the
                   address; a match on that netbox is ignored.

    Returns:
     - True if another netbox already uses the IP.
     - False if not.
    """
    if netbox_id:
        ip_qs = Netbox.objects.filter(Q(ip=unicode(ip_addr)),
                                      ~Q(id=netbox_id))
    else:
        ip_qs = Netbox.objects.filter(ip=unicode(ip_addr))
    # exists() stops at the first matching row instead of counting all.
    return ip_qs.exists()
def group_scopes(scopes):
    """Split scopes into private, IPv4 and IPv6 groups.

    A scope whose address is of type PRIVATE goes into the private
    group whatever its version. Each group is sorted by the integer
    value of its network address.

    :type scopes: list[Prefix]
    :returns: an IpGroup(private, ipv4, ipv6), or an empty list when
              nothing was grouped.
    """
    def _prefix_as_int(prefix):
        return IP(prefix.net_address).int()

    private, ipv4, ipv6 = [], [], []
    for scope in scopes:
        prefix = IP(scope.net_address)
        if prefix.iptype() == 'PRIVATE':
            private.append(scope)
        elif prefix.version() == 4:
            ipv4.append(scope)
        elif prefix.version() == 6:
            ipv6.append(scope)

    if not (private or ipv4 or ipv6):
        return []

    return IpGroup(sorted(private, key=_prefix_as_int),
                   sorted(ipv4, key=_prefix_as_int),
                   sorted(ipv6, key=_prefix_as_int))
def find_input_type(ip_or_mac):
    """Classify input as an IP address, a MAC address or a swport id.

    Returns one of the strings "IP", "MAC", "SWPORTID" or "UNKNOWN".
    The checks are made in that order, so input valid as an IP address
    is never reported as anything else.
    """
    ip_or_mac = str(ip_or_mac)

    if is_valid_ip(ip_or_mac, use_socket_lib=True):
        return "IP"

    # Support mac-addresses on xx:xx... format
    mac = ip_or_mac.replace(':', '')
    if re.match("^[A-Fa-f0-9]{12}$", mac):
        return "MAC"

    if re.match(r"^\d+$", ip_or_mac):
        return "SWPORTID"

    return "UNKNOWN"
def check_non_block(ip):
    """Raise InExceptionListError if *ip* is in the nonblock list.

    The nonblock file is parsed on every call. Its result holds
    specific addresses under 'ip' and ranges (129.241.xxx.xxx/xx) under
    'range'; only those two keys are consulted here.
    """
    nonblockdict = parse_nonblock_file(NONBLOCKFILE)

    # Specific ip-addresses
    if ip in nonblockdict['ip']:
        LOGGER.info('Computer in nonblock list, skipping')
        raise InExceptionListError

    # Ip-ranges
    if any(ip in IP(ip_range) for ip_range in nonblockdict['range']):
        raise InExceptionListError
def __init__(self, record, local_address=None):
    """Try to identify the remote device and port of a neighbor record.

    Identification starts immediately. On success ``identified`` is set
    to True, and ``netbox`` and ``interfaces`` describe the items that
    were found.

    :param record: Some namedtuple instance representing the neighboring
                   record read from the device.
    :param local_address: The management IP address used by the local
                          system. If supplied, it is added to the
                          addresses treated as invalid neighbors, so
                          possible self-loops are ignored.
    """
    self.record = record

    invalid_ips = list(INVALID_IPS)
    if local_address:
        invalid_ips.append(str(local_address))
    self._invalid_neighbor_ips = invalid_ips

    self.netbox = None
    self.interfaces = None
    self.identified = False
    self.identify()
def _netbox_from_ip(self, ip):
    """Look up a Netbox by IP address.

    The address is matched against the netbox's own IP first, then
    against its gateway port prefix addresses.

    :returns: A shadows.Netbox object representing the netbox, or None
              if the address is invalid, is in the list of invalid
              neighbor addresses, or matches no netbox.
    """
    try:
        ip = six.text_type(IP(ip))
    except ValueError:
        self._logger.warning("Invalid IP (%s) in neighbor record: %r",
                             ip, self.record)
        return

    assert ip
    if ip in self._invalid_neighbor_ips:
        return

    by_own_ip = self._netbox_query(Q(ip=ip))
    if by_own_ip:
        return by_own_ip
    return self._netbox_query(Q(interface__gwportprefix__gw_ip=ip))
def _load_existing_mappings(self):
    """Load this box's open ARP records from the database.

    Open records are those whose end_time is at least datetime.max.

    Returns:

      A deferred whose result is a dictionary: { (ip, mac): arpid }
    """
    self._logger.debug("Loading open arp records from database")
    queryset = manage.Arp.objects.filter(
        netbox__id=self.netbox.id,
        end_time__gte=datetime.max,
    ).values('id', 'ip', 'mac')
    open_arp_records = yield db.run_in_thread(
        storage.shadowify_queryset_and_commit, queryset)
    self._logger.debug("Loaded %d open records from arp",
                       len(open_arp_records))

    open_mappings = {(IP(arp['ip']), arp['mac']): arp['id']
                     for arp in open_arp_records}
    defer.returnValue(open_mappings)
def get_ipv4_multicast_groups_per_port(self):
    """Collect IGMP snooping information from ports.

    Each row index is read as: vlan, four group address octets, then
    the ifindex.

    :returns: A Deferred whose result is a list of MulticastStat tuples
    """
    column = "hpIgmpStatsPortAccess2"
    ports = yield self.retrieve_columns(
        [column]
    ).addCallback(self.translate_result).addCallback(reduce_index)

    stats = []
    for index, columns in iteritems(ports):
        vlan = index[0]
        group = index[1:5]
        ifindex = index[5]
        address = IP('.'.join(str(octet) for octet in group))
        stats.append(MulticastStat(address, ifindex, vlan, columns[column]))

    defer.returnValue(stats)
def oid_to_ipv4(oid):
    """Convert a sequence of 4 numbers to an IPv4 object.

    :param oid: Any list or tuple of 4 integers.
    :raises ValueError: if *oid* does not have exactly 4 elements or an
                        element does not fit in one octet.
    """
    if len(oid) != 4:
        raise ValueError("IPv4 address must be 4 octets, not %d" % len(oid))
    try:
        # bytearray() range-checks every octet (0-255) and works on both
        # Python 2 and 3. array.tostring(), used here previously, was
        # removed in Python 3.9.
        addr, = unpack("!I", bytes(bytearray(oid)))
    except OverflowError as error:
        raise ValueError(error)
    return IP(addr, ipversion=4)

#############################################################################
# Various OID consumer functions, which can be fed to the consume() function #
#############################################################################
# pylint: disable=invalid-name
def __init__(self, host, community="public", version="1", port=161,
             retries=3, timeout=1):
    """Create an Snmp object and open its session.

    :param host: hostname or IP address
    :param community: community (password), defaults to "public"
    :param version: SNMP version; "2" is treated as "2c". Anything
                    other than "1" or "2c" raises
                    UnsupportedSnmpVersionError.
    :param port: udp port number, defaults to "161"
    :param retries: passed to the session command line as -r
    :param timeout: passed to the session command line as -t
    """
    self.host = host
    self.community = str(community)

    version = str(version)
    self.version = '2c' if version == '2' else version
    if self.version not in ('1', '2c'):
        raise UnsupportedSnmpVersionError(self.version)

    self.port = int(port)
    self.retries = retries
    self.timeout = timeout

    self.handle = _MySnmpSession(self._build_cmdline())
    self.handle.open()
def _build_cmdline(self):
    """Build the argument tuple used to open the SNMP session.

    An IPv6 address is wrapped as ``udp6:[address]``. Hostnames and
    IPv4 addresses are passed through unchanged. The port is appended
    to the host after a colon.
    """
    host = self.host
    try:
        address = IP(self.host)
    except ValueError:
        # Not an IP address; keep the hostname as it is.
        pass
    else:
        if address.version() == 6:
            host = 'udp6:[%s]' % self.host

    return (
        '-v' + self.version,
        '-c', self.community,
        '-r', str(self.retries),
        '-t', str(self.timeout),
        '%s:%s' % (host, self.port),
    )
def is_valid_cidr(cidr):
    """Verify that a string is a valid IPv4 or IPv6 CIDR specification.

    Only strings that contain a '/' and are not purely digits are
    considered; they are then validated with the IPy library.

    Returns True for a valid CIDR string, False otherwise.
    """
    looks_like_cidr = (isinstance(cidr, six.string_types)
                       and not cidr.isdigit()
                       and '/' in cidr)
    if not looks_like_cidr:
        return False

    try:
        parsed = IPy.IP(cidr)
    except (ValueError, TypeError):
        return False
    return parsed is not None
def VerifyIp(ip):
    """Classify an IP address as 'Public' or 'Private'.

    Every IPy address type other than "PUBLIC" is reported as
    'Private'.

    Returns None when *ip* cannot be parsed.
    """
    try:
        ip_type = IP(ip).iptype()
    except Exception:
        # Invalid input is reported as "unknown" rather than raised.
        return None
    # Compare by value: identity (`is not`) on strings only works by
    # accident of interning.
    return 'Public' if ip_type == "PUBLIC" else 'Private'
def __str__(self):
    """Describe the destination IP requirement in plain words.

    The wording depends on whether every entry is a subnet (a prefix
    length other than 32 or 128) and on how many entries there are.
    """
    only_subnets = all(ip.prefixlen() not in [32, 128]
                       for ip in self.dst_ip)

    if only_subnets:
        tpl = "Destination IP must fall into {}"
    elif len(self.dst_ip) > 1:
        tpl = "Destination IP must be in {}"
    else:
        tpl = "Destination IP must be {}"

    return tpl.format(self._str_list())
def __init__(self, cfg):
    """Parse and validate the "address" entries of a DNS record.

    Every address must be parseable by IPy and match the IP version of
    this record type (IP_VER). The parsed addresses are stored in
    ``self.address``.

    Raises ConfigError for an unparseable address or a version
    mismatch.
    """
    ExpResCriterion_DNSRecord.__init__(self, cfg)

    self.address = []

    for address in self._enforce_list("address", str):
        try:
            ip = IPy.IP(address)
        except Exception:
            # Unlike a bare except, this leaves KeyboardInterrupt and
            # SystemExit alone.
            raise ConfigError(
                "Invalid IP for {} record: {}".format(
                    self.RECORD_TYPE, address
                )
            )

        if ip.version() != self.IP_VER:
            raise ConfigError(
                "Invalid IP version ({}) for record type {}.".format(
                    ip.version(), self.RECORD_TYPE
                )
            )

        self.address.append(ip)
def is_valid_ipv4(ip, version=4):
    """Return True when *ip* is a valid address of the given IP version.

    Validation is delegated to IPy. If IPy cannot be imported, an
    installation through pip/easy_install is attempted first; the
    process exits with status 1 when that fails.

    :param ip: the address to validate.
    :param version: IP version to validate against, 4 by default.
    :returns: True if IPy accepts the address, False otherwise.
    """
    import os
    import sys

    try:
        from IPy import IP
    except ImportError:
        # NOTE(review): installing a package as a side effect of a
        # validation call is surprising; kept because callers may rely
        # on it.
        command_to_execute = "pip install IPy || easy_install IPy"
        try:
            exit_status = os.system(command_to_execute)
        except OSError:
            print("Can NOT install 'IPy', Aborted!")
            sys.exit(1)
        except Exception as e:
            print("Uncaught exception, %s" % e)
            sys.exit(1)

        # os.system() reports failure through its return value, not by
        # raising, so check it explicitly.
        if exit_status != 0:
            print("Can NOT install 'IPy', Aborted!")
            sys.exit(1)
        try:
            from IPy import IP
        except ImportError:
            print("Can NOT install 'IPy', Aborted!")
            sys.exit(1)

    try:
        IP(ip, ipversion=version)
    except ValueError:
        return False
    return True
def network_size(net, dhcp=None):
    """Return the gateway, netmask and DHCP pool of a network.

    The gateway is the second address of the network. The pool runs
    from the third address to the second-to-last one. The pool is
    always computed but only returned when *dhcp* is truthy; otherwise
    None takes its place.
    """
    addr = IP(net)
    mask = addr.strNetmask()
    gateway = addr[1].strNormal()
    pool_start = addr[2].strNormal()
    pool_end = addr[addr.len() - 2].strNormal()
    dhcp_pool = [pool_start, pool_end]

    if not dhcp:
        return gateway, mask, None
    return gateway, mask, dhcp_pool

# Read write lock
# ---------------
def get_ipv4_network(self):
    """Return the network's IPv4 configuration as an IP object.

    Reads address, netmask and prefix from the <ip> element of the
    network XML. A prefix, when present, is turned into a dotted
    netmask and takes precedence over the netmask attribute. With a
    netmask the result is the network/netmask pair; without one it is
    the bare address.

    Returns None if the XML has no <ip> element.
    """
    xml = self._XMLDesc(0)
    if util.get_xml_path(xml, "/network/ip") is None:
        return None

    addrStr = util.get_xml_path(xml, "/network/ip/@address")
    netmaskStr = util.get_xml_path(xml, "/network/ip/@netmask")
    prefix = util.get_xml_path(xml, "/network/ip/@prefix")

    if prefix:
        prefix_len = int(prefix)
        # Build the 32-bit mask as a run of ones followed by zeros.
        mask_bits = ("1" * prefix_len) + ("0" * (32 - prefix_len))
        netmaskStr = str(IP(int(mask_bits, base=2)))

    if not netmaskStr:
        return IP(str(addrStr))

    netmask = IP(netmaskStr)
    gateway = IP(addrStr)
    network = IP(gateway.int() & netmask.int())
    return IP(str(network) + "/" + netmaskStr)
def update_hosts(self, hosts_info):  # pylint: disable=no-self-use
    """Replace the Functest-managed entries in /etc/hosts.

    *hosts_info* maps host names to IP addresses. Every name must be
    non-empty and every address must be accepted by IPy. Lines carrying
    the Functest marker are dropped from /etc/hosts, then the marker
    and one line per host are appended.

    Returns the result of api_utils.result_handler: status 0 on
    success, status 1 with a message on any failure.
    """
    if not isinstance(hosts_info, dict):
        return api_utils.result_handler(
            status=1, data='Error, args should be a dict')

    for key, value in hosts_info.items():
        if not key:
            return api_utils.result_handler(
                status=1, data='Domain name is absent')
        try:
            IPy.IP(value)
        except Exception:  # pylint: disable=broad-except
            return api_utils.result_handler(
                status=1, data='The IP %s is invalid' % value)

    try:
        functest_flag = "# SUT hosts info for Functest"
        hosts_list = ('\n{} {} {}'.format(ip, host_name, functest_flag)
                      for host_name, ip in hosts_info.items())

        with open("/etc/hosts", 'r') as file_hosts:
            origin_lines = [line for line in file_hosts
                            if functest_flag not in line]

        with open("/etc/hosts", 'w') as file_hosts:
            file_hosts.writelines(origin_lines)
            file_hosts.write(functest_flag)
            file_hosts.writelines(hosts_list)
    except Exception:  # pylint: disable=broad-except
        return api_utils.result_handler(
            status=1, data='Error when updating hosts info')

    return api_utils.result_handler(
        status=0, data='Update hosts info successfully')
def data_received(self, data):
    """Parse an incoming DNS query and forward it to Google DNS.

    The connection is closed when get_data() yields nothing. Otherwise
    the query is parsed and a resolve URL is built with the client's
    /24 as EDNS client subnet. For a peer on a private address the
    configured ``myip`` is used instead. The lookup is scheduled on the
    event loop, with send_resp() as its completion callback.
    """
    data = self.get_data(data)
    if data is None:
        self.transport.close()
        return

    self.request = DNSRecord.parse(data)
    if self.args.debug:
        print('Data received: {!r}'.format(self.request))

    from IPy import IP
    client_ip = self.peername[0]
    if IP(client_ip).iptype() == 'PRIVATE':
        client_ip = self.args.myip

    url = 'https://dns.google.com/resolve?name={}&edns_client_subnet={}/24'.format(str(self.request.q.qname), client_ip)

    # Old asyncio releases expose asyncio.async instead of
    # ensure_future. It is looked up by name because `async` is a
    # reserved word since Python 3.7, so importing it no longer compiles.
    schedule = getattr(asyncio, 'ensure_future', None)
    if schedule is None:
        schedule = getattr(asyncio, 'async')

    schedule(
        ProxyClient.query_domain(url, self.args.proxy), loop=self.loop
    ).add_done_callback(functools.partial(self.send_resp))
def get_arg():
    """Parse the command line options of the Google DNS proxy.

    Returns the argparse namespace with the attributes ``debug``,
    ``listen``, ``port``, ``proxy`` and ``myip``.
    """
    parser = argparse.ArgumentParser(prog='prcdns',
                                     description='google dns proxy.')
    parser.add_argument('--debug', help='debug model,default NO',
                        default=False)
    parser.add_argument('-l', '--listen',
                        help='listening IP,default 0.0.0.0',
                        default='0.0.0.0')
    # type=int keeps the port an int whether it comes from the default
    # or from the command line (it used to arrive as a str).
    parser.add_argument('-p', '--port',
                        help='listening Port,default 3535',
                        type=int, default=3535)
    parser.add_argument('-r', '--proxy',
                        help='Used For Query Google DNS,default direct',
                        default=None)
    parser.add_argument('-ip', '--myip', help='IP location', default=None)
    return parser.parse_args()
def IP(ip):
    """Return *ip* unchanged (identity stand-in for an IP constructor)."""
    unchanged = ip
    return unchanged
def formatStructValue(self, struct, name, value):
    """Format a field of a socket address structure for display.

    * the "family" field of any sockaddr* struct is mapped through
      SOCKET_FAMILY, falling back to the raw value;
    * sockaddr_in.sin_port is converted with ntoh_ushort();
    * sockaddr_in.sin_addr is converted with ntoh_uint() and passed to
      IP().

    Returns None for every other field.
    """
    if struct.startswith("sockaddr") and name.endswith("family"):
        return SOCKET_FAMILY.get(value, value)

    if struct != "sockaddr_in":
        return None

    if name == "sin_port":
        return ntoh_ushort(value)
    if name == "sin_addr":
        return IP(ntoh_uint(value.s_addr))
    return None
def value_type(value):
    """Identify whether the passed value is a domain or an IP address.

    @param value: string to test as an IP
    @returns: 'ip' if IP() accepts the value, otherwise 'domain'
    """
    try:
        IP(value)
    except Exception:
        # Anything IP() rejects counts as a domain. Unlike a bare
        # except, KeyboardInterrupt and SystemExit still propagate.
        return 'domain'
    return 'ip'
def gen_info():
    """Format IP, organization and OS of the module-level ``host`` dict.

    Organization and operating system fall back to 'n/a' when the keys
    are missing; 'ip_str' must be present.
    """
    template = "IP: %s \n Organization: %s \n Operating System: %s"
    values = (host['ip_str'], host.get('org', 'n/a'), host.get('os', 'n/a'))
    return template % values