Python select 模块,EPOLLET 实例源码

我们从 Python 开源项目中提取了以下 15 个代码示例,用于说明如何使用 select.EPOLLET。

项目:respeaker_python_library    作者:respeaker    | 项目源码 | 文件源码
def _run(self):
        """
        Poll loop executed while C{self._running} is true.

        Every poll result whose event mask contains C{EPOLLPRI} or
        C{EPOLLET} causes C{self.changed} to be called with the value
        returned by C{self.read}; all other results are ignored.
        """
        while self._running:
            for _fd, mask in self._poll.poll(EPOLL_TIMEOUT):
                if mask & (select.EPOLLPRI | select.EPOLLET):
                    # Hand the freshly read state to the change handler.
                    self.changed(self.read())
项目:TanzoBot    作者:tanzilli    | 项目源码 | 文件源码
def wait_edge(fd,pin,callback,debouncingtime):
    """
    Wait on ``fd`` with epoll forever and report debounced wake-ups.

    fd             -- descriptor registered with epoll (edge-triggered)
    pin            -- value passed to ``callback`` for each reported wake-up
    callback       -- callable invoked as ``callback(pin)``
    debouncingtime -- minimum gap between two wake-ups, in milliseconds

    The first wake-up is never reported, and a wake-up is only reported
    when more than ``debouncingtime`` has passed since the previous one.
    This function does not return.
    """
    # Milliseconds -> seconds. The float divisor matters on Python 2, where
    # an integer value divided by 1000 would truncate to 0 and silently
    # disable debouncing.
    debouncingtime = debouncingtime / 1000.0
    timestampprec = time.time()
    counter = 0
    po = select.epoll()
    po.register(fd, select.EPOLLET)
    while True:
        # Only the wake-up itself matters; the returned event list is unused.
        po.poll()
        timestamp = time.time()
        if (timestamp - timestampprec > debouncingtime) and counter > 0:
            callback(pin)
        counter = counter + 1
        timestampprec = timestamp
项目:TanzoBot    作者:tanzilli    | 项目源码 | 文件源码
def wait_edge(self,fd,callback,debouncingtime):
        """
        Wait on ``fd`` with epoll forever and call ``callback()`` on
        debounced wake-ups.

        ``debouncingtime`` is given in milliseconds. The first wake-up is
        never reported; later ones are reported only when more than
        ``debouncingtime`` has elapsed since the previous wake-up.
        This method does not return.
        """
        # Milliseconds -> seconds, to compare against time.time() deltas.
        min_gap = debouncingtime / 1000.0
        previous = time.time()
        seen = 0
        poller = select.epoll()
        poller.register(fd, select.EPOLLET)
        while True:
            poller.poll()
            current = time.time()
            elapsed = current - previous
            if elapsed > min_gap and seen > 0:
                callback()
            seen += 1
            previous = current
项目:OPi.GPIO    作者:rm-hull    | 项目源码 | 文件源码
def run(self):
        """
        Watch the pin's value descriptor for edge events until
        ``self._finished`` becomes true.

        Each detected event sets ``self._event_detected`` and calls
        ``self.notify_callbacks()`` while holding ``self._lock``. Any
        exception (including ``BaseException`` subclasses) is stored in
        ``self.exc`` instead of propagating. Edge detection is reset to
        ``NONE`` on the way out.
        """
        self.exc = None
        try:
            sysfs.edge(self._pin, self._trigger)
            # The result of the first poll is always discarded
            # (presumably a spurious event from enabling edge detection).
            discard_next = True

            with sysfs.value_descriptor(self._pin) as fd:
                poller = select.epoll()
                poller.register(fd, EPOLLIN | EPOLLET | EPOLLPRI)
                try:
                    while not self._finished:
                        ready = poller.poll(0.1, maxevents=1)
                        if discard_next:
                            discard_next = False
                            continue
                        if not ready:
                            # Timed out without an event; check the flag again.
                            continue
                        with self._lock:
                            self._event_detected = True
                            self.notify_callbacks()

                finally:
                    poller.unregister(fd)
                    poller.close()

        except BaseException as err:
            self.exc = err

        finally:
            sysfs.edge(self._pin, NONE)
项目:OPi.GPIO    作者:rm-hull    | 项目源码 | 文件源码
def blocking_wait_for_edge(pin, trigger, timeout=-1):
    """
    Block until an edge is seen on ``pin`` or the timeout expires.

    pin     -- channel to watch
    trigger -- one of RISING, FALLING or BOTH
    timeout -- per-poll timeout in milliseconds (divided by 1000 before
               being handed to epoll)

    Returns ``pin`` when the second poll reported an event, ``None``
    otherwise. Raises RuntimeError when ``pin`` already has an entry in
    ``_threads``. Edge detection is reset to ``NONE`` before returning.
    """
    assert trigger in (RISING, FALLING, BOTH)

    if pin in _threads:
        raise RuntimeError("Conflicting edge detection events already exist for this GPIO channel")

    try:
        sysfs.edge(pin, trigger)

        with sysfs.value_descriptor(pin) as fd:
            poller = select.epoll()
            poller.register(fd, EPOLLIN | EPOLLET | EPOLLPRI)
            try:
                # Exactly two polls: the first result is discarded as the
                # initial edge, the second one decides the return value.
                # TODO: implement bouncetime
                for _attempt in range(2):
                    ready = poller.poll(timeout / 1000.0, maxevents=1)

                return pin if len(ready) != 0 else None
            finally:
                poller.unregister(fd)
                poller.close()

    finally:
        sysfs.edge(pin, NONE)
项目:docklet    作者:unias    | 项目源码 | 文件源码
def run_forever():
        """
        Accept connections on ``master_connector.tcp_port`` and serve them
        from an epoll loop; does not return normally.

        Each accepted socket is made non-blocking and registered
        edge-triggered. A readable socket is drained until EAGAIN, the
        collected bytes are passed to
        ``master_connector.do_message_response`` and the reply is sent
        once the socket reports writable. When the loop is left through an
        exception, the ``ovs-master`` bridge is deleted.
        """
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listen_fd.bind(('', master_connector.tcp_port))
        listen_fd.listen(master_connector.max_minions)

        master_connector.epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)

        # Pending reply per connection fd: written by the read branch,
        # consumed by the write branch.
        datalist = {}

        master_connector.establish_vswitch('master')
        try:
            while True:
                epoll_list = master_connector.epoll_fd.poll()
                for fd, events in epoll_list:
                    if fd == listen_fd.fileno():
                        fileno, addr = listen_fd.accept()
                        fileno.setblocking(0)
                        master_connector.epoll_fd.register(fileno.fileno(), select.EPOLLIN | select.EPOLLET)
                        master_connector.conn[fileno.fileno()] = (fileno, addr[0])
                        master_connector.build_gre_conn('master', addr[0])
                    elif select.EPOLLIN & events:
                        datas = b''
                        while True:
                            try:
                                data = master_connector.conn[fd][0].recv(10)
                                if not data:
                                    # Peer closed the connection. This must
                                    # end the loop even when some bytes were
                                    # already read: recv() keeps returning
                                    # b'' after EOF and never raises EAGAIN,
                                    # so continuing would spin forever.
                                    master_connector.close_connection(fd)
                                    break
                                datas += data
                            except socket.error as msg:
                                if msg.errno == errno.EAGAIN:
                                    # Socket drained: the request is complete.
                                    try:
                                        datalist[fd] = master_connector.do_message_response(datas)
                                        master_connector.epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
                                    except Exception:
                                        # Drop the connection on a failed
                                        # request, but let KeyboardInterrupt
                                        # and SystemExit propagate.
                                        master_connector.close_connection(fd)
                                else:
                                    master_connector.close_connection(fd)
                                break
                    elif select.EPOLLOUT & events:
                        # NOTE(review): send() on this non-blocking socket can
                        # raise EAGAIN when the kernel buffer is full; that is
                        # not handled here.
                        sendLen = 0
                        while True:
                            sendLen += master_connector.conn[fd][0].send(datalist[fd][sendLen:])
                            if sendLen == len(datalist[fd]):
                                break
                        master_connector.epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
                    elif select.EPOLLHUP & events:
                        master_connector.close_connection(fd)
                    else:
                        continue
        finally:
            os.system('ovs-vsctl del-br ovs-master >/dev/null 2>&1')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_control_and_wait(self):
        """Check register/poll/modify/unregister on an edge-triggered epoll."""
        client, server = self._connected_pair()
        client_fd = client.fileno()
        server_fd = server.fileno()

        edge_rw = select.EPOLLIN | select.EPOLLOUT | select.EPOLLET
        poller = select.epoll(16)
        poller.register(server_fd, edge_rw)
        poller.register(client_fd, edge_rw)

        # Both sockets are expected to report writable straight away.
        started = time.time()
        ready = poller.poll(1, 4)
        elapsed = time.time() - started
        self.assertFalse(elapsed > 0.1, elapsed)

        self.assertEqual(sorted(ready),
                         sorted([(client_fd, select.EPOLLOUT),
                                 (server_fd, select.EPOLLOUT)]))
        self.assertFalse(elapsed > 0.01, elapsed)

        # Nothing changed since the last poll, so with edge-triggered
        # registration this poll is expected to time out empty.
        ready = poller.poll(timeout=2.1, maxevents=4)
        self.assertFalse(ready)

        client.send(b"Hello!")
        server.send(b"world!!!")

        # Incoming data makes both ends readable as well as writable.
        started = time.time()
        ready = poller.poll(1, 4)
        elapsed = time.time() - started
        self.assertFalse(elapsed > 0.01)

        in_and_out = select.EPOLLIN | select.EPOLLOUT
        self.assertEqual(sorted(ready),
                         sorted([(client_fd, in_and_out),
                                 (server_fd, in_and_out)]))

        # Only the server remains registered, now for EPOLLOUT alone.
        poller.unregister(client_fd)
        poller.modify(server_fd, select.EPOLLOUT)
        started = time.time()
        ready = poller.poll(1, 4)
        elapsed = time.time() - started
        self.assertFalse(elapsed > 0.01)

        self.assertEqual(ready, [(server_fd, select.EPOLLOUT)])
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_control_and_wait(self):
        """Check register/poll/modify/unregister on an edge-triggered epoll."""
        client, server = self._connected_pair()

        ep = select.epoll(16)
        ep.register(server.fileno(),
                   select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
        ep.register(client.fileno(),
                   select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)

        # Both sockets are expected to report writable straight away.
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.1, then - now)

        events.sort()
        expected = [(client.fileno(), select.EPOLLOUT),
                    (server.fileno(), select.EPOLLOUT)]
        expected.sort()

        self.assertEqual(events, expected)

        # Nothing changed since the last poll, so with edge-triggered
        # registration this poll is expected to time out empty.
        events = ep.poll(timeout=2.1, maxevents=4)
        self.assertFalse(events)

        # Bytes literals: identical to str on Python 2.6+, and required by
        # socket.send() on Python 3, where a str raises TypeError.
        client.send(b"Hello!")
        server.send(b"world!!!")

        # Incoming data makes both ends readable as well as writable.
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.01)

        events.sort()
        expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
                    (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
        expected.sort()

        self.assertEqual(events, expected)

        # Only the server remains registered, now for EPOLLOUT alone.
        ep.unregister(client.fileno())
        ep.modify(server.fileno(), select.EPOLLOUT)
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.01)

        expected = [(server.fileno(), select.EPOLLOUT)]
        self.assertEqual(events, expected)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_control_and_wait(self):
        """Check register/poll/modify/unregister on an edge-triggered epoll."""
        client, server = self._connected_pair()

        ep = select.epoll(16)
        ep.register(server.fileno(),
                   select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
        ep.register(client.fileno(),
                   select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)

        # Both sockets are expected to report writable straight away.
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.1, then - now)

        events.sort()
        expected = [(client.fileno(), select.EPOLLOUT),
                    (server.fileno(), select.EPOLLOUT)]
        expected.sort()

        self.assertEqual(events, expected)

        # Nothing changed since the last poll, so with edge-triggered
        # registration this poll is expected to time out empty.
        events = ep.poll(timeout=2.1, maxevents=4)
        self.assertFalse(events)

        # Bytes literals: identical to str on Python 2.6+, and required by
        # socket.send() on Python 3, where a str raises TypeError.
        client.send(b"Hello!")
        server.send(b"world!!!")

        # Incoming data makes both ends readable as well as writable.
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.01)

        events.sort()
        expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
                    (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
        expected.sort()

        self.assertEqual(events, expected)

        # Only the server remains registered, now for EPOLLOUT alone.
        ep.unregister(client.fileno())
        ep.modify(server.fileno(), select.EPOLLOUT)
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.01)

        expected = [(server.fileno(), select.EPOLLOUT)]
        self.assertEqual(events, expected)
项目:respeaker_python_library    作者:respeaker    | 项目源码 | 文件源码
def __init__(self, number, direction=INPUT, callback=None, edge=None, active_low=0):
        """
        Export the pin through sysfs (if needed), configure it and, when an
        edge is given, start a daemon thread that polls for changes.

        @type  number: int
        @param number: The pin number
        @param direction: Pin direction, enumerated by C{Direction}; written
                          verbatim to the sysfs direction file
        @type  callback: callable
        @param callback: Method be called when pin changes state
        @param edge: The edge transition that triggers callback,
                     enumerated by C{Edge}; written verbatim to the sysfs
                     edge file
        @type active_low: int
        @param active_low: Indicator of whether this pin uses inverted
                           logic for HIGH-LOW transitions.
        @raise Exception: if C{callback} is given without C{edge}, or if
                          C{active_low} is truthy but not one of
                          C{ACTIVE_LOW_MODES}.
        """
        # Validate before touching sysfs, so that a rejected call neither
        # exports the pin nor leaves the value file open.
        if callback and not edge:
            raise Exception('You must supply a edge to trigger callback on')
        if active_low and active_low not in ACTIVE_LOW_MODES:
            raise Exception('You must supply a value for active_low which is either 0 or 1.')

        self._number = number
        self._direction = direction
        self._callback = callback
        self._active_low = active_low

        if not os.path.isdir(self._sysfs_gpio_value_path()):
            with open(SYSFS_EXPORT_PATH, 'w') as export:
                export.write('%d' % number)
        else:
            Logger.debug("SysfsGPIO: Pin %d already exported" % number)

        self._fd = open(self._sysfs_gpio_value_path(), 'r+')

        with open(self._sysfs_gpio_direction_path(), 'w') as fsdir:
            fsdir.write(direction)

        if edge:
            with open(self._sysfs_gpio_edge_path(), 'w') as fsedge:
                fsedge.write(edge)
            # The edge file is closed (and its buffered write flushed)
            # before the polling thread is started.
            self._poll = select.epoll()
            self._poll.register(self, (select.EPOLLPRI | select.EPOLLET))
            self.thread = Thread(target=self._run)
            self.thread.daemon = True
            self._running = True
            self.start()

        if active_low:
            with open(self._sysfs_gpio_active_low_path(), 'w') as fsactive_low:
                fsactive_low.write(str(active_low))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_control_and_wait(self):
        """Check register/poll/modify/unregister on an edge-triggered epoll."""
        client, server = self._connected_pair()
        client_fd = client.fileno()
        server_fd = server.fileno()

        edge_rw = select.EPOLLIN | select.EPOLLOUT | select.EPOLLET
        poller = select.epoll(16)
        poller.register(server_fd, edge_rw)
        poller.register(client_fd, edge_rw)

        # Both sockets are expected to report writable straight away.
        started = time.time()
        ready = poller.poll(1, 4)
        elapsed = time.time() - started
        self.assertFalse(elapsed > 0.1, elapsed)

        self.assertEqual(sorted(ready),
                         sorted([(client_fd, select.EPOLLOUT),
                                 (server_fd, select.EPOLLOUT)]))
        self.assertFalse(elapsed > 0.01, elapsed)

        # Nothing changed since the last poll, so with edge-triggered
        # registration this poll is expected to time out empty.
        ready = poller.poll(timeout=2.1, maxevents=4)
        self.assertFalse(ready)

        client.send(b"Hello!")
        server.send(b"world!!!")

        # Incoming data makes both ends readable as well as writable.
        started = time.time()
        ready = poller.poll(1, 4)
        elapsed = time.time() - started
        self.assertFalse(elapsed > 0.01)

        in_and_out = select.EPOLLIN | select.EPOLLOUT
        self.assertEqual(sorted(ready),
                         sorted([(client_fd, in_and_out),
                                 (server_fd, in_and_out)]))

        # Only the server remains registered, now for EPOLLOUT alone.
        poller.unregister(client_fd)
        poller.modify(server_fd, select.EPOLLOUT)
        started = time.time()
        ready = poller.poll(1, 4)
        elapsed = time.time() - started
        self.assertFalse(elapsed > 0.01)

        self.assertEqual(ready, [(server_fd, select.EPOLLOUT)])
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_control_and_wait(self):
        """Check register/poll/modify/unregister on an edge-triggered epoll."""
        client, server = self._connected_pair()

        ep = select.epoll(16)
        ep.register(server.fileno(),
                   select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
        ep.register(client.fileno(),
                   select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)

        # Both sockets are expected to report writable straight away.
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.1, then - now)

        events.sort()
        expected = [(client.fileno(), select.EPOLLOUT),
                    (server.fileno(), select.EPOLLOUT)]
        expected.sort()

        self.assertEqual(events, expected)

        # Nothing changed since the last poll, so with edge-triggered
        # registration this poll is expected to time out empty.
        events = ep.poll(timeout=2.1, maxevents=4)
        self.assertFalse(events)

        # Bytes literals: identical to str on Python 2.6+, and required by
        # socket.send() on Python 3, where a str raises TypeError.
        client.send(b"Hello!")
        server.send(b"world!!!")

        # Incoming data makes both ends readable as well as writable.
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.01)

        events.sort()
        expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
                    (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
        expected.sort()

        self.assertEqual(events, expected)

        # Only the server remains registered, now for EPOLLOUT alone.
        ep.unregister(client.fileno())
        ep.modify(server.fileno(), select.EPOLLOUT)
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.01)

        expected = [(server.fileno(), select.EPOLLOUT)]
        self.assertEqual(events, expected)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_control_and_wait(self):
        """Check register/poll/modify/unregister on an edge-triggered epoll."""
        client, server = self._connected_pair()
        client_fd = client.fileno()
        server_fd = server.fileno()

        edge_rw = select.EPOLLIN | select.EPOLLOUT | select.EPOLLET
        poller = select.epoll(16)
        poller.register(server_fd, edge_rw)
        poller.register(client_fd, edge_rw)

        # Both sockets are expected to report writable straight away.
        started = time.monotonic()
        ready = poller.poll(1, 4)
        elapsed = time.monotonic() - started
        self.assertFalse(elapsed > 0.1, elapsed)

        self.assertEqual(sorted(ready),
                         sorted([(client_fd, select.EPOLLOUT),
                                 (server_fd, select.EPOLLOUT)]))

        # Nothing changed since the last poll, so with edge-triggered
        # registration this poll is expected to time out empty.
        ready = poller.poll(timeout=2.1, maxevents=4)
        self.assertFalse(ready)

        client.send(b"Hello!")
        server.send(b"world!!!")

        # Incoming data makes both ends readable as well as writable.
        started = time.monotonic()
        ready = poller.poll(1, 4)
        elapsed = time.monotonic() - started
        self.assertFalse(elapsed > 0.01)

        in_and_out = select.EPOLLIN | select.EPOLLOUT
        self.assertEqual(sorted(ready),
                         sorted([(client_fd, in_and_out),
                                 (server_fd, in_and_out)]))

        # Only the server remains registered, now for EPOLLOUT alone.
        poller.unregister(client_fd)
        poller.modify(server_fd, select.EPOLLOUT)
        started = time.monotonic()
        ready = poller.poll(1, 4)
        elapsed = time.monotonic() - started
        self.assertFalse(elapsed > 0.01)

        self.assertEqual(ready, [(server_fd, select.EPOLLOUT)])
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_control_and_wait(self):
        """Check register/poll/modify/unregister on an edge-triggered epoll."""
        client, server = self._connected_pair()

        ep = select.epoll(16)
        ep.register(server.fileno(),
                   select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
        ep.register(client.fileno(),
                   select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)

        # Both sockets are expected to report writable straight away.
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.1, then - now)

        events.sort()
        expected = [(client.fileno(), select.EPOLLOUT),
                    (server.fileno(), select.EPOLLOUT)]
        expected.sort()

        self.assertEqual(events, expected)
        self.assertFalse(then - now > 0.01, then - now)

        # Nothing changed since the last poll, so with edge-triggered
        # registration this poll is expected to time out empty.
        events = ep.poll(timeout=2.1, maxevents=4)
        self.assertFalse(events)

        # Bytes literals: identical to str on Python 2.6+, and required by
        # socket.send() on Python 3, where a str raises TypeError.
        client.send(b"Hello!")
        server.send(b"world!!!")

        # Incoming data makes both ends readable as well as writable.
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.01)

        events.sort()
        expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
                    (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
        expected.sort()

        self.assertEqual(events, expected)

        # Only the server remains registered, now for EPOLLOUT alone.
        ep.unregister(client.fileno())
        ep.modify(server.fileno(), select.EPOLLOUT)
        now = time.time()
        events = ep.poll(1, 4)
        then = time.time()
        self.assertFalse(then - now > 0.01)

        expected = [(server.fileno(), select.EPOLLOUT)]
        self.assertEqual(events, expected)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_control_and_wait(self):
        """Check register/poll/modify/unregister on an edge-triggered epoll."""
        client, server = self._connected_pair()
        client_fd = client.fileno()
        server_fd = server.fileno()

        edge_rw = select.EPOLLIN | select.EPOLLOUT | select.EPOLLET
        poller = select.epoll(16)
        poller.register(server_fd, edge_rw)
        poller.register(client_fd, edge_rw)

        # Both sockets are expected to report writable straight away.
        started = time.monotonic()
        ready = poller.poll(1, 4)
        elapsed = time.monotonic() - started
        self.assertFalse(elapsed > 0.1, elapsed)

        self.assertEqual(sorted(ready),
                         sorted([(client_fd, select.EPOLLOUT),
                                 (server_fd, select.EPOLLOUT)]))

        # Nothing changed since the last poll, so with edge-triggered
        # registration this poll is expected to time out empty.
        ready = poller.poll(timeout=2.1, maxevents=4)
        self.assertFalse(ready)

        client.send(b"Hello!")
        server.send(b"world!!!")

        # Incoming data makes both ends readable as well as writable.
        started = time.monotonic()
        ready = poller.poll(1, 4)
        elapsed = time.monotonic() - started
        self.assertFalse(elapsed > 0.01)

        in_and_out = select.EPOLLIN | select.EPOLLOUT
        self.assertEqual(sorted(ready),
                         sorted([(client_fd, in_and_out),
                                 (server_fd, in_and_out)]))

        # Only the server remains registered, now for EPOLLOUT alone.
        poller.unregister(client_fd)
        poller.modify(server_fd, select.EPOLLOUT)
        started = time.monotonic()
        ready = poller.poll(1, 4)
        elapsed = time.monotonic() - started
        self.assertFalse(elapsed > 0.01)

        self.assertEqual(ready, [(server_fd, select.EPOLLOUT)])