The following 50 code examples, extracted from open-source Python projects, illustrate how to use collections.abc.Mapping().
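Before the project examples, here is a minimal, self-contained sketch of the two ways Mapping typically shows up in the snippets below: as the target of an isinstance() check, and as an abstract base class to subclass so that the mixin methods (get, keys, items, __contains__, __eq__, ...) come for free. The names describe and LowerCaseMapping are illustrative only and do not come from any of the projects listed here.

from collections.abc import Mapping


def describe(obj):
    # Branch on whether obj behaves like a read-only dict
    # (dict, ChainMap, MappingProxyType, any Mapping subclass).
    if isinstance(obj, Mapping):
        return {key: obj[key] for key in obj}
    return obj


class LowerCaseMapping(Mapping):
    """Read-only mapping with case-insensitive string keys.

    Only __getitem__, __iter__ and __len__ are implemented here;
    Mapping supplies get(), keys(), items(), values(), __contains__
    and __eq__ as mixin methods.
    """

    def __init__(self, data):
        self._data = {str(k).lower(): v for k, v in dict(data).items()}

    def __getitem__(self, key):
        return self._data[str(key).lower()]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


if __name__ == '__main__':
    headers = LowerCaseMapping({'Content-Type': 'text/plain'})
    print(headers['content-type'])        # 'text/plain'
    print(isinstance(headers, Mapping))   # True
    print(describe({'a': 1}))             # {'a': 1}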
def with_query(self, *args, **kwargs):
    """Return a new URL with query part replaced.

    Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
    or str, autoencode the argument if needed.

    It also can take an arbitrary number of keyword arguments.

    Clear query if None is passed.
    """
    # N.B. doesn't cleanup query/fragment
    new_query = self._get_str_query(*args, **kwargs)
    return URL(
        self._val._replace(path=self._val.path, query=new_query),
        encoded=True)
def diffs(*mappings, missing=MISSING):
    """Yield keys and values which differ between the two mappings.

    A 'mapping' is any object which implements keys() and __getitem__().
    """
    assert mappings
    assert all(isinstance(mapping, Mapping) for mapping in mappings)
    # Defer to __eq__(), even if it contradicts the algorithm below
    if all_eq(mappings):
        return
    keys = chain.from_iterable(mapping.keys() for mapping in mappings)
    for key in unique(keys):
        vals = tuple(values(mappings, key))
        if not all_eq(vals):
            yield key, vals
def __getitem__(self, key: str) -> Any:
    node = self.mapping
    leafs = key.split(".")
    for i, leaf in enumerate(leafs):
        if not isinstance(node, c_abc.Mapping):
            raise KeyError(f"Element {'.'.join(leafs[:i])!r} is not a mapping")
        if not leaf:
            raise KeyError(f"Empty sub-key after {'.'.join(leafs[:i])!r}")
        if leaf not in node:
            break
        node = node[leaf]
    else:
        return node
    raise KeyError(f"Cannot find '{key}'")
def __eq__(self, other):
    if not isinstance(other, abc.Mapping):
        return NotImplemented
    if isinstance(other, _Base):
        lft = self._impl._items
        rht = other._impl._items
        if len(lft) != len(rht):
            return False
        for (i1, k2, v1), (i2, k2, v2) in zip(lft, rht):
            if i1 != i2 or v1 != v2:
                return False
        return True
    if len(self._impl._items) != len(other):
        return False
    for k, v in self.items():
        nv = other.get(k, _marker)
        if v != nv:
            return False
    return True
def basic_instance_data(request, instance_raw_data):
    """
    Transform the raw data for a basic model instance to comply with its ctor.

    :param pytest._pytest.fixtures.SubRequest request: test case requesting
        the basic instance data
    :param Mapping instance_raw_data: the raw data needed to create a
        model instance
    :return object: basic instance data in a form accepted by its constructor
    """
    # Cleanup is free with _write_config, using request's temp folder.
    transformation_by_class = {
        "AttributeDict": lambda data: data,
        "PipelineInterface": lambda data: _write_config(
            data, request, "pipeline_interface.yaml"),
        "ProtocolInterface": lambda data: _write_config(
            data, request, "pipeline_interface.yaml"),
        "ProtocolMapper": lambda data: data,
        "Sample": lambda data: pd.Series(data)}
    which_class = request.getfixturevalue("class_name")
    return transformation_by_class[which_class](instance_raw_data)
def _write_config(data, request, filename):
    """
    Write configuration data to file.

    :param str | Sequence | Mapping data: data to write to file, YAML compliant
    :param pytest._pytest.fixtures.SubRequest request: test case that
        requested a fixture from which this function was called
    :param str filename: name for the file to write
    :return str: full path to the file written
    """
    # We get cleanup for free by writing to file in request's temp folder.
    dirpath = request.getfixturevalue("tmpdir").strpath
    filepath = os.path.join(dirpath, filename)
    with open(filepath, 'w') as conf_file:
        yaml.safe_dump(data, conf_file)
    return filepath
def get_part(self, doc, part):
    """ Returns the next step in the correct type """
    if isinstance(doc, Mapping):
        return part

    elif isinstance(doc, Sequence):
        if part == '-':
            return part

        if not RE_ARRAY_INDEX.match(str(part)):
            raise JsonPointerException("'%s' is not a valid list index" % (part, ))

        return int(part)

    elif hasattr(doc, '__getitem__'):
        # Allow indexing via ducktyping if the target has defined __getitem__
        return part

    else:
        raise JsonPointerException("Document '%s' does not support indexing, "
                                   "must be dict/list or support __getitem__" % type(doc))
def test_against_direct_model(data):
    keys = list(data.keys())
    if not isinstance(data[keys[0]], Mapping):
        return
    if 'weights' in data[keys[0]]:
        return
    y = []
    x = []
    data_copy = OrderedDict()
    for i in range(min(3, len(data))):
        data_copy[keys[i]] = data[keys[i]]
        y.append(data[keys[i]]['dependent'])
        x.append(data[keys[i]]['exog'])
    direct = simple_sur(y, x)

    mod = SUR(data_copy)
    res = mod.fit(method='ols')
    assert_allclose(res.params.values[:, None], direct.beta0)

    res = mod.fit(method='gls')
    assert_allclose(res.params.values[:, None], direct.beta1)
def cast_json(json_dict):
    """Convert an arbitrary JSON source into MongoDB compatible format."""
    DOT = '_'
    DOLLAR = '\uff04'
    if isinstance(json_dict, str):
        return json_dict.replace('.', DOT).replace('$', DOLLAR)
    if six.PY2 and isinstance(json_dict, unicode):  # noqa
        return json_dict.replace('.', DOT).replace('$', DOLLAR)
    if isinstance(json_dict, Mapping):
        return {cast_json(key): cast_json(value)
                for key, value in json_dict.items()}
    elif isinstance(json_dict, Iterable):
        return [cast_json(o) for o in json_dict]
    else:
        return json_dict
def asjson(obj, seen=None):
    # Note: this project uses the legacy collections.Mapping alias
    # (moved to collections.abc in Python 3.3, removed in 3.10).
    if isinstance(obj, collections.Mapping) or isiter(obj):
        # prevent traversal of recursive structures
        if seen is None:
            seen = set()
        elif id(obj) in seen:
            return '__RECURSIVE__'
        seen.add(id(obj))

    if hasattr(obj, '__json__') and type(obj) is not type:
        return obj.__json__()
    elif isinstance(obj, collections.Mapping):
        result = collections.OrderedDict()
        for k, v in obj.items():
            try:
                result[asjson(k, seen)] = asjson(v, seen)
            except TypeError:
                debug('Unhashable key?', type(k), str(k))
                raise
        return result
    elif isiter(obj):
        return [asjson(e, seen) for e in obj]
    else:
        return obj
def get_part(self, doc, part):
    """Returns the next step in the correct type"""

    if isinstance(doc, Mapping):
        return part

    elif isinstance(doc, Sequence):
        if part == '-':
            return part

        if not self._RE_ARRAY_INDEX.match(str(part)):
            raise JsonPointerException("'%s' is not a valid sequence index" % part)

        return int(part)

    elif hasattr(doc, '__getitem__'):
        # Allow indexing via ducktyping
        # if the target has defined __getitem__
        return part

    else:
        raise JsonPointerException("Document '%s' does not support indexing, "
                                   "must be mapping/sequence or support __getitem__" % type(doc))
def run_test(work_type: FunctionType, job_sets: Sequence, trials: int,
             pool_class: type, worker_count: int) -> Mapping:
    pool = pool_class(worker_count)
    if work_type == 'compute':
        test_func = pool.run_compute_test
    elif work_type == 'network':
        test_func = pool.run_network_test
    else:
        raise Exception("Invalid work type: {}".format(work_type))
    results = map(
        lambda jobs: test_func(jobs, trials, show_progress=True),
        tqdm(job_sets, desc=pool_class.__name__),
    )
    summarized_results = list(map(summarize_test, results))
    pool.destroy_pool()
    return summarized_results
def frozen(struct):
    """Return an immutable, hashable version of the given data structure.

    Iterators (including generators) are hashable but mutable, so they
    are evaluated and returned as tuples---if they are infinite, this
    function will not exit.
    """
    if isinstance(struct, Mapping):
        return frozenset((k, frozen(v)) for k, v in struct.items())
    if isinstance(struct, Set):
        return frozenset(frozen(item) for item in struct)
    if isinstance(struct, Iterable):  # Includes iterators and generators
        return tuple(frozen(item) for item in struct)
    hash(struct)  # Raise TypeError for unhashable objects
    return struct
def hashified(struct, use_none=False):
    """Return a hashable version of the given data structure.

    If use_none is True, returns None instead of raising TypeError for
    unhashable types: this will serve as a bad but sometimes passable hash.

    See also functools._make_key, which might be a better choice.
    """
    try:
        hash(struct)
    except TypeError:
        pass
    else:
        # Return the original object if it's already hashable
        return struct
    if isinstance(struct, Mapping):
        return frozenset((k, hashified(v)) for k, v in struct.items())
    if isinstance(struct, Set):
        return frozenset(hashified(item) for item in struct)
    if isinstance(struct, Iterable):
        return tuple(hashified(item) for item in struct)
    if use_none:
        return None
    raise TypeError('unhashable type: {.__name__!r}'.format(type(struct)))
def __call__(self, obj):
    """Transforms the JSON object `obj`."""
    if isinstance(obj, str):
        return obj
    elif isinstance(obj, Sequence):
        return self.act_on_list(obj)
    elif isinstance(obj, Mapping):
        return self.act_on_dict(obj)
    else:
        return obj
def build(cls, obj):
    if isinstance(obj, abc.Mapping):
        return cls(obj)
    elif isinstance(obj, abc.MutableSequence):
        return [cls.build(item) for item in obj]
    else:  # <8>
        return obj
def build(cls, obj):  # <5>
    if isinstance(obj, abc.Mapping):  # <6>
        return cls(obj)
    elif isinstance(obj, abc.MutableSequence):  # <7>
        return [cls.build(item) for item in obj]
    else:  # <8>
        return obj
# END EXPLORE0
def __new__(cls, arg):  # <1>
    if isinstance(arg, abc.Mapping):
        return super().__new__(cls)  # <2>
    elif isinstance(arg, abc.MutableSequence):  # <3>
        return [cls(item) for item in arg]
    else:
        return arg
def _is_list(obj):
    return (isinstance(obj, Sized)
            and isinstance(obj, Iterable)
            and not isinstance(obj, (Set, Mapping)))
def _is_dict(obj):
    return isinstance(obj, Mapping)
def __init__(self, mapping: Mapping = None) -> None:
    if mapping is None:
        mapping = {}
    if not isinstance(mapping, c_abc.Mapping):
        raise ValueError("Must be a mapping")
    self.mapping = mapping
def __init__(self, mapping: Mapping, *fallback_configs: Configuration) -> None:
    super().__init__(mapping)
    # for fb_c in fallback_configs:
    #     if not isinstance(fb_c, Configuration):
    #         raise ValueError(f"{fb_c!r} is not an instance of {Configuration}")
    self.fallback_configs = fallback_configs
def __init__(self, name: str, mapping: Mapping,
             *fallback_configs: Configuration) -> None:
    super().__init__(mapping, *fallback_configs)
    self.name = name
    self._require_keys({'nick', 'user', 'realname'})
    self._fix_channels(mapping)
    self.servers = self._parse_servers(mapping)
def _fix_channels(mapping: Mapping) -> None:
    # TODO move to channels core plugin
    # Iterate over a snapshot of the items, since the loop body
    # adds and deletes keys in mapping['channels'].
    for channel, channel_conf in list(mapping.get('channels', {}).items()):
        if channel_conf is None:
            mapping['channels'][channel] = channel_conf = {}

        # replace channel names 'foobar' with '#foobar'
        if not channel.startswith(tuple('#&+!')):
            del mapping['channels'][channel]
            mapping['channels'][f'#{channel}'] = channel_conf
def __init__(self, mapping: Mapping) -> None:
    super().__init__(mapping)
    self.networks = list(self._parse_networks(mapping))
def _parse_networks(self, root: Mapping) -> List[NetworkConfiguration]:
    networks = root.get('networks', None)
    if networks is None:
        raise ConfigurationError("No networks found")

    return [NetworkConfiguration(name, mapping, self)
            for name, mapping in networks.items()]
def __setitem__(self, key, value):
    if isinstance(value, Mapping):
        value = Config(**value)
    self._values[key] = value
def populate(self, values):
    for key, value in values.items():
        if key not in self._values:
            continue  # Automatically delete removed settings
        default = self._values[key]
        if isinstance(default, Config) and isinstance(value, Mapping):
            default.populate(value)
        else:
            self._values[key] = value
def _dump(obj, indent):
    if isinstance(obj, Mapping):
        new_indent = indent + ' '
        items = []
        for k, v in sorted(obj.items()):
            items.append(
                '\n' + new_indent + "'" + k + "': " + _dump(v, new_indent))
        return '{' + ','.join(items) + '}'
    elif isinstance(obj, bool):
        return 'true' if obj else 'false'
    elif isinstance(obj, list):
        new_indent = indent + ' '
        b = ','.join('\n' + new_indent + _dump(v, new_indent) for v in obj)
        return '[' + b + ']'
    elif isinstance(obj, (int, float, Decimal)):
        return str(obj)
    elif obj is None:
        return 'null'
    elif isinstance(obj, str):
        quote = LONG_QUOTE if '\n' in obj or '\r' in obj else SHORT_QUOTE
        return quote + obj + quote
    elif isinstance(obj, bytearray):
        return '{{ ' + b64encode(obj).decode() + ' }}'
    elif isinstance(obj, bytes):
        return "{{ '" + obj.decode() + "' }}"
    elif isinstance(obj, Datetime):
        if obj.tzinfo is None or obj.tzinfo == Timezone.utc:
            fmt = UTC_FORMAT
        else:
            fmt = TZ_FORMAT
        return obj.strftime(fmt)
    else:
        raise PionException("Type " + str(type(obj)) + " not recognised.")
def iter_pairs(iterable):
    """
    Iterate over the (key, value) pairs in ``iterable``.

    This handles dictionaries sensibly, and falls back to assuming the
    iterable yields (key, value) pairs. This behaviour is similar to
    what Python's ``dict()`` constructor does.
    """
    if isinstance(iterable, Mapping):
        iterable = iterable.items()
    return iter(iterable)
def __new__(cls, arg):
    if isinstance(arg, abc.Mapping):
        return super().__new__(cls)
    elif isinstance(arg, abc.MutableSequence):
        return [cls(item) for item in arg]
    else:
        return arg
def make_mapping(v, key_fn=identity):
    """Return a mapping from an object, using a function to generate keys
    if needed. Mappings are left as is, iterables are split into elements,
    everything else is wrapped in a singleton map."""
    if v is None:
        return {}
    elif isinstance(v, Mapping):
        return v
    elif non_string_iterable(v):
        return {ignoring_extra_args(key_fn)(i, x): x for (i, x) in enumerate(v)}
    else:
        return {ignoring_extra_args(key_fn)(None, v): v}
def __init__(self, d={}, normalize=str.lower, base_factory=dict):
    self.normalize = normalize
    self._d = base_factory()
    self._k = {}
    if isinstance(d, abc.Mapping):
        for k, v in d.items():
            self.__setitem__(k, v)
    elif isinstance(d, abc.Iterable):
        for (k, v) in d:
            self.__setitem__(k, v)
def __repr__(self):
    if isinstance(self._args, abc.Sequence):
        argstr = ', '.join(repr(arg) for arg in self._args)
    elif isinstance(self._args, abc.Mapping):
        argstr = ', '.join('%s=%r' % (k, v) for k, v in self._args.items())
    provides = '/'.join(interface for interface in self.provides)
    string = '<Command [{}] {}({})'.format(provides, self.name, argstr)
    if self.finished:
        string += ' success={}'.format(self.success)
    else:
        string += ' running'
    return string + '>'
def getone(self, key, default=_marker):
    """Get first value matching the key."""
    identity = self._title(key)
    for i, k, v in self._impl._items:
        if i == identity:
            return v
    if default is not _marker:
        return default
    raise KeyError('Key not found: %r' % key)

# Mapping interface #
def clear(self):
    """Remove all items from MultiDict."""
    self._impl._items.clear()
    self._impl.incr_version()

# Mapping interface #
def test_abc_inheritance():
    assert issubclass(MultiMapping, Mapping)
    assert not issubclass(MultiMapping, MutableMapping)
    assert issubclass(MutableMultiMapping, Mapping)
    assert issubclass(MutableMultiMapping, MutableMapping)
def test_Mapping(self):
    for sample in [dict]:
        self.assertIsInstance(sample(), Mapping)
        self.assertTrue(issubclass(sample, Mapping))
    self.validate_abstract_methods(Mapping, '__contains__', '__iter__',
                                   '__len__', '__getitem__')

    class MyMapping(Mapping):
        def __len__(self):
            return 0
        def __getitem__(self, i):
            raise IndexError
        def __iter__(self):
            return iter(())

    self.validate_comparison(MyMapping())
def atacseq_iface_without_resources():
    """
    Provide the ATAC-Seq pipeline interface as a fixture, without resources.

    Note that this represents the configuration data for the interface for
    a single pipeline. In order to use this in the form that a
    PipelineInterface expects, this needs to be the value to which a key is
    mapped within a larger Mapping.

    :return Mapping: all of the pipeline interface configuration data for
        ATAC-Seq, minus the resources section
    """
    return {
        "name": "ATACseq",
        "looper_args": True,
        "required_input_files": ["read1", "read2"],
        "all_input_files": ["read1", "read2"],
        "ngs_input_files": ["read1", "read2"],
        "arguments": {
            "--sample-name": "sample_name",
            "--genome": "genome",
            "--input": "read1",
            "--input2": "read2",
            "--single-or-paired": "read_type"
        },
        "optional_arguments": {
            "--frip-ref-peaks": "FRIP_ref",
            "--prealignments": "prealignments",
            "--genome-size": "macs_genome_size"
        }
    }
def atacseq_iface_with_resources(atacseq_iface_without_resources, resources):
    """
    :param dict atacseq_iface_without_resources: PipelineInterface config
        data, minus a resources section
    :param Mapping resources: resources section of PipelineInterface
        configuration data
    :return Mapping: pipeline interface data for ATAC-Seq pipeline, with all
        of the base sections plus resources section
    """
    iface_data = copy.deepcopy(atacseq_iface_without_resources)
    iface_data["resources"] = copy.deepcopy(resources)
    return iface_data
def piface_config_bundles(request, resources):
    """
    Provide the ATAC-Seq pipeline interface as a fixture, including resources.

    Note that this represents the configuration data for the interface for
    a single pipeline. In order to use this in the form that a
    PipelineInterface expects, this needs to be the value to which a key is
    mapped within a larger Mapping.

    :param pytest._pytest.fixtures.SubRequest request: hook into test case
        requesting this fixture, which is queried for a resources value with
        which to override the default if it's present.
    :param Mapping resources: pipeline interface resource specification
    :return Iterable[Mapping]: collection of bundles of pipeline interface
        configuration bundles
    """
    iface_config_datas = request.getfixturevalue("config_bundles")
    if isinstance(iface_config_datas, Mapping):
        data_bundles = iface_config_datas.values()
    elif isinstance(iface_config_datas, Iterable):
        data_bundles = iface_config_datas
    else:
        raise TypeError("Expected mapping or list collection of "
                        "PipelineInterface data: {} ({})".format(
                            iface_config_datas, type(iface_config_datas)))
    resource_specification = request.getfixturevalue("resources") \
        if "resources" in request.fixturenames else resources
    for config_bundle in data_bundles:
        config_bundle.update(resource_specification)
    return iface_config_datas
def extend(self, iterable):
    """
    Concatenate two lists by adding a list of extra items to the end of
    this list. Each item added must be capable of being unpacked into a
    key-value pair.

    >>> kvl = KeyValueList([('one', 'eins'), ('two', 'zwei')])
    >>> kvl.extend([('three', 'drei'), ('four', 'vier')])
    >>> for item in kvl:
    ...     print(item)
    ('one', 'eins')
    ('two', 'zwei')
    ('three', 'drei')
    ('four', 'vier')

    >>> kvl.extend(['five', 'six'])
    Traceback (most recent call last):
        File "<stdin>", line 1, in ?
    ValueError: KeyValueList items must be pairs
    """
    if isinstance(iterable, Mapping):
        list.extend(self, iterable.items())
    else:
        try:
            list.extend(self, ((k, v) for k, v in iterable))
        except ValueError:
            raise ValueError("KeyValueList items must be pairs")
def format_tags(obj, defaults=None):
    result = set()
    for src in (obj, defaults):
        if isinstance(src, Mapping):
            result.update(['%s:%s' % (k, v) for k, v in src.items()])
        elif isinstance(src, list):
            result.update(src)
        elif isinstance(src, str):
            result.add(src)
    return sorted(result)
def _merge_envs(self, merge_envs, re_env):
    new_env = {}
    for e in merge_envs:
        if e == 'replay':
            new_env.update(re_env)
        elif e == 'native':
            new_env.update(builtins.__xonsh_env__)
        elif isinstance(e, Mapping):
            new_env.update(e)
        else:
            raise TypeError('Type of env not understood: {0!r}'.format(e))
    new_env = Env(**new_env)
    return new_env