我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.ntohl()。
def receive(channel): """ Receive a message from a channel """ size = struct.calcsize("L") size = channel.recv(size) try: size = socket.ntohl(struct.unpack("L", size)[0]) except struct.error as e: return '' buf = "" while len(buf) < size: buf = channel.recv(size - len(buf)) return pickle.loads(buf)[0]
def _convert_host_to_hex(host): """ Convert the provided host to the format in /proc/net/tcp* /proc/net/tcp uses little-endian four byte hex for ipv4 /proc/net/tcp6 uses little-endian per 4B word for ipv6 Args: host: String with either hostname, IPv4, or IPv6 address Returns: List of tuples containing address family and the little-endian converted host """ ips = [] if host is not None: for family, ip in _convert_host_to_ip(host): hexip_nf = binascii.b2a_hex(socket.inet_pton(family, ip)) hexip_hf = "" for i in range(0, len(hexip_nf), 8): ipgroup_nf = hexip_nf[i:i+8] ipgroup_hf = socket.ntohl(int(ipgroup_nf, base=16)) hexip_hf = "%s%08X" % (hexip_hf, ipgroup_hf) ips.append((family, hexip_hf)) return ips
def reducer_top100(self, _, values): """?????""" for cnt, ip in heapq.nlargest(100, values, key=lambda x: int(x[0])): ip_num = -1 try: # ?IP???INT/LONG ?? ip_num = socket.ntohl(struct.unpack("I",socket.inet_aton(str(ip)))[0]) # ?????? ?? DataFrame addr_df = self.ip_addr_df[(self.ip_addr_df.ip_start_num <= ip_num) & (ip_num <= self.ip_addr_df.ip_end_num)] # ????????? ?? addr = addr_df.at[addr_df.index.tolist()[0], 'addr'] yield cnt, '{ip} {addr}'.format(ip=ip, addr=addr) except: yield cnt, ip
def ip2int(): file_object_read = open('ipDB_0401.txt','r') file_object_write = open('ipDB_int_0401.txt','w') all_the_text = file_object_read.read() # ip2int line_arr = all_the_text.split('\n') for i in range(len(line_arr)): cell = line_arr[i].split(' ') cell[0] = str(socket.ntohl(struct.unpack("=I",socket.inet_aton(cell[0]))[0])) cell[1] = str(socket.ntohl(struct.unpack("=I",socket.inet_aton(cell[1]))[0])) line_arr[i] = ' '.join(cell) all_the_text = '\n'.join(line_arr) file_object_write.write(all_the_text) file_object_read.close() file_object_write.close() # ????IP
def user_login(self): login = socket.ntohs(1) login = struct.pack('h',login) action = socket.ntohs(2) peer_id = socket.ntohl(632949210) myid = 632949211 action = struct.pack('h', action) peer_id = struct.pack('I', peer_id) myid = struct.pack('I',myid) print ("user login...") msgbody_len = 4 msgbody_len = socket.ntohs(msgbody_len) msgbody_len = struct.pack('h', msgbody_len) send_login = login + peer_id + msgbody_len + myid self.send(send_login)
def reverse(self, val): if self.size == 16: val = socket.ntohs(val) elif self.size == 32: val = socket.ntohl(val) return val
def receive(channel): size = struct.calcsize("L") size = channel.recv(size) size = ntohl(struct.unpack("L", size)[0]) buf = "" while len(buf) < size: buf = channel.recv(size - len(buf)) return unmarshall(buf)[0] # Echo server sample
def route_to_python(route, family): addr, netmask, gateway, metric = route return [ fixups.addr_to_python(addr, family), netmask, fixups.addr_to_python(gateway, family), socket.ntohl(metric) ]
def nfq_callback(qh, unused_nfmsg, nfad, unused_data): packet = nfq.nfq_get_msg_packet_hdr(nfad).contents packet_id = socket.ntohl(packet.packet_id) payload_pointer = ctypes.c_void_p() size = nfq.nfq_get_payload(nfad, ctypes.byref(payload_pointer)) payload = ctypes.string_at(payload_pointer, size) packet = Packet(packet_id, size, payload, qh) py_callbacks[qh](packet) return 0 # Maps queue handles to user-specified callbacks.
def receive(channel): size = struct.calcsize("L") size = channel.recv(size) try: size = socket.ntohl(struct.unpack("L", size)[0]) except struct.error as e: return '' buf = "" while len(buf) < size: buf = channel.recv(size - len(buf)) return pickle.loads(buf)[0]
def convert_integer(): data = 1234 # 32-bit print ("Original: %s => Long host byte order: %s, Network byte order: %s" %(data, socket.ntohl(data), socket.htonl(data))) # 16-bit print ("Original: %s => Short host byte order: %s, Network byte order: %s" %(data, socket.ntohs(data), socket.htons(data)))
def decrypt(self, text): """????????????? @param text: ?? @return: ?????????? """ try: cryptor = AES.new(self.key, self.mode, self.key[:16]) # ??BASE64??????????AES-CBC?? plain_text = cryptor.decrypt(base64.b64decode(text)) except Exception: # print e return Crypt_DecryptAES_Error, None try: if PY2: pad = ord(plain_text[-1]) else: pad = plain_text[-1] # ??????? # pkcs7 = PKCS7Encoder() # plain_text = pkcs7.encode(plain_text) # ??16?????? content = plain_text[16:-pad] xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0]) xml_content = content[4: xml_len + 4] from_appid = content[xml_len + 4:].decode("utf8") except Exception: return Crypt_IllegalBuffer, None if from_appid != self.appid: return Crypt_ValidateAppid_Error, None return 0, xml_content
def handle_read(self): fromaddr, flags,msg,notif = self.sctp_recv(65536) params={'fromaddr':fromaddr,'flags':flags,'ppid':socket.ntohl(notif.ppid)} Endpoint.handle_read(self,msg,params)
def testNtoH(self): # This just checks that htons etc. are their own inverse, # when looking at the lower 16 or 32 bits. sizes = {socket.htonl: 32, socket.ntohl: 32, socket.htons: 16, socket.ntohs: 16} for func, size in sizes.items(): mask = (1<<size) - 1 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): self.assertEqual(i & mask, func(func(i&mask)) & mask) swapped = func(mask) self.assertEqual(swapped & mask, mask) self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1, 2, 3 ] bad_values = [ -1, -2, -3, -1, -2, -3 ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises((OverflowError, ValueError), socket.ntohl, k) self.assertRaises((OverflowError, ValueError), socket.ntohs, k) self.assertRaises((OverflowError, ValueError), socket.htonl, k) self.assertRaises((OverflowError, ValueError), socket.htons, k)
def testNtoH(self): # This just checks that htons etc. are their own inverse, # when looking at the lower 16 or 32 bits. sizes = {socket.htonl: 32, socket.ntohl: 32, socket.htons: 16, socket.ntohs: 16} for func, size in sizes.items(): mask = (1L<<size) - 1 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): self.assertEqual(i & mask, func(func(i&mask)) & mask) swapped = func(mask) self.assertEqual(swapped & mask, mask) self.assertRaises(OverflowError, func, 1L<<34)
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1L, 2L, 3L ] bad_values = [ -1, -2, -3, -1L, -2L, -3L ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises(OverflowError, socket.ntohl, k) self.assertRaises(OverflowError, socket.ntohs, k) self.assertRaises(OverflowError, socket.htonl, k) self.assertRaises(OverflowError, socket.htons, k)
def _ip2num(self, ip): """??IP?????""" ip_num = -1 try: # ?IP???INT/LONG ?? ip_num = socket.ntohl(struct.unpack("I",socket.inet_aton(str(ip)))[0]) except: pass finally: return ip_num
def ip2num(self, ip): """???IP????? Args: ip: ??ip: 10.10.10.11 Return: ???? Long ????? ?: 123456789 Raise: None """ return socket.ntohl(struct.unpack('I', socket.inet_aton(str(ip)))[0])
def ip_to_num(ip): ip_num = socket.ntohl(struct.unpack("I", socket.inet_aton(str(ip)))[0]) return ip_num
def decrypt(self,text,appid): """????????????? @param text: ?? @return: ?????????? """ try: cryptor = AES.new(self.key,self.mode,self.key[:16]) # ??BASE64??????????AES-CBC?? plain_text = cryptor.decrypt(base64.b64decode(text)) except Exception,e: #print e return ierror.WXBizMsgCrypt_DecryptAES_Error,None try: pad = ord(plain_text[-1]) # ??????? #pkcs7 = PKCS7Encoder() #plain_text = pkcs7.encode(plain_text) # ??16?????? content = plain_text[16:-pad] xml_len = socket.ntohl(struct.unpack("I",content[ : 4])[0]) xml_content = content[4 : xml_len+4] from_appid = content[xml_len+4:] except Exception,e: #print e return ierror.WXBizMsgCrypt_IllegalBuffer,None if from_appid != appid: return ierror.WXBizMsgCrypt_ValidateAppid_Error,None return 0,xml_content
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1, 2, 3 ] bad_values = [ -1, -2, -3, -1, -2, -3 ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises(OverflowError, socket.ntohl, k) self.assertRaises(OverflowError, socket.ntohs, k) self.assertRaises(OverflowError, socket.htonl, k) self.assertRaises(OverflowError, socket.htons, k)
def decrypt(self, text, appid): """????????????? @param text: ?? @return: ?????????? """ try: cryptor = AES.new(self.key, self.mode, self.key[:16]) # ??BASE64??????????AES-CBC?? plain_text = cryptor.decrypt(base64.b64decode(text)) except Exception as e: raise DecryptAESError(e) try: if six.PY2: pad = ord(plain_text[-1]) else: pad = plain_text[-1] # ??????? # pkcs7 = PKCS7Encoder() # plain_text = pkcs7.encode(plain_text) # ??16?????? content = plain_text[16:-pad] xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0]) xml_content = content[4: xml_len + 4] from_appid = content[xml_len + 4:] except Exception as e: raise IllegalBuffer(e) if from_appid != appid: raise ValidateAppIDError() return xml_content
def decrypt(self, text, appid): """????????????? @param text: ?? @return: ?????????? """ try: cryptor = AES.new(self.key, self.mode, self.key[:16]) # ??BASE64??????????AES-CBC?? plain_text = cryptor.decrypt(base64.b64decode(text)) except Exception: return WXBizMsgCrypt_DecryptAES_Error, None try: pad = plain_text[-1] # ??????? # pkcs7 = PKCS7Encoder() # plain_text = pkcs7.encode(plain_text) # ??16?????? content = plain_text[16:-pad] xml_len = socket.ntohl(struct.unpack(b"I", content[:4])[0]) xml_content = content[4:xml_len+4] from_appid = smart_bytes(content[xml_len+4:]) except Exception: return WXBizMsgCrypt_IllegalBuffer, None if from_appid != smart_bytes(appid): return WXBizMsgCrypt_ValidateAppid_Error, None return 0, xml_content
def testNtoHErrors(self): good_values = [ 1, 2, 3, 1L, 2L, 3L ] bad_values = [ -1, -2, -3, -1L, -2L, -3L ] for k in good_values: socket.ntohl(k) socket.ntohs(k) socket.htonl(k) socket.htons(k) for k in bad_values: self.assertRaises((OverflowError, ValueError), socket.ntohl, k) self.assertRaises((OverflowError, ValueError), socket.ntohs, k) self.assertRaises((OverflowError, ValueError), socket.htonl, k) self.assertRaises((OverflowError, ValueError), socket.htons, k)
def decrypt(self,text,corpid): """????????????? @param text: ?? @return: ?????????? """ try: cryptor = AES.new(self.key,self.mode,self.key[:16]) # ??BASE64??????????AES-CBC?? plain_text = cryptor.decrypt(base64.b64decode(text)) except Exception,e: print e return ierror.WXBizMsgCrypt_DecryptAES_Error,None try: pad = ord(plain_text[-1]) # ??????? #pkcs7 = PKCS7Encoder() #plain_text = pkcs7.encode(plain_text) # ??16?????? content = plain_text[16:-pad] xml_len = socket.ntohl(struct.unpack("I",content[ : 4])[0]) xml_content = content[4 : xml_len+4] from_corpid = content[xml_len+4:] except Exception,e: print e return ierror.WXBizMsgCrypt_IllegalBuffer,None if from_corpid != corpid: return ierror.WXBizMsgCrypt_ValidateCorpid_Error,None return 0,xml_content
def ip_to_int(ip): return socket.ntohl(struct.unpack("I",socket.inet_aton(ip))[0])
def ip_to_int(self, ip): return struct.unpack('I', struct.pack('I', (socket.ntohl(struct.unpack('I', socket.inet_aton(ip))[0]))))[0]