The following 50 code examples, extracted from open-source Python projects, illustrate how to use inspect.isgenerator().
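Before the project examples, here is a minimal sketch (not taken from any of the projects below; the function name count_up is made up for illustration) of what inspect.isgenerator() actually checks: it returns True for generator objects, i.e. the result of calling a generator function or of evaluating a generator expression, while the generator function itself is matched by inspect.isgeneratorfunction() instead.

import inspect

def count_up(n):
    # A generator function: calling it produces a generator object.
    for i in range(n):
        yield i

gen = count_up(3)

print(inspect.isgeneratorfunction(count_up))      # True:  the function itself
print(inspect.isgenerator(count_up))              # False: not a generator object yet
print(inspect.isgenerator(gen))                   # True:  the generator returned by the call
print(inspect.isgenerator(i for i in range(3)))   # True:  generator expressions as well
print(inspect.isgenerator(iter([1, 2, 3])))       # False: an iterator, but not a generator

Many of the examples below use exactly this distinction, typically to materialize a generator result into a list before caching, retrying, or serializing it.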
def default(self, obj):
    if hasattr(obj, "to_json"):
        return self.default(obj.to_json())
    elif hasattr(obj, "__dict__"):
        d = dict(
            (key, value)
            for key, value in inspect.getmembers(obj)
            if not key.startswith("__")
            and not inspect.isabstract(value)
            and not inspect.isbuiltin(value)
            and not inspect.isfunction(value)
            and not inspect.isgenerator(value)
            and not inspect.isgeneratorfunction(value)
            and not inspect.ismethod(value)
            and not inspect.ismethoddescriptor(value)
            and not inspect.isroutine(value)
        )
        return self.default(d)
    return obj
def test_excluding_predicates(self):
    self.istest(inspect.isbuiltin, 'sys.exit')
    if check_impl_detail():
        self.istest(inspect.isbuiltin, '[].append')
    self.istest(inspect.iscode, 'mod.spam.__code__')
    self.istest(inspect.isframe, 'tb.tb_frame')
    self.istest(inspect.isfunction, 'mod.spam')
    self.istest(inspect.isfunction, 'mod.StupidGit.abuse')
    self.istest(inspect.ismethod, 'git.argue')
    self.istest(inspect.ismodule, 'mod')
    self.istest(inspect.istraceback, 'tb')
    self.istest(inspect.isdatadescriptor, 'collections.defaultdict.default_factory')
    self.istest(inspect.isgenerator, '(x for x in range(2))')
    self.istest(inspect.isgeneratorfunction, 'generator_function_example')
    if hasattr(types, 'GetSetDescriptorType'):
        self.istest(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')
    else:
        self.assertFalse(inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))
    if hasattr(types, 'MemberDescriptorType'):
        self.istest(inspect.ismemberdescriptor, 'type(lambda: None).__globals__')
    else:
        self.assertFalse(inspect.ismemberdescriptor(datetime.timedelta.days))
def _uid_str(uid_list: str or [str] or Generator) -> str:
    """
    Prepare list of uid for use in commands: delete/copy/move/seen
    uid_list can be: str, list, tuple, set, fetch generator
    """
    if not uid_list:
        raise MailBox.MailBoxUidParamError('uid_list should not be empty')
    if type(uid_list) is str:
        uid_list = uid_list.split(',')
    if inspect.isgenerator(uid_list):
        uid_list = [msg.uid for msg in uid_list if msg.uid]
    if type(uid_list) not in (list, tuple, set):
        raise MailBox.MailBoxUidParamError('Wrong uid_list type: {}'.format(type(uid_list)))
    for uid in uid_list:
        if type(uid) is not str:
            raise MailBox.MailBoxUidParamError('uid {} is not string'.format(str(uid)))
        if not uid.strip().isdigit():
            raise MailBox.MailBoxUidParamError('Wrong uid: {}'.format(uid))
    return ','.join((i.strip() for i in uid_list))
def test_excluding_predicates(self):
    self.istest(inspect.isbuiltin, 'sys.exit')
    self.istest(inspect.isbuiltin, '[].append')
    self.istest(inspect.iscode, 'mod.spam.func_code')
    self.istest(inspect.isframe, 'tb.tb_frame')
    self.istest(inspect.isfunction, 'mod.spam')
    self.istest(inspect.ismethod, 'mod.StupidGit.abuse')
    self.istest(inspect.ismethod, 'git.argue')
    self.istest(inspect.ismodule, 'mod')
    self.istest(inspect.istraceback, 'tb')
    self.istest(inspect.isdatadescriptor, '__builtin__.file.closed')
    self.istest(inspect.isdatadescriptor, '__builtin__.file.softspace')
    self.istest(inspect.isgenerator, '(x for x in xrange(2))')
    self.istest(inspect.isgeneratorfunction, 'generator_function_example')
    if hasattr(types, 'GetSetDescriptorType'):
        self.istest(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')
    else:
        self.assertFalse(inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))
    if hasattr(types, 'MemberDescriptorType'):
        self.istest(inspect.ismemberdescriptor, 'datetime.timedelta.days')
    else:
        self.assertFalse(inspect.ismemberdescriptor(datetime.timedelta.days))
def delete(self, paths):
    """Delete L{Namespace}s matching C{paths}.

    @param paths: A sequence of L{Namespace.path}s.
    @raises NotEmptyError: Raised if the L{Namespace} is not empty.
    @return: A C{list} of C{(objectID, Namespace.path)} 2-tuples representing
        the deleted L{Namespace}s.
    """
    if isgenerator(paths):
        paths = list(paths)
    if getChildNamespaces(paths).any() or getChildTags(paths).any():
        raise NotEmptyError("Can't delete non-empty namespaces.")
    result = getNamespaces(paths=paths)
    deletedNamespaces = list(result.values(Namespace.objectID, Namespace.path))
    values = [(objectID, systemTag)
              for objectID, _ in deletedNamespaces
              for systemTag in (u'fluiddb/namespaces/description',
                                u'fluiddb/namespaces/path')]
    if values:
        self._factory.tagValues(self._user).delete(values)
    result.remove()
    return deletedNamespaces
def delete(self, values):
    """Delete L{TagValue}s.

    @param values: A sequence of C{(objectID, Tag.path)} 2-tuples to delete
        values for.
    @raise FeatureError: Raised if the given list of values is empty.
    @return: The number of values deleted.
    """
    if isgenerator(values):
        values = list(values)
    if not values:
        raise FeatureError("Can't delete an empty list of tag values.")
    paths = set([path for objectID, path in values])
    objectIDs = set([objectID for objectID, path in values])
    tagIDs = dict(getTags(paths).values(Tag.path, Tag.id))
    values = [(objectID, tagIDs[path]) for objectID, path in values]
    result = getTagValues(values).remove()
    if result:
        touchObjects(objectIDs)
    return result
def delete(self, paths):
    """See L{TagAPI.delete}.

    Permissions for deleted L{Tag}s are removed from the cache.
    """
    if isgenerator(paths):
        paths = list(paths)
    # FIXME getObjectIDs is called twice--once here and once in
    # TagAPI.delete.  It would be better if we only did this once, not to
    # mention that this breaks encapsulation by bypassing the model layer
    # and accessing the data layer directly. -jkakar
    objectIDs = set(getObjectIDs(paths))
    RecentObjectActivityCache().clear(objectIDs)
    usernames = set([path.split('/')[0] for path in paths])
    RecentUserActivityCache().clear(usernames)
    PermissionCache().clearTagPermissions(paths)
    return self._api.delete(paths)
def update(self, db_name, coll_name, results, task, query, upsert=True):
    self.db_coll = self.db_conn[db_name][coll_name]
    if inspect.isgenerator(results):
        pass
    elif not isinstance(results, list):
        results = [results]
    for r in results:
        assert isinstance(r, dict), 'result saved to mongodb must be dict.'
        if not query:
            query = {'taskid': task.get('taskid')}
        self.db_coll.update(
            query,
            {'$set': r, '$setOnInsert': {'updated_at': time.time()}},
            upsert=upsert,
        )
def is_closable_iterator(obj):
    # Not an iterator.
    if not is_iterator(obj):
        return False

    # A generator - the easiest thing to deal with.
    import inspect
    if inspect.isgenerator(obj):
        return True

    # A custom iterator. Look for a close method...
    if not (hasattr(obj, 'close') and callable(obj.close)):
        return False

    # ... which doesn't require any arguments.
    try:
        inspect.getcallargs(obj.close)
    except TypeError:
        return False
    else:
        return True
def test_excluding_predicates(self):
    self.istest(inspect.isbuiltin, 'sys.exit')
    if check_impl_detail():
        self.istest(inspect.isbuiltin, '[].append')
    self.istest(inspect.iscode, 'mod.spam.func_code')
    self.istest(inspect.isframe, 'tb.tb_frame')
    self.istest(inspect.isfunction, 'mod.spam')
    self.istest(inspect.ismethod, 'mod.StupidGit.abuse')
    self.istest(inspect.ismethod, 'git.argue')
    self.istest(inspect.ismodule, 'mod')
    self.istest(inspect.istraceback, 'tb')
    self.istest(inspect.isdatadescriptor, '__builtin__.file.closed')
    self.istest(inspect.isdatadescriptor, '__builtin__.file.softspace')
    self.istest(inspect.isgenerator, '(x for x in xrange(2))')
    self.istest(inspect.isgeneratorfunction, 'generator_function_example')
    if hasattr(types, 'GetSetDescriptorType'):
        self.istest(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')
    else:
        self.assertFalse(inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))
    if hasattr(types, 'MemberDescriptorType'):
        self.istest(inspect.ismemberdescriptor, 'type(lambda: None).func_globals')
    else:
        self.assertFalse(inspect.ismemberdescriptor(type(lambda: None).func_globals))
def default(self, obj):
    # if hasattr(obj, "to_json"):
    #     return self.default(obj.to_json())
    if isinstance(obj, Enum):
        return obj.name
    elif hasattr(obj, "__dict__"):
        d = dict(
            (key, value)
            for key, value in inspect.getmembers(obj)
            if not key.startswith("__")
            and not inspect.isabstract(value)
            and not inspect.isbuiltin(value)
            and not inspect.isfunction(value)
            and not inspect.isgenerator(value)
            and not inspect.isgeneratorfunction(value)
            and not inspect.ismethod(value)
            and not inspect.ismethoddescriptor(value)
            and not inspect.isroutine(value)
            and not self.isempty(value)
            and not value is None
        )
        return self.default(d)
    return obj
def call_agents(self, data):
    """call a method on all agents"""
    try:
        d = {'args': [], 'kwargs': {}}
        d.update(data)
        if self.agents:
            results = [getattr(agent, d['func'])(*d['args'], **d['kwargs'])
                       for agent in self.agents.values()]
            if inspect.isgenerator(results[0]):
                results = yield from asyncio.gather(*results)
        else:
            results = []
        return {'status': 'ok', 'results': results}
    except Exception as e:
        tb = traceback.format_exc()
        logger.exception(e)
        logger.exception(tb)
        return {'status': 'failed', 'exception': e, 'traceback': tb}
def call_agent(self, data):
    """call a method on an agent and get the result"""
    d = {'args': [], 'kwargs': {}}
    d.update(data)
    id = d['id']
    # check locally
    if id in self.agents:
        agent = self.agents[id]
        result = getattr(agent, d['func'])(*d['args'], **d['kwargs'])
        if inspect.isgenerator(result):
            result = yield from result
        return result
    # pass request to the arbiter
    else:
        d['cmd'] = 'call_agent'
        return (yield from self.arbiter.send_recv(d))
def __getattr__(self, item):
    attr = getattr(self.delegate, item)
    if inspect.iscoroutinefunction(attr) \
            or hasattr(attr, "_is_coroutine") and attr._is_coroutine \
            or inspect.iscoroutine(attr):
        async def wrapper(*args, **kwargs):
            return self._wrap(await attr(*args, **kwargs))

        return wrapper() if inspect.iscoroutine(attr) else wrapper
    elif inspect.isgeneratorfunction(attr) or inspect.isgenerator(attr):
        def wrapper(*args, **kwargs):
            for entry in attr(*args, **kwargs):
                yield self._wrap(entry)

        return wrapper if inspect.isgeneratorfunction(attr) else wrapper()
    elif inspect.isfunction(attr):
        def wrapper(*args, **kwargs):
            return self._wrap(attr(*args, **kwargs))

        return wrapper
    else:
        return self._wrap(attr)
def test_tree(self):
    prefix_tree = PrefixTree()
    values = ['amy', 'ann', 'anne', 'emma', 'rob', 'roger', 'anna']

    # Test setter
    for value in values:
        prefix_tree[value] = value

    # Test getter
    result = prefix_tree['ann']
    self.assertTrue(result)
    self.assertTrue(inspect.isgenerator(result))
    # expect 'ann', 'anne' and 'anna'
    self.assertEqual(len(list(result)), 3)

    # Test containment
    self.assertTrue('amy' in prefix_tree)
    self.assertFalse('am' in prefix_tree)
async def _send_code(self, code, wire_dir=None):
    # note: must be a coroutine, since it awaits self._send_text() below
    if not self.connected:
        raise RuntimeError('HBI wire not connected')

    # convert wire_dir as early as possible, will save conversions in case
    # the code is of complex structure
    if wire_dir is None:
        wire_dir = b''
    elif not isinstance(wire_dir, (bytes, bytearray)):
        wire_dir = str(wire_dir).encode('utf-8')

    # use a generator function to pull code from hierarchy
    def pull_code(container):
        for mc in container:
            if inspect.isgenerator(mc):
                yield from pull_code(mc)
            else:
                yield mc

    if inspect.isgenerator(code):
        for c in pull_code(code):
            await self._send_text(c, wire_dir)
    else:
        await self._send_text(code, wire_dir)
def default(self, obj):
    if hasattr(obj, "to_json"):
        return self.default(obj.to_json())
    elif hasattr(obj, "__dict__"):
        data = dict(
            (key, value)
            for key, value in inspect.getmembers(obj)
            if not key.startswith("__")
            and not inspect.isabstract(value)
            and not inspect.isbuiltin(value)
            and not inspect.isfunction(value)
            and not inspect.isgenerator(value)
            and not inspect.isgeneratorfunction(value)
            and not inspect.ismethod(value)
            and not inspect.ismethoddescriptor(value)
            and not inspect.isroutine(value)
        )
        return self.default(data)
    return obj
def post_process_extensions(self, extensions, resp_obj, request, action_args):
    for ext in extensions:
        response = None
        if inspect.isgenerator(ext):
            # If it's a generator, run the second half of
            # processing
            try:
                with ResourceExceptionHandler():
                    response = ext.send(resp_obj)
            except StopIteration:
                # Normal exit of generator
                continue
            except Fault as ex:
                response = ex
        else:
            # Regular functions get post-processing...
            try:
                with ResourceExceptionHandler():
                    response = ext(req=request, resp_obj=resp_obj, **action_args)
            except Fault as ex:
                response = ex

        # We had a response...
        if response:
            return response

    return None
def test_as_completed(monkeypatch):
    service = RemoteService(DUMMY_SERVICE_URL)
    monkeypatch.setattr(urllib.request, 'urlopen', dummy_urlopen)
    items = [service.call_method_async("test", []) for _ in range(10)]
    data = as_completed(*items)
    assert inspect.isgenerator(data)
    results = list(data)
    assert len(results) == 10
def isgenerator(o):
    if isinstance(o, UnboundMethod):
        o = o._func
    return inspect.isgeneratorfunction(o) or inspect.isgenerator(o)
def isgenerator(func):
    try:
        return func.func_code.co_flags & CO_GENERATOR != 0
    except AttributeError:
        return False

# Make a function to help check if an exception is derived from BaseException.
# In Python 2.4, we just use Exception instead.
def post_process_extensions(self, extensions, resp_obj, request, action_args):
    for ext in extensions:
        response = None
        if inspect.isgenerator(ext):
            # If it's a generator, run the second half of
            # processing
            try:
                with ResourceExceptionHandler():
                    response = ext.send(resp_obj)
            except StopIteration:
                # Normal exit of generator
                continue
            except Fault as ex:
                response = ex
        else:
            # Regular functions get post-processing...
            try:
                with ResourceExceptionHandler():
                    response = ext(req=request, resp_obj=resp_obj, **action_args)
            except exception.VersionNotFoundForAPIMethod:
                # If an attached extension (@wsgi.extends) for the
                # method has no version match it is not an error. We
                # just don't run the extends code
                continue
            except Fault as ex:
                response = ex

        # We had a response...
        if response:
            return response

    return None
def underscore_memoization(func):
    """
    Decorator for methods::

        class A(object):
            def x(self):
                if self._x:
                    self._x = 10
                return self._x

    Becomes::

        class A(object):
            @underscore_memoization
            def x(self):
                return 10

    A now has an attribute ``_x`` written by this decorator.
    """
    name = '_' + func.__name__

    def wrapper(self):
        try:
            return getattr(self, name)
        except AttributeError:
            result = func(self)
            if inspect.isgenerator(result):
                result = list(result)
            setattr(self, name, result)
            return result

    return wrapper
def is_class_instance(obj):
    """Like inspect.* methods."""
    return not (inspect.isclass(obj) or inspect.ismodule(obj)
                or inspect.isbuiltin(obj) or inspect.ismethod(obj)
                or inspect.ismethoddescriptor(obj) or inspect.iscode(obj)
                or inspect.isgenerator(obj))
def memoize_default(default=NO_DEFAULT, evaluator_is_first_arg=False,
                    second_arg_is_evaluator=False):
    """ This is a typical memoization decorator, BUT there is one difference:
    To prevent recursion it sets defaults.

    Preventing recursion is in this case the much bigger use than speed. I
    don't think, that there is a big speed difference, but there are many
    cases where recursion could happen (think about a = b; b = a).
    """
    def func(function):
        def wrapper(obj, *args, **kwargs):
            if evaluator_is_first_arg:
                cache = obj.memoize_cache
            elif second_arg_is_evaluator:  # needed for meta classes
                cache = args[0].memoize_cache
            else:
                cache = obj._evaluator.memoize_cache

            try:
                memo = cache[function]
            except KeyError:
                memo = {}
                cache[function] = memo

            key = (obj, args, frozenset(kwargs.items()))
            if key in memo:
                return memo[key]
            else:
                if default is not NO_DEFAULT:
                    memo[key] = default
                rv = function(obj, *args, **kwargs)
                if inspect.isgenerator(rv):
                    rv = list(rv)
                memo[key] = rv
                return rv
        return wrapper
    return func
def underscore_memoization(func):
    """
    Decorator for methods::

        class A(object):
            def x(self):
                if self._x:
                    self._x = 10
                return self._x

    Becomes::

        class A(object):
            @underscore_memoization
            def x(self):
                return 10

    A now has an attribute ``_x`` written by this decorator.
    """
    name = '_' + func.__name__

    def wrapper(self):
        try:
            return getattr(self, name)
        except AttributeError:
            result = func(self)
            if inspect.isgenerator(result):
                result = list(result)
            setattr(self, name, result)
            return result

    return wrapper

# for fast_parser, should not be deleted
def _is_iterable(data):
    return isinstance(data, list) or isinstance(data, tuple) \
        or isinstance(data, set) or inspect.isgenerator(data)
def call(self, context, method, *args, **kwargs):
    """Call a glance client method."""
    if self.client is None:
        self.client = self._glance_client(context)

    retry_excs = (glanceclient.exc.ServiceUnavailable,
                  glanceclient.exc.InvalidEndpoint,
                  glanceclient.exc.CommunicationError)
    retries = CONF.glance_num_retries
    if retries < 0:
        LOG.warning("Treating negative retries as 0")
        retries = 0

    num_attempts = retries + 1

    for attempt in range(1, num_attempts + 1):
        client = self._glance_client(context)
        try:
            controller = getattr(client, kwargs.pop('controller', 'images'))
            result = getattr(controller, method)(*args, **kwargs)
            if inspect.isgenerator(result):
                # Convert generator results to a list, so that we can
                # catch any potential exceptions now and retry the call.
                return list(result)
            return result
        except retry_excs as e:
            if attempt < num_attempts:
                extra = "retrying"
            else:
                extra = "done trying"

            LOG.exception("Error contacting glance server "
                          "'%(server)s' for '%(method)s', "
                          "%(extra)s.",
                          {'server': self.api_server,
                           'method': method, 'extra': extra})
            if attempt == num_attempts:
                raise exception.GlanceConnectionFailed(
                    server=str(self.api_server), reason=six.text_type(e))
            time.sleep(1)
def fix_http_content_length():
    """
    Reverse operate done by cherrypy `_be_ie_unfriendly`.
    """
    response = cherrypy.serving.response
    if not inspect.isgenerator(response.body):  # Dont do this in `stream` mode
        response.body = response.collapse_body().strip()
        response.headers['Content-Length'] = str(len(response.collapse_body()))
def unescape_response():
    """
    Unescape the html body which escaped by `_cpcompat.escape_html()`.
    """
    response = cherrypy.serving.response
    if not inspect.isgenerator(response.body):  # Dont do this in `stream` mode
        response.body = six.binary_type(unescape_html(response.collapse_body()))
def _json_stream_output(next_handler, *args, **kwargs):
    """
    Output JSON in stream mode.
    """
    cherrypy.response.headers['Content-Type'] = "application/json"
    _outputs = next_handler(*args, **kwargs)
    if inspect.isgenerator(_outputs):
        def _stream_outputs():
            for _content in _outputs:
                yield json.dumps(_content)
        return _stream_outputs()
    else:
        return json.dumps(_outputs)
def test_filter_all(self, mock_filter_one):
    mock_filter_one.side_effect = [True, False, True]
    filter_obj_list = ['obj1', 'obj2', 'obj3']
    container = {}
    base_filter = base_filters.BaseFilter()
    extra_spec = {}

    result = base_filter.filter_all(filter_obj_list, container, extra_spec)
    self.assertTrue(inspect.isgenerator(result))
    self.assertEqual(['obj1', 'obj3'], list(result))
def do(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        gen = f(*args, **kwargs)
        if not inspect.isgenerator(gen):
            res = gen

            def generator_no_yield():
                return res
                yield

            gen = generator_no_yield()
        return Effect(ChainedIntent(gen))
    return wrapper
def run_task(self, module, task, response):
    """
    Processing the task, catching exceptions and logs,
    return a `ProcessorResult` object
    """
    self.logger = logger = module.logger
    result = None
    exception = None
    stdout = sys.stdout
    self.task = task
    if isinstance(response, dict):
        response = rebuild_response(response)
    self.response = response
    self.save = (task.get('track') or {}).get('save', {})

    try:
        if self.__env__.get('enable_stdout_capture', True):
            sys.stdout = ListO(module.log_buffer)
        self._reset()
        result = self._run_task(task, response)
        if inspect.isgenerator(result):
            for r in result:
                self._run_func(self.on_result, r, response, task)
        else:
            self._run_func(self.on_result, result, response, task)
    except Exception as e:
        logger.exception(e)
        exception = e
    finally:
        follows = self._follows
        messages = self._messages
        logs = list(module.log_buffer)
        extinfo = self._extinfo
        save = self.save

        sys.stdout = stdout
        self.task = None
        self.response = None
        self.save = None
        module.log_buffer[:] = []
    return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)
def memoize_default(default=NO_DEFAULT, evaluator_is_first_arg=False,
                    second_arg_is_evaluator=False):
    """ This is a typical memoization decorator, BUT there is one difference:
    To prevent recursion it sets defaults.

    Preventing recursion is in this case the much bigger use than speed. I
    don't think, that there is a big speed difference, but there are many
    cases where recursion could happen (think about a = b; b = a).
    """
    def func(function):
        def wrapper(obj, *args, **kwargs):
            if evaluator_is_first_arg:
                cache = obj.memoize_cache
            elif second_arg_is_evaluator:  # needed for meta classes
                cache = args[0].memoize_cache
            else:
                cache = obj.evaluator.memoize_cache

            try:
                memo = cache[function]
            except KeyError:
                memo = {}
                cache[function] = memo

            key = (obj, args, frozenset(kwargs.items()))
            if key in memo:
                return memo[key]
            else:
                if default is not NO_DEFAULT:
                    memo[key] = default
                rv = function(obj, *args, **kwargs)
                if inspect.isgenerator(rv):
                    rv = list(rv)
                memo[key] = rv
                return rv
        return wrapper
    return func
def is_generator(obj):
    import inspect
    return obj is not None and (
        inspect.isgeneratorfunction(obj)
        or inspect.isgenerator(obj)
        or hasattr(obj, 'next')
        or hasattr(obj, '__next__')
    )
def test_excluding_predicates(self):
    global tb
    self.istest(inspect.isbuiltin, 'sys.exit')
    self.istest(inspect.isbuiltin, '[].append')
    self.istest(inspect.iscode, 'mod.spam.__code__')
    try:
        1/0
    except:
        tb = sys.exc_info()[2]
        self.istest(inspect.isframe, 'tb.tb_frame')
        self.istest(inspect.istraceback, 'tb')
        if hasattr(types, 'GetSetDescriptorType'):
            self.istest(inspect.isgetsetdescriptor, 'type(tb.tb_frame).f_locals')
        else:
            self.assertFalse(inspect.isgetsetdescriptor(type(tb.tb_frame).f_locals))
    finally:
        # Clear traceback and all the frames and local variables hanging to it.
        tb = None
    self.istest(inspect.isfunction, 'mod.spam')
    self.istest(inspect.isfunction, 'mod.StupidGit.abuse')
    self.istest(inspect.ismethod, 'git.argue')
    self.istest(inspect.ismodule, 'mod')
    self.istest(inspect.isdatadescriptor, 'collections.defaultdict.default_factory')
    self.istest(inspect.isgenerator, '(x for x in range(2))')
    self.istest(inspect.isgeneratorfunction, 'generator_function_example')
    if hasattr(types, 'MemberDescriptorType'):
        self.istest(inspect.ismemberdescriptor, 'datetime.timedelta.days')
    else:
        self.assertFalse(inspect.ismemberdescriptor(datetime.timedelta.days))
def delete(self, paths):
    """Delete L{Tag}s matching C{paths}.

    L{TagValue}s and permissions associated with the deleted L{Tag}s are
    removed by cascading deletes in the database schema.

    @param paths: A sequence of L{Tag.path}s.
    @return: A C{list} of C{(objectID, Tag.path)} 2-tuples representing the
        L{Tag}s that were removed.
    """
    if isgenerator(paths):
        paths = list(paths)
    result = getTags(paths=paths)
    deletedTagPaths = list(result.values(Tag.objectID, Tag.path))
    # Delete the fluiddb/tags/description tag values stored for removed
    # tags.  Associated TagValue's are removed by an ON DELETE CASCADE
    # trigger.
    self._factory.tagValues(self._user).delete(
        [(objectID, path)
         for objectID, _ in deletedTagPaths
         for path in [u'fluiddb/tags/description', u'fluiddb/tags/path']])

    # Touch all the objects for the given tag paths.
    objectIDs = list(getObjectIDs(paths))
    touchObjects(objectIDs)

    result.remove()
    return deletedTagPaths
def delete(self, usernames):
    """Delete L{User}s matching C{username}s.

    @param usernames: A sequence of L{User.username}s.
    @raise FeatureError: Raised if no L{User.username}s are provided.
    @raise UnknownUserError: Raised if one or more usernames don't match
        existing L{User}s.
    @return: A C{list} of C{(objectID, User.username)} 2-tuples representing
        the L{User}s that were removed.
    """
    if isgenerator(usernames):
        usernames = list(usernames)
    if not usernames:
        raise FeatureError('At least one username must be provided.')
    usernames = set(usernames)
    result = getUsers(usernames=usernames)
    existingUsernames = set(result.values(User.username))
    unknownUsernames = usernames - existingUsernames
    if unknownUsernames:
        raise UnknownUserError(list(unknownUsernames))

    admin = getUser(u'fluiddb')
    deletedUsers = list(result.values(User.objectID, User.username))
    # FIXME: Deleting a user will leave the permission exception lists
    # containing the user in a corrupt state.
    result.remove()
    self._factory.tagValues(admin).delete(
        [(objectID, systemTag)
         for objectID, _ in deletedUsers
         for systemTag in [u'fluiddb/users/username', u'fluiddb/users/name',
                           u'fluiddb/users/email', u'fluiddb/users/role']])
    return deletedUsers
def delete(self, paths):
    """See L{NamespaceAPI.delete}.

    @raise PermissionDeniedError: Raised if the user is not authorized to
        delete a given L{Namespace}.
    """
    if isgenerator(paths):
        paths = list(paths)
    pathsAndOperations = [(path, Operation.DELETE_NAMESPACE) for path in paths]
    deniedOperations = checkPermissions(self._user, pathsAndOperations)
    if deniedOperations:
        raise PermissionDeniedError(self._user.username, deniedOperations)
    return self._api.delete(paths)