我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用select.KQ_EV_ENABLE。
def register(self, fid, event): flags = select.KQ_EV_ADD if event & _AsyncPoller._Read: flags |= select.KQ_EV_ENABLE else: flags |= select.KQ_EV_DISABLE self.poller.control([select.kevent(fid, filter=select.KQ_FILTER_READ, flags=flags)], 0) flags = select.KQ_EV_ADD if event & _AsyncPoller._Write: flags |= select.KQ_EV_ENABLE else: flags |= select.KQ_EV_DISABLE self.poller.control([select.kevent(fid, filter=select.KQ_FILTER_WRITE, flags=flags)], 0)
def modify(self, fid, event): if event & _AsyncPoller._Read: flags = select.KQ_EV_ENABLE else: flags = select.KQ_EV_DISABLE self.poller.control([select.kevent(fid, filter=select.KQ_FILTER_READ, flags=flags)], 0) if event & _AsyncPoller._Write: flags = select.KQ_EV_ENABLE else: flags = select.KQ_EV_DISABLE self.poller.control([select.kevent(fid, filter=select.KQ_FILTER_WRITE, flags=flags)], 0)
def testPair(self): kq = select.kqueue() a, b = socket.socketpair() a.send(b'foo') event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE) event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE) r = kq.control([event1, event2], 1, 1) self.assertTrue(r) self.assertFalse(r[0].flags & select.KQ_EV_ERROR) self.assertEqual(b.recv(r[0].data), b'foo') a.close() b.close() kq.close()
def __add_ev_read(self, fileno): """ Note:if the event exists,it will not do anything """ if fileno not in self.__rlist and self.__async_mode == "select": self.__rlist.append(fileno) if self.__async_mode == "epoll": if fileno not in self.__epoll_register_info: self.__epoll_register_info[fileno] = None eventmask = self.__epoll_register_info[fileno] event = select.EPOLLIN if eventmask == None: eventmask = event self.__epoll_object.register(fileno, eventmask) self.__epoll_register_info[fileno] = eventmask return is_register_read = (eventmask & select.EPOLLIN) == select.EPOLLIN if is_register_read == False: eventmask = event | eventmask self.__epoll_object.modify(fileno, eventmask) if self.__async_mode == "kqueue": filter_ = select.KQ_FILTER_READ flags = select.KQ_EV_ADD | select.KQ_EV_ERROR | select.KQ_EV_ENABLE if fileno not in self.__kqueue_event_map: kevent = select.kevent(fileno, filter_, flags) kevent.udata = 0 else: kevent = self.__kqueue_event_map[fileno] read_exists = (kevent.udata & EV_TYPE_READ) == EV_TYPE_READ if read_exists == False: kevent.filter = filter_ kevent.udata = (kevent.udata | EV_TYPE_READ) kevent.flags = flags if fileno not in self.__kqueue_change_event_map: self.__kqueue_change_event_map[fileno] = [] self.__kqueue_change_event_map[fileno].append(kevent) '''''' return
def __add_ev_write(self, fileno): if fileno not in self.__wlist and self.__async_mode == "select": self.__wlist.append(fileno) if self.__async_mode == "epoll": if fileno not in self.__epoll_register_info: self.__epoll_register_info[fileno] = None eventmask = self.__epoll_register_info[fileno] event = select.EPOLLOUT if eventmask == None: eventmask = event self.__epoll_object.register(fileno, eventmask) self.__epoll_register_info[fileno] = eventmask return is_register_write = (eventmask & select.EPOLLOUT) == select.EPOLLOUT if is_register_write == False: eventmask = event | eventmask self.__epoll_object.modify(fileno, eventmask) if self.__async_mode == "kqueue": filter_ = select.KQ_FILTER_WRITE flags = select.KQ_EV_ADD | select.KQ_EV_ERROR | select.KQ_EV_ENABLE if fileno not in self.__kqueue_event_map: kevent = select.kevent(fileno, filter_, flags) kevent.udata = 0 else: kevent = self.__kqueue_event_map[fileno] write_exists = (kevent.udata & EV_TYPE_WRITE) == EV_TYPE_WRITE if write_exists == False: kevent.filter = filter_ kevent.flags = flags kevent.udata = (kevent.udata | EV_TYPE_WRITE) if fileno not in self.__kqueue_change_event_map: self.__kqueue_change_event_map[fileno] = [] self.__kqueue_change_event_map[fileno].append(kevent) '''''' return
def update_channel(self, channel): ''' ??channel_map,??????kevent_lst ''' import select if not channel: return fd = channel._fd if not fd in self.channel_map: # ?????,????? self.channel_count += 1 self.channel_map[fd] = channel change_event_lst = [] # ???????,????? if channel.need_read == True: # ?????,????????? kevent = select.kevent(channel._fd, filter=select.KQ_FILTER_READ, flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE) change_event_lst.append(kevent) else: # ????? kevent = select.kevent(channel._fd, filter=select.KQ_FILTER_READ, flags=select.KQ_EV_DELETE) change_event_lst.append(kevent) if channel.need_write == True: # ?????,????????? kevent = select.kevent(channel._fd, filter=select.KQ_FILTER_WRITE, flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE) change_event_lst.append(kevent) pass else: # ????? kevent = select.kevent(channel._fd, filter=select.KQ_FILTER_WRITE, flags=select.KQ_EV_DELETE) change_event_lst.append(kevent) try: self.kq.control(change_event_lst, len(change_event_lst), 0) except select.error, err: # ??kqueue?? raise Exception()