我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用itertools.zip_longest()。
def tabulate(vals): # From pfmoore on GitHub: # https://github.com/pypa/pip/issues/3651#issuecomment-216932564 assert len(vals) > 0 sizes = [0] * max(len(x) for x in vals) for row in vals: sizes = [max(s, len(str(c))) for s, c in zip_longest(sizes, row)] result = [] for row in vals: display = " ".join([str(c).ljust(s) if c is not None else '' for s, c in zip_longest(sizes, row)]) result.append(display) return result, sizes
def merged(self, s, t): chars = [] for c1, c2 in zip_longest(s.sequence, t.sequence): if c1 is None: c = c2 elif c2 is None: c = c1 elif c1 == 'N': c = c2 elif c2 == 'N': c = c1 elif c1 != c2: return None else: assert c1 == c2 c = c1 chars.append(c) seq = ''.join(chars) requested = s.requested or t.requested name = s.name + ';' + t.name # take union of groups group = pd.concat([s.group, t.group]).groupby(level=0).last() return SiblingInfo(seq, requested, name, group)
def load_from_excel(self, file, click_type='callback_data', default_click='default_blank_callback'): buttons = [] wb = open_workbook(file, formatting_info=True) sheet = wb.sheet_by_name("Sheet1") print('Reading keyboard from:', file) for col in range(sheet.ncols): text = data = '' buttons.append([]) for row in range(sheet.nrows): cell = sheet.cell(row, col) fmt = wb.xf_list[cell.xf_index] border = fmt.border has_bottom = bool(border.bottom_line_style) if not has_bottom: text = str(cell.value) else: data = str(cell.value) if data and text: buttons[col].append({'text': text, click_type: data}) else: buttons[col].append({'text': data, click_type: default_click}) text = '' data = '' if not has_bottom and text: raise ExcelNoBottomException('Cell({0},{1}) has no bottom border.'.format(row, col)) # Flip columns and rows buttons = list(map(list, itertools.zip_longest(*buttons))) buttons = [[button for button in row if button is not None] for row in buttons] self.keyboard['inline_keyboard'] = buttons
def pad_token_sequence(self, tokens: List[List[int]], desired_num_tokens: int, padding_lengths: Dict[str, int]) -> List[List[int]]: # Pad the tokens. padded_tokens = pad_sequence_to_length(tokens, desired_num_tokens, default_value=self.get_padding_token) # Pad the characters within the tokens. desired_token_length = padding_lengths['num_token_characters'] longest_token: List[int] = max(tokens, key=len, default=[]) padding_value = 0 if desired_token_length > len(longest_token): # Since we want to pad to greater than the longest token, we add a # "dummy token" so we can take advantage of the fast implementation of itertools.zip_longest. padded_tokens.append([padding_value] * desired_token_length) # pad the list of lists to the longest sublist, appending 0's padded_tokens = list(zip(*itertools.zip_longest(*padded_tokens, fillvalue=padding_value))) if desired_token_length > len(longest_token): # Removes the "dummy token". padded_tokens.pop() # Truncates all the tokens to the desired length, and return the result. return [list(token[:desired_token_length]) for token in padded_tokens]
def messages_equal(self, res1, res2): """Compare messages of two FTL resources. Uses FTL.BaseNode.equals to compare all messages in two FTL resources. If the order or number of messages differ, the result is also False. """ def message_id(message): "Return the message's identifer name for sorting purposes." return message.id.name messages1 = sorted( (entry for entry in res1.body if isinstance(entry, FTL.Message)), key=message_id) messages2 = sorted( (entry for entry in res2.body if isinstance(entry, FTL.Message)), key=message_id) for msg1, msg2 in zip_longest(messages1, messages2): if msg1 is None or msg2 is None: return False if not msg1.equals(msg2): return False return True
def _gen_dup_trinary_alloy(self, sp1, n1, sp2, n2): init_numbers = self.init_cell.numbers isp1 = sp1.Z isp2 = sp2.Z sp_ind_origin = [i for i, s in enumerate(init_numbers)] for sp1_comb_index in combinations(sp_ind_origin, n1): sp_ind_bin = [x for x in sp_ind_origin if x not in sp1_comb_index] for sp2_comb_index in combinations(sp_ind_bin, n2): numbers = init_numbers.copy() for i1, i2 in zip_longest(sp1_comb_index, sp2_comb_index): if i1 is not None: numbers[i1] = isp1 if i2 is not None: numbers[i2] = isp2 yield GeneralCell(self.lattice, self.positions, numbers) # pdb.set_trace()
def binary_sum(n, n2): """n and n2 are non negative binary numbers with arbitrary len. Ex: '00010101001010101010101010101010101001010101010101010' O(max(n, n2)) """ n = less_to_great_significant_digit(n) n2 = less_to_great_significant_digit(n2) last_d_sum = 0 result = deque() for d, d2 in zip_longest(n, n2, fillvalue=0): d_sum = last_d_sum + d + d2 last_d_sum = 0 if d_sum < 2 else 1 result.appendleft(str(d_sum % 2)) if last_d_sum == 1: result.appendleft('1') return ''.join(result)
def format_desc(desc, ref_desc, use_colour): """ Return potentially colourised string list of descriptor features. """ desc, ref_desc = desc_to_str(desc), desc_to_str(ref_desc) final_string = "" for string, ref_string in zip(desc.split(","), ref_desc.split(",")): for char, ref_char in itertools.zip_longest(string, ref_string): # If a character in the string is identical to the reference # string, we highlight it in red. That makes it easier to visually # spot similarities in the descriptors. if (char == ref_char) and use_colour: final_string += termcolor.colored(char, "red") else: final_string += char if char is not None else " " final_string += "," return final_string.split(",")
def _get_merged_reads_sequence(self, merged_reads: MergedReads): if len(merged_reads.reads) != len(merged_reads.prefix_lengths)+1: raise ValueError( "Invalid `MergedReads` data structure, not enough information " "on read prefix lengths." ) sequence_parts = [] for read, prefix_len in zip_longest(merged_reads.reads, merged_reads.prefix_lengths): sequence = self.get_sequence(read) if prefix_len: sequence_parts.append(sequence[:prefix_len]) else: sequence_parts.append(sequence) return b"".join(sequence_parts)
def privmsg(self, chan, msg): """ Send a message to a chan or an user """ if not msg: return if not isinstance(msg, str): msg = msg.decode("utf-8") for m in ("".join(itertools.takewhile(lambda x: x, a)) for a in itertools.zip_longest(*([iter(msg)] * MAX_MSG_LENGTH))): print(m) if chan in self.taemin.chans: self.taemin.create_pub_message(self.taemin.name, chan, m) else: self.taemin.create_priv_message(self.taemin.name, chan, m) self.taemin.connection.privmsg(chan, m)
def get_unmapped_parameters(packet, packet_mapping): """ Get the UID of parameters that are unused or additionally mapped. Returns two lists of (uid, positionInPacket) """ unresolved = [] additional = [] packet_parameters = packet.get_parameters_as_flattened_list() position = 0 for parameter, mapping in itertools.zip_longest(packet_parameters, packet_mapping.parameters): if parameter is None: additional.append((mapping.parameter.uid, position)) elif mapping is None: unresolved.append((parameter.uid, position)) elif parameter.uid != mapping.parameter.uid: # Found a parameter which is in the structure but not the # mapping. unresolved.append((parameter.uid, position)) additional.append((mapping.parameter.uid, position)) position += 1 return unresolved, additional
def _get_partial_suggestion(self, packet, packet_mapping): unresolved, additional = self.model_validator.get_unmapped_parameters(packet, packet_mapping) if len(unresolved) > 0 or len(additional) > 0: parameters = [] last_sid = "" for parameter, mapping in itertools.zip_longest(packet.get_parameters_as_flattened_list(), packet_mapping.parameters): if parameter is not None: if last_sid != "": sid_proposition = last_sid[0:4] \ + "%04i" % (int(last_sid[4:].lstrip('0')) + 1) else: sid_proposition = "" sid = mapping.sid if mapping is not None else sid_proposition last_sid = sid parameters.append({ "uid": parameter.uid, "sid": sid, }) return { "uid": packet.uid, "parameters": parameters, } else: return None
def __str__(self): argspec = inspect.getargspec(self.__init__) args = argspec.args defaults = argspec.defaults or [] joined = reversed(list(izip_longest(reversed(args), reversed(defaults), fillvalue=_SENTINEL))) next(joined) # Skip self values = [] skipped = False for attribute, default in joined: value = getattr(self, attribute) if value == default: skipped = True continue if skipped: values.append('{}={}'.format(attribute, repr(value))) else: values.append(repr(value)) return ', '.join(values)
def hamming(a, b): """Compute the hamming distance between 2 int. :param a: a 64 bits integer :param b: a 64 bits integer :type a: int :type b: int :return: the hamming distance between a, b :rtype: int """ a = bin(a)[2:][::-1] b = bin(b)[2:][::-1] it = itertools.zip_longest(a, b, fillvalue='0') return sum([va != vb for (va, vb) in it])
def assert_iterators_equal(self, xs, ys, test_id, limit=None): # check that an iterator xs matches the expected results ys, # up to a given limit. if limit is not None: xs = itertools.islice(xs, limit) ys = itertools.islice(ys, limit) sentinel = object() pairs = itertools.zip_longest(xs, ys, fillvalue=sentinel) for i, (x, y) in enumerate(pairs): if x == y: continue elif x == sentinel: self.fail('{}: iterator ended unexpectedly ' 'at position {}; expected {}'.format(test_id, i, y)) elif y == sentinel: self.fail('{}: unexpected excess element {} at ' 'position {}'.format(test_id, x, i)) else: self.fail('{}: wrong element at position {};' 'expected {}, got {}'.format(test_id, i, y, x))
def _create_argument_values_that_must_be_files_or_dirs(params): """ Loop over test_params and create temporary files for training/validation sources/targets. """ def grouper(iterable, n, fillvalue=None): "Collect data into fixed-length chunks or blocks" args = [iter(iterable)] * n return zip_longest(fillvalue=fillvalue, *args) params = params.split() regular_files_params = {'-vs', '-vt', '-t', '-s', '--source', '--target', '--validation-source', '--validation-target'} folder_params = {'--prepared-data', '-d'} to_unlink = set() for arg, val in grouper(params, 2): if arg in regular_files_params and not os.path.isfile(val): open(val, 'w').close() to_unlink.add(val) if arg in folder_params: os.mkdir(val) to_unlink.add(val) return to_unlink
def signature(function): """Return the signature of the given function (possibly without variable positional and keyword arguments) as a string. If available, the result should be determined by function :func:`~inspect.signature` of module :mod:`inspect` (Python 3). If something wrents wrong, a less general costum made string is returned (Python 2). """ try: return str(inspect.signature(function)) except BaseException: argspec = inspect.getargspec(function) args = argspec.args if argspec.args else [] defaults = argspec.defaults if argspec.defaults else [] strings = [] for arg, default in zip_longest(reversed(args), reversed(defaults)): if default is None: strings.insert(0, arg) else: strings.insert(0, '%s=%s' % (arg, default)) return '(%s)' % ', '.join(strings)
def grouper(iterable, n): """Collect data into fixed-length chunks or blocks. If len(iterable) % n != 0, the last chunk is simply cut. Example: grouper('ABCDEFG', 3) -> ABC DEF G Args: iterable: Any iterable object. n: The length of the chunks. Returns: An iterator that returns the chunks. """ args = [iter(iterable)] * n groups = zip_longest(*args, fillvalue=None) return (filter(lambda el: el is not None, group) for group in groups)
def __eq__(self, other): ''' >>> viewabledict({}) == viewabledict({}) True >>> viewabledict({}) != viewabledict({1:1}) True >>> viewabledict({1:1,3:3}) != viewabledict({1:1,2:2}) True >>> viewabledict({1:1,2:2}) + viewabledict({3:3}) == viewabledict({1:1,2:2,3:3}) True >>> viewabledict({}) + viewabledict({3:3}) == viewabledict({3:3}) True ''' if not isinstance(other, Mapping): return False return all(a == b for a, b in zip_longest( self.iteritems(), other.iteritems(), fillvalue=_NO_VAL))
def next(self): """ Yields the next row from the source files. """ for self._filename in self._filenames: self._open() for row in self._csv_reader: self._row_number += 1 yield dict(zip_longest(self._fields, row, fillvalue='')) self._close() self._row_number = -1 self._filename = None raise StopIteration # ------------------------------------------------------------------------------------------------------------------
def from_array(cls, array, xalign=0.5, yalign=0.5, padding=0, bg=0): """Create an image from an array of images.""" if not non_string_iterable(xalign): xalign = [xalign] * max(len(r) for r in array) if not non_string_iterable(yalign): yalign = [yalign] * len(array) align = [[Alignment((xalign[c], yalign[r])) for c,_ in enumerate(row)] for r,row in enumerate(array)] padding = Padding(padding) heights = [max(img.height if img is not None else 0 for img in row) + padding.y for row in array] widths = [max(img.width if img is not None else 0 for img in column) + padding.x for column in zip_longest(*array)] aimg = Image.new("RGBA", (sum(widths), sum(heights)), bg) for r,row in enumerate(array): for c,img in enumerate(row): if img is None: continue x = sum(widths[0:c]) + padding.l + int(align[r][c].x * (widths[c] - (img.width + padding.x))) y = sum(heights[0:r]) + padding.u + int(align[r][c].y * (heights[r] - (img.height + padding.y))) aimg.overlay(img, (x,y)) return aimg