我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 typing.MutableMapping。
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
    """Evaluate all children joined with "and" against ``query``.

    :param query: the query mapping handed to every child node.
    :param context: optional pipeline context forwarded to every child.
    :returns: ``True`` when every child is true, or when every falsifiable
        child is false. The method never returns ``False``.
    :raises BoundKeyExistenceError: when only some of the children hold.
    """
    # This is a little weird. We have to handle the case of
    # can_have("x").and_("y") here. We can't short circuit on a False value:
    # if only some children are True we want to raise, while "all True" and
    # "all falsifiable children False" are both fine.
    all_true = True
    all_falsifiable_false = True
    for child in self.children:
        is_true = child.evaluate(query, context)
        all_true = all_true and is_true
        if child.falsifiable:
            all_falsifiable_false = all_falsifiable_false and not is_true

    if all_falsifiable_false or all_true:
        return True

    raise BoundKeyExistenceError("Query must have all or none of the elements joined with \"and\" in a \"can_have\" statement!")
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
    """Evaluate all children joined with "or" against ``query``.

    Every child is evaluated (no short circuit) so that any exception other
    than the two handled below still propagates.

    :returns: the "or" of all child results.
    :raises MissingKeyError, BoundKeyExistenceError: the first such error
        collected, but only if every child either failed or was falsy.
    """
    caught = []
    failed = 0
    any_true = False

    for node in self.children:
        try:
            outcome = node.evaluate(query, context)
        except (MissingKeyError, BoundKeyExistenceError) as exc:
            # Remember the error; it is only re-raised if nothing succeeded.
            caught.append(exc)
            failed += 1
            continue
        if not outcome:
            failed += 1
        any_true = outcome or any_true

    if caught and failed == len(self.children):
        raise caught[0]
    return any_true
def with_default(self, value: Union[Any, Callable[[MutableMapping[str, Any]], Any]], supplies_type: Type = None) -> "QueryValidator":
    """Attach a default value to the currently selected, non-required key.

    :param value: the default itself, or a callable (stored as-is here).
    :param supplies_type: expected type of the default; when falsy the type
        of ``value`` is used for the type check instead.
    :returns: the validator returned by ``self.as_(...)``.
    :raises QueryValidatorStructureError: if no key is selected, the selected
        key already has a child, or the selected key is required.
    """
    current = self._current
    if current is None or current.child is not None:
        raise QueryValidatorStructureError('No key is selected! Try using "can_have" before "with_default".')
    if current.required:
        raise QueryValidatorStructureError("Can't assign a default value to a required key! Try using \"can_have\" instead of \"have\".")

    expected_type = supplies_type if supplies_type else type(value)

    # The default node is created before as_() is called, as in the original.
    default_node = _DefaultValueNode(current.key, value, supplies_type)
    validator = self.as_(expected_type)
    validator._current.child.child = default_node
    return validator
async def explain(self) -> MutableMapping:
    """Returns an explain plan record for this cursor.

    Declared ``async`` because the body iterates the cloned cursor with
    ``async for``, which is only valid inside a coroutine function.

    :returns: the first record produced by the cloned cursor, or ``None``
        if the clone yields nothing.

    .. mongodoc:: explain
    """
    c = self.clone()
    c.__explain = True

    # always use a hard limit for explains
    if c.__limit:
        c.__limit = -abs(self.__limit)

    async for expl in c:
        # Only the first record is of interest.
        return expl

    return None
async def dereference(self, dbref: DBRef, **kwargs) -> MutableMapping:
    """Dereference a :class:`~bson.dbref.DBRef`, getting the document it points to.

    Declared ``async`` because the lookup is awaited.

    Raises :class:`TypeError` if `dbref` is not an instance of
    :class:`~bson.dbref.DBRef`. Returns the result of ``find_one`` on the
    referenced collection, looked up by ``_id``. Raises :class:`ValueError`
    if `dbref` has a database specified that is different from the current
    database.

    :Parameters:
      - `dbref`: the reference
      - `**kwargs` (optional): any additional keyword arguments are passed
        through to ``find_one`` on the referenced collection.
    """
    if not isinstance(dbref, DBRef):
        raise TypeError('cannot dereference a {}'.format(type(dbref)))

    if dbref.database is not None and dbref.database != self.name:
        raise ValueError('trying to dereference a DBRef that points to '
                         'another database ({} not {})'.format(dbref.database, self.name))

    return await self[dbref.collection].find_one({'_id': dbref.id}, **kwargs)
def dehead_filetree(tree: ExtractFileTree) -> ExtractFileTree:
    """Collapse a chain of single-entry directories under the tree's head.

    The head name is kept; every nested level that is a one-element sequence
    holding a one-entry mapping is skipped. So a tree
    ``{1: [2: [3: [4: [f1, f2]]]}`` will be converted to ``{1: [f1, f2]}``.

    :param dict tree: The file tree; it must have exactly one top-level key.
    :returns: The same (mutated) tree, deheaded as described.
    :rtype: dict
    """
    assert len(tree) == 1

    root_name = next(iter(tree))
    subtree = tree[root_name]

    while isinstance(subtree, t.MutableSequence) and len(subtree) == 1:
        only_child = subtree[0]
        if not (isinstance(only_child, t.MutableMapping) and len(only_child) == 1):
            break
        # Descend into the single directory and keep going.
        subtree = next(iter(only_child.values()))

    tree[root_name] = subtree
    return tree
def get_all_permissions(self) -> t.Mapping[str, bool]:
    """Get the value of every course permission for this object.

    Only :class:`Permission` rows with ``course_permission=True`` are
    considered.

    :returns: A mapping from permission name to its value. A permission whose
        name is in ``self._permissions`` gets the negation of its default
        value; every other permission gets its default value.
    """
    perms: t.Sequence[Permission] = (
        Permission.query.
        filter_by(  # type: ignore
            course_permission=True
        ).all()
    )

    return {
        perm.name: (
            (not perm.default_value)
            if perm.name in self._permissions else perm.default_value
        )
        for perm in perms
    }
def get_all_permissions(self) -> t.Mapping[str, bool]:
    """Get the value of every non-course permission for this object.

    Only :class:`Permission` rows with ``course_permission=False`` are
    considered.

    :returns: A mapping from permission name to its value. A permission whose
        name is in ``self._permissions`` gets the negation of its default
        value; every other permission gets its default value.
    """
    perms: t.Sequence[Permission] = (
        Permission.query.
        filter_by(  # type: ignore
            course_permission=False
        ).all()
    )

    return {
        perm.name: (
            (not perm.default_value)
            if perm.name in self._permissions else perm.default_value
        )
        for perm in perms
    }
def __to_json__(self) -> t.MutableMapping[str, t.Any]:
    """Creates a JSON serializable representation of this object.

    The result has this shape:

    .. code:: python

        {
            'name': ...,  # taken from ``self.name``
            'id': ...,    # taken from ``self.id``
        }

    :returns: A new dictionary as described above.
    """
    return dict(name=self.name, id=self.id)
def __init__(self, bot, **kwargs):
    """Build the widget guild from its raw keyword data.

    :param bot: passed to the parent initializer as ``cl``.
    :param kwargs: raw data; the keys read here are ``id``, ``name``,
        ``channels`` and ``members``.
    """
    super().__init__(id=int(kwargs.get("id", 0)), cl=bot)

    #: The name of this guild.
    self.name = kwargs.get("name", "")

    #: A mapping of :class:`~.WidgetChannel` in this widget guild.
    self._channels = {}  # type: typing.MutableMapping[int, WidgetChannel]
    for channel_data in kwargs.get("channels", []):
        # Channel data is unpacked into keyword arguments.
        widget_channel = WidgetChannel(bot=self._bot, guild=self, **channel_data)
        self._channels[widget_channel.id] = widget_channel

    #: A mapping of :class:`~.WidgetMember` in this widget guild.
    self._members = {}
    for member_data in kwargs.get("members", []):
        # Member data is handed over as a single ``kwargs`` argument.
        widget_member = WidgetMember(bot=self._bot, guild=self, kwargs=member_data)
        self._members[widget_member.id] = widget_member
def get_indexed_versions(self) -> typing.MutableMapping[str, str]:
    """
    Returns a dictionary mapping the name of each index containing this document to the
    version of this document in that index. Note that `version` denotes document version,
    not bundle version.

    The search is limited to a single page of 64 hits; an assertion fails if
    the reported total exceeds that.
    """
    page_size = 64
    es_client = ElasticsearchClient.get(self.logger)
    alias_name = Config.get_es_alias_name(ESIndexType.docs, self.replica)

    # Only index name and version are needed, so no source or stored fields are requested.
    search_body = {
        '_source': False,
        'stored_fields': [],
        'version': True,
        'from': 0,
        'size': page_size,
        'query': {
            'terms': {
                '_id': [str(self.fqid)]
            }
        }
    }
    hits = es_client.search(index=alias_name, body=search_body)['hits']
    assert hits['total'] <= page_size, 'Document is in too many indices'
    return {hit['_index']: hit['_version'] for hit in hits['hits']}
def serialize(self, schema, data, **kwargs) -> typing.MutableMapping:
    """Composes the final relationships object.

    :param schema: used only when ``data`` is a mapping without both
        ``type`` and ``id``; its registry turns ``data`` into an identifier.
    :param data: ``None``, a mapping, or anything else (in which case no
        ``data`` member is emitted).
    :param kwargs: only ``links`` is read; it is added when not ``None``.
    :returns: an ordered mapping with optional ``data`` and ``links`` members.
    """
    document = OrderedDict()

    if data is None:
        document['data'] = None
    elif isinstance(data, Mapping):
        # Mappings carrying both 'type' and 'id' are used as-is (resource
        # linkage or relationships object); others are treated as the
        # related resource instance.
        already_identifier = 'type' in data and 'id' in data
        if already_identifier:
            document['data'] = data
        else:
            document['data'] = schema.ctx.registry.ensure_identifier(data, asdict=True)

    links = kwargs.get('links')
    if links is not None:
        document['links'] = links

    return document
def serialize(self, schema, data, links=None, pagination=None, **kwargs) -> typing.MutableMapping:
    """Composes the final JSON API relationships object.

    :arg schema: its registry converts every item of ``data`` into an
        identifier.
    :arg data: when it is a collection, each item becomes one entry of the
        ``data`` member; otherwise no ``data`` member is emitted.
    :arg links: optional links object. Note that it is stored as-is, so
        pagination links are merged into the object passed in.
    :arg ~aiohttp_json_api.pagination.PaginationABC pagination:
        If not *None*, the links and meta members of the pagination
        helper are added to the final JSON API relationship object.
    :returns: an ordered mapping with the members described above.
    """
    document = OrderedDict()

    if is_collection(data):
        document['data'] = [
            schema.ctx.registry.ensure_identifier(item, asdict=True)
            for item in data
        ]

    if links is not None:
        document['links'] = links

    if pagination is not None:
        # 'links' may be absent when the caller passed none; create it
        # instead of failing with a KeyError.
        document.setdefault('links', OrderedDict()).update(pagination.links())
        document.setdefault('meta', OrderedDict()).update(pagination.meta())

    return document
def links(self) -> MutableMapping:
    """
    Return pagination links. **Must be overridden**: this implementation
    always raises :class:`NotImplementedError`.

    Implementations return a dictionary to be included in the top-level
    *links object*. It contains these keys:

    *   *self* - the link to the current page
    *   *first* - the link to the first page
    *   *last* - the link to the last page
    *   *prev* - the link to the previous page (only set if one exists)
    *   *next* - the link to the next page (only set if one exists)
    """
    raise NotImplementedError
def meta(self) -> MutableMapping:
    """
    Return meta object of paginator.

    *   *total-resources* - taken from ``self.total_resources``
    *   *page-limit* - taken from ``self.limit``
    *   *page-offset* - taken from ``self.offset``
    """
    entries = (
        ('total-resources', self.total_resources),
        ('page-limit', self.limit),
        ('page-offset', self.offset),
    )
    return dict(entries)
def meta(self) -> MutableMapping:
    """
    Return meta object of pagination.

    *   *total-resources* - taken from ``self.total_resources``
    *   *last-page* - taken from ``self.last_page``
    *   *page-number* - taken from ``self.number``
    *   *page-size* - taken from ``self.size``
    """
    entries = (
        ('total-resources', self.total_resources),
        ('last-page', self.last_page),
        ('page-number', self.number),
        ('page-size', self.size),
    )
    return dict(entries)
def complete_pv(pathvalues: MutableMapping[Path, float]) -> MutableMapping[Path, float]:
    """Propagate each path's value to all of its prefixes.

    Consider a pathvalue dictionary such as ``{1.1.1: 12.0}`` (one entry).
    This function dissects each path and adds its value to every truncated
    path, here ``1``, ``1.1`` and ``1.1.1``, giving
    ``{1: 12.0, 1.1: 12.0, 1.1.1: 12.0}``. With more items the values are
    summed accordingly. The empty prefix is included as well, so the empty
    path receives the total of all values. For this to make sense the empty
    path must not be present in the input.

    :param pathvalues: {path: value} dictionary
    :return: {path: value} dictionary (a ``defaultdict`` of floats)
    :raises ValueError: if the empty path is a key of ``pathvalues``
    """
    if Path(()) in pathvalues:
        raise ValueError("This function does not allow the empty path as item "
                         "in the data list.")

    completed = collections.defaultdict(float)
    for path, value in pathvalues.items():
        # level 0 is the empty prefix; len(path) is the whole path.
        for level in range(len(path) + 1):
            completed[path[:level]] += value
    return completed
def from_json(cls, json: MutableMapping[str, Any]) -> 'RequestReturnedGetNone':
    """Build a ``RequestReturnedGetNone`` by consuming the keys of ``json``.

    Every field is popped from ``json``, so the mapping is mutated. The
    message type must be ``'request'``; ``query`` is optional and may be
    ``None``.
    """
    with JsonParser(json):
        assert json.pop('messageType', None) == 'request'
        # Values are popped and converted in this fixed order.
        fields = {
            'in_scope': ensure(bool, json.pop('inScope')),
            'http_version': parse_http_version(json.pop('httpVersion')),
            'body': b64decode(json.pop('body').encode()),
            'tool_flag': ensure(int, json.pop('toolFlag')),
            'url': ensure(str, json.pop('url')),
            'method': ensure(str, json.pop('method')),
            'protocol': ensure(str, json.pop('protocol')),
            'path': ensure(str, json.pop('path')),
            'headers': tuple(cls.__pop_headers(json)),
            'port': ensure(int, json.pop('port')),
            'host': ensure(str, json.pop('host')),
            'raw': b64decode(json.pop('raw').encode()),
            'reference_id': ensure(int, json.pop('referenceID')),
            'query': ensure((str, type(None)), json.pop('query', None)),
        }
        return RequestReturnedGetNone(**fields)
def from_json(cls, json: MutableMapping[str, Any]) -> 'RequestReturned':
    """Build a ``RequestReturned`` by consuming the keys of ``json``.

    ``messageType`` is required and discarded; ``query`` and ``comment`` are
    discarded when present. The HTTP version, headers and body are converted
    explicitly; the remaining keyword arguments come from
    ``_common_returned`` and from whatever string values are left in
    ``json``.
    """
    with JsonParser(json):
        json.pop('messageType')
        json.pop('query', None)  # TODO sometimes it's there
        json.pop('comment', None)  # TODO sometimes it's there

        http_version = parse_http_version(json.pop('httpVersion'))
        headers = tuple(
            (ensure(str, name), ensure(str, content))
            for name, content in pop_all(json.pop('headers')).items()
        )
        body = b64decode(json.pop('body').encode())
        remaining = dict(chain(
            _common_returned(json).items(),
            pop_all(ensure_values(str, json)).items(),
        ))

        return RequestReturned(
            http_version=http_version,
            headers=headers,
            body=body,
            **remaining
        )
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
    """Evaluate this node against ``query``.

    This implementation has no behaviour of its own and returns ``None``.

    :param query: the query mapping to check.
    :param context: optional pipeline context.
    """
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
    """Evaluate every child against ``query`` and report success.

    :param query: the query mapping handed to every child.
    :param context: optional pipeline context forwarded to every child.
    :returns: always ``True``; the children's return values are ignored and
        an invalid query is signalled by whatever exception a child raises.
    """
    # We don't want normal AND behavior at this level, so we just evaluate
    # the children and return True.
    for child in self.children:
        child.evaluate(query, context)
    return True
def __init__(self, key: str, value: Union[Any, Callable[[MutableMapping[str, Any]], Any]], supplies_type: Type = None) -> None:
    """Store the key, its default value (or value factory) and the supplied type.

    :param key: stored as ``self.key``.
    :param value: stored unchanged as ``self.value``.
    :param supplies_type: stored unchanged as ``self.supplies_type``.
    """
    self.key, self.value, self.supplies_type = key, value, supplies_type
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
    """Check that ``query[self.key]`` matches one of ``self.types``.

    A string value is converted through an ``Enum`` type before the check.
    On success the (possibly converted) value is written back to ``query``.
    If the key is absent, the child node (if any) is evaluated instead.

    :param query: the query mapping; may be updated in place.
    :param context: optional pipeline context forwarded to the child.
    :returns: always ``True``; failure is reported by raising.
    :raises WrongValueTypeError: if the value matches none of the types.
    """
    try:
        value = query[self.key]
    except KeyError:
        # Only the lookup is guarded, so an unrelated KeyError raised during
        # conversion is not mistaken for a missing key.
        if self.child:
            self.child.evaluate(query, context)
        return True

    for expected_type in self.types:
        if issubclass(expected_type, Enum) and isinstance(value, str):
            value = expected_type(value)
        if isinstance(value, expected_type):
            query[self.key] = value
            return True

    raise WrongValueTypeError("{key} must be of type {type} in query!".format(key=self.key, type=self))
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
    """Check whether ``self.key`` is present in ``query``.

    The child node, if any, is evaluated regardless of presence (unless the
    missing-key error is raised first); its result is ignored.

    :returns: ``True`` if the key is present, ``False`` otherwise.
    :raises MissingKeyError: if the key is required but absent.
    """
    present = self.key in query

    if not present and self.required:
        raise MissingKeyError("{key} must be in query!".format(key=self.key))

    if self.child:
        self.child.evaluate(query, context)

    return present
def __call__(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
    """Validate ``query`` by evaluating the root node.

    :param query: the query mapping to validate.
    :param context: optional pipeline context forwarded to the root node.
    :returns: whatever ``self._root.evaluate`` returns.
    """
    return self._root.evaluate(query, context)
def validate_query(
    validator: QueryValidator,
    *pre_transforms: Callable[[MutableMapping], None]
) -> Callable[
    [Callable[[Any, MutableMapping[str, Any], PipelineContext], Union[Any, Iterable[Any]]]],
    Callable[[Any, MutableMapping[str, Any], PipelineContext], Union[Any, Iterable[Any]]]
]:
    """Create a method decorator that validates the query before the call.

    :param validator: called with the query before the decorated method runs.
    :param pre_transforms: callables applied to the query, in order, before
        validation.
    :returns: a decorator for methods taking ``(self, query, context)``.
    """
    def decorate(
        method: Callable[[Any, MutableMapping[str, Any], PipelineContext], Union[Any, Iterable[Any]]]
    ) -> Callable[[Any, MutableMapping[str, Any], PipelineContext], Union[Any, Iterable[Any]]]:
        @wraps(method)
        def validated(self: Any, query: MutableMapping[str, Any], context: PipelineContext = None):
            for apply_transform in pre_transforms:
                apply_transform(query)
            # The validator receives the query only, not the context.
            validator(query)
            return method(self, query, context)

        return validated

    return decorate
async def __anext__(self) -> MutableMapping:
    """Return the next buffered document, refreshing the buffer if empty.

    Declared ``async`` because the refresh is awaited.

    :returns: the leftmost item of the internal buffer.
    :raises StopAsyncIteration: if the buffer is empty and ``_refresh()``
        reports (with a falsy result) that nothing was fetched.
    """
    if self.__data:
        return self.__data.popleft()

    is_refreshed = await self._refresh()
    if not is_refreshed:
        raise StopAsyncIteration

    return self.__data.popleft()
async def to_list(self) -> List[MutableMapping]:
    """Fetches all data from cursor to in-memory list.

    Declared ``async`` because the cursor is entered with ``async with`` and
    iterated with ``async for``.

    :returns: every item produced by iterating this cursor, in order.
    """
    async with self:
        return [item async for item in self]
async def _command(self, connection: 'aiomongo.Connection', command: Union[str, dict],
                   value: Any = 1, check: bool = True,
                   allowable_errors: Optional[List[str]] = None,
                   read_preference: Union[_ALL_READ_PREFERENCES] = ReadPreference.PRIMARY,
                   codec_options: CodecOptions = DEFAULT_CODEC_OPTIONS, **kwargs) -> MutableMapping:
    """Internal command helper.

    Declared ``async`` because the connection call is awaited.

    :param connection: the connection the command is sent on.
    :param command: a command name, or a ready command document. A name is
        wrapped into ``SON([(command, value)])``.
    :param value: value paired with ``command`` when it is given as a name.
    :param kwargs: merged into the command document. Note that a document
        passed in by the caller is updated in place.
    :returns: the result of ``connection.command``.
    """
    if isinstance(command, str):
        command = SON([(command, value)])
    command.update(kwargs)

    return await connection.command(
        self.name, command, read_preference, codec_options,
        check=check, allowable_errors=allowable_errors
    )
async def insert_one(self, document: MutableMapping, bypass_document_validation: bool = False,
                     check_keys: bool = True) -> InsertOneResult:
    """Insert a single document.

    Declared ``async`` because obtaining the connection and running the
    command are awaited.

    :param document: the document to insert. An ``_id`` is generated and
        stored in it when missing (except for ``RawBSONDocument``).
    :param bypass_document_validation: only applied when the connection's
        ``max_wire_version`` is at least 4.
    :param check_keys: forwarded to the command / message builder.
    :returns: an ``InsertOneResult`` with the inserted id (``None`` for a
        ``RawBSONDocument``) and whether the write was acknowledged.
    :raises OperationFailure: if validation bypass is requested together
        with an unacknowledged write concern.
    """
    common.validate_is_document_type('document', document)
    if '_id' not in document and not isinstance(document, RawBSONDocument):
        document['_id'] = ObjectId()

    write_concern = self.write_concern.document
    acknowledged = write_concern.get('w') != 0

    connection = await self.database.client.get_connection()

    if acknowledged:
        command = SON([('insert', self.name),
                       ('ordered', True),
                       ('documents', [document])])

        if bypass_document_validation and connection.max_wire_version >= 4:
            command['bypassDocumentValidation'] = True

        result = await connection.command(
            self.database.name, command, ReadPreference.PRIMARY, self.__write_response_codec_options,
            check_keys=check_keys
        )

        helpers._check_write_command_response([(0, result)])
    else:
        if bypass_document_validation and connection.max_wire_version >= 4:
            # One message string: the stray comma used to pass the second
            # half as the exception's second positional argument.
            raise OperationFailure('Cannot set bypass_document_validation with'
                                   ' unacknowledged write concern')
        _, msg, _ = message.insert(
            str(self), [document], check_keys,
            acknowledged, write_concern, False, self.__write_response_codec_options
        )
        connection.send_message(msg)

    document_id = document['_id'] if not isinstance(document, RawBSONDocument) else None

    return InsertOneResult(document_id, acknowledged)
async def __find_and_modify(self, filter: dict, projection: Optional[Union[list, dict]],
                            sort: Optional[List[tuple]], upsert: Optional[bool] = None,
                            return_document: bool = ReturnDocument.BEFORE, **kwargs) -> MutableMapping:
    """Internal findAndModify helper.

    Declared ``async`` because obtaining the connection and running the
    command are awaited.

    :param filter: query document; validated to be a mapping.
    :param projection: optional fields specification.
    :param sort: optional sort specification.
    :param upsert: optional flag, validated to be a boolean when given.
    :param return_document: must be a ``bool``; sent as the ``new`` field.
    :param kwargs: extra command fields; ``collation`` is extracted and
        validated separately.
    :returns: the ``value`` entry of the command response (``None`` when the
        response has none).
    :raises ValueError: if ``return_document`` is not a ``bool``.
    """
    common.validate_is_mapping('filter', filter)
    if not isinstance(return_document, bool):
        raise ValueError('return_document must be ReturnDocument.BEFORE or ReturnDocument.AFTER')

    cmd = SON([('findAndModify', self.name),
               ('query', filter),
               ('new', return_document)])
    collation = validate_collation_or_none(kwargs.pop('collation', None))
    cmd.update(kwargs)

    if projection is not None:
        cmd['fields'] = helpers._fields_list_to_dict(projection, 'projection')
    if sort is not None:
        cmd['sort'] = helpers._index_document(sort)
    if upsert is not None:
        common.validate_boolean('upsert', upsert)
        cmd['upsert'] = upsert

    connection = await self.database.client.get_connection()

    # Attach the collection's write concern unless the caller supplied one.
    if connection.max_wire_version >= 4 and 'writeConcern' not in cmd:
        wc_doc = self.write_concern.document
        if wc_doc:
            cmd['writeConcern'] = wc_doc

    out = await connection.command(
        self.database.name, cmd, ReadPreference.PRIMARY, self.codec_options,
        allowable_errors=[_NO_OBJ_ERROR], collation=collation
    )

    helpers._check_write_command_response([(0, out)])
    return out.get('value')
def __to_json__(self) -> t.Mapping[t.Any, t.Any]:
    """Creates a JSON serializable representation of this object.

    The result is a copy of ``self.rest`` in which the ``message``,
    ``description`` and ``code`` keys are set (overriding any existing
    entries with those names).

    :returns: This instance as a dictionary.
    """
    ret = dict(self.rest)  # type: t.MutableMapping[t.Any, t.Any]
    ret.update(
        message=self.message,
        description=self.description,
        code=self.api_code.name,
    )
    return ret
def extract(
    file: FileStorage,
    ignore_filter: IgnoreFilterManager = None,
    handle_ignore: IgnoreHandling = IgnoreHandling.keep
) -> t.Optional[ExtractFileTree]:
    """Extracts the given archive and returns its file tree.

    The temporary extraction directory is always removed afterwards.

    :param werkzeug.datastructures.FileStorage file: The file to extract.
    :param ignore_filter: What files should be ignored in the given archive.
        This can only be None when ``handle_ignore`` is
        ``IgnoreHandling.keep``.
    :param handle_ignore: How should ignored file be handled.
    :returns: A file tree as generated by
        :py:func:`rename_directory_structure`, wrapped under the archive's
        base name unless it consists of exactly one directory; ``None`` if
        the archive yielded nothing.
    :raises ValueError: if ``ignore_filter`` is None while ``handle_ignore``
        is not ``IgnoreHandling.keep``.
    """
    if ignore_filter is None:
        if handle_ignore == IgnoreHandling.keep:
            ignore_filter = IgnoreFilterManager([])
        else:  # pragma: no cover
            raise ValueError

    tmpdir = extract_to_temp(
        file,
        ignore_filter,
        handle_ignore,
    )
    # Index just past the last separator, i.e. where the final path
    # component of the (separator-stripped) directory starts.
    start = tmpdir.rstrip(os.sep).rfind(os.sep) + 1

    try:
        res = rename_directory_structure(tmpdir)[tmpdir[start:]]
        filename: str = file.filename.split('.')[0]

        if not res:
            return None
        if len(res) > 1:
            return {filename: res if isinstance(res, list) else [res]}
        if not isinstance(res[0], t.MutableMapping):
            # A single top-level file.
            return {filename: res}
        # A single top-level directory is used as the tree itself.
        return res[0]
    finally:
        shutil.rmtree(tmpdir)
def __init__(self, name: str) -> None:
    """Start with an empty configuration mapping.

    :param name: accepted but not used by this initializer.
    """
    self.conf: t.MutableMapping[t.Any, t.Any] = dict()
def is_valid_request(
    self,
    request: t.Any,
    parameters: t.Optional[t.MutableMapping[str, str]] = None,
    fake_method: t.Any = None,
    handle_error: bool = True
) -> bool:
    '''
    Validates an OAuth request using the python-oauth2 library:
        https://github.com/simplegeo/python-oauth2

    :param request: the request handed to ``self.parse_request``.
    :param parameters: extra parameters for ``parse_request``; a fresh empty
        dict is used when omitted.
    :param fake_method: forwarded to ``parse_request``.
    :param handle_error: when true, ``oauth2.Error`` and ``ValueError`` are
        turned into a ``False`` result; otherwise they propagate.
    :returns: ``True`` if verification raised no error, ``False`` if it did
        and ``handle_error`` is true.
    '''
    # Avoid a shared mutable default; behave as if ``{}`` had been passed.
    if parameters is None:
        parameters = {}

    try:
        method, url, headers, parameters = self.parse_request(
            request, parameters, fake_method
        )

        oauth_request = oauth2.Request.from_request(
            method, url, headers=headers, parameters=parameters
        )

        self.oauth_server.verify_request(
            oauth_request, self.oauth_consumer, {}
        )
    except (oauth2.Error, ValueError):
        if handle_error:
            return False
        raise

    # Signature was valid
    return True
def parse_request(
    self,
    req: 'flask.Request',
    parameters: t.MutableMapping[str, str] = None,
    fake_method: t.Any = None
) -> t.Tuple[str, str, t.MutableMapping[str, str], t.MutableMapping[str, str]]:
    '''
    Parse Flask request

    :param req: the request to read from.
    :param parameters: accepted but not used here.
    :param fake_method: accepted but not used here.
    :returns: ``(method, url, headers, form)`` where headers is a new dict
        and form is a copy of ``req.form``.
    '''
    method, url = req.method, req.url
    headers = dict(req.headers)
    form = req.form.copy()
    return (method, url, headers, form)
def __to_json__(self) -> t.MutableMapping[str, t.Any]:
    """Creates a JSON serializable representation of this object.

    :returns: A new dictionary with the keys ``name``, ``course`` and
        ``id``, taken from the attributes of the same names.
    """
    return dict(name=self.name, course=self.course, id=self.id)
def set_bool(
    out: t.MutableMapping[str, t.Any],
    parser: t.Any,
    item: str,
    default: bool,
) -> None:
    """Store ``item`` from ``parser`` in ``out`` as a ``bool``.

    :param out: mapping that receives the value under ``item``.
    :param parser: object whose ``getboolean(item)`` supplies the value.
    :param item: the option name.
    :param default: used when the parser returns ``None``.
    """
    val = parser.getboolean(item)
    if val is None:
        val = default
    out[item] = bool(val)
def set_float(
    out: t.MutableMapping[str, t.Any],
    parser: t.Any,
    item: str,
    default: float,
) -> None:
    """Store ``item`` from ``parser`` in ``out`` as a ``float``.

    :param out: mapping that receives the value under ``item``.
    :param parser: object whose ``getfloat(item)`` supplies the value.
    :param item: the option name.
    :param default: used when the parser returns ``None``.
    """
    val = parser.getfloat(item)
    if val is None:
        val = default
    out[item] = float(val)
def set_int(
    out: t.MutableMapping[str, t.Any],
    parser: t.Any,
    item: str,
    default: int,
) -> None:
    """Store ``item`` from ``parser`` in ``out`` as an ``int``.

    :param out: mapping that receives the value under ``item``.
    :param parser: object whose ``getint(item)`` supplies the value.
    :param item: the option name.
    :param default: used when the parser returns ``None``.
    """
    val = parser.getint(item)
    if val is None:
        val = default
    out[item] = int(val)
def set_str(
    out: t.MutableMapping[str, t.Any],
    parser: t.Any,
    item: str,
    default: str,
) -> None:
    """Store ``item`` from ``parser`` in ``out`` as a ``str``.

    :param out: mapping that receives the value under ``item``.
    :param parser: object whose ``get(item)`` supplies the value.
    :param item: the option name.
    :param default: used when the parser returns ``None``.
    """
    val = parser.get(item)
    if val is None:
        val = default
    out[item] = str(val)
def __init__(self, uuid: str, value: Optional[Sequence[int]], flags: Sequence[str]):
    """Store the given UUID, value and flags, and start with no descriptors.

    :param uuid: stored as ``self.uuid``.
    :param value: stored as ``self.value``; may be ``None``.
    :param flags: stored as ``self.flags``.
    """
    self.uuid, self.value, self.flags = uuid, value, flags
    # Filled in later; keyed by string (annotated as GATTDescriptor values).
    self.descriptors: MutableMapping[str, GATTDescriptor] = {}
def __init__(self, uuid: str, primary: bool):
    """Store the given UUID and primary flag, and start with no characteristics.

    :param uuid: stored as ``self.uuid``.
    :param primary: stored as ``self.primary``.
    """
    self.uuid, self.primary = uuid, primary
    # Filled in later; keyed by string (annotated as GATTCharacteristic values).
    self.characteristics: MutableMapping[str, GATTCharacteristic] = {}
def __init__(self, path: str, address: str, paired: bool, connected: bool, services_resolved: bool,
             name: Optional[str] = None, device_class: Optional[int] = None, appearance: Optional[int] = None,
             uuids: Sequence[str] = None, rssi: int = None, tx_power: int = None,
             manufacturer_data: Dict[int, Sequence[int]] = None,
             service_data: Dict[str, Sequence[int]] = None) -> None:
    """Initialise the device record from its observed properties.

    Plain properties are stored as given. ``uuids`` becomes a set, ``rssi``
    becomes a one-element (or empty) list in ``self.rssis``, and each value
    of ``manufacturer_data`` / ``service_data`` is wrapped in a one-element
    list. The keys of ``service_data`` are also added to ``self.uuids``.
    ``first_seen`` and ``last_seen`` are both set to the current local time.
    """
    self.active = True

    self.path, self.address = path, address
    self.paired, self.connected = paired, connected
    self.services_resolved = services_resolved
    self.name = name
    self.device_class, self.appearance = device_class, appearance

    self.uuids = set() if uuids is None else set(uuids)
    self.rssis = [] if rssi is None else [rssi]
    self.tx_power = tx_power

    self.first_seen = datetime.datetime.now()
    self.last_seen = datetime.datetime.now()

    self.services: MutableMapping[str, GATTService] = {}

    if manufacturer_data is None:
        self.manufacturer_data = {}
    else:
        self.manufacturer_data = {key: [data] for key, data in manufacturer_data.items()}

    self.service_data = {}
    if service_data is not None:
        self.uuids = self.uuids.union(service_data.keys())
        self.service_data.update((key, [data]) for key, data in service_data.items())
def list_scale_sets(self, resource_group_name: str) -> List[AzureScaleSet]:
    """List the scale sets of a resource group, annotated with recent failures.

    Activity-log entries newer than ``TIMEOUT_PERIOD`` that are failed or
    have a ``Conflict`` status code (excluding delete actions) are grouped
    per resource id. For each scale set the most recent matching failures
    determine ``timeout_until`` / ``timeout_reason``.

    :param resource_group_name: the resource group to inspect.
    :returns: one ``AzureScaleSet`` per scale set in the group.
    """
    window_start = datetime.now(pytz.utc) - TIMEOUT_PERIOD
    filter_clause = "eventTimestamp ge '{}' and resourceGroupName eq '{}'".format(window_start, resource_group_name)
    select_clause = "authorization,status,subStatus,properties,resourceId,eventTimestamp"

    failures_by_scale_set: MutableMapping[str, List[EventData]] = {}
    for log in self._monitor_client.activity_logs.list(filter=filter_clause, select=select_clause):
        if not ((log.status and log.status.value == 'Failed')
                or (log.properties and log.properties.get('statusCode') == 'Conflict')):
            continue
        if log.authorization and log.authorization.action and 'delete' in log.authorization.action:
            # Failed deletions are not counted.
            continue
        failures_by_scale_set.setdefault(log.resource_id, []).append(log)

    result = []
    for scale_set in self._compute_client.virtual_machine_scale_sets.list(resource_group_name):
        # Newest failure first.
        failures = sorted(failures_by_scale_set.get(scale_set.id, []),
                          key=lambda entry: entry.event_timestamp, reverse=True)
        timeout_until = None
        timeout_reason = None
        for failure in failures:
            if failure.properties:
                status_message = json.loads(failure.properties.get('statusMessage', "{}"))
            else:
                status_message = {}
            error_details = status_message.get('error', {})
            if 'message' in error_details:
                timeout_until = failure.event_timestamp + TIMEOUT_PERIOD
                timeout_reason = error_details['message']
                # Stop if we found a message with details
                break
            if timeout_until is None:
                # Fall back to the newest failure's sub status until a
                # failure with a detailed message turns up.
                timeout_until = failure.event_timestamp + TIMEOUT_PERIOD
                timeout_reason = failure.sub_status.localized_value

        tags = scale_set.tags
        priority = int(tags[PRIORITY_TAG]) if PRIORITY_TAG in tags else None
        no_schedule_taints = json.loads(tags.get(NO_SCHEDULE_TAINTS_TAG, '{}'))

        result.append(AzureScaleSet(scale_set.location, resource_group_name, scale_set.name, scale_set.sku.name,
                                    scale_set.sku.capacity, scale_set.provisioning_state,
                                    timeout_until=timeout_until, timeout_reason=timeout_reason,
                                    priority=priority, no_schedule_taints=no_schedule_taints))
    return result
def __init__(self, delegate: AzureApi) -> None:
    """Wrap ``delegate`` and start with empty caches.

    :param delegate: stored as ``self._delegate``.
    """
    self._delegate = delegate
    self._lock = RLock()

    # All three caches start out empty.
    self._instance_cache: MutableMapping[Tuple[str, str], List[AzureScaleSetInstance]] = dict()
    self._scale_set_cache: MutableMapping[str, List[AzureScaleSet]] = dict()
    self._remaining_instances_cache: MutableMapping[str, MutableMapping[str, int]] = dict()
def __init__(self, guild: 'Guild', channels: 'typing.MutableMapping[int, channel.Channel]'):
    """
    Store the owning guild and the wrapped channel mapping.

    :param guild: The :class:`~.Guild` object that owns this wrapper.
    :param channels: The dictionary of channels that this wrapper contains
        (stored by reference, not copied).
    """
    self._guild, self._channels = guild, channels
def __init__(self, guild: 'Guild', roles: 'typing.MutableMapping[int, role.Role]'):
    """
    Store the owning guild and the wrapped role mapping.

    :param guild: The :class:`~.Guild` object that owns this wrapper.
    :param roles: The dictionary of roles that this wrapper contains
        (stored by reference, not copied).
    """
    self._guild, self._roles = guild, roles
def __init__(self, guild: 'Guild', emojis: 'typing.MutableMapping[int, dt_emoji.Emoji]'):
    """
    Store the owning guild and the wrapped emoji mapping.

    :param guild: The :class:`.Guild` object that owns this wrapper.
    :param emojis: The dictionary of emojis that this wrapper contains
        (stored by reference, not copied).
    """
    self._guild, self._emojis = guild, emojis
def remove_versions(self, versions: typing.MutableMapping[str, str]):
    """
    Remove this document from each given index provided that it contains the given version
    of this document.

    One versioned delete action is issued per entry of ``versions``. Errors
    do not raise; each one is logged as a warning.

    :param versions: mapping from index name to document version.
    """
    es_client = ElasticsearchClient.get(self.logger)

    delete_actions = [
        {
            '_op_type': 'delete',
            '_index': index_name,
            '_type': ESDocType.doc.name,
            '_version': version,
            '_id': str(self.fqid),
        }
        for index_name, version in versions.items()
    ]
    # The success count is not needed; only the error items are reported.
    _, failures = bulk(es_client, raise_on_error=False, actions=delete_actions)

    for item in failures:
        self.logger.warning(f"Document deletion failed: {json.dumps(item)}")