我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用select.select()。
def handle_request(self): """Handle one request, possibly blocking. Respects self.timeout. """ # Support people who used socket.settimeout() to escape # handle_request before self.timeout was available. timeout = self.socket.gettimeout() if timeout is None: timeout = self.timeout elif self.timeout is not None: timeout = min(timeout, self.timeout) fd_sets = select.select([self], [], [], timeout) if not fd_sets[0]: self.handle_timeout() return self._handle_request_noblock()
def _sendall(sock, data): """ Writes data to a socket and does not return until all the data is sent. """ length = len(data) while length: try: sent = sock.send(data) except socket.error, e: if e[0] == errno.EAGAIN: select.select([], [sock], []) continue else: raise data = data[sent:] length -= sent
def recv(self, *args, **kwargs): try: data = self.connection.recv(*args, **kwargs) except OpenSSL.SSL.SysCallError as e: if self.suppress_ragged_eofs and e.args == (-1, 'Unexpected EOF'): return b'' else: raise SocketError(str(e)) except OpenSSL.SSL.ZeroReturnError as e: if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: return b'' else: raise except OpenSSL.SSL.WantReadError: rd, wd, ed = select.select( [self.socket], [], [], self.socket.gettimeout()) if not rd: raise timeout('The read operation timed out') else: return self.recv(*args, **kwargs) else: return data
def recv_into(self, *args, **kwargs): try: return self.connection.recv_into(*args, **kwargs) except OpenSSL.SSL.SysCallError as e: if self.suppress_ragged_eofs and e.args == (-1, 'Unexpected EOF'): return 0 else: raise SocketError(str(e)) except OpenSSL.SSL.ZeroReturnError as e: if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: return 0 else: raise except OpenSSL.SSL.WantReadError: rd, wd, ed = select.select( [self.socket], [], [], self.socket.gettimeout()) if not rd: raise timeout('The read operation timed out') else: return self.recv_into(*args, **kwargs)
def register(self, fileobj, events, data=None): key = super(KqueueSelector, self).register(fileobj, events, data) if events & EVENT_READ: kevent = select.kevent(key.fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0) if events & EVENT_WRITE: kevent = select.kevent(key.fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0) return key
def unregister(self, fileobj): key = super(KqueueSelector, self).unregister(fileobj) if key.events & EVENT_READ: kevent = select.kevent(key.fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE) try: _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0) except SelectorError: pass if key.events & EVENT_WRITE: kevent = select.kevent(key.fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE) try: _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0) except SelectorError: pass return key
def serve_forever(self, poll_interval=0.5): """Handle one request at a time until shutdown. Polls for shutdown every poll_interval seconds. Ignores self.timeout. If you need to do periodic tasks, do them in another thread. """ self.__is_shut_down.clear() try: while not self.__shutdown_request: # XXX: Consider using another file descriptor or # connecting to the socket to wake this up instead of # polling. Polling reduces our responsiveness to a # shutdown request and wastes cpu at all other times. r, w, e = select.select([self], [], [], poll_interval) if self in r: self._handle_request_noblock() finally: self.__shutdown_request = False self.__is_shut_down.set()
def shutdown(self): """Stops the serve_forever loop. Blocks until the loop has finished. This must be called while serve_forever() is running in another thread, or it will deadlock. """ self.__shutdown_request = True self.__is_shut_down.wait() # The distinction between handling, getting, processing and # finishing a request is fairly arbitrary. Remember: # # - handle_request() is the top-level call. It calls # select, get_request(), verify_request() and process_request() # - get_request() is different for stream or datagram sockets # - process_request() is the place that may fork a new process # or create a new thread to finish the request # - finish_request() instantiates the request handler class; # this constructor will handle the request all by itself
def _handle_request_noblock(self): """Handle one request, without blocking. I assume that select.select has returned that the socket is readable before this function was called, so there should be no risk of blocking in get_request(). """ try: request, client_address = self.get_request() except socket.error: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except: self.handle_error(request, client_address) self.shutdown_request(request)
def interact(self): """Interaction function, emulates a very dumb telnet client.""" if sys.platform == "win32": self.mt_interact() return while 1: rfd, wfd, xfd = select.select([self, sys.stdin], [], []) if self in rfd: try: text = self.read_eager() except EOFError: print '*** Connection closed by remote host ***' break if text: sys.stdout.write(text) sys.stdout.flush() if sys.stdin in rfd: line = sys.stdin.readline() if not line: break self.write(line)
def readwrite(obj, flags): try: if flags & select.POLLIN: obj.handle_read_event() if flags & select.POLLOUT: obj.handle_write_event() if flags & select.POLLPRI: obj.handle_expt_event() if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL): obj.handle_close() except socket.error, e: if e.args[0] not in _DISCONNECTED: obj.handle_error() else: obj.handle_close() except _reraised_exceptions: raise except: obj.handle_error()
def loop(timeout=30.0, use_poll=False, map=None, count=None): if map is None: map = socket_map if use_poll and hasattr(select, 'poll'): poll_fun = poll2 else: poll_fun = poll if count is None: while map: poll_fun(timeout, map) else: while map and count > 0: poll_fun(timeout, map) count = count - 1
def set_reuse_addr(self): # try to re-use a server port if possible try: self.socket.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1 ) except socket.error: pass # ================================================== # predicates for select() # these are used as filters for the lists of sockets # to pass to select(). # ==================================================
def DefaultSelector(): """ This function serves as a first call for DefaultSelector to detect if the select module is being monkey-patched incorrectly by eventlet, greenlet, and preserve proper behavior. """ global _DEFAULT_SELECTOR if _DEFAULT_SELECTOR is None: if _can_allocate('kqueue'): _DEFAULT_SELECTOR = KqueueSelector elif _can_allocate('epoll'): _DEFAULT_SELECTOR = EpollSelector elif _can_allocate('poll'): _DEFAULT_SELECTOR = PollSelector elif hasattr(select, 'select'): _DEFAULT_SELECTOR = SelectSelector else: # Platform-specific: AppEngine raise ValueError('Platform does not have a selector') return _DEFAULT_SELECTOR()
def _recv(self, which, maxsize): conn, maxsize = self.get_conn_maxsize(which, maxsize) if conn is None: return None if not select.select([conn], [], [], 0)[0]: return '' r = conn.read(maxsize) if not r: return self._close(which) if self.universal_newlines: r = r.replace("\r\n", "\n").replace("\r", "\n") return r ################################################################################
def _recvall(sock, length): """ Attempts to receive length bytes from a socket, blocking if necessary. (Socket may be blocking or non-blocking.) """ dataList = [] recvLen = 0 while length: try: data = sock.recv(length) except socket.error, e: if e[0] == errno.EAGAIN: select.select([sock], [], []) continue else: raise if not data: # EOF break dataList.append(data) dataLen = len(data) recvLen += dataLen length -= dataLen return ''.join(dataList), recvLen
def recvall(sock, length): """ Attempts to receive length bytes from a socket, blocking if necessary. (Socket may be blocking or non-blocking.) """ dataList = [] recvLen = 0 while length: try: data = sock.recv(length) except socket.error, e: if e[0] == errno.EAGAIN: select.select([sock], [], []) continue else: raise if not data: # EOF break dataList.append(data) dataLen = len(data) recvLen += dataLen length -= dataLen return ''.join(dataList), recvLen
def processInput(self): """Wait for and process a single packet.""" pkt = Packet() select.select([self._sock], [], []) pkt.read(self._sock) # Body chunks have no packet type code. if self._request is not None: self._processBody(pkt) return if not pkt.length: raise ProtocolError, 'unexpected empty packet' pkttype = pkt.data[0] if pkttype == PKTTYPE_FWD_REQ: self._forwardRequest(pkt) elif pkttype == PKTTYPE_SHUTDOWN: self._shutdown(pkt) elif pkttype == PKTTYPE_PING: self._ping(pkt) elif pkttype == PKTTYPE_CPING: self._cping(pkt) else: raise ProtocolError, 'unknown packet type'
def __serve(self, client, server): 'Private class method.' pairs = {client: server, server: client} while pairs: read, write, error = _select.select(pairs.keys(), [], []) for socket in read: string = socket.recv(self.BUFFERSIZE) if string: pairs[socket].sendall(string) else: pairs[socket].shutdown(_socket.SHUT_WR) socket.shutdown(_socket.SHUT_RD) del pairs[socket] client.close() server.close() ################################################################################
def _setup_input_pipes(input_pipes): """ Given a mapping of input pipes, return a tuple with 2 elements. The first is a list of file descriptors to pass to ``select`` as writeable descriptors. The second is a dictionary mapping paths to existing named pipes to their adapters. """ wds = [] fifos = {} for pipe, adapter in six.viewitems(input_pipes): if isinstance(pipe, int): # This is assumed to be an open system-level file descriptor wds.append(pipe) else: if not os.path.exists(pipe): raise Exception('Input pipe does not exist: %s' % pipe) if not stat.S_ISFIFO(os.stat(pipe).st_mode): raise Exception('Input pipe must be a fifo object: %s' % pipe) fifos[pipe] = adapter return wds, fifos
def __init__(self): if hasattr(select, 'epoll'): self._impl = select.epoll() model = 'epoll' elif hasattr(select, 'kqueue'): self._impl = KqueueLoop() model = 'kqueue' elif hasattr(select, 'select'): self._impl = SelectLoop() model = 'select' else: raise Exception('can not find any available functions in select ' 'package') self._fdmap = {} # (f, handler) self._last_time = time.time() self._periodic_callbacks = [] self._stopping = False logging.debug('using event model: %s', model)
def keep_reading(self): """Output thread method for the process Sends the process output to the ViewController (through OutputTranscoder) """ while True: if self.stop: break ret = self.process.poll() if ret is not None: self.stop = True readable, writable, executable = select.select([self.master], [], [], 5) if readable: """ We read the new content """ data = os.read(self.master, 1024) text = data.decode('UTF-8', errors='replace') log_debug("RAW", repr(text)) log_debug("PID", os.getenv('BASHPID')) self.output_transcoder.decode(text) # log_debug("{} >> {}".format(int(time.time()), repr(text)))
def run(self): while True : writefd = [] if not self.messages.empty(): # Expects a message to contain either the string 'exit' # or a line of input in a tuple: ('input', None) message = self.messages.get() if message == 'exit': self.messages.task_done() break else: message, _encoding = message writefd = [self.master] r,w,_ = select.select([self.master], writefd, [], 0) if r: # Read when the binary has new output for us (sometimes this came from us writing) line = os.read(self.master, 1024) # Reads up to a kilobyte at once. Should this be higher/lower? self.RECV_LINE.emit(line) if w: os.write(self.master, message + "\n") self.messages.task_done()
def loop_read(self, max_packets=1): """Process read network events. Use in place of calling loop() if you wish to handle your client reads as part of your own application. Use socket() to obtain the client socket to call select() or equivalent on. Do not use if you are using the threaded interface loop_start().""" if self._sock is None and self._ssl is None: return MQTT_ERR_NO_CONN max_packets = len(self._out_messages) + len(self._in_messages) if max_packets < 1: max_packets = 1 for i in range(0, max_packets): rc = self._packet_read() if rc > 0: return self._loop_rc_handle(rc) elif rc == MQTT_ERR_AGAIN: return MQTT_ERR_SUCCESS return MQTT_ERR_SUCCESS
def loop_write(self, max_packets=1): """Process read network events. Use in place of calling loop() if you wish to handle your client reads as part of your own application. Use socket() to obtain the client socket to call select() or equivalent on. Use want_write() to determine if there is data waiting to be written. Do not use if you are using the threaded interface loop_start().""" if self._sock is None and self._ssl is None: return MQTT_ERR_NO_CONN max_packets = len(self._out_packet) + 1 if max_packets < 1: max_packets = 1 for i in range(0, max_packets): rc = self._packet_write() if rc > 0: return self._loop_rc_handle(rc) elif rc == MQTT_ERR_AGAIN: return MQTT_ERR_SUCCESS return MQTT_ERR_SUCCESS
def select_folder_and_parse(self, folder, readonly=False): """Set the current folder on the server. Future calls to methods such as search and fetch will act on the selected folder. Returns a dictionary containing the ``SELECT`` response. At least the ``EXISTS``, ``FLAGS`` and ``RECENT`` keys are guaranteed to exist. An example:: {'EXISTS': 3, 'FLAGS': ('\\Answered', '\\Flagged', '\\Deleted', ... ), 'RECENT': 0, 'PERMANENTFLAGS': ('\\Answered', '\\Flagged', '\\Deleted', ... ), 'READ-WRITE': True, 'UIDNEXT': 11, 'UIDVALIDITY': 1239278212} """ self._command_and_check('select', self._normalise_folder(folder), readonly) untagged = self._imap.untagged_responses return self._process_select_response(from_bytes(untagged))
def __init__(self, iocp_notifier): self._poller_name = 'select' self._fds = {} self._events = {} self._terminate = False self.rset = set() self.wset = set() self.xset = set() self.iocp_notifier = iocp_notifier self.cmd_rsock, self.cmd_wsock = _AsyncPoller._socketpair() self.cmd_rsock.setblocking(0) self.cmd_wsock.setblocking(0) self.poller = select.select self._polling = False self._lock = threading.RLock() self.poll_thread = threading.Thread(target=self.poll) self.poll_thread.daemon = True self.poll_thread.start()
def __init__(self): if not hasattr(self, 'poller'): self.poller = select.kqueue()
def register(self, fid, event): flags = select.KQ_EV_ADD if event & _AsyncPoller._Read: flags |= select.KQ_EV_ENABLE else: flags |= select.KQ_EV_DISABLE self.poller.control([select.kevent(fid, filter=select.KQ_FILTER_READ, flags=flags)], 0) flags = select.KQ_EV_ADD if event & _AsyncPoller._Write: flags |= select.KQ_EV_ENABLE else: flags |= select.KQ_EV_DISABLE self.poller.control([select.kevent(fid, filter=select.KQ_FILTER_WRITE, flags=flags)], 0)
def unregister(self, fid): flags = select.KQ_EV_DELETE self.poller.control([select.kevent(fid, filter=select.KQ_FILTER_READ, flags=flags)], 0) self.poller.control([select.kevent(fid, filter=select.KQ_FILTER_WRITE, flags=flags)], 0)