我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 inspect.getfullargspec()。
def inject_function(f):
    """Wrap ``f`` so that its arguments are supplied by a ``Container``.

    For every declared argument of ``f`` except ``self``, the wrapper asks a
    freshly created ``Container`` for an object, keyed by the argument's
    annotation or, when the argument is not annotated, by its name. Each
    object is stored on ``self`` under the argument name and also passed to
    ``f`` as a keyword argument.

    :param f: The method whose arguments are to be injected.
    :return: A wrapper taking only ``self``.
    """
    spec = inspect.getfullargspec(f)
    injectable = [name for name in spec.args if name not in ('self',)]
    type_hints = spec.annotations

    def _inner(self):
        container = Container()
        resolved = {}
        for name in injectable:
            # Unannotated arguments are looked up under their own name.
            lookup_key = type_hints.get(name, name)
            instance = container.get_object(lookup_key)
            resolved[name] = instance
            setattr(self, name, instance)
        return f(self, **resolved)

    return _inner
def typecheck(func):
    """Decorator that enforces a function's annotations at call time.

    An annotation is treated as a type (checked with ``isinstance``) when it
    is a class, and otherwise as a predicate that is called with the value
    and must return something truthy. A ``return`` annotation, when present,
    is checked against the result.

    The wrapper accepts positional arguments only and requires exactly as
    many of them as ``func`` declares.

    :param func: The callable to wrap.
    :return: The checking wrapper, or ``func`` itself when it has no
        ``__annotations__`` attribute.
    :raises TypeError: Raised by the wrapper on a wrong argument count, or
        when an argument or the return value fails its annotation.
    """
    if not hasattr(func, '__annotations__'):
        # Nothing to check against: hand the callable back unchanged.
        # (The original returned the undefined name ``method`` here.)
        return func

    import inspect
    argspec = inspect.getfullargspec(func)

    def check(value, expected):
        # ``isinstance(expected, type)`` also recognises classes built with a
        # custom metaclass, which ``type(expected) == type`` would have
        # mistaken for predicates.
        if isinstance(expected, type):
            return isinstance(value, expected)  # types
        return expected(value)  # predicates

    def wrapper(*args):
        if len(argspec.args) != len(args):
            raise TypeError(
                "%s() takes exactly %s positional argument (%s given)"
                % (func.__name__, len(argspec.args), len(args)))
        for argname, value in zip(argspec.args, args):
            if argname in func.__annotations__:
                expected = func.__annotations__[argname]
                if not check(value, expected):
                    raise TypeError(
                        "%s( %s:%s ) but received %s=%s"
                        % (func.__name__, argname, expected,
                           argname, repr(value)))
        result = func(*args)
        if 'return' in func.__annotations__:
            expected = func.__annotations__['return']
            if not check(result, expected):
                raise TypeError(
                    "%s() -> %s but returned %s"
                    % (func.__name__, expected, repr(result)))
        return result

    return wrapper
def quick_init(self, locals_):
    """Record the constructor arguments of ``self`` from a locals mapping.

    Does nothing when ``self._serializable_initialized`` is already truthy.
    Otherwise the signature of ``self.__init__`` is inspected and the values
    of its parameters are read from ``locals_``: positional values (without
    the leading ``self``) followed by any ``*varargs`` are stored in
    ``self.__args``, and the ``**kwargs`` mapping (or an empty dict) in
    ``self.__kwargs``.

    :param locals_: Mapping from parameter name to value; it must contain
        every name declared by ``__init__``, including the first one.
    """
    if getattr(self, "_serializable_initialized", False):
        return

    if sys.version_info >= (3, 0):
        spec = inspect.getfullargspec(self.__init__)
        varkw_name = spec.varkw
    else:
        spec = inspect.getargspec(self.__init__)
        varkw_name = spec.keywords

    kwargs = locals_[varkw_name] if varkw_name else dict()
    varargs = locals_[spec.varargs] if spec.varargs else tuple()

    # Every declared argument is looked up first; only then is the leading
    # "self" entry dropped.
    positional = tuple(locals_[arg_name] for arg_name in spec.args)[1:]

    self.__args = positional + varargs
    self.__kwargs = kwargs
    self._serializable_initialized = True
def curry(func):
    """Return a curried version of ``func``.

    The returned callable gathers positional arguments over successive calls
    and invokes ``func`` as soon as at least as many arguments as ``func``
    declares have been collected.

    :raises TypeError: If ``func`` has ``*args``, ``**kwargs`` or
        keyword-only parameters.
    """
    spec = inspect.getfullargspec(func)
    if spec.varargs or spec.varkw or spec.kwonlyargs:
        raise TypeError('cannot curry a variadic function')
    arity = len(spec.args)

    def collect(gathered):
        def step(*more):
            combined = gathered + more
            if len(combined) >= arity:
                return func(*combined)
            # Still short of arguments: keep collecting.
            return collect(combined)
        return step

    return collect(())
def _get_close_args(self, data):
    """Build the extra arguments for ``self.on_close`` from a close body.

    Returns ``[]`` when ``self.on_close`` is unset or does not declare exactly
    three arguments. Otherwise returns ``[code, reason]``, where ``code`` is
    the big-endian integer formed by the first two bytes of ``data`` and
    ``reason`` is the remainder decoded as UTF-8, or ``[None, None]`` when
    ``data`` is empty or shorter than two bytes.
    """
    import inspect

    if sys.version_info < (3, 0):
        describe = inspect.getargspec
    else:
        describe = inspect.getfullargspec

    callback = self.on_close
    # if the on_close callback is "old", just return empty list
    if not callback or len(describe(callback).args) != 3:
        return []

    if not data or len(data) < 2:
        return [None, None]

    code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])
    reason = data[2:].decode('utf-8')
    return [code, reason]
def __init__(self, func, role='func', doc=None, config={}):
    """Parse the docstring of ``func`` and make sure a signature is recorded.

    :param func: The documented callable; may be ``None`` when ``doc`` is
        given.
    :param role: Stored as ``self._role`` (e.g. "func" or "meth").
    :param doc: Docstring text; when ``None`` it is taken from ``func`` via
        ``inspect.getdoc``.
    :param config: Not used by this method.
    :raises ValueError: If both ``func`` and ``doc`` are ``None``.
    """
    self._f = func
    self._role = role  # e.g. "func" or "meth"

    if doc is None:
        if func is None:
            raise ValueError("No function or docstring given")
        doc = inspect.getdoc(func) or ''
    NumpyDocString.__init__(self, doc)

    if not self['Signature'] and func is not None:
        func, func_name = self.get_func()
        try:
            # try to read signature
            if hasattr(inspect, 'formatargspec'):
                if sys.version_info[0] >= 3:
                    argspec = inspect.getfullargspec(func)
                else:
                    argspec = inspect.getargspec(func)
                argspec = inspect.formatargspec(*argspec)
            else:
                # inspect.formatargspec was removed in Python 3.11; the
                # string form of a Signature is the closest replacement.
                argspec = str(inspect.signature(func))
            # Escape asterisks; '\\*' is the same two characters the
            # original spelled with the invalid escape '\*'.
            argspec = argspec.replace('*', '\\*')
            signature = '%s%s' % (func_name, argspec)
        except (TypeError, ValueError):
            # No introspectable signature: fall back to a bare call form.
            signature = '%s()' % func_name
        self['Signature'] = signature
def clone(cls, obj, **kwargs):
    """Return a copy of ``obj`` with some constructor arguments replaced.

    The state from ``obj.__getstate__()`` is updated with ``kwargs``: a key
    naming a positional parameter of ``obj.__init__`` overwrites the matching
    slot of ``state["__args"]``, and any other key is written to
    ``state["__kwargs"]``. A new instance of ``type(obj)`` is created without
    calling ``__init__`` and restored from that state via ``__setstate__``.

    :param obj: The ``Serializable`` instance to copy.
    :param kwargs: Constructor arguments to override.
    :return: The new instance.
    """
    assert isinstance(obj, Serializable)
    state = obj.__getstate__()

    # Split the entries in kwargs between positional and keyword arguments
    # and update state['__args'] and state['__kwargs'], respectively.
    if sys.version_info >= (3, 0):
        describe = inspect.getfullargspec
    else:
        describe = inspect.getargspec
    positional_names = describe(obj.__init__).args[1:]  # without "self"

    state["__args"] = list(state["__args"])
    for key, replacement in kwargs.items():
        if key in positional_names:
            slot = positional_names.index(key)
            state["__args"][slot] = replacement
        else:
            state["__kwargs"][key] = replacement

    duplicate = type(obj).__new__(type(obj))
    duplicate.__setstate__(state)
    return duplicate
def call_converter(converter:callable, predecessors:dict) -> dict:
    """Call ``converter`` with arguments picked from ``predecessors``.

    ``predecessors`` maps a type to a value. Binding rules, in order:

    * one predecessor and exactly one parameter (or exactly one positional
      parameter): the value is bound to the first parameter;
    * otherwise, every parameter whose name is a key of ``predecessors`` gets
      that value;
    * if some predecessors are still unused, parameters whose annotation is a
      key of ``predecessors`` (and that are not bound yet) get that value.

    Returns whatever ``converter`` returns.
    """
    spec = inspect.getfullargspec(converter)
    names = tuple(spec.args + spec.kwonlyargs)

    # if number of (positional) args equals the number of predecessor
    if len(predecessors) == 1 and 1 in (len(names), len(spec.args)):
        only_value = next(iter(predecessors.values()))
        return converter(**{names[0]: only_value})

    # let's use args name
    call_kwargs = {name: predecessors[name]
                   for name in names if name in predecessors}

    if len(call_kwargs) < len(predecessors):
        # let's use annotations: map predecessor type -> parameter name,
        # skipping parameters that were already bound by name
        hints = spec.annotations
        by_hint = {}
        for name in names:
            if hints.get(name) in predecessors and name not in call_kwargs:
                by_hint[hints[name]] = name
        for hint, name in by_hint.items():
            call_kwargs[name] = predecessors[hint]

    return converter(**call_kwargs)
def __init__(self, func, role='func', doc=None, config={}):
    """Initialise from ``func`` and/or ``doc`` and fill in a missing signature.

    The docstring (``doc``, or the result of ``inspect.getdoc(func)`` when
    ``doc`` is ``None``) is parsed by ``NumpyDocString.__init__``. If parsing
    left ``self['Signature']`` empty and a function is available, a signature
    string is built by introspection.

    :param func: The documented callable, or ``None`` if ``doc`` is given.
    :param role: Stored as ``self._role`` (e.g. "func" or "meth").
    :param doc: Explicit docstring text, or ``None``.
    :param config: Not used by this method.
    :raises ValueError: If neither ``func`` nor ``doc`` is provided.
    """
    self._f = func
    self._role = role  # e.g. "func" or "meth"

    if doc is None:
        if func is None:
            raise ValueError("No function or docstring given")
        doc = inspect.getdoc(func) or ''
    NumpyDocString.__init__(self, doc)

    if not self['Signature'] and func is not None:
        func, func_name = self.get_func()
        try:
            # try to read signature
            if hasattr(inspect, 'formatargspec'):
                if sys.version_info[0] >= 3:
                    argspec = inspect.getfullargspec(func)
                else:
                    argspec = inspect.getargspec(func)
                argspec = inspect.formatargspec(*argspec)
            else:
                # formatargspec is gone from Python 3.11 onwards; use the
                # string form of the Signature object instead.
                argspec = str(inspect.signature(func))
            # '\\*' is a valid spelling of backslash + asterisk; the bare
            # '\*' used before is an invalid escape sequence.
            argspec = argspec.replace('*', '\\*')
            signature = '%s%s' % (func_name, argspec)
        except (TypeError, ValueError):
            # The callable cannot be introspected.
            signature = '%s()' % func_name
        self['Signature'] = signature
def add_event(self, loop, instance, fn):
    """
    Wraps the given function in a coroutine which calls it repeatedly and
    hands that coroutine to the parent class's ``add_event``.

    After each call the coroutine sleeps for ``self.time`` seconds minus the
    time the call itself took (never less than zero).

    :param loop: The event loop (cls._loop)
    :param instance: The instantiated class
    :param fn: The function to be called
    :return: None
    """
    # Verify function signature
    spec = inspect.getfullargspec(fn)
    positional = len(spec.args)
    if 'self' in spec.args:
        positional -= 1
    defaulted = len(spec.defaults) if spec.defaults else 0
    assert positional - defaulted == 0, 'Functions decorated with @Periodic cannot have any required parameters.'

    # NOTE(review): asyncio.coroutine was removed in Python 3.11; this block
    # needs an ``async def`` port before it can run there.
    @asyncio.coroutine
    def periodic_fn():
        # If coroutine yield from else call normally
        is_coroutine = asyncio.iscoroutinefunction(fn)
        while True:
            last_exec = time.time()
            if is_coroutine:
                yield from fn()
            else:
                fn()
            yield from asyncio.sleep(max(0, self.time + last_exec - time.time()))

    super().add_event(loop, instance, periodic_fn())
def __call__(self, fn, *args, **kwargs):
    """Register this decorator on ``fn`` and check its signature.

    ``self`` is appended to ``fn._decorators`` (the list is created when it
    is missing). ``fn`` must take at least one parameter besides ``self`` and
    at most one without a default. If the first of those parameters is not
    annotated, it is annotated with ``self.subs[-1]``.

    :param fn: The function being decorated.
    :return: ``fn`` itself (it is not wrapped).
    """
    try:
        fn._decorators.append(self)
    except AttributeError:
        fn._decorators = [self]

    # Verify function signature
    spec = inspect.getfullargspec(fn)
    # A parameter called "self" is not counted and the message parameter is
    # taken to follow it.
    offset = 1 if 'self' in spec.args else 0
    n_args = len(spec.args) - offset
    assert n_args >= 1, 'Functions decorated with @Subscribe must have a parameter for the message.'

    n_defaults = len(spec.defaults) if spec.defaults else 0
    assert n_args - n_defaults <= 1, 'Functions decorated with @Subscribe can only have one required parameter.'

    # Add typing information if not already defined
    message_param = spec.args[offset]
    if message_param not in spec.annotations:
        fn.__annotations__[message_param] = self.subs[-1]
    return fn
def _get_arg_lengths(func):
    """Returns a two-tuple containing the number of positional arguments
    as the first item and the number of variable positional arguments as
    the second item (``1`` when ``func`` takes ``*args``, else ``0``).

    :raises ValueError: In the legacy fallback, when the argspec functions
        reject ``func`` with a ``TypeError``.
    """
    try:
        parameters = inspect.signature(func).parameters.values()
        # Use the public Parameter kinds instead of inspect's private
        # module-level aliases (inspect._POSITIONAL_ONLY and friends).
        positional_kinds = (
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.POSITIONAL_ONLY,
        )
        n_args = sum(1 for p in parameters if p.kind in positional_kinds)
        has_vararg = any(
            p.kind == inspect.Parameter.VAR_POSITIONAL for p in parameters
        )
    except AttributeError:
        try:
            try:  # For Python 3.2 and earlier.
                args, vararg = inspect.getfullargspec(func)[:2]
            except AttributeError:  # For Python 2.7 and earlier.
                args, vararg = inspect.getargspec(func)[:2]
        except TypeError:  # In 3.2 and earlier, raises TypeError
            raise ValueError  # but 3.3 and later raise a ValueError.
        n_args, has_vararg = len(args), bool(vararg)
    return (n_args, (1 if has_vararg else 0))
def assertFullArgSpecEquals(self, routine, args_e, varargs_e=None,
                            varkw_e=None, defaults_e=None,
                            kwonlyargs_e=[], kwonlydefaults_e=None,
                            ann_e={}, formatted=None):
    """Assert that ``inspect.getfullargspec(routine)`` matches the ``*_e`` values.

    Each of the seven fields of the full arg spec is compared, in order, with
    its expected counterpart via ``self.assertEqual``. When ``formatted`` is
    given, the ``inspect.formatargspec`` rendering of the spec must equal it
    as well.

    The list/dict defaults are only read here, never mutated.
    """
    spec = inspect.getfullargspec(routine)
    expected = (args_e, varargs_e, varkw_e, defaults_e,
                kwonlyargs_e, kwonlydefaults_e, ann_e)
    for actual, wanted in zip(spec, expected):
        self.assertEqual(actual, wanted)

    if formatted is not None:
        self.assertEqual(inspect.formatargspec(*spec), formatted)
def __init__(
        self,
        fun: Callable[..., Any],
        name: str=None,
        nargs: Union[str, int]=None,
        min: int=None,
        max: int=None,
        **kw: str
) -> None:
    """Capture ``fun`` together with its argument spec and the given options.

    ``_param_count`` is the number of declared positional parameters of
    ``fun``, reduced by one when ``self._params.head.contains('self')`` is
    truthy. ``name``, ``nargs`` (passed through ``to_int``), ``min`` and
    ``max`` are each wrapped in ``Maybe``; the remaining keyword arguments
    are wrapped in ``Map``.
    """
    self._fun = fun
    spec = inspect.getfullargspec(fun)
    self._argspec = spec
    self._params = List.wrap(spec.args)

    declared = self._params.length
    # presumably true when the first parameter is "self" -- verify against
    # the List/Maybe API
    self_offset = 1 if self._params.head.contains('self') else 0
    self._param_count = declared - self_offset

    self._name = Maybe(name)
    self._nargs = Maybe(to_int(nargs))
    self._min = Maybe(min)
    self._max = Maybe(max)
    self._kw = Map(kw)
def check_session_aspect(func, *args, **kwargs):
    """The authentication aspect code, provides an aspect for the
    aop_check_session decorator.

    Looks up the positional index of the ``_session_id`` parameter of
    ``func``, passes the argument found at that index to ``check_session``
    and then calls ``func`` through ``alter_function_parameter_and_call``
    with its ``_user`` parameter set to the result.

    :param func: The decorated function.
    :param args: Positional arguments of the call; must include the session id.
    :param kwargs: Keyword arguments of the call.
    :return: The return value of ``func``.
    :raises Exception: If ``func`` has no ``_session_id`` parameter.
    """
    # Get a list of the names of the non-keyword arguments
    _argument_specifications = getfullargspec(func)
    try:
        # Try and find _session_id
        arg_idx = _argument_specifications.args.index("_session_id")
    except ValueError:
        # list.index raises only ValueError; a bare ``except`` would also
        # swallow KeyboardInterrupt/SystemExit.
        raise Exception("Authentication aspect for \"" + func.__name__ + "\": No _session_id parameter")

    # Check if the session is valid.
    _user = check_session(args[arg_idx])

    # Call the function and set the _user parameter, if existing.
    return alter_function_parameter_and_call(func, args, kwargs, '_user', _user)
def alter_function_parameter_and_call(function_object, args, kwargs, name, value, err_if_not_set=None):
    """Changes or sets the value of an argument, if present, and calls the function.

    When ``name`` is a positional parameter of ``function_object``, its value
    is replaced in ``args`` if it was passed positionally, and otherwise
    supplied through a copy of ``kwargs``. When the function has no such
    parameter it is called with ``args`` and ``kwargs`` unchanged, unless
    ``err_if_not_set`` is given.

    :param function_object: An instance of the called function
    :param args: A list of arguments
    :param kwargs: A dict of keyword arguments
    :param name: Name of the parameter that should be set
    :param value: Value to set that parameter to
    :param err_if_not_set: If is set, raise as error if the argument isn't present.
    :return: The return value of the called function
    :raises Exception: With ``err_if_not_set`` as message when the parameter
        is absent and ``err_if_not_set`` is not ``None``.
    """
    argspec = getfullargspec(function_object)

    if name in argspec.args:
        arg_idx = argspec.args.index(name)
        if arg_idx < len(args):
            new_args = list(args)
            new_args[arg_idx] = value
            return function_object(*new_args, **kwargs)
        # The parameter was not passed positionally (it is a keyword or was
        # left to its default): set it by name. Work on a copy so the
        # caller's dict is not modified.
        new_kwargs = dict(kwargs)
        new_kwargs[name] = value
        return function_object(*args, **new_kwargs)

    if err_if_not_set is not None:
        raise Exception(err_if_not_set)

    # Unpack the arguments; passing ``args`` and ``kwargs`` as two plain
    # positional values was a bug.
    return function_object(*args, **kwargs)
def __get_result(method, params, original_msg):
    """Call ``method`` with ``params`` and return its result.

    ``params`` is unpacked positionally when it is a list and as keyword
    arguments otherwise. When the ``keywords_args`` field of the method's
    full arg spec is set and contains the text "kwargs", the original message
    is passed along as the ``original_message`` keyword argument.
    """
    # presumably the name of the method's ** parameter -- depends on what
    # ``keywords_args`` is defined as elsewhere in the module
    varkw_name = getattr(getfullargspec(method), keywords_args)
    wants_message = bool(varkw_name) and "kwargs" in varkw_name
    positional = isinstance(params, list)

    if wants_message and positional:
        return method(*params, original_message=original_msg)
    if wants_message:
        return method(original_message=original_msg, **params)
    if positional:
        return method(*params)
    return method(**params)
def _get_close_args(self, data):
    """Extract the close code and reason that are passed to ``self.on_close``.

    An empty list is returned when no ``on_close`` callback is set or when
    the callback does not declare exactly three arguments. Otherwise the
    result is ``[code, reason]``: the first two bytes of ``data`` combined
    into one integer (high byte first) and the remaining bytes decoded as
    UTF-8. ``[None, None]`` is returned when ``data`` is empty or holds fewer
    than two bytes.
    """
    import inspect

    on_close = self.on_close
    if not on_close:
        return []

    # if the on_close callback is "old", just return empty list
    if sys.version_info < (3, 0):
        declared = inspect.getargspec(on_close).args
    else:
        declared = inspect.getfullargspec(on_close).args
    if len(declared) != 3:
        return []

    has_code = bool(data) and len(data) >= 2
    if not has_code:
        return [None, None]

    high = six.byte2int(data[0:1])
    low = six.byte2int(data[1:2])
    return [256 * high + low, data[2:].decode('utf-8')]
def getfullargspec(func):
    """Compatibility function to provide inspect.getfullargspec in Python 2

    Where ``inspect.getfullargspec`` exists its result is returned as is.
    Otherwise the result of ``inspect.getargspec`` is repackaged into a
    ``FullArgSpec`` named tuple with no keyword-only arguments and no
    annotations.

    This should be rewritten using a backport of Python 3 signature once we
    drop support for Python 2.6. We went for a simpler approach at the time
    of writing because signature uses OrderedDict which is not available in
    Python 2.6.
    """
    try:
        return inspect.getfullargspec(func)
    except AttributeError:
        legacy = inspect.getargspec(func)
        import collections
        FullArgSpec = collections.namedtuple(
            'FullArgSpec',
            ['args', 'varargs', 'varkw', 'defaults',
             'kwonlyargs', 'kwonlydefaults', 'annotations'],
        )
        return FullArgSpec(
            legacy.args,
            legacy.varargs,
            legacy.keywords,
            legacy.defaults,
            [],    # kwonlyargs
            None,  # kwonlydefaults
            {},    # annotations
        )
def signatures_match(cls, orig, stubbed, ignore_self=False):
    """Assert that ``stubbed`` has the same argument spec as ``orig``.

    :param orig: The original callable.
    :param stubbed: The callable standing in for it.
    :param ignore_self: When true, an argument named ``self`` is removed from
        both specs before they are compared.
    :raises AssertionError: If the two specs differ.
    :return: ``False`` when the assertion passes.
    """
    describe = inspect.getargspec if six.PY2 else inspect.getfullargspec
    orig_arguments = describe(orig)
    stub_arguments = describe(stubbed)

    if ignore_self:
        for spec in (orig_arguments, stub_arguments):
            if 'self' in spec.args:
                spec.args.remove('self')

    assert orig_arguments == stub_arguments, \
        'signature mismatch: %s%s does not match %s%s' % \
        (stubbed, inspect.formatargspec(*stub_arguments),
         orig, inspect.formatargspec(*orig_arguments))

    # NOTE(review): False is returned even though the signatures matched --
    # confirm against the callers that this is intended.
    return False

#------------------------------------------------[ Impostor ]
async def schedule_handler(cls: Any, obj: Any, context: Dict, func: Any, interval: Optional[Union[str, int]]=None, timestamp: Optional[str]=None, timezone: Optional[str]=None) -> Any:
    """Register ``func`` as a scheduled function and start the scheduler.

    A ``handler`` coroutine function is created that calls ``func`` with
    ``obj`` as its only positional argument and every declared default passed
    explicitly as a keyword, awaiting the result when it is awaitable. The
    tuple ``(interval, timestamp, timezone, func, handler)`` is appended to
    ``context['_schedule_scheduled_functions']``.

    This must be a coroutine function because the body awaits the scheduler
    start; as a plain ``def`` it is a SyntaxError.

    :return: The awaited result of ``cls.start_scheduler(cls, obj, context)``
        when that call returns something truthy, otherwise ``None``.
    """
    async def handler() -> None:
        values = inspect.getfullargspec(func)
        if values.defaults:
            # Defaults belong to the trailing positional parameters.
            defaulted_names = values.args[len(values.args) - len(values.defaults):]
            kwargs = dict(zip(defaulted_names, values.defaults))
        else:
            kwargs = {}
        routine = func(obj, **kwargs)
        try:
            if isinstance(routine, Awaitable):
                await routine
        except Exception:
            # Best-effort by design: an exception raised while awaiting the
            # scheduled routine is dropped here.
            pass

    context['_schedule_scheduled_functions'] = context.get('_schedule_scheduled_functions', [])
    context['_schedule_scheduled_functions'].append((interval, timestamp, timezone, func, handler))

    start_func = cls.start_scheduler(cls, obj, context)
    return (await start_func) if start_func else None
def command(arguments: List[ParserElement]):
    """
    Decorator factory that registers ``.<function_name> arguments...`` as
    part of the grammar.

    The n-th parser element in ``arguments`` is given the name of the n-th
    parameter of the decorated function as its results name. The resulting
    expression is OR-ed into the module-level ``grammar`` and the wrapper is
    stored in ``mapping`` under ``'.' + <function_name>``.
    """
    def decorator(fn):
        param_names = inspect.getfullargspec(fn).args
        _name = '.' + fn.__name__

        expression = Literal(_name)
        for position, element in enumerate(arguments):
            named = element.setResultsName(param_names[position])
            expression += named

        global grammar
        grammar |= expression

        def wrapper(parsed: Dict, event, event_user, event_channel, slack_client):
            # Parsed results are spread into keyword arguments next to the
            # event context.
            return fn(
                **parsed,
                event=event,
                event_user=event_user,
                event_channel=event_channel,
                slack_client=slack_client
            )

        mapping[_name] = wrapper
        return wrapper

    return decorator
def kwargs_as_needed(func):
    """
    Wrap ``func`` so that, unless it declares ``**kwargs`` itself, only the
    keyword arguments it can accept are passed on.

    The accepted names are the positional parameter names of the unwrapped
    function, minus the first one when ``func`` is a bound method.
    """
    spec = inspect.getfullargspec(inspect.unwrap(func))
    allowed = set(spec.args or ())
    if isinstance(func, MethodType):
        allowed.discard(spec.args[0])
    takes_any_keyword = spec.varkw is not None

    @wraps(func)
    def inner(*args, **kwargs):
        if takes_any_keyword:
            return func(*args, **kwargs)
        return func(*args, **intersected_dict(kwargs, allowed))

    return inner
def get_arg_names(f):
    """Return the names of the positional-or-keyword arguments of a function

    :param f: The function to inspect
    :return: String list of arguments
    """
    try:
        # Python >= 3.3
        parameters = inspect.signature(f).parameters.values()
        return [
            p.name for p in parameters
            if p.kind == p.POSITIONAL_OR_KEYWORD
        ]
    except AttributeError:
        pass

    try:
        # Python >= 3.0
        return inspect.getfullargspec(f).args
    except AttributeError:
        return inspect.getargspec(f).args
def get_arg_default(f, position):
    """Return the default value of the parameter of ``f`` at ``position``.

    :param f: The function to inspect.
    :param position: Index into the function's parameter list.
    :return: The default value, or ``None`` when the parameter has none.
    :raises IndexError: On the ``inspect.signature`` path, when ``position``
        is out of range.
    """
    try:
        # Python >= 3.3
        sig = inspect.signature(f)
        arg = list(sig.parameters.values())[position]
        arg_def = arg.default
        # ``Parameter.empty`` is a sentinel: compare by identity. ``!=``
        # would call the default's own __ne__, which can return a wrong
        # answer or raise for some objects.
        return arg_def if arg_def is not inspect.Parameter.empty else None
    except AttributeError:
        try:
            spec = inspect.getfullargspec(f)
        except AttributeError:
            spec = inspect.getargspec(f)

        args_len = len(spec.args)
        # Defaults align with the last positional arguments, so they are
        # indexed from the end.
        if spec.defaults and abs(position - args_len) <= len(spec.defaults):
            return spec.defaults[position - args_len]
        else:
            return None
def getargspec(func):
    """Return ``(args, varargs, varkw, defaults)`` for ``func``.

    The first item joins the positional argument names with the keyword-only
    argument names, each passed through ``makelist``; the other three items
    are taken unchanged from ``getfullargspec(func)``.
    """
    full = getfullargspec(func)
    positional_names = makelist(full[0])
    keyword_only_names = makelist(full.kwonlyargs)
    return (
        positional_names + keyword_only_names,
        full[1],
        full[2],
        full[3],
    )
def inspect_getargspec(func):
    """Return an ``ArgSpec`` built from the first four fields of
    ``inspect_getfullargspec(func)``.
    """
    leading_fields = inspect_getfullargspec(func)[0:4]
    return ArgSpec(*leading_fields)
def docroutine(self, object, name=None, mod=None, cl=None):
    """Produce text documentation for a function or method object.

    The result is a ``#### def <title><argspec>:`` heading followed by the
    indented docstring. The docstring is left out when ``name`` is an alias
    and ``cl.__dict__`` holds the very same object under its real name.
    """
    realname = object.__name__
    name = name or realname
    note = ''
    skipdocs = False

    if inspect.ismethod(object):
        # Work on the function underlying a bound method.
        object = object.__func__

    if name == realname:
        title = self.bold(realname)
    else:
        if cl and realname in cl.__dict__ and cl.__dict__[realname] is object:
            skipdocs = True
        title = self.bold(name) + ' = ' + realname

    if not inspect.isfunction(object):
        argspec = '(...)'
    else:
        # NOTE(review): inspect.formatargspec was removed in Python 3.11.
        spec = inspect.getfullargspec(object)
        argspec = inspect.formatargspec(
            *spec,
            formatvalue=self.formatvalue,
            formatannotation=inspect.formatannotationrelativeto(object))
        if realname == '<lambda>':
            title = self.bold(name) + ' lambda '
            # XXX lambda's won't usually have func_annotations['return']
            # since the syntax doesn't support but it is possible.
            # So removing parentheses isn't truly safe.
            argspec = argspec[1:-1]  # remove parentheses

    decl = "#### def " + title + argspec + ":\n" + note

    if skipdocs:
        return decl + '\n'

    doc = pydoc.getdoc(object) or ''
    body = self.indent(doc).rstrip() + '\n' if doc else doc
    return decl + '\n' + body
def __setstate__(self, d):
    """Restore ``self`` by re-running the constructor with the saved arguments.

    The positional values in ``d["__args"]`` are paired with the parameter
    names of ``__init__`` (without ``self``) and merged with
    ``d["__kwargs"]``. A new instance of ``type(self)`` is constructed from
    those keyword arguments and its ``__dict__`` is copied into ``self``.
    """
    # convert all __args to keyword-based arguments
    if sys.version_info >= (3, 0):
        describe = inspect.getfullargspec
    else:
        describe = inspect.getargspec
    positional_names = describe(self.__init__).args[1:]

    named_positionals = zip(positional_names, d["__args"])
    init_kwargs = dict(named_positionals, **d["__kwargs"])

    rebuilt = type(self)(**init_kwargs)
    self.__dict__.update(rebuilt.__dict__)
def enforce_signature(function):
    """
    Enforces the signature of the function by throwing TypeError's if invalid
    arguments are provided. The return value is not checked.

    You can annotate any parameter of your function with the desired type or a
    tuple of allowed types. If you annotate the function with a value, this
    value only will be allowed (useful especially for None). Example:

    >>> @enforce_signature
    ... def test(arg: bool, another: (int, None)):
    ...     pass
    ...
    >>> test(True, 5)
    >>> test(True, None)

    Any string value for any parameter e.g. would then trigger a TypeError.

    Positional arguments are checked by position and keyword arguments by
    name, each through ``assert_right_type``.

    :param function: The function to check.
    """
    spec = inspect.getfullargspec(function)
    annotations = spec.annotations

    # position -> (annotation, parameter name), for annotated positionals
    positional_checks = {
        index: (annotations[arg_name], arg_name)
        for index, arg_name in enumerate(spec.args)
        if arg_name in annotations
    }

    def decorated(*args, **kwargs):
        for index, (expected, arg_name) in positional_checks.items():
            if index < len(args):
                assert_right_type(args[index], expected, arg_name)

        for arg_name, arg_value in kwargs.items():
            if arg_name in annotations:
                assert_right_type(arg_value, annotations[arg_name], arg_name)

        return function(*args, **kwargs)

    return decorated
def __call__(self, fn):
    """Wrap the method ``fn`` so that each call is logged at ``self.logLevel``.

    When the root logger is enabled for ``self.logLevel``, the wrapper logs
    ``self.text``, the function name, the positional arguments paired with
    their parameter names, and the keyword arguments. It then calls ``fn``
    with all received arguments and returns its result.

    :param fn: The method to wrap; it is called with the instance first.
    :return: The wrapping function.
    """
    def newFn(origSelf, *args, **kwargs):
        if logging.getLogger().isEnabledFor(self.logLevel):
            argNames = [argName
                        for argName in inspect.getfullargspec(fn)[0]
                        if argName != 'self']
            namedArgs = [nameNarg
                         for nameNarg in zip(argNames, args)
                         if nameNarg[1] is not origSelf]
            logging.log(self.logLevel,
                        "{} {} {} kw:{}".format(self.text, fn.__name__,
                                                namedArgs, kwargs))
        # Forward the keyword arguments and hand back the result; the
        # original dropped both.
        return fn(origSelf, *args, **kwargs)

    return newFn
def get_default_value(func, arg):
    """Return the default value of the positional parameter ``arg`` of ``func``.

    Parameter names and defaults are paired from the end of the signature, so
    only parameters that have a default are considered.

    :raises StopIteration: If ``func`` has defaults but none for ``arg``.
    :raises TypeError: If ``func`` declares no positional defaults at all.
    """
    spec = inspect.getfullargspec(func)
    names_from_end = reversed(spec[0])
    defaults_from_end = reversed(spec.defaults)
    pairs = zip(names_from_end, defaults_from_end)
    return next(value for name, value in pairs if name == arg)
def accepts_kwargs(func):
    """Return the name of the ``**kwargs`` parameter of ``func``.

    The result is ``None`` when ``func`` has no such parameter, so it can be
    used directly as a truth value.
    """
    # In python3.4.1, there's backwards incompatible
    # changes when using getargspec with functools.partials.
    spec = inspect.getfullargspec(func)
    return spec.varkw

# In python3, socket.error is OSError, which is too general
# for what we want (i.e FileNotFoundError is a subclass of OSError).
# In py3 all the socket related errors are in a newly created
# ConnectionError