Python socket 模块,htonl() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.htonl()

项目:psystem    作者:gokhanm    | 项目源码 | 文件源码
def netmask(self, interface_name, netmask):
        """
            Apply a new IPv4 netmask to a network interface.

            The interface name is looked up in Get().interfaces and the
            netmask is checked with validators.ipv4 before the mask is
            handed to the SIOCSIFNETMASK ioctl on self.sock.

            interface_name = Interface the netmask is applied to
            netmask = New netmask as an IPv4 string

            Raises WrongInterfaceName for an unknown interface and
            NotValidIPv4Address when the netmask fails validation.
        """
        known_interfaces = Get().interfaces
        netmask_is_valid = validators.ipv4(netmask)

        if interface_name not in known_interfaces:
            raise WrongInterfaceName("Wrong Interface Name %s" % interface_name)
        if netmask_is_valid is not True:
            raise NotValidIPv4Address("Not Valid IPv4 Address %s" % netmask)

        prefix_len = self.get_net_size(netmask.split('.'))
        name_bytes = interface_name.encode(encoding='UTF-8')
        host_bits = 32 - prefix_len
        # c_uint32 folds the negative complement into an unsigned 32-bit mask
        mask_value = ctypes.c_uint32(~((2 ** host_bits) - 1)).value
        mask_net_order = socket.htonl(mask_value)
        # Request layout: 16-byte name, family, 2 zero bytes, mask, 8 zero bytes
        request = struct.pack('16sH2sI8s', name_bytes, AF_INET, b'\x00'*2, mask_net_order, b'\x00'*8)
        fcntl.ioctl(self.sock, SIOCSIFNETMASK, request)
项目:wechat    作者:zgliujiangang    | 项目源码 | 文件源码
def encrypt(self,text,appid):
        """Encrypt a plaintext message.

        @param text: plaintext to encrypt
        @param appid: value appended after the plaintext
        @return: tuple of (status code, Base64-encoded ciphertext); the
                 second element is None when the encryption step fails
        """
        # Prepend the random string and the plaintext length (network byte order)
        text = self.get_random_str() + struct.pack("I",socket.htonl(len(text))) + text + appid
        # Pad the plaintext with the project's PKCS7 encoder
        pkcs7 = PKCS7Encoder()
        text = pkcs7.encode(text)
        # Encrypt; the first 16 bytes of the key are passed as the third argument
        cryptor = AES.new(self.key,self.mode,self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # Base64-encode the encrypted bytes
            return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
        except Exception:
            # "except Exception, e" was Python-2-only syntax and the name was unused
            return  ierror.WXBizMsgCrypt_EncryptAES_Error,None
项目:wechat-async-sdk    作者:ihjmh    | 项目源码 | 文件源码
def encrypt(self, text, appid):
        """Encrypt a plaintext message.

        @param text: plaintext to encrypt
        @param appid: value appended after the plaintext
        @return: Base64-encoded ciphertext
        @raise EncryptAESError: when the encryption step fails
        """
        # Convert before measuring so the length prefix describes the bytes that
        # are actually embedded (assumes to_binary leaves bytes unchanged and
        # encodes text - TODO confirm against its definition)
        text = to_binary(text)
        # Prepend the random string and the payload length (network byte order)
        text = self.get_random_str() + struct.pack("I", socket.htonl(len(text))) + text + appid
        # Pad the plaintext with the project's PKCS7 encoder
        pkcs7 = PKCS7Encoder()
        text = pkcs7.encode(text)
        # Encrypt; the first 16 bytes of the key are passed as the third argument
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # Base64-encode the encrypted bytes
            return base64.b64encode(ciphertext)
        except Exception as e:
            raise EncryptAESError(e)
项目:weixin-sdk    作者:xfan001    | 项目源码 | 文件源码
def encrypt(self,text,appid):
        """Encrypt a plaintext message.

        @param text: plaintext to encrypt
        @param appid: value appended after the plaintext
        @return: tuple of (status code, Base64-encoded ciphertext); the
                 second element is None when the encryption step fails
        """
        # Prepend the random string and the plaintext length (network byte order)
        text = self.get_random_str() + struct.pack("I",socket.htonl(len(text))) + text + appid
        # Pad the plaintext with the project's PKCS7 encoder
        pkcs7 = PKCS7Encoder()
        text = pkcs7.encode(text)
        # Encrypt; the first 16 bytes of the key are passed as the third argument
        cryptor = AES.new(self.key,self.mode,self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # Base64-encode the encrypted bytes
            return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
        except Exception:
            # "except Exception, e" was Python-2-only syntax and the name was unused
            return  ierror.WXBizMsgCrypt_EncryptAES_Error,None
项目:wechat_mall    作者:a741424975game    | 项目源码 | 文件源码
def encrypt(self, text, appid):
        """Encrypt a plaintext message.

        @param text: plaintext to encrypt
        @param appid: value appended after the plaintext
        @return: tuple of (status code, Base64-encoded ciphertext); the
                 second element is None when the encryption step fails
        """
        # Convert before measuring so the length prefix describes the bytes that
        # are actually embedded (assumes smart_bytes leaves bytes unchanged and
        # encodes text - TODO confirm against its definition)
        text = smart_bytes(text)
        pack_str = struct.pack(b"I", socket.htonl(len(text)))
        # Random string, length prefix, payload, then the application id
        text = smart_bytes(self.get_random_str()) + pack_str + text + smart_bytes(appid)
        # Pad the plaintext with the project's PKCS7 encoder
        pkcs7 = PKCS7Encoder()
        text = pkcs7.encode(text)
        # Encrypt; the first 16 bytes of the key are passed as the third argument
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # Base64-encode the encrypted bytes
            return WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
        except Exception:
            return WXBizMsgCrypt_EncryptAES_Error, None
项目:weixincrop    作者:tonghou23    | 项目源码 | 文件源码
def encrypt(self,text,corpid):
        """Encrypt a plaintext message.

        @param text: plaintext to encrypt
        @param corpid: value appended after the plaintext
        @return: tuple of (status code, Base64-encoded ciphertext); the
                 second element is None when the encryption step fails
        """
        # Prepend the random string and the plaintext length (network byte order)
        text = self.get_random_str() + struct.pack("I",socket.htonl(len(text))) + text + corpid
        # Pad the plaintext with the project's PKCS7 encoder
        pkcs7 = PKCS7Encoder()
        text = pkcs7.encode(text)
        # Encrypt; the first 16 bytes of the key are passed as the third argument
        cryptor = AES.new(self.key,self.mode,self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # Base64-encode the encrypted bytes
            return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
        except Exception as e:
            # "as" and print(...) parse on both Python 2.6+ and Python 3
            print(e)
            return  ierror.WXBizMsgCrypt_EncryptAES_Error,None
项目:zabbix_wechat    作者:linuxyan    | 项目源码 | 文件源码
def encrypt(self,text,corpid):
        """Encrypt a plaintext message.

        @param text: plaintext to encrypt
        @param corpid: value appended after the plaintext
        @return: tuple of (status code, Base64-encoded ciphertext); the
                 second element is None when the encryption step fails
        """
        # Prepend the random string and the plaintext length (network byte order)
        text = self.get_random_str() + struct.pack("I",socket.htonl(len(text))) + text + corpid
        # Pad the plaintext with the project's PKCS7 encoder
        pkcs7 = PKCS7Encoder()
        text = pkcs7.encode(text)
        # Encrypt; the first 16 bytes of the key are passed as the third argument
        cryptor = AES.new(self.key,self.mode,self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # Base64-encode the encrypted bytes
            return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
        except Exception as e:
            # "as" and print(...) parse on both Python 2.6+ and Python 3
            print(e)
            return  ierror.WXBizMsgCrypt_EncryptAES_Error,None
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def send(channel, *args):
    """Send the positional arguments over channel as a length-prefixed message.

    The arguments are serialized with marshall(); a size header is sent
    first, followed by the serialized payload.
    """
    payload = marshall(args)
    # Header: payload length passed through htonl, packed as a native "L"
    header = struct.pack("L", htonl(len(payload)))
    channel.send(header)
    channel.send(payload)
项目:hwfw-tool    作者:LeeXiaolan    | 项目源码 | 文件源码
def toString(self, noItemData=True):
    """Serialize the header, optional extra header and items into one string.

    The header's fileLength, headerCrc and (optionally) fileCrc fields are
    recomputed and written back to self.header as a side effect.

    noItemData -- when True (default) only the header, extra header and the
                  item records are emitted; when False every item's data is
                  appended as well and the file CRC is updated.

    Returns the concatenation of all serialized parts.
    """
    # Base length: header plus one record per item, minus a fixed bias.
    self.header.fileLength = (
        self.header._FORMAT.size 
        + self.header.itemCount * HuaweiFirmwareItem._FORMAT.size
        - 0x4c # FIXME: Can not find where does this bias come from.
    )
    strs = [
      self.header.toString()[20:], # Partial header used for calculate CRC32 value.
    ]
    if self.header.extraHeaderLength:
      strs.append(self.extraHeader)
      self.header.fileLength += len(self.extraHeader)
    # Item payloads are collected separately; they are only emitted on request.
    data = []
    for item in self.items:
      strs.append(item.toString())
      data.append(item.data)
      self.header.fileLength += item.size
    # Convert to big endian.
    self.header.fileLength = socket.htonl(self.header.fileLength)

    # Update header CRC32 value.
    # NOTE(review): strs[0] was serialized before fileLength was finalized above;
    # confirm the header CRC is meant to cover that earlier snapshot.
    self.header.headerCrc = seqCrc32(strs)

    if not noItemData:
      strs.extend(data)
      # All data are present, now update file CRC32 value.
      # Re-serialize so the slice (from offset 12) reflects the new headerCrc.
      strs[0] = self.header.toString()[12:]
      self.header.fileCrc = seqCrc32(strs)

    # Using the latest header with correct CRC32 value and file length.
    strs[0] = self.header.toString()
    return ''.join(strs)
项目:my-weather-indicator    作者:atareao    | 项目源码 | 文件源码
def route_to_dbus(route, family):
        """Convert an (addr, netmask, gateway, metric) route into a list of D-Bus values.

        Address and gateway go through fixups.addr_to_dbus with the given
        family, the netmask through fixups.mask_to_dbus, and the metric
        through socket.htonl.
        """
        addr, netmask, gateway, metric = route
        dbus_addr = fixups.addr_to_dbus(addr, family)
        dbus_mask = fixups.mask_to_dbus(netmask)
        dbus_gateway = fixups.addr_to_dbus(gateway, family)
        dbus_metric = socket.htonl(metric)
        return [dbus_addr, dbus_mask, dbus_gateway, dbus_metric]


# Constants below are generated with makeconstants.py. Do not edit manually.
项目:Software-Architecture-with-Python    作者:PacktPublishing    | 项目源码 | 文件源码
def send(channel, *args):
    """ Pickle the arguments and send them to a channel with a size header """

    payload = pickle.dumps(args)
    # Header: payload length passed through htonl, packed as a native "L"
    header = struct.pack("L", socket.htonl(len(payload)))
    channel.send(header)
    channel.send(payload)
项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def send(channel, *args):
    """Pickle the arguments and send them over channel, size header first."""
    payload = pickle.dumps(args)
    # Header: payload length passed through htonl, packed as a native "L"
    header = struct.pack("L", socket.htonl(len(payload)))
    channel.send(header)
    channel.send(payload)
项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def convert_integer():
    """Print the sample value 1234 after the 32-bit and 16-bit byte-order conversions."""
    data = 1234
    # 32-bit
    long_host = socket.ntohl(data)
    long_network = socket.htonl(data)
    print ("Original: %s => Long  host byte order: %s, Network byte order: %s" %(data, long_host, long_network))
    # 16-bit
    short_host = socket.ntohs(data)
    short_network = socket.htons(data)
    print ("Original: %s => Short  host byte order: %s, Network byte order: %s" %(data, short_host, short_network))
项目:wego    作者:wegostudio    | 项目源码 | 文件源码
def encrypt(self, xml):
        """Encrypt an XML message.

        @param xml: message text; it is UTF-8 encoded before encryption
        @return: tuple of (status code, Base64 ciphertext as text); the
                 second element is None when the encryption step fails
        """
        xml = xml.encode('utf-8')
        # On Python 3 the random string and app id must be encoded to bytes too
        if PY2:
            nonce = self.get_random_str()
        else:
            nonce = self.get_random_str().encode('utf-8')
        # Length of the encoded message, in network byte order
        length_prefix = struct.pack("I", socket.htonl(len(xml)))
        if PY2:
            appid = self.appid
        else:
            appid = self.appid.encode('utf-8')
        text = nonce + length_prefix + xml + appid
        # Pad the plaintext with the project's PKCS7 encoder
        text = PKCS7Encoder().encode(text)
        # Encrypt; the first 16 bytes of the key are passed as the third argument
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # Base64-encode the encrypted bytes and return them as text
            encoded = base64.b64encode(ciphertext).decode("utf8")
            return Crypt_OK, encoded
        except Exception:
            return Crypt_EncryptAES_Error, None
项目:heartbreaker    作者:lokori    | 项目源码 | 文件源码
def handle_write(self):
            """Send every queued (payload, params) entry of self.buffer via sctp_send."""
            while len(self.buffer) > 0:
                payload, params = self.buffer.pop(0)
                info = sctp.sndrcvinfo()
                # Payload protocol id defaults to 0 and is passed through htonl
                info.ppid = socket.htonl(params.get('ppid',0))
                # Only the first 1460 bytes of the payload are sent
                chunk = payload[:1460]
                # NOTE(review): flags are read from the sndrcvinfo object - confirm that is intended
                self.sctp_send(chunk,ppid=info.ppid,flags=info.flags)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1<<34)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1, 2, 3 ]
        bad_values = [ -1, -2, -3, -1, -2, -3 ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises((OverflowError, ValueError), socket.ntohl, k)
            self.assertRaises((OverflowError, ValueError), socket.ntohs, k)
            self.assertRaises((OverflowError, ValueError), socket.htonl, k)
            self.assertRaises((OverflowError, ValueError), socket.htons, k)
项目:chatRoom    作者:JINLINBI    | 项目源码 | 文件源码
def send(channel,*args):
    """Pickle the arguments and send them over channel, size header first."""
    payload = pickle.dumps(args)
    # Header: payload length passed through htonl, packed as a native "L"
    header = struct.pack("L", socket.htonl(len(payload)))
    channel.send(header)
    channel.send(payload)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1L<<34)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1L<<34)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:mysql_backup_client    作者:daiguadaidai    | 项目源码 | 文件源码
def num2ip(self, num):
        """Convert an integer into its dotted IPv4 string.

        Args:
            num: integer value of the address
        Return:
            the address as a dotted string
            e.g.: 10.10.10.11
        Raise: None
        """

        # htonl followed by a native pack yields the bytes in network order
        network_value = socket.htonl(num)
        packed = struct.pack('I', network_value)
        return socket.inet_ntoa(packed)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1<<34)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1, 2, 3 ]
        bad_values = [ -1, -2, -3, -1, -2, -3 ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1L<<34)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises((OverflowError, ValueError), socket.ntohl, k)
            self.assertRaises((OverflowError, ValueError), socket.ntohs, k)
            self.assertRaises((OverflowError, ValueError), socket.htonl, k)
            self.assertRaises((OverflowError, ValueError), socket.htons, k)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1<<34)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1, 2, 3 ]
        bad_values = [ -1, -2, -3, -1, -2, -3 ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1L<<34)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def int_to_ip(int_ip):
    """Return the dotted IPv4 string for the integer address int_ip."""
    # htonl followed by a native pack yields the bytes in network order
    packed = struct.pack('I', socket.htonl(int_ip))
    return socket.inet_ntoa(packed)
项目:ops_agent    作者:sjqzhang    | 项目源码 | 文件源码
def int_to_ip(self, int_ip):
        """Return the dotted IPv4 string for the integer address int_ip."""
        # htonl followed by a native pack yields the bytes in network order
        packed = struct.pack('I', socket.htonl(int_ip))
        return socket.inet_ntoa(packed)
项目:oejia_wx    作者:JoneXiong    | 项目源码 | 文件源码
def _encrypt(self, text, _id):
        """Encrypt text together with _id and return the Base64-encoded ciphertext.

        The plaintext is laid out as: random string, length of the binary
        text in network byte order, the text itself, then _id. It is padded
        with PKCS7Encoder and encrypted with self.cipher.
        """
        text = to_binary(text)
        nonce = to_binary(self.get_random_string())
        length = struct.pack(b'I', socket.htonl(len(text)))
        plaintext = b''.join([nonce, length, text, to_binary(_id)])
        padded = PKCS7Encoder.encode(plaintext)

        ciphertext = to_binary(self.cipher.encrypt(padded))
        return base64.b64encode(ciphertext)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1<<34)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testNtoHErrors(self):
        good_values = [ 1, 2, 3, 1, 2, 3 ]
        bad_values = [ -1, -2, -3, -1, -2, -3 ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:minion    作者:alibaba    | 项目源码 | 文件源码
def int_to_ipstr(int_ip):
    """Return the dotted IPv4 string for the integer address int_ip."""
    # htonl followed by a native pack yields the bytes in network order
    network_value = socket.htonl(int_ip)
    packed = struct.pack('I', network_value)
    return socket.inet_ntoa(packed)
项目:TornadoWeb    作者:VxCoder    | 项目源码 | 文件源码
def _encrypt(self, text, _id):
        """Encrypt text together with _id and return the Base64-encoded ciphertext.

        The plaintext is laid out as: random string, length of the binary
        text in network byte order, the text itself, then _id. It is padded
        with PKCS7Encoder and encrypted with self.cipher.
        """
        text = to_binary(text)
        nonce = to_binary(self.get_random_string())
        length = struct.pack(b'I', socket.htonl(len(text)))
        plaintext = b''.join([nonce, length, text, to_binary(_id)])
        padded = PKCS7Encoder.encode(plaintext)

        ciphertext = to_binary(self.cipher.encrypt(padded))
        return base64.b64encode(ciphertext)
项目:Aquila    作者:thinkdb    | 项目源码 | 文件源码
def num2ip(arg, int_ip):
    """
    Convert between an integer and a dotted IPv4 address.
    arg == 'ip': int_ip is an integer, the dotted string is returned
    otherwise:   int_ip is a dotted string, the integer is returned as a string
    """
    if arg == 'ip':
        packed = struct.pack('I', socket.htonl(int_ip))
        return socket.inet_ntoa(packed)
    # Reverse direction: unpack the 4 address bytes and undo the byte order
    (raw_value,) = struct.unpack('I', socket.inet_aton(int_ip))
    return str(socket.ntohl(raw_value))
项目:Aquila    作者:thinkdb    | 项目源码 | 文件源码
def num2ip(arg, int_ip):
    """
    Convert between an integer and a dotted IPv4 address.
    arg == 'ip': int_ip is an integer, the dotted string is returned
    otherwise:   int_ip is a dotted string, the integer is returned as a string
    """
    if arg == 'ip':
        packed = struct.pack('I', socket.htonl(int_ip))
        return socket.inet_ntoa(packed)
    # Reverse direction: unpack the 4 address bytes and undo the byte order
    (raw_value,) = struct.unpack('I', socket.inet_aton(int_ip))
    return str(socket.ntohl(raw_value))
项目:011_python_network_programming_cookbook_demo    作者:jerry-0824    | 项目源码 | 文件源码
def convert_integer():
    """Print the sample value 1234 after the 32-bit and 16-bit byte-order conversions."""
    data = 1234
    # print(...) with a single pre-formatted string behaves the same on
    # Python 2 and Python 3; the bare print statement only parsed on Python 2.
    # 32-bit
    print("Original: %s => Long host byte order: %s, Network byte order: %s"
          % (data, socket.ntohl(data), socket.htonl(data)))
    # 16-bit
    print("Original: %s => Short host byte order: %s, Network byte order: %s"
          % (data, socket.ntohs(data), socket.htons(data)))
项目:011_python_network_programming_cookbook_demo    作者:jerry-0824    | 项目源码 | 文件源码
def send(channel, *args):
    """Serialize the arguments with cPickle and send them over channel, size header first."""
    payload = cPickle.dumps(args)
    # Header: payload length passed through htonl, packed as a native "L"
    header = struct.pack("L", socket.htonl(len(payload)))
    channel.send(header)
    channel.send(payload)
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def read_routes():
    f=open("/proc/net/route","r")
    routes = []
    s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    ifreq = ioctl(s, SIOCGIFADDR,struct.pack("16s16x",LOOPBACK_NAME))
    addrfamily = struct.unpack("h",ifreq[16:18])[0]
    if addrfamily == socket.AF_INET:
        ifreq2 = ioctl(s, SIOCGIFNETMASK,struct.pack("16s16x",LOOPBACK_NAME))
        msk = socket.ntohl(struct.unpack("I",ifreq2[20:24])[0])
        dst = socket.ntohl(struct.unpack("I",ifreq[20:24])[0]) & msk
        ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
        routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))
    else:
        warning("Interface lo: unkown address family (%i)"% addrfamily)

    for l in f.readlines()[1:]:
        iff,dst,gw,flags,x,x,x,msk,x,x,x = l.split()
        flags = int(flags,16)
        if flags & RTF_UP == 0:
            continue
        if flags & RTF_REJECT:
            continue
        try:
            ifreq = ioctl(s, SIOCGIFADDR,struct.pack("16s16x",iff))
        except IOError: # interface is present in routing tables but does not have any assigned IP
            ifaddr="0.0.0.0"
        else:
            addrfamily = struct.unpack("h",ifreq[16:18])[0]
            if addrfamily == socket.AF_INET:
                ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
            else:
                warning("Interface %s: unkown address family (%i)"%(iff, addrfamily))
                continue
        routes.append((socket.htonl(long(dst,16))&0xffffffffL,
                       socket.htonl(long(msk,16))&0xffffffffL,
                       scapy.utils.inet_ntoa(struct.pack("I",long(gw,16))),
                       iff, ifaddr))

    f.close()
    return routes

############
### IPv6 ###
############
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def read_routes():
    try:
        f=open("/proc/net/route","r")
    except IOError:
        warning("Can't open /proc/net/route !")
        return []
    routes = []
    s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    ifreq = ioctl(s, SIOCGIFADDR,struct.pack("16s16x",LOOPBACK_NAME))
    addrfamily = struct.unpack("h",ifreq[16:18])[0]
    if addrfamily == socket.AF_INET:
        ifreq2 = ioctl(s, SIOCGIFNETMASK,struct.pack("16s16x",LOOPBACK_NAME))
        msk = socket.ntohl(struct.unpack("I",ifreq2[20:24])[0])
        dst = socket.ntohl(struct.unpack("I",ifreq[20:24])[0]) & msk
        ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
        routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))
    else:
        warning("Interface lo: unkown address family (%i)"% addrfamily)

    for l in f.readlines()[1:]:
        iff,dst,gw,flags,x,x,x,msk,x,x,x = l.split()
        flags = int(flags,16)
        if flags & RTF_UP == 0:
            continue
        if flags & RTF_REJECT:
            continue
        try:
            ifreq = ioctl(s, SIOCGIFADDR,struct.pack("16s16x",iff))
        except IOError: # interface is present in routing tables but does not have any assigned IP
            ifaddr="0.0.0.0"
        else:
            addrfamily = struct.unpack("h",ifreq[16:18])[0]
            if addrfamily == socket.AF_INET:
                ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
            else:
                warning("Interface %s: unkown address family (%i)"%(iff, addrfamily))
                continue
        routes.append((socket.htonl(long(dst,16))&0xffffffffL,
                       socket.htonl(long(msk,16))&0xffffffffL,
                       scapy.utils.inet_ntoa(struct.pack("I",long(gw,16))),
                       iff, ifaddr))

    f.close()
    return routes

############
### IPv6 ###
############
项目:CVE-2016-6366    作者:RiskSense-Ops    | 项目源码 | 文件源码
def read_routes():
    f=open("/proc/net/route","r")
    routes = []
    s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    ifreq = ioctl(s, SIOCGIFADDR,struct.pack("16s16x",LOOPBACK_NAME))
    addrfamily = struct.unpack("h",ifreq[16:18])[0]
    if addrfamily == socket.AF_INET:
        ifreq2 = ioctl(s, SIOCGIFNETMASK,struct.pack("16s16x",LOOPBACK_NAME))
        msk = socket.ntohl(struct.unpack("I",ifreq2[20:24])[0])
        dst = socket.ntohl(struct.unpack("I",ifreq[20:24])[0]) & msk
        ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
        routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))
    else:
        warning("Interface lo: unkown address family (%i)"% addrfamily)

    for l in f.readlines()[1:]:
        iff,dst,gw,flags,x,x,x,msk,x,x,x = l.split()
        flags = int(flags,16)
        if flags & RTF_UP == 0:
            continue
        if flags & RTF_REJECT:
            continue
        try:
            ifreq = ioctl(s, SIOCGIFADDR,struct.pack("16s16x",iff))
        except IOError: # interface is present in routing tables but does not have any assigned IP
            ifaddr="0.0.0.0"
        else:
            addrfamily = struct.unpack("h",ifreq[16:18])[0]
            if addrfamily == socket.AF_INET:
                ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
            else:
                warning("Interface %s: unkown address family (%i)"%(iff, addrfamily))
                continue
        routes.append((socket.htonl(long(dst,16))&0xffffffffL,
                       socket.htonl(long(msk,16))&0xffffffffL,
                       scapy.utils.inet_ntoa(struct.pack("I",long(gw,16))),
                       iff, ifaddr))

    f.close()
    return routes

############
### IPv6 ###
############
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def read_routes():
    """Collect IPv4 routes for the loopback interface and from /proc/net/route.

    Returns a list of (destination, netmask, gateway, interface name,
    interface address) tuples, or an empty list (after a warning) when the
    route file cannot be opened. Entries whose flags lack RTF_UP or carry
    RTF_REJECT are skipped, as are interfaces that report an address
    family other than AF_INET.
    """
    try:
        f=open("/proc/net/route","rb")
    except IOError:
        warning("Can't open /proc/net/route !")
        return []
    routes = []
    # The with block closes the route file even when an ioctl or parse step raises
    with f:
        s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # Loopback entry, built from the interface's own address and netmask
            ifreq = ioctl(s, SIOCGIFADDR,struct.pack("16s16x",LOOPBACK_NAME.encode('utf-8')))
            addrfamily = struct.unpack("h",ifreq[16:18])[0]
            if addrfamily == socket.AF_INET:
                ifreq2 = ioctl(s, SIOCGIFNETMASK,struct.pack("16s16x",LOOPBACK_NAME.encode('utf-8')))
                msk = socket.ntohl(struct.unpack("I",ifreq2[20:24])[0])
                dst = socket.ntohl(struct.unpack("I",ifreq[20:24])[0]) & msk
                ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
                routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))
            else:
                warning("Interface lo: unkown address family (%i)"% addrfamily)

            # The first line of the file is skipped; fields are bytes because of "rb"
            for l in f.readlines()[1:]:
                iff,dst,gw,flags,x,x,x,msk,x,x,x = l.split()
                flags = int(flags,16)
                if flags & RTF_UP == 0:
                    continue
                if flags & RTF_REJECT:
                    continue
                try:
                    ifreq = ioctl(s, SIOCGIFADDR,struct.pack("16s16x",iff))
                except IOError: # interface is present in routing tables but does not have any assigned IP
                    ifaddr="0.0.0.0"
                else:
                    addrfamily = struct.unpack("h",ifreq[16:18])[0]
                    if addrfamily == socket.AF_INET:
                        ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
                    else:
                        warning("Interface %s: unkown address family (%i)"%(iff, addrfamily))
                        continue
                routes.append((socket.htonl(int(dst,16))&0xffffffff,
                               socket.htonl(int(msk,16))&0xffffffff,
                               scapy.utils.inet_ntoa(struct.pack("I",int(gw,16))),
                               iff.decode('utf-8'), ifaddr))
        finally:
            # The socket is only an ioctl handle; close it instead of leaking one descriptor per call
            s.close()

    return routes

############
### IPv6 ###
############
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def read_routes():
    try:
        f=open("/proc/net/route","r")
    except IOError:
        warning("Can't open /proc/net/route !")
        return []
    routes = []
    s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    ifreq = ioctl(s, SIOCGIFADDR,struct.pack("16s16x",LOOPBACK_NAME))
    addrfamily = struct.unpack("h",ifreq[16:18])[0]
    if addrfamily == socket.AF_INET:
        ifreq2 = ioctl(s, SIOCGIFNETMASK,struct.pack("16s16x",LOOPBACK_NAME))
        msk = socket.ntohl(struct.unpack("I",ifreq2[20:24])[0])
        dst = socket.ntohl(struct.unpack("I",ifreq[20:24])[0]) & msk
        ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
        routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))
    else:
        warning("Interface lo: unkown address family (%i)"% addrfamily)

    for l in f.readlines()[1:]:
        iff,dst,gw,flags,x,x,x,msk,x,x,x = l.split()
        flags = int(flags,16)
        if flags & RTF_UP == 0:
            continue
        if flags & RTF_REJECT:
            continue
        try:
            ifreq = ioctl(s, SIOCGIFADDR,struct.pack("16s16x",iff))
        except IOError: # interface is present in routing tables but does not have any assigned IP
            ifaddr="0.0.0.0"
        else:
            addrfamily = struct.unpack("h",ifreq[16:18])[0]
            if addrfamily == socket.AF_INET:
                ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
            else:
                warning("Interface %s: unkown address family (%i)"%(iff, addrfamily))
                continue
        routes.append((socket.htonl(long(dst,16))&0xffffffffL,
                       socket.htonl(long(msk,16))&0xffffffffL,
                       scapy.utils.inet_ntoa(struct.pack("I",long(gw,16))),
                       iff, ifaddr))

    f.close()
    return routes

############
### IPv6 ###
############
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def read_routes():
    """Read the kernel IPv4 routing table from /proc/net/route.

    Returns a list of ``(dst, msk, gw, iff, ifaddr)`` tuples.  ``dst`` and
    ``msk`` are integers, ``gw`` and ``ifaddr`` are address strings (as
    produced by ``scapy.utils.inet_ntoa``) and ``iff`` is the interface name.
    The first entry describes the loopback interface, when it reports an
    AF_INET address; the remaining entries come from the routing table,
    skipping routes that are not up or that are flagged as reject routes.

    The file is read in binary mode, so interface names are handled as bytes
    for the ioctl requests and decoded as UTF-8 for the returned tuples and
    for warning messages.

    If /proc/net/route cannot be opened, a warning is emitted and an empty
    list is returned.  Errors from the loopback ioctl calls propagate to the
    caller, but the socket and the file are always released first.
    """
    try:
        f = open("/proc/net/route", "rb")
    except IOError:
        warning("Can't open /proc/net/route !")
        return []
    with f:
        # The first line of the file is the column header.
        route_lines = f.readlines()[1:]

    routes = []
    loopback = LOOPBACK_NAME.encode('utf-8')
    # The socket is only a handle for the interface ioctl queries below.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", loopback))
        # Bytes 16:18 of the reply hold the address family,
        # bytes 20:24 the IPv4 address itself.
        addrfamily = struct.unpack("h", ifreq[16:18])[0]
        if addrfamily == socket.AF_INET:
            ifreq2 = ioctl(s, SIOCGIFNETMASK, struct.pack("16s16x", loopback))
            msk = socket.ntohl(struct.unpack("I", ifreq2[20:24])[0])
            dst = socket.ntohl(struct.unpack("I", ifreq[20:24])[0]) & msk
            ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
            routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))
        else:
            warning("Interface lo: unkown address family (%i)"% addrfamily)

        for line in route_lines:
            iff, dst, gw, flags, _, _, _, msk, _, _, _ = line.split()
            flags = int(flags, 16)
            if (flags & RTF_UP) == 0:
                continue
            if flags & RTF_REJECT:
                continue
            # Decode once so the warning and the result show a plain name
            # rather than a bytes repr such as b'eth0'.
            iff_name = iff.decode('utf-8')
            try:
                ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", iff))
            except IOError:
                # Interface is present in the routing table but does not
                # have any assigned IP.
                ifaddr = "0.0.0.0"
            else:
                addrfamily = struct.unpack("h", ifreq[16:18])[0]
                if addrfamily == socket.AF_INET:
                    ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
                else:
                    warning("Interface %s: unkown address family (%i)"%(iff_name, addrfamily))
                    continue
            routes.append((socket.htonl(int(dst, 16)) & 0xffffffff,
                           socket.htonl(int(msk, 16)) & 0xffffffff,
                           scapy.utils.inet_ntoa(struct.pack("I", int(gw, 16))),
                           iff_name, ifaddr))
    finally:
        # Release the socket even when an ioctl or a parse step raises.
        s.close()
    return routes

############
### IPv6 ###
############
项目:scapy-bpf    作者:guedou    | 项目源码 | 文件源码
def read_routes():
    """Read the kernel IPv4 routing table from /proc/net/route.

    Returns a list of ``(dst, msk, gw, iff, ifaddr)`` tuples.  ``dst`` and
    ``msk`` are integers, ``gw`` and ``ifaddr`` are address strings (as
    produced by ``scapy.utils.inet_ntoa``) and ``iff`` is the interface name.
    The first entry describes the loopback interface, when it reports an
    AF_INET address; the remaining entries come from the routing table,
    skipping routes that are not up or that are flagged as reject routes.

    If /proc/net/route cannot be opened, a warning is emitted and an empty
    list is returned.  Errors from the loopback ioctl calls propagate to the
    caller, but the socket and the file are always released first.
    """
    try:
        f = open("/proc/net/route", "r")
    except IOError:
        warning("Can't open /proc/net/route !")
        return []
    with f:
        # The first line of the file is the column header.
        route_lines = f.readlines()[1:]

    routes = []
    # The socket is only a handle for the interface ioctl queries below.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", LOOPBACK_NAME))
        # Bytes 16:18 of the reply hold the address family,
        # bytes 20:24 the IPv4 address itself.
        addrfamily = struct.unpack("h", ifreq[16:18])[0]
        if addrfamily == socket.AF_INET:
            ifreq2 = ioctl(s, SIOCGIFNETMASK,
                           struct.pack("16s16x", LOOPBACK_NAME))
            msk = socket.ntohl(struct.unpack("I", ifreq2[20:24])[0])
            dst = socket.ntohl(struct.unpack("I", ifreq[20:24])[0]) & msk
            ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
            routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))
        else:
            warning("Interface lo: unkown address family (%i)"% addrfamily)

        for line in route_lines:
            iff, dst, gw, flags, _, _, _, msk, _, _, _ = line.split()
            flags = int(flags, 16)
            if (flags & RTF_UP) == 0:
                continue
            if flags & RTF_REJECT:
                continue
            try:
                ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", iff))
            except IOError:
                # Interface is present in the routing table but does not
                # have any assigned IP.
                ifaddr = "0.0.0.0"
            else:
                addrfamily = struct.unpack("h", ifreq[16:18])[0]
                if addrfamily == socket.AF_INET:
                    ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
                else:
                    warning("Interface %s: unkown address family (%i)"%(iff, addrfamily))
                    continue
            # int() and an unsuffixed hex literal behave the same on
            # Python 2 and Python 3, unlike long() and the 0x...L form.
            routes.append((socket.htonl(int(dst, 16)) & 0xffffffff,
                           socket.htonl(int(msk, 16)) & 0xffffffff,
                           scapy.utils.inet_ntoa(struct.pack("I", int(gw, 16))),
                           iff, ifaddr))
    finally:
        # Release the socket even when an ioctl or a parse step raises.
        s.close()
    return routes

############
### IPv6 ###
############
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def read_routes():
    """Read the kernel IPv4 routing table from /proc/net/route.

    Returns a list of ``(dst, msk, gw, iff, ifaddr)`` tuples.  ``dst`` and
    ``msk`` are integers, ``gw`` and ``ifaddr`` are address strings (as
    produced by ``scapy.utils.inet_ntoa``) and ``iff`` is the interface name.
    The first entry describes the loopback interface, when it reports an
    AF_INET address; the remaining entries come from the routing table,
    skipping routes that are not up or that are flagged as reject routes.

    If /proc/net/route cannot be opened, a warning is emitted and an empty
    list is returned.  Errors from the loopback ioctl calls propagate to the
    caller, but the socket and the file are always released first.
    """
    try:
        f = open("/proc/net/route", "r")
    except IOError:
        warning("Can't open /proc/net/route !")
        return []
    with f:
        # The first line of the file is the column header.
        route_lines = f.readlines()[1:]

    routes = []
    # The socket is only a handle for the interface ioctl queries below.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", LOOPBACK_NAME))
        # Bytes 16:18 of the reply hold the address family,
        # bytes 20:24 the IPv4 address itself.
        addrfamily = struct.unpack("h", ifreq[16:18])[0]
        if addrfamily == socket.AF_INET:
            ifreq2 = ioctl(s, SIOCGIFNETMASK,
                           struct.pack("16s16x", LOOPBACK_NAME))
            msk = socket.ntohl(struct.unpack("I", ifreq2[20:24])[0])
            dst = socket.ntohl(struct.unpack("I", ifreq[20:24])[0]) & msk
            ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
            routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))
        else:
            warning("Interface lo: unkown address family (%i)"% addrfamily)

        for line in route_lines:
            iff, dst, gw, flags, _, _, _, msk, _, _, _ = line.split()
            flags = int(flags, 16)
            if (flags & RTF_UP) == 0:
                continue
            if flags & RTF_REJECT:
                continue
            try:
                ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", iff))
            except IOError:
                # Interface is present in the routing table but does not
                # have any assigned IP.
                ifaddr = "0.0.0.0"
            else:
                addrfamily = struct.unpack("h", ifreq[16:18])[0]
                if addrfamily == socket.AF_INET:
                    ifaddr = scapy.utils.inet_ntoa(ifreq[20:24])
                else:
                    warning("Interface %s: unkown address family (%i)"%(iff, addrfamily))
                    continue
            # int() and an unsuffixed hex literal behave the same on
            # Python 2 and Python 3, unlike long() and the 0x...L form.
            routes.append((socket.htonl(int(dst, 16)) & 0xffffffff,
                           socket.htonl(int(msk, 16)) & 0xffffffff,
                           scapy.utils.inet_ntoa(struct.pack("I", int(gw, 16))),
                           iff, ifaddr))
    finally:
        # Release the socket even when an ioctl or a parse step raises.
        s.close()
    return routes

############
### IPv6 ###
############