The following 50 code examples, extracted from open-source Python projects, illustrate how to use itertools.chain().
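Before the project examples, a minimal sketch of the two call forms that recur throughout this page: itertools.chain(*iterables) concatenates the iterables passed as separate arguments, while itertools.chain.from_iterable(iterable_of_iterables) lazily flattens one level of nesting.

import itertools

# chain() takes each iterable as a separate argument.
print(list(itertools.chain([1, 2], (3, 4), 'ab')))
# -> [1, 2, 3, 4, 'a', 'b']

# chain.from_iterable() takes a single iterable of iterables, which is
# preferable when the groups are themselves produced lazily.
groups = ([n, n * 10] for n in range(3))
print(list(itertools.chain.from_iterable(groups)))
# -> [0, 0, 1, 10, 2, 20]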
def _by_version_descending(names):
    """
    Given a list of filenames, return them in descending order
    by version number.

    >>> names = 'bar', 'foo', 'Python-2.7.10.egg', 'Python-2.7.2.egg'
    >>> _by_version_descending(names)
    ['Python-2.7.10.egg', 'Python-2.7.2.egg', 'foo', 'bar']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.egg', 'Setuptools-1.2.3b1.egg']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.post1.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.post1.egg', 'Setuptools-1.2.3b1.egg']
    """
    def _by_version(name):
        """
        Parse each component of the filename
        """
        name, ext = os.path.splitext(name)
        parts = itertools.chain(name.split('-'), [ext])
        return [packaging.version.parse(part) for part in parts]

    return sorted(names, key=_by_version, reverse=True)
def _pad_version(left, right):
    left_split, right_split = [], []

    # Get the release segment of our versions
    left_split.append(list(itertools.takewhile(lambda x: x.isdigit(), left)))
    right_split.append(list(itertools.takewhile(lambda x: x.isdigit(), right)))

    # Get the rest of our versions
    left_split.append(left[len(left_split[0]):])
    right_split.append(right[len(right_split[0]):])

    # Insert our padding
    left_split.insert(
        1,
        ["0"] * max(0, len(right_split[0]) - len(left_split[0])),
    )
    right_split.insert(
        1,
        ["0"] * max(0, len(left_split[0]) - len(right_split[0])),
    )

    return (
        list(itertools.chain(*left_split)),
        list(itertools.chain(*right_split)),
    )
def _hash_comparison(self):
    """
    Return a comparison of actual and expected hash values.

    Example::

            Expected sha256 abcdeabcdeabcdeabcdeabcdeabcdeabcdeabcdeabcde
                         or 123451234512345123451234512345123451234512345
                     Got    bcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdef
    """
    def hash_then_or(hash_name):
        # For now, all the decent hashes have 6-char names, so we can get
        # away with hard-coding space literals.
        return chain([hash_name], repeat('    or'))

    lines = []
    for hash_name, expecteds in iteritems(self.allowed):
        prefix = hash_then_or(hash_name)
        lines.extend(('        Expected %s %s' % (next(prefix), e))
                     for e in expecteds)
        lines.append('             Got        %s\n' %
                     self.gots[hash_name].hexdigest())
        prefix = '    or'
    return '\n'.join(lines)
def _build_multipart(cls, data):
    """
    Build up the MIME payload for the POST data
    """
    boundary = b'--------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
    sep_boundary = b'\n--' + boundary
    end_boundary = sep_boundary + b'--'
    end_items = end_boundary, b"\n",
    builder = functools.partial(
        cls._build_part,
        sep_boundary=sep_boundary,
    )
    part_groups = map(builder, data.items())
    parts = itertools.chain.from_iterable(part_groups)
    body_items = itertools.chain(parts, end_items)
    content_type = 'multipart/form-data; boundary=%s' % boundary.decode('ascii')
    return b''.join(body_items), content_type
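The pattern above, where each _build_part call yields a group of byte chunks that chain.from_iterable splices together and chain() then appends the closing boundary to, can be reduced to a self-contained sketch; build_part here is a hypothetical stand-in for cls._build_part.

import itertools

def build_part(item):
    # hypothetical stand-in for cls._build_part
    key, value = item
    return (b'\n--BOUNDARY\n', key, b': ', value)

data = {b'name': b'pkg', b'version': b'1.0'}
parts = itertools.chain.from_iterable(map(build_part, data.items()))
body = b''.join(itertools.chain(parts, (b'\n--BOUNDARY--', b'\n')))
print(body)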
def find_data_files(self, package, src_dir):
    """Return filenames for package's data files in 'src_dir'"""
    patterns = self._get_platform_patterns(
        self.package_data,
        package,
        src_dir,
    )
    globs_expanded = map(glob, patterns)
    # flatten the expanded globs into an iterable of matches
    globs_matches = itertools.chain.from_iterable(globs_expanded)
    glob_files = filter(os.path.isfile, globs_matches)
    files = itertools.chain(
        self.manifest_files.get(package, []),
        glob_files,
    )
    return self.exclude_data_files(package, src_dir, files)
def exclude_data_files(self, package, src_dir, files):
    """Filter filenames for package's data files in 'src_dir'"""
    files = list(files)
    patterns = self._get_platform_patterns(
        self.exclude_package_data,
        package,
        src_dir,
    )
    match_groups = (
        fnmatch.filter(files, pattern)
        for pattern in patterns
    )
    # flatten the groups of matches into an iterable of matches
    matches = itertools.chain.from_iterable(match_groups)
    bad = set(matches)
    keepers = (
        fn
        for fn in files
        if fn not in bad
    )
    # ditch dupes
    return list(_unique_everseen(keepers))
def _build_paths(self, name, spec_path_lists, exists):
    """
    Given an environment variable name and specified paths,
    return a pathsep-separated string of paths containing
    unique, extant, directories from those paths and from
    the environment variable. Raise an error if no paths
    are resolved.
    """
    # flatten spec_path_lists
    spec_paths = itertools.chain.from_iterable(spec_path_lists)
    env_paths = safe_env.get(name, '').split(os.pathsep)
    paths = itertools.chain(spec_paths, env_paths)
    extant_paths = list(filter(os.path.isdir, paths)) if exists else paths
    if not extant_paths:
        msg = "%s environment variable is empty" % name.upper()
        raise distutils.errors.DistutilsPlatformError(msg)
    unique_paths = self._unique_everseen(extant_paths)
    return os.pathsep.join(unique_paths)

# from Python docs
def read_flat(self):
    """
    Read a PNG file and decode it into flat row flat pixel format.
    Returns (*width*, *height*, *pixels*, *metadata*).

    May use excessive memory.

    `pixels` are returned in flat row flat pixel format.

    See also the :meth:`read` method which returns pixels in the
    more stream-friendly boxed row flat pixel format.
    """
    x, y, pixel, meta = self.read()
    arraycode = 'BH'[meta['bitdepth'] > 8]
    pixel = array(arraycode, itertools.chain(*pixel))
    return x, y, pixel, meta
def _get_platform_patterns(spec, package, src_dir):
    """
    yield platform-specific path patterns (suitable for glob
    or fn_match) from a glob-based spec (such as
    self.package_data or self.exclude_package_data)
    matching package in src_dir.
    """
    raw_patterns = itertools.chain(
        spec.get('', []),
        spec.get(package, []),
    )
    return (
        # Each pattern has to be converted to a platform-specific path
        os.path.join(src_dir, convert_path(pattern))
        for pattern in raw_patterns
    )

# from Python docs
def find_a_bracket(self, caret_pt):
    """
    Locate the next bracket after the caret in the current line.
    If None is found, execution must be aborted.
    Return (bracket, brackets, bracket_pt).

    Example: ('(', ('(', ')'), 1337).
    """
    caret_row, caret_col = self.view.rowcol(caret_pt)
    line_text = self.view.substr(
        Region(caret_pt, self.view.line(caret_pt).b))

    try:
        found_brackets = min([(line_text.index(bracket), bracket)
                              for bracket in chain(*self.pairs)
                              if bracket in line_text])
    except ValueError:
        return None, None, None

    bracket_a, bracket_b = [(a, b) for (a, b) in self.pairs
                            if found_brackets[1] in (a, b)][0]
    return (found_brackets[1], (bracket_a, bracket_b),
            self.view.text_point(caret_row, caret_col + found_brackets[0]))
def _check_listening_on_services_ports(services, test=False):
    """Check that the unit is actually listening (has the port open) on the
    ports that the service specifies are open. If test is True then the
    function returns the services with ports that are open rather than
    closed.

    Returns an OrderedDict of service: ports and a list of booleans

    @param services: OrderedDict(service: [port, ...], ...)
    @param test: default=False, if False, test for closed, otherwise open.
    @returns OrderedDict(service: [port-not-open, ...]...), [boolean]
    """
    test = not(not(test))  # ensure test is True or False
    all_ports = list(itertools.chain(*services.values()))
    ports_states = [port_has_listener('0.0.0.0', p) for p in all_ports]
    map_ports = OrderedDict()
    matched_ports = [p for p, opened in zip(all_ports, ports_states)
                     if opened == test]  # keep ports whose state matches `test`
    for service, ports in services.items():
        set_ports = set(ports).intersection(matched_ports)
        if set_ports:
            map_ports[service] = set_ports
    return map_ports, ports_states
def peek(iterable):
    """
    Peek ahead in an iterable.

    Parameters
    ----------
    iterable : iterable

    Returns
    -------
    first : object
        First element of ``iterable``
    stream : iterable
        Iterable containing ``first`` and all other elements from ``iterable``
    """
    iterable = iter(iterable)
    ahead = next(iterable)
    return ahead, chain([ahead], iterable)
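A quick usage sketch for peek(): because chain() re-attaches the consumed element to the front of the stream, the caller can inspect the first value without losing it.

first, stream = peek(x * x for x in range(4))
print(first)         # 0
print(list(stream))  # [0, 1, 4, 9] -- the peeked value is still there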
def attach_pipeline(self, pipeline, name, chunksize=None):
    """
    Register a pipeline to be computed at the start of each day.
    """
    if self._pipelines:
        raise NotImplementedError("Multiple pipelines are not supported.")
    if chunksize is None:
        # Make the first chunk smaller to get more immediate results:
        # (one week, then every half year)
        chunks = iter(chain([5], repeat(126)))
    else:
        chunks = iter(repeat(int(chunksize)))
    self._pipelines[name] = pipeline, chunks

    # Return the pipeline to allow expressions like
    # p = attach_pipeline(Pipeline(), 'name')
    return pipeline
def _group_lengths(grouping):
    """Convert a localeconv-style grouping into a (possibly infinite)
    iterable of integers representing group lengths.
    """
    # The result from localeconv()['grouping'], and the input to this
    # function, should be a list of integers in one of the
    # following three forms:
    #
    #   (1) an empty list, or
    #   (2) a nonempty list of positive integers + [0], or
    #   (3) a list of positive integers + [locale.CHAR_MAX]

    from itertools import chain, repeat
    if not grouping:
        return []
    elif grouping[-1] == 0 and len(grouping) >= 2:
        return chain(grouping[:-1], repeat(grouping[-2]))
    elif grouping[-1] == _locale.CHAR_MAX:
        return grouping[:-1]
    else:
        raise ValueError('unrecognised format for grouping')
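A worked example of the grouping forms handled above: a trailing 0 means the last group length repeats forever, which chain() plus repeat() expresses directly.

from itertools import islice

# Western convention: groups of three, repeated indefinitely.
print(list(islice(_group_lengths([3, 0]), 5)))     # [3, 3, 3, 3, 3]
# Indian convention: one group of three, then twos forever.
print(list(islice(_group_lengths([3, 2, 0]), 5)))  # [3, 2, 2, 2, 2]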
def _get_headnode_dict(fixer_list):
    """ Accepts a list of fixers and returns a dictionary of head node
    type --> fixer list.  """
    head_nodes = collections.defaultdict(list)
    every = []
    for fixer in fixer_list:
        if fixer.pattern:
            try:
                heads = _get_head_types(fixer.pattern)
            except _EveryNode:
                every.append(fixer)
            else:
                for node_type in heads:
                    head_nodes[node_type].append(fixer)
        else:
            if fixer._accept_type is not None:
                head_nodes[fixer._accept_type].append(fixer)
            else:
                every.append(fixer)
    for node_type in chain(pygram.python_grammar.symbol2number.values(),
                           pygram.python_grammar.tokens):
        head_nodes[node_type].extend(every)
    return dict(head_nodes)
def _match_label_with_color(label, colors, bg_label, bg_color):
    """Return `unique_labels` and `color_cycle` for label array and color list.

    Colors are cycled for normal labels, but the background color should only
    be used for the background.
    """
    # Temporarily set background color; it will be removed later.
    if bg_color is None:
        bg_color = (0, 0, 0)
    bg_color = _rgb_vector([bg_color])

    unique_labels = list(set(label.flat))
    # Ensure that the background label is in front to match call to `chain`.
    if bg_label in unique_labels:
        unique_labels.remove(bg_label)
        unique_labels.insert(0, bg_label)

    # Modify labels and color cycle so background color is used only once.
    color_cycle = itertools.cycle(colors)
    color_cycle = itertools.chain(bg_color, color_cycle)

    return unique_labels, color_cycle
def predict_beatmap(self, beatmap, *mods, **mods_scalar):
    """Predict the user's accuracy for the given beatmap.

    Parameters
    ----------
    beatmap : Beatmap
        The map to predict the performance of.
    *mods
        A sequence of mod dictionaries to predict for.
    **mods_scalar
        Mods to predict for.

    Returns
    -------
    accuracy : float
        The user's expected accuracy in the range [0, 1].
    """
    for mod_name in ('hidden', 'hard_rock', 'half_time', 'double_time'):
        mods_scalar.setdefault(mod_name, False)

    return self.predict([
        (beatmap, ms) for ms in chain(mods, [mods_scalar])
    ])
def compile(self, ttFont):
    # First make sure that all the data lines up properly. Format 4
    # must have all its data lined up consecutively. If not this will fail.
    for curLoc, nxtLoc in zip(self.locations, self.locations[1:]):
        assert curLoc[1] == nxtLoc[0], \
            "Data must be consecutive in indexSubTable format 4"

    offsets = list(self.locations[0]) + [loc[1] for loc in self.locations[1:]]
    # Image data offset must be less than or equal to the minimum of locations.
    # Resetting this offset may change the value for round tripping but is safer
    # and allows imageDataOffset to not be required to be in the XML version.
    self.imageDataOffset = min(offsets)
    offsets = [offset - self.imageDataOffset for offset in offsets]
    glyphIds = list(map(ttFont.getGlyphID, self.names))
    # Create an iterator over the ids plus a padding value.
    idsPlusPad = list(itertools.chain(glyphIds, [0]))

    dataList = [EblcIndexSubTable.compile(self, ttFont)]
    dataList.append(struct.pack(">L", len(glyphIds)))
    tmp = [struct.pack(codeOffsetPairFormat, *cop)
           for cop in zip(idsPlusPad, offsets)]
    dataList += tmp
    data = bytesjoin(dataList)
    return data
def openapi2httpdomain(spec, **options):
    generators = []

    # An OpenAPI spec may contain JSON references, common properties, and
    # so on. Rendering the spec as-is would require multiple if-branches
    # around the code, so to keep the flow simple we normalize the spec to
    # a single expected schema.
    _normalize_spec(spec, **options)

    # If 'paths' is passed, ensure each one exists in the OpenAPI spec;
    # otherwise raise an error and ask the user to fix it.
    if 'paths' in options:
        if not set(options['paths']).issubset(spec['paths']):
            raise ValueError(
                'One or more paths are not defined in the spec: %s.' % (
                    ', '.join(set(options['paths']) - set(spec['paths'])),
                )
            )

    for endpoint in options.get('paths', spec['paths']):
        for method, properties in spec['paths'][endpoint].items():
            generators.append(_httpresource(endpoint, method, properties))

    return iter(itertools.chain(*generators))
def __iter__(self):
    stack = [
        (self.__graphRoot,
         chain(self.__pkgRoot.getDirectDepSteps(),
               self.__pkgRoot.getIndirectDepSteps()))
    ]
    yield (self.__graphRoot, self.__pkgRoot)
    done = set([self.__graphRoot.key()])
    while stack:
        try:
            childPkg = next(stack[-1][1]).getPackage()
            childNode = stack[-1][0][childPkg.getName()].node
            if childNode.key() not in done:
                done.add(childNode.key())
                yield (childNode, childPkg)
                stack.append(
                    (childNode,
                     chain(childPkg.getDirectDepSteps(),
                           childPkg.getIndirectDepSteps()))
                )
        except StopIteration:
            stack.pop()
def __load(self):
    # load cards
    with open(self.cardJSON, 'r', encoding='utf8') as file:
        cards = json.load(file)
    with open(self.tokenJSON, 'r', encoding='utf8') as file:
        tokens = json.load(file)

    # json to db full of text
    for name, card in itertools.chain(cards.items(), tokens.items()):
        clean = CardDB.cleanName(name)
        if clean in self.__db:
            log.error("load() duplicate name, already in the db: %s", clean)
            raise Exception('duplicate card')
        self.__db[clean] = formatter.createCardText(card, self.constants)

    self.tokens = [CardDB.cleanName(name) for name in tokens.keys()]

    # finally load temp file
    self.refreshTemp()
def exists(self, submission_id, cards):
    """Test whether the request is a duplicate and insert any new cards.

    :return: True if all cards are already posted for the parent
    """
    query = ('SELECT card FROM topcomment'
             ' WHERE submission_id = ?'
             ' AND card IN (%s)' % ','.join('?' * len(cards)))
    params = list(itertools.chain((submission_id,), cards))
    foundCards = [row[0] for row in self.conn.execute(query, params)]

    inserted = False
    for card in cards:
        if card not in foundCards:
            inserted = True
            self.conn.execute(
                "INSERT INTO topcomment (submission_id, card) VALUES (?, ?)",
                (submission_id, card))

    self.conn.commit()
    return not inserted
def gen_values(self, n, reversed=False, shuffled=False, gen_dupes=False):
    if reversed:
        keys = range(n - 1, -1, -1)
    else:
        keys = range(n)
    if shuffled:
        keys = list(keys)
        r = random.Random(1234827)
        r.shuffle(keys)
    if gen_dupes:
        return itertools.chain(
            zip(keys, range(0, 2 * n, 2)),
            itertools.islice(zip(keys, range(0, 2 * n, 2)), 10, None),
        )
    else:
        return zip(keys, range(0, 2 * n, 2))
def _replace_cdata_list_attribute_values(self, tag_name, attrs):
    """Replaces class="foo bar" with class=["foo", "bar"]

    Modifies its input in place.
    """
    if self.cdata_list_attributes:
        universal = self.cdata_list_attributes.get('*', [])
        tag_specific = self.cdata_list_attributes.get(
            tag_name.lower(), [])
        for cdata_list_attr in itertools.chain(universal, tag_specific):
            if cdata_list_attr in dict(attrs):
                # Basically, we have a "class" attribute whose
                # value is a whitespace-separated list of CSS
                # classes. Split it into a list.
                value = attrs[cdata_list_attr]
                values = whitespace_re.split(value)
                attrs[cdata_list_attr] = values
    return attrs
def _parse_ipv6(a):
    """
    Parse IPv6 address. Ideally we would use the ipaddress module in
    Python 3.3 but can't rely on having this.

    Does not handle dotted-quad addresses or subnet prefix

    >>> _parse_ipv6("::") == (0,) * 16
    True
    >>> _parse_ipv6("1234:5678::abcd:0:ff00")
    (18, 52, 86, 120, 0, 0, 0, 0, 0, 0, 171, 205, 0, 0, 255, 0)
    """
    l, _, r = a.partition("::")
    l_groups = list(chain(*[divmod(int(x, 16), 256) for x in l.split(":") if x]))
    r_groups = list(chain(*[divmod(int(x, 16), 256) for x in r.split(":") if x]))
    zeros = [0] * (16 - len(l_groups) - len(r_groups))
    return tuple(l_groups + zeros + r_groups)
def write(self, outfile, rows):
    """Write a PNG image to the output file.  `rows` should be
    an iterable that yields each row in boxed row flat pixel
    format.  The rows should be the rows of the original image,
    so there should be ``self.height`` rows of ``self.width *
    self.planes`` values.  If `interlace` is specified (when
    creating the instance), then an interlaced PNG file will
    be written.  Supply the rows in the normal image order;
    the interlacing is carried out internally.

    .. note ::

      Interlacing will require the entire image to be in working
      memory.
    """
    if self.interlace:
        fmt = 'BH'[self.bitdepth > 8]
        a = array(fmt, itertools.chain(*rows))
        return self.write_array(outfile, a)
    else:
        nrows = self.write_passes(outfile, rows)
        if nrows != self.height:
            raise ValueError(
                "rows supplied (%d) does not match height (%d)" %
                (nrows, self.height))
def testPAMin(self):
    """Test that the command line tool can read PAM file."""
    def do():
        return _main(['testPAMin'])
    s = BytesIO()
    s.write(strtobytes('P7\nWIDTH 3\nHEIGHT 1\nDEPTH 4\nMAXVAL 255\n'
                       'TUPLTYPE RGB_ALPHA\nENDHDR\n'))
    # The pixels in flat row flat pixel format
    flat = [255, 0, 0, 255, 0, 255, 0, 120, 0, 0, 255, 30]
    asbytes = seqtobytes(flat)
    s.write(asbytes)
    s.flush()
    s.seek(0)
    o = BytesIO()
    testWithIO(s, o, do)
    r = Reader(bytes=o.getvalue())
    x, y, pixels, meta = r.read()
    self.assertTrue(r.alpha)
    self.assertTrue(not r.greyscale)
    self.assertEqual(list(itertools.chain(*pixels)), flat)
def partition_args(self, all_commands, args):
    name = args[0]
    try:
        command = all_commands[name]
    except KeyError:
        raise RunnerError('Unknown command: {name}'.format(name=name))

    args = args[1:]
    command_args = []
    partition = [command, command_args]

    prev_args = chain([None], args[:-1])
    next_args = chain(args[1:], [None])
    for prev_arg, arg, next_arg in zip(prev_args, args, next_args):
        if arg in all_commands:
            option = command.arg_map.get(prev_arg)
            if option is None or not option.takes_value:
                break
        if arg.startswith(':') and arg != ':':
            arg = arg[1:]
        command_args.append(arg)

    return partition
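The three-way zip above is a generic sliding-window trick: chain() pads the argument list with None on either end so every item can be compared with its neighbours. A standalone sketch with hypothetical arguments:

from itertools import chain

args = ['run', '--verbose', 'build']
prev_args = chain([None], args[:-1])  # None, 'run', '--verbose'
next_args = chain(args[1:], [None])   # '--verbose', 'build', None
for prev_arg, arg, next_arg in zip(prev_args, args, next_args):
    print(prev_arg, arg, next_arg)
# None run --verbose
# run --verbose build
# --verbose build None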
def _iterate_polymorphic_properties(self, mappers=None):
    """Return an iterator of MapperProperty objects which will render into
    a SELECT."""
    if mappers is None:
        mappers = self._with_polymorphic_mappers

    if not mappers:
        for c in self.iterate_properties:
            yield c
    else:
        # in the polymorphic case, filter out discriminator columns
        # from other mappers, as these are sometimes dependent on that
        # mapper's polymorphic selectable (which we don't want rendered)
        for c in util.unique_list(
            chain(*[
                list(mapper.iterate_properties)
                for mapper in [self] + mappers
            ])
        ):
            if getattr(c, '_is_polymorphic_discriminator', False) and \
                    (self.polymorphic_on is None or
                     c.columns[0] is not self.polymorphic_on):
                continue
            yield c
def _create_outerjoin(cls, left, right, onclause=None):
    """Return an ``OUTER JOIN`` clause element.

    The returned object is an instance of :class:`.Join`.

    Similar functionality is also available via the
    :meth:`~.FromClause.outerjoin()` method on any
    :class:`.FromClause`.

    :param left: The left side of the join.

    :param right: The right side of the join.

    :param onclause: Optional criterion for the ``ON`` clause, is
      derived from foreign key relationships established between
      left and right otherwise.

    To chain joins together, use the :meth:`.FromClause.join` or
    :meth:`.FromClause.outerjoin` methods on the resulting
    :class:`.Join` object.
    """
    return cls(left, right, onclause, isouter=True)
def _froms(self):
    # would love to cache this,
    # but there's just enough edge cases, particularly now that
    # declarative encourages construction of SQL expressions
    # without tables present, to just regen this each time.
    froms = []
    seen = set()
    translate = self._from_cloned

    for item in itertools.chain(
        _from_objects(*self._raw_columns),
        _from_objects(self._whereclause)
        if self._whereclause is not None else (),
        self._from_obj
    ):
        if item is self:
            raise exc.InvalidRequestError(
                "select() construct refers to itself as a FROM")
        if translate and item in translate:
            item = translate[item]
        if not seen.intersection(item._cloned_set):
            froms.append(item)
        seen.update(item._cloned_set)

    return froms
def __init__(self, selectable, equivalents=None,
             chain_to=None, adapt_required=False,
             include_fn=None, exclude_fn=None,
             adapt_on_names=False,
             allow_label_resolve=True,
             anonymize_labels=False):
    ClauseAdapter.__init__(self, selectable, equivalents,
                           include_fn=include_fn, exclude_fn=exclude_fn,
                           adapt_on_names=adapt_on_names,
                           anonymize_labels=anonymize_labels)

    if chain_to:
        self.chain(chain_to)
    self.columns = util.populate_column_dict(self._locate_col)
    if self.include_fn or self.exclude_fn:
        self.columns = self._IncludeExcludeMapping(self, self.columns)
    self.adapt_required = adapt_required
    self.allow_label_resolve = allow_label_resolve
    self._wrap = None
def run(self):
    mtimes = {}
    while 1:
        for filename in chain(_iter_module_files(), self.extra_files):
            try:
                mtime = os.stat(filename).st_mtime
            except OSError:
                continue

            old_time = mtimes.get(filename)
            if old_time is None:
                mtimes[filename] = mtime
                continue
            elif mtime > old_time:
                self.trigger_reload(filename)
        self._sleep(self.interval)
def update_template_context(self, context):
    """Update the template context with some commonly used variables.
    This injects request, session, config and g into the template
    context as well as everything template context processors want
    to inject.  Note that as of Flask 0.6, the original values in the
    context will not be overridden if a context processor decides to
    return a value with the same key.

    :param context: the context as a dictionary that is updated in place
                    to add extra variables.
    """
    funcs = self.template_context_processors[None]
    reqctx = _request_ctx_stack.top
    if reqctx is not None:
        bp = reqctx.request.blueprint
        if bp is not None and bp in self.template_context_processors:
            funcs = chain(funcs, self.template_context_processors[bp])
    orig_ctx = context.copy()
    for func in funcs:
        context.update(func())
    # make sure the original values win.  This makes it easier to add
    # new variables in context processors without breaking existing
    # views.
    context.update(orig_ctx)
def preprocess_request(self):
    """Called before the actual request dispatching; calls every
    function decorated with :meth:`before_request`.  If any of these
    functions returns a value, it's handled as if it was the return
    value from the view and further request handling is stopped.

    This also triggers the :meth:`url_value_processor` functions before
    the actual :meth:`before_request` functions are called.
    """
    bp = _request_ctx_stack.top.request.blueprint

    funcs = self.url_value_preprocessors.get(None, ())
    if bp is not None and bp in self.url_value_preprocessors:
        funcs = chain(funcs, self.url_value_preprocessors[bp])
    for func in funcs:
        func(request.endpoint, request.view_args)

    funcs = self.before_request_funcs.get(None, ())
    if bp is not None and bp in self.before_request_funcs:
        funcs = chain(funcs, self.before_request_funcs[bp])
    for func in funcs:
        rv = func()
        if rv is not None:
            return rv
def process_response(self, response):
    """Can be overridden in order to modify the response object
    before it's sent to the WSGI server.  By default this will
    call all the :meth:`after_request` decorated functions.

    .. versionchanged:: 0.5
       As of Flask 0.5 the functions registered for after request
       execution are called in reverse order of registration.

    :param response: a :attr:`response_class` object.
    :return: a new response object or the same, has to be an
             instance of :attr:`response_class`.
    """
    ctx = _request_ctx_stack.top
    bp = ctx.request.blueprint
    funcs = ctx._after_request_functions
    if bp is not None and bp in self.after_request_funcs:
        funcs = chain(funcs, reversed(self.after_request_funcs[bp]))
    if None in self.after_request_funcs:
        funcs = chain(funcs, reversed(self.after_request_funcs[None]))
    for handler in funcs:
        response = handler(response)
    if not self.session_interface.is_null_session(ctx.session):
        self.save_session(ctx.session, response)
    return response
def retry(delays=(0, 1, 5, 30, 180, 600, 3600),
          exception=Exception,
          report=lambda *args: None):
    def wrapper(function):
        def wrapped(*args, **kwargs):
            problems = []
            for delay in itertools.chain(delays, [None]):
                try:
                    return function(*args, **kwargs)
                except exception as problem:
                    problems.append(problem)
                    if delay is None:
                        report("retryable failed definitely:", problems)
                        raise
                    else:
                        report("retryable failed:", problem,
                               "-- delaying for %ds" % delay)
                        time.sleep(delay)
        return wrapped
    return wrapper
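Here chain() appends a None sentinel after the configured delays, so the loop body can tell the final attempt (re-raise) apart from intermediate ones (sleep and retry). The sentinel pattern in isolation:

from itertools import chain

for delay in chain((0, 1, 5), [None]):
    if delay is None:
        print('last attempt: give up and re-raise')
    else:
        print('attempt failed: sleep %ds and retry' % delay)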
def abstract_brackets(formula, variables_re=''):
    lwt = split_formula(formula)
    new_variables = {}
    while lwt:
        substitute = no_re_matches(combine_re_expressions(
            itertools.chain((variables_re,), new_variables.keys())
        ))
        formula = lwt['leading'] + substitute + lwt['trailing']
        new_variables[substitute] = lwt['within']
        lwt = split_formula(formula)

    if formula in new_variables.keys():  # in case of extraneous brackets
        return abstract_brackets(new_variables[formula], variables_re)

    # return [formula, new_variables]
    return namedtuple('abstract_brackets',
                      ('formula', 'new_variables'))(formula, new_variables)

# Splits a formula into two parts (leading and trailing) at the operator
# (from settings.order_of_operations) with the lowest priority.
# If there are no operators in the formula, returns None.