我们从Python开源项目中，提取了以下35个代码示例，用于说明如何使用types.CoroutineType()。
def _rule_action_match(self, message, _):
    """Message filter that runs the actions of every rule matching *message*.

    For each entry in ``self._match_actions`` whose ``rule`` matches the
    message (as decided by ``matches_rule``), every action in
    ``entry.actions`` is called as
    ``action.func(self, message, action.user_data)``. If an action returns
    a coroutine, it is scheduled as a task on ``self.loop``.

    The third positional argument is ignored.

    Returns ``DBUS.HANDLER_RESULT_HANDLED`` when at least one rule matched,
    otherwise ``DBUS.HANDLER_RESULT_NOT_YET_HANDLED``.

    NOTE(review): the comment that used to sit here said this routine
    always reports "handled" -- to avoid spurious method-not-handled errors
    from eavesdropping on method calls not addressed to this connection --
    and that the same Connection should therefore not be used for both
    eavesdropping and normal method calls. The code does not report
    "handled" for unmatched messages; confirm which behaviour is intended.
    """
    handled = False
    for entry in self._match_actions.values():
        if not matches_rule(message, entry.rule):
            continue
        for action in entry.actions:
            result = action.func(self, message, action.user_data)
            if isinstance(result, types.CoroutineType):
                # A coroutine result can only run if there is a loop to own it.
                assert self.loop is not None, "no event loop to attach coroutine to"
                self.loop.create_task(result)
        handled = True  # passed to at least one handler
    if handled:
        return DBUS.HANDLER_RESULT_HANDLED
    return DBUS.HANDLER_RESULT_NOT_YET_HANDLED
#end _rule_action_match
def is_internal_attribute(obj, attr):
    """Tell whether *attr* counts as an internal Python attribute of *obj*.

    The answer depends on the kind of object: functions, methods,
    generators, coroutines and async generators are checked against the
    module's ``UNSAFE_*_ATTRIBUTES`` collections, classes only flag
    ``mro``, and code, traceback and frame objects flag every attribute.
    Whatever is not flagged that way is internal only if its name starts
    with a double underscore.

    This is useful if the environment method
    :meth:`~SandboxedEnvironment.is_safe_attribute` is overridden.

    >>> from jinja2.sandbox import is_internal_attribute
    >>> is_internal_attribute(str, "mro")
    True
    >>> is_internal_attribute(str, "upper")
    False
    """
    always_internal = (types.CodeType, types.TracebackType, types.FrameType)

    # Only the first matching kind is consulted, mirroring an if/elif chain.
    if isinstance(obj, types.FunctionType):
        flagged = attr in UNSAFE_FUNCTION_ATTRIBUTES
    elif isinstance(obj, types.MethodType):
        flagged = (
            attr in UNSAFE_FUNCTION_ATTRIBUTES
            or attr in UNSAFE_METHOD_ATTRIBUTES
        )
    elif isinstance(obj, type):
        flagged = attr == 'mro'
    elif isinstance(obj, always_internal):
        flagged = True
    elif isinstance(obj, types.GeneratorType):
        flagged = attr in UNSAFE_GENERATOR_ATTRIBUTES
    elif hasattr(types, 'CoroutineType') and isinstance(obj, types.CoroutineType):
        # hasattr guard: the coroutine type may be missing from ``types``.
        flagged = attr in UNSAFE_COROUTINE_ATTRIBUTES
    elif hasattr(types, 'AsyncGeneratorType') and isinstance(obj, types.AsyncGeneratorType):
        flagged = attr in UNSAFE_ASYNC_GENERATOR_ATTRIBUTES
    else:
        flagged = False

    if flagged:
        return True
    # Fallback for everything not flagged above: dunder names are internal.
    return attr.startswith('__')
def test_onerror_with_args_return_None(self):
    # An async function wrapped by decorators.onerror (called with a message
    # and parse_mode) must still be a plain function, calling it must yield
    # a coroutine, and that coroutine must run to completion on the loop.
    async def foo(beard, *args, **kwargs):
        return beard, args, kwargs

    foo = decorators.onerror("<i>foo</i>", parse_mode="HTML")(foo)

    pending = foo(None)
    self.assertIsInstance(foo, types.FunctionType)
    self.assertIsInstance(pending, types.CoroutineType)
    self.loop.run_until_complete(pending)
def test_debugonly_return_None(self):
    # An async function wrapped by decorators.debugonly (called with a
    # message and parse_mode) must still be a plain function, calling it with
    # a TestBeard must yield a coroutine, and that coroutine must run to
    # completion on the loop.
    async def foo(beard, *args, **kwargs):
        return beard, args, kwargs

    foo = decorators.debugonly("<i>foo</i>", parse_mode="HTML")(foo)

    pending = foo(TestBeard())
    self.assertIsInstance(foo, types.FunctionType)
    self.assertIsInstance(pending, types.CoroutineType)
    self.loop.run_until_complete(pending)
def set_message(self, message):
    """Install *message* as the message callback and return ``self``.

    When *message* is not None it is wrapped so that it is called as
    ``message(conn, msg, user_data)``, where ``conn`` and ``msg`` are
    ``Connection`` and ``Message`` objects built from the raw arguments
    and ``user_data`` is looked up in ``conn._user_data``. If the callback
    returns a coroutine, it is scheduled on ``self.loop`` and
    ``DBUS.HANDLER_RESULT_HANDLED`` is returned in its place; any other
    result is returned unchanged.

    Passing None clears the callback. Returning ``self`` allows calls to
    be chained.
    """

    def wrap_message(c_conn, c_message, c_user_data):
        conn = Connection(dbus.dbus_connection_ref(c_conn))
        msg = Message(dbus.dbus_message_ref(c_message))
        user_data = conn._user_data.get(c_user_data)
        result = message(conn, msg, user_data)
        if isinstance(result, types.CoroutineType):
            # A coroutine result can only run if there is a loop to own it.
            assert self.loop is not None, "no event loop to attach coroutine to"
            self.loop.create_task(result)
            result = DBUS.HANDLER_RESULT_HANDLED
        return result

    if message is not None:
        # The wrapper is also stored on self -- presumably so that it stays
        # referenced for as long as it is installed; TODO confirm.
        self._wrap_message_func = DBUS.ObjectPathMessageFunction(wrap_message)
    else:
        self._wrap_message_func = None
    self._dbobj.message_function = self._wrap_message_func
    return self
#end set_message
#end ObjectPathVTable
def add_filter(self, function, user_data, free_data=None):
    """Add a filter callback that gets to look at all incoming messages
    before they get to the dispatch system.

    The same function can be added multiple times as long as the
    user_data is different.

    *function* is called as ``function(self, message, user_data)`` with
    the incoming message wrapped in a ``Message``. If it returns a
    coroutine, that coroutine is scheduled on ``self.loop`` and
    ``DBUS.HANDLER_RESULT_HANDLED`` is returned in its place; any other
    result is returned unchanged.

    *free_data*, if given, is wrapped in a ``DBUS.FreeFunction`` and
    called as ``free_data(user_data)``.

    Raises ``CallFailed`` if ``dbus_connection_add_filter`` reports
    failure.
    """

    def wrap_function(c_conn, message, _data):
        result = function(self, Message(dbus.dbus_message_ref(message)), user_data)
        if isinstance(result, types.CoroutineType):
            # A coroutine result can only run if there is a loop to own it.
            assert self.loop is not None, "no event loop to attach coroutine to"
            self.loop.create_task(result)
            result = DBUS.HANDLER_RESULT_HANDLED
        return result

    def wrap_free_data(_data):
        # Hand the caller's own user_data to its cleanup callback, just as
        # wrap_function does for the filter itself. (This previously
        # referenced a name, ``data``, that is not defined in this function.)
        free_data(user_data)

    filter_key = (function, data_key(user_data))
    filter_value = {
        "function": DBUS.HandleMessageFunction(wrap_function),
        "free_data": (
            DBUS.FreeFunction(wrap_free_data) if free_data is not None else None
        ),
    }
    # pass user_data id because libdbus identifies filter entry by both
    # function address and user data address
    added = dbus.dbus_connection_add_filter(
        self._dbobj,
        filter_value["function"],
        filter_key[1],
        filter_value["free_data"],
    )
    if not added:
        raise CallFailed("dbus_connection_add_filter")
    # need to ensure wrapped functions don't disappear prematurely
    self._filters[filter_key] = filter_value
#end add_filter
def _bus_name_acquired(conn, msg, self):
    """Record a newly acquired bus name and notify the registered action.

    The bus name is the single string argument carried by *msg*. It is
    dropped from ``self.bus_names_pending``; if it was not already in
    ``self.bus_names_acquired`` it is added there and, when
    ``self._bus_name_acquired_action`` is set, that action is called as
    ``action(self, bus_name, self._bus_name_acquired_action_arg)``. A
    coroutine returned by the action is scheduled on ``self.loop``.

    *conn* is not used. NOTE(review): ``self`` arrives as the third
    argument, presumably because this is registered as a callback with
    the owning object as its user data -- confirm against the caller.
    """
    bus_name = msg.expect_objects("s")[0]
    self.bus_names_pending.discard(bus_name)
    if bus_name in self.bus_names_acquired:
        return  # already known; nothing new to report
    self.bus_names_acquired.add(bus_name)

    action = self._bus_name_acquired_action
    if action is None:
        return
    result = action(self, bus_name, self._bus_name_acquired_action_arg)
    if isinstance(result, types.CoroutineType):
        # A coroutine result can only run if there is a loop to own it.
        assert self.loop is not None, "no event loop to attach coroutine to"
        self.loop.create_task(result)
#end _bus_name_acquired
def _bus_name_lost(conn, msg, self):
    """Record a lost bus name and notify the registered action.

    The bus name is the single string argument carried by *msg*. It is
    dropped from ``self.bus_names_pending``; if it was in
    ``self.bus_names_acquired`` it is removed from there and, when
    ``self._bus_name_lost_action`` is set, that action is called as
    ``action(self, bus_name, self._bus_name_lost_action_arg)``. A
    coroutine returned by the action is scheduled on ``self.loop``.

    *conn* is not used. NOTE(review): ``self`` arrives as the third
    argument, presumably because this is registered as a callback with
    the owning object as its user data -- confirm against the caller.
    """
    bus_name = msg.expect_objects("s")[0]
    self.bus_names_pending.discard(bus_name)
    if bus_name not in self.bus_names_acquired:
        return  # never held it; nothing to report
    self.bus_names_acquired.remove(bus_name)

    action = self._bus_name_lost_action
    if action is None:
        return
    result = action(self, bus_name, self._bus_name_lost_action_arg)
    if isinstance(result, types.CoroutineType):
        # A coroutine result can only run if there is a loop to own it.
        assert self.loop is not None, "no event loop to attach coroutine to"
        self.loop.create_task(result)
#end _bus_name_lost
def maybe_async(value):
    """Return *value* in a form that can be awaited or yielded from.

    Coroutine objects, generator objects and ``asyncio.Future`` instances
    are handed back untouched. Anything else is wrapped in a new
    ``asyncio.Future`` whose result has already been set to *value*.
    """
    passthrough_types = (types.CoroutineType, types.GeneratorType, asyncio.Future)
    if isinstance(value, passthrough_types):
        return value

    # Plain value: present it as an already-completed future.
    resolved = asyncio.Future()
    resolved.set_result(value)
    return resolved