我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.ntohs()。
def validate(self, data): eth_length = 14 eth_header = data[:eth_length] eth = struct.unpack('!6s6sH' , eth_header) eth_protocol = socket.ntohs(eth[2]) # print 'Destination MAC : ' + self.eth_addr(data[0:6]) + ' Source MAC : ' + self.eth_addr(data[6:12]) + ' Protocol : ' + str(eth_protocol) ip_header = data[eth_length:20+eth_length] iph = struct.unpack('!BBHHHBBH4s4s' , ip_header) version_ihl = iph[0] ihl = version_ihl & 0xF iph_length = ihl * 4 protocol = iph[6] s_addr = socket.inet_ntoa(iph[8]) d_addr = socket.inet_ntoa(iph[9]) # udp_header = data[iph_length+eth_length:iph_length+20+eth_length] # udph = struct.unpack('!HHLLBBHHH' , udp_header) # print "[{}] - {}:{} -> {}:{}".format(protocol, s_addr,udph[0], d_addr,udph[1])
def user_login(self): login = socket.ntohs(1) login = struct.pack('h',login) action = socket.ntohs(2) peer_id = socket.ntohl(632949210) myid = 632949211 action = struct.pack('h', action) peer_id = struct.pack('I', peer_id) myid = struct.pack('I',myid) print ("user login...") msgbody_len = 4 msgbody_len = socket.ntohs(msgbody_len) msgbody_len = struct.pack('h', msgbody_len) send_login = login + peer_id + msgbody_len + myid self.send(send_login)
def reverse(self, val): if self.size == 16: val = socket.ntohs(val) elif self.size == 32: val = socket.ntohl(val) return val
def get_pid_port_tcp(self, port): for item in self.get_extended_tcp_table(): lPort = socket.ntohs(item.dwLocalPort) lAddr = socket.inet_ntoa(struct.pack('L', item.dwLocalAddr)) pid = item.dwOwningPid if lPort == port: return pid else: return None ################################################################################# # The GetExtendedUdpTable function retrieves a table that contains a list of UDP endpoints available to the application. # # DWORD GetExtendedUdpTable( # _Out_ PVOID pUdpTable, # _Inout_ PDWORD pdwSize, # _In_ BOOL bOrder, # _In_ ULONG ulAf, # _In_ UDP_TABLE_CLASS TableClass, # _In_ ULONG Reserved # );
def get_pid_port_udp(self, port): for item in self.get_extended_udp_table(): lPort = socket.ntohs(item.dwLocalPort) lAddr = socket.inet_ntoa(struct.pack('L', item.dwLocalAddr)) pid = item.dwOwningPid if lPort == port: return pid else: return None ############################################################################### # Retrieves the name of the executable file for the specified process. # # DWORD WINAPI GetProcessImageFileName( # _In_ HANDLE hProcess, # _Out_ LPTSTR lpImageFileName, # _In_ DWORD nSize # );
def main(): connection=socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(3)) while True: raw_data,addr=connection.recvfrom(65536) dest_mac, src_mac, eth_proto, data = ethernet_frame(raw_data) print('\nEthernet Frame:') print(TAB_1+'Destination: {}, Source: {}, Protocol: {}'.format(dest_mac,src_mac,eth_proto)) if eth_proto==8: (version,header_length,ttl,proto,src,target,data)=ipv4_packet(data) print(TAB_1+'IPv4 Packet:') print(TAB_2 + 'Version: {}, Header Length: {}, TTL: {},'.format(version,header_length,ttl)) print(TAB_2 + 'Protocol: {}, Source: {}, Target: {}'.format(proto, src, target)) elif eth_proto==1: icmp = ICMP(ipv4.data)
def parse_ethernet_header(self, packet): dst_mac = self.bytes_to_mac(packet[0:6]) src_mac = self.bytes_to_mac(packet[6:12]) eth_protocol = socket.ntohs((packet[13] << 8) + packet[12]) return dst_mac, src_mac, eth_protocol
def remote_port(self): """:type: :class:`int`""" if not self.established: return None return socket.ntohs(self.dwRemotePort)
def local_port(self): """:type: :class:`int`""" return socket.ntohs(self.dwLocalPort)
def getPortNumHostByteOrder(st_network): """ Convert 16-bit positive integers from network to host byte order. On machines where the host byte order is the same as network byte order, this is a no-op; otherwise, it performs a 2-byte swap operation :param st_network: :return: 20480 - 80 """ try: st_network = int(st_network) return socket.ntohs(st_network) except Exception as e: logging.error("[NetworkByteOrderPortFalse]: %f %s" % (st_network, repr(e)))
def convert_integer(): data = 1234 # 32-bit print ("Original: %s => Long host byte order: %s, Network byte order: %s" %(data, socket.ntohl(data), socket.htonl(data))) # 16-bit print ("Original: %s => Short host byte order: %s, Network byte order: %s" %(data, socket.ntohs(data), socket.htons(data)))
def test_socket_has_some_reexports(): assert tsocket.SOL_SOCKET == stdlib_socket.SOL_SOCKET assert tsocket.TCP_NODELAY == stdlib_socket.TCP_NODELAY assert tsocket.gaierror == stdlib_socket.gaierror assert tsocket.ntohs == stdlib_socket.ntohs ################################################################ # name resolution ################################################################
def __init__(self, interface): global s, redirect_to_mac, arp super(Arp_Spoof, self).__init__() s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0800)) try: s.bind((interface, socket.htons(0x0800))) except socket.error: print("\033[1;31mUnable to bind to interface... unknown type\033[00m") exit()
def __init__(self, interface): global s, sent, rev sent = 0 rev = 0 super(Arp_Ping, self).__init__() s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0800)) try: s.bind((interface, socket.htons(0x0800))) except socket.error: print("\033[1;31mUnable to bind to interface... unknown type\033[00m") exit()
def testNtoH(self): # This just checks that htons etc. are their own inverse, # when looking at the lower 16 or 32 bits. sizes = {socket.htonl: 32, socket.ntohl: 32, socket.htons: 16, socket.ntohs: 16} for func, size in sizes.items(): mask = (1<<size) - 1 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): self.assertEqual(i & mask, func(func(i&mask)) & mask) swapped = func(mask) self.assertEqual(swapped & mask, mask) self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1, 2, 3 ] bad_values = [ -1, -2, -3, -1, -2, -3 ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises((OverflowError, ValueError), socket.ntohl, k) self.assertRaises((OverflowError, ValueError), socket.ntohs, k) self.assertRaises((OverflowError, ValueError), socket.htonl, k) self.assertRaises((OverflowError, ValueError), socket.htons, k)
def testNtoH(self): # This just checks that htons etc. are their own inverse, # when looking at the lower 16 or 32 bits. sizes = {socket.htonl: 32, socket.ntohl: 32, socket.htons: 16, socket.ntohs: 16} for func, size in sizes.items(): mask = (1L<<size) - 1 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): self.assertEqual(i & mask, func(func(i&mask)) & mask) swapped = func(mask) self.assertEqual(swapped & mask, mask) self.assertRaises(OverflowError, func, 1L<<34)
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1L, 2L, 3L ] bad_values = [ -1, -2, -3, -1L, -2L, -3L ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises(OverflowError, socket.ntohl, k) self.assertRaises(OverflowError, socket.ntohs, k) self.assertRaises(OverflowError, socket.htonl, k) self.assertRaises(OverflowError, socket.htons, k)
def get_total_length(self): return ntohs(self.ip_len)
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1, 2, 3 ] bad_values = [ -1, -2, -3, -1, -2, -3 ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises(OverflowError, socket.ntohl, k) self.assertRaises(OverflowError, socket.ntohs, k) self.assertRaises(OverflowError, socket.htonl, k) self.assertRaises(OverflowError, socket.htons, k)
def checksum(data): data = six.binary_type(data) # input can be bytearray. if len(data) % 2: data += b'\x00' s = sum(array.array('H', data)) s = (s & 0xffff) + (s >> 16) s += (s >> 16) return socket.ntohs(~s & 0xffff) # avoid circular import
def checksum(data): if len(data) % 2: data += b'\x00' s = sum(array.array('H',data)) s = (s & 0xffff) + (s >> 16) s += (s >> 16) return _socket.ntohs(~s & 0xffff)
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1L, 2L, 3L ] bad_values = [ -1, -2, -3, -1L, -2L, -3L ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises((OverflowError, ValueError), socket.ntohl, k) self.assertRaises((OverflowError, ValueError), socket.ntohs, k) self.assertRaises((OverflowError, ValueError), socket.htonl, k) self.assertRaises((OverflowError, ValueError), socket.htons, k)
def worker_send(self): while True: msgbody = input('say sth: ') msgbody = msgbody.rstrip() msgbody_len = len(msgbody) msgbody_len = socket.ntohs(msgbody_len) msgbody_len = struct.pack('h', msgbody_len) send_msg = action + peer_id + msgbody_len + msgbody.encode('utf-8') self.send(send_msg)
def __init__(self): self.hostmac = "" self.hostip = "" self.conf = True self.ifaceHost = "em1" self.ifaceNetwork = "eth0" self.sockHost = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003)) self.sockNetwork = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003)) try: self.sockHost.bind((self.ifaceHost, 0)) self.sockNetwork.bind((self.ifaceNetwork, 0)) except: #exit("You need 2 physical network interfaces to use FENRIR !") print("You need 2 physical network interfaces to use FENRIR !") self.inputs = [self.sockHost, self.sockNetwork]
def bindAllIface(self): self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
def parsePacket(s): d = {} # http://www.tcpdump.org/linktypes.html llHeaders = { 0: 4, 1: 14, 108: 4, 228: 0 } if pcapObj.datalink() in llHeaders: s = s[llHeaders[pcapObj.datalink()]:] else: stats['unknown L2 protocol'] += 1 d['version'] = (ord(s[0]) & 0xf0) >> 4 d['header_len'] = ord(s[0]) & 0x0f d['tos'] = ord(s[1]) d['total_len'] = socket.ntohs(struct.unpack('H', s[2:4])[0]) d['id'] = socket.ntohs(struct.unpack('H', s[4:6])[0]) d['flags'] = (ord(s[6]) & 0xe0) >> 5 d['fragment_offset'] = socket.ntohs(struct.unpack('H', s[6:8])[0] & 0x1f) d['ttl'] = ord(s[8]) d['protocol'] = ord(s[9]) d['checksum'] = socket.ntohs(struct.unpack('H', s[10:12])[0]) d['source_address'] = pcap.ntoa(struct.unpack('i', s[12:16])[0]) d['destination_address'] = pcap.ntoa(struct.unpack('i', s[16:20])[0]) if d['header_len'] > 5: d['options'] = s[20:4 * (d['header_len'] - 5)] else: d['options'] = None s = s[4 * d['header_len']:] if d['protocol'] == 17: d['source_port'] = socket.ntohs(struct.unpack('H', s[0:2])[0]) d['destination_port'] = socket.ntohs(struct.unpack('H', s[2:4])[0]) s = s[8:] stats['UDP packets'] += 1 d['data'] = s stats['IP packets'] += 1 return d