我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.inet_ntoa()。
def get_iphostname(): '''??linux?????????IP??''' def get_ip(ifname): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ipaddr = socket.inet_ntoa(fcntl.ioctl( sock.fileno(), 0x8915, # SIOCGIFADDR struct.pack('256s', ifname[:15]) )[20:24] ) sock.close() return ipaddr try: ip = get_ip('eth0') except IOError: ip = get_ip('eno1') hostname = socket.gethostname() return {'hostname': hostname, 'ip':ip}
def zeroconf_info(): """zeroconf_info returns a list of tuples of the information about other zeroconf services on the local network. It does this by creating a zeroconf.ServiceBrowser and spending 0.25 seconds querying the network for other services.""" ret_info = [] def on_change(zeroconf, service_type, name, state_change): if state_change is zeroconfig.ServiceStateChange.Added: info = zeroconf.get_service_info(service_type, name) if info: address = "{}".format(socket.inet_ntoa(info.address)) props = str(info.properties.items()) item = ServerInfo(str(info.server), address, info.port, props) ret_info.append(item) zc = zeroconfig.Zeroconf() browser = zeroconfig.ServiceBrowser( zc, "_defusedivision._tcp.local.", handlers=[on_change]) sleep(1) concurrency.concurrent(lambda: zc.close())() return ret_info
def parse_netconn(self, seq, netconn): parts = netconn.split('|') new_conn = {} timestamp = convert_event_time(parts[0]) try: new_conn['remote_ip'] = socket.inet_ntoa(struct.pack('>i', int(parts[1]))) except: new_conn['remote_ip'] = '0.0.0.0' new_conn['remote_port'] = int(parts[2]) new_conn['proto'] = protocols[int(parts[3])] new_conn['domain'] = parts[4] if parts[5] == 'true': new_conn['direction'] = 'Outbound' else: new_conn['direction'] = 'Inbound' return CbNetConnEvent(self.process_model, timestamp, seq, new_conn)
def parse_netconn(self, seq, netconn): new_conn = {} timestamp = convert_event_time(netconn.get("timestamp", None)) direction = netconn.get("direction", "true") if direction == 'true': new_conn['direction'] = 'Outbound' else: new_conn['direction'] = 'Inbound' for ipfield in ('remote_ip', 'local_ip', 'proxy_ip'): try: new_conn[ipfield] = socket.inet_ntoa(struct.pack('>i', int(netconn.get(ipfield, 0)))) except: new_conn[ipfield] = netconn.get(ipfield, '0.0.0.0') for portfield in ('remote_port', 'local_port', 'proxy_port'): new_conn[portfield] = int(netconn.get(portfield, 0)) new_conn['proto'] = protocols.get(int(netconn.get('proto', 0)), "Unknown") new_conn['domain'] = netconn.get('domain', '') return CbNetConnEvent(self.process_model, timestamp, seq, new_conn, version=2)
def handle(self): client_ip = self.client_address[0] addr = '' server = '' try: sock = self.connection sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) odestdata = sock.getsockopt(socket.SOL_IP, 80, 16) port, addr_ip = struct.unpack("!xxH4sxxxxxxxx", odestdata) addr = socket.inet_ntoa(addr_ip) server = reverse(addr) print_log('%s connecting %s:%d %d %s' % (client_ip, addr, port, server[0], str(server[1]))) Proxy[server[0]].proxy(sock, server[1], (addr, port)) except socket.error, e: logging.warn(addr + ':' + str(server) + ':' + str(e)) sock.close()
def map_ip(server, qn, replay): global HOSTS4, IPMAP iplist = [] for i in replay.rr: if i.rtype == 28: answer = str(i.rdata) ipbias = 0 if mutex.acquire(): IPMAP.append((server[0], answer)) ipbias = len(IPMAP) mutex.release() addr_ip = struct.pack('!I', RESERVEDIP + ipbias) addr = socket.inet_ntoa(addr_ip) iplist.append(addr) HOSTS4[qn] = iplist return iplist
def getipaddr(self, ifname='eth0'): import socket import struct ret = '127.0.0.1' try: ret = socket.gethostbyname(socket.getfqdn(socket.gethostname())) except: pass if ret == '127.0.0.1': try: import fcntl s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ret = socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', ifname[:15]))[20:24]) except: pass return ret
def _read_socks5_address(self): atyp = yield self.read_bytes(1) if atyp == b"\x01": data = yield self.read_bytes(4) addr = socket.inet_ntoa(data) elif atyp == b"\x03": length = yield self.read_bytes(1) addr = yield self.read_bytes(length) elif atyp == b"\x04": data = yield self.read_bytes(16) addr = socket.inet_ntop(socket.AF_INET6, data) else: raise GeneralProxyError("SOCKS5 proxy server sent invalid data") data = yield self.read_bytes(2) port = struct.unpack(">H", data)[0] raise gen.Return((addr, port))
def hexstr2ip(hex_str, byteorder="little"): """ little byte order ???????????? 06e6a8c0 -> 192.168.230.6 Args: hex_str: Returns: """ if byteorder == "little": fmt = "<L" else: fmt = ">L" return socket.inet_ntoa(struct.pack(fmt, int(hex_str, base=16)))
def get_dnsserver_list(): if os.name == 'nt': import ctypes import ctypes.wintypes DNS_CONFIG_DNS_SERVER_LIST = 6 buf = ctypes.create_string_buffer(2048) ctypes.windll.dnsapi.DnsQueryConfig(DNS_CONFIG_DNS_SERVER_LIST, 0, None, None, ctypes.byref(buf), ctypes.byref(ctypes.wintypes.DWORD(len(buf)))) ipcount = struct.unpack('I', buf[0:4])[0] iplist = [socket.inet_ntoa(buf[i:i+4]) for i in xrange(4, ipcount*4+4, 4)] return iplist elif os.path.isfile('/etc/resolv.conf'): with open('/etc/resolv.conf', 'rb') as fp: return re.findall(r'(?m)^nameserver\s+(\S+)', fp.read()) else: logging.warning("get_dnsserver_list failed: unsupport platform '%s-%s'", sys.platform, os.name) return []
def update(self, data): """Update info about network interface according to given dnet dictionary""" self.name = data["name"] self.description = data['description'] self.win_index = data['win_index'] # Other attributes are optional if conf.use_winpcapy: self._update_pcapdata() try: self.ip = socket.inet_ntoa(get_if_raw_addr(data['guid'])) except (KeyError, AttributeError, NameError): pass try: self.mac = data['mac'] except KeyError: pass
def bind_nic(): try: import fcntl def get_ip_address(ifname): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return socket.inet_ntoa(fcntl.ioctl( s.fileno(), 0x8915, # SIOCGIFADDR struct.pack('256s', ifname[:15]) )[20:24]) return get_ip_address(nic_name) except ImportError as e: print('Indicate nic feature need to be run under Unix based system.') return '0.0.0.0' except IOError as e: print(nic_name + 'is unacceptable !') return '0.0.0.0' finally: return '0.0.0.0'
def dotted_netmask(mask): """Converts mask from /xx format to xxx.xxx.xxx.xxx Example: if mask is 24 function returns 255.255.255.0 :rtype: str """ bits = 0xffffffff ^ (1 << 32 - mask) - 1 return socket.inet_ntoa(struct.pack('>I', bits))
def report_detail(self, ioc, type, result): events = self.cb.process_events(result["id"], result["segment_id"]) proc = events["process"] if type == "domain" and proc.has_key("netconn_complete"): for netconn in proc["netconn_complete"]: ts, ip, port, proto, domain, dir = netconn.split("|") if ioc in domain: str_ip = socket.inet_ntoa(struct.pack("!i", int(ip))) print "%s\t%s (%s:%s)" % (ts, domain, str_ip, port) elif type == "ipaddr" and proc.has_key("netconn_complete"): for netconn in proc["netconn_complete"]: ts, ip, port, proto, domain, direction = netconn.split("|") packed_ip = struct.unpack("!i", socket.inet_aton(ioc))[0] #import code; code.interact(local=locals()) if packed_ip == int(ip): str_ip = socket.inet_ntoa(struct.pack("!i", int(ip))) print "%s\t%s (%s:%s)" % (ts, domain, str_ip, port) elif type == "md5" and proc.has_key("modload_complete"): for modload in proc["modload_complete"]: ts, md5, path = modload.split("|") if ioc in md5: print "%s\t%s %s" % (ts, md5, path) if result["process_md5"] == ioc: print "%s\t%s %s" % (result["start"], result["process_md5"], result["path"])
def outputNetConn(self, proc, netconn): """ output a single netconn event from a process document the caller is responsible for ensuring that the document meets start time and subnet criteria """ # for convenience, use locals for some process metadata fields hostname = proc.get("hostname", "<unknown>") process_name = proc.get("process_name", "<unknown>") user_name = proc.get("username", "<unknown>") process_md5 = proc.get("process_md5", "<unknown>") cmdline = proc.get("cmdline", "<unknown>") path = proc.get("path", "<unknown>") procstarttime = proc.get("start", "<unknown>") proclastupdate = proc.get("last_update", "<unknown>") # split the netconn into component parts ts, ip, port, proto, domain, dir = netconn.split("|") # get the dotted-quad string representation of the ip str_ip = socket.inet_ntoa(struct.pack("!i", int(ip))) # the underlying data model provides the protocol number # convert this to human-readable strings (tcp or udp) if "6" == proto: proto = "tcp" elif "17" == proto: proto = "udp" # the underlying data model provides a boolean indication as to # if this is an inbound or outbound network connection if "true" == dir: dir = "out" else: dir = "in" # print the record, using pipes as a delimiter print "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|" % (procstarttime,proclastupdate,hostname, user_name, proto, str_ip, port, dir, domain, process_name, process_md5, path, cmdline)
def interface_ip(self): """ Returns ascii representation of the ip address of the interface used to communicate with the Cb Response server. If using NAT, this will be the "internal" IP address of the sensor. """ try: ip_address = socket.inet_ntoa(struct.pack('>i', self._attribute('interface_ip', 0))) except: ip_address = self._attribute('interface_ip', 0) return ip_address
def _read_SOCKS5_address(self, file): atyp = self._readall(file, 1) if atyp == b"\x01": addr = socket.inet_ntoa(self._readall(file, 4)) elif atyp == b"\x03": length = self._readall(file, 1) addr = self._readall(file, ord(length)) elif atyp == b"\x04": addr = socket.inet_ntop(socket.AF_INET6, self._readall(file, 16)) else: raise GeneralProxyError("SOCKS5 proxy server sent invalid data") port = struct.unpack(">H", self._readall(file, 2))[0] return addr, port
def inet_ntop(address_family, packed_ip): if address_family == socket.AF_INET: return socket.inet_ntoa(packed_ip) addr = sockaddr() addr.sa_family = address_family addr_size = ctypes.c_int(ctypes.sizeof(addr)) ip_string = ctypes.create_string_buffer(128) ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) if address_family == socket.AF_INET6: if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr): raise socket.error('packed IP wrong length for inet_ntoa') ctypes.memmove(addr.ipv6_addr, packed_ip, 16) else: raise socket.error('unknown address family') if WSAAddressToStringA( ctypes.byref(addr), addr_size, None, ip_string, ctypes.byref(ip_string_size) ) != 0: raise socket.error(ctypes.FormatError()) return ip_string[:ip_string_size.value - 1]
def deserialize(byts, protocol_version): if len(byts) == 16: return util.inet_ntop(socket.AF_INET6, byts) else: # util.inet_pton could also handle, but this is faster # since we've already determined the AF return socket.inet_ntoa(byts)
def parse_CAddress(vds): d = {} d['nVersion'] = vds.read_int32() d['nTime'] = vds.read_uint32() d['nServices'] = vds.read_uint64() d['pchReserved'] = vds.read_bytes(12) d['ip'] = socket.inet_ntoa(vds.read_bytes(4)) d['port'] = socket.htons(vds.read_uint16()) return d
def reserved_ip(server, qn): global RESERVEDIP, HOSTS4, IPMAP ipbias = 0 if mutex.acquire(): IPMAP.append((server[0], qn)) ipbias = len(IPMAP) mutex.release() addr_ip = struct.pack('!I', RESERVEDIP + ipbias) addr = socket.inet_ntoa(addr_ip) HOSTS4[qn] = [addr] return addr
def dotted_netmask(mask): """ Converts mask from /xx format to xxx.xxx.xxx.xxx Example: if mask is 24 function returns 255.255.255.0 """ bits = 0xffffffff ^ (1 << 32 - mask) - 1 return socket.inet_ntoa(struct.pack('>I', bits))
def get_local_ip(self): return socket.inet_ntoa(self.ip)
def get_local_ip(self): if self.local_ip is None: return None return socket.inet_ntoa(self.local_ip)
def inet_ntop(family, ipstr): if family == socket.AF_INET: return to_bytes(socket.inet_ntoa(ipstr)) elif family == socket.AF_INET6: import re v6addr = ':'.join(('%02X%02X' % (ord(i), ord(j))).lstrip('0') for i, j in zip(ipstr[::2], ipstr[1::2])) v6addr = re.sub('::+', '::', v6addr, count=1) return to_bytes(v6addr)
def parse_header(data): addrtype = ord(data[0]) dest_addr = None dest_port = None header_length = 0 connecttype = (addrtype & 0x8) and 1 or 0 addrtype &= ~0x8 if addrtype == ADDRTYPE_IPV4: if len(data) >= 7: dest_addr = socket.inet_ntoa(data[1:5]) dest_port = struct.unpack('>H', data[5:7])[0] header_length = 7 else: logging.warn('header is too short') elif addrtype == ADDRTYPE_HOST: if len(data) > 2: addrlen = ord(data[1]) if len(data) >= 4 + addrlen: dest_addr = data[2:2 + addrlen] dest_port = struct.unpack('>H', data[2 + addrlen:4 + addrlen])[0] header_length = 4 + addrlen else: logging.warn('header is too short') else: logging.warn('header is too short') elif addrtype == ADDRTYPE_IPV6: if len(data) >= 19: dest_addr = socket.inet_ntop(socket.AF_INET6, data[1:17]) dest_port = struct.unpack('>H', data[17:19])[0] header_length = 19 else: logging.warn('header is too short') else: logging.warn('unsupported addrtype %d, maybe wrong password or ' 'encryption method' % addrtype) if dest_addr is None: return None return connecttype, addrtype, to_bytes(dest_addr), dest_port, header_length
def decode_binary_peers(peers): """ Return a list of IPs and ports, given a binary list of peers, from a tracker response. """ peers = splice(peers, 6) # Cut the response at the end of every peer return [(socket.inet_ntoa(p[:4]), decode_port(p[4:])) for p in peers]
def unpack(self, packet): _ip = layer() _ip.ihl = (ord(packet[0]) & 0xf) * 4 iph = struct.unpack("!BBHHHBBH4s4s", packet[:_ip.ihl]) _ip.ver = iph[0] >> 4 _ip.tos = iph[1] _ip.length = iph[2] _ip.ids = iph[3] _ip.flags = iph[4] >> 13 _ip.offset = iph[4] & 0x1FFF _ip.ttl = iph[5] _ip.protocol = iph[6] _ip.checksum = hex(iph[7]) _ip.src = socket.inet_ntoa(iph[8]) _ip.dst = socket.inet_ntoa(iph[9]) _ip.list = [ _ip.ihl, _ip.ver, _ip.tos, _ip.length, _ip.ids, _ip.flags, _ip.offset, _ip.ttl, _ip.protocol, _ip.src, _ip.dst] return _ip
def get_interface_ip(ifname): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', ifname[:15]))[20:24])
def local_addr(self): """Local address IP (x.x.x.x) :type: :class:`str`""" return socket.inet_ntoa(struct.pack("<I", self.dwLocalAddr))
def remote_addr(self): """remote address IP (x.x.x.x) :type: :class:`str`""" if not self.established: return None return socket.inet_ntoa(struct.pack("<I", self.dwRemoteAddr))
def _read_socks5_address(filezz): atyp = _readall(filezz, 1) if atyp == b"\x01": addr = socket.inet_ntoa(_readall(filezz, 4)) elif atyp == b"\x03": length = _readall(filezz, 1) addr = _readall(filezz, ord(length)) else: raise GeneralProxyError("SOCKS5 proxy server sent invalid data") port = struct.unpack(">H", _readall(filezz, 2))[0] return addr, port # noinspection PyTypeChecker