The following 50 code examples, extracted from open-source Python projects, illustrate how to use eventlet.spawn().
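For orientation before the project examples: eventlet.spawn(func, *args, **kwargs) starts func in a new green thread and immediately returns a GreenThread object; calling wait() on it cooperatively blocks the caller until func finishes and returns its result (or re-raises its exception). A minimal sketch of that basic pattern (the add function is just an illustration, not from any of the projects below):

import eventlet

def add(a, b):
    eventlet.sleep(0)  # yield to the hub, as real I/O-bound code would
    return a + b

# spawn() returns a GreenThread right away; wait() yields until it finishes
gt = eventlet.spawn(add, 2, 3)
print(gt.wait())  # -> 5
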
def _get_port_from_pool(self, pool_key, pod, subnets):
    try:
        port_id = self._available_ports_pools[pool_key].pop()
    except IndexError:
        raise exceptions.ResourceNotReady(pod)
    if config.CONF.kubernetes.port_debug:
        neutron = clients.get_neutron_client()
        neutron.update_port(
            port_id,
            {
                "port": {
                    'name': pod['metadata']['name'],
                    'device_id': pod['metadata']['uid']
                }
            })
    # check if the pool needs to be populated
    if (self._get_pool_size(pool_key) <
            oslo_cfg.CONF.vif_pool.ports_pool_min):
        eventlet.spawn(self._populate_pool, pool_key, pod, subnets)
    return self._existing_vifs[port_id]

def _get_port_from_pool(self, pool_key, pod, subnets):
    try:
        port_id = self._available_ports_pools[pool_key].pop()
    except IndexError:
        raise exceptions.ResourceNotReady(pod)
    if config.CONF.kubernetes.port_debug:
        neutron = clients.get_neutron_client()
        neutron.update_port(
            port_id,
            {
                "port": {
                    'name': pod['metadata']['name'],
                }
            })
    # check if the pool needs to be populated
    if (self._get_pool_size(pool_key) <
            oslo_cfg.CONF.vif_pool.ports_pool_min):
        eventlet.spawn(self._populate_pool, pool_key, pod, subnets)
    return self._existing_vifs[port_id]

def __init__(self):
    bigswitch_config.register_config()
    networking_bigswitch_l3_pe.lib.config.register_config()
    api_url = cfg.CONF.networking_bigswitch_l3_pe.api_url
    username, password = cfg.CONF.RESTPROXY.server_auth.split(':')
    self.neutron_id = cfg.CONF.RESTPROXY.neutron_id
    exclude_physical_networks = \
        cfg.CONF.networking_bigswitch_l3_pe.exclude_physical_networks
    self.sync = Synchronizer(api_url, username, password, self.neutron_id,
                             exclude_physical_networks)
    self.notifier = EventNotifier()
    self.watcher = EventWatcher()
    self.keystone_client = KeystoneClient()
    self.db_plugin = db_base_plugin_v2.NeutronDbPluginV2()
    eventlet.spawn(self.watcher.watch)
    eventlet.spawn(self._bcf_sync,
                   cfg.CONF.networking_bigswitch_l3_pe.sync_interval)

def init_host(self, tg, **kwargs):
    LOG.info(_LI('Willing init host function.......'))
    if CONF.is_all:
        pid = os.fork()
        if pid == 0:
            child_started = False
            while True:
                enable_spawn = kwargs.get('enable_spawn', True)
                if enable_spawn:
                    eventlet.spawn(self.get_all_user_all_weibo_info, **kwargs)
                    child_started = True
                else:
                    kwargs['tg'] = tg
                    self.get_all_user_all_weibo_info(**kwargs)
                    child_started = True
                if not child_started:
                    break
            os._exit(2)
        LOG.debug(_LI('Started child %d' % pid))
        # repeat every 14400 s (i.e. every 4 hours)

def _eventlet_serve(sock, handle, concurrency):
    """
    Serve requests forever.

    This code is nearly identical to ``eventlet.convenience.serve`` except
    that it attempts to join the pool at the end, which allows for gunicorn
    graceful shutdowns.
    """
    pool = eventlet.greenpool.GreenPool(concurrency)
    server_gt = eventlet.greenthread.getcurrent()

    while True:
        try:
            conn, addr = sock.accept()
            gt = pool.spawn(handle, conn, addr)
            gt.link(_eventlet_stop, server_gt, conn)
            conn, addr, gt = None, None, None
        except eventlet.StopServe:
            pool.waitall()
            return

def _eventlet_serve(sock, handle, concurrency):
    """
    Serve requests forever.

    This code is nearly identical to ``eventlet.convenience.serve`` except
    that it attempts to join the pool at the end, which allows for gunicorn
    graceful shutdowns.
    """
    pool = eventlet.greenpool.GreenPool(concurrency)
    server_gt = eventlet.greenthread.getcurrent()

    while True:
        try:
            conn, addr = sock.accept()
            gt = pool.spawn(handle, conn, addr)
            gt.link(_eventlet_stop, server_gt, conn)
            conn, addr, gt = None, None, None
        except eventlet.StopServe:
            sock.close()
            pool.waitall()
            return

def run(self):
    acceptors = []
    for sock in self.sockets:
        gsock = GreenSocket(sock)
        gsock.setblocking(1)
        hfun = partial(self.handle, gsock)
        acceptor = eventlet.spawn(_eventlet_serve, gsock, hfun,
                                  self.worker_connections)
        acceptors.append(acceptor)
        eventlet.sleep(0.0)

    while self.alive:
        self.notify()
        eventlet.sleep(1.0)

    self.notify()
    try:
        with eventlet.Timeout(self.cfg.graceful_timeout) as t:
            [a.kill(eventlet.StopServe()) for a in acceptors]
            [a.wait() for a in acceptors]
    except eventlet.Timeout as te:
        if te != t:
            raise
        [a.kill() for a in acceptors]

def main(port=6659):
    wireUpBlpapiImplementation(blpapi)
    server = None
    try:
        try:
            app.sessionForRequests = openBloombergSession()
            app.sessionForSubscriptions = openBloombergSession()
            app.allSubscriptions = {}
        except:
            traceback.print_exc()

        eventlet.spawn(lambda: handleSubscriptions(app, socketio))
        socketio.run(app, port=port)
    except KeyboardInterrupt:
        print("Ctrl+C received, exiting...")
    finally:
        if app.sessionForRequests is not None:
            app.sessionForRequests.stop()
        if app.sessionForSubscriptions is not None:
            app.sessionForSubscriptions.stop()
        if server is not None:
            server.socket.close()

def spawn(*args, **kwargs):
    raise_error = kwargs.pop('raise_error', False)

    def _launch(func, *args, **kwargs):
        # Mimic gevent's default raise_error=False behaviour
        # by not propagating an exception to the joiner.
        try:
            return func(*args, **kwargs)
        except TaskExit:
            pass
        except:
            if raise_error:
                raise
            # Log uncaught exception.
            # Note: this is an intentional divergence from gevent
            # behaviour; gevent silently ignores such exceptions.
            LOG.error('hub: uncaught exception: %s',
                      traceback.format_exc())

    return eventlet.spawn(_launch, *args, **kwargs)

def __init__(self, listen_info, handle=None, backlog=None,
             spawn='default', **ssl_args):
    assert backlog is None
    assert spawn == 'default'

    if ':' in listen_info[0]:
        self.server = eventlet.listen(listen_info,
                                      family=socket.AF_INET6)
    else:
        self.server = eventlet.listen(listen_info)

    if ssl_args:
        def wrap_and_handle(sock, addr):
            ssl_args.setdefault('server_side', True)
            handle(ssl.wrap_socket(sock, **ssl_args), addr)

        self.handle = wrap_and_handle
    else:
        self.handle = handle

def test_semaphore(self, inspect):
    slot = mock.Mock()
    slot.side_effect = lambda **k: time.sleep(.3)
    signal = Signal('tost')
    signal.connect(slot)
    x = Task.get_or_create(signal, dict(some_kwarg='foo'),
                           logger=logging.getLogger('TaskX'))
    y = Task.get_or_create(signal, dict(some_kwarg='foo'),
                           logger=logging.getLogger('TaskY'))
    eventlet.spawn(x)
    time.sleep(.1)
    eventlet.spawn(y)
    time.sleep(.1)
    assert slot.call_count == 1
    time.sleep(.4)
    assert slot.call_count == 2

def spawn(func, *args, **kwargs):
    """Passthrough method for eventlet.spawn.

    This utility exists so that it can be stubbed for testing without
    interfering with the service spawns.

    It will also grab the context from the threadlocal store and add it to
    the store on the new thread.  This allows for continuity in logging the
    context when using this method to spawn a new thread.
    """
    _context = common_context.get_current()

    @functools.wraps(func)
    def context_wrapper(*args, **kwargs):
        # NOTE: If update_store is not called after spawn it won't be
        # available for the logger to pull from threadlocal storage.
        if _context is not None:
            _context.update_store()
        return func(*args, **kwargs)

    return eventlet.spawn(context_wrapper, *args, **kwargs)

def spawn_n(func, *args, **kwargs):
    """Passthrough method for eventlet.spawn_n.

    This utility exists so that it can be stubbed for testing without
    interfering with the service spawns.

    It will also grab the context from the threadlocal store and add it to
    the store on the new thread.  This allows for continuity in logging the
    context when using this method to spawn a new thread.
    """
    _context = common_context.get_current()

    @functools.wraps(func)
    def context_wrapper(*args, **kwargs):
        # NOTE: If update_store is not called after spawn_n it won't be
        # available for the logger to pull from threadlocal storage.
        if _context is not None:
            _context.update_store()
        func(*args, **kwargs)

    eventlet.spawn_n(context_wrapper, *args, **kwargs)

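The two wrappers above do not change eventlet's spawning semantics; they only re-store the caller's request context inside the new green thread so logging keeps working. A short, hypothetical usage sketch of the spawn() wrapper, assuming common_context is oslo_context.context (as the alias suggests) and that the wrapper is exposed from a utils module (both names are assumptions for illustration):

from oslo_context import context as common_context

import utils  # hypothetical module exposing the spawn() wrapper above

def worker():
    # The wrapper called update_store() before running us, so the
    # parent's request context is visible in this green thread too.
    ctx = common_context.get_current()
    print(ctx.request_id if ctx else None)

common_context.RequestContext()  # stores itself as the current context
utils.spawn(worker).wait()
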
def coro_wrap_greenthread():
    result = []

    gt = eventlet.spawn(eventlet_slow_append, result, 1, 0.020)
    value = yield From(aioeventlet.wrap_greenthread(gt))
    result.append(value)

    gt = eventlet.spawn(eventlet_slow_append, result, 2, 0.010)
    value = yield From(aioeventlet.wrap_greenthread(gt))
    result.append(value)

    gt = eventlet.spawn(eventlet_slow_error)
    try:
        yield From(aioeventlet.wrap_greenthread(gt))
    except ValueError as exc:
        result.append(str(exc))

    result.append(4)
    raise Return(result)

def test_soon_spawn(self):
    result = []

    def func1():
        result.append("spawn")

    def func2():
        result.append("spawn_after")
        self.loop.stop()

    def schedule_greenthread():
        eventlet.spawn(func1)
        eventlet.spawn_after(0.010, func2)

    self.loop.call_soon(schedule_greenthread)
    self.loop.run_forever()
    self.assertEqual(result, ["spawn", "spawn_after"])

def test_yield_future_not_running(self):
    result = []

    def func(event, fut):
        event.send('link')
        value = aioeventlet.yield_future(fut)
        result.append(value)
        self.loop.stop()

    event = eventlet.event.Event()
    fut = asyncio.Future(loop=self.loop)
    eventlet.spawn(func, event, fut)
    event.wait()

    self.loop.call_soon(fut.set_result, 21)
    self.loop.run_forever()
    self.assertEqual(result, [21])

def test_yield_future_invalid_type(self):
    def func(obj):
        return aioeventlet.yield_future(obj)

    @asyncio.coroutine
    def coro_func():
        print("do something")

    def regular_func():
        return 3

    for obj in (coro_func, regular_func):
        gt = eventlet.spawn(func, obj)

        # ignore logged traceback
        with tests.mock.patch('traceback.print_exception') as m_print:
            self.assertRaises(TypeError, gt.wait)

def test_hub_exceptions(self):
    debug.hub_exceptions(True)
    server = eventlet.listen(('0.0.0.0', 0))
    client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
    client_2, addr = server.accept()

    def hurl(s):
        s.recv(1)
        {}[1]  # keyerror

    with capture_stderr() as fake:
        gt = eventlet.spawn(hurl, client_2)
        eventlet.sleep(0)
        client.send(b' ')
        eventlet.sleep(0)
        # allow the "hurl" greenlet to trigger the KeyError
        # not sure why the extra context switch is needed
        eventlet.sleep(0)
    self.assertRaises(KeyError, gt.wait)
    debug.hub_exceptions(False)
    # look for the KeyError exception in the traceback
    assert 'KeyError: 1' in fake.getvalue(), "Traceback not in:\n" + fake.getvalue()

def test_exceptionleaks(self):
    # tests expected behaviour with all versions of greenlet
    def test_gt(sem):
        try:
            raise KeyError()
        except KeyError:
            sem.release()
            hubs.get_hub().switch()

    # semaphores for controlling execution order
    sem = eventlet.Semaphore()
    sem.acquire()
    g = eventlet.spawn(test_gt, sem)
    try:
        sem.acquire()
        assert sys.exc_info()[0] is None
    finally:
        g.kill()

def test_kill(self):
    """Checks that killing a process after the hub runloop dies does
    not immediately return to hub greenlet's parent and schedule a
    redundant timer.
    """
    hub = hubs.get_hub()

    def dummyproc():
        hub.switch()

    g = eventlet.spawn(dummyproc)
    eventlet.sleep(0)  # let dummyproc run
    assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
    self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
                      KeyboardInterrupt())

    # kill dummyproc, this schedules a timer to return execution to
    # this greenlet before throwing an exception in dummyproc.
    # it is from this timer that execution should be returned to this
    # greenlet, and not by propagation of the terminating greenlet.
    g.kill()
    with eventlet.Timeout(0.5, self.CustomException()):
        # we now switch to the hub, there should be no existing timers
        # that switch back to this greenlet and so this hub.switch()
        # call should block indefinitely.
        self.assertRaises(self.CustomException, hub.switch)

def test_parent(self):
    """Checks that a terminating greenthread whose parent was a previous,
    now-defunct hub greenlet returns execution to the hub runloop and not
    the hub greenlet's parent.
    """
    hub = hubs.get_hub()

    def dummyproc():
        pass

    g = eventlet.spawn(dummyproc)
    assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
    self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
                      KeyboardInterrupt())

    assert not g.dead  # check dummyproc hasn't completed
    with eventlet.Timeout(0.5, self.CustomException()):
        # we now switch to the hub which will allow
        # completion of dummyproc.
        # this should return execution back to the runloop and not
        # this greenlet so that hub.switch() would block indefinitely.
        self.assertRaises(self.CustomException, hub.switch)
    assert g.dead  # sanity check that dummyproc has completed

def test_raised_multiple_readers(self):
    debug.hub_prevent_multiple_readers(True)

    def handle(sock, addr):
        sock.recv(1)
        sock.sendall(b"a")
        raise eventlet.StopServe()

    listener = eventlet.listen(('127.0.0.1', 0))
    eventlet.spawn(eventlet.serve, listener, handle)

    def reader(s):
        s.recv(1)

    s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
    a = eventlet.spawn(reader, s)
    eventlet.sleep(0)
    self.assertRaises(RuntimeError, s.recv, 1)
    s.sendall(b'b')
    a.wait()

def test_zero_timeout_and_back(self):
    listen = eventlet.listen(('', 0))
    # Keep reference to server side of socket
    server = eventlet.spawn(listen.accept)
    client = eventlet.connect(listen.getsockname())

    client.settimeout(0.05)
    # Now must raise socket.timeout
    self.assertRaises(socket.timeout, client.recv, 1)

    client.settimeout(0)
    # Now must raise socket.error with EAGAIN
    try:
        client.recv(1)
        assert False
    except socket.error as e:
        assert get_errno(e) == errno.EAGAIN

    client.settimeout(0.05)
    # Now socket.timeout again
    self.assertRaises(socket.timeout, client.recv, 1)
    server.wait()

def test_pipe_writes_large_messages(self):
    r, w = os.pipe()
    r = greenio.GreenPipe(r, 'rb')
    w = greenio.GreenPipe(w, 'wb')

    large_message = b"".join([1024 * six.int2byte(i) for i in range(65)])

    def writer():
        w.write(large_message)
        w.close()

    gt = eventlet.spawn(writer)

    for i in range(65):
        buf = r.read(1024)
        expected = 1024 * six.int2byte(i)
        self.assertEqual(
            buf, expected,
            "expected=%r..%r, found=%r..%r iter=%d"
            % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
    gt.wait()

def test_socket_file_read_non_int():
    listen_socket = eventlet.listen(('localhost', 0))

    def server():
        conn, _ = listen_socket.accept()
        conn.recv(1)
        conn.sendall(b'response')
        conn.close()

    eventlet.spawn(server)

    sock = eventlet.connect(listen_socket.getsockname())

    fd = sock.makefile('rwb')
    fd.write(b'?')
    fd.flush()
    with eventlet.Timeout(1):
        try:
            fd.read("This shouldn't work")
            assert False
        except TypeError:
            pass

def test_putting_to_queue(self):
    timer = eventlet.Timeout(0.1)
    try:
        size = 2
        self.pool = IntPool(min_size=0, max_size=size)
        queue = Queue()
        results = []

        def just_put(pool_item, index):
            self.pool.put(pool_item)
            queue.put(index)

        for index in six.moves.range(size + 1):
            pool_item = self.pool.get()
            eventlet.spawn(just_put, pool_item, index)

        for _ in six.moves.range(size + 1):
            x = queue.get()
            results.append(x)
        self.assertEqual(sorted(results), list(six.moves.range(size + 1)))
    finally:
        timer.cancel()

def test_calls_init(self):
    init_args = []

    class Init(corolocal.local):
        def __init__(self, *args):
            init_args.append((args, eventlet.getcurrent()))

    my_local = Init(1, 2, 3)
    self.assertEqual(init_args[0][0], (1, 2, 3))
    self.assertEqual(init_args[0][1], eventlet.getcurrent())

    def do_something():
        my_local.foo = 'bar'
        self.assertEqual(len(init_args), 2, init_args)
        self.assertEqual(init_args[1][0], (1, 2, 3))
        self.assertEqual(init_args[1][1], eventlet.getcurrent())

    eventlet.spawn(do_something).wait()

def test_calling_methods(self):
    class Caller(corolocal.local):
        def callme(self):
            return self.foo

    my_local = Caller()
    my_local.foo = "foo1"
    self.assertEqual("foo1", my_local.callme())

    def do_something():
        my_local.foo = "foo2"
        self.assertEqual("foo2", my_local.callme())

    eventlet.spawn(do_something).wait()

    my_local.foo = "foo3"
    self.assertEqual("foo3", my_local.callme())

def test_select_mark_file_as_reopened():
    # https://github.com/eventlet/eventlet/pull/294
    # Fix API inconsistency in select and Hub.
    # mark_as_closed takes one argument, but called without arguments.
    # on_error takes file descriptor, but called with an exception object.
    s = original_socket.socket()
    s.setblocking(0)
    s.bind(('127.0.0.1', 0))
    s.listen(5)

    gt = eventlet.spawn(select.select, [s], [s], [s])
    eventlet.sleep(0.01)
    with eventlet.Timeout(0.5) as t:
        with tests.assert_raises(hubs.IOClosed):
            hubs.get_hub().mark_as_reopened(s.fileno())
            gt.wait()
        t.cancel()

def test_ssl_close(self):
    def serve(listener):
        sock, addr = listener.accept()
        sock.recv(8192)
        try:
            self.assertEqual(b'', sock.recv(8192))
        except greenio.SSL.ZeroReturnError:
            pass

    sock = listen_ssl_socket()
    server_coro = eventlet.spawn(serve, sock)

    raw_client = eventlet.connect(sock.getsockname())
    client = ssl.wrap_socket(raw_client)
    client.sendall(b'X')
    greenio.shutdown_safe(client)
    client.close()
    server_coro.wait()

def test_ssl_unwrap(self):
    def serve():
        sock, addr = listener.accept()
        self.assertEqual(sock.recv(6), b'before')
        sock_ssl = ssl.wrap_socket(sock, tests.private_key_file,
                                   tests.certificate_file,
                                   server_side=True)
        sock_ssl.do_handshake()
        self.assertEqual(sock_ssl.recv(6), b'during')
        sock2 = sock_ssl.unwrap()
        self.assertEqual(sock2.recv(5), b'after')
        sock2.close()

    listener = eventlet.listen(('127.0.0.1', 0))
    server_coro = eventlet.spawn(serve)

    client = eventlet.connect(listener.getsockname())
    client.sendall(b'before')
    client_ssl = ssl.wrap_socket(client)
    client_ssl.do_handshake()
    client_ssl.sendall(b'during')
    client2 = client_ssl.unwrap()
    client2.sendall(b'after')
    server_coro.wait()