我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用select.EPOLLET。
def _run(self): while self._running: events = self._poll.poll(EPOLL_TIMEOUT) for fd, event in events: if not (event & (select.EPOLLPRI | select.EPOLLET)): continue self.changed(self.read())
def wait_edge(fd,pin,callback,debouncingtime): debouncingtime=debouncingtime/1000 timestampprec=time.time() counter=0 po = select.epoll() po.register(fd,select.EPOLLET) while True: events = po.poll() timestamp=time.time() if (timestamp-timestampprec>debouncingtime) and counter>0: callback(pin) counter=counter+1 timestampprec=timestamp
def wait_edge(self,fd,callback,debouncingtime): # Convert in millisecondi debouncingtime=debouncingtime/1000.0 timestampprec=time.time() counter=0 po = select.epoll() po.register(fd,select.EPOLLET) while True: events = po.poll() timestamp=time.time() if (timestamp-timestampprec>debouncingtime) and counter>0: callback() counter=counter+1 timestampprec=timestamp
def run(self): self.exc = None try: sysfs.edge(self._pin, self._trigger) initial_edge = True with sysfs.value_descriptor(self._pin) as fd: e = select.epoll() e.register(fd, EPOLLIN | EPOLLET | EPOLLPRI) try: while not self._finished: events = e.poll(0.1, maxevents=1) if initial_edge: initial_edge = False elif len(events) > 0: with self._lock: self._event_detected = True self.notify_callbacks() finally: e.unregister(fd) e.close() except BaseException as e: self.exc = e finally: sysfs.edge(self._pin, NONE)
def blocking_wait_for_edge(pin, trigger, timeout=-1): assert trigger in [RISING, FALLING, BOTH] if pin in _threads: raise RuntimeError("Conflicting edge detection events already exist for this GPIO channel") try: sysfs.edge(pin, trigger) finished = False initial_edge = True with sysfs.value_descriptor(pin) as fd: e = select.epoll() e.register(fd, EPOLLIN | EPOLLET | EPOLLPRI) try: while not finished: # TODO: implement bouncetime events = e.poll(timeout / 1000.0, maxevents=1) if initial_edge: initial_edge = False else: finished = True n = len(events) if n == 0: return None else: return pin finally: e.unregister(fd) e.close() finally: sysfs.edge(pin, NONE)
def run_forever(): listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listen_fd.bind(('', master_connector.tcp_port)) listen_fd.listen(master_connector.max_minions) master_connector.epoll_fd.register(listen_fd.fileno(), select.EPOLLIN) datalist = {} master_connector.establish_vswitch('master') try: while True: epoll_list = master_connector.epoll_fd.poll() for fd, events in epoll_list: if fd == listen_fd.fileno(): fileno, addr = listen_fd.accept() fileno.setblocking(0) master_connector.epoll_fd.register(fileno.fileno(), select.EPOLLIN | select.EPOLLET) master_connector.conn[fileno.fileno()] = (fileno, addr[0]) master_connector.build_gre_conn('master', addr[0]) elif select.EPOLLIN & events: datas = b'' while True: try: data = master_connector.conn[fd][0].recv(10) if not data and not datas: master_connector.close_connection(fd) break else: datas += data except socket.error as msg: if msg.errno == errno.EAGAIN: try: datalist[fd] = master_connector.do_message_response(datas) master_connector.epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT) except: master_connector.close_connection(fd) else: master_connector.close_connection(fd) break elif select.EPOLLOUT & events: sendLen = 0 while True: sendLen += master_connector.conn[fd][0].send(datalist[fd][sendLen:]) if sendLen == len(datalist[fd]): break master_connector.epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET) elif select.EPOLLHUP & events: master_connector.close_connection(fd) else: continue finally: os.system('ovs-vsctl del-br ovs-master >/dev/null 2>&1')
def test_control_and_wait(self): client, server = self._connected_pair() ep = select.epoll(16) ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) now = time.time() events = ep.poll(1, 4) then = time.time() self.assertFalse(then - now > 0.1, then - now) events.sort() expected = [(client.fileno(), select.EPOLLOUT), (server.fileno(), select.EPOLLOUT)] expected.sort() self.assertEqual(events, expected) self.assertFalse(then - now > 0.01, then - now) now = time.time() events = ep.poll(timeout=2.1, maxevents=4) then = time.time() self.assertFalse(events) client.send(b"Hello!") server.send(b"world!!!") now = time.time() events = ep.poll(1, 4) then = time.time() self.assertFalse(then - now > 0.01) events.sort() expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT), (server.fileno(), select.EPOLLIN | select.EPOLLOUT)] expected.sort() self.assertEqual(events, expected) ep.unregister(client.fileno()) ep.modify(server.fileno(), select.EPOLLOUT) now = time.time() events = ep.poll(1, 4) then = time.time() self.assertFalse(then - now > 0.01) expected = [(server.fileno(), select.EPOLLOUT)] self.assertEqual(events, expected)
def test_control_and_wait(self): client, server = self._connected_pair() ep = select.epoll(16) ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) now = time.time() events = ep.poll(1, 4) then = time.time() self.assertFalse(then - now > 0.1, then - now) events.sort() expected = [(client.fileno(), select.EPOLLOUT), (server.fileno(), select.EPOLLOUT)] expected.sort() self.assertEqual(events, expected) events = ep.poll(timeout=2.1, maxevents=4) self.assertFalse(events) client.send("Hello!") server.send("world!!!") now = time.time() events = ep.poll(1, 4) then = time.time() self.assertFalse(then - now > 0.01) events.sort() expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT), (server.fileno(), select.EPOLLIN | select.EPOLLOUT)] expected.sort() self.assertEqual(events, expected) ep.unregister(client.fileno()) ep.modify(server.fileno(), select.EPOLLOUT) now = time.time() events = ep.poll(1, 4) then = time.time() self.assertFalse(then - now > 0.01) expected = [(server.fileno(), select.EPOLLOUT)] self.assertEqual(events, expected)
def __init__(self, number, direction=INPUT, callback=None, edge=None, active_low=0): """ @type number: int @param number: The pin number @type direction: int @param direction: Pin direction, enumerated by C{Direction} @type callback: callable @param callback: Method be called when pin changes state @type edge: int @param edge: The edge transition that triggers callback, enumerated by C{Edge} @type active_low: int @param active_low: Indicator of whether this pin uses inverted logic for HIGH-LOW transitions. """ self._number = number self._direction = direction self._callback = callback self._active_low = active_low if not os.path.isdir(self._sysfs_gpio_value_path()): with open(SYSFS_EXPORT_PATH, 'w') as export: export.write('%d' % number) else: Logger.debug("SysfsGPIO: Pin %d already exported" % number) self._fd = open(self._sysfs_gpio_value_path(), 'r+') if callback and not edge: raise Exception('You must supply a edge to trigger callback on') with open(self._sysfs_gpio_direction_path(), 'w') as fsdir: fsdir.write(direction) if edge: with open(self._sysfs_gpio_edge_path(), 'w') as fsedge: fsedge.write(edge) self._poll = select.epoll() self._poll.register(self, (select.EPOLLPRI | select.EPOLLET)) self.thread = Thread(target=self._run) self.thread.daemon = True self._running = True self.start() if active_low: if active_low not in ACTIVE_LOW_MODES: raise Exception('You must supply a value for active_low which is either 0 or 1.') with open(self._sysfs_gpio_active_low_path(), 'w') as fsactive_low: fsactive_low.write(str(active_low))
def test_control_and_wait(self): client, server = self._connected_pair() ep = select.epoll(16) ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) now = time.monotonic() events = ep.poll(1, 4) then = time.monotonic() self.assertFalse(then - now > 0.1, then - now) events.sort() expected = [(client.fileno(), select.EPOLLOUT), (server.fileno(), select.EPOLLOUT)] expected.sort() self.assertEqual(events, expected) events = ep.poll(timeout=2.1, maxevents=4) self.assertFalse(events) client.send(b"Hello!") server.send(b"world!!!") now = time.monotonic() events = ep.poll(1, 4) then = time.monotonic() self.assertFalse(then - now > 0.01) events.sort() expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT), (server.fileno(), select.EPOLLIN | select.EPOLLOUT)] expected.sort() self.assertEqual(events, expected) ep.unregister(client.fileno()) ep.modify(server.fileno(), select.EPOLLOUT) now = time.monotonic() events = ep.poll(1, 4) then = time.monotonic() self.assertFalse(then - now > 0.01) expected = [(server.fileno(), select.EPOLLOUT)] self.assertEqual(events, expected)
def test_control_and_wait(self): client, server = self._connected_pair() ep = select.epoll(16) ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) now = time.time() events = ep.poll(1, 4) then = time.time() self.assertFalse(then - now > 0.1, then - now) events.sort() expected = [(client.fileno(), select.EPOLLOUT), (server.fileno(), select.EPOLLOUT)] expected.sort() self.assertEqual(events, expected) self.assertFalse(then - now > 0.01, then - now) now = time.time() events = ep.poll(timeout=2.1, maxevents=4) then = time.time() self.assertFalse(events) client.send("Hello!") server.send("world!!!") now = time.time() events = ep.poll(1, 4) then = time.time() self.assertFalse(then - now > 0.01) events.sort() expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT), (server.fileno(), select.EPOLLIN | select.EPOLLOUT)] expected.sort() self.assertEqual(events, expected) ep.unregister(client.fileno()) ep.modify(server.fileno(), select.EPOLLOUT) now = time.time() events = ep.poll(1, 4) then = time.time() self.assertFalse(then - now > 0.01) expected = [(server.fileno(), select.EPOLLOUT)] self.assertEqual(events, expected)