我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用itertools.izip_longest()。
def tabulate(vals):
    """Lay out rows of cells as left-aligned, space-separated columns.

    Rows may have different lengths; a column's width is the length of the
    longest ``str()`` of any cell present in that column.  Cells that are
    missing (short rows) or explicitly ``None`` render as an empty string
    and do not contribute to the column width.

    :param vals: non-empty sequence of rows, each a sequence of cells.
    :returns: ``(lines, sizes)`` where ``lines`` is one formatted string per
        row and ``sizes`` is the list of column widths.
    """
    # From pfmoore on GitHub:
    # https://github.com/pypa/pip/issues/3651#issuecomment-216932564
    assert len(vals) > 0

    def cell_width(cell):
        # zip_longest pads short rows with None; measuring str(None) would
        # silently force every such column to be at least 4 characters wide.
        return 0 if cell is None else len(str(cell))

    sizes = [0] * max(len(row) for row in vals)
    for row in vals:
        sizes = [max(size, cell_width(cell))
                 for size, cell in zip_longest(sizes, row)]

    result = []
    for row in vals:
        display = " ".join(
            str(cell).ljust(size) if cell is not None else ''
            for size, cell in zip_longest(sizes, row)
        )
        result.append(display)

    return result, sizes
def format_metric_table_line(metrics, metrics_per_line, fillvalue=""):
    """Format metrics as comma-separated lines joined by ``<BR/>``.

    The final result is a string, ready to add in between <TD></TD>.

    :param metrics: iterable of metrics; each is passed through ``str()``
        and then ``strip_metric_str()``.
    :param metrics_per_line: number of metrics placed on each line.
    :param fillvalue: value used to pad the last, incomplete line.
    :returns: the lines joined with ``<BR/>`` (no trailing line break).
    """
    # Handing izip_longest the *same* iterator ``metrics_per_line`` times
    # makes it pull consecutive elements into each tuple, i.e. it walks
    # through 'metrics' by 'metrics_per_line' elements at a time.
    shared_iterators = [iter(metrics)] * metrics_per_line

    lines = []
    for metric_line in izip_longest(*shared_iterators, fillvalue=fillvalue):
        cells = ",".join(strip_metric_str(str(metric_str))
                         for metric_str in metric_line)
        # Padding cells leave dangling commas at the end of the line.
        lines.append(cells.rstrip(","))

    # Join instead of appending "<BR/>" and calling rstrip("<BR/>"):
    # str.rstrip treats its argument as a *set of characters*, so it would
    # also chew trailing 'B', 'R', '/', '<' or '>' off the last metric.
    return "<BR/>".join(lines)
def iter_by_qname(in_genome_bam, in_trimmed_bam):
    """Iterate through multiple BAMs by qname simultaneously.

    Assumes the trimmed-read BAM has every qname in the genome BAM, in the
    same order.

    :param in_genome_bam: iterable of reads exposing a ``qname`` attribute.
    :param in_trimmed_bam: iterable of reads exposing ``qname``, or ``None``
        when there are no trimmed reads.
    :returns: generator of ``(qname, genome_reads, trimmed_reads)`` tuples,
        where both read collections are lists (``trimmed_reads`` is empty
        when ``in_trimmed_bam`` is ``None``).
    :raises ValueError: if the trimmed BAM yields more qname groups than the
        genome BAM.
    :raises AssertionError: if a trimmed group's qname differs from the
        genome group's qname.
    """
    # itertools.izip_longest was renamed to zip_longest in Python 3.
    zip_longest = getattr(itertools, 'zip_longest', None) or itertools.izip_longest

    def group_by_qname(reads):
        return itertools.groupby(reads, key=lambda read: read.qname)

    genome_groups = group_by_qname(in_genome_bam)
    if in_trimmed_bam is None:
        trimmed_groups = iter(())
    else:
        trimmed_groups = group_by_qname(in_trimmed_bam)

    for genome_group, trimmed_group in zip_longest(genome_groups, trimmed_groups):
        if genome_group is None:
            # Previously this surfaced as an opaque TypeError from
            # unpacking the None fill value.
            raise ValueError(
                'trimmed BAM contains qname groups beyond the end of the genome BAM')
        genome_qname, genome_reads = genome_group
        trimmed_qname, trimmed_reads = trimmed_group or (None, [])

        # groupby sub-iterators are invalidated once the parent advances,
        # so materialize them before yielding.
        genome_reads = list(genome_reads)
        trimmed_reads = list(trimmed_reads)

        assert (in_trimmed_bam is None) or trimmed_qname == genome_qname
        yield (genome_qname, genome_reads, trimmed_reads)
def split(args):
    """Build the per-chunk and join definitions for this stage.

    One chunk is produced per position across ``args.genome_inputs``,
    ``args.trimmed_inputs`` (treated as empty when falsy) and
    ``args.gem_groups``; shorter inputs are padded with ``None``.

    :param args: object exposing ``barcode_whitelist``, ``genome_inputs``,
        ``trimmed_inputs`` and ``gem_groups``.
    :returns: ``{'chunks': [...], 'join': {...}}`` where every chunk and the
        join carry a ``'__mem_gb'`` request obtained from
        ``cr_utils.get_mem_gb_request_from_barcode_whitelist``.
    """
    # itertools.izip_longest was renamed to zip_longest in Python 3.
    zip_longest = getattr(itertools, 'zip_longest', None) or itertools.izip_longest

    chunk_mem_gb = cr_utils.get_mem_gb_request_from_barcode_whitelist(args.barcode_whitelist)
    join_mem_gb = cr_utils.get_mem_gb_request_from_barcode_whitelist(args.barcode_whitelist,
                                                                     args.gem_groups)

    chunks = [
        {
            'chunk_genome_input': chunk_genome_input,
            'chunk_trimmed_input': chunk_trimmed_input,
            'gem_group': gem_group,
            '__mem_gb': chunk_mem_gb,
        }
        for chunk_genome_input, chunk_trimmed_input, gem_group in zip_longest(
            args.genome_inputs, args.trimmed_inputs or [], args.gem_groups)
    ]

    return {'chunks': chunks, 'join': {'__mem_gb': join_mem_gb}}
def _domain_match(test, compare): test = test.lower() compare = compare.lower() if '*' not in compare: return test == compare c = compare.split('.')[::-1] if '**' in c and (c.count('**') > 1 or not compare.startswith('**')): raise ValidationError( 'Only 1 ** is allowed, and must start the domain.') t = test.split('.')[::-1] z = itertools.izip_longest(c, t) for c, t in z: if c == t: continue elif c == '*': continue elif c == '**': return True return False # Got all the way through and everything matched. return True
def print_options(events):
    """
    Print available aggregation options (headers, context, tags etc.)

    The keys gathered for each category are printed as one column of a
    table; shorter columns are padded with empty strings.

    :param events: list of events from which to gather attributes
    :type events: list
    """
    headers = get_keys('headers', events)
    context = get_keys('context', events)
    params = get_keys('params', events)
    variables = get_keys('vars', events)
    tags = get_keys('tags', events)

    table = Table(['Headers', 'Context', 'Params', 'Vars', 'Tags'])
    # Explicit loop rather than map(): map() is lazy on Python 3, so using
    # it purely for side effects would never add any row there.
    for row in izip_longest(headers, context, params, variables, tags,
                            fillvalue=''):
        table.add_row(row)

    # Call syntax prints the same thing on Python 2 and also parses on 3.
    print(table)
def messages_equal(self, res1, res2):
    """Compare the messages of two FTL resources.

    Only ``FTL.Message`` entries of each resource body are considered.
    They are sorted by identifier name and then compared pairwise with
    ``equals``; the result is False as soon as one pair differs or one
    resource has a message without a counterpart in the other.
    """

    def sorted_messages(resource):
        "Return the resource's messages ordered by identifier name."
        found = [entry for entry in resource.body
                 if isinstance(entry, FTL.Message)]
        found.sort(key=lambda message: message.id.name)
        return found

    first = sorted_messages(res1)
    second = sorted_messages(res2)

    for left, right in zip_longest(first, second):
        # A None on either side means the resources differ in message count.
        if left is None or right is None or not left.equals(right):
            return False

    return True
def addTableForModel(self, namespace, category, fieldids, fieldnames=None, fieldvalues=None, filters=None, nativequery=None, selectable=False):
    """
    Add a table for a model, backed by the model object list URL.

    @param namespace: namespace of the model
    @param category: category of the model
    @param fieldids: list of str pointing to the fields of the dataset
    @param fieldnames: list of str shown in the table header; if omitted
        (or shorter than fieldids) the field id is used
    @param fieldvalues: list of items representing the value of the data,
        can be a callback; if omitted the field id is used
    @param filters: stored in the cache entry together with the fields
    @param nativequery: stored in the cache entry together with the fields
    @param selectable: passed through to addTableFromURLFields
    @return: the result of addTableFromURLFields
    """
    # itertools.izip_longest was renamed to zip_longest in Python 3.
    zip_longest = getattr(itertools, 'zip_longest', None) or itertools.izip_longest

    fields = []
    for fieldid, fieldname, fieldvalue in zip_longest(fieldids, fieldnames or [], fieldvalues or []):
        # Missing names/values fall back to the field id itself.
        fields.append({
            'id': fieldid,
            'value': fieldid if fieldvalue is None else fieldvalue,
            'name': fieldid if fieldname is None else fieldname,
        })

    key = j.apps.system.contentmanager.extensions.datatables.storInCache(fields=fields, filters=filters, nativequery=nativequery)
    url = "/restmachine/system/contentmanager/modelobjectlist?namespace=%s&category=%s&key=%s" % (namespace, category, key)
    tableid = 'table_%s_%s' % (namespace, category)
    return self.addTableFromURLFields(url, fields, tableid, selectable)
def __str__(self):
    """Render the constructor arguments of this object as a string.

    For every ``__init__`` parameter after ``self`` the attribute of the
    same name is read from the instance.  Attributes equal to the
    parameter's default are omitted; values are shown positionally as
    ``repr(value)`` until the first omission, and as ``name=repr(value)``
    afterwards.
    """
    # inspect.getargspec was removed in Python 3.11; getfullargspec exposes
    # the same ``args`` / ``defaults`` fields and exists on all Python 3.
    get_spec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    argspec = get_spec(self.__init__)
    args = argspec.args
    defaults = argspec.defaults or []

    # Defaults belong to the *last* parameters, so align both lists from
    # the right and pad the parameters without a default with _SENTINEL.
    joined = reversed(list(izip_longest(reversed(args), reversed(defaults),
                                        fillvalue=_SENTINEL)))
    next(joined)  # Skip self

    values = []
    skipped = False
    for attribute, default in joined:
        value = getattr(self, attribute)
        if value == default:
            skipped = True
            continue
        if skipped:
            # Once an argument was left out, positions are ambiguous.
            values.append('{}={}'.format(attribute, repr(value)))
        else:
            values.append(repr(value))

    return ', '.join(values)
def compare(cls, ver1, ver2):
    """Compare two version strings.

    Versions are split on ``.`` into parts and each part on ``-`` into sub
    parts, which are compared pairwise from left to right.

    :param ver1: first version string.
    :param ver2: second version string.
    :returns: ``0`` when the versions are equal; otherwise the result of
        ``cls.compare_none`` (one side has no counterpart) or the first
        truthy result of ``cls.compare_sub_parts``.
    """
    # Versions are equal
    if ver1 == ver2:
        return 0

    for part1, part2 in izip_longest(ver1.split('.'), ver2.split('.')):
        # One of the parts is None
        if part1 is None or part2 is None:
            return cls.compare_none(part1, part2)

        for sub1, sub2 in izip_longest(part1.split('-'), part2.split('-')):
            # One of the sub parts is None
            if sub1 is None or sub2 is None:
                # Sub parts are different, because one has a
                # value and the other not.
                return cls.compare_none(sub1, sub2)

            # Both sub parts have a value
            result = cls.compare_sub_parts(sub1, sub2)
            if result:
                # Sub parts are not equal
                return result

    # The strings differ textually but every sub part compared equal;
    # report equality explicitly instead of falling through to None.
    return 0
def assertQueries(self, *prefixes):
    """Assert the correct queries are efficiently executed for a block.

    Generator with a single ``yield``: the debug cursor is enabled before
    the yield, and the queries recorded on ``connection`` after it are
    checked against ``prefixes``.

    :param prefixes: either a single int (the expected number of queries)
        or the expected SQL prefix of each executed query, in order.  With
        no prefixes, no queries are expected.
    :raises AssertionError: if the executed queries do not match.
    """
    # itertools.izip_longest was renamed to zip_longest in Python 3.
    zip_longest = getattr(itertools, 'zip_longest', None) or itertools.izip_longest

    debug = connection.use_debug_cursor
    connection.use_debug_cursor = True
    count = len(connection.queries)
    try:
        yield
        executed = connection.queries[count:]
        if prefixes and isinstance(prefixes[0], int):
            assert prefixes[0] == len(executed)
        else:
            # zip_longest pads the shorter side with None, so both missing
            # and unexpected queries fail the assertion below.
            for prefix, query in zip_longest(prefixes, executed):
                assert prefix and query and query['sql'].startswith(prefix), (prefix, query)
    finally:
        # Restore the setting even when the block or an assertion raised.
        connection.use_debug_cursor = debug
def assert_iterators_equal(self, xs, ys, test_id, limit=None):
    """Check that iterator ``xs`` matches the expected results ``ys``.

    :param xs: iterator under test.
    :param ys: iterator of expected elements.
    :param test_id: label prefixed to every failure message.
    :param limit: if not ``None``, only the first ``limit`` elements of
        each iterator are compared.

    Calls ``self.fail`` with a descriptive message on the first position
    where the iterators differ or one of them ends early.
    """
    # itertools.izip_longest was renamed to zip_longest in Python 3.
    zip_longest = getattr(itertools, 'zip_longest', None) or itertools.izip_longest

    if limit is not None:
        xs = itertools.islice(xs, limit)
        ys = itertools.islice(ys, limit)

    sentinel = object()
    pairs = zip_longest(xs, ys, fillvalue=sentinel)
    for i, (x, y) in enumerate(pairs):
        # Test for the sentinel by identity and *before* comparing values:
        # an element with a permissive __eq__ could otherwise compare equal
        # to the sentinel and hide an exhausted iterator.
        if x is sentinel:
            self.fail('{}: iterator ended unexpectedly '
                      'at position {}; expected {}'.format(test_id, i, y))
        elif y is sentinel:
            self.fail('{}: unexpected excess element {} at '
                      'position {}'.format(test_id, x, i))
        elif x == y:
            continue
        else:
            self.fail('{}: wrong element at position {};'
                      'expected {}, got {}'.format(test_id, i, y, x))
def computeLevenhsteinDistance(b1,o1,b2,o2,isRead1,isRead2):
    """Return a normalized position-wise distance between two sequences.

    The six inputs are walked in lockstep (shorter ones padded with
    ``None``).  Each position contributes at most 2 to the distance:

    * 2 if the read/write flags differ;
    * otherwise 1 if the block IDs differ, plus 1 if the offsets differ.

    :param b1: block IDs of the first sequence.
    :param o1: offsets of the first sequence.
    :param b2: block IDs of the second sequence.
    :param o2: offsets of the second sequence.
    :param isRead1: read/write flags of the first sequence.
    :param isRead2: read/write flags of the second sequence.
    :returns: float in ``[0.0, 1.0]``; ``0.0`` when all inputs are empty.
    """
    # itertools.izip_longest was renamed to zip_longest in Python 3.
    zip_longest = getattr(itertools, 'zip_longest', None) or itertools.izip_longest

    counter = 0
    distance = 0
    for block1, offset1, block2, offset2, read1, read2 in zip_longest(
            b1, o1, b2, o2, isRead1, isRead2):
        counter += 1
        # check if read/write matches
        if read1 != read2:
            distance += 2
            continue
        # check if block IDs match
        if block1 != block2:
            distance += 1
        # check if offsets match
        if offset1 != offset2:
            distance += 1

    if counter == 0:
        # Nothing to compare: avoid dividing by zero.
        return 0.0

    # return normalized distance
    return distance / float(counter * 2)


# mix block IDs. Assign block IDs from Ngram1 to Ngram2