我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 inspect.getcallargs()。
def authenticate(**credentials):
    """
    If the given credentials are valid, return a User object.

    Every backend yielded by ``_get_backends(return_tuples=True)`` is tried in
    order. A backend whose ``authenticate()`` signature cannot bind the
    supplied keyword arguments is skipped. The first non-``None`` user is
    annotated with the path of the backend that produced it and returned.

    When no backend produces a user -- including when a backend raises
    ``PermissionDenied`` -- the ``user_login_failed`` signal is sent and the
    function returns ``None`` implicitly.
    """
    for backend, backend_path in _get_backends(return_tuples=True):
        try:
            inspect.getcallargs(backend.authenticate, **credentials)
        except TypeError:
            # This backend doesn't accept these credentials as arguments.
            # Try the next one.
            continue

        try:
            user = backend.authenticate(**credentials)
        except PermissionDenied:
            # This backend says to stop in our tracks - this user should not
            # be allowed in at all. Leave the loop with ``break`` rather than
            # ``return None`` so the failure signal below is still sent.
            break
        if user is None:
            continue
        # Annotate the user object with the path of the backend.
        user.backend = backend_path
        return user

    # The credentials supplied are invalid to all backends, fire signal.
    user_login_failed.send(sender=__name__,
                           credentials=_clean_credentials(credentials))
def __new__(cls, name, bases, attrs):
    """
    Build the class, shimming a ``get_form`` that lacks a default argument.

    If ``attrs`` defines ``get_form`` as a plain function that cannot be
    called with a single positional argument, a deprecation warning is issued
    and the method is replaced by a wrapper that supplies ``form_class`` from
    ``self.get_form_class()`` when it is omitted.
    """
    declared = attrs.get('get_form')
    if declared and inspect.isfunction(declared):
        try:
            # Probe: can the method be called with only ``self``?
            inspect.getcallargs(declared, None)
        except TypeError:
            message = (
                "`%s.%s.get_form` method must define a default value for "
                "its `form_class` argument." % (attrs['__module__'], name)
            )
            warnings.warn(message, RemovedInDjango110Warning, stacklevel=2)

            def get_form_with_form_class(self, form_class=None):
                if form_class is None:
                    form_class = self.get_form_class()
                return declared(self, form_class=form_class)

            attrs['get_form'] = get_form_with_form_class
    return super(FormMixinBase, cls).__new__(cls, name, bases, attrs)
def authenticate(**credentials):
    """
    Return the first user a configured backend accepts for ``credentials``.

    Backends come from ``_get_backends(return_tuples=True)``. One whose
    ``authenticate()`` signature cannot bind ``credentials`` is skipped, and
    a ``PermissionDenied`` from any backend stops the search. If no user is
    found, ``user_login_failed`` is sent and ``None`` is returned implicitly.
    """
    for backend, backend_path in _get_backends(return_tuples=True):
        try:
            inspect.getcallargs(backend.authenticate, **credentials)
        except TypeError:
            # Signature mismatch: this backend cannot take these credentials.
            continue

        try:
            user = backend.authenticate(**credentials)
        except PermissionDenied:
            # The backend vetoed this login outright; stop trying others.
            break

        if user is not None:
            # Remember which backend authenticated the user.
            user.backend = backend_path
            return user

    # Nothing matched (or a backend vetoed): report the failed attempt.
    cleaned = _clean_credentials(credentials)
    user_login_failed.send(sender=__name__, credentials=cleaned)
def wrap_async(f, _inner_f, check_type_args, check_type_annotations):
    """
    Wrap coroutine function ``f`` so annotated values pass through ``check_type``.

    Call arguments are bound against ``_inner_f``. Each bound name present in
    ``check_type_annotations`` is replaced by the result of
    ``check_type(value, annotation)``. The call is then rebuilt from
    ``check_type_args`` (its ``args``, ``varargs`` and ``varkw`` fields) and
    ``f`` is awaited. A ``'return'`` annotation, if present, is applied to
    the awaited result.
    """
    @wraps(f)
    async def _f(*args, **kwargs):
        bound = inspect.getcallargs(_inner_f, *args, **kwargs)
        for name in list(bound):
            if name in check_type_annotations:
                bound[name] = check_type(bound[name],
                                         check_type_annotations[name])

        # Rebuild the positional part of the call.
        positional = [bound.pop(name) for name in check_type_args.args]
        if check_type_args.varargs is not None:
            positional.extend(bound.pop(check_type_args.varargs))

        # Rebuild the keyword part; whatever is still bound is passed by name.
        if check_type_args.varkw is None:
            keyword = {}
        else:
            keyword = bound.pop(check_type_args.varkw)
        keyword.update(bound)

        result = await f(*positional, **keyword)
        if 'return' not in check_type_annotations:
            return result
        return check_type(result, check_type_annotations['return'])

    return _f
def get_cache_key(self, *args, **kwargs):
    """
    Build the cache key string for a call made with ``args``/``kwargs``.

    If ``self.vary_on`` is a function it is called with the same arguments
    and must return the values to key on. Otherwise it is iterated as
    ``(argument_name, attribute_chain)`` pairs resolved against the arguments
    bound to ``self.fn``. Serialized values are comma-joined; a joined string
    longer than 150 characters is replaced by ``'H'`` plus its hash.
    """
    bound = inspect.getcallargs(self.fn, *args, **kwargs)

    if isfunction(self.vary_on):
        key_values = self.vary_on(*args, **kwargs)
    else:
        key_values = []
        for arg_name, attr_chain in self.vary_on:
            resolved = bound[arg_name]
            for attr in attr_chain:
                resolved = getattr(resolved, attr)
            key_values.append(resolved)

    joined = ','.join(self._serialize_for_key(item) for item in key_values)
    if len(joined) > 150:
        joined = 'H' + self._hash(joined)
    return 'quickcache.{}/{}'.format(self.prefix, joined)
def api_exc_patch(func):
    """
    Wrap a plain function so exceptions it raises are classified.

    Anything that is not a ``FunctionType`` is returned untouched. For a
    wrapped function:

    * ``RQInvalidArgument`` propagates unchanged.
    * A ``TypeError`` raised while the supplied arguments do not even bind
      to the unwrapped function's signature is re-raised through
      ``patch_user_exc`` with its original traceback.
    * Any other exception whose ``EXC_EXT_NAME`` attribute is missing or
      equal to ``EXC_TYPE.NOTSET`` is passed to ``patch_system_exc`` and then
      re-raised.
    """
    if not isinstance(func, FunctionType):
        return func

    @wraps(func)
    def deco(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except RQInvalidArgument:
            raise
        except Exception as e:
            if isinstance(e, TypeError):
                # Capture now: the nested ``except`` below replaces the
                # active exception information.
                captured = sys.exc_info()
                try:
                    inspect.getcallargs(unwrapper(func), *args, **kwargs)
                except TypeError:
                    # The arguments do not fit the signature at all.
                    original, trace = captured[1], captured[2]
                    raise patch_user_exc(original.with_traceback(trace))

            marker = getattr(e, EXC_EXT_NAME, EXC_TYPE.NOTSET)
            if marker == EXC_TYPE.NOTSET:
                patch_system_exc(e)
            raise

    return deco
def call(self, function, args=None, kw=None):
    """
    Return ``function(*args, **kw)``, served from ``self.cache`` when possible.

    The key comes from ``self.key_maker.key(function, bound_arguments)``.
    A ``KeyError`` from ``self.cache.get`` is treated as a miss: the function
    is called and its result stored with ``self.cache.set``. When
    ``self.return_copies`` is true the caller receives a deep copy.
    """
    positional = () if args is None else args
    named = {} if kw is None else kw

    bound = inspect.getcallargs(function, *positional, **named)
    cache_key = self.key_maker.key(function, bound)
    try:
        result = self.cache.get(cache_key)
    except KeyError:
        result = function(*positional, **named)
        self.cache.set(cache_key, result)

    return copy.deepcopy(result) if self.return_copies else result
def Resource(resource_name):
    """
    Decorator factory turning a method into a fetch of ``resource_name``.

    The decorated method's body is never executed; only its signature is
    used. Call arguments are bound to that signature, ``self`` is removed,
    and the remaining name/value pairs are handed to ``self._BuildUrl`` as
    request parameters. The decoded JSON response is returned, transformed
    by ``Api._TransformResponseDict`` first when ``self._transform_json`` is
    truthy.
    """
    def real_dec(func):
        @functools.wraps(func)
        def fetch_resource(*args, **kwargs):
            params = inspect.getcallargs(func, *args, **kwargs)
            # ``self`` is the API object, not a request parameter.
            api = params.pop('self')
            url = api._BuildUrl('http://stats.nba.com/stats/',
                                resource_name, params)
            payload = api._FetchUrl(url).json()
            if api._transform_json:
                return Api._TransformResponseDict(payload)
            return payload

        return fetch_resource

    return real_dec
# noinspection PyPep8Naming
def _is_valid(cls, *args, **kwargs):
    """
    Decide whether the dataset at ``args[0]`` should be handled by ``cls``.

    ``args`` and ``kwargs`` are the arguments that would be passed to
    ``cls.__init__`` (without ``self``); the first positional argument is the
    dataset's output directory.

    Returns ``False`` when there is no positional argument, the path is not a
    directory, it has no ``Header`` file, or the arguments do not bind to
    ``cls.__init__``. Returns ``True`` when the bound ``cparam_filename`` is
    ``None``, or when neither that file (looked up next to the output
    directory) nor a ``job_info`` file inside it exists. Otherwise returns
    ``False`` so another frontend can claim the dataset.
    """
    if not args:
        return False
    output_dir = args[0]

    # boxlib datasets are always directories
    if not os.path.isdir(output_dir):
        return False
    header_filename = os.path.join(output_dir, "Header")
    jobinfo_filename = os.path.join(output_dir, "job_info")
    if not os.path.exists(header_filename):
        # We *know* it's not boxlib if Header doesn't exist.
        return False

    # Bind the caller's arguments to the constructor signature. They must be
    # unpacked: passing the tuple and dict as two positionals binds them to
    # ``self`` and the first parameter, so every other parameter silently
    # takes its default. ``None`` stands in for ``self``.
    try:
        bound = inspect.getcallargs(cls.__init__, None, *args, **kwargs)
    except TypeError:
        # The arguments do not fit this class's constructor.
        return False

    # This might need to be localized somehow
    cparam_filename = bound['cparam_filename']
    if cparam_filename is None:
        return True  # Treat as generic boxlib data

    inputs_filename = os.path.join(
        os.path.dirname(os.path.abspath(output_dir)), cparam_filename)
    if not os.path.exists(inputs_filename) and \
            not os.path.exists(jobinfo_filename):
        return True  # We have no parameters to go off of

    # If we do have either inputs or jobinfo, we should be deferring to a
    # different frontend.
    return False
def assertEqualException(self, func, call_param_string, locs=None):
    """
    Assert that calling ``func`` and binding via ``getcallargs`` fail alike.

    ``call_param_string`` is the source text of an argument list. It is
    evaluated once as ``func(<args>)`` and once as
    ``inspect.getcallargs(func, <args>)``, with ``locs`` (plus ``func``) as
    local names. Both evaluations must raise, and the two exceptions must
    have the same type and the same string form.
    """
    locs = dict(locs or {}, func=func)

    # ``except ... as`` is valid on Python 2.6+ and Python 3. The exception
    # is copied to a second name because Python 3 unbinds the ``as`` target
    # when the handler exits.
    try:
        eval('func(%s)' % call_param_string, None, locs)
    except Exception as e:
        ex1 = e
    else:
        self.fail('Exception not raised')

    try:
        eval('inspect.getcallargs(func, %s)' % call_param_string, None,
             locs)
    except Exception as e:
        ex2 = e
    else:
        self.fail('Exception not raised')

    self.assertIs(type(ex1), type(ex2))
    self.assertEqual(str(ex1), str(ex2))
def assertEqualException(self, func, call_param_string, locs=None):
    """
    Check that a direct call and ``inspect.getcallargs`` raise the same error.

    ``call_param_string`` is argument-list source text, evaluated first as
    ``func(<args>)`` and then as ``inspect.getcallargs(func, <args>)`` with
    ``locs`` plus ``func`` as local names. Each evaluation must raise; the
    exceptions are compared by type and by ``str()``.
    """
    namespace = dict(locs or {}, func=func)
    raised = []
    for template in ('func(%s)', 'inspect.getcallargs(func, %s)'):
        try:
            eval(template % call_param_string, None, namespace)
        except Exception as e:
            raised.append(e)
        else:
            self.fail('Exception not raised')

    direct, bound = raised
    self.assertIs(type(direct), type(bound))
    self.assertEqual(str(direct), str(bound))
    # Drop every local reference to the caught exceptions.
    del raised, direct, bound
def check_required_args(parameters):
    """
    Decorator factory requiring certain extra keyword arguments.

    :param parameters: iterable of names that must be present, with a
        non-``None`` value, among the keyword arguments collected by the
        decorated function's ``**kwargs`` parameter.
    :raises Exception: when a required name is missing or ``None``; the
        message is also logged through ``LOG.error``.
    """
    def decorated(f):
        """Wrap ``f`` with the required-argument check."""
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            """Validate the required names, then call ``f`` unchanged."""
            func_args = inspect.getcallargs(f, *args, **kwargs)
            # Keep the bound extras separate from ``kwargs``. Rebinding
            # ``kwargs`` here would drop keyword arguments that matched named
            # parameters from the real call below. A function without a
            # ``**kwargs`` parameter has no extras at all.
            extras = func_args.get('kwargs') or {}
            for item in parameters:
                if extras.get(item) is None:
                    message = "check required args failed, `{0}` is not found in {1}".format(item, f.__name__)
                    LOG.error(message)
                    raise Exception(message)
            return f(*args, **kwargs)
        return wrapper
    return decorated
def dispatch_packet(self, packet):
    """
    Route ``packet`` to the handler registered for its type.

    The handler is looked up in ``self._packet_handlers`` by ``packet.type``.
    If there is none, ``self.on_missing_handler(packet)`` is called and
    ``None`` is returned. Otherwise the packet supplies the call arguments
    through ``packet.get_method_args(n)``, where ``n`` is the number of
    positional parameters the handler declares, and the handler is called as
    ``method(packet.type, *args, **kwargs)``.

    :returns: whatever the handler returns.
    :raises PacketFormatError: if the arguments do not bind to the handler's
        signature.
    """
    if not getattr(packet, 'no_log', False):
        self.log.debug('received %r', packet)

    packet_type = packet.type
    method = self._packet_handlers.get(packet_type, None)
    if method is None:
        self.on_missing_handler(packet)
        return None

    # ``inspect.getargspec`` was removed in Python 3.11. Prefer
    # ``getfullargspec`` and fall back only on interpreters that lack it.
    get_spec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    arg_spec = get_spec(method)
    args, kwargs = packet.get_method_args(len(arg_spec[0]))

    # Validate the binding first, so a malformed packet surfaces as
    # PacketFormatError rather than a TypeError from inside the handler call.
    try:
        inspect.getcallargs(method, packet_type, *args, **kwargs)
    except TypeError as e:
        raise PacketFormatError(text_type(e))

    return method(packet_type, *args, **kwargs)
def is_closable_iterator(obj):
    """
    Return True if ``obj`` is an iterator that can be closed without arguments.

    Generators always qualify. Any other object accepted by ``is_iterator``
    qualifies only if it has a callable ``close`` attribute whose signature
    binds with no arguments.
    """
    if not is_iterator(obj):
        # Not an iterator at all.
        return False

    import inspect
    if inspect.isgenerator(obj):
        # Generators are the easy case.
        return True

    # A custom iterator: it needs a callable ``close``...
    if not hasattr(obj, 'close') or not callable(obj.close):
        return False

    # ... that can be invoked without any arguments.
    try:
        inspect.getcallargs(obj.close)
    except TypeError:
        return False
    return True
def assertEqualException(self, func, call_param_string, locs=None):
    """
    Assert that calling ``func`` and binding via ``getcallargs`` fail alike.

    ``call_param_string`` is the source text of an argument list. It is
    evaluated once as ``func(<args>)`` and once as
    ``inspect.getcallargs(func, <args>)``, with ``locs`` (plus ``func``) as
    local names. Both evaluations must raise exceptions of the same type.
    Their messages are compared only when ``check_impl_detail()`` is true.
    """
    locs = dict(locs or {}, func=func)

    # ``except ... as`` is valid on Python 2.6+ and Python 3. The exception
    # is copied to a second name because Python 3 unbinds the ``as`` target
    # when the handler exits.
    try:
        eval('func(%s)' % call_param_string, None, locs)
    except Exception as e:
        ex1 = e
    else:
        self.fail('Exception not raised')

    try:
        eval('inspect.getcallargs(func, %s)' % call_param_string, None,
             locs)
    except Exception as e:
        ex2 = e
    else:
        self.fail('Exception not raised')

    self.assertIs(type(ex1), type(ex2))
    if check_impl_detail():
        self.assertEqual(str(ex1), str(ex2))
def wrap_instance_event(function):
    """
    Wrap a method so its execution is reported as an instance event.

    The wrapper binds the call arguments to the function returned by
    ``safe_utils.get_wrapped_function(function)``, reads the ``'uuid'`` item
    of the bound ``instance`` argument, and runs ``function`` inside
    ``compute_utils.EventReporter(context, 'compute_<function name>', uuid)``.
    """
    @functools.wraps(function)
    def decorated_function(self, context, *args, **kwargs):
        undecorated = safe_utils.get_wrapped_function(function)
        bound = inspect.getcallargs(undecorated, self, context,
                                    *args, **kwargs)
        instance_uuid = bound['instance']['uuid']
        event_name = 'compute_{0}'.format(function.__name__)

        reporter = compute_utils.EventReporter(context, event_name,
                                               instance_uuid)
        with reporter:
            return function(self, context, *args, **kwargs)

    return decorated_function
def declare_schema(**schemas):
    """Declares data schema for function arguments.

    Every bound argument that has a declared schema is validated before the
    wrapped function runs. An argument whose value is the very object
    recorded as its default by ``_get_default_arguments`` is not validated.

    :param schemas: the mapping, where key is argument name, value is schema.
        The schema may be a callable object or a method of a class; in the
        latter case the wrapped function should be a method of the same class
    :raises ValueError: if the passed data does not fit declared schema
    """
    def decorator(func):
        validators = {k: _build_validator(v) for k, v in schemas.items()}
        defaults = _get_default_arguments(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            bound_args = inspect.getcallargs(func, *args, **kwargs)
            # The first positional argument (``self`` for methods) is passed
            # to each validator. Use a conditional expression, not
            # ``args and args[0] or None``: that idiom swaps a falsy first
            # argument (e.g. an empty container-like instance) for ``None``.
            owner = args[0] if args else None
            for n, v in bound_args.items():
                if v is not defaults.get(n, _SENTINEL) and n in validators:
                    validators[n](owner, v)
            return func(*args, **kwargs)
        return wrapper
    return decorator
def __new__(cls, name, bases, attrs):
    """
    Create the class, patching ``get_form`` if it requires ``form_class``.

    When ``attrs`` contains a plain-function ``get_form`` that cannot be
    bound with a single positional argument, a deprecation warning is
    emitted and ``get_form`` is replaced by a wrapper whose ``form_class``
    defaults to ``self.get_form_class()``.
    """
    user_get_form = attrs.get('get_form')
    if user_get_form and inspect.isfunction(user_get_form):
        try:
            # Only ``self`` is supplied; a TypeError means ``form_class`` has
            # no default value.
            inspect.getcallargs(user_get_form, None)
        except TypeError:
            warnings.warn(
                "`%s.%s.get_form` method must define a default value for "
                "its `form_class` argument." % (attrs['__module__'], name),
                RemovedInDjango20Warning,
                stacklevel=2,
            )

            def get_form_with_form_class(self, form_class=None):
                if form_class is None:
                    form_class = self.get_form_class()
                return user_get_form(self, form_class=form_class)

            attrs['get_form'] = get_form_with_form_class

    new_class = super(FormMixinBase, cls).__new__(cls, name, bases, attrs)
    return new_class
def skip(self, *args, **kwargs):
    """
    Evaluate ``self.skip_arg`` for a call made with ``args``/``kwargs``.

    * falsy ``skip_arg``: returns ``False``;
    * string ``skip_arg``: returns the value bound to the ``self.fn``
      argument of that name;
    * function ``skip_arg``: returns its result for the same arguments.
    """
    rule = self.skip_arg
    if not rule:
        return False

    if isinstance(rule, six.string_types):
        bound = inspect.getcallargs(self.fn, *args, **kwargs)
        return bound[rule]

    if isfunction(rule):
        return rule(*args, **kwargs)

    assert False, "skip_arg must be None, a string, or a function " \
        "and this should have been checked in __init__"
def register_arg_validator(cls, extras=None):
    """Decorator factory that validates call arguments with ``cls``.

    On every call of the decorated function the arguments are bound to its
    signature and passed, together with ``extras``, to ``cls(...)``.

    .. note::

        - If ``validator.is_valid()`` is false, ``validator.handle_errors()``
          is called and the wrapper returns ``None``; the decorated function
          is not executed.
        - Otherwise the decorated function is called with
          ``**validator.cleaned_data`` only, i.e. purely by keyword.

    :param BaseArgValidator cls: validator class to instantiate per call
    :param extras: second argument for the validator constructor; a fresh
        empty list is used for each call when omitted
    """
    assert issubclass(cls, BaseArgValidator)

    def _inner(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            callargs = getcallargs(func, *args, **kwargs)
            # A ``[]`` default would be one list object shared by every
            # decorated function and every call, so build a new one here.
            validator = cls(callargs, [] if extras is None else extras)
            if not validator.is_valid():
                validator.handle_errors()
                # NOTE(review): returns None after handle_errors(); the
                # original comment here was unreadable -- confirm intent.
                return
            return func(**validator.cleaned_data)
        return wrapper
    return _inner
def _authenticate_with_backend(backend, backend_path, request, credentials):
    """
    Call ``backend.authenticate()`` using whichever signature it supports.

    Signatures are probed in this order:

    1. ``authenticate(request, **credentials)`` -- used as is;
    2. ``authenticate(request=request, **credentials)`` -- used with a
       deprecation warning;
    3. ``authenticate(**credentials)`` -- used with a deprecation warning.

    If none binds, ``None`` is returned without calling the backend. After
    the first probe fails, any ``'request'`` key is removed from
    ``credentials``, which is modified in place.
    """
    def _accepts(*call_args, **call_kwargs):
        # True when the backend's signature can bind these arguments.
        try:
            inspect.getcallargs(backend.authenticate,
                                *call_args, **call_kwargs)
        except TypeError:
            return False
        return True

    # Does the backend accept a request argument?
    if _accepts(request, **credentials):
        return backend.authenticate(request, **credentials)

    credentials.pop('request', None)
    if _accepts(request=request, **credentials):
        # The backend takes ``request`` only as a keyword argument.
        credentials['request'] = request
        warnings.warn(
            "In %s.authenticate(), move the `request` keyword argument "
            "to the first positional argument." % backend_path,
            RemovedInDjango21Warning
        )
    elif _accepts(**credentials):
        # The backend takes the credentials but no request at all.
        warnings.warn(
            "Update %s.authenticate() to accept a positional "
            "`request` argument." % backend_path,
            RemovedInDjango21Warning
        )
    else:
        # This backend doesn't accept these credentials as arguments.
        # The caller tries the next one.
        return None

    return backend.authenticate(**credentials)
def wrap_exception(notifier=None, event_type=None):
    """
    Decorator factory that reports exceptions raised by a method.

    The wrapped method is called as ``f(self, context, *args, **kwargs)``.
    If it raises and ``notifier`` is truthy, ``notifier.error(context,
    event, payload)`` is called inside
    ``excutils.save_and_reraise_exception()``. ``event`` is ``event_type``,
    or ``f.__name__`` when ``event_type`` is falsy. ``payload`` holds the
    exception and the bound call arguments.
    """
    def inner(f):
        @functools.wraps(f)
        def wrapped(self, context, *args, **kwargs):
            try:
                return f(self, context, *args, **kwargs)
            except Exception as e:
                with excutils.save_and_reraise_exception():
                    if notifier:
                        # NOTE(review): the original comment said self and
                        # context must not be stored in the payload, yet the
                        # bound arguments below include both -- confirm.
                        bound = inspect.getcallargs(f, self, context,
                                                    *args, **kwargs)
                        payload = {
                            'exception': e,
                            'private': {'args': bound},
                        }
                        # If f has multiple decorators, they must use
                        # functools.wraps for the name to be propagated.
                        event = event_type or f.__name__
                        notifier.error(context, event, payload)

        return wrapped

    return inner