我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.wraps()。
def _handle_parsing_error(func): """Decorator for handling errors in raw output data.""" @six.wraps(func) def wrapper(raw_data): msg = _('Data from Intel Node Manager %s') try: return func(raw_data) except (IndexError, struct.error): raise ironic_exception.IPMIFailure(msg % _('has wrong length.')) except KeyError: raise ironic_exception.IPMIFailure(msg % _('is corrupted.')) except ValueError: raise ironic_exception.IPMIFailure(msg % _('cannot be converted.')) return wrapper
def _safe_operation(operation_name): def handle_func(func): @six.wraps(func) def handle_args(*args, **kwargs): instance, resource, context = args[:3] if resource not in instance.operation_resources_map[ operation_name]: raise exceptions.ResourceNotSupported(resource, operation_name) retries = 1 for i in xrange(retries + 1): try: service = instance.resource_service_map[resource] instance._ensure_endpoint_set(context, service) return func(*args, **kwargs) except exceptions.EndpointNotAvailable as e: if i == retries: raise if cfg.CONF.client.auto_refresh_endpoint: LOG.warn(e.message + ', update endpoint and try again') instance._update_endpoint_from_keystone(context, True) else: raise return handle_args return handle_func
def __getattr__(self, item): if self._pandas_only: raise SkipTest("empyrical.%s expects pandas-only inputs that have " "dt indices/labels" % item) func = super(ConvertPandasEmpyricalProxy, self).__getattr__(item) @wraps(func) def convert_args(*args, **kwargs): args = [self._convert(arg) if isinstance(arg, NDFrame) else arg for arg in args] kwargs = { k: self._convert(v) if isinstance(v, NDFrame) else v for k, v in iteritems(kwargs) } return func(*args, **kwargs) return convert_args
def interprocess_locked(path): """Acquires & releases a interprocess lock around call into decorated function.""" lock = InterProcessLock(path) def decorator(f): @six.wraps(f) def wrapper(*args, **kwargs): with lock: return f(*args, **kwargs) return wrapper return decorator
def keras_test(func): """Function wrapper to clean up after TensorFlow tests. # Arguments func: test function to clean up after. # Returns A function wrapping the input function. """ @six.wraps(func) def wrapper(*args, **kwargs): output = func(*args, **kwargs) if K.backend() == 'tensorflow': K.clear_session() return output return wrapper
def __call__(self, func_or_cls): condition = self.condition reason = self.reason if inspect.isfunction(func_or_cls): @six.wraps(func_or_cls) def wrapped(*args, **kwargs): if condition: raise testtools.TestCase.skipException(reason) return func_or_cls(*args, **kwargs) return wrapped elif inspect.isclass(func_or_cls): orig_func = getattr(func_or_cls, 'setUp') @six.wraps(orig_func) def new_func(self, *args, **kwargs): if condition: raise testtools.TestCase.skipException(reason) orig_func(self, *args, **kwargs) func_or_cls.setUp = new_func return func_or_cls else: raise TypeError('skipUnless can be used only with functions or ' 'classes')
def error_trap(app_name): """Decorator trapping any error during application boot time. :param app_name: Application name :type app_name: str :return: _wrapper function """ @six.wraps(error_trap) def _wrapper(func): @six.wraps(_wrapper) def _inner_wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception: logger = log.getLogger(__name__) logger.exception( 'Failed to load application: \'{}\''.format(app_name)) raise return _inner_wrapper return _wrapper
def keras_test(func): """Function wrapper to clean up after TensorFlow tests. # Arguments func: test function to clean up after. # Returns A function wrapping the input function. """ @six.wraps(func) def wrapper(*args, **kwargs): if K.backend() == 'tensorflow' or K.backend() == 'mxnet': K.clear_session() output = func(*args, **kwargs) if K.backend() == 'tensorflow' or K.backend() == 'mxnet': K.clear_session() return output return wrapper
def retry(exc_tuple, tries=5, delay=1, backoff=2): def retry_dec(f): @six.wraps(f) def func_retry(*args, **kwargs): _tries, _delay = tries, delay while _tries > 1: try: return f(*args, **kwargs) except exc_tuple: time.sleep(_delay) _tries -= 1 _delay *= backoff LOG.debug('Retrying %(args)s, %(tries)s attempts ' 'remaining...', {'args': args, 'tries': _tries}) # NOTE(jdg): Don't log the params passed here # some cmds like createAccount will have sensitive # info in the params, grab only the second tuple # which should be the Method msg = ('Retry count exceeded for command: %s' % args[1],) LOG.error(msg) raise SolidFireRequestException(message=msg) return func_retry return retry_dec
def check_firmware_update_component(func): """Checks the firmware update component.""" @six.wraps(func) def wrapper(self, filename, component_type): """Wrapper around ``update_firmware`` call. :param filename: location of the raw firmware file. :param component_type: Type of component to be applied to. """ component_type = component_type and component_type.lower() if (component_type not in SUPPORTED_FIRMWARE_UPDATE_COMPONENTS): msg = ("Got invalid component type for firmware update: " "``update_firmware`` is not supported on %(component)s" % {'component': component_type}) LOG.error(self._(msg)) # noqa raise exception.InvalidInputError(msg) return func(self, filename, component_type) return wrapper
def retry(*dargs, **dkw): """Wrap a function with a new `Retrying` object. :param dargs: positional arguments passed to Retrying object :param dkw: keyword arguments passed to the Retrying object """ # support both @retry and @retry() as valid syntax if len(dargs) == 1 and callable(dargs[0]): return retry()(dargs[0]) else: def wrap(f): if asyncio and asyncio.iscoroutinefunction(f): r = AsyncRetrying(*dargs, **dkw) elif tornado and tornado.gen.is_coroutine_function(f): r = TornadoRetrying(*dargs, **dkw) else: r = Retrying(*dargs, **dkw) return r.wraps(f) return wrap
def wraps(self, f): """Wrap a function for retrying. :param f: A function to wraps for retrying. """ @six.wraps(f) def wrapped_f(*args, **kw): return self.call(f, *args, **kw) def retry_with(*args, **kwargs): return self.copy(*args, **kwargs).wraps(f) wrapped_f.retry = self wrapped_f.retry_with = retry_with return wrapped_f
def catch_errors(func): """Decorator which catches all errors and tries to print them.""" @six.wraps(func) @click.pass_context def decorator(ctx, *args, **kwargs): try: return func(*args, **kwargs) except exceptions.DecapodAPIError as exc: utils.format_output_json(ctx, exc.json, True) except exceptions.DecapodError as exc: click.echo(six.text_type(exc), err=True) finally: ctx.close() ctx.exit(os.EX_SOFTWARE) return decorator
def with_color(func): """Decorator which adds --color option if available.""" if pygments is None: def decorator(*args, **kwargs): kwargs["color"] = None return func(*args, **kwargs) else: decorator = click.option( "--color", default=None, type=click.Choice(["light", "dark"]), help=( "Colorize output. By default no color is used. " "Parameter means colorscheme of the terminal") )(func) decorator = six.wraps(func)(decorator) return decorator
def wrap_errors(func): """Decorator which logs and catches all errors of decorated function. Also wraps all possible errors into :py:exc:`DecapodAPIError` class. :return: Value of decorated function. :raises decapodlib.exceptions.DecapodError: on any exception in decorated function. """ @six.wraps(func) def decorator(*args, **kwargs): try: return func(*args, **kwargs) except Exception as exc: if isinstance(exc, exceptions.DecapodError): LOG.error("Error on access to API: %s", exc) raise LOG.exception("Exception in decapodlib: %s", exc) raise exceptions.DecapodAPIError(exc) return decorator
def slugify_argument(func): """ Wraps a function that returns a string, adding the 'slugify' argument. >>> slugified_fn = slugify_argument(lambda *args, **kwargs: "YOU ARE A NICE LADY") >>> slugified_fn() 'YOU ARE A NICE LADY' >>> slugified_fn(slugify=True) 'you-are-a-nice-lady' """ @six.wraps(func) def wrapped(*args, **kwargs): if "slugify" in kwargs and kwargs['slugify']: return _slugify(func(*args, **kwargs)) else: return func(*args, **kwargs) return wrapped
def capitalize_argument(func): """ Wraps a function that returns a string, adding the 'capitalize' argument. >>> capsified_fn = capitalize_argument(lambda *args, **kwargs: "what in the beeswax is this?") >>> capsified_fn() 'what in the beeswax is this?' >>> capsified_fn(capitalize=True) 'What In The Beeswax Is This?' """ @six.wraps(func) def wrapped(*args, **kwargs): if "capitalize" in kwargs and kwargs['capitalize']: return func(*args, **kwargs).title() else: return func(*args, **kwargs) return wrapped
def check_keyword_arguments(func): @six.wraps(func) def wrapper(**kw): obj_type = kw.pop('object_type') result = func(**kw) extra_args = set(kw) - set(result) if extra_args: raise exception.InvalidParameterValue( _("Unknown keyword arguments (%(extra)s) were passed " "while creating a test %(object_type)s object.") % {"extra": ", ".join(extra_args), "object_type": obj_type}) return result return wrapper
def _translate_excp(func): """Decorator to translate automaton exceptions into mogan exceptions.""" @six.wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except (automaton_exceptions.InvalidState, automaton_exceptions.NotInitialized, automaton_exceptions.FrozenMachine, automaton_exceptions.NotFound) as e: raise excp.InvalidState(six.text_type(e)) except automaton_exceptions.Duplicate as e: raise excp.DuplicateState(six.text_type(e)) return wrapper
def ks_exceptions(f): """Wraps keystoneclient functions and centralizes exception handling.""" @six.wraps(f) def wrapper(*args, **kwargs): try: return f(*args, **kwargs) except kaexception.EndpointNotFound: service_type = kwargs.get('service_type', 'baremetal') endpoint_type = kwargs.get('endpoint_type', 'internal') raise exception.CatalogNotFound( service_type=service_type, endpoint_type=endpoint_type) except (kaexception.Unauthorized, kaexception.AuthorizationFailure): raise exception.KeystoneUnauthorized() except (kaexception.NoMatchingPlugin, kaexception.MissingRequiredOptions) as e: raise exception.ConfigInvalid(six.text_type(e)) except Exception as e: LOG.exception('Keystone request failed: %(msg)s', {'msg': six.text_type(e)}) raise exception.KeystoneFailure(six.text_type(e)) return wrapper
def authenticated(blocking=True): def decorator(f): @wraps(f) def wrapper(*args, **kwargs): user = None auth = request.headers.get('Authorization') if auth and len(auth) >= 7: sid = auth[7:] # 'Bearer ' prefix session = sessions.get(sid) if session and session.get('user_id'): user = get_user(session['user_id']) if user: user.sid = sid if blocking and not user: raise ApiException('unauthorized', status_code=401) res = f(user=user, *args, **kwargs) return res return wrapper return decorator
def db_transaction(function): @six.wraps(function) def inner(*args, **kwargs): config = alembic_cfg.Config( os.path.join(os.path.dirname(__file__), 'alembic.ini')) config.set_main_option( 'script_location', 'mistral.db.sqlalchemy.migration:alembic_migrations') # attach the Mistral conf to the Alembic conf config.mistral_config = CONF CONF(project='mistral') with db_api.transaction(): return function(*args, **kwargs) return inner
def expects_func_args(*args): def _decorator_checker(dec): @functools.wraps(dec) def _decorator(f): base_f = safe_utils.get_wrapped_function(f) arg_names, a, kw, _default = inspect.getargspec(base_f) if a or kw or set(args) <= set(arg_names): # NOTE : We can't really tell if correct stuff will # be passed if it's a function with *args or **kwargs so # we still carry on and hope for the best return dec(f) else: raise TypeError("Decorated function %(f_name)s does not " "have the arguments expected by the " "decorator %(d_name)s" % {'f_name': base_f.__name__, 'd_name': dec.__name__}) return _decorator return _decorator_checker
def spawn(func, *args, **kwargs): """Passthrough method for eventlet.spawn. This utility exists so that it can be stubbed for testing without interfering with the service spawns. It will also grab the context from the threadlocal store and add it to the store on the new thread. This allows for continuity in logging the context when using this method to spawn a new thread. """ _context = common_context.get_current() @functools.wraps(func) def context_wrapper(*args, **kwargs): # NOTE: If update_store is not called after spawn it won't be # available for the logger to pull from threadlocal storage. if _context is not None: _context.update_store() return func(*args, **kwargs) return eventlet.spawn(context_wrapper, *args, **kwargs)
def spawn_n(func, *args, **kwargs): """Passthrough method for eventlet.spawn_n. This utility exists so that it can be stubbed for testing without interfering with the service spawns. It will also grab the context from the threadlocal store and add it to the store on the new thread. This allows for continuity in logging the context when using this method to spawn a new thread. """ _context = common_context.get_current() @functools.wraps(func) def context_wrapper(*args, **kwargs): # NOTE: If update_store is not called after spawn_n it won't be # available for the logger to pull from threadlocal storage. if _context is not None: _context.update_store() func(*args, **kwargs) eventlet.spawn_n(context_wrapper, *args, **kwargs)
def synchronized(name, semaphores=None, blocking=False): def wrap(f): @six.wraps(f) def inner(*args, **kwargs): lock_name = 'masakari-%s' % name int_lock = lockutils.internal_lock(lock_name, semaphores=semaphores) LOG.debug("Acquiring lock: %(lock_name)s on resource: " "%(resource)s", {'lock_name': lock_name, 'resource': f.__name__}) if not int_lock.acquire(blocking=blocking): raise exception.LockAlreadyAcquired(resource=name) try: return f(*args, **kwargs) finally: LOG.debug("Releasing lock: %(lock_name)s on resource: " "%(resource)s", {'lock_name': lock_name, 'resource': f.__name__}) int_lock.release() return inner return wrap
def eat_exceptions(function): @six.wraps(function) def decorator(*args, **kwargs): try: return function(*args, **kwargs) except HTTPError as exception: if exception.response.status_code == 401: error_and_quit('Your authentication information may be incorrect. Please ' 'reconfigure with ``dbfs configure``') else: error_and_quit(exception.response.content) except Exception as exception: # noqa if not DEBUG_MODE: error_and_quit('{}: {}'.format(type(exception).__name__, str(exception))) decorator.__doc__ = function.__doc__ return decorator
def run_once(function, state={}, errors={}): """A memoization decorator, whose purpose is to cache calls.""" @six.wraps(function) def _wrapper(*args, **kwargs): if function in errors: # Deliberate use of LBYL. six.reraise(*errors[function]) try: return state[function] except KeyError: try: state[function] = result = function(*args, **kwargs) return result except Exception: errors[function] = sys.exc_info() raise return _wrapper
def token_required(self, func=None, callback=None): """Decorator for endpoints which require a verified user.""" # logger.debug("token_required received func={!r}, callback={!r}", func, callback) if func is None: # called with optional arguments; return value will be called without, so pass them through return functools.partial(self.token_required, callback=callback) @six.wraps(func) def decorator(request, *args, **kwargs): userinfo = self.check_token(request) kwargs['userinfo'] = userinfo args = (request,) + args if callback is not None: # logger.debug("calling {!r} with args={!r} and kwargs={!r}", callback, args, kwargs) callback(*args, **kwargs) # logger.debug("calling {!r} with args={!r} and kwargs={!r}", func, args, kwargs) return func(*args, **kwargs) return decorator
def serialized_endpoint(*_callbacks): """Decorator which wraps an endpoint by applying callbacks to the results then serializing to JSON. First, the decorated function is called and must return a `defer.Deferred`. The callbacks supplied to the decorator are then applied followed by any callbacks supplied as keyword arguments to the decorated function. The result is then serialized and returned in the response (with ``Content-Type`` set to ``application/json``). """ def decorator(func): @six.wraps(func) def wrapped(request, *args, **kwargs): callbacks = kwargs.pop('callbacks', []) # logger.debug("in wrapped:\nargs: {!r}\nkwargs: {!r}", args, kwargs) deferred_list = func(*args, **kwargs) for callback in list(_callbacks) + callbacks: deferred_list.addCallback(callback) deferred_list.addCallback(json.dumps, default=stethoscope.utils.json_serialize_datetime) request.setHeader('Content-Type', 'application/json') return deferred_list return wrapped return decorator
def queryapi(version, kind, nsarg=True): """Make Query API. .. py:decorator:: queryapi Creates a named query api. """ def decorator(func): @six.wraps(func) def handler(self, namespace=DEFAULT_NAMESPACE): if not nsarg: namespace = None url = self._generate_url(api_version=version, kind=kind, namespace=namespace) return Query(self, url) return handler return decorator
def template(action): """Handle template actions. .. py:decorator:: template Checks if the kind is 'template' and processes else default processing. """ def decorator(func): @six.wraps(func) def handler(self, obj, namespace=None): apiver, kind, _ = validator.validate(obj) if kind == 'Template': return self._process_template(apiver, kind, action, obj, namespace) else: return func(self, obj, namespace) return handler return decorator
def _isotherguard(method): """Checks if `other` is a GuardFunc object.""" @six.wraps(method) def checks(self, other): if not isinstance(other, GuardFunc): raise TypeError("The right-hand operand has to be instance of GuardFunc") return method(self, other) return checks
def retry(*dargs, **dkw): """ Decorator function that instantiates the Retrying object @param *dargs: positional arguments passed to Retrying object @param **dkw: keyword arguments passed to the Retrying object """ # support both @retry and @retry() as valid syntax if len(dargs) == 1 and callable(dargs[0]): def wrap_simple(f): @six.wraps(f) def wrapped_f(*args, **kw): return Retrying().call(f, *args, **kw) return wrapped_f return wrap_simple(dargs[0]) else: def wrap(f): @six.wraps(f) def wrapped_f(*args, **kw): return Retrying(*dargs, **dkw).call(f, *args, **kw) return wrapped_f return wrap
def __call__(self, func_or_cls): if not self.what: self.what = func_or_cls.__name__ + '()' msg, details = self._build_message() if inspect.isfunction(func_or_cls): @six.wraps(func_or_cls) def wrapped(*args, **kwargs): report_deprecated_feature(LOG, msg, details) return func_or_cls(*args, **kwargs) return wrapped elif inspect.isclass(func_or_cls): orig_init = func_or_cls.__init__ # TODO(tsufiev): change `functools` module to `six` as # soon as six 1.7.4 (with fix for passing `assigned` # argument to underlying `functools.wraps`) is released # and added to the oslo_incubator requrements @functools.wraps(orig_init, assigned=('__name__', '__doc__')) def new_init(self, *args, **kwargs): report_deprecated_feature(LOG, msg, details) orig_init(self, *args, **kwargs) func_or_cls.__init__ = new_init return func_or_cls else: raise TypeError('deprecated can be used only with functions or ' 'classes')
def read_locked(*args, **kwargs): """Acquires & releases a read lock around call into decorated method. NOTE(harlowja): if no attribute name is provided then by default the attribute named '_lock' is looked for (this attribute is expected to be a :py:class:`.ReaderWriterLock`) in the instance object this decorator is attached to. """ def decorator(f): attr_name = kwargs.get('lock', '_lock') @six.wraps(f) def wrapper(self, *args, **kwargs): rw_lock = getattr(self, attr_name) with rw_lock.read_lock(): return f(self, *args, **kwargs) return wrapper # This is needed to handle when the decorator has args or the decorator # doesn't have args, python is rather weird here... if kwargs or not args: return decorator else: if len(args) == 1: return decorator(args[0]) else: return decorator
def write_locked(*args, **kwargs): """Acquires & releases a write lock around call into decorated method. NOTE(harlowja): if no attribute name is provided then by default the attribute named '_lock' is looked for (this attribute is expected to be a :py:class:`.ReaderWriterLock` object) in the instance object this decorator is attached to. """ def decorator(f): attr_name = kwargs.get('lock', '_lock') @six.wraps(f) def wrapper(self, *args, **kwargs): rw_lock = getattr(self, attr_name) with rw_lock.write_lock(): return f(self, *args, **kwargs) return wrapper # This is needed to handle when the decorator has args or the decorator # doesn't have args, python is rather weird here... if kwargs or not args: return decorator else: if len(args) == 1: return decorator(args[0]) else: return decorator
def __getattr__(self, name): if name in ('_mock_methods', '_mock_unsafe'): raise AttributeError(name) elif self._mock_methods is not None: if name not in self._mock_methods or name in _all_magics: raise AttributeError("Mock object has no attribute %r" % name) elif _is_magic(name): raise AttributeError(name) if not self._mock_unsafe: if name.startswith(('assert', 'assret')): raise AttributeError(name) result = self._mock_children.get(name) if result is _deleted: raise AttributeError(name) elif result is None: wraps = None if self._mock_wraps is not None: # XXXX should we get the attribute without triggering code # execution? wraps = getattr(self._mock_wraps, name) result = self._get_child_mock( parent=self, name=name, wraps=wraps, _new_name=name, _new_parent=self ) self._mock_children[name] = result elif isinstance(result, _SpecState): result = create_autospec( result.spec, result.spec_set, result.instance, result.parent, result.name ) self._mock_children[name] = result return result
def __init__(self, spec=None, side_effect=None, return_value=DEFAULT, wraps=None, name=None, spec_set=None, parent=None, _spec_state=None, _new_name='', _new_parent=None, **kwargs): self.__dict__['_mock_return_value'] = return_value _safe_super(CallableMixin, self).__init__( spec, wraps, name, spec_set, parent, _spec_state, _new_name, _new_parent, **kwargs ) self.side_effect = side_effect
def decorate_callable(self, func): if hasattr(func, 'patchings'): func.patchings.append(self) return func @wraps(func) def patched(*args, **keywargs): extra_args = [] entered_patchers = [] exc_info = tuple() try: for patching in patched.patchings: arg = patching.__enter__() entered_patchers.append(patching) if patching.attribute_name is not None: keywargs.update(arg) elif patching.new is DEFAULT: extra_args.append(arg) args += tuple(extra_args) return func(*args, **keywargs) except: if (patching not in entered_patchers and _is_started(patching)): # the patcher may have been started, but an exception # raised whilst entering one of its additional_patchers entered_patchers.append(patching) # Pass the exception to __exit__ exc_info = sys.exc_info() # re-raise the exception raise finally: for patching in reversed(entered_patchers): patching.__exit__(*exc_info) patched.patchings = [self] return patched
def __call__(self, f): if isinstance(f, ClassTypes): return self.decorate_class(f) @wraps(f) def _inner(*args, **kw): self._patch_dict() try: return f(*args, **kw) finally: self._unpatch_dict() return _inner
def _check_return_type(self, func): @wraps(func) def check_return_type(*args, **kwargs): result = func(*args, **kwargs) self._test_case.assertIsInstance(result, self._return_types) return result return check_return_type
def _check_input_not_mutated(self, func): @wraps(func) def check_not_mutated(*args, **kwargs): # Copy inputs to compare them to originals later. arg_copies = [(i, arg.copy()) for i, arg in enumerate(args) if isinstance(arg, (NDFrame, np.ndarray))] kwarg_copies = { k: v.copy() for k, v in iteritems(kwargs) if isinstance(v, (NDFrame, np.ndarray)) } result = func(*args, **kwargs) # Check that inputs weren't mutated by func. for i, arg_copy in arg_copies: assert_allclose( args[i], arg_copy, atol=0.5 * 10 ** (-DECIMAL_PLACES), err_msg="Input 'arg %s' mutated by %s" % (i, func.__name__), ) for kwarg_name, kwarg_copy in iteritems(kwarg_copies): assert_allclose( kwargs[kwarg_name], kwarg_copy, atol=0.5 * 10 ** (-DECIMAL_PLACES), err_msg="Input '%s' mutated by %s" % (kwarg_name, func.__name__), ) return result return check_not_mutated
def keras_test(func): '''Clean up after tensorflow tests. ''' @six.wraps(func) def wrapper(*args, **kwargs): output = func(*args, **kwargs) if K._BACKEND == 'tensorflow': K.clear_session() return output return wrapper
def dummy_decorator(f): @six.wraps(f) def wrapper(*args, **kwargs): return f(*args, **kwargs) return wrapper