我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 select.epoll()。
def process(self, fd):
    """Run the request handler for ``fd`` and stage its reply for sending.

    The data previously read for the connection is passed to ``self.logic``.
    The result is framed as a 10-digit, zero-padded length header followed by
    the payload, stored as the pending write buffer, and the connection is
    switched to the ``"write"`` state with epoll watching for writability.

    :param fd: file descriptor used as the key into ``self.conn_state``.
    """
    logs.dblog("proces: proces start")

    conn = self.conn_state[fd]
    reply = self.logic(conn.buff_read)

    # Length-prefixed frame: "%010d" header, then the reply itself.
    framed = "%010d%s" % (len(reply), reply)
    conn.buff_write = framed
    conn.need_write = len(framed)
    conn.state = "write"

    # From now on we only care about the socket becoming writable.
    self.epoll_sock.modify(fd, select.EPOLLOUT)

    logs.dblog("***process: process end fd state change to write***")
    conn.state_log()
def check_fd(self):
    """Periodically close connections that have been stuck reading too long.

    Runs forever: once per sweep every tracked connection is inspected, and
    any connection that is in the ``"read"`` state, has a non-zero
    ``read_stime`` and has exceeded its ``read_itime`` allowance is moved to
    the ``"closing"`` state and handed to ``self.state_machine``.  The loop
    then sleeps for 60 seconds before the next sweep.

    The key set is snapshotted before iterating because handling a
    ``"closing"`` connection may remove it from ``self.conn_state``; mutating
    a dict while iterating its live key view raises ``RuntimeError`` on
    Python 3.
    """
    while True:
        for fd in list(self.conn_state):
            sock_state = self.conn_state.get(fd)
            if sock_state is None:
                # Entry disappeared since the snapshot was taken.
                continue

            timed_out = (
                sock_state.state == "read"
                and sock_state.read_stime
                and (time.time() - sock_state.read_stime) >= sock_state.read_itime
            )
            if timed_out:
                # Let the state machine perform the actual teardown.
                sock_state.state = "closing"
                self.state_machine(fd)

        # Pause between sweeps.
        time.sleep(60)
#}}}
#{{{fork_processes
def __init__(self, sock, logic):
    """Set up the epoll-driven server around an existing listening socket.

    :param sock: socket object to watch; it is recorded via ``self.setFd``
        and registered with epoll for readability.
    :param logic: callable stored as ``self.logic``.
    """
    # Per-connection state, filled in by setFd().
    self.conn_state = {}
    self.setFd(sock)

    # Register the socket for EPOLLIN.
    # See https://docs.python.org/2.7/library/select.html?highlight=epoll#select.poll.register
    self.epoll_sock = select.epoll()
    poller = self.epoll_sock
    poller.register(sock.fileno(), select.EPOLLIN)

    self.logic = logic

    # State name -> handler dispatch table.
    self.sm = dict(
        accept=self.accept,
        read=self.read,
        write=self.write,
        process=self.process,
        closing=self.close,
    )
def accept(self, fd):
    """Accept one pending connection on the socket tracked under ``fd``.

    The new connection is made non-blocking, registered with epoll for
    readability, recorded through ``self.setFd`` and put into the ``"read"``
    state.

    :param fd: key in ``self.conn_state`` whose ``sock_obj`` is accepted on.
    :raises socket.error: for any socket error other than ``ECONNABORTED``
        or ``EAGAIN``; those two are ignored and the method simply returns.
    """
    ignorable = (errno.ECONNABORTED, errno.EAGAIN)
    try:
        listener = self.conn_state[fd].sock_obj
        conn, addr = listener.accept()
        conn.setblocking(0)

        new_fd = conn.fileno()
        self.epoll_sock.register(new_fd, select.EPOLLIN)
        self.setFd(conn)
        # The next event on this descriptor is handled as a read.
        self.conn_state[conn.fileno()].state = "read"
    except socket.error as msg:
        # Aborted handshake or nothing to accept: not an error for us.
        if msg.args[0] not in ignorable:
            raise
def process(self, fd):
    """Turn the request read from ``fd`` into a framed reply and start writing.

    ``self.logic`` is called with the connection's read buffer.  Its result
    is prefixed with a 10-digit, zero-padded length and stored as the write
    buffer; the connection is then switched to ``"write"``, epoll is told to
    watch for writability, and the state machine is invoked immediately.

    :param fd: file descriptor used as the key into ``self.conn_state``.
    """
    conn = self.conn_state[fd]
    reply = self.logic(conn.buff_read)

    # Length-prefixed frame: "%010d" header, then the reply itself.
    framed = "%010d%s" % (len(reply), reply)
    conn.buff_write = framed
    conn.need_write = len(framed)
    conn.state = "write"

    self.epoll_sock.modify(fd, select.EPOLLOUT)
    # Drive the write state right away instead of waiting for the next poll.
    self.state_machine(fd)
def run(self):
    """Main event loop: wait on epoll forever and dispatch every ready fd.

    For each ``(fd, events)`` pair returned by ``poll()`` the connection is
    marked ``"closing"`` when the event mask contains ``EPOLLHUP`` or
    ``EPOLLERR``; in every case the fd is then handed to
    ``self.state_machine``.  The loop never returns on its own.
    """
    hangup_or_error = select.EPOLLHUP | select.EPOLLERR
    while True:
        # Blocks until at least one registered descriptor has an event.
        for fd, events in self.epoll_sock.poll():
            conn = self.conn_state[fd]
            if events & hangup_or_error:
                # Peer hung up or the descriptor is in error: tear it down.
                conn.state = "closing"
            self.state_machine(fd)
#}}}
#{{{fork_processes
def accept(self, fd):
    """Accept one pending connection on the socket tracked under ``fd``.

    :param fd: key in ``self.conn_state`` whose ``sock_obj`` is accepted on.
    :returns: ``(conn, ip)`` on success, where ``conn`` is the new
        non-blocking socket and ``ip`` is ``addr[0]`` of the peer address;
        the string ``"retry"`` when accept failed with ``EAGAIN`` or
        ``ECONNABORTED``; ``None`` for any other socket error (which is
        logged rather than raised, preserving the original best-effort
        contract).
    """
    # Function-scope import so this method does not depend on a module-level
    # ``import errno`` being present.
    import errno

    logs.dblog("accept: accept client")
    try:
        sock_state = self.conn_state[fd]
        sock = sock_state.sock_obj
        conn, addr = sock.accept()
        conn.setblocking(0)
        logs.dblog("accept: find new socket client fd(%s)" % conn.fileno())
        return conn, addr[0]
    except socket.error as msg:
        # EAGAIN: nothing ready to accept on a non-blocking socket.
        # ECONNABORTED: the peer reset the connection before accept().
        # Both are transient, so ask the caller to try again.
        if msg.errno in (errno.EAGAIN, errno.ECONNABORTED):
            return "retry"
        # Any other failure used to vanish silently; record it.
        logs.dblog("accept: unexpected socket error: %s" % msg)
    return None
def run(self):
    """Main event loop: wait on epoll forever and dispatch every ready fd.

    Each ready descriptor is logged.  If its event mask contains
    ``EPOLLHUP`` or ``EPOLLERR`` the connection is marked ``"closing"``;
    in every case the fd is then passed to ``self.state_machine``.
    The loop never returns on its own.
    """
    hangup_or_error = select.EPOLLHUP | select.EPOLLERR
    while True:
        # Blocks until at least one registered descriptor has an event.
        for fd, events in self.epoll_sock.poll():
            logs.dblog("epoll: epoll find fd(%s) have signal" % fd)
            conn = self.conn_state[fd]
            if events & hangup_or_error:
                # Peer hung up or the descriptor is in error: tear it down.
                conn.state = "closing"
            logs.dblog("epoll: use state_machine process fd(%s)" % fd)
            self.state_machine(fd)
def _can_allocate(struct):
    """Return True if the OS can really create the given ``select`` struct.

    The ``select`` module may advertise an object the operating system cannot
    actually allocate, so the object is created for real instead of merely
    checked with ``hasattr``.  ``select()`` itself is not probed: platforms
    lacking it are expected not to advertise it (e.g. GAE).

    :param struct: attribute name on the ``select`` module, such as
        ``'poll'``, ``'epoll'`` or ``'kqueue'``.
    :returns: ``False`` if the attribute is missing or creating/using the
        object raises ``OSError``; ``True`` otherwise.
    """
    try:
        if struct != 'poll':
            # Everything except poll fails at allocation time.
            getattr(select, struct)().close()
        else:
            # select.poll() objects won't fail until they are used.
            poller = select.poll()
            poller.poll(0)
    except (OSError, AttributeError):
        return False
    return True


# Choose the best implementation, roughly:
# kqueue == epoll > poll > select. Devpoll not supported. (See above)
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
def DefaultSelector():
    """Return a new instance of the best selector available on this platform.

    The selector class is detected lazily on the first call and cached in the
    module-level ``_DEFAULT_SELECTOR``, so that an incorrectly monkey-patched
    ``select`` module (eventlet, greenlet) is detected at call time and
    proper behaviour is preserved.

    Preference order: kqueue, epoll, poll, then plain ``select``.

    :raises ValueError: if none of the mechanisms is available.
    """
    global _DEFAULT_SELECTOR

    if _DEFAULT_SELECTOR is not None:
        return _DEFAULT_SELECTOR()

    # Selector classes are referenced only inside the branch that picks
    # them, exactly as in the probing order above.
    if _can_allocate('kqueue'):
        chosen = KqueueSelector
    elif _can_allocate('epoll'):
        chosen = EpollSelector
    elif _can_allocate('poll'):
        chosen = PollSelector
    elif hasattr(select, 'select'):
        chosen = SelectSelector
    else:  # Platform-specific: AppEngine
        raise ValueError('Platform does not have a selector')

    _DEFAULT_SELECTOR = chosen
    return _DEFAULT_SELECTOR()
def __init__(self):
    """Pick the best available polling backend and reset loop bookkeeping.

    Preference order is ``select.epoll``, then ``KqueueLoop`` (when
    ``select.kqueue`` exists), then ``SelectLoop`` (when ``select.select``
    exists).  The chosen backend is stored in ``self._impl`` and its name is
    logged at debug level.

    :raises Exception: if the ``select`` module offers none of the three.
    """
    if hasattr(select, 'epoll'):
        model, backend = 'epoll', select.epoll()
    elif hasattr(select, 'kqueue'):
        model, backend = 'kqueue', KqueueLoop()
    elif hasattr(select, 'select'):
        model, backend = 'select', SelectLoop()
    else:
        raise Exception('can not find any available functions in select '
                        'package')

    self._impl = backend
    self._fdmap = {}  # (f, handler)
    self._last_time = time.time()
    self._periodic_callbacks = []
    self._stopping = False
    logging.debug('using event model: %s', model)
def select(self, timeout=None):
    """Wait for registered descriptors to become ready using epoll.

    :param timeout: seconds to wait.  ``None`` blocks indefinitely, a value
        ``<= 0`` polls without blocking, and a positive value is rounded up
        to the next millisecond.
    :returns: list of ``(key, events)`` tuples, where ``events`` is the
        subset of ``EVENT_READ`` / ``EVENT_WRITE`` that is both ready and
        present in ``key.events``.  Descriptors for which
        ``self._key_from_fd`` returns a falsy value are skipped.
    """
    if timeout is None:
        timeout = -1.0  # epoll.poll() must have a float.
    elif timeout <= 0:
        timeout = 0.0
    else:
        # select.epoll.poll() has a resolution of 1 millisecond but takes
        # seconds, so round up to whole milliseconds for better rounding.
        timeout = float(math.ceil(timeout * 1e3) * 1e-3)

    # maxevents must be at least 1, otherwise polling with no registered
    # file descriptors fails.
    max_events = max(len(self._fd_to_key), 1)

    fd_events = _syscall_wrapper(self._epoll.poll, True,
                                 timeout=timeout,
                                 maxevents=max_events)

    ready = []
    for fd, event_mask in fd_events:
        # Any bit other than EPOLLIN counts as writable and any bit other
        # than EPOLLOUT counts as readable.
        events = 0
        if event_mask & ~select.EPOLLIN:
            events |= EVENT_WRITE
        if event_mask & ~select.EPOLLOUT:
            events |= EVENT_READ

        key = self._key_from_fd(fd)
        if not key:
            continue
        ready.append((key, events & key.events))
    return ready
def register(self, sock, events, callback, *args, **kwargs):
    """Start watching ``sock`` for ``events`` and record its callback.

    The epoll mask always contains ``EPOLLERR | EPOLLHUP``.  Interest the
    socket already has (membership in ``self.rd_socks`` / ``self.wr_socks``)
    is kept and the newly requested ``EV_READ`` / ``EV_WRITE`` bits are
    added.  A socket that is already tracked is updated with ``modify``;
    otherwise it is freshly registered and remembered in ``self.fd2socks``.

    :returns: ``False`` if the fresh epoll registration raises ``IOError``
        (the parent class is not called in that case); ``True`` otherwise.
    """
    watching_read = sock in self.rd_socks
    watching_write = sock in self.wr_socks

    mask = select.EPOLLERR | select.EPOLLHUP
    if watching_read or events & EV_READ:
        mask |= select.EPOLLIN
    if watching_write or events & EV_WRITE:
        mask |= select.EPOLLOUT

    if watching_read or watching_write:
        # Already known to epoll: just update the interest mask.
        self.epoll.modify(sock.fileno(), mask)
    else:
        try:
            self.epoll.register(sock.fileno(), mask)
        except IOError:
            return False
        self.fd2socks[sock.fileno()] = sock

    super(Epoll, self).register(sock, events, callback, *args, **kwargs)
    return True
def unregister(self, sock, events=EV_READ | EV_WRITE):
    """Stop watching ``sock`` for ``events``.

    The parent class is notified first.  When both directions are dropped
    (the default) the socket is removed from epoll and from
    ``self.fd2socks``.  Otherwise the epoll mask is rewritten to the full
    ``ERR | HUP | IN | OUT`` set minus the direction(s) being removed.

    :returns: for a full unregister, whether a truthy entry was present in
        ``self.fd2socks``; for a partial unregister, always ``True``.
    """
    super(Epoll, self).unregister(sock, events)

    if events != EV_READ | EV_WRITE:
        # Partial removal: start from everything and clear what is dropped.
        mask = (select.EPOLLERR | select.EPOLLHUP |
                select.EPOLLIN | select.EPOLLOUT)
        if events & EV_READ:
            mask &= ~select.EPOLLIN
        if events & EV_WRITE:
            mask &= ~select.EPOLLOUT
        self.epoll.modify(sock.fileno(), mask)
        return True

    self.epoll.unregister(sock)
    removed = self.fd2socks.pop(sock.fileno(), None)
    return bool(removed)
def run(self):
    """Event loop: run timers, poll epoll, and dispatch socket callbacks.

    Every iteration calls ``self.check_timer()`` and then polls epoll for at
    most ``self.MIN_INTERVAL``.  For each ready descriptor that maps to a
    socket in ``self.fd2socks``:

    * ``EPOLLERR`` / ``EPOLLHUP`` -> ``self.err_callback`` (a
      ``(func, args, kwargs)`` triple), if one is set;
    * otherwise ``EPOLLIN`` -> the handler stored in ``self.rd_socks``;
    * otherwise ``EPOLLOUT`` -> the handler stored in ``self.wr_socks``.

    A ready socket that has no handler entry is skipped.  Previously the
    ``None`` returned by ``dict.get`` was unpacked directly, raising
    ``TypeError`` and discarding the remaining events of that batch.

    Any ``Exception`` raised during an iteration is printed together with
    its traceback and the loop carries on; the method never returns.
    """
    error_mask = select.EPOLLERR | select.EPOLLHUP
    while True:
        try:
            self.check_timer()
            events = self.epoll.poll(self.MIN_INTERVAL)
            for fd, event in events:
                sock = self.fd2socks.get(fd)
                if not sock:
                    continue

                if event & error_mask:
                    if self.err_callback:
                        self.err_callback[0](sock,
                                             *self.err_callback[1],
                                             **self.err_callback[2])
                    continue

                if event & select.EPOLLIN:
                    entry = self.rd_socks.get(sock)
                elif event & select.EPOLLOUT:
                    entry = self.wr_socks.get(sock)
                else:
                    continue

                if not entry:
                    # No handler registered for this direction.
                    continue
                callback, args, kwargs = entry
                if callback:
                    callback(sock, *args, **kwargs)
        except Exception as e:
            print("exception, %s\n%s" % (e, traceback.format_exc()))
def run(self):
    """Main event loop: dump connection states, poll epoll, dispatch fds.

    At the start of every iteration the state of each tracked connection is
    printed through its ``printState()`` method.  The loop then blocks in
    ``poll()``; each ready descriptor is marked ``"closing"`` if its event
    mask contains ``EPOLLHUP`` or ``EPOLLERR`` and is handed to
    ``self.state_machine`` in every case.  The loop never returns.

    The dict is iterated directly rather than through ``iterkeys()``, which
    exists only on Python 2; direct iteration behaves the same there and
    also works on Python 3.
    """
    while True:
        dbgPrint("\n -- run func loop")
        for i in self.conn_state:
            dbgPrint("\n -- state of fd: %d" % i)
            self.conn_state[i].printState()

        epoll_list = self.epoll_sock.poll()
        for fd, events in epoll_list:
            dbgPrint("\n-- run epoll return fd: %d, event: %s" %(fd, events))
            sock_state = self.conn_state[fd]
            if select.EPOLLHUP & events:
                dbgPrint("events EPOLLHUP")
                sock_state.state = "closing"
            elif select.EPOLLERR & events:
                dbgPrint("EPOLLERROR")
                sock_state.state = "closing"
            self.state_machine(fd)
def __init__(self, addr, port, logic):
    """Create the listening socket and the epoll object that watches it.

    A TCP/IPv4 socket is created with ``SO_REUSEADDR`` set, bound to
    ``(addr, port)`` and put into listening mode with a backlog of 10.  It
    is recorded via ``self.setFd`` and registered with epoll for
    readability.

    :param addr: address to bind to.
    :param port: port to bind to.
    :param logic: callable stored as ``self.logic``.
    """
    dbgPrint("\n-- __init__: start!")
    self.conn_state = {}

    self.listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    listener = self.listen_sock
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((addr, port))
    listener.listen(10)
    self.setFd(listener)

    self.epoll_sock = select.epoll()
    self.epoll_sock.register(listener.fileno(), select.EPOLLIN)

    self.logic = logic

    # State name -> handler dispatch table.
    self.sm = dict(
        accept=self.accept2read,
        read=self.read2process,
        write=self.write2read,
        process=self.process,
        closing=self.close,
    )
def patch_select_module(testcase, *keep, **replace):
    """Strip or replace selector entry points on ``select`` for one test.

    For each of ``select``, ``poll``, ``epoll`` and ``kqueue``:

    * a name present in ``replace`` is set to the supplied value, whether or
      not the attribute existed before;
    * a name listed in ``keep`` is left untouched;
    * any other name is deleted from the module if it exists.

    Every change registers a cleanup on ``testcase`` that restores the
    previous state.  ``selectors2._DEFAULT_SELECTOR`` is reset so the next
    ``DefaultSelector()`` call performs feature detection again.

    :param testcase: object providing ``addCleanup(func, *args)``.
    :param keep: selector names to leave in place.
    :param replace: mapping of selector name to replacement object.
    """
    selectors2._DEFAULT_SELECTOR = None

    for name in ('select', 'poll', 'epoll', 'kqueue'):
        present = hasattr(select, name)

        if name in replace:
            if present:
                # Put the real implementation back afterwards.
                testcase.addCleanup(setattr, select, name,
                                    getattr(select, name))
            else:
                # It did not exist before, so remove it afterwards.
                testcase.addCleanup(delattr, select, name)
            setattr(select, name, replace[name])
            continue

        if present and name not in keep:
            testcase.addCleanup(setattr, select, name, getattr(select, name))
            delattr(select, name)
def DefaultSelector():
    """Return a new instance of the best selector available on this platform.

    The selector class is detected lazily on the first call and cached in the
    module-level ``_DEFAULT_SELECTOR``, so that an incorrectly monkey-patched
    ``select`` module (eventlet, greenlet) is detected at call time and
    proper behaviour is preserved.

    Preference order: the Jython-specific selector when running on Jython,
    then kqueue, devpoll, epoll, poll, and finally plain ``select``.

    :raises RuntimeError: if none of the mechanisms is available.
    """
    global _DEFAULT_SELECTOR

    if _DEFAULT_SELECTOR is not None:
        return _DEFAULT_SELECTOR()

    # Selector classes are referenced only inside the branch that picks
    # them, exactly as in the probing order above.
    if platform.python_implementation() == 'Jython':  # Platform-specific: Jython
        chosen = JythonSelectSelector
    elif _can_allocate('kqueue'):
        chosen = KqueueSelector
    elif _can_allocate('devpoll'):
        chosen = DevpollSelector
    elif _can_allocate('epoll'):
        chosen = EpollSelector
    elif _can_allocate('poll'):
        chosen = PollSelector
    elif hasattr(select, 'select'):
        chosen = SelectSelector
    else:  # Platform-specific: AppEngine
        raise RuntimeError('Platform does not have a selector.')

    _DEFAULT_SELECTOR = chosen
    return _DEFAULT_SELECTOR()
def handle_io(self, timeout):
    """Poll epoll once and wake the tasks waiting on each ready descriptor.

    :param timeout: passed straight through to ``epoll.poll``.

    For every ready fd, a pending write task is rescheduled when the event
    mask has any bit other than ``EPOLLIN``, and a pending read task when it
    has any bit other than ``EPOLLOUT``.  A woken task's slot is cleared,
    and ``self._update_registrations(fd, True)`` is called for every ready
    fd.
    """
    # max_events must be > 0 or epoll gets cranky
    max_events = max(1, len(self._registered))

    for fd, flags in self._epoll.poll(timeout, max_events):
        waiters = self._registered[fd]

        # Clever hack stolen from selectors.EpollSelector: an event with
        # EPOLLHUP or EPOLLERR flags wakes both readers and writers.
        wake_writer = flags & ~select.EPOLLIN
        wake_reader = flags & ~select.EPOLLOUT

        if wake_writer and waiters.write_task is not None:
            _core.reschedule(waiters.write_task)
            waiters.write_task = None
        if wake_reader and waiters.read_task is not None:
            _core.reschedule(waiters.read_task)
            waiters.read_task = None

        self._update_registrations(fd, True)
def __init__(self):
    """Choose the I/O multiplexing backend from ``sys.platform``.

    * ``win32`` / ``cygwin``  -> ``select``
    * ``darwin`` / ``freebsd`` -> ``kqueue`` (a ``select.kqueue()`` object
      is created)
    * ``linux``               -> ``epoll`` (a ``select.epoll()`` object is
      created)
    * anything else           -> ``select``

    The original used independent ``if`` statements with no fallback, so on
    any other platform the mode and wait function were never set and the
    object was left unusable.  ``select`` is used as the fallback because
    the select-based wait function is already present for the Windows
    branch.
    """
    platform = sys.platform

    if "win32" in platform or "cygwin" in platform:
        self.__async_mode = "select"
        self.__iowait_func = self.__select_iowait
    elif "darwin" in platform or "freebsd" in platform:
        self.__async_mode = "kqueue"
        self.__kqueue_object = select.kqueue()
        self.__iowait_func = self.__kqueue_iowait
    elif "linux" in platform:
        self.__async_mode = "epoll"
        self.__epoll_object = select.epoll()
        self.__iowait_func = self.__epoll_iowait
    else:
        # Unrecognised platform: fall back to select.
        self.__async_mode = "select"
        self.__iowait_func = self.__select_iowait