The following 32 code examples, extracted from Python open source projects, illustrate how to use eventlet.hubs().
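Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of the pattern most of them rely on: fetching the running hub with hubs.get_hub() and scheduling a callback on it with schedule_call_global(). The callback name report and the delays used here are illustrative only.

import eventlet
from eventlet import hubs


def report():
    # runs later, from the hub's timer, not from the current greenthread
    print('timer fired on hub %r' % hubs.get_hub())


# schedule_call_global arranges for report() to run after 0.1 seconds,
# even if the greenthread that scheduled it has already exited
timer = hubs.get_hub().schedule_call_global(0.1, report)

# sleeping yields control to the hub so the scheduled call can run
eventlet.sleep(0.2)

# a timer that has not fired yet could instead be revoked with timer.cancel()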
def test_schedule(self):
    hub = hubs.get_hub()
    # clean up the runloop, preventing side effects from previous tests
    # on this thread
    if hub.running:
        hub.abort()
        eventlet.sleep(0)
    called = []
    # t = timer.Timer(0, lambda: (called.append(True), hub.abort()))
    # t.schedule()
    # let's have a timer somewhere in the future; make sure abort() still works
    # (for pyevent, its dispatcher() does not exit if there is something scheduled)
    # XXX pyevent handles this, other hubs do not
    # hubs.get_hub().schedule_call_global(10000, lambda: (called.append(True), hub.abort()))
    hubs.get_hub().schedule_call_global(0, lambda: (called.append(True), hub.abort()))
    hub.default_sleep = lambda: 0.0
    hub.switch()
    assert called
    assert not hub.running
def test_sleep(self):
    # even if there was an error in the mainloop, the hub should continue
    # to work
    start = time.time()
    eventlet.sleep(DELAY)
    delay = time.time() - start
    assert delay >= DELAY * 0.9, \
        'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY)

    def fail():
        1 // 0

    hubs.get_hub().schedule_call_global(0, fail)

    start = time.time()
    eventlet.sleep(DELAY)
    delay = time.time() - start
    assert delay >= DELAY * 0.9, \
        'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY)
def test_exceptionleaks(self):
    # tests expected behaviour with all versions of greenlet
    def test_gt(sem):
        try:
            raise KeyError()
        except KeyError:
            sem.release()
            hubs.get_hub().switch()

    # semaphores for controlling execution order
    sem = eventlet.Semaphore()
    sem.acquire()
    g = eventlet.spawn(test_gt, sem)
    try:
        sem.acquire()
        assert sys.exc_info()[0] is None
    finally:
        g.kill()
def test_kill(self):
    """ Checks that killing a process after the hub runloop dies does
    not immediately return to hub greenlet's parent and schedule a
    redundant timer. """
    hub = hubs.get_hub()

    def dummyproc():
        hub.switch()

    g = eventlet.spawn(dummyproc)
    eventlet.sleep(0)  # let dummyproc run
    assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
    self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
                      KeyboardInterrupt())

    # kill dummyproc, this schedules a timer to return execution to
    # this greenlet before throwing an exception in dummyproc.
    # it is from this timer that execution should be returned to this
    # greenlet, and not by propagation of the terminating greenlet.
    g.kill()
    with eventlet.Timeout(0.5, self.CustomException()):
        # we now switch to the hub, there should be no existing timers
        # that switch back to this greenlet and so this hub.switch()
        # call should block indefinitely.
        self.assertRaises(self.CustomException, hub.switch)
def test_parent(self):
    """ Checks that a terminating greenthread whose parent
    was a previous, now-defunct hub greenlet returns execution to
    the hub runloop and not the hub greenlet's parent. """
    hub = hubs.get_hub()

    def dummyproc():
        pass

    g = eventlet.spawn(dummyproc)
    assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
    self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
                      KeyboardInterrupt())

    assert not g.dead  # check dummyproc hasn't completed

    with eventlet.Timeout(0.5, self.CustomException()):
        # we now switch to the hub which will allow
        # completion of dummyproc.
        # this should return execution back to the runloop and not
        # this greenlet so that hub.switch() would block indefinitely.
        self.assertRaises(self.CustomException, hub.switch)

    assert g.dead  # sanity check that dummyproc has completed
def verify_hub_empty():

    def format_listener(listener):
        return 'Listener %r for greenlet %r with run callback %r' % (
            listener, listener.greenlet, getattr(listener.greenlet, 'run', None))

    from eventlet import hubs
    hub = hubs.get_hub()
    readers = hub.get_readers()
    writers = hub.get_writers()
    num_readers = len(readers)
    num_writers = len(writers)
    num_timers = hub.get_timers_count()
    assert num_readers == 0 and num_writers == 0, \
        "Readers: %s (%d) Writers: %s (%d)" % (
            ', '.join(map(format_listener, readers)), num_readers,
            ', '.join(map(format_listener, writers)), num_writers,
        )
def patch(self):
    hubs.use_hub()
    eventlet.monkey_patch(os=False)
    patch_sendfile()
def test_cancel_immediate(self):
    hub = hubs.get_hub()
    stimers = hub.get_timers_count()
    scanceled = hub.timers_canceled
    for i in six.moves.range(2000):
        t = hubs.get_hub().schedule_call_global(60, noop)
        t.cancel()
        self.assert_less_than_equal(hub.timers_canceled,
                                    hub.get_timers_count() + 1)
    # there should be fewer than 1000 new timers and canceled
    self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
    self.assert_less_than_equal(hub.timers_canceled, 1000)
def test_cancel_accumulated(self):
    hub = hubs.get_hub()
    stimers = hub.get_timers_count()
    scanceled = hub.timers_canceled
    for i in six.moves.range(2000):
        t = hubs.get_hub().schedule_call_global(60, noop)
        eventlet.sleep()
        self.assert_less_than_equal(hub.timers_canceled,
                                    hub.get_timers_count() + 1)
        t.cancel()
        self.assert_less_than_equal(hub.timers_canceled,
                                    hub.get_timers_count() + 1, hub.timers)
    # there should be fewer than 1000 new timers and canceled
    self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
    self.assert_less_than_equal(hub.timers_canceled, 1000)
def test_cancel_proportion(self):
    # if fewer than half the pending timers are canceled, it should
    # not clean them out
    hub = hubs.get_hub()
    uncanceled_timers = []
    stimers = hub.get_timers_count()
    scanceled = hub.timers_canceled
    for i in six.moves.range(1000):
        # 2/3rds of new timers are uncanceled
        t = hubs.get_hub().schedule_call_global(60, noop)
        t2 = hubs.get_hub().schedule_call_global(60, noop)
        t3 = hubs.get_hub().schedule_call_global(60, noop)
        eventlet.sleep()
        self.assert_less_than_equal(hub.timers_canceled,
                                    hub.get_timers_count() + 1)
        t.cancel()
        self.assert_less_than_equal(hub.timers_canceled,
                                    hub.get_timers_count() + 1)
        uncanceled_timers.append(t2)
        uncanceled_timers.append(t3)
    # 3000 new timers, plus a few extras
    self.assert_less_than_equal(stimers + 3000,
                                stimers + hub.get_timers_count())
    self.assertEqual(hub.timers_canceled, 1000)
    for t in uncanceled_timers:
        t.cancel()
        self.assert_less_than_equal(hub.timers_canceled,
                                    hub.get_timers_count())
    eventlet.sleep()
def test_local(self):
    lst = [1]
    eventlet.spawn(hubs.get_hub().schedule_call_local, DELAY, lst.pop)
    eventlet.sleep(0)
    eventlet.sleep(DELAY * 2)
    assert lst == [1], lst
def test_ordering(self):
    lst = []
    hubs.get_hub().schedule_call_global(DELAY * 2, lst.append, 3)
    hubs.get_hub().schedule_call_global(DELAY, lst.append, 1)
    hubs.get_hub().schedule_call_global(DELAY, lst.append, 2)
    while len(lst) < 3:
        eventlet.sleep(DELAY)
    self.assertEqual(lst, [1, 2, 3])
def test_debug_listeners(self):
    hubs.get_hub().set_debug_listeners(True)
    hubs.get_hub().set_debug_listeners(False)
def test_timer_exceptions(self):
    hubs.get_hub().set_timer_exceptions(True)
    hubs.get_hub().set_timer_exceptions(False)
def test_kqueue_unsupported(self):
    # https://github.com/eventlet/eventlet/issues/38
    # get_hub on windows broken by kqueue
    module_source = r'''
from __future__ import print_function

# Simulate absence of kqueue even on platforms that support it.
import select
try:
    del select.kqueue
except AttributeError:
    pass

from eventlet.support.six.moves import builtins

original_import = builtins.__import__


def fail_import(name, *args, **kwargs):
    if 'epoll' in name:
        raise ImportError('disabled for test')
    if 'kqueue' in name:
        print('kqueue tried')
    return original_import(name, *args, **kwargs)

builtins.__import__ = fail_import

import eventlet.hubs
eventlet.hubs.get_default_hub()
print('ok')
'''
    self.write_to_tempfile('newmod', module_source)
    output, _ = self.launch_subprocess('newmod.py')
    self.assertEqual(output, 'kqueue tried\nok\n')
def using_pyevent(_f):
    from eventlet.hubs import get_hub
    return 'pyevent' in type(get_hub()).__module__