我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用functools.wraps()。
def auth(func): @wraps(func) def _decorator(request, *args, **kwargs): auth = get_auth() if auth.get('disable', False) is True: return func(request, *args, **kwargs) if 'authorized_ips' in auth: ip = get_client_ip(request) if is_authorized(ip, auth['authorized_ips']): return func(request, *args, **kwargs) prepare_credentials(auth) if request.META.get('HTTP_AUTHORIZATION'): authmeth, auth = request.META['HTTP_AUTHORIZATION'].split(' ') if authmeth.lower() == 'basic': auth = base64.b64decode(auth).decode('utf-8') username, password = auth.split(':') if (username == HEARTBEAT['auth']['username'] and password == HEARTBEAT['auth']['password']): return func(request, *args, **kwargs) response = HttpResponse( "Authentication failed", status=401) response['WWW-Authenticate'] = 'Basic realm="Welcome to 1337"' return response return _decorator
def meta_compile(compile): "Wraps the compile() method to do dark magic, see `meta_shellcode`." @functools.wraps(compile) def compile_wrapper(self, state = None): if state is None: state = CompilerState() self.offset = state.offset if hasattr(self, '_compile_hook'): bytes = self._compile_hook(compile, state) else: bytes = compile(self, state) state.next_piece( len(bytes) ) return bytes return compile_wrapper
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)): def decorator(func): def _handle_timeout(signum, frame): raise TimeoutError(error_message) def wrapper(*args, **kwargs): signal.signal(signal.SIGALRM, _handle_timeout) signal.alarm(seconds) try: result = func(*args, **kwargs) finally: signal.alarm(0) return result return wraps(func)(wrapper) return decorator
def locked(path, timeout=None): """Decorator which enables locks for decorated function. Arguments: - path: path for lockfile. - timeout (optional): Timeout for acquiring lock. Usage: @locked('/var/run/myname', timeout=0) def myname(...): ... """ def decor(func): @functools.wraps(func) def wrapper(*args, **kwargs): lock = FileLock(path, timeout=timeout) lock.acquire() try: return func(*args, **kwargs) finally: lock.release() return wrapper return decor
def set_package(fxn): """Set __package__ on the returned module. This function is deprecated. """ @functools.wraps(fxn) def set_package_wrapper(*args, **kwargs): warnings.warn('The import system now takes care of this automatically.', DeprecationWarning, stacklevel=2) module = fxn(*args, **kwargs) if getattr(module, '__package__', None) is None: module.__package__ = module.__name__ if not hasattr(module, '__path__'): module.__package__ = module.__package__.rpartition('.')[0] return module return set_package_wrapper
def __getattr__(self, name): # Attribute lookups are delegated to the underlying file # and cached for non-numeric results # (i.e. methods are cached, closed and friends are not) file = self.__dict__['file'] a = getattr(file, name) if hasattr(a, '__call__'): func = a @_functools.wraps(func) def func_wrapper(*args, **kwargs): return func(*args, **kwargs) # Avoid closing the file as long as the wrapper is alive, # see issue #18879. func_wrapper._closer = self._closer a = func_wrapper if not isinstance(a, int): setattr(self, name, a) return a # The underlying __enter__ method returns the wrong object # (self.file) so override it to return the wrapper
def login_required(func): @wraps(func) def wrapper(*args, **kwargs): logger.info ("get request, path: %s" % request.path) token = request.form.get("token", None) if (token == None): logger.info ("get request without token, path: %s" % request.path) return json.dumps({'success':'false', 'message':'user or key is null'}) result = post_to_user("/authtoken/", {'token':token}) if result.get('success') == 'true': username = result.get('username') beans = result.get('beans') else: return result #if (cur_user == None): # return json.dumps({'success':'false', 'message':'token failed or expired', 'Unauthorized': 'True'}) return func(username, beans, request.form, *args, **kwargs) return wrapper
def test_wraps(self): self.assert_ok("""\ from functools import wraps def my_decorator(f): dec = wraps(f) def wrapper(*args, **kwds): print('Calling decorated function') return f(*args, **kwds) wrapper = dec(wrapper) return wrapper @my_decorator def example(): '''Docstring''' return 17 assert example() == 17 """)
def ensure_empty_collections(*collections): """ Clears collections listed after function has completed. Will throw an assertion if any collection is not empty when called. """ def clear(f): @wraps(f) def wrapper(*args, **kwargs): db = api.common.get_conn() collection_size = lambda name: len(list(db[name].find())) for collection in collections: assert collection_size(collection) == 0, "Collection was not empty: " + collection result = f(*args, **kwargs) return result return wrapper return clear
def clear_collections(*collections): """ Clears collections listed after function has completed. Will throw an assertion if any collection is not empty when called. """ def clear(f): @wraps(f) def wrapper(*args, **kwargs): db = api.common.get_conn() try: result = f(*args, **kwargs) finally: #Clear the collections. for collection in collections: db[collection].remove() #Ensure they are then empty. for collection in collections: collection_size = lambda collection: len(list(db[collection].find())) assert collection_size(collection) == 0, "Collection: {} was not able to be cleared.".format(collection) return result return wrapper return clear
def block_before_competition(return_result): """ Wraps a routing function that should be blocked before the start time of the competition """ def decorator(f): """ Inner decorator """ @wraps(f) def wrapper(*args, **kwds): if datetime.utcnow().timestamp() > api.config.get_settings()["start_time"].timestamp(): return f(*args, **kwds) else: return return_result return wrapper return decorator
def _silent_connection_failure(func): """Decorator used to avoid raising an exception when the database timeouts Parameters: func: Function to decorate. """ @wraps(func) def wrapper(*args, **kwargs): """Wraps the function to catch timeout exception. """ if not FLAGS.disable_mongodb_exception: return func(*args, **kwargs) try: result = func(*args, **kwargs) except pymongo.errors.ServerSelectionTimeoutError as e: logging.error("Unable to reach the caching server: %s", e) return None return result return wrapper
def array_stream(func): """ Decorates streaming functions to make sure that the stream is a stream of ndarrays. Objects that are not arrays are transformed into arrays. If the stream is in fact a single ndarray, this ndarray is repackaged into a sequence of length 1. The first argument of the decorated function is assumed to be an iterable of arrays, or an iterable of objects that can be casted to arrays. """ @wraps(func) # thanks functools def decorated(arrays, *args, **kwargs): if isinstance(arrays, ndarray): arrays = (arrays,) return func(map(atleast_1d, arrays), *args, **kwargs) return decorated
def call(f): """ Wrap a function in a processor that calls `f` on the argument before passing it along. Useful for creating simple arguments to the `@preprocess` decorator. Parameters ---------- f : function Function accepting a single argument and returning a replacement. Usage ----- >>> @preprocess(x=call(lambda x: x + 1)) ... def foo(x): ... return x ... >>> foo(1) 2 """ @wraps(f) def processor(func, argname, arg): return f(arg) return processor
def require_initialized(exception): """ Decorator for API methods that should only be called after TradingAlgorithm.initialize. `exception` will be raised if the method is called before initialize has completed. Usage ----- @require_initialized(SomeException("Don't do that!")) def method(self): # Do stuff that should only be allowed after initialize. """ def decorator(method): @wraps(method) def wrapped_method(self, *args, **kwargs): if not self.initialized: raise exception return method(self, *args, **kwargs) return wrapped_method return decorator
def coerce_numbers_to_my_dtype(f): """ A decorator for methods whose signature is f(self, other) that coerces ``other`` to ``self.dtype``. This is used to make comparison operations between numbers and `Factor` instances work independently of whether the user supplies a float or integer literal. For example, if I write:: my_filter = my_factor > 3 my_factor probably has dtype float64, but 3 is an int, so we want to coerce to float64 before doing the comparison. """ @wraps(f) def method(self, other): if isinstance(other, Number): other = coerce_to_dtype(self.dtype, other) return f(self, other) return method
def with_algo(f): name = f.__name__ if not name.startswith('test_'): raise ValueError('This must decorate a test case') tfm_name = name[len('test_'):] @wraps(f) def wrapper(self, data_frequency, days=None): sim_params, source = self.sim_and_source[data_frequency] algo = TradingAlgorithm( initialize=initialize_with(self, tfm_name, days), handle_data=handle_data_wrapper(f), sim_params=sim_params, env=self.env, ) algo.run(source) return wrapper
def authorized(admin_only=False): def wrap(user_handler): @wraps(user_handler) def authorized_handler(self, *args, **kwargs): self.set_cache(is_public=False) request = self.request if request.method == 'GET': if not self.current_user_id: next_url = self.get_argument('next', '/') self.redirect(self.get_login_url() + "?next=" + next_url, status=302 if request.version == 'HTTP/1.0' else 303) elif admin_only and not self.is_admin: raise HTTPError(403) else: user_handler(self, *args, **kwargs) elif not self.current_user_id: raise HTTPError(403) elif admin_only and not self.is_admin: raise HTTPError(403) else: user_handler(self, *args, **kwargs) return authorized_handler return wrap
def memoized(func): """Decorator that caches a function's return value each time it is called. If called later with the same arguments, the cached value is returned, and the function is not re-evaluated. Based upon from http://wiki.python.org/moin/PythonDecoratorLibrary#Memoize Nota bene: this decorator memoizes /all/ calls to the function. For a memoization decorator with limited cache size, consider: http://code.activestate.com/recipes/496879-memoize-decorator-function-with-cache-size-limit/ """ cache = {} @wraps(func) def func_wrapper(*args, **kwargs): key = cPickle.dumps((args, kwargs)) if key not in cache: cache[key] = func(*args, **kwargs) return cache[key] return func_wrapper
def calculate_mantl_vars(func): """calculate Mantl vars""" @wraps(func) def inner(*args, **kwargs): name, attrs, groups = func(*args, **kwargs) # attrs if attrs.get('role', '') == 'control': attrs['consul_is_server'] = True else: attrs['consul_is_server'] = False # groups if attrs.get('publicly_routable', False): groups.append('publicly_routable') return name, attrs, groups return inner
def _check_inputs_and_outputs(fcn): @functools.wraps(fcn) def call(conn, inputs, outputs, identifier, *a, **kw): assert isinstance(inputs, (list, tuple, set)), inputs assert isinstance(outputs, (list, tuple, set)), outputs assert '' not in inputs, inputs assert '' not in outputs, outputs # this is for actually locking inputs/outputs inputs, outputs = list(map(str, inputs)), list(map(str, outputs)) locks = inputs + [''] + outputs if kw.pop('history', None): igraph = [EDGE_RE.sub('*', inp) for inp in inputs] ograph = [EDGE_RE.sub('*', out) for out in outputs] graph_id = EDGE_RE.sub('*', str(identifier)) graph = igraph + [''] + ograph + ['', graph_id] if all(x.startswith('test.') for x in igraph + ograph): graph = ['', ''] else: graph = ['', ''] return fcn(conn, locks, graph, str(identifier), *a, **kw) return call
def immutable(cls): cls.__frozen = False def frozensetattr(self, key, value): if self.__frozen and not hasattr(self, key): print("Class {} is frozen. Cannot set {} = {}" .format(cls.__name__, key, value)) else: object.__setattr__(self, key, value) def init_decorator(func): @wraps(func) def wrapper(self, *args, **kwargs): func(self, *args, **kwargs) self.__frozen = True return wrapper cls.__setattr__ = frozensetattr cls.__init__ = init_decorator(cls.__init__) return cls
def set_self_blocking(function): @functools.wraps(function) def wrapper(*args, **kwargs): self = args[0] try: _is_blocking = self.gettimeout() if _is_blocking == 0: self.setblocking(True) return function(*args, **kwargs) except Exception as e: raise finally: # set orgin blocking if _is_blocking == 0: self.setblocking(False) return wrapper
def adapt_rgb(apply_to_rgb): """Return decorator that adapts to RGB images to a gray-scale filter. This function is only intended to be used for functions that don't accept volumes as input, since checking an image's shape is fragile. Parameters ---------- apply_to_rgb : function Function that returns a filtered image from an image-filter and RGB image. This will only be called if the image is RGB-like. """ def decorator(image_filter): @functools.wraps(image_filter) def image_filter_adapted(image, *args, **kwargs): if is_rgb_like(image): return apply_to_rgb(image_filter, image, *args, **kwargs) else: return image_filter(image, *args, **kwargs) return image_filter_adapted return decorator
def patch(obj, func_name, before_func, after_func): if not hasattr(obj, func_name): return target_func = getattr(obj, func_name) # already patched if hasattr(target_func, '__stackimpact_orig__'): return @wraps(target_func) def wrapper(*args, **kwds): if before_func: before_func(*args, **kwds) ret = target_func(*args, **kwds) if after_func: after_func(ret) return ret wrapper.__orig__ = target_func setattr(obj, func_name, wrapper)
def adapted(func): @wraps(func) def wrapper(**kwargs): final_args = {} for name, value in kwargs.items(): adapt = func.__annotations__.get(name) if adapt is not None: final_args[name] = adapt(value) else: final_args[name] = value result = func(**final_args) adapt = func.__annotations__.get('return') if adapt is not None: return adapt(result) return result return wrapper
def admin_access(): def access(func): @wraps(func) def wrapped(self, bot: Bot, update: Update, *args, **kwargs): user = update.effective_user admins = getattr(self, 'admins', None) if admins is None: self.logger.warning('Specify self.admins (list of users ids) parameter in ' 'manager or channel classes in order to restrict access to bot.') return func(self, bot, update, *args, **kwargs) if user.id not in admins: self.logger.info(f"Unauthorized access denied for {user}.") return return func(self, bot, update, *args, **kwargs) return wrapped return access
def __call__(self, func_or_msg=None, **kwargs): """ If the first argument is a function, return a decorator which runs the wrapped function inside Timer's context manager. Otherwise, treat the first argument as a 'msg' string and return an updated Timer instance, referencing the same logger. A 'level' keyword can also be passed to override self.level. """ if isinstance(func_or_msg, collections.Callable): func = func_or_msg # use the function name when no explicit 'msg' is provided if not self.msg: self.msg = "run '%s'" % func.__name__ @wraps(func) def wrapper(*args, **kwds): with self: return func(*args, **kwds) return wrapper else: msg = func_or_msg or kwargs.get("msg") level = kwargs.get("level", self.level) return self.__class__(self.logger, msg, level)
def retry(times=3): def wrapper(func): @wraps(func) def fun(*args, **kwargs): count = 0 while count < times: try: return func(*args, **kwargs) except Exception as e: count = count + 1 logger.error("connection failed after retried 3 times. {} {}".format(args, kwargs)) raise Exception("connection failed after retried 3 times. {} {}".format(args, kwargs)) return fun return wrapper
def verify_request_signature(func): @wraps(func) def decorated(*args, **kwargs): signature = request.headers.get('x-hub-signature', None) if signature: elements = signature.split('=') method = elements[0] signature_hash = elements[1] expected_hash = hmac.new(APP_SECRET, msg=request.get_data(), digestmod=method).hexdigest() if signature_hash != expected_hash: LOGGER.error('Signature was invalid') return make_response('', 403) else: LOGGER.error('Could not validate the signature') return func(*args, **kwargs) return decorated
def logger(command): """ Add a logger to the decorated command. """ @wraps(command) # pylint: disable=bad-whitespace # pylint: disable=unused-argument def wrapper(bot, update, **kwargs): command( update, **kwargs) if command.__name__ == 'unknown': command.__name__ = _get_command_name(update.message.text) message = LOG_TEMPLATE.format(user=update.user.first_name, command=command.__name__) logging.info(message) return wrapper
def run_in_executor(f): """ A decorator to run the given method in the ThreadPoolExecutor. """ @wraps(f) def new_f(self, *args, **kwargs): if self.is_shutdown: return try: future = self.executor.submit(f, self, *args, **kwargs) future.add_done_callback(_future_completed) except Exception: log.exception("Failed to submit task to executor") return new_f
def handle_bad_input_mc(f): @wraps(f) def handle(*args, **kwargs): pre_type = args[0].pre_type if pre_type == PreType.None: return handle_bad_input(f)(*args, **kwargs) EType = { PreType.SimplePre : SimplePre.InvalidMcOperation, PreType.SimplePreLAG : SimplePreLAG.InvalidMcOperation }[pre_type] Codes = { PreType.SimplePre : SimplePre.McOperationErrorCode, PreType.SimplePreLAG : SimplePreLAG.McOperationErrorCode }[pre_type] try: return handle_bad_input(f)(*args, **kwargs) except EType as e: error = Codes._VALUES_TO_NAMES[e.code] print "Invalid PRE operation (%s)" % error return handle
def validate(validation_class, silent=not settings.DEBUG): def decorator(fun): @functools.wraps(fun) def wrapper(*a, **kw): if flask.request.method == 'GET': p = {k: v for k, v in flask.request.values and flask.request.values.items() or {}} else: try: p = flask.request.data and json.loads(flask.request.data.decode()) except Exception: raise exceptions.WrongParametersException if silent: try: validation_class(p) except PyCombValidationError: raise exceptions.WrongParametersException else: validation_class(p) return fun(*a, params=p, **kw) return wrapper return decorator
def handle_errors(f): """A decorator that allows to ignore certain types of errors.""" @functools.wraps(f) def wrapper(*args, **kwargs): param_name = 'ignore_errors' ignored_errors = kwargs.get(param_name, tuple()) if param_name in kwargs: del kwargs[param_name] try: return f(*args, **kwargs) except ignored_errors: # Silently ignore errors pass return wrapper
def add_arg_scope(func): """Decorates a function with args so it can be used within an arg_scope. Args: func: function to decorate. Returns: A tuple with the decorated function func_with_args(). """ @functools.wraps(func) def func_with_args(*args, **kwargs): current_scope = _current_arg_scope() current_args = kwargs key_func = (func.__module__, func.__name__) if key_func in current_scope: current_args = current_scope[key_func].copy() current_args.update(kwargs) return func(*args, **current_args) _add_op(func) return func_with_args
def overloaded(func): @wraps(func) def overloaded_func(*args, **kwargs): for f in overloaded_func.overloads: try: return f(*args, **kwargs) except TypeError: pass else: # it will be nice if the error message prints a list of # possible signatures here raise TypeError("No compatible signatures") def overload_with(func): overloaded_func.overloads.append(func) return overloaded_func overloaded_func.overloads = [func] overloaded_func.overload_with = overload_with return overloaded_func #############
def consumer(func): """A decorator, advances func to its first yield point when called. Modifed this original example code from PEP 342 to use the new functools.wraps decorator. This convenience function makes it look like the original function, which is almost always what we want, especially if we designed the original function to be wrapped in the first place! Maybe `consumer` should go into functools too! """ from functools import wraps @wraps(func) def wrapper(*args,**kw): gen = func(*args, **kw) gen.next() return gen return wrapper
def retry_with(handle_retry, exceptions, conditions, max_attempts): def wrapper(func): @functools.wraps(func) def decorated(*args, **kwargs): error = None result = None for attempt in range(1, max_attempts + 1): try: result = func(*args, **kwargs) if any(guard(result) for guard in conditions): handle_retry.retry(func, args, kwargs, None, attempt) continue return result except Exception as err: error = err if any(isinstance(err, exc) for exc in exceptions): handle_retry.retry(func, args, kwargs, err, attempt) continue break logger.info('failed after {} attempts'.format(attempt)) if error is not None: raise error return result return decorated return wrapper
def restart_on_change(restart_map, stopstart=False, restart_functions=None): """Restart services based on configuration files changing This function is used a decorator, for example:: @restart_on_change({ '/etc/ceph/ceph.conf': [ 'cinder-api', 'cinder-volume' ] '/etc/apache/sites-enabled/*': [ 'apache2' ] }) def config_changed(): pass # your code here In this example, the cinder-api and cinder-volume services would be restarted if /etc/ceph/ceph.conf is changed by the ceph_client_changed function. The apache2 service would be restarted if any file matching the pattern got changed, created or removed. Standard wildcards are supported, see documentation for the 'glob' module for more information. @param restart_map: {path_file_name: [service_name, ...] @param stopstart: DEFAULT false; whether to stop, start OR restart @param restart_functions: nonstandard functions to use to restart services {svc: func, ...} @returns result from decorated function """ def wrap(f): @functools.wraps(f) def wrapped_f(*args, **kwargs): return restart_on_change_helper( (lambda: f(*args, **kwargs)), restart_map, stopstart, restart_functions) return wrapped_f return wrap
def cached(func): """Cache return values for multiple executions of func + args For example:: @cached def unit_get(attribute): pass unit_get('test') will cache the result of unit_get + 'test' for future calls. """ @wraps(func) def wrapper(*args, **kwargs): global cache key = str((func, args, kwargs)) try: return cache[key] except KeyError: pass # Drop out of the exception handler scope. res = func(*args, **kwargs) cache[key] = res return res wrapper._wrapped = func return wrapper
def translate_exc(from_exc, to_exc): def inner_translate_exc1(f): @wraps(f) def inner_translate_exc2(*args, **kwargs): try: return f(*args, **kwargs) except from_exc: raise to_exc return inner_translate_exc2 return inner_translate_exc1
def os_requires_version(ostack_release, pkg): """ Decorator for hook to specify minimum supported release """ def wrap(f): @wraps(f) def wrapped_f(*args): if os_release(pkg) < ostack_release: raise Exception("This hook is not supported on releases" " before %s" % ostack_release) f(*args) return wrapped_f return wrap
def os_workload_status(configs, required_interfaces, charm_func=None): """ Decorator to set workload status based on complete contexts """ def wrap(f): @wraps(f) def wrapped_f(*args, **kwargs): # Run the original function first f(*args, **kwargs) # Set workload status now that contexts have been # acted on set_os_workload_status(configs, required_interfaces, charm_func) return wrapped_f return wrap
def admin_login_required(method): def is_admin(user): if isinstance(user.is_admin, bool): return user.is_admin else: return user.is_admin() @functools.wraps(method) def wrapper(*args, **kwargs): if not current_user.is_authenticated: flash("This section is for logged in users only.", 'warning') return redirect(url_for('redberry.home')) if not hasattr(current_user, 'is_admin'): flash("Redberry expects your user instance to implement an `is_admin` boolean attribute " "or an `is_admin()` method.", 'warning') return redirect(url_for('redberry.home')) if not is_admin(current_user): flash("This section is for admin users only.", 'warning') return redirect(url_for('redberry.home')) return method(*args, **kwargs) return wrapper ############ # CMS ROUTES ############
def with_context_manager(ctx_manager): def decorator(fn): @wraps(fn) def context_manager_wrapper(*a, **kw): with ctx_manager: return fn(*a, **kw) context_manager_wrapper.djpt_patched = True return context_manager_wrapper return decorator
def lazy_property(func): attribute = '_lazy_' + func.__name__ @property @functools.wraps(func) def wrapper(self): if not hasattr(self, attribute): setattr(self, attribute, func(self)) return getattr(self, attribute) return wrapper # ?bias_shape ? None??????bias
def parse_args(**k): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): pv = {} for key, val in k.iteritems(): arg_type, default_val = val pv[key] = parse_single_arg(key, kwargs, arg_type, default_val) kwargs.update(pv) return func(*args, **kwargs) return wrapper return decorator