Python socket 模块,recv() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.recv()

项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def _raise_connection_failure(self, error):
        # Catch *all* exceptions from socket methods and close the socket. In
        # regular Python, socket operations only raise socket.error, even if
        # the underlying cause was a Ctrl-C: a signal raised during socket.recv
        # is expressed as an EINTR error from poll. See internal_select_ex() in
        # socketmodule.c. All error codes from poll become socket.error at
        # first. Eventually in PyEval_EvalFrameEx the interpreter checks for
        # signals and throws KeyboardInterrupt into the current frame on the
        # main thread.
        #
        # But in Gevent and Eventlet, the polling mechanism (epoll, kqueue,
        # ...) is called in Python code, which experiences the signal as a
        # KeyboardInterrupt from the start, rather than as an initial
        # socket.error, so we catch that, close the socket, and reraise it.
        self.close()
        if isinstance(error, socket.error):
            _raise_connection_failure(self.address, error)
        else:
            raise error
项目:temboard-agent    作者:dalibo    | 项目源码 | 文件源码
def _wrap_ssl_socket(self, tmp_socket,):
        """
        Wrap the socket with SSL layer.
        """
        if self._ssl is True:
            data = self._protocol.ssl_request()
            tmp_socket.send(data)
            res = tmp_socket.recv(self._socket_read_length)
            if self._protocol.is_error(res[0]):
                raise error('PGC103', 'FATAL', "SSL error")
            if self._protocol.parse_ssl_response(res):
                self._socket = ssl.wrap_socket(tmp_socket)
            else:
                self._socket = tmp_socket
        else:
            self._socket = tmp_socket
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def test_get_hashrate(mock_connect, event_dicts, side_effect, outcome):
    """ Mocks socket.recv function to test various payloads while getting hashrate """
    if event_dicts is None:
        event_bytes = b""
    else:
        event_str = "\n".join([json.dumps(event) for event in event_dicts]) + "\n"
        event_bytes = event_str.encode()

    with mock.patch.object(bitcoin_computer.socket.socket, "recv") as mock_recv:
        # forces the return value on recv to the list of events given
        mock_recv.return_value = event_bytes
        mock_recv.side_effect = side_effect

        if isinstance(outcome, (int, float)):
            # ensures the proper output value
            assert bitcoin_computer.get_hashrate("15min") == outcome
        else:
            # When the statistics event is not given a TimeoutError will occur
            with pytest.raises(outcome):
                bitcoin_computer.get_hashrate("15min")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        conn.setblocking(0)
        try:
            msg = conn.recv(len(MSG))
        except socket.error:
            pass
        else:
            self.fail("Error trying to do non-blocking recv.")
        read, write, err = select.select([conn], [], [])
        if conn in read:
            msg = conn.recv(len(MSG))
            conn.close()
            self.assertEqual(msg, MSG)
        else:
            self.fail("Error during select call to non-blocking socket.")
项目:P2PChat    作者:whcacademy    | 项目源码 | 文件源码
def socketOperation(socket, sendMessage, receive = True):
    try:
        socket.send(sendMessage.encode('ascii'))
    except IOError as errmsg:
        print('socket', socket, ' sending error: ', errmsg)
        return Exceptions['SOCKET_ERROR']
    if receive:
        try:
            responseData = socket.recv(BUFSIZ)
        except IOError as errmsg:
            print('socket', socket, ' receving error: ', errmsg)
            return Exceptions['SOCKET_ERROR']
        return responseData.decode('ascii')

#
# functions for blocking socket to send and recv message
# with timeout option, return Exception['TIMEOUT'] if timeout
# para: timeout (type-> seconds) 
# will return timeout exception if timeout occurs
#
项目:P2PChat    作者:whcacademy    | 项目源码 | 文件源码
def socketOperationTimeout(socket, sendMessage, timeout):
    readList = [socket]
    try:
        socket.send(sendMessage.encode('ascii'))
    except OSError as errmsg:
        print('socket sending error: ', errmsg)
        return Exceptions['SOCKET_ERROR']
    # realize timeout feature by select
    available = select(readList, [], [], timeout)
    if available:
        sockfd = readList[0]
        try:
            responseData = sockfd.recv(BUFSIZ)
            return responseData.decode('ascii')
        except OSError as errmsg:
            print('socket receving error: ', errmsg)
            return Exceptions['SOCKET_ERROR']
    else:
        return Exceptions['TIMEOUT']

# abstraction for checking exit status
# must be inside of stateLock
项目:P2PChat    作者:whcacademy    | 项目源码 | 文件源码
def socketOperation(socket, sendMessage, receive = True):
    try:
        socket.send(sendMessage.encode('ascii'))
    except IOError as errmsg:
        print('socket', socket, ' sending error: ', errmsg)
        return Exceptions['SOCKET_ERROR']
    if receive:
        try:
            responseData = socket.recv(BUFSIZ)
        except IOError as errmsg:
            print('socket', socket, ' receving error: ', errmsg)
            return Exceptions['SOCKET_ERROR']
        return responseData.decode('ascii')

# abstraction for checking exit status
# must be inside of stateLock
项目:P2PChat    作者:whcacademy    | 项目源码 | 文件源码
def socketOperationTimeout(socket, sendMessage, timeout):
    readList = [socket]
    try:
        socket.send(sendMessage.encode('ascii'))
    except OSError as errmsg:
        print('socket sending error: ', errmsg)
        return Exceptions['SOCKET_ERROR']
    readable, writeable, exceptions  = select(readList, [], [], timeout)
    if readable:
        sockfd = readable[0]
        try:
            responseData = sockfd.recv(BUFSIZ)
            return responseData.decode('ascii')
        except OSError as errmsg:
            print('socket receving error: ', errmsg)
            return Exceptions['SOCKET_ERROR']
    else:
        return Exceptions['TIMEOUT']
#
# functions for facilitation threads of keep alive procedure
# resend 'JOIN' request ever 20 seconds after successfully joining
#
项目:enteletaor    作者:cr0hn    | 项目源码 | 文件源码
def brute_zmq(host, port=5555, user=None, password=None, db=0):

    context = zmq.Context()

    # Configure
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
    socket.setsockopt(zmq.LINGER, 0)  # All topics
    socket.RCVTIMEO = 1000  # timeout: 1 sec

    # Connect
    socket.connect("tcp://%s:%s" % (host, port))

    # Try to receive
    try:
        socket.recv()

        return True
    except Exception:
        return False
    finally:
        socket.close()
项目:enteletaor    作者:cr0hn    | 项目源码 | 文件源码
def handle_zmq(host, port=5555, extra_config=None):

    # log.debug("      * Connection to ZeroMQ: %s : %s" % (host, port))

    context = zmq.Context()

    # Configure
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
    socket.setsockopt(zmq.LINGER, 0)  # All topics
    socket.RCVTIMEO = 1000  # timeout: 1 sec

    # Connect
    socket.connect("tcp://%s:%s" % (host, port))

    # Try to receive
    try:
        socket.recv()

        return True
    except Exception:
        return False
    finally:
        socket.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        conn.setblocking(0)
        try:
            msg = conn.recv(len(MSG))
        except socket.error:
            pass
        else:
            self.fail("Error trying to do non-blocking recv.")
        read, write, err = select.select([conn], [], [])
        if conn in read:
            msg = conn.recv(len(MSG))
            conn.close()
            self.assertEqual(msg, MSG)
        else:
            self.fail("Error during select call to non-blocking socket.")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        conn.setblocking(0)
        try:
            msg = conn.recv(len(MSG))
        except socket.error:
            pass
        else:
            self.fail("Error trying to do non-blocking recv.")
        read, write, err = select.select([conn], [], [])
        if conn in read:
            msg = conn.recv(len(MSG))
            conn.close()
            self.assertEqual(msg, MSG)
        else:
            self.fail("Error during select call to non-blocking socket.")
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def _raise_connection_failure(self, error):
        # Catch *all* exceptions from socket methods and close the socket. In
        # regular Python, socket operations only raise socket.error, even if
        # the underlying cause was a Ctrl-C: a signal raised during socket.recv
        # is expressed as an EINTR error from poll. See internal_select_ex() in
        # socketmodule.c. All error codes from poll become socket.error at
        # first. Eventually in PyEval_EvalFrameEx the interpreter checks for
        # signals and throws KeyboardInterrupt into the current frame on the
        # main thread.
        #
        # But in Gevent and Eventlet, the polling mechanism (epoll, kqueue,
        # ...) is called in Python code, which experiences the signal as a
        # KeyboardInterrupt from the start, rather than as an initial
        # socket.error, so we catch that, close the socket, and reraise it.
        self.close()
        if isinstance(error, socket.error):
            _raise_connection_failure(self.address, error)
        else:
            raise error
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        conn.setblocking(0)
        try:
            msg = conn.recv(len(MSG))
        except socket.error:
            pass
        else:
            self.fail("Error trying to do non-blocking recv.")
        read, write, err = select.select([conn], [], [])
        if conn in read:
            msg = conn.recv(len(MSG))
            conn.close()
            self.assertEqual(msg, MSG)
        else:
            self.fail("Error during select call to non-blocking socket.")
项目:mgr.p2p.proxy    作者:tomusdrw    | 项目源码 | 文件源码
def serve( self, handler, n=None ):
        """serve (forever or for n communicaions).

        - receive data
        - call result = handler(data)
        - send back result if not None

        The serving can be stopped by SIGINT.

        :TODO:
            - how to stop?
              maybe use a .run-file, and stop server if file removed?
            - maybe make n_current accessible? (e.g. for logging)
        """
        n_current = 0
        while 1:
            if n is not None  and  n_current >= n:
                break
            data = self.recv()
            result = handler(data)
            if result is not None:
                self.send( result )
            n_current += 1
项目:passbytcp    作者:mxdg    | 项目源码 | 文件源码
def select_recv(conn, buff_size, timeout=None):
    """add timeout for socket.recv()
    :type conn: socket.SocketType
    :type buff_size: int
    :type timeout: float
    :rtype: Union[bytes, None]
    """
    rlist, _, _ = select.select([conn], [], [], timeout)
    if not rlist:
        # timeout
        raise RuntimeError("recv timeout")

    buff = conn.recv(buff_size)
    if not buff:
        raise RuntimeError("received zero bytes, socket was closed")

    return buff
项目:passbytcp    作者:mxdg    | 项目源码 | 文件源码
def select_recv(conn, buff_size, timeout=None):
    """add timeout for socket.recv()
    :type conn: socket.SocketType
    :type buff_size: int
    :type timeout: float
    :rtype: Union[bytes, None]
    """
    rlist, _, _ = select.select([conn], [], [], timeout)
    if not rlist:
        # timeout
        raise RuntimeError("recv timeout")

    buff = conn.recv(buff_size)
    if not buff:
        raise RuntimeError("received zero bytes, socket was closed")

    return buff
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def _raise_connection_failure(self, error):
        # Catch *all* exceptions from socket methods and close the socket. In
        # regular Python, socket operations only raise socket.error, even if
        # the underlying cause was a Ctrl-C: a signal raised during socket.recv
        # is expressed as an EINTR error from poll. See internal_select_ex() in
        # socketmodule.c. All error codes from poll become socket.error at
        # first. Eventually in PyEval_EvalFrameEx the interpreter checks for
        # signals and throws KeyboardInterrupt into the current frame on the
        # main thread.
        #
        # But in Gevent and Eventlet, the polling mechanism (epoll, kqueue,
        # ...) is called in Python code, which experiences the signal as a
        # KeyboardInterrupt from the start, rather than as an initial
        # socket.error, so we catch that, close the socket, and reraise it.
        self.close()
        if isinstance(error, socket.error):
            _raise_connection_failure(self.address, error)
        else:
            raise error
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        conn.setblocking(0)
        try:
            msg = conn.recv(len(MSG))
        except socket.error:
            pass
        else:
            self.fail("Error trying to do non-blocking recv.")
        read, write, err = select.select([conn], [], [])
        if conn in read:
            msg = conn.recv(len(MSG))
            conn.close()
            self.assertEqual(msg, MSG)
        else:
            self.fail("Error during select call to non-blocking socket.")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        conn.setblocking(0)
        try:
            msg = conn.recv(len(MSG))
        except OSError:
            pass
        else:
            self.fail("Error trying to do non-blocking recv.")
        read, write, err = select.select([conn], [], [])
        if conn in read:
            msg = conn.recv(len(MSG))
            conn.close()
            self.assertEqual(msg, MSG)
        else:
            self.fail("Error during select call to non-blocking socket.")
项目:pypilot    作者:pypilot    | 项目源码 | 文件源码
def recv(self):
        return self.b.recv()
项目:pypilot    作者:pypilot    | 项目源码 | 文件源码
def recv(self):
        size = 4096
        data = self.socket.recv(size)

        l = len(data)
        if l == 0:
            return False

        self.in_buffer += data
        if l == size:
            return l+self.recv()
        return l
项目:pypilot    作者:pypilot    | 项目源码 | 文件源码
def PollSockets(self):
        events = self.poller.poll(0)
        while events:
            event = events.pop()
            fd, flag = event
            socket = self.fd_to_socket[fd]
            if socket == self.server_socket:
                connection, address = socket.accept()
                if len(self.sockets) == max_connections:
                    print 'max connections reached!!!', len(self.sockets)
                    self.RemoveSocket(self.sockets[0]) # dump first socket??

                socket = LineBufferedNonBlockingSocket(connection)
                self.sockets.append(socket)
                fd = socket.socket.fileno()
                # print 'new client', address, fd
                self.fd_to_socket[fd] = socket
                self.poller.register(fd, select.POLLIN)
            elif flag & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
                self.RemoveSocket(socket)
            elif flag & select.POLLIN:
                if not socket.recv():
                    self.RemoveSocket(socket)
                while True:
                    line = socket.readline()
                    if not line:
                        break
                    try:
                        self.HandleRequest(socket, line)
                    except:
                        print 'invalid request from socket', line
                        socket.send('invalid request: ' + line + '\n')

        # flush all sockets
        for socket in self.sockets:
            socket.flush()
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def t37(control, remote, pipe):
    pretty = '%s t37' % __file__
    print(pretty)

    remote = RemoteControl(remote.address, remote.authkey, 0.5)
    try:
        garbage = remote.make_garbage()
    except Exception, e:
        print('FAIL %s: could not handle garbage: %s' % (pretty, e))
        return False

    return True

# check that client receives OverflowError if size field of RPC message exceeds
# system limit for socket.recv().
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def t37(control, remote, pipe):
    pretty = '%s t37' % __file__
    print(pretty)

    remote = RemoteControl(remote.address, remote.authkey, 0.5)
    try:
        garbage = remote.make_garbage()
    except Exception, e:
        print('FAIL %s: could not handle garbage: %s' % (pretty, e))
        return False

    return True

# check that client receives OverflowError if size field of RPC message exceeds
# system limit for socket.recv().
项目:cbor_py    作者:brianolson    | 项目源码 | 文件源码
def read(self, num):
        start = time.time()
        data = self.socket.recv(num)
        while len(data) < num:
            now = time.time()
            if now > (start + self.timeout_seconds):
                break
            ndat = self.socket.recv(num - len(data))
            if ndat:
                data += ndat
        return data
项目:fdslight    作者:fdslight    | 项目源码 | 文件源码
def evt_read(self):
        if self.__is_listen_socket:
            self.tcp_accept()
            return

        if self.__is_async_socket_client and not self.is_conn_ok():
            self.__conn_ev_flag = 1
            return

        while 1:
            try:
                recv_data = self.socket.recv(4096)
                if not recv_data:
                    self.error()
                    break
                self.reader._putvalue(self.handle_tcp_received_data(recv_data))
            except BlockingIOError:
                self.tcp_readable()
                break
            except ConnectionResetError:
                self.error()
                break
            except ConnectionError:
                self.error()
                break
            ''''''
        return
项目:fdslight    作者:fdslight    | 项目源码 | 文件源码
def handle_tcp_received_data(self, received_data):
        """????????????,????socket.recv???????
        :param received_data:
        :return bytes:
        """
        return received_data
项目:temboard-agent    作者:dalibo    | 项目源码 | 文件源码
def _socket_read(self,):
        """
        Read (from the socket), write (into the buffer)
        """
        self._message_buffer.truncate()
        while not self._message_buffer.is_eop(self._protocol.get_eop_tags()):
            try:
                raw_data = self._socket.recv(self._socket_read_length)
            except socket.timeout as err:
                raise error('PGC105', 'FATAL', "Timeout")
            except socket.error as err:
                raise error('PGC106', 'FATAL', "Socket error: {msg}".format(
                                                                    msg=err))
            self._message_buffer.write(raw_data)
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def test_get_hashrate_inputs(mock_recv, mock_connect, hashrate_sample, outcome):
    """ Ensures input values are checked and handled correctly """
    # sets up the return value for socket.recv
    mock_recv.return_value = str(json.dumps(STAT_EVENT_HASHRATE)+"\n").encode()

    # ensures the proper output value
    if isinstance(outcome, (int, float)):
        assert bitcoin_computer.get_hashrate(hashrate_sample) == outcome
    else:
        # When raises exception when invalid input is given
        with pytest.raises(outcome):
            bitcoin_computer.get_hashrate(hashrate_sample)
项目:pbauto-python    作者:PandorasBoxSDK    | 项目源码 | 文件源码
def send(self, data, wait_for_response):
        # header consists of magic "PBAU" sequence
        # + protocol version (byte, currently 1)
        # + domain id (integer)
        # + message size (short)
        # + connection id (int, user definable, defaults to 0)
        # + protocol flag (byte, 0 for TCP)
        # + checksum (byte)

        header = struct.pack("!BlhlB", 1, self.domain, len(data), 0, 0)

        checksum = struct.pack("!B", sum(bytearray(header)) % 255)
        self.__sock.sendall(b'PBAU' + header + checksum + bytes(data))

        if wait_for_response:
            # receive only header (17 bytes)
            # TODO # WARNING: This does not take into account
            # TODO # cases where socket.recv returns less than 16
            # TODO # in practice this will never happen though.
            header = self.__sock.recv(17)

            # check for magic bytes...
            if header[0:4] == b'PBAU':
                # ...then parse the rest
                header_parsed = struct.unpack("!4sBlhlBB", header)
                response_length = header_parsed[3]

                return ByteUtil(self.__sock.recv(response_length))

        # if not wait_for_response OR invalid response:
        return ByteUtil()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testRecv(self):
        # Testing large receive over TCP
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testOverFlowRecv(self):
        # Testing receive in chunks over TCP
        seg1 = self.cli_conn.recv(len(MSG) - 3)
        seg2 = self.cli_conn.recv(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testSendAll(self):
        # Testing sendall() with a 2048 byte string over TCP
        msg = b''
        while 1:
            read = self.cli_conn.recv(1024)
            if not read:
                break
            msg += read
        self.assertEqual(msg, b'f' * 2048)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testFromFd(self):
        # Testing fromfd()
        fd = self.cli_conn.fileno()
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(sock.close)
        self.assertIsInstance(sock, socket.socket)
        msg = sock.recv(1024)
        self.assertEqual(msg, MSG)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testDup(self):
        # Testing dup()
        sock = self.cli_conn.dup()
        self.addCleanup(sock.close)
        msg = sock.recv(1024)
        self.assertEqual(msg, MSG)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testDetach(self):
        # Testing detach()
        fileno = self.cli_conn.fileno()
        f = self.cli_conn.detach()
        self.assertEqual(f, fileno)
        # cli_conn cannot be used anymore...
        self.assertRaises(socket.error, self.cli_conn.recv, 1024)
        self.cli_conn.close()
        # ...but we can create another socket using the (still open)
        # file descriptor
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f)
        self.addCleanup(sock.close)
        msg = sock.recv(1024)
        self.assertEqual(msg, MSG)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        msg = self.serv.recv(len(MSG))
        self.assertEqual(msg, MSG)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testClose(self):
        conn, addr = self.serv.accept()
        conn.close()

        sd = self.cli
        read, write, err = select.select([sd], [], [], 1.0)
        self.assertEqual(read, [sd])
        self.assertEqual(sd.recv(1), b'')

        # Calling close() many times should be safe.
        conn.close()
        conn.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testRecv(self):
        msg = self.serv.recv(1024)
        self.assertEqual(msg, MSG)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _testSend(self):
        msg = self.cli.recv(1024)
        self.assertEqual(msg, MSG)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testMakefileAfterMakefileClose(self):
        self.read_file.close()
        msg = self.cli_conn.recv(len(MSG))
        if isinstance(self.read_msg, str):
            msg = msg.decode()
        self.assertEqual(msg, self.read_msg)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def __init__(self, recv_funcs=()):
            # A generator that returns callables that we'll call for each
            # call to recv().
            self._recv_step = iter(recv_funcs)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testMakefileClose(self):
        # The file returned by makefile should keep the socket open...
        self.cli_conn.close()
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, self.read_msg)
        # ...until the file is itself closed
        self.read_file.close()
        self.assertRaises(socket.error, self.cli_conn.recv, 1024)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _testSmallReadNonBlocking(self):
        self.evt1.wait(1.0)
        self.write_file.write(self.write_msg)
        self.write_file.flush()
        self.evt2.set()
        # Avoid cloding the socket before the server test has finished,
        # otherwise system recv() will return 0 instead of EWOULDBLOCK.
        self.serv_finished.wait(5.0)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _testOutsideTimeout(self):
        self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
        self.assertRaises(socket.timeout, lambda: sock.recv(5))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testUDPTimeout(self):
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.recv(1024)
        self.assertRaises(socket.timeout, raise_timeout,
                              "Error generating a timeout exception (UDP)")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testTimeoutZero(self):
        ok = False
        try:
            self.serv.settimeout(0.0)
            foo = self.serv.recv(1024)
        except socket.timeout:
            self.fail("caught timeout instead of error (UDP)")
        except socket.error:
            ok = True
        except:
            self.fail("caught unexpected exception (UDP)")
        if not ok:
            self.fail("recv() returned success when we did not expect it")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testStream(self):
        msg = self.conn.recv(1024)
        self.assertEqual(msg, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testCreateConnectionBase(self):
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        data = conn.recv(1024)
        conn.sendall(data)