The following 50 code examples, extracted from open-source Python projects, illustrate how to use itertools.chain.from_iterable().
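For reference, chain.from_iterable() takes a single iterable of iterables and lazily flattens it by one level:

>>> from itertools import chain
>>> list(chain.from_iterable([[1, 2], [3], [4, 5]]))
[1, 2, 3, 4, 5]

Unlike chain(*iterables), the outer iterable is consumed lazily, so the idiom also works with streaming or infinite inputs.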
def elements(self):
    '''Iterator over elements repeating each as many times as its count.

    >>> c = Counter('ABCABC')
    >>> sorted(c.elements())
    ['A', 'A', 'B', 'B', 'C', 'C']

    # Knuth's example for prime factors of 1836:  2**2 * 3**3 * 17**1
    >>> prime_factors = Counter({2: 2, 3: 3, 17: 1})
    >>> product = 1
    >>> for factor in prime_factors.elements():     # loop over factors
    ...     product *= factor                       # and multiply them
    >>> product
    1836

    Note, if an element's count has been set to zero or is a negative
    number, elements() will ignore it.

    '''
    # Emulate Bag.do from Smalltalk and Multiset.begin from C++.
    return _chain.from_iterable(_starmap(_repeat, self.items()))

# Override dict methods where necessary
def elements(self):
    '''Iterator over elements repeating each as many times as its count.

    >>> c = Counter('ABCABC')
    >>> sorted(c.elements())
    ['A', 'A', 'B', 'B', 'C', 'C']

    # Knuth's example for prime factors of 1836:  2**2 * 3**3 * 17**1
    >>> prime_factors = Counter({2: 2, 3: 3, 17: 1})
    >>> product = 1
    >>> for factor in prime_factors.elements():     # loop over factors
    ...     product *= factor                       # and multiply them
    >>> product
    1836

    Note, if an element's count has been set to zero or is a negative
    number, elements() will ignore it.

    '''
    # Emulate Bag.do from Smalltalk and Multiset.begin from C++.
    return _chain.from_iterable(_starmap(_repeat, self.iteritems()))

# Override dict methods where necessary
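The two variants above differ only in self.items() (Python 3) versus self.iteritems() (Python 2); both rely on the same idiom: starmap(repeat, ...) turns each (element, count) pair into a repeat(element, count) iterator, and chain.from_iterable() concatenates those streams. A minimal sketch of the idiom on its own:

>>> from itertools import chain, repeat, starmap
>>> list(chain.from_iterable(starmap(repeat, [('A', 2), ('B', 3)])))
['A', 'A', 'B', 'B', 'B']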
def set_alpn_protos(self, protos):
    """
    Specify the client's ALPN protocol list.

    These protocols are offered to the server during protocol negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Take the list of protocols and join them together, prefixing them
    # with their lengths.
    protostr = b''.join(
        chain.from_iterable((int2byte(len(p)), p) for p in protos)
    )

    # Build a C string from the list. We don't need to save this off
    # because OpenSSL immediately copies the data out.
    input_str = _ffi.new("unsigned char[]", protostr)
    input_str_len = _ffi.cast("unsigned", len(protostr))
    _lib.SSL_CTX_set_alpn_protos(self._context, input_str, input_str_len)
def set_alpn_protos(self, protos):
    """
    Specify the client's ALPN protocol list.

    These protocols are offered to the server during protocol negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Take the list of protocols and join them together, prefixing them
    # with their lengths.
    protostr = b''.join(
        chain.from_iterable((int2byte(len(p)), p) for p in protos)
    )

    # Build a C string from the list. We don't need to save this off
    # because OpenSSL immediately copies the data out.
    input_str = _ffi.new("unsigned char[]", protostr)
    input_str_len = _ffi.cast("unsigned", len(protostr))
    _lib.SSL_set_alpn_protos(self._ssl, input_str, input_str_len)
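In both ALPN variants the generator interleaves each protocol's one-byte length with the protocol bytes, producing the length-prefixed wire format OpenSSL expects. A minimal sketch, where bytes([n]) stands in for the int2byte() helper used above:

>>> from itertools import chain
>>> protos = [b'http/1.1', b'spdy/2']
>>> b''.join(chain.from_iterable((bytes([len(p)]), p) for p in protos))
b'\x08http/1.1\x06spdy/2'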
def get_all_related_objects(self, local_only=False, include_hidden=False,
                            include_proxy_eq=False):
    include_parents = True if local_only is False else PROXY_PARENTS
    fields = self._get_fields(
        forward=False, reverse=True,
        include_parents=include_parents,
        include_hidden=include_hidden,
    )
    fields = (obj for obj in fields
              if not isinstance(obj.field, ManyToManyField))
    if include_proxy_eq:
        children = chain.from_iterable(
            c._relation_tree
            for c in self.concrete_model._meta.proxied_children
            if c is not self
        )
        relations = (
            f.remote_field for f in children
            if include_hidden or not f.remote_field.field.remote_field.is_hidden()
        )
        fields = chain(fields, relations)
    return list(fields)
def check_all_models(app_configs=None, **kwargs):
    errors = []
    if app_configs is None:
        models = apps.get_models()
    else:
        models = chain.from_iterable(app_config.get_models()
                                     for app_config in app_configs)
    for model in models:
        if not inspect.ismethod(model.check):
            errors.append(
                Error(
                    "The '%s.check()' class method is currently "
                    "overridden by %r." % (model.__name__, model.check),
                    obj=model,
                    id='models.E020'
                )
            )
        else:
            errors.extend(model.check(**kwargs))
    return errors
def diffs(*mappings, missing=MISSING):
    """Yield keys and values which differ between the two mappings.

    A 'mapping' is any object which implements keys() and __getitem__().
    """
    assert mappings
    assert all(isinstance(mapping, Mapping) for mapping in mappings)

    # Defer to __eq__(), even if it contradicts the algorithm below.
    if all_eq(mappings):
        return

    keys = chain.from_iterable(mapping.keys() for mapping in mappings)
    for key in unique(keys):
        vals = tuple(values(mappings, key))
        if not all_eq(vals):
            yield key, vals
def eval_argument_clinic(self, arguments):
    """Uses a list with argument clinic information (see PEP 436)."""
    iterator = self.unpack()
    for i, (name, optional, allow_kwargs) in enumerate(arguments):
        key, va_values = next(iterator, (None, []))
        if key is not None:
            raise NotImplementedError
        if not va_values and not optional:
            debug.warning('TypeError: %s expected at least %s arguments, got %s',
                          name, len(arguments), i)
            raise ValueError
        values = list(chain.from_iterable(self._evaluator.eval_element(el)
                                          for el in va_values))
        if not values and not optional:
            # For the stdlib we always want values. If we don't get them,
            # that's ok, maybe something is too hard to resolve, however,
            # we will not proceed with the evaluation of that function.
            debug.warning('argument_clinic "%s" not resolvable.', name)
            raise ValueError
        yield values
def _global_completions(self):
    scope = get_user_scope(self._module, self._position)
    if not scope.is_scope():  # Might be a flow (if/while/etc).
        scope = scope.get_parent_scope()
    scope = self._evaluator.wrap(scope)
    debug.dbg('global completion scope: %s', scope)
    names_dicts = global_names_dict_generator(
        self._evaluator,
        scope,
        self._position
    )
    completion_names = []
    for names_dict, pos in names_dicts:
        names = list(chain.from_iterable(names_dict.values()))
        if not names:
            continue
        completion_names += filter_definition_names(
            names, self._module.get_statement_for_position(self._position),
            pos
        )
    return completion_names
def eval_argument_clinic(self, arguments):
    """Uses a list with argument clinic information (see PEP 436)."""
    iterator = self.unpack()
    for i, (name, optional, allow_kwargs) in enumerate(arguments):
        key, va_values = next(iterator, (None, []))
        if key is not None:
            raise NotImplementedError
        if not va_values and not optional:
            debug.warning('TypeError: %s expected at least %s arguments, got %s',
                          name, len(arguments), i)
            raise ValueError
        values = set(chain.from_iterable(self._evaluator.eval_element(el)
                                         for el in va_values))
        if not values and not optional:
            # For the stdlib we always want values. If we don't get them,
            # that's ok, maybe something is too hard to resolve, however,
            # we will not proceed with the evaluation of that function.
            debug.warning('argument_clinic "%s" not resolvable.', name)
            raise ValueError
        yield values
def build_phrase_models(content, base_path, settings):
    """ Build and save the phrase models """

    ngram_level = int(settings['level'])

    # According to tee() docs, this may be inefficient in terms of memory.
    # We need to do this because we need multiple passes through the
    # content stream.
    content = chain.from_iterable(doc.tokenized_text for doc in content)
    cs1, cs2 = tee(content, 2)

    for i in range(ngram_level - 1):
        phrases = Phrases(cs1)
        path = "%s.%s" % (base_path, i + 2)    # save path as n-gram level
        logger.info("Phrase processor: Saving %s", path)
        phrases.save(path)
        # TODO: gensim complains about not using Phraser(phrases)
        content = phrases[cs2]     # tokenize phrases in content stream
        cs1, cs2 = tee(content, 2)
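A minimal sketch (toy data, independent of gensim) of the tee() + chain.from_iterable() pattern used above: flatten a stream of token lists, then split the stream so it can be consumed twice.

from itertools import chain, tee

docs = [["new", "york"], ["new", "york", "city"]]
stream = chain.from_iterable(docs)   # lazily flatten the token lists
pass1, pass2 = tee(stream, 2)        # two independent passes over the stream
print(list(pass1))  # ['new', 'york', 'new', 'york', 'city']
print(list(pass2))  # same items again; tee() buffers them, hence the memory note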
def _get_all_related_objects(self, local_only=False, include_hidden=False,
                             include_proxy_eq=False):
    """
    Returns a list of related fields (also many to many)

    :param local_only:
    :param include_hidden:
    :return: list
    """
    include_parents = True if local_only is False else PROXY_PARENTS
    fields = self.opts._get_fields(
        forward=False, reverse=True,
        include_parents=include_parents,
        include_hidden=include_hidden
    )
    if include_proxy_eq:
        children = chain.from_iterable(
            c._relation_tree
            for c in self.opts.concrete_model._meta.proxied_children
            if c is not self.opts
        )
        relations = (
            f.remote_field for f in children
            if include_hidden or not f.remote_field.field.remote_field.is_hidden()
        )
        fields = chain(fields, relations)
    return list(fields)
def build_indices(self):
    self.count = [[TrainingData.UNKNOWN_TOKEN, -1]]
    counter = Counter(chain.from_iterable(self.corpus.sentences()))
    if self.nb_words:
        common_counts = counter.most_common(self.nb_words - 1)
    else:
        common_counts = counter.most_common()
    self.count.extend(common_counts)

    self.word2index = dict()
    for word, _ in self.count:
        self.word2index[word] = len(self.word2index)

    unk_count = 0
    for sentence in self.corpus.sentences():
        for word in sentence:
            if word not in self.word2index:
                unk_count = unk_count + 1
    self.count[0][1] = unk_count

    self.index2word = dict(
        zip(self.word2index.values(), self.word2index.keys()))
def register_settings(settings_name, settings, register_events_on=None):
    # we do not have this name registered yet
    if settings_name not in Animation.animation_settings:
        assert "default" in settings, \
            "ERROR: no entry `default` in animation-settings. " \
            "Each settings block needs a default animation name."
        for anim in settings:
            if anim != "default":
                defaults(settings[anim], {
                    "rate": 1 / 3,  # the rate with which to play this animation in 1/s
                    "frames": [0, 1],  # the frames to play from our spritesheet (starts with 0)
                    "priority": 0,  # which priority to use for next if next is given
                    # flags bitmap that determines the behavior of the animation
                    # (e.g. block controls during animation play, etc..)
                    "flags": 0,
                    "callbacks": None,
                    "loop": True,  # whether to loop the animation when done
                    "next": None,  # which animation to play next (str or callable returning a str)
                    "next_priority": 0,  # which priority to use for next if next is given
                    "trigger": None,  # which events to trigger on the game_object that plays this animation
                    "trigger_data": [],  # *args data to pass to the event handler if trigger is given
                    "properties": {},  # some custom properties of this anim
                })
        Animation.animation_settings[settings_name] = settings

    if isinstance(register_events_on, EventObject):
        l = list(chain.from_iterable(
            ("anim." + anim, "anim_loop." + anim, "anim_end." + anim)
            for anim in settings
        ))
        register_events_on.register_event(*l)
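The final chain.from_iterable() fans each animation name out into its three event names before registering them, e.g.:

>>> from itertools import chain
>>> anims = ["run"]
>>> list(chain.from_iterable(("anim." + a, "anim_loop." + a, "anim_end." + a) for a in anims))
['anim.run', 'anim_loop.run', 'anim_end.run']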
def update(self, iterable):
    """Update the list by adding all elements from *iterable*."""
    _maxes, _lists = self._maxes, self._lists
    values = sorted(iterable)

    if _maxes:
        if len(values) * 4 >= self._len:
            values.extend(chain.from_iterable(_lists))
            values.sort()
            self.clear()
        else:
            _add = self.add
            for val in values:
                _add(val)
            return

    _load, _index = self._load, self._index
    _lists.extend(values[pos:(pos + _load)]
                  for pos in range(0, len(values), _load))
    _maxes.extend(sublist[-1] for sublist in _lists)
    self._len = len(values)
    del _index[:]
def update(self, iterable):
    """Update the list by adding all elements from *iterable*."""
    _maxes, _lists, _keys = self._maxes, self._lists, self._keys
    values = sorted(iterable, key=self._key)

    if _maxes:
        if len(values) * 4 >= self._len:
            values.extend(chain.from_iterable(_lists))
            values.sort(key=self._key)
            self.clear()
        else:
            _add = self.add
            for val in values:
                _add(val)
            return

    _load, _index = self._load, self._index
    _lists.extend(values[pos:(pos + _load)]
                  for pos in range(0, len(values), _load))
    _keys.extend(list(map(self._key, _list)) for _list in _lists)
    _maxes.extend(sublist[-1] for sublist in _keys)
    self._len = len(values)
    del _index[:]
def _get_size(item, seen):
    known_types = {dict: lambda d: chain.from_iterable(d.items())}
    default_size = getsizeof(0)

    def size_walk(item):
        if id(item) in seen:
            return 0
        seen.add(id(item))
        s = getsizeof(item, default_size)
        for _type, fun in known_types.iteritems():
            if isinstance(item, _type):
                s += sum(map(size_walk, fun(item)))
                break
        return s

    return size_walk(item)
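Mapping dict to chain.from_iterable(d.items()) makes the size walk visit keys and values alike:

>>> from itertools import chain
>>> list(chain.from_iterable({'a': 1, 'b': 2}.items()))
['a', 1, 'b', 2]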
def replot(self):
    """
    Function refreshes the chart
    """
    if self.data is None or not self.attr_x or not self.attr_y:
        return

    km = self.k_means
    if not km.centroids_moved:
        self.complete_replot()
        return

    # when centroids moved during step
    self.scatter.update_series(0, self.k_means.centroids)

    if self.lines_to_centroids:
        for i, (c, pts) in enumerate(zip(
                km.centroids, km.centroids_belonging_points)):
            self.scatter.update_series(1 + i, list(chain.from_iterable(
                ([p[0], p[1]], [c[0], c[1]]) for p in pts)))
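The generator pairs every point with its centroid, so the flattened series alternates point, centroid, point, centroid; the chart then draws one line segment per point. With toy coordinates:

>>> from itertools import chain
>>> pts = [(0, 1), (2, 3)]
>>> c = (9, 9)
>>> list(chain.from_iterable(([p[0], p[1]], [c[0], c[1]]) for p in pts))
[[0, 1], [9, 9], [2, 3], [9, 9]]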
def set_alpn_protos(self, protos):
    """
    Specify the client's ALPN protocol list.

    These protocols are offered to the server during protocol negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Take the list of protocols and join them together, prefixing them
    # with their lengths.
    protostr = b''.join(
        chain.from_iterable((int2byte(len(p)), p) for p in protos)
    )

    # Build a C string from the list. We don't need to save this off
    # because OpenSSL immediately copies the data out.
    input_str = _ffi.new("unsigned char[]", protostr)
    _lib.SSL_CTX_set_alpn_protos(self._context, input_str, len(protostr))
def set_alpn_protos(self, protos):
    """
    Specify the client's ALPN protocol list.

    These protocols are offered to the server during protocol negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    """
    # Take the list of protocols and join them together, prefixing them
    # with their lengths.
    protostr = b''.join(
        chain.from_iterable((int2byte(len(p)), p) for p in protos)
    )

    # Build a C string from the list. We don't need to save this off
    # because OpenSSL immediately copies the data out.
    input_str = _ffi.new("unsigned char[]", protostr)
    _lib.SSL_set_alpn_protos(self._ssl, input_str, len(protostr))
def test_sparse_benchmark(self):
    benchmark_returns = self.benchmark_returns_06.copy()
    # Set every other day to nan.
    benchmark_returns.iloc[::2] = np.nan

    report = risk.RiskReport(
        self.algo_returns_06,
        self.sim_params,
        benchmark_returns=benchmark_returns,
        env=self.env,
    )
    for risk_period in chain.from_iterable(itervalues(report.to_dict())):
        self.assertIsNone(risk_period['beta'])
def flatten_dict(response: dict):
    # http://feldboris.alwaysdata.net/blog/python-trick-how-to-flatten-dictionaries-values-composed-of-iterables.html
    return chain.from_iterable(response.values())
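A quick usage sketch with toy data; since dict preserves insertion order (Python 3.7+), the flattened items follow key order:

>>> list(flatten_dict({'a': [1, 2], 'b': [3]}))
[1, 2, 3]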
def matrix_iter(matrix, version, scale=1, border=None):
    """\
    Returns an iterator / generator over the provided matrix which includes
    the border and the scaling factor.

    If either the `scale` or `border` value is invalid, a py:exc:`ValueError`
    is raised.

    :param matrix: An iterable of bytearrays.
    :param int version: A version constant.
    :param int scale: The scaling factor (default: ``1``).
    :param int border: The border size or ``None`` to specify the
            default quiet zone (4 for QR Codes, 2 for Micro QR Codes).
    :raises: py:exc:`ValueError` if an illegal scale or border value is provided
    """
    check_valid_border(border)
    scale = int(scale)
    check_valid_scale(scale)
    border = get_border(version, border)
    size = get_symbol_size(version, scale=1, border=0)[0]

    def get_bit(i, j):
        return 0x1 if (0 <= i < size and 0 <= j < size and matrix[i][j]) else 0x0

    for i in range(-border, size + border):
        for s in range(scale):
            yield chain.from_iterable(([get_bit(i, j)] * scale
                                       for j in range(-border, size + border)))
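Each module (bit) is repeated scale times, so one chained row yields a horizontally scaled pixel row; the outer for s in range(scale) loop then repeats whole rows for the vertical scaling. The per-row idiom in isolation:

>>> from itertools import chain
>>> row = [1, 0, 1]
>>> list(chain.from_iterable([bit] * 2 for bit in row))  # scale = 2
[1, 1, 0, 0, 1, 1]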
def backup_instance(ec2, instance, retention_days, to_tag_retention, to_tag_mount_point):
    skip_volumes = get_tag_value(instance, 'skip_backup_volumes', default=[],
                                 func=lambda v: str(v).split(','))
    skip_volumes_list = list(chain.from_iterable(skip_volumes))
    inst_name = get_tag_value(instance, 'Name', default='')

    for dev in instance['BlockDeviceMappings']:
        if dev.get('Ebs', None) is None:
            continue
        vol_id = dev['Ebs']['VolumeId']
        if vol_id in skip_volumes_list:
            print("Volume %s is set to be skipped, not backing up" % (vol_id))
            continue
        dev_attachment = dev['DeviceName']
        print("Found EBS volume %s on instance %s attached to %s" % (
            vol_id, instance['InstanceId'], dev_attachment))

        snap = ec2.create_snapshot(
            VolumeId=vol_id,
            Description=instance['InstanceId'],
        )

        to_tag_retention[retention_days].append(snap['SnapshotId'])
        to_tag_mount_point[vol_id].append(snap['SnapshotId'])

        print("Retaining snapshot %s of volume %s from instance %s for %d days" % (
            snap['SnapshotId'],
            vol_id,
            instance['InstanceId'],
            retention_days,
        ))

        ec2.create_tags(
            Resources=to_tag_mount_point[vol_id],
            Tags=[
                {'Key': 'Name', 'Value': inst_name},
            ]
        )
def parseEntries(deps, env={}, fwd=False, use=["result", "deps"], cond=None):
    """Returns an iterator yielding all dependencies as flat list"""
    # return flattened list of dependencies
    return chain.from_iterable(
        Recipe.Dependency.__parseEntry(dep, env, fwd, use, cond)
        for dep in deps
    )
def loadFromFile(recipeSet, fileName, properties, fileSchema, isRecipe):
    # MultiPackages are handled as separate recipes with an anonymous base
    # class. Ignore first dir in path, which is 'recipes' by default.
    # Following dirs are treated as categories separated by '::'.
    baseName = os.path.splitext(fileName)[0].split(os.sep)[1:]
    try:
        for n in baseName:
            RECIPE_NAME_SCHEMA.validate(n)
    except schema.SchemaError as e:
        raise ParseError("Invalid recipe name: '{}'".format(fileName))
    baseName = "::".join(baseName)
    baseDir = os.path.dirname(fileName)

    nameMap = {}
    def anonNameCalculator(suffix):
        num = nameMap.setdefault(suffix, 0) + 1
        nameMap[suffix] = num
        return baseName + suffix + "$" + str(num)

    def collect(recipe, suffix, anonBaseClass):
        if "multiPackage" in recipe:
            anonBaseClass = Recipe(recipeSet, recipe, fileName, baseDir,
                                   anonNameCalculator(suffix), baseName,
                                   properties, isRecipe, anonBaseClass)
            return chain.from_iterable(
                collect(subSpec, suffix + ("-" + subName if subName else ""),
                        anonBaseClass)
                for (subName, subSpec) in recipe["multiPackage"].items()
            )
        else:
            packageName = baseName + suffix
            return [Recipe(recipeSet, recipe, fileName, baseDir, packageName,
                           baseName, properties, isRecipe, anonBaseClass)]

    return list(collect(recipeSet.loadYaml(fileName, fileSchema), "", None))