Python socket 模块,CMSG_LEN 实例源码

我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用socket.CMSG_LEN

项目:pysnmp    作者:etingof    | 项目源码 | 文件源码
def getRecvFrom(addressType):
        """Return a ``recvfrom(s, sz)`` callable that also reports the local address.

        The returned function reads one message with ``recvmsg()`` and scans
        the ancillary data for an ``IP_PKTINFO`` / ``IPV6_PKTINFO`` item.  The
        peer address is wrapped with ``addressType`` and the result of calling
        its ``setLocalAddress()`` is returned together with the payload.
        """
        def recvfrom(s, sz):
            # The ancillary buffer is sized from the same ``sz`` as the payload.
            payload, ancillary, _flags, peer = s.recvmsg(sz, socket.CMSG_LEN(sz))
            local = None
            for level, kind, raw in ancillary:
                if level == socket.SOL_IP and kind == socket.IP_PKTINFO:
                    info = in_pktinfo.from_buffer_copy(raw)
                    host = ipaddress.IPv4Address(
                        memoryview(info.ipi_addr).tobytes())
                elif level == socket.SOL_IPV6 and kind == socket.IPV6_PKTINFO:
                    info = in6_pktinfo.from_buffer_copy(raw)
                    host = ipaddress.ip_address(
                        memoryview(info.ipi6_addr).tobytes())
                else:
                    # Not a packet-info item; nothing to extract from it.
                    continue
                # Pair the extracted address with the socket's own port.
                local = (str(host), s.getsockname()[1])
            return payload, addressType(peer).setLocalAddress(local)

        return recvfrom
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testCMSG_LEN(self):
        # Exercise CMSG_LEN() over valid and invalid sizes, verifying the
        # properties that recvmsg() and sendmsg() rely on.
        limit = self.socklen_t_limit
        header_len = socket.CMSG_LEN(0)
        # First data size for which CMSG_LEN() would exceed the limit.
        first_invalid = limit - header_len + 1
        sizes = list(range(257))
        sizes.extend(range(first_invalid - 257, first_invalid))

        # struct cmsghdr has at least three members, two of which are ints
        self.assertGreater(header_len, array.array("i").itemsize * 2)
        for size in sizes:
            total = socket.CMSG_LEN(size)
            # recvmsg() derives the data size by subtracting the header length
            self.assertEqual(total - header_len, size)
            self.assertLessEqual(total, limit)

        # Negative sizes must be rejected, and so must sizes over the limit:
        # sendmsg() shares code with these functions and depends on that.
        for bad_size in (-1, first_invalid, sys.maxsize):
            self.assertRaises(OverflowError, socket.CMSG_LEN, bad_size)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testCMSG_SPACE(self):
        # Exercise CMSG_SPACE() over valid and invalid sizes, verifying the
        # properties that sendmsg() relies on.
        limit = self.socklen_t_limit
        first_invalid = limit - socket.CMSG_SPACE(1) + 1
        sizes = list(range(257))
        sizes.extend(range(first_invalid - 257, first_invalid))

        previous = socket.CMSG_SPACE(0)
        # struct cmsghdr has at least three members, two of which are ints
        self.assertGreater(previous, array.array("i").itemsize * 2)
        header_len = socket.CMSG_LEN(0)
        for size in sizes:
            space = socket.CMSG_SPACE(size)
            # Must not be smaller than the result for the preceding size ...
            self.assertGreaterEqual(space, previous)
            # ... and must cover the CMSG_LEN() of the same size.
            self.assertGreaterEqual(space, socket.CMSG_LEN(size))
            self.assertGreaterEqual(space, size + header_len)
            self.assertLessEqual(space, limit)
            previous = space

        # Negative sizes must be rejected, and so must sizes over the limit:
        # sendmsg() shares code with these functions and depends on that.
        for bad_size in (-1, first_invalid, sys.maxsize):
            self.assertRaises(OverflowError, socket.CMSG_SPACE, bad_size)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def recvfds(sock, size):
        '''Receive an array of fds over an AF_UNIX socket.

        Reads one byte of ordinary data plus ancillary data sized for
        ``size`` C ints, and returns the received descriptors as a list.

        Raises EOFError if neither data nor ancillary data arrived,
        RuntimeError if the ancillary data is not a single well-formed
        SCM_RIGHTS item, and AssertionError if the number of descriptors
        (modulo 256) disagrees with the data byte received alongside them.
        '''
        a = array.array('i')
        bytes_size = a.itemsize * size
        msg, ancdata, _flags, _addr = sock.recvmsg(
            1, socket.CMSG_LEN(bytes_size))
        if not msg and not ancdata:
            raise EOFError
        try:
            if ACKNOWLEDGE:
                sock.send(b'A')
            if len(ancdata) != 1:
                raise RuntimeError('received %d items of ancdata' %
                                   len(ancdata))
            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
            if (cmsg_level == socket.SOL_SOCKET and
                cmsg_type == socket.SCM_RIGHTS):
                if len(cmsg_data) % a.itemsize != 0:
                    raise ValueError
                a.frombytes(cmsg_data)
                # Explicit raise instead of ``assert`` so the consistency
                # check on received data is not stripped under ``python -O``.
                # An empty ``msg`` raises IndexError here, handled below.
                if len(a) % 256 != msg[0]:
                    raise AssertionError(
                        'Len is {0:n} but msg[0] is {1!r}'.format(
                            len(a), msg[0]))
                return list(a)
        except (ValueError, IndexError):
            # Malformed payload: fall through to the generic error below.
            pass
        raise RuntimeError('Invalid data received')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testCMSG_LEN(self):
        # Exercise CMSG_LEN() over valid and invalid sizes, verifying the
        # properties that recvmsg() and sendmsg() rely on.
        limit = self.socklen_t_limit
        header_len = socket.CMSG_LEN(0)
        # First data size for which CMSG_LEN() would exceed the limit.
        first_invalid = limit - header_len + 1
        sizes = list(range(257))
        sizes.extend(range(first_invalid - 257, first_invalid))

        # struct cmsghdr has at least three members, two of which are ints
        self.assertGreater(header_len, array.array("i").itemsize * 2)
        for size in sizes:
            total = socket.CMSG_LEN(size)
            # recvmsg() derives the data size by subtracting the header length
            self.assertEqual(total - header_len, size)
            self.assertLessEqual(total, limit)

        # Negative sizes must be rejected, and so must sizes over the limit:
        # sendmsg() shares code with these functions and depends on that.
        for bad_size in (-1, first_invalid, sys.maxsize):
            self.assertRaises(OverflowError, socket.CMSG_LEN, bad_size)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testCMSG_SPACE(self):
        # Exercise CMSG_SPACE() over valid and invalid sizes, verifying the
        # properties that sendmsg() relies on.
        limit = self.socklen_t_limit
        first_invalid = limit - socket.CMSG_SPACE(1) + 1
        sizes = list(range(257))
        sizes.extend(range(first_invalid - 257, first_invalid))

        previous = socket.CMSG_SPACE(0)
        # struct cmsghdr has at least three members, two of which are ints
        self.assertGreater(previous, array.array("i").itemsize * 2)
        header_len = socket.CMSG_LEN(0)
        for size in sizes:
            space = socket.CMSG_SPACE(size)
            # Must not be smaller than the result for the preceding size ...
            self.assertGreaterEqual(space, previous)
            # ... and must cover the CMSG_LEN() of the same size.
            self.assertGreaterEqual(space, socket.CMSG_LEN(size))
            self.assertGreaterEqual(space, size + header_len)
            self.assertLessEqual(space, limit)
            previous = space

        # Negative sizes must be rejected, and so must sizes over the limit:
        # sendmsg() shares code with these functions and depends on that.
        for bad_size in (-1, first_invalid, sys.maxsize):
            self.assertRaises(OverflowError, socket.CMSG_SPACE, bad_size)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testSingleCmsgTruncInData(self):
        # Truncate a control message part-way through its data.  The item
        # may come back with shortened data, or may be dropped entirely.
        server = self.serv_sock
        server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVHOPLIMIT, 1)
        self.misc_event.set()
        # One byte short of the space needed for SIZEOF_INT bytes of data.
        short_bufsize = socket.CMSG_LEN(SIZEOF_INT) - 1
        payload, items, msg_flags, sender = self.doRecvmsg(
            server, len(MSG), short_bufsize)

        self.assertEqual(payload, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, checkset=socket.MSG_CTRUNC)

        self.assertLessEqual(len(items), 1)
        if not items:
            # The truncated item was not returned at all; nothing more to check.
            return
        level, kind, data = items[0]
        self.assertEqual(level, socket.IPPROTO_IPV6)
        self.assertEqual(kind, socket.IPV6_HOPLIMIT)
        self.assertLess(len(data), SIZEOF_INT)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testCMSG_LEN(self):
        # Exercise CMSG_LEN() over valid and invalid sizes, verifying the
        # properties that recvmsg() and sendmsg() rely on.
        limit = self.socklen_t_limit
        header_len = socket.CMSG_LEN(0)
        # First data size for which CMSG_LEN() would exceed the limit.
        first_invalid = limit - header_len + 1
        sizes = list(range(257))
        sizes.extend(range(first_invalid - 257, first_invalid))

        # struct cmsghdr has at least three members, two of which are ints
        self.assertGreater(header_len, array.array("i").itemsize * 2)
        for size in sizes:
            total = socket.CMSG_LEN(size)
            # recvmsg() derives the data size by subtracting the header length
            self.assertEqual(total - header_len, size)
            self.assertLessEqual(total, limit)

        # Negative sizes must be rejected, and so must sizes over the limit:
        # sendmsg() shares code with these functions and depends on that.
        for bad_size in (-1, first_invalid, sys.maxsize):
            self.assertRaises(OverflowError, socket.CMSG_LEN, bad_size)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testCMSG_SPACE(self):
        # Exercise CMSG_SPACE() over valid and invalid sizes, verifying the
        # properties that sendmsg() relies on.
        limit = self.socklen_t_limit
        first_invalid = limit - socket.CMSG_SPACE(1) + 1
        sizes = list(range(257))
        sizes.extend(range(first_invalid - 257, first_invalid))

        previous = socket.CMSG_SPACE(0)
        # struct cmsghdr has at least three members, two of which are ints
        self.assertGreater(previous, array.array("i").itemsize * 2)
        header_len = socket.CMSG_LEN(0)
        for size in sizes:
            space = socket.CMSG_SPACE(size)
            # Must not be smaller than the result for the preceding size ...
            self.assertGreaterEqual(space, previous)
            # ... and must cover the CMSG_LEN() of the same size.
            self.assertGreaterEqual(space, socket.CMSG_LEN(size))
            self.assertGreaterEqual(space, size + header_len)
            self.assertLessEqual(space, limit)
            previous = space

        # Negative sizes must be rejected, and so must sizes over the limit:
        # sendmsg() shares code with these functions and depends on that.
        for bad_size in (-1, first_invalid, sys.maxsize):
            self.assertRaises(OverflowError, socket.CMSG_SPACE, bad_size)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testSingleCmsgTruncInData(self):
        # Truncate a control message part-way through its data.  The item
        # may come back with shortened data, or may be dropped entirely.
        server = self.serv_sock
        server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVHOPLIMIT, 1)
        self.misc_event.set()
        # One byte short of the space needed for SIZEOF_INT bytes of data.
        short_bufsize = socket.CMSG_LEN(SIZEOF_INT) - 1
        payload, items, msg_flags, sender = self.doRecvmsg(
            server, len(MSG), short_bufsize)

        self.assertEqual(payload, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, checkset=socket.MSG_CTRUNC)

        self.assertLessEqual(len(items), 1)
        if not items:
            # The truncated item was not returned at all; nothing more to check.
            return
        level, kind, data = items[0]
        self.assertEqual(level, socket.IPPROTO_IPV6)
        self.assertEqual(kind, socket.IPV6_HOPLIMIT)
        self.assertLess(len(data), SIZEOF_INT)
项目:python-cookbook-3rd    作者:tuanavu    | 项目源码 | 文件源码
def recv_fd(sock):
    '''
    Receive a single file descriptor

    Reads one byte of data together with ancillary data sized for one C
    int, checks that the first ancillary item is SOL_SOCKET / SCM_RIGHTS,
    replies with b'OK' on the same socket and returns the descriptor.

    Raises AssertionError if the first ancillary item is of another kind,
    and IndexError if no ancillary data was received at all.
    '''
    fd_size = struct.calcsize('i')
    _msg, ancdata, _flags, _addr = sock.recvmsg(1, socket.CMSG_LEN(fd_size))

    cmsg_level, cmsg_type, cmsg_data = ancdata[0]
    # Explicit raise instead of ``assert`` so that validation of received
    # data is not stripped when running under ``python -O``.
    if not (cmsg_level == socket.SOL_SOCKET
            and cmsg_type == socket.SCM_RIGHTS):
        raise AssertionError(
            'unexpected ancillary data: level=%r type=%r'
            % (cmsg_level, cmsg_type))
    sock.sendall(b'OK')
    return struct.unpack('i', cmsg_data)[0]
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testFDPassCMSG_LEN(self):
        # Size the ancillary buffer with CMSG_LEN() when receiving.
        ancbufsize = socket.CMSG_LEN(4 * SIZEOF_INT)
        received = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        # RFC 3542 says implementations may set MSG_CTRUNC if there
        # isn't enough space for trailing padding, so ignore that flag.
        self.checkRecvmsgFDs(1, received, ignoreflags=socket.MSG_CTRUNC)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testFDPassSeparateMinSpace(self):
        # Pass two FDs in two separate arrays, receiving them into the
        # minimum space for two arrays: CMSG_SPACE() for the first item
        # plus CMSG_LEN() for the second.
        min_space = (socket.CMSG_SPACE(SIZEOF_INT) +
                     socket.CMSG_LEN(SIZEOF_INT))
        received = self.doRecvmsg(self.serv_sock, len(MSG), min_space)
        self.checkRecvmsgFDs(2, received,
                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testCmsgTruncLen0Minus1(self):
        # Receive with an ancillary buffer one byte smaller than CMSG_LEN(0)
        # and hand the result to checkTruncatedHeader().
        ancbufsize = socket.CMSG_LEN(0) - 1
        received = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        self.checkTruncatedHeader(received)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testCmsgTruncLen0Plus1(self):
        # Ancillary buffer of CMSG_LEN(0) + 1 bytes, checked with maxdata=1.
        ancbufsize = socket.CMSG_LEN(0) + 1
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=1)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testCmsgTruncLen1(self):
        # Ancillary buffer sized for exactly SIZEOF_INT bytes of data.
        data_len = SIZEOF_INT
        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(data_len),
                                 maxdata=data_len)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testCmsgTruncLen2Minus1(self):
        # Ancillary buffer one byte short of holding 2 * SIZEOF_INT bytes
        # of data.
        data_len = 2 * SIZEOF_INT
        ancbufsize = socket.CMSG_LEN(data_len) - 1
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=data_len - 1)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _testRecvHopLimitCMSG_SPACE(self):
        # Wait (up to fail_timeout) for misc_event to be set, then send MSG.
        signalled = self.misc_event.wait(timeout=self.fail_timeout)
        self.assertTrue(signalled)
        self.sendToServer(MSG)

    # Could test receiving into buffer sized using CMSG_LEN, but RFC
    # 3542 says portable applications must provide space for trailing
    # padding.  Implementations may set MSG_CTRUNC if there isn't
    # enough space for the padding.
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testSingleCmsgTruncLen0Minus1(self):
        # Ancillary buffer one byte smaller than CMSG_LEN(0).
        ancbufsize = socket.CMSG_LEN(0) - 1
        self.checkHopLimitTruncatedHeader(ancbufsize=ancbufsize)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testSecondCmsgTruncLen0Minus1(self):
        # CMSG_SPACE(SIZEOF_INT) for a first item, followed by one byte
        # less than CMSG_LEN(0) for the second.
        first_item_space = socket.CMSG_SPACE(SIZEOF_INT)
        ancbufsize = first_item_space + socket.CMSG_LEN(0) - 1
        self.checkTruncatedSecondHeader(ancbufsize)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testSecomdCmsgTruncInData(self):
        # Truncate the second of two control messages part-way through
        # its data.
        server = self.serv_sock
        server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVHOPLIMIT, 1)
        server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVTCLASS, 1)
        self.misc_event.set()
        # CMSG_SPACE() for the first item, then one byte short of the
        # CMSG_LEN() needed for the second.
        ancbufsize = (socket.CMSG_SPACE(SIZEOF_INT) +
                      socket.CMSG_LEN(SIZEOF_INT) - 1)
        payload, items, msg_flags, sender = self.doRecvmsg(
            server, len(MSG), ancbufsize)

        self.assertEqual(payload, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, checkset=socket.MSG_CTRUNC)

        # Each type may appear once; remove() raises KeyError on a repeat
        # or on an unexpected type.
        pending_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}

        # First item: complete, holding one int in the range 0..255.
        level, kind, data = items.pop(0)
        self.assertEqual(level, socket.IPPROTO_IPV6)
        pending_types.remove(kind)
        self.assertEqual(len(data), SIZEOF_INT)
        value = array.array("i")
        value.frombytes(data)
        self.assertGreaterEqual(value[0], 0)
        self.assertLessEqual(value[0], 255)

        # Second item: optional, and if present its data must be truncated.
        if items:
            level, kind, data = items.pop(0)
            self.assertEqual(level, socket.IPPROTO_IPV6)
            pending_types.remove(kind)
            self.assertLess(len(data), SIZEOF_INT)

        self.assertEqual(items, [])
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def getRecvFrom(addressType):
        """Return a ``recvfrom(s, sz)`` callable that also reports the local address.

        The returned function reads one message with ``recvmsg()`` and scans
        the ancillary data for an ``IP_PKTINFO`` / ``IPV6_PKTINFO`` item.  The
        peer address is wrapped with ``addressType`` and the result of calling
        its ``setLocalAddress()`` is returned together with the payload.
        """
        def recvfrom(s, sz):
            # The ancillary buffer is sized from the same ``sz`` as the payload.
            payload, ancillary, _flags, peer = s.recvmsg(sz, socket.CMSG_LEN(sz))
            local = None
            for level, kind, raw in ancillary:
                if level == socket.SOL_IP and kind == socket.IP_PKTINFO:
                    info = in_pktinfo.from_buffer_copy(raw)
                    host = ipaddress.IPv4Address(
                        memoryview(info.ipi_addr).tobytes())
                elif level == socket.SOL_IPV6 and kind == socket.IPV6_PKTINFO:
                    info = in6_pktinfo.from_buffer_copy(raw)
                    host = ipaddress.ip_address(
                        memoryview(info.ipi6_addr).tobytes())
                else:
                    # Not a packet-info item; nothing to extract from it.
                    continue
                # Pair the extracted address with the socket's own port.
                local = (str(host), s.getsockname()[1])
            return payload, addressType(peer).setLocalAddress(local)
        return recvfrom
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testFDPassCMSG_LEN(self):
        # Size the ancillary buffer with CMSG_LEN() when receiving.
        ancbufsize = socket.CMSG_LEN(4 * SIZEOF_INT)
        received = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        # RFC 3542 says implementations may set MSG_CTRUNC if there
        # isn't enough space for trailing padding, so ignore that flag.
        self.checkRecvmsgFDs(1, received, ignoreflags=socket.MSG_CTRUNC)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testFDPassSeparateMinSpace(self):
        # Pass two FDs in two separate arrays, receiving them into the
        # minimum space for two arrays: CMSG_SPACE() for the first item
        # plus CMSG_LEN() for the second.
        min_space = (socket.CMSG_SPACE(SIZEOF_INT) +
                     socket.CMSG_LEN(SIZEOF_INT))
        received = self.doRecvmsg(self.serv_sock, len(MSG), min_space)
        self.checkRecvmsgFDs(2, received,
                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testCmsgTruncLen0(self):
        # Ancillary buffer of exactly CMSG_LEN(0) bytes, checked with
        # maxdata=0.
        ancbufsize = socket.CMSG_LEN(0)
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=0)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testCmsgTruncLen0Plus1(self):
        # Ancillary buffer of CMSG_LEN(0) + 1 bytes, checked with maxdata=1.
        ancbufsize = socket.CMSG_LEN(0) + 1
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=1)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testCmsgTruncLen1(self):
        # Ancillary buffer sized for exactly SIZEOF_INT bytes of data.
        data_len = SIZEOF_INT
        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(data_len),
                                 maxdata=data_len)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testCmsgTruncLen2Minus1(self):
        # Ancillary buffer one byte short of holding 2 * SIZEOF_INT bytes
        # of data.
        data_len = 2 * SIZEOF_INT
        ancbufsize = socket.CMSG_LEN(data_len) - 1
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=data_len - 1)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _testRecvHopLimitCMSG_SPACE(self):
        # Wait (up to fail_timeout) for misc_event to be set, then send MSG.
        signalled = self.misc_event.wait(timeout=self.fail_timeout)
        self.assertTrue(signalled)
        self.sendToServer(MSG)

    # Could test receiving into buffer sized using CMSG_LEN, but RFC
    # 3542 says portable applications must provide space for trailing
    # padding.  Implementations may set MSG_CTRUNC if there isn't
    # enough space for the padding.
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testSecondCmsgTruncLen0Minus1(self):
        # CMSG_SPACE(SIZEOF_INT) for a first item, followed by one byte
        # less than CMSG_LEN(0) for the second.
        first_item_space = socket.CMSG_SPACE(SIZEOF_INT)
        ancbufsize = first_item_space + socket.CMSG_LEN(0) - 1
        self.checkTruncatedSecondHeader(ancbufsize)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testSecomdCmsgTruncInData(self):
        # Truncate the second of two control messages part-way through
        # its data.
        server = self.serv_sock
        server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVHOPLIMIT, 1)
        server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVTCLASS, 1)
        self.misc_event.set()
        # CMSG_SPACE() for the first item, then one byte short of the
        # CMSG_LEN() needed for the second.
        ancbufsize = (socket.CMSG_SPACE(SIZEOF_INT) +
                      socket.CMSG_LEN(SIZEOF_INT) - 1)
        payload, items, msg_flags, sender = self.doRecvmsg(
            server, len(MSG), ancbufsize)

        self.assertEqual(payload, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, checkset=socket.MSG_CTRUNC)

        # Each type may appear once; remove() raises KeyError on a repeat
        # or on an unexpected type.
        pending_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}

        # First item: complete, holding one int in the range 0..255.
        level, kind, data = items.pop(0)
        self.assertEqual(level, socket.IPPROTO_IPV6)
        pending_types.remove(kind)
        self.assertEqual(len(data), SIZEOF_INT)
        value = array.array("i")
        value.frombytes(data)
        self.assertGreaterEqual(value[0], 0)
        self.assertLessEqual(value[0], 255)

        # Second item: optional, and if present its data must be truncated.
        if items:
            level, kind, data = items.pop(0)
            self.assertEqual(level, socket.IPPROTO_IPV6)
            pending_types.remove(kind)
            self.assertLess(len(data), SIZEOF_INT)

        self.assertEqual(items, [])
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def recvfds(sock, size):
        '''Receive an array of fds over an AF_UNIX socket.

        Reads one byte of ordinary data plus ancillary data sized for
        ``size`` C ints, and returns the received descriptors as a list.

        Raises EOFError if neither data nor ancillary data arrived,
        RuntimeError if the ancillary data is not a single well-formed
        SCM_RIGHTS item, and AssertionError if the number of descriptors
        (modulo 256) disagrees with the data byte received alongside them.
        '''
        a = array.array('i')
        bytes_size = a.itemsize * size
        msg, ancdata, _flags, _addr = sock.recvmsg(
            1, socket.CMSG_LEN(bytes_size))
        if not msg and not ancdata:
            raise EOFError
        try:
            if ACKNOWLEDGE:
                sock.send(b'A')
            if len(ancdata) != 1:
                raise RuntimeError('received %d items of ancdata' %
                                   len(ancdata))
            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
            if (cmsg_level == socket.SOL_SOCKET and
                cmsg_type == socket.SCM_RIGHTS):
                if len(cmsg_data) % a.itemsize != 0:
                    raise ValueError
                a.frombytes(cmsg_data)
                # Explicit raise instead of ``assert`` so the consistency
                # check on received data is not stripped under ``python -O``.
                # An empty ``msg`` raises IndexError here, handled below.
                if len(a) % 256 != msg[0]:
                    raise AssertionError(
                        'Len is {0:n} but msg[0] is {1!r}'.format(
                            len(a), msg[0]))
                return list(a)
        except (ValueError, IndexError):
            # Malformed payload: fall through to the generic error below.
            pass
        raise RuntimeError('Invalid data received')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testFDPassCMSG_LEN(self):
        # Size the ancillary buffer with CMSG_LEN() when receiving.
        ancbufsize = socket.CMSG_LEN(4 * SIZEOF_INT)
        received = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        # RFC 3542 says implementations may set MSG_CTRUNC if there
        # isn't enough space for trailing padding, so ignore that flag.
        self.checkRecvmsgFDs(1, received, ignoreflags=socket.MSG_CTRUNC)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testFDPassSeparateMinSpace(self):
        # Pass two FDs in two separate arrays, receiving them into the
        # minimum space for two arrays: CMSG_SPACE() for the first item
        # plus CMSG_LEN() for the second.
        min_space = (socket.CMSG_SPACE(SIZEOF_INT) +
                     socket.CMSG_LEN(SIZEOF_INT))
        received = self.doRecvmsg(self.serv_sock, len(MSG), min_space)
        self.checkRecvmsgFDs(2, received,
                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testCmsgTruncLen0(self):
        # Ancillary buffer of exactly CMSG_LEN(0) bytes, checked with
        # maxdata=0.
        ancbufsize = socket.CMSG_LEN(0)
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=0)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testCmsgTruncLen0Plus1(self):
        # Ancillary buffer of CMSG_LEN(0) + 1 bytes, checked with maxdata=1.
        ancbufsize = socket.CMSG_LEN(0) + 1
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=1)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testCmsgTruncLen1(self):
        # Ancillary buffer sized for exactly SIZEOF_INT bytes of data.
        data_len = SIZEOF_INT
        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(data_len),
                                 maxdata=data_len)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testCmsgTruncLen2Minus1(self):
        # Ancillary buffer one byte short of holding 2 * SIZEOF_INT bytes
        # of data.
        data_len = 2 * SIZEOF_INT
        ancbufsize = socket.CMSG_LEN(data_len) - 1
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=data_len - 1)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _testRecvHopLimitCMSG_SPACE(self):
        # Wait (up to fail_timeout) for misc_event to be set, then send MSG.
        signalled = self.misc_event.wait(timeout=self.fail_timeout)
        self.assertTrue(signalled)
        self.sendToServer(MSG)

    # Could test receiving into buffer sized using CMSG_LEN, but RFC
    # 3542 says portable applications must provide space for trailing
    # padding.  Implementations may set MSG_CTRUNC if there isn't
    # enough space for the padding.
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testSecondCmsgTruncLen0Minus1(self):
        # CMSG_SPACE(SIZEOF_INT) for a first item, followed by one byte
        # less than CMSG_LEN(0) for the second.
        first_item_space = socket.CMSG_SPACE(SIZEOF_INT)
        ancbufsize = first_item_space + socket.CMSG_LEN(0) - 1
        self.checkTruncatedSecondHeader(ancbufsize)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testSecomdCmsgTruncInData(self):
        # Truncate the second of two control messages part-way through
        # its data.
        server = self.serv_sock
        server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVHOPLIMIT, 1)
        server.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVTCLASS, 1)
        self.misc_event.set()
        # CMSG_SPACE() for the first item, then one byte short of the
        # CMSG_LEN() needed for the second.
        ancbufsize = (socket.CMSG_SPACE(SIZEOF_INT) +
                      socket.CMSG_LEN(SIZEOF_INT) - 1)
        payload, items, msg_flags, sender = self.doRecvmsg(
            server, len(MSG), ancbufsize)

        self.assertEqual(payload, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, checkset=socket.MSG_CTRUNC)

        # Each type may appear once; remove() raises KeyError on a repeat
        # or on an unexpected type.
        pending_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}

        # First item: complete, holding one int in the range 0..255.
        level, kind, data = items.pop(0)
        self.assertEqual(level, socket.IPPROTO_IPV6)
        pending_types.remove(kind)
        self.assertEqual(len(data), SIZEOF_INT)
        value = array.array("i")
        value.frombytes(data)
        self.assertGreaterEqual(value[0], 0)
        self.assertLessEqual(value[0], 255)

        # Second item: optional, and if present its data must be truncated.
        if items:
            level, kind, data = items.pop(0)
            self.assertEqual(level, socket.IPPROTO_IPV6)
            pending_types.remove(kind)
            self.assertLess(len(data), SIZEOF_INT)

        self.assertEqual(items, [])
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def getRecvFrom(addressType):
        """Return a ``recvfrom(s, sz)`` callable that also reports the local address.

        The returned function reads one message with ``recvmsg()`` and scans
        the ancillary data for an ``IP_PKTINFO`` / ``IPV6_PKTINFO`` item.  The
        peer address is wrapped with ``addressType`` and the result of calling
        its ``setLocalAddress()`` is returned together with the payload.
        """
        def recvfrom(s, sz):
            # The ancillary buffer is sized from the same ``sz`` as the payload.
            payload, ancillary, _flags, peer = s.recvmsg(sz, socket.CMSG_LEN(sz))
            local = None
            for level, kind, raw in ancillary:
                if level == socket.SOL_IP and kind == socket.IP_PKTINFO:
                    info = in_pktinfo.from_buffer_copy(raw)
                    host = ipaddress.IPv4Address(
                        memoryview(info.ipi_addr).tobytes())
                elif level == socket.SOL_IPV6 and kind == socket.IPV6_PKTINFO:
                    info = in6_pktinfo.from_buffer_copy(raw)
                    host = ipaddress.ip_address(
                        memoryview(info.ipi6_addr).tobytes())
                else:
                    # Not a packet-info item; nothing to extract from it.
                    continue
                # Pair the extracted address with the socket's own port.
                local = (str(host), s.getsockname()[1])
            return payload, addressType(peer).setLocalAddress(local)
        return recvfrom