我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 collections.Mapping。
def extend(self, *args, **kwargs):
    """Generic import function for any type of header-like object.

    Adapted version of MutableMapping.update in order to insert items
    with self.add instead of self.__setitem__.

    Accepts at most one positional argument (another header dict, a
    mapping, any object exposing ``keys()``, or an iterable of
    ``(key, value)`` pairs) followed by arbitrary keyword arguments.

    :raises TypeError: if more than one positional argument is given.
    """
    positional_count = len(args)
    if positional_count > 1:
        raise TypeError("extend() takes at most 1 positional "
                        "arguments ({0} given)".format(positional_count))
    other = args[0] if positional_count >= 1 else ()

    # Normalise every accepted input shape to a lazy stream of pairs so
    # that a single loop performs the insertion.
    if isinstance(other, HTTPHeaderDict):
        pairs = other.iteritems()
    elif isinstance(other, Mapping):
        pairs = ((key, other[key]) for key in other)
    elif hasattr(other, "keys"):
        pairs = ((key, other[key]) for key in other.keys())
    else:
        pairs = other

    for key, val in pairs:
        self.add(key, val)

    # Keyword arguments are added last, after the positional source.
    for key, value in kwargs.items():
        self.add(key, value)
def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        ValueError: cannot encode objects that are not 2-tuples.

    ``None`` is passed through unchanged.

    :raises ValueError: if ``value`` is a str, bytes, bool or int.
    :rtype: list
    """
    # ``collections.Mapping`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc`` (fall back to the old location on Python 2).
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError('cannot encode objects that are not 2-tuples')

    if isinstance(value, Mapping):
        value = value.items()

    return list(value)


# From mitsuhiko/werkzeug (used with permission).
def _update_nested_dict(original_dict, new_dict):
    """Update the dictionary and its nested dictionary fields.

    Note: This was copy-pasted from:
        opengrok/xref/submodules/yelp_lib/yelp_lib/containers/dicts.py?r=92297a46#40
        The reason is that this revision requires yelp_lib>=11.0.0 but we
        can not use this version yelp-main yet (see YELPLIB-65 for details).
        It's simpler to just temporarily pull this in.

    The update happens in place. When ``new_dict`` holds a mapping under a
    key whose current value in ``original_dict`` is missing or is not a
    mapping, that value is replaced by a fresh dict before merging (the
    previous code crashed on the non-mapping value).

    :param original_dict: Original dictionary
    :param new_dict: Dictionary with data to update
    """
    # Using our own stack to avoid recursion.
    stack = [(original_dict, new_dict)]
    while stack:
        target, updates = stack.pop()
        for key, value in updates.items():
            if isinstance(value, Mapping):
                # A scalar cannot be merged into; start from an empty dict.
                if not isinstance(target.get(key), Mapping):
                    target[key] = {}
                stack.append((target[key], value))
            else:
                target[key] = value
def __eq__(self, other):
    """Compare two header collections, ignoring the case of the keys.

    Anything that is neither a Mapping nor exposes ``keys`` compares
    unequal. Other inputs are first converted to this object's own type
    so that both sides provide ``itermerged()``.
    """
    looks_like_mapping = isinstance(other, Mapping) or hasattr(other, 'keys')
    if not looks_like_mapping:
        return False

    own_type = type(self)
    if not isinstance(other, own_type):
        other = own_type(other)

    mine = dict((name.lower(), val) for name, val in self.itermerged())
    theirs = dict((name.lower(), val) for name, val in other.itermerged())
    return mine == theirs
def preprocess(cls, op, clauses):
    """Normalise the ``between`` clause into ``(param, options)``.

    ``clauses["between"]`` may be either a list, which is used as-is, or a
    mapping of the form ``{var: [prefix, suffix]}``, which is expanded to
    ``[var, {"literal": prefix}, {"literal": suffix}]``. Only the first
    item of the mapping is considered.

    Any other shape (including an empty mapping) is reported through
    ``Log.error``.  NOTE(review): ``Log.error`` presumably raises; confirm
    against the logging library in use.

    :return: tuple of the normalised parameter list and a dict holding the
        ``default`` and ``start`` clauses.
    """
    param = clauses["between"]
    if isinstance(param, list):
        pass  # already in the expanded form
    elif isinstance(param, Mapping):
        # ``items()`` is a non-subscriptable view on Python 3, so take the
        # first pair through an iterator; the default keeps an empty
        # mapping on the regular error path instead of an IndexError.
        var, vals = next(iter(param.items()), (None, None))
        if isinstance(vals, list) and len(vals) == 2:
            param = [var, {"literal": vals[0]}, {"literal": vals[1]}]
        else:
            Log.error("`between` parameters are expected to be in {var: [prefix, suffix]} form")
    else:
        Log.error("`between` parameters are expected to be in {var: [prefix, suffix]} form")

    return param, {
        "default": clauses["default"],
        "start": clauses["start"]
    }
def quote_value(value):
    """Return the textual literal used to represent ``value``.

    * mappings and lists      -> ``"."``
    * ``Date``                -> text of its ``unix`` attribute
    * ``Duration``            -> text of its ``seconds`` attribute
    * strings                 -> single-quoted, embedded quotes doubled
    * anything equal to None  -> ``"NULL"``
    * ``True`` / ``False``    -> ``"1"`` / ``"0"``
    * everything else         -> ``text_type(value)``
    """
    if isinstance(value, (Mapping, list)):
        return "."
    if isinstance(value, Date):
        return text_type(value.unix)
    if isinstance(value, Duration):
        return text_type(value.seconds)
    if isinstance(value, basestring):
        escaped = value.replace("'", "''")
        return "'" + escaped + "'"
    # Equality (not identity) is kept on purpose: objects that merely
    # compare equal to None are rendered as NULL as well.
    if value == None:
        return "NULL"
    for singleton, literal in ((True, "1"), (False, "0")):
        if value is singleton:
            return literal
    return text_type(value)
def __init__(self, fmt=None, datefmt=None, style="%"):
    """Set up a formatter with optional per-level format strings.

    :param fmt: either a single format string, or a mapping from log level
        (anything accepted by ``logging._checkLevel``) to format string.
        In a mapping, the special key ``"*"`` supplies the default format.
        When omitted, ``DEFAULT_FORMATS`` is used.
    :param datefmt: passed through to the base formatter.
    :param style: must be ``'%'``.
    :raises ValueError: if ``style`` is not ``'%'``.
    :raises TypeError: if ``fmt`` is neither a string nor a mapping.
    """
    # ``collections.Mapping`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc`` (fall back to the old location on Python 2).
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    if style != '%':
        raise ValueError(
            "only '%' percent style is supported in both python 2 and 3")
    if fmt is None:
        fmt = DEFAULT_FORMATS
    if isinstance(fmt, basestring):
        default_format = fmt
        custom_formats = {}
    elif isinstance(fmt, Mapping):
        custom_formats = dict(fmt)
        default_format = custom_formats.pop("*", None)
    else:
        # Wrap in a 1-tuple so a tuple ``fmt`` is shown by %r instead of
        # being unpacked as the format arguments.
        raise TypeError('fmt must be a str or a dict of str: %r' % (fmt,))
    super(LevelFormatter, self).__init__(default_format, datefmt)
    # The base class stores the effective default in ``_fmt``.
    self.default_format = self._fmt
    self.custom_formats = {}
    for level, level_format in custom_formats.items():
        # Normalise level names / numbers to the numeric level.
        self.custom_formats[logging._checkLevel(level)] = level_format
def _resolve_refs(uri, spec):
    """Resolve JSON references in a given dictionary.

    OpenAPI spec may contain JSON references to its nodes or external
    sources, so any attempt to rely that there's some expected attribute
    in the spec may fail. So we need to resolve JSON references before
    we use it (i.e. replace with referenced object). For details see:

        https://tools.ietf.org/html/draft-pbryan-zyp-json-ref-02

    The input spec is modified in-place despite being returned from the
    function. Tuples are the exception: they are immutable, so a tuple
    node is replaced by a new tuple holding the resolved items.
    """
    # ``collections.Mapping`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc`` (fall back to the old location on Python 2).
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    resolver = jsonschema.RefResolver(uri, spec)

    def _do_resolve(node):
        if isinstance(node, Mapping):
            if '$ref' in node:
                # A reference node is replaced wholesale by its target.
                with resolver.resolving(node['$ref']) as resolved:
                    return resolved
            # Only values of existing keys are reassigned, so the mapping
            # does not change size while being iterated.
            for key, value in node.items():
                node[key] = _do_resolve(value)
        elif isinstance(node, list):
            for index, item in enumerate(node):
                node[index] = _do_resolve(item)
        elif isinstance(node, tuple):
            # Item assignment would raise TypeError on a tuple.
            return tuple(_do_resolve(item) for item in node)
        return node

    return _do_resolve(spec)
def _validate_tag_sets(tag_sets):
    """Validate tag sets for a MongoReplicaSetClient.

    ``None`` is accepted as-is. Otherwise ``tag_sets`` must be a non-empty
    list whose members are all mappings; the validated value is returned.

    :raises TypeError: if ``tag_sets`` is not a list, or a member is not a
        mapping.
    :raises ValueError: if ``tag_sets`` is an empty list.
    """
    if tag_sets is None:
        return tag_sets

    if not isinstance(tag_sets, list):
        raise TypeError("Tag sets %r invalid, must be a list" % (tag_sets,))

    if not tag_sets:
        raise ValueError(
            "Tag sets %r invalid, must be None or contain at least one set of"
            " tags" % (tag_sets,))

    for candidate in tag_sets:
        if isinstance(candidate, Mapping):
            continue
        raise TypeError(
            "Tag set %r invalid, must be an instance of dict, "
            "bson.son.SON or other type that inherits from "
            "collection.Mapping" % (candidate,))

    return tag_sets
def _index_document(index_list):
    """Helper to generate an index specifying document.

    Takes a list of (key, direction) pairs and returns them as a ``SON``
    document in the same order.

    :raises TypeError: if ``index_list`` is a mapping or is not a
        list/tuple, if a key is not a string, or if a value is not a
        string, int or mapping.
    :raises ValueError: if ``index_list`` is empty.
    """
    # ``collections.Mapping`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc`` (fall back to the old location on Python 2).
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    if isinstance(index_list, Mapping):
        raise TypeError("passing a dict to sort/create_index/hint is not "
                        "allowed - use a list of tuples instead. did you "
                        "mean %r?" % list(index_list.items()))
    elif not isinstance(index_list, (list, tuple)):
        raise TypeError("must use a list of (key, direction) pairs, "
                        "not: " + repr(index_list))
    if not index_list:
        raise ValueError("key_or_list must not be the empty list")

    index = SON()
    for (key, value) in index_list:
        if not isinstance(key, string_type):
            raise TypeError("first item in each key pair must be a string")
        if not isinstance(value, (string_type, int, Mapping)):
            raise TypeError("second item in each key pair must be 1, -1, "
                            "'2d', 'geoHaystack', or another valid MongoDB "
                            "index specifier.")
        index[key] = value
    return index
def _fields_list_to_dict(fields, option_name):
    """Takes a sequence of field names and returns a matching dictionary.

    ["a", "b"] becomes {"a": 1, "b": 1}

    and

    ["a.b.c", "d", "a.c"] becomes {"a.b.c": 1, "d": 1, "a.c": 1}

    A mapping is returned unchanged.

    :param option_name: name used for the option in error messages.
    :raises TypeError: if ``fields`` is neither a mapping nor a sequence,
        or if a sequence member is not a string.
    """
    # ``collections.Mapping`` / ``collections.Sequence`` were removed in
    # Python 3.10; the ABCs live in ``collections.abc`` (fall back to the
    # old location on Python 2).
    try:
        from collections.abc import Mapping, Sequence
    except ImportError:
        from collections import Mapping, Sequence

    if isinstance(fields, Mapping):
        return fields

    if isinstance(fields, Sequence):
        if not all(isinstance(field, string_type) for field in fields):
            raise TypeError("%s must be a list of key names, each an "
                            "instance of %s" % (option_name,
                                                string_type.__name__))
        return dict.fromkeys(fields, 1)

    raise TypeError("%s must be a mapping or "
                    "list of key names" % (option_name,))
def remove(self, spec_or_id=None, multi=True, **kwargs):
    """Remove a document(s) from this collection.

    **DEPRECATED** - Use :meth:`delete_one` or :meth:`delete_many` instead.

    A ``spec_or_id`` that is not a mapping is treated as an ``_id`` value;
    ``None`` selects with an empty filter. Keyword arguments, when given,
    are used to build the write concern for the operation.

    .. versionchanged:: 3.0
       Removed the `safe` parameter. Pass ``w=0`` for unacknowledged write
       operations.
    """
    # ``collections.Mapping`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc`` (fall back to the old location on Python 2).
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    # stacklevel=2 attributes the warning to the caller of remove().
    warnings.warn("remove is deprecated. Use delete_one or delete_many "
                  "instead.", DeprecationWarning, stacklevel=2)
    if spec_or_id is None:
        spec_or_id = {}
    if not isinstance(spec_or_id, Mapping):
        spec_or_id = {"_id": spec_or_id}
    write_concern = None
    if kwargs:
        write_concern = WriteConcern(**kwargs)
    with self._socket_for_writes() as sock_info:
        return self._delete(sock_info, spec_or_id, multi, write_concern)
def __new__(cls, code, scope=None, **kwargs):
    """Create the string object and attach its scope mapping.

    The scope starts from ``code.scope`` when ``code`` has that attribute
    (otherwise from an empty dict) and is then updated with ``scope`` and
    with any keyword arguments, in that order.

    :raises TypeError: if ``code`` is not a string, or ``scope`` is given
        and is not a mapping.
    """
    # ``collections.Mapping`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc`` (fall back to the old location on Python 2).
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    if not isinstance(code, string_type):
        raise TypeError("code must be an "
                        "instance of %s" % (string_type.__name__))

    self = str.__new__(cls, code)

    try:
        # NOTE(review): this shares the scope dict with ``code`` rather
        # than copying it, so the updates below are visible through
        # ``code`` too - confirm that is intended.
        self.__scope = code.scope
    except AttributeError:
        self.__scope = {}

    if scope is not None:
        if not isinstance(scope, Mapping):
            raise TypeError("scope must be an instance of dict")
        self.__scope.update(scope)

    self.__scope.update(kwargs)

    return self
def to_dict(self):
    """Convert a SON document to a normal Python dictionary instance.

    This is trickier than just *dict(...)* because it needs to be
    recursive: every nested mapping (including those inside lists) is
    converted to a plain ``dict`` as well.
    """
    # ``collections.Mapping`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc`` (fall back to the old location on Python 2).
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    def transform_value(value):
        if isinstance(value, list):
            return [transform_value(item) for item in value]
        if isinstance(value, Mapping):
            # Generator form avoids building a throwaway list of pairs.
            return dict((key, transform_value(item))
                        for key, item in value.items())
        return value

    return transform_value(dict(self))
def _make_option_strings(cls, options_map):
    """Render an options mapping as a sorted list of ``name = value`` strings.

    Options named in ``cls.option_maps`` whose value is a mapping are
    rendered as ``name = {'k': 'v', ...}``. Every remaining option with a
    non-None value is rendered through ``protect_value``; a falsy
    ``comment`` is rendered as the empty string.
    """
    remaining = dict(options_map.items())
    rendered = []

    for option in cls.option_maps:
        value = remaining.get(option)
        if not isinstance(value, Mapping):
            continue
        # Map-valued options are handled here and must not be rendered
        # again by the generic loop below.
        del remaining[option]
        body = ', '.join("'%s': '%s'" % (k, v) for k, v in value.items())
        rendered.append("%s = {%s}" % (option, body))

    for name, value in remaining.items():
        if value is None:
            continue
        if name == "comment":
            value = value or ""
        rendered.append("%s = %s" % (name, protect_value(value)))

    rendered.sort()
    return rendered
def _walk(node, config, path, parameters):
    """Recursively rebuild ``node``, applying extrinsics to every child.

    A reference node whose target is present in ``parameters`` is first
    replaced by that parameter value. Lists and mappings are rebuilt with
    each child walked and then passed through ``apply_extrinsics``;
    ``path`` is extended with the list index or mapping key of the child.
    Integers and unicode strings are returned unchanged.

    :raises UnexpectedNodeType: for any other kind of node.
    """
    if is_ref(node):
        # NOTE: Python 2 semantics - ``keys()`` returns an indexable list.
        target = node[node.keys()[0]]
        if target in parameters:
            node = parameters[target]

    def visit(step, child):
        # A fresh path list is built for each callee, as in the
        # straight-line version of this code.
        walked = _walk(child, config, path + [step], parameters)
        return apply_extrinsics(walked, config, path + [step], parameters)

    if isinstance(node, list):
        return [visit(index, child) for index, child in enumerate(node)]
    if isinstance(node, collections.Mapping):
        return {key: visit(key, child) for key, child in node.iteritems()}
    if isinstance(node, (int, unicode)):
        return node
    raise UnexpectedNodeType(path, node)
def objwalk(obj, path=(), memo=None):
    """Yield ``(path, leaf)`` pairs for every leaf reachable from ``obj``.

    Mappings are descended by key; sequences and sets (other than
    strings) by enumeration index. ``path`` is the tuple of components
    leading to the value. ``memo`` holds the ids of the containers
    currently being traversed so that a container reached again through
    itself is skipped instead of recursing forever.
    """
    if memo is None:
        memo = set()

    if isinstance(obj, Mapping):
        children_of = iteritems
    elif isinstance(obj, (Sequence, Set)) and not isinstance(obj, string_types):
        children_of = enumerate
    else:
        # Leaf value: report it and stop.
        yield path, obj
        return

    marker = id(obj)
    if marker in memo:
        return  # already on the current traversal stack

    memo.add(marker)
    for component, child in children_of(obj):
        for found in objwalk(child, path + (component,), memo):
            yield found
    memo.remove(marker)


# optional test code from here on
def _update(d, u):
    """Update a nested dict recursively in place.

    >>> _update({'a': 0, 'b': 2}, {'a': 1})
    {'a': 1, 'b': 2}
    >>> _update({'a': {'b': 1}}, {'a': 0})
    {'a': 0}
    >>> d = _update({'a': {'b': 1}}, {'a': {'c': 2}})
    >>> d == {'a': {'b': 1, 'c': 2}}
    True

    Values are merged only when both the existing and the incoming value
    are mappings; otherwise the incoming value replaces the existing one.
    Returns ``d``.
    """
    for key, incoming in six.iteritems(u):
        current = d.get(key, {})
        if not (isinstance(incoming, Mapping) and isinstance(current, Mapping)):
            d[key] = u[key]
            continue
        _update(current, incoming)
        # Re-assign so a freshly created ``{}`` is attached to ``d``.
        d[key] = current
    return d
def get_raw_data(self, datum):
    """Returns the raw data for this column, before any filters or
    formatting are applied to it. This is useful when doing calculations
    on data in the table.

    ``self.transform`` is applied in one of three ways: called with
    ``datum`` when it is callable, used as a key when ``datum`` is a
    mapping containing it, and otherwise used as an attribute name on
    ``datum``. A failed attribute lookup is logged at debug level and
    yields ``None``.
    """
    # ``collections.Mapping`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc`` (fall back to the old location on Python 2).
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    # Callable transformations
    if callable(self.transform):
        data = self.transform(datum)
    # Dict lookups
    elif isinstance(datum, Mapping) and self.transform in datum:
        data = datum.get(self.transform)
    else:
        # Basic object lookups
        data = getattr(datum, self.transform, None)
        if data is None:
            msg = _("The attribute %(attr)s doesn't exist on "
                    "%(obj)s.") % {'attr': self.transform, 'obj': datum}
            msg = termcolors.colorize(msg, **PALETTE['ERROR'])
            LOG.debug(msg)
    return data
def merge_setting(request_setting, session_setting, dict_class=OrderedDict):
    """Determines appropriate setting for a given request, taking into
    account the explicit setting on that request, and the setting in the
    session. If a setting is a dictionary, they will be merged together
    using `dict_class`.

    Request values win over session values, and keys whose merged value
    is ``None`` are dropped from the result.
    """
    if session_setting is None:
        return request_setting
    if request_setting is None:
        return session_setting

    # Bypass if not a dictionary (e.g. verify)
    both_are_mappings = (
        isinstance(session_setting, Mapping)
        and isinstance(request_setting, Mapping)
    )
    if not both_are_mappings:
        return request_setting

    merged = dict_class(to_key_val_list(session_setting))
    merged.update(to_key_val_list(request_setting))

    # Remove keys that are set to None. The key list is materialised
    # first so the dictionary is not altered while it is iterated.
    for stale_key in [name for name, val in merged.items() if val is None]:
        del merged[stale_key]

    return merged
def __eq__(self, other):
    """Compare with another mapping, ignoring the case of the keys.

    Returns ``NotImplemented`` for anything that is not a mapping, so
    Python can fall back to the other operand's comparison.
    """
    # ``collections.Mapping`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc`` (fall back to the old location on Python 2).
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    if not isinstance(other, Mapping):
        return NotImplemented
    other = CaseInsensitiveDict(other)
    # Compare insensitively
    return dict(self.lower_items()) == dict(other.lower_items())


# Copy is required