我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用socket.CMSG_SPACE。
def testCMSG_SPACE(self): # Test CMSG_SPACE() with various valid and invalid values, # checking the assumptions used by sendmsg(). toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1 values = list(range(257)) + list(range(toobig - 257, toobig)) last = socket.CMSG_SPACE(0) # struct cmsghdr has at least three members, two of which are ints self.assertGreater(last, array.array("i").itemsize * 2) for n in values: ret = socket.CMSG_SPACE(n) self.assertGreaterEqual(ret, last) self.assertGreaterEqual(ret, socket.CMSG_LEN(n)) self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0)) self.assertLessEqual(ret, self.socklen_t_limit) last = ret self.assertRaises(OverflowError, socket.CMSG_SPACE, -1) # sendmsg() shares code with these functions, and requires # that it reject values over the limit. self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig) self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize)
def recv(self): """Receive as much data as is available. Returns True if any data was received. Will not block. """ data = None try: fds = array.array("i") data, ancdata, msg_flags, address = self._f.recvmsg( 1024, socket.CMSG_SPACE(16 * fds.itemsize)) for cmsg_level, cmsg_type, cmsg_data in ancdata: if (cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS): fds.fromstring(cmsg_data[ :len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) self._incoming_fds.extend(fds) if data: self._decode(data) return True else: raise ServerDisconnected() except socket.error as e: if e.errno == 11: # No data available; would otherwise block return raise
def _testSendmsgExcessCmsgReject(self): if not hasattr(socket, "CMSG_SPACE"): # Can only send one item with self.assertRaises(socket.error) as cm: self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")]) self.assertIsNone(cm.exception.errno) self.sendToServer(b"done")
def testFDPassCMSG_SPACE(self): # Test using CMSG_SPACE() to calculate ancillary buffer size. self.checkRecvmsgFDs( 4, self.doRecvmsg(self.serv_sock, len(MSG), socket.CMSG_SPACE(4 * SIZEOF_INT)))
def testFDPassSeparateMinSpace(self): # Pass two FDs in two separate arrays, receiving them into the # minimum space for two arrays. self.checkRecvmsgFDs(2, self.doRecvmsg(self.serv_sock, len(MSG), socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT)), maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)
def testRecvHopLimitCMSG_SPACE(self): # Test receiving hop limit, using CMSG_SPACE to calculate buffer size. self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT))
def testSecondCmsgTrunc0(self): self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT), ignoreflags=socket.MSG_CTRUNC)
def testSecondCmsgTrunc1(self): self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1)
def testSecondCmsgTrunc2Int(self): self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 2 * SIZEOF_INT)
def testSecondCmsgTruncLen0Minus1(self): self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(0) - 1)
def testSecomdCmsgTruncInData(self): # Test truncation of the second of two control messages inside # its associated data. self.serv_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVHOPLIMIT, 1) self.serv_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVTCLASS, 1) self.misc_event.set() msg, ancdata, flags, addr = self.doRecvmsg( self.serv_sock, len(MSG), socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1) self.assertEqual(msg, MSG) self.checkRecvmsgAddress(addr, self.cli_addr) self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT} cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) cmsg_types.remove(cmsg_type) self.assertEqual(len(cmsg_data), SIZEOF_INT) a = array.array("i") a.frombytes(cmsg_data) self.assertGreaterEqual(a[0], 0) self.assertLessEqual(a[0], 255) if ancdata: cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) cmsg_types.remove(cmsg_type) self.assertLess(len(cmsg_data), SIZEOF_INT) self.assertEqual(ancdata, [])
def _testSendmsgExcessCmsgReject(self): if not hasattr(socket, "CMSG_SPACE"): # Can only send one item with self.assertRaises(OSError) as cm: self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")]) self.assertIsNone(cm.exception.errno) self.sendToServer(b"done")
def recvmsg(socket, maxSize=8192, cmsgSize=4096, flags=0): """ Receive a message on a socket. @param socket: The socket to receive the message on. @type socket: L{socket.socket} @param maxSize: The maximum number of bytes to receive from the socket using the datagram or stream mechanism. The default maximum is 8192. @type maxSize: L{int} @param cmsgSize: The maximum number of bytes to receive from the socket outside of the normal datagram or stream mechanism. The default maximum is 4096. @type cmsgSize: L{int} @param flags: Flags to affect how the message is sent. See the C{MSG_} constants in the sendmsg(2) manual page. By default no flags are set. @type flags: L{int} @return: A named 3-tuple of the bytes received using the datagram/stream mechanism, a L{list} of L{tuple}s giving ancillary received data, and flags as an L{int} describing the data received. """ if _PY3: # In Twisted's sendmsg.c, the csmg_space is defined as: # int cmsg_size = 4096; # cmsg_space = CMSG_SPACE(cmsg_size); # Since the default in Python 3's socket is 0, we need to define our # own default of 4096. -hawkie data, ancillary, flags = socket.recvmsg( maxSize, CMSG_SPACE(cmsgSize), flags)[0:3] else: data, flags, ancillary = recv1msg( socket.fileno(), flags, maxSize, cmsgSize) return RecievedMessage(data=data, ancillary=ancillary, flags=flags)