我们从Python开源项目中，提取了以下8个代码示例，用于说明如何使用errno.WSAENOTSOCK。
def test_invalidDescriptor(self):
    """
    L{IReactorSocket.adoptStreamPort} must raise L{socket.error} when it is
    handed an integer that does not refer to a socket.
    """
    reactor = self.buildReactor()

    # Open a socket only to learn a descriptor number, then close it so the
    # number no longer refers to an open socket.
    closedSocket = socket.socket()
    closedDescriptor = closedSocket.fileno()
    closedSocket.close()

    error = self.assertRaises(
        socket.error,
        reactor.adoptStreamPort,
        closedDescriptor, socket.AF_INET, ServerFactory())

    # Windows on Python 3 reports WSAENOTSOCK; every other combination
    # reports EBADF.
    windowsOnPy3 = platform.isWindows() and _PY3
    expectedErrno = errno.WSAENOTSOCK if windowsOnPy3 else errno.EBADF
    self.assertEqual(error.args[0], expectedErrno)
def test_invalidDescriptor(self):
    """
    L{IReactorSocket.adoptDatagramPort} must raise L{socket.error} when it
    is handed an integer that does not refer to a socket.
    """
    reactor = self.buildReactor()

    # Open a socket only to learn a descriptor number, then close it so the
    # number no longer refers to an open socket.
    closedSocket = socket.socket()
    closedDescriptor = closedSocket.fileno()
    closedSocket.close()

    error = self.assertRaises(
        socket.error,
        reactor.adoptDatagramPort,
        closedDescriptor, socket.AF_INET, DatagramProtocol())

    # Windows on Python 3 reports WSAENOTSOCK; every other combination
    # reports EBADF.
    windowsOnPy3 = platform.isWindows() and _PY3
    expectedErrno = errno.WSAENOTSOCK if windowsOnPy3 else errno.EBADF
    self.assertEqual(error.args[0], expectedErrno)
def getHandleErrorCode(self):
    """
    Return the error code this test case expects: L{errno.WSAENOTSOCK} when
    the platform type is C{'win32'}, otherwise whatever
    L{ProperlyCloseFilesMixin.getHandleErrorCode} returns.
    """
    if platform.getType() != 'win32':
        return ProperlyCloseFilesMixin.getHandleErrorCode(self)

    # Windows 2000 SP 4 and Windows XP SP 2 give back WSAENOTSOCK for
    # SSL.Connection.write for some reason.
    return errno.WSAENOTSOCK
def remove_notsocks(map):
    """
    Probe each entry of ``map`` with a zero-timeout ``select.select`` call
    and evict the entries for which that call fails.

    On Windows ``select`` can fail with WSAENOTSOCK if a client rapidly
    connects and disconnects; this helper finds the offending descriptors
    one at a time.

    For every failing entry the module-level ``_select_errors`` counter is
    incremented, the object's ``handle_expt()`` is called (falling back to
    ``handle_error()`` if that raises), and the entry is removed from
    ``map``.

    Only ``Exception`` subclasses are treated as probe/handler failures.
    ``KeyboardInterrupt`` and ``SystemExit`` propagate to the caller instead
    of being swallowed and causing an unrelated entry to be evicted.

    :param map: mapping of file descriptor to channel object; mutated in
        place.  Each object must provide ``readable()``, ``writable()``,
        ``accepting``, ``handle_expt()`` and ``handle_error()``.
    :return: the number of entries whose ``select`` probe failed.
    """
    global _select_errors

    killed = 0
    # Iterate over a snapshot because entries are removed while looping.
    for fd, obj in list(map.items()):
        is_r = obj.readable()
        is_w = obj.writable()

        r = [fd] if is_r else []
        # accepting sockets should not be writable
        w = [fd] if is_w and not obj.accepting else []
        e = [fd] if is_r or is_w else []

        try:
            select.select(r, w, e, 0)
        except Exception:
            killed += 1
            _select_errors += 1
            try:
                try:
                    obj.handle_expt()
                except Exception:
                    obj.handle_error()
            except Exception:
                # Even the error handler failed: log it if a logger is set.
                if _logger:
                    _logger.trace()

            # The handlers may already have removed the entry themselves.
            map.pop(fd, None)

    return killed
def poll_fun_wrap(timeout, map=None):
    """
    Run ``poll_fun(timeout, map)`` and recover from the errors it raises for
    unusable descriptors.

    ``map`` defaults to ``asyncore.socket_map``.  When ``EXHAUST_DNS`` is
    truthy, ``asyndns.pop_all()`` is called before polling.

    Recovery:

    - ``TypeError`` / ``OSError``: call ``remove_notsocks(map)``.
    - ``ValueError``: call ``remove_notsocks(map)``; if it removed nothing,
      split the map into two halves and poll each half recursively.
    - anything else: trace through ``_logger`` (if set) and re-raise.
    """
    if map is None:
        map = asyncore.socket_map

    if EXHAUST_DNS:
        asyndns.pop_all()

    try:
        poll_fun(timeout, map)

    except (TypeError, OSError):
        # WSAENOTSOCK
        remove_notsocks(map)

    except ValueError:
        # negative file descriptor, testing all sockets
        if remove_notsocks(map):
            return

        # or too many file descriptors in select(), divide and conquer
        entries = list(map.items())
        half = len(entries) // 2
        # With fewer than two entries there is no first half to poll
        # separately; everything goes into the second call.
        if half:
            poll_fun_wrap(timeout, dict(entries[:half]))
        poll_fun_wrap(timeout, dict(entries[half:]))

    except:
        _logger and _logger.trace()
        raise
def getHandleErrorCode(self):
    """
    Give the errno that writing to a closed platform socket handle is
    expected to produce.
    """
    # Windows and Python 3:        WSAENOTSOCK
    # Windows and Python 2:        EBADF
    # Linux, FreeBSD, Mac OS X:    EBADF
    windowsOnPy3 = platform.isWindows() and _PY3
    return errno.WSAENOTSOCK if windowsOnPy3 else errno.EBADF
def getHandleErrorCode(self):
    """
    Return the argument L{OpenSSL.SSL.Error} is expected to be constructed
    with in this case.

    This is essentially an arbitrary OpenSSL implementation detail; the
    test would be better off if it did not need to know it.
    """
    # Windows 2000 SP 4 and Windows XP SP 2 give back WSAENOTSOCK for
    # SSL.Connection.write for some reason.  The twisted.protocols.tls
    # implementation of IReactorSSL does not have this problem, though,
    # since it is isolated from the Windows I/O layer (I suppose?).

    # If test_properlyCloseFiles waited for the SSL handshake to complete
    # and performed an orderly shutdown, this would probably be a little
    # less weird: writing to a shutdown SSL connection has a more
    # well-defined failure mode (or at least it should).

    # twisted.protocols.tls is assumed to be in use whenever it can be
    # imported.  Only when it cannot, and we are on Windows, do we expect
    # WSAENOTSOCK.
    tlsImportable = requireModule('twisted.protocols.tls') is not None
    if not tlsImportable and platform.getType() == 'win32':
        return errno.WSAENOTSOCK

    # Otherwise, we expect an error about how we tried to write to a
    # shutdown connection.  This is terribly implementation-specific.
    return [('SSL routines', 'SSL_write', 'protocol is shutdown')]