Python socket 模块,ntohl() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.ntohl()

项目:Software-Architecture-with-Python    作者:PacktPublishing    | 项目源码 | 文件源码
def receive(channel):
    """ Receive a message from a channel """

    size = struct.calcsize("L")
    size = channel.recv(size)
    try:
        size = socket.ntohl(struct.unpack("L", size)[0])
    except struct.error as e:
        return ''

    buf = ""

    while len(buf) < size:
        buf = channel.recv(size - len(buf))

    return pickle.loads(buf)[0]
项目:tidb-ansible    作者:pingcap    | 项目源码 | 文件源码
def _convert_host_to_hex(host):
    """
    Convert the provided host to the format in /proc/net/tcp*

    /proc/net/tcp uses little-endian four byte hex for ipv4
    /proc/net/tcp6 uses little-endian per 4B word for ipv6

    Args:
        host: String with either hostname, IPv4, or IPv6 address

    Returns:
        List of tuples containing address family and the
        little-endian converted host
    """
    ips = []
    if host is not None:
        for family, ip in _convert_host_to_ip(host):
            hexip_nf = binascii.b2a_hex(socket.inet_pton(family, ip))
            hexip_hf = ""
            for i in range(0, len(hexip_nf), 8):
                ipgroup_nf = hexip_nf[i:i+8]
                ipgroup_hf = socket.ntohl(int(ipgroup_nf, base=16))
                hexip_hf = "%s%08X" % (hexip_hf, ipgroup_hf)
            ips.append((family, hexip_hf))
    return ips
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _convert_host_to_hex(host):
    """
    Convert the provided host to the format in /proc/net/tcp*

    /proc/net/tcp uses little-endian four byte hex for ipv4
    /proc/net/tcp6 uses little-endian per 4B word for ipv6

    Args:
        host: String with either hostname, IPv4, or IPv6 address

    Returns:
        List of tuples containing address family and the
        little-endian converted host
    """
    ips = []
    if host is not None:
        for family, ip in _convert_host_to_ip(host):
            hexip_nf = binascii.b2a_hex(socket.inet_pton(family, ip))
            hexip_hf = ""
            for i in range(0, len(hexip_nf), 8):
                ipgroup_nf = hexip_nf[i:i+8]
                ipgroup_hf = socket.ntohl(int(ipgroup_nf, base=16))
                hexip_hf = "%s%08X" % (hexip_hf, ipgroup_hf)
            ips.append((family, hexip_hf))
    return ips
项目:nginx_log_parse    作者:daiguadaidai    | 项目源码 | 文件源码
def reducer_top100(self, _, values):
        """?????"""

        for cnt, ip in heapq.nlargest(100, values, key=lambda x: int(x[0])):
            ip_num = -1
            try:
                # ?IP???INT/LONG ??
                ip_num = socket.ntohl(struct.unpack("I",socket.inet_aton(str(ip)))[0])
                # ?????? ?? DataFrame
                addr_df = self.ip_addr_df[(self.ip_addr_df.ip_start_num <= ip_num) & 
                                          (ip_num <= self.ip_addr_df.ip_end_num)]
                # ????????? ??
                addr = addr_df.at[addr_df.index.tolist()[0], 'addr']
                yield cnt, '{ip}    {addr}'.format(ip=ip, addr=addr)
            except:
                yield cnt, ip
项目:ipDB    作者:DrizzleRisk    | 项目源码 | 文件源码
def ip2int():
    file_object_read = open('ipDB_0401.txt','r')
    file_object_write = open('ipDB_int_0401.txt','w')

    all_the_text = file_object_read.read()

    # ip2int
    line_arr = all_the_text.split('\n')
    for i in range(len(line_arr)):
        cell = line_arr[i].split(' ')
        cell[0] = str(socket.ntohl(struct.unpack("=I",socket.inet_aton(cell[0]))[0]))
        cell[1] = str(socket.ntohl(struct.unpack("=I",socket.inet_aton(cell[1]))[0]))
        line_arr[i] = ' '.join(cell)
    all_the_text = '\n'.join(line_arr)    

    file_object_write.write(all_the_text)
    file_object_read.close()
    file_object_write.close()

# ????IP
项目:fascinatedNight    作者:songshixuan    | 项目源码 | 文件源码
def user_login(self):
        login = socket.ntohs(1)
        login = struct.pack('h',login)
        action   = socket.ntohs(2)
        peer_id  = socket.ntohl(632949210)
        myid = 632949211
        action   = struct.pack('h', action)
        peer_id  = struct.pack('I', peer_id)
        myid = struct.pack('I',myid)

        print ("user login...")
        msgbody_len = 4
        msgbody_len  = socket.ntohs(msgbody_len)
        msgbody_len  = struct.pack('h', msgbody_len)
        send_login = login + peer_id + msgbody_len + myid
        self.send(send_login)
项目:scapy-vxlan    作者:p4lang    | 项目源码 | 文件源码
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def receive(channel):
    size = struct.calcsize("L")
    size = channel.recv(size)
    size = ntohl(struct.unpack("L", size)[0])
    buf = ""
    while len(buf) < size:
        buf = channel.recv(size - len(buf))
        return unmarshall(buf)[0]

# Echo server sample
项目:my-weather-indicator    作者:atareao    | 项目源码 | 文件源码
def route_to_python(route, family):
        addr, netmask, gateway, metric = route
        return [
            fixups.addr_to_python(addr, family),
            netmask,
            fixups.addr_to_python(gateway, family),
            socket.ntohl(metric)
        ]
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val
项目:packet-queue    作者:google    | 项目源码 | 文件源码
def nfq_callback(qh, unused_nfmsg, nfad, unused_data):
  packet = nfq.nfq_get_msg_packet_hdr(nfad).contents
  packet_id = socket.ntohl(packet.packet_id)

  payload_pointer = ctypes.c_void_p()
  size = nfq.nfq_get_payload(nfad, ctypes.byref(payload_pointer))
  payload = ctypes.string_at(payload_pointer, size)

  packet = Packet(packet_id, size, payload, qh)
  py_callbacks[qh](packet)
  return 0

# Maps queue handles to user-specified callbacks.
项目:CVE-2016-6366    作者:RiskSense-Ops    | 项目源码 | 文件源码
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val
项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def receive(channel):
    size = struct.calcsize("L")
    size = channel.recv(size)
    try:
        size = socket.ntohl(struct.unpack("L", size)[0])
    except struct.error as e:
        return ''
    buf = ""
    while len(buf) < size:
        buf = channel.recv(size - len(buf))
    return pickle.loads(buf)[0]
项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def convert_integer():
    data = 1234
    # 32-bit
    print ("Original: %s => Long  host byte order: %s, Network byte order: %s" %(data, socket.ntohl(data), socket.htonl(data)))
    # 16-bit
    print ("Original: %s => Short  host byte order: %s, Network byte order: %s" %(data, socket.ntohs(data), socket.htons(data)))
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val
项目:wego    作者:wegostudio    | 项目源码 | 文件源码
def decrypt(self, text):
        """?????????????
        @param text: ??
        @return: ??????????
        """
        try:
            cryptor = AES.new(self.key, self.mode, self.key[:16])
            # ??BASE64??????????AES-CBC??
            plain_text = cryptor.decrypt(base64.b64decode(text))
        except Exception:
            # print e
            return Crypt_DecryptAES_Error, None
        try:
            if PY2:
                pad = ord(plain_text[-1])
            else:
                pad = plain_text[-1]
            # ???????
            # pkcs7 = PKCS7Encoder()
            # plain_text = pkcs7.encode(plain_text)
            # ??16??????
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
            xml_content = content[4: xml_len + 4]
            from_appid = content[xml_len + 4:].decode("utf8")
        except Exception:
            return Crypt_IllegalBuffer, None
        if from_appid != self.appid:
            return Crypt_ValidateAppid_Error, None
        return 0, xml_content
项目:heartbreaker    作者:lokori    | 项目源码 | 文件源码
def handle_read(self):
            fromaddr, flags,msg,notif = self.sctp_recv(65536)
            params={'fromaddr':fromaddr,'flags':flags,'ppid':socket.ntohl(notif.ppid)}
            Endpoint.handle_read(self,msg,params)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1<<34)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1, 2, 3 ]
        bad_values = [ -1, -2, -3, -1, -2, -3 ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises((OverflowError, ValueError), socket.ntohl, k)
            self.assertRaises((OverflowError, ValueError), socket.ntohs, k)
            self.assertRaises((OverflowError, ValueError), socket.htonl, k)
            self.assertRaises((OverflowError, ValueError), socket.htons, k)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1L<<34)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1L<<34)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:scapy-bpf    作者:guedou    | 项目源码 | 文件源码
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val
项目:scapy-radio    作者:BastilleResearch    | 项目源码 | 文件源码
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val
项目:nginx_log_parse    作者:daiguadaidai    | 项目源码 | 文件源码
def reducer_top100(self, _, values):
        """?????"""
        for cnt, ip in heapq.nlargest(100, values, key=lambda x: int(x[0])):
            ip_num = -1
            try:
                # ?IP???INT/LONG ??
                ip_num = socket.ntohl(struct.unpack("I",socket.inet_aton(str(ip)))[0])
                # ?????? ?? DataFrame
                addr_df = self.ip_addr_df[(self.ip_addr_df.ip_start_num <= ip_num) & 
                                          (ip_num <= self.ip_addr_df.ip_end_num)]
                # ????????? ??
                addr = addr_df.at[addr_df.index.tolist()[0], 'addr']
                yield cnt, '{ip}    {addr}'.format(ip=ip, addr=addr)
            except:
                yield cnt, ip
项目:nginx_log_parse    作者:daiguadaidai    | 项目源码 | 文件源码
def _ip2num(self, ip):
        """??IP?????"""
        ip_num = -1
        try:
            # ?IP???INT/LONG ??
            ip_num = socket.ntohl(struct.unpack("I",socket.inet_aton(str(ip)))[0])
        except:
            pass
        finally:
           return ip_num
项目:mysql_backup_client    作者:daiguadaidai    | 项目源码 | 文件源码
def ip2num(self, ip):
        """???IP?????
        Args:
            ip: ??ip: 10.10.10.11
        Return: 
            ???? Long ?????
            ?: 123456789
        Raise: None
        """

        return socket.ntohl(struct.unpack('I', socket.inet_aton(str(ip)))[0])
项目:internet-content-detection    作者:liubo0621    | 项目源码 | 文件源码
def ip_to_num(ip):
    ip_num = socket.ntohl(struct.unpack("I", socket.inet_aton(str(ip)))[0])
    return ip_num
项目:wechat    作者:zgliujiangang    | 项目源码 | 文件源码
def decrypt(self,text,appid):
        """?????????????
        @param text: ??
        @return: ??????????
        """
        try:
            cryptor = AES.new(self.key,self.mode,self.key[:16])
            # ??BASE64??????????AES-CBC??
            plain_text  = cryptor.decrypt(base64.b64decode(text))
        except Exception,e:
            #print e
            return  ierror.WXBizMsgCrypt_DecryptAES_Error,None
        try:
            pad = ord(plain_text[-1])
            # ???????
            #pkcs7 = PKCS7Encoder()
            #plain_text = pkcs7.encode(plain_text)
            # ??16??????
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I",content[ : 4])[0])
            xml_content = content[4 : xml_len+4]
            from_appid = content[xml_len+4:]
        except Exception,e:
            #print e
            return  ierror.WXBizMsgCrypt_IllegalBuffer,None
        if  from_appid != appid:
            return ierror.WXBizMsgCrypt_ValidateAppid_Error,None
        return 0,xml_content
项目:isf    作者:w3h    | 项目源码 | 文件源码
def reverse(self, val):
        if self.size == 16:
            val = socket.ntohs(val)
        elif self.size == 32:
            val = socket.ntohl(val)
        return val
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1<<34)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1, 2, 3 ]
        bad_values = [ -1, -2, -3, -1, -2, -3 ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:wechat-async-sdk    作者:ihjmh    | 项目源码 | 文件源码
def decrypt(self, text, appid):
        """?????????????

        @param text: ??
        @return: ??????????
        """
        try:
            cryptor = AES.new(self.key, self.mode, self.key[:16])
            # ??BASE64??????????AES-CBC??
            plain_text = cryptor.decrypt(base64.b64decode(text))
        except Exception as e:
            raise DecryptAESError(e)

        try:
            if six.PY2:
                pad = ord(plain_text[-1])
            else:
                pad = plain_text[-1]
            # ???????
            # pkcs7 = PKCS7Encoder()
            # plain_text = pkcs7.encode(plain_text)
            # ??16??????
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
            xml_content = content[4: xml_len + 4]
            from_appid = content[xml_len + 4:]
        except Exception as e:
            raise IllegalBuffer(e)
        if from_appid != appid:
            raise ValidateAppIDError()
        return xml_content
项目:weixin-sdk    作者:xfan001    | 项目源码 | 文件源码
def decrypt(self,text,appid):
        """?????????????
        @param text: ??
        @return: ??????????
        """
        try:
            cryptor = AES.new(self.key,self.mode,self.key[:16])
            # ??BASE64??????????AES-CBC??
            plain_text  = cryptor.decrypt(base64.b64decode(text))
        except Exception,e:
            #print e
            return  ierror.WXBizMsgCrypt_DecryptAES_Error,None
        try:
            pad = ord(plain_text[-1])
            # ???????
            #pkcs7 = PKCS7Encoder()
            #plain_text = pkcs7.encode(plain_text)
            # ??16??????
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I",content[ : 4])[0])
            xml_content = content[4 : xml_len+4]
            from_appid = content[xml_len+4:]
        except Exception,e:
            #print e
            return  ierror.WXBizMsgCrypt_IllegalBuffer,None
        if  from_appid != appid:
            return ierror.WXBizMsgCrypt_ValidateAppid_Error,None
        return 0,xml_content
项目:wechat_mall    作者:a741424975game    | 项目源码 | 文件源码
def decrypt(self, text, appid):
        """?????????????
        @param text: ??
        @return: ??????????
        """
        try:
            cryptor = AES.new(self.key, self.mode, self.key[:16])
            # ??BASE64??????????AES-CBC??
            plain_text = cryptor.decrypt(base64.b64decode(text))
        except Exception:
            return WXBizMsgCrypt_DecryptAES_Error, None
        try:
            pad = plain_text[-1]
            # ???????
            # pkcs7 = PKCS7Encoder()
            # plain_text = pkcs7.encode(plain_text)
            # ??16??????
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack(b"I", content[:4])[0])
            xml_content = content[4:xml_len+4]
            from_appid = smart_bytes(content[xml_len+4:])
        except Exception:
            return WXBizMsgCrypt_IllegalBuffer, None
        if from_appid != smart_bytes(appid):
            return WXBizMsgCrypt_ValidateAppid_Error, None
        return 0, xml_content
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1L<<34)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises((OverflowError, ValueError), socket.ntohl, k)
            self.assertRaises((OverflowError, ValueError), socket.ntohs, k)
            self.assertRaises((OverflowError, ValueError), socket.htonl, k)
            self.assertRaises((OverflowError, ValueError), socket.htons, k)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1<<34)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1, 2, 3 ]
        bad_values = [ -1, -2, -3, -1, -2, -3 ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:weixincrop    作者:tonghou23    | 项目源码 | 文件源码
def decrypt(self,text,corpid):
        """?????????????
        @param text: ?? 
        @return: ??????????
        """
        try:
            cryptor = AES.new(self.key,self.mode,self.key[:16])
            # ??BASE64??????????AES-CBC??
            plain_text  = cryptor.decrypt(base64.b64decode(text))
        except Exception,e:
            print e 
            return  ierror.WXBizMsgCrypt_DecryptAES_Error,None
        try:
            pad = ord(plain_text[-1]) 
            # ??????? 
            #pkcs7 = PKCS7Encoder()
            #plain_text = pkcs7.encode(plain_text)   
            # ??16??????
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I",content[ : 4])[0])
            xml_content = content[4 : xml_len+4] 
            from_corpid = content[xml_len+4:]
        except Exception,e:
            print e
            return  ierror.WXBizMsgCrypt_IllegalBuffer,None
        if  from_corpid != corpid:
            return ierror.WXBizMsgCrypt_ValidateCorpid_Error,None
        return 0,xml_content
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1L<<34)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def ip_to_int(ip):
    return  socket.ntohl(struct.unpack("I",socket.inet_aton(ip))[0])
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def ip_to_int(self, ip):
        return struct.unpack('I', struct.pack('I', (socket.ntohl(struct.unpack('I', socket.inet_aton(ip))[0]))))[0]
项目:zabbix_wechat    作者:linuxyan    | 项目源码 | 文件源码
def decrypt(self,text,corpid):
        """?????????????
        @param text: ?? 
        @return: ??????????
        """
        try:
            cryptor = AES.new(self.key,self.mode,self.key[:16])
            # ??BASE64??????????AES-CBC??
            plain_text  = cryptor.decrypt(base64.b64decode(text))
        except Exception,e:
            print e 
            return  ierror.WXBizMsgCrypt_DecryptAES_Error,None
        try:
            pad = ord(plain_text[-1]) 
            # ??????? 
            #pkcs7 = PKCS7Encoder()
            #plain_text = pkcs7.encode(plain_text)   
            # ??16??????
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I",content[ : 4])[0])
            xml_content = content[4 : xml_len+4] 
            from_corpid = content[xml_len+4:]
        except Exception,e:
            print e
            return  ierror.WXBizMsgCrypt_IllegalBuffer,None
        if  from_corpid != corpid:
            return ierror.WXBizMsgCrypt_ValidateCorpid_Error,None
        return 0,xml_content