我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 inspect.signature()。
def __init__(self, test):
    """Store ``test`` as a guard predicate that always answers with a bool.

    Args:
        test: Callable taking exactly one positional (non-varying) argument,
            as reported by ``_getparams``.

    Raises:
        ValueError: If ``test`` is not callable, or if it does not declare
            exactly one positional-only / positional-or-keyword parameter.
    """
    if not callable(test):
        raise ValueError("Guard test has to be callable")

    # Validate the test's signature: count the plain positional parameters.
    params = _getparams(test)
    positional_kinds = (p_kind.POSITIONAL_ONLY, p_kind.POSITIONAL_OR_KEYWORD)
    positional = [param for param in params if param[1] in positional_kinds]
    if len(positional) != 1:
        raise ValueError("Guard test has to have only one positional (not varying) argument")

    def call_as_bool(function, arg):
        try:
            # The test must always yield a boolean, so convert right away.
            return bool(function(arg))
        except TypeError:
            # Raised e.g. when unorderable types are compared; the guard
            # answers "no" instead of propagating the error.
            return False

    self.test = lambda inp: call_as_bool(test, inp)
def __init__(self, test):
    """Store ``test`` as a relational guard and record its argument names.

    Args:
        test: Callable whose parameters (as reported by ``_getparams``) are
            all named and non-varying.

    Raises:
        ValueError: If ``test`` is not callable, or if it declares a
            positional-only, ``*args`` or ``**kwargs`` parameter.
    """
    if not callable(test):
        raise ValueError("Relguard test has to be callable")

    # Simplified signature: each entry is indexed as (name, kind, ...).
    params = _getparams(test)

    # Varying arguments make no sense in relguards, so they are rejected.
    seen_kinds = {param[1] for param in params}
    forbidden_kinds = {
        p_kind.POSITIONAL_ONLY,
        p_kind.VAR_POSITIONAL,
        p_kind.VAR_KEYWORD,
    }
    if not seen_kinds.isdisjoint(forbidden_kinds):
        raise ValueError("Relguard test must take only named not varying arguments")

    self.test = test
    self.__argnames__ = {param[0] for param in params}
def getargspec(func):
    """Return ``(args, varargs, keywords, defaults)`` for ``func``.

    ``args`` lists every parameter that is neither ``*args`` nor ``**kwargs``
    (keyword-only parameters included); ``defaults`` is a tuple of all
    declared default values, or ``None`` when there are none.
    """
    args, defaults = [], []
    varargs = keywords = None
    for name, param in signature(func).parameters.items():
        if param.kind == param.VAR_POSITIONAL:
            varargs = name
            continue
        if param.kind == param.VAR_KEYWORD:
            keywords = name
            continue
        args.append(name)
        if param.default is not param.empty:
            defaults.append(param.default)
    return (args, varargs, keywords, tuple(defaults) or None)
def get_cookie(self, key, default=None, secret=None, digestmod=hashlib.sha256):
    """Return the content of the cookie named ``key``.

    Without ``secret`` the raw cookie value is returned (or ``default`` when
    it is missing or empty).  With ``secret`` the cookie is treated as a
    signed cookie of the form ``!<signature>?<payload>``: the HMAC of the
    payload must match the signature and the decoded payload must carry
    ``key``; in every other case ``default`` is returned.
    """
    value = self.cookies.get(key)
    if not secret:
        return value or default

    # Signed cookies start with '!' and separate signature and payload by '?'.
    if not (value and value.startswith('!') and '?' in value):
        return default

    sig, msg = map(tob, value[1:].split('?', 1))
    hash = hmac.new(tob(secret), msg, digestmod=digestmod).digest()
    if not _lscmp(sig, base64.b64encode(hash)):
        return default

    # NOTE(security): the payload is unpickled only after its signature has
    # been verified against ``secret``.
    dst = pickle.loads(base64.b64decode(msg))
    if dst and dst[0] == key:
        return dst[1]
    return default
def yieldroutes(func):
    """Yield the route paths that match the name and arguments of ``func``.

    Double underscores in the function name become path separators.  Required
    arguments always appear in the path; each argument with a default adds
    one more, longer route.  Examples::

        a()         -> '/a'
        b(x, y)     -> '/b/<x>/<y>'
        c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
        d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    path = '/' + func.__name__.replace('__', '/').lstrip('/')
    spec = getargspec(func)
    arg_names = spec[0]
    required = len(arg_names) - len(spec[3] or [])

    # The shortest route contains every required argument.
    path += ''.join('/<%s>' % name for name in arg_names[:required])
    yield path

    # Each optional argument extends the previous route by one segment.
    for name in arg_names[required:]:
        path += '/<%s>' % name
        yield path
def __init__(self, strat):
    """Create the generator for ``strat``, passing only keywords it accepts.

    Every keyword in ``strat._kws`` that ``strat.generate`` names explicitly
    is forwarded.  On reaching a ``**kwargs`` parameter, all of
    ``strat._kws`` is merged in and the scan stops.
    """
    self.log = logging.getLogger('strategy.StrategyIterator({})'.format(str(strat)))
    self.strategy = strat
    params = inspect.signature(strat.generate).parameters
    self.log.debug('init')

    kws = {}
    for name, param in params.items():
        if name in strat._kws:
            self.log.debug('add keyword {kw}'.format(kw=name))
            kws[name] = strat._kws[name]
            continue
        if param.kind == inspect.Parameter.VAR_KEYWORD:
            self.log.debug('merge keywords on VAR_KEYWORD {kw}'.format(kw=name))
            kws.update(strat._kws)
            break

    self._generator = strat.generate(strat._depth, *strat._args, **kws)
def test_parameters_passed_custom_kwargs(self):
    """Check that ``lp.figure`` forwards custom keyword arguments.

    ``save_figure`` is patched out and must be called exactly once with the
    keyword arguments given to the context manager plus
    ``from_context_manager=True``.
    """
    with patch('matplotlib.figure.Figure.set_size_inches'), \
            patch('latexipy._latexipy.save_figure') as mock_save_figure:
        with lp.figure('filename', directory='directory', exts='exts',
                       mkdir='mkdir'):
            pass

        mock_save_figure.assert_called_once_with(
            filename='filename',
            directory='directory',
            exts='exts',
            mkdir='mkdir',
            from_context_manager=True,
        )
def get_connector_param(self, avail_name):
    """Look up the parameters used for initializing a connector.

    Args:
        avail_name: Key of the connector in ``self._avail_connectors``.

    Returns:
        dict mapping each parameter name of the connector callable to its
        default value, or to ``None`` when the parameter has no default.

    Raises:
        ValueError: If no connector is registered under ``avail_name``.
    """
    try:
        connector = self._avail_connectors[avail_name]
    except KeyError:
        err = "connector {} not found\n".format(avail_name)
        raise ValueError(err)

    meta = {}
    for name, param in inspect.signature(connector).parameters.items():
        # The default lives on the individual Parameter, not on the mapping;
        # ``empty`` is a sentinel, so it is compared by identity.
        if param.default is inspect.Parameter.empty:
            meta[name] = None
        else:
            meta[name] = param.default
    return meta
def auto_assign(func):
    """Decorate a method so its arguments are stored as instance attributes.

    Before ``func`` runs, every parameter after the first (``self``) is set
    on the instance under its own name, taking the value from the positional
    arguments, then the keyword arguments, then the declared default.

    Raises:
        RuntimeError: At decoration time, if ``func`` declares ``*args`` or
            ``**kwargs``.

    Returns:
        The wrapping function; it returns whatever ``func`` returns.
    """
    sig = signature(func)
    for param in sig.parameters.values():
        if param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
            raise RuntimeError('Unable to auto assign if *args or **kwargs in signature.')

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        for i, (name, param) in enumerate(sig.parameters.items()):
            # Skip the 'self' parameter.
            if i == 0:
                continue
            # Look for the value in args, then kwargs, then the default.
            if i - 1 < len(args):
                val = args[i - 1]
            elif name in kwargs:
                val = kwargs[name]
            else:
                val = param.default
            setattr(self, name, val)
        # Propagate the wrapped function's result instead of dropping it.
        return func(self, *args, **kwargs)

    return wrapper
def verify_interface(iface, klass):
    """Check that ``klass`` provides every abstract method of ``iface``.

    Each name in ``iface.__abstractmethods__`` must exist on ``klass`` and,
    unless it is an ``abc.abstractproperty``, have an identical signature.

    Raises:
        InterfaceNotImplemented: If a method is missing or its signature
            differs from the one declared on ``iface``.
    """
    for method in iface.__abstractmethods__:
        if not hasattr(klass, method):
            raise InterfaceNotImplemented(
                "{0} is missing a {1!r} method".format(klass, method)
            )

        declared = getattr(iface, method)
        if isinstance(declared, abc.abstractproperty):
            # Properties cannot be verified properly yet.
            continue

        sig = signature(declared)
        actual = signature(getattr(klass, method))
        if sig == actual:
            continue
        raise InterfaceNotImplemented(
            "{0}.{1}'s signature differs from the expected. Expected: "
            "{2!r}. Received: {3!r}".format(
                klass, method, sig, actual
            )
        )
def getargspec(func):
    """Return ``(args, varargs, varkw, defaults)`` for ``func``.

    On Python 2 this defers to ``inspect.getargspec``.  Otherwise the tuple
    is built from ``inspect.signature``: ``args`` and ``defaults`` consider
    positional-or-keyword parameters only, and ``defaults`` is a list, or
    ``None`` when no such parameter has a default.
    """
    if six.PY2:
        return inspect.getargspec(func)

    kinds = inspect.Parameter
    args, defaults = [], []
    varargs = varkw = None
    # One pass over the parameters, sorting each into its slot.
    for p in inspect.signature(func).parameters.values():
        if p.kind == kinds.POSITIONAL_OR_KEYWORD:
            args.append(p.name)
            if p.default is not p.empty:
                defaults.append(p.default)
        elif p.kind == kinds.VAR_POSITIONAL and varargs is None:
            varargs = p.name
        elif p.kind == kinds.VAR_KEYWORD and varkw is None:
            varkw = p.name
    return args, varargs, varkw, defaults or None
def func_accepts_kwargs(func):
    """Return whether ``func`` accepts arbitrary keyword arguments.

    On Python 3 this is true when the signature has a ``**kwargs`` parameter.
    On Python 2 callables that cannot be inspected are assumed to accept
    them, so that valid but unusual callables are not rejected.
    """
    if not six.PY2:
        return any(
            p.kind == p.VAR_KEYWORD
            for p in inspect.signature(func).parameters.values()
        )

    # Not all callables are inspectable with getargspec: try the callable
    # itself, then its __call__, and finally give up and assume "yes".
    try:
        argspec = inspect.getargspec(func)
    except TypeError:
        try:
            argspec = inspect.getargspec(func.__call__)
        except (TypeError, AttributeError):
            argspec = None
    return not argspec or argspec[2] is not None
def _call_matcher(self, _call):
    """Return a comparison key for ``_call`` suitable for matching calls.

    ``_call`` is either ``(args, kwargs)`` or ``(name, args, kwargs)``.
    Without a spec signature the call itself is the key.  Otherwise the key
    is ``(name, bound_arguments)``; if binding fails, the ``TypeError``
    instance (with its traceback cleared) is returned instead.
    """
    sig = self._spec_signature
    if sig is None:
        return _call

    if len(_call) == 2:
        name = ''
        args, kwargs = _call
    else:
        name, args, kwargs = _call

    try:
        bound = sig.bind(*args, **kwargs)
    except TypeError as e:
        e.__traceback__ = None
        return e
    return name, bound
def get_kwarg_names(func):
    """Return the names of the parameters of ``func`` that have a default."""
    try:
        # Interpreters that provide inspect.signature.
        sig = inspect.signature(func)
    except AttributeError:
        # Older interpreters: the trailing arguments are the defaulted ones.
        args, _, _, defaults = inspect.getargspec(func)
        return args[-len(defaults):] if defaults else []
    return [
        param.name
        for param in sig.parameters.values()
        if param.default is not param.empty
    ]
def _load_callback(callback_name):
    """Load a callback function by name and check its form.

    A valid callback has exactly one parameter, named "entity".

    :param callback_name: Name of the callback function to load.
    :return: The callback function.
    :raises IllegalArgumentError: If the callback has the wrong parameters.
    :raises IllegalConfigurationError: If the lookup raises AttributeError.
    """
    try:
        callback_function = getattr(callbacks, callback_name)
        callback_parameters = signature(callback_function).parameters
        has_valid_form = (
            len(callback_parameters) == 1 and "entity" in callback_parameters
        )
        if not has_valid_form:
            raise IllegalArgumentError("Invalid callback: " + str(callback_name))
        return callback_function
    except AttributeError:
        raise IllegalConfigurationError(
            "Parsing configuration file failed: Callback " + callback_name + " not found."
        )
def add_command(self, command, permission):
    """Return a decorator that registers a method as handler for ``command``.

    The registry entry stored under ``self[command]`` is the tuple
    ``(method, parameter_count, permission)``; the parameter count is used
    to verify that a client/say command is valid.

    Raises:
        CommandException: If ``command`` is already registered.
    """
    if command in self:
        raise CommandException('Cannot re-assign command ({command_name})'.format(command_name=command))

    def decorator(method):
        self[command] = (
            method,
            len(signature(method).parameters),
            permission
        )

        def new(*args):
            # Hand the handler's result back to the caller instead of
            # silently discarding it.
            return method(*args)

        return new

    return decorator
def register_handler(
        self,  # type: BaseSocket
        method  # type: Callable[..., Union[bool, None]]
):  # type: (...) -> None
    """Register a handler for incoming messages.

    Args:
        method: A function of the form ``handler(msg, handler)``, where msg
                    is a :py:class:`py2p.base.Message` object and handler is
                    a :py:class:`py2p.base.BaseConnection` object.  It
                    should return ``True`` if it performed an action, to
                    reduce the number of handlers checked.

    Raises:
        ValueError: If the method does not take exactly two parameters
                        (three when one of them is named ``self``)
    """
    params = inspect.signature(method).parameters
    expected_count = 3 if 'self' in params else 2
    if len(params) != expected_count:
        raise ValueError(
            "This method must contain exactly two arguments "
            "(or three if first is self)")
    self.__handlers.append(method)
def register_handler(
        self,  # type: BaseSocket
        method  # type: Callable[..., Union[bool, None]]
):  # type: (...) -> None
    """Register a handler for incoming method.

    Args:
        method: A function with two given arguments. Its signature
                    should be of the form ``handler(msg, handler)``, where
                    msg is a :py:class:`py2p.base.Message` object, and
                    handler is a :py:class:`py2p.base.BaseConnection`
                    object. It should return ``True`` if it performed an
                    action, to reduce the number of handlers checked.

    Raises:
        ValueError: If the method signature doesn't parse correctly
    """
    # ``inspect.getargspec`` is gone from newer interpreters; prefer
    # ``getfullargspec`` when available.  Both return a tuple that starts
    # with (args, varargs, varkw, defaults).
    describe = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    spec = describe(method)
    names = spec[0]
    has_extras = (
        tuple(spec[1:4]) != (None, None, None)
        or bool(getattr(spec, 'kwonlyargs', None))
    )
    # Guard against an empty argument list before peeking at names[0], so a
    # zero-argument handler yields the documented ValueError.
    expected_count = 3 if names and names[0] == 'self' else 2
    if has_extras or len(names) != expected_count:
        raise ValueError(
            "This method must contain exactly two arguments "
            "(or three if first is self)")
    self.__handlers.append(method)
def _register(command, func, kwargs=None):
    """Register func as a handler for the given command.

    Args:
        command: Command text, compiled into a ``Pattern``.
        func: Handler whose parameter names must equal the pattern's
            argument names plus the keys of ``kwargs``.
        kwargs: Optional extra keyword arguments stored with the handler.
            Defaults to a fresh empty dict per registration.

    Raises:
        InvalidCommand: If the parameter names of ``func`` do not match.
    """
    # A shared ``{}`` default would be stored in ``commands`` and aliased by
    # every registration that omits ``kwargs``; build a new dict each time.
    if kwargs is None:
        kwargs = {}

    pattern = Pattern(command)
    sig = inspect.signature(func)
    func_argnames = set(sig.parameters)
    when_argnames = set(pattern.argnames) | set(kwargs.keys())
    if func_argnames != when_argnames:
        raise InvalidCommand(
            'The function %s%s has the wrong signature for @when(%r)' % (
                func.__name__, sig, command
            ) + '\n\nThe function arguments should be (%s)' % (
                ', '.join(pattern.argnames + list(kwargs.keys()))
            )
        )

    commands.append((pattern, func, kwargs))
def get_func_standard_and_necessary_keys(func):
    """Return the parameter names of ``func`` and the subset without defaults.

    :param func: The callable to inspect.
    :return: ``(standard_keys, necessary_keys)`` -- the set of all parameter
        names, and the set of names of parameters that declare no default.
    """
    parameters = inspect.signature(func).parameters
    # Decide on the Parameter's own ``default`` rather than by searching its
    # string form for "=", which misfires on annotations that contain "=".
    necessary_keys = {
        name
        for name, param in parameters.items()
        if param.default is inspect.Parameter.empty
    }
    return set(parameters), necessary_keys
def __init__(self, uri, uri_regex, handler):
    """Describe ``handler`` as the endpoint served at ``uri``.

    Records the handler's signature and its parameters (all but the first),
    copies the optional ``consumes`` / ``produces`` / ``errors`` /
    ``deprecated`` attributes from the handler, then extracts the arguments
    and generates the operation definition.
    """
    super(HandlerDef, self).__init__()
    self.uri = uri
    self.uri_regex = uri_regex
    self.handler = handler

    self._signature = inspect.signature(handler)
    # The first parameter is skipped -- presumably the request object;
    # confirm against the handler convention.
    self._params = dict(list(self._signature.parameters.items())[1:])
    self.path_args = []
    self.query_args = []

    # Optional metadata attached to the handler, with fallbacks.
    handler_metadata = (
        ('consumes', None),
        ('produces', None),
        ('errors', []),
        ('deprecated', False),
    )
    for attribute, fallback in handler_metadata:
        setattr(self, attribute, getattr(handler, attribute, fallback))

    self._extract_arguments()
    self.operation_definition = self._generate_operation_definition()
def auto_assign(func):
    """Decorate a method so its arguments become instance attributes.

    The call is bound against the signature of ``func``; every parameter
    other than ``self`` is set on the instance (``args[0]``) from the
    supplied value or, when not supplied, from its declared default.  The
    wrapper exposes the original signature via ``__signature__``.
    """
    signature = inspect.signature(func)

    def wrapper(*args, **kwargs):
        instance = args[0]
        supplied = signature.bind(*args, **kwargs).arguments
        for param in signature.parameters.values():
            name = param.name
            if name == 'self':
                continue
            if name in supplied:
                setattr(instance, name, supplied[name])
            elif param.default is not param.empty:
                setattr(instance, name, param.default)
        return func(*args, **kwargs)

    wrapper.__signature__ = signature  # Restore the signature
    return wrapper
def get_signature(obj, method_name):
    """Return the signature of ``obj.<method_name>`` without ``self``.

    Returns ``None`` when the signature cannot be determined.
    """
    method = getattr(obj, method_name)

    # Eat self for unbound methods because signature doesn't do it.
    if PY3:
        needs_self_bound = (
            inspect.isclass(obj)
            and not inspect.ismethod(method)
            and not isinstance(obj.__dict__.get(method_name), staticmethod)
        )
    else:
        needs_self_bound = (
            isinstance(method, types.UnboundMethodType)
            and method.__self__ is None
        )
    if needs_self_bound:
        method = functools.partial(method, None)

    try:
        return signature(method)
    except Exception:
        # Best effort: an uninspectable callable yields None.  Catching
        # Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return None
async def run(self, arguments, settings, app):
    """Load the script given in ``arguments.script`` and execute its ``run``.

    The script is imported from its absolute path.  If its ``run`` function
    declares a ``container`` parameter, it is awaited once per container
    yielded by ``get_containers`` and the transaction is committed after
    each call; otherwise it is invoked through ``lazy_apply`` with ``app``.

    Declared ``async def`` because the body awaits and uses ``async for``.
    """
    aiotask_context.set('request', self.request)
    script = os.path.abspath(arguments.script)
    spec = importlib.util.spec_from_file_location("module.name", script)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    if not hasattr(module, 'run'):
        # ``warning`` replaces the deprecated ``warn`` alias.
        logger.warning(f'Not `async def run()` function found in file {script}')
        return

    sig = inspect.signature(module.run)
    if 'container' in sig.parameters:
        async for txn, tm, container in get_containers(self.request):
            await module.run(container)
            await tm.commit(txn=txn)
    else:
        await lazy_apply(module.run, app)
def test_gen_mutation(mock_person):
    """Check the mutation class that ``gen_mutation`` builds for a model."""
    import inspect
    import graphene
    from graphene.utils.str_converters import to_snake_case
    from graphene_mongo.mutation import gen_mutation
    from graphene_mongo.model import ModelSchema

    model_schema = ModelSchema(mock_person, mock_person._fields, None, None)
    mutation = gen_mutation(
        mock_person,
        model_schema.schema,
        model_schema.operators_mutation,
        model_schema.fields_mutation,
        None,
        None,
    )

    assert issubclass(mutation, graphene.Mutation)
    assert hasattr(mutation, 'mutate')
    assert mutation._meta.name == 'Create' + mock_person.__name__

    model_field = to_snake_case(mock_person.__name__)
    assert mutation._meta.local_fields[model_field]
    assert mutation._meta.fields[model_field]

    # The default of ``Field``'s ``args`` parameter carries the operators.
    field_parameters = inspect.signature(mutation.__dict__['Field']).parameters
    operators_mutation = field_parameters['args'].default
    assert operators_mutation == model_schema.operators_mutation
def __init__(self, clsname, bases, clsdict):
    """Warn when a public callable is redefined with a different signature.

    Each callable in ``clsdict`` whose name does not start with an
    underscore is compared with the definition found through ``super``; a
    mismatch is reported with ``logging.warning``.
    """
    super().__init__(clsname, bases, clsdict)
    sup = super(self, self)
    for name, value in clsdict.items():
        if name.startswith('_') or not callable(value):
            continue

        # Fetch the previous definition (if any) and compare the signatures.
        prev_dfn = getattr(sup, name, None)
        if not prev_dfn:
            continue
        prev_sig, val_sig = signature(prev_dfn), signature(value)
        if prev_sig != val_sig:
            logging.warning('Signature mismatch in %s. %s != %s',
                            value.__qualname__, str(prev_sig), str(val_sig))

# Example
def register(self, meth):
    '''
    Register a new method as a multimethod.

    The key is the tuple of the parameter annotations (``self`` excluded).
    For every parameter with a default, the shorter key made of the types
    before it is registered as well.  Raises TypeError when a parameter is
    unannotated or its annotation is not a type.
    '''
    empty = inspect.Parameter.empty

    # Build a type-signature from the method's annotations.
    type_key = []
    for name, parm in inspect.signature(meth).parameters.items():
        if name == 'self':
            continue
        annotation = parm.annotation
        if annotation is empty:
            raise TypeError(
                'Argument {} must be annotated with a type'.format(name)
            )
        if not isinstance(annotation, type):
            raise TypeError(
                'Argument {} annotation must be a type'.format(name)
            )
        if parm.default is not empty:
            # The argument may be omitted: register the shorter key too.
            self._methods[tuple(type_key)] = meth
        type_key.append(annotation)

    self._methods[tuple(type_key)] = meth
def typeassert(*ty_args, **ty_kwargs):
    """Return a decorator enforcing the given types on a function's arguments.

    Types are matched to parameters as if they were call arguments.  In
    optimized mode (``__debug__`` false) the function is returned unchanged.
    The wrapper raises TypeError for a supplied argument of the wrong type.
    """
    def decorate(func):
        # In optimized mode, type checking is disabled.
        if not __debug__:
            return func

        # Map parameter names to the supplied types.
        sig = signature(func)
        bound_types = sig.bind_partial(*ty_args, **ty_kwargs).arguments

        @wraps(func)
        def wrapper(*args, **kwargs):
            supplied = sig.bind(*args, **kwargs).arguments
            # Enforce the assertions across the supplied arguments only.
            for name, value in supplied.items():
                if name in bound_types and not isinstance(value, bound_types[name]):
                    raise TypeError(
                        'Argument {} must be {}'.format(name, bound_types[name])
                    )
            return func(*args, **kwargs)

        return wrapper

    return decorate

# Examples
def pdef(self, obj, oname=''):
    """Print the call signature for any callable object.

    For a class the output is prefixed with a constructor-information
    header.  A message is printed when ``obj`` is not callable, and
    ``self.noinfo`` is invoked when no definition is available."""
    if not callable(obj):
        print('Object is not callable.')
        return

    if inspect.isclass(obj):
        header = self.__head('Class constructor information:\n')
    else:
        header = ''

    output = self._getdef(obj, oname)
    if output is None:
        self.noinfo('definition header', oname)
        return
    print(header, self.format(output), end=' ')

# In Python 3, all classes are new-style, so they all have __init__.
def _default_arguments_from_docstring(self, doc):
    """Parse the first line of docstring for call signature.

    Docstring should be of the form 'min(iterable[, key=func])\n'.
    It can also parse cython docstring of the form
    'Minuit.migrad(self, int ncall=10000, resume=True, int nsplit=1)'.

    Returns the list of keyword names found, or an empty list when ``doc``
    is None, empty, whitespace-only, or its first line has no signature.
    """
    if doc is None:
        return []

    # Only the first line matters; an empty or whitespace-only docstring
    # has no lines at all.
    lines = doc.lstrip().splitlines()
    if not lines:
        return []

    # 'min(iterable[, key=func])\n' -> 'iterable[, key=func]'
    sig = self.docstring_sig_re.search(lines[0])
    if sig is None:
        return []

    # 'iterable[, key=func]' -> ['iterable[', ' key=func]']
    ret = []
    for s in sig.groups()[0].split(','):
        ret += self.docstring_kwd_re.findall(s)
    return ret
def ensure_interface(function):
    """Decorate ``function`` so ABC-annotated arguments are type checked.

    For every supplied argument whose parameter annotation is an abstract
    base class (an ``ABCMeta`` instance), the value must be an instance of
    that class; other annotations are ignored.

    Raises:
        TypeError: From the wrapper, when a value does not implement the
            annotated interface.

    Returns:
        The wrapping function; it returns whatever ``function`` returns.
    """
    signature = inspect.signature(function)
    parameters = signature.parameters

    @wraps(function)
    def wrapped(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            annotation = parameters[name].annotation
            if not isinstance(annotation, ABCMeta):
                continue
            if not isinstance(value, annotation):
                raise TypeError(
                    "{} does not implement {} interface"
                    "".format(value, annotation)
                )
        # Return the result; previously it was silently dropped.
        return function(*args, **kwargs)

    return wrapped
def _get_arg_lengths(func):
    """Return ``(positional_count, has_varargs)`` for ``func``.

    The first item is the number of positional (positional-only or
    positional-or-keyword) parameters; the second is 1 when ``func`` takes
    ``*args`` and 0 otherwise.
    """
    try:
        parameters = inspect.signature(func).parameters.values()
        kinds = inspect.Parameter
        positional_kinds = (kinds.POSITIONAL_OR_KEYWORD, kinds.POSITIONAL_ONLY)
        args = [p for p in parameters if p.kind in positional_kinds]
        star_args = [p for p in parameters if p.kind == kinds.VAR_POSITIONAL]
        vararg = star_args[-1] if star_args else None
    except AttributeError:
        try:
            try:  # For Python 3.2 and earlier.
                args, vararg = inspect.getfullargspec(func)[:2]
            except AttributeError:  # For Python 2.7 and earlier.
                args, vararg = inspect.getargspec(func)[:2]
        except TypeError:
            # 3.2 and earlier raise TypeError here while 3.3 and later
            # raise ValueError, so normalise to ValueError.
            raise ValueError
    return (len(args), (1 if vararg else 0))
def test_extended_headers_smoke(header_name):
    """Smoke-test that large field values produce an extended header.

    The header factory named ``header_name`` is called once with every
    parameter set to 0 and once with every parameter set to ``2 ** 32``.
    """
    header = getattr(ca, header_name)
    sig = inspect.signature(header)

    # Same parameter names, one small value and one very large value.
    regular_bind_args = dict.fromkeys(sig.parameters.keys(), 0)
    extended_bind_args = dict.fromkeys(sig.parameters.keys(), 2 ** 32)

    reg_args = sig.bind(**regular_bind_args)
    reg_hdr = header(*reg_args.args, **reg_args.kwargs)

    ext_args = sig.bind(**extended_bind_args)
    ext_hdr = header(*ext_args.args, **ext_args.kwargs)

    print(reg_hdr)
    assert isinstance(reg_hdr, ca.MessageHeader)
    print(ext_hdr)
    assert isinstance(ext_hdr, ca.ExtendedMessageHeader)
def __init__(self, func):
    """Wrap ``func``, exposing its signature and metadata on this object."""
    func_signature = signature(func)
    self._func = func
    self.__signature__ = func_signature
    # Copies the wrapped function's metadata onto ``self``; this stays the
    # last step, as in the original ordering.
    update_wrapper(self, func)
def next(self, f):
    """Return a new ``Command`` built from this one's ``fdo``, ``fpre``,
    ``fpost`` and ``name``, with ``f`` passed as the fourth argument."""
    successor = Command(self.fdo, self.fpre, self.fpost, f, self.name)
    return successor

# these are helper functions to the type signature of the `fdo` function
def signature(self):
    """Return the ``inspect.Signature`` of ``self.fdo``."""
    return inspect.signature(self.fdo)
def return_annotation(self):
    """Return the return annotation of ``self.signature``, or ``None`` when
    the signature declares none."""
    annotation = self.signature.return_annotation
    if annotation is inspect._empty:
        return None
    return annotation
def parameters(self):
    """Return the ``parameters`` mapping of ``self.signature``."""
    return self.signature.parameters