We extracted the following 50 code examples from open-source Python projects to illustrate how to use itertools.groupby().
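Before the project examples, here is a minimal standalone sketch (not taken from any of the projects below) of the property that most of these examples rely on: groupby() only merges consecutive items with equal keys, so the input usually has to be sorted by the same key first.

from itertools import groupby
from operator import itemgetter

records = [("a", 1), ("b", 2), ("a", 3), ("b", 4)]

# groupby() only groups *adjacent* items with equal keys,
# so sort by the grouping key before grouping.
records.sort(key=itemgetter(0))
for key, group in groupby(records, key=itemgetter(0)):
    print(key, list(group))
# a [('a', 1), ('a', 3)]
# b [('b', 2), ('b', 4)]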
def update_bibs_in(grouped_bibs, db_abbrev):
    actions = {
        "y": lambda items: [update_in(bibs, db_abbrev) for bibs in items],
        "m": lambda items: [manual_update_in(bibs, db_abbrev) for bibs in items],
        "n": lambda items: items
    }
    print("\n ")
    action = input("Abbreviate everything? "
                   "y(yes, automatic)/m(manual)/n(do nothing)")
    grouped_bibs.sort(key=operator.itemgetter('journal'))

    grouped_by_journal = []
    for key, items in groupby(grouped_bibs, lambda i: i["journal"]):
        grouped_by_journal.append(list(items))

    if action in ("y", "m", "n"):
        updated_bibs = actions.get(action)(grouped_by_journal)
    else:
        return update_bibs_in(grouped_bibs, db_abbrev)

    updated_bibs = reduce(lambda a, b: a + b, updated_bibs)
    return updated_bibs
def _build_tree(index, indices, level=0):
    '''
    Build a tree of IndexNode that is a tree representation of a
    pandas multi-index
    '''
    grps = groupby(indices, key=lambda x: x[0])
    nodes = []
    for k, g in grps:
        g = list(g)
        if len(g[0]) == 1:
            # leaf node
            for i in g:
                nodes.append(IndexNode(value=index.levels[level][i[0]]))
        else:
            next_level = [i[1:] for i in g]
            children = IndexNode._build_tree(index, next_level, level + 1)
            parent = IndexNode(value=index.levels[level][k])
            parent.add_children(children)
            nodes.append(parent)
    return nodes
def iter_by_qname(in_genome_bam, in_trimmed_bam):
    # Iterate through multiple BAMs by qname simultaneously
    # Assume the trimmed-read-bam has every qname in the genome bam, in the same order.
    genome_bam_iter = itertools.groupby(in_genome_bam, key=lambda read: read.qname)

    if in_trimmed_bam is None:
        trimmed_bam_iter = iter(())
    else:
        trimmed_bam_iter = itertools.groupby(in_trimmed_bam, key=lambda read: read.qname)

    for (genome_qname, genome_reads), trimmed_tuple in itertools.izip_longest(genome_bam_iter,
                                                                              trimmed_bam_iter):
        trimmed_qname, trimmed_reads = trimmed_tuple or (None, [])
        genome_reads = list(genome_reads)
        trimmed_reads = list(trimmed_reads)
        assert (in_trimmed_bam is None) or trimmed_qname == genome_qname
        yield (genome_qname, genome_reads, trimmed_reads)
def groupby(self, keys, contiguous=False):
    try:
        keys = listwrap(keys)
        get_key = jx_expression_to_function(keys)
        if not contiguous:
            data = sorted(self.data, key=get_key)
        else:
            data = self.data  # data is assumed to already be in group order

        def _output():
            for g, v in itertools.groupby(data, get_key):
                group = Data()
                for k, gg in zip(keys, g):
                    group[k] = gg
                yield (group, wrap(list(v)))

        return _output()
    except Exception as e:
        Log.error("Problem grouping", e)
def segments(self, precision=0):
    """Return a list of segments, each segment is ended by a MoveTo.
    A segment is a list of Points"""
    ret = []
    # group items separated by MoveTo
    for moveTo, group in itertools.groupby(self.items, lambda x: isinstance(x, MoveTo)):
        # Use only non MoveTo item
        if not moveTo:
            # Generate segments for each relevant item
            seg = [x.segments(precision) for x in group]
            # Merge all segments into one
            ret.append(list(itertools.chain.from_iterable(seg)))
    return ret
def hamming_numbers():
    # Generate "5-smooth" numbers, also called "Hamming numbers"
    # or "Regular numbers".  See: http://en.wikipedia.org/wiki/Regular_number
    # Finds solutions to 2**i * 3**j * 5**k for some integers i, j, and k.

    def deferred_output():
        'Works like a forward reference to the "output" global variable'
        for i in output:
            yield i

    result, p2, p3, p5 = tee(deferred_output(), 4)  # split the output streams
    m2 = (2 * x for x in p2)                        # multiples of 2
    m3 = (3 * x for x in p3)                        # multiples of 3
    m5 = (5 * x for x in p5)                        # multiples of 5
    merged = merge(m2, m3, m5)
    combined = chain([1], merged)                   # prepend starting point
    output = (k for k, v in groupby(combined))      # eliminate duplicates
    return result
def map_reduce(i, mapper, reducer):
    """
    A simple map_reduce implementation.

    :param i: the input dict for MapReduce
    :param mapper: the user-defined mapper function
    :param reducer: the user-defined reducer function
    :return: a list with the result of applying the reducer to every group
    """
    intermediate = []  # list of (intermediate_key, intermediate_value) pairs
    for (key, value) in i.items():
        intermediate.extend(mapper(key, value))

    # Sort the intermediate pairs by key, then let groupby collect all values
    # that share the same intermediate_key into one group.
    groups = {}
    for key, group in itertools.groupby(sorted(intermediate, key=lambda im: im[0]),
                                        key=lambda x: x[0]):
        groups[key] = [y for x, y in group]

    # Apply the reducer to every (intermediate_key, [intermediate_value, ...]) group.
    return [reducer(intermediate_key, groups[intermediate_key])
            for intermediate_key in groups]
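A minimal word-count usage sketch for the map_reduce() helper above; the mapper, reducer, and sample input shown here are illustrative, not from the original project.

def mapper(doc_name, text):
    # Emit (word, 1) for every word in the document.
    return [(word, 1) for word in text.split()]

def reducer(word, counts):
    # Sum the counts emitted for one word.
    return (word, sum(counts))

docs = {"d1": "a b a", "d2": "b c"}
print(map_reduce(docs, mapper, reducer))
# e.g. [('a', 2), ('b', 2), ('c', 1)]  (order depends on dict iteration)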
def nan_interpolate(df):
    """
    Reference:
    https://stackoverflow.com/questions/29007830/identifying-consecutive-nans-with-pandas
    """
    sum_nan = df.isnull().sum()
    df_null_int = df.isnull().astype(int)
    for col in df.columns:
        max_run = df[col].isnull().astype(int).groupby(
            df[col].notnull().astype(int).cumsum()).sum()
        if sum_nan[col]:
            # BELOW IS BROKEN!!!
            pass
            # logger.warning('column {} has {} NaNs ({} max consecutive run)'.format(col,
            #                                                                        sum_nan[col],
            #                                                                        max_run))
    df.interpolate(inplace=True)
    return df
def _output_triples(self):
    item = None
    for s, group in groupby(self, self._group_key):
        assert s is not None, "subject not defined"
        if item is None:
            pass
        elif isinstance(item, tuple):
            yield " .\n\n"
        elif isinstance(item, Node):
            yield "\n\n"
        item = next(group)
        if isinstance(item, tuple):
            s, p, o = item
            yield "%s %s %s" % (s, p, escape_any(o))
            for _, p, o in group:
                assert p is not None, "predicate not defined"
                if o is None:
                    continue
                yield " ;\n"
                yield " %s %s" % (p, escape_any(o))
        elif isinstance(item, Node):
            yield str(item)
    if isinstance(item, tuple):
        yield " ."
def _order_refalt_lexicographically(self, variants):
    # Also assert that chrom and pos are in order
    cp_groups = itertools.groupby(variants, key=lambda v: (v['chrom'], v['pos']))
    prev_chrom_index, prev_pos = -1, -1
    for cp, tied_variants in cp_groups:
        chrom_index = self._get_chrom_index(cp[0])
        if chrom_index < prev_chrom_index:
            raise PheWebError(
                "The chromosomes in your file appear to be in the wrong order.\n" +
                "The required order is: {!r}\n".format(chrom_order_list) +
                "But in your file, the chromosome {!r} came after the chromosome {!r}\n".format(
                    cp[0], chrom_order_list[prev_chrom_index]))
        if chrom_index == prev_chrom_index and cp[1] < prev_pos:
            raise PheWebError(
                "The positions in your file appear to be in the wrong order.\n" +
                "In your file, the position {!r} came after the position {!r} on chromosome {!r}\n".format(
                    cp[1], prev_pos, cp[0]))
        prev_chrom_index, prev_pos = chrom_index, cp[1]
        for v in sorted(tied_variants, key=lambda v: (v['ref'], v['alt'])):
            yield v
def __register__(cls, module_name):
    pool = Pool()
    Property = pool.get('ir.property')
    TableHandler = backend.get('TableHandler')
    cursor = Transaction().cursor
    table = cls.__table__()

    super(Party, cls).__register__(module_name)

    table_h = TableHandler(cursor, cls, module_name)
    if table_h.column_exist('lang'):
        cursor.execute(*table.select(table.id, table.lang, order_by=table.lang))
        for lang_id, group in groupby(cursor.fetchall(), lambda r: r[1]):
            ids = [id_ for id_, _ in group]
            if lang_id is not None:
                value = '%s,%s' % (cls.lang.model_name, lang_id)
            else:
                value = None
            Property.set('lang', cls.__name__, ids, value)
        table_h.drop_column('lang')
def _unit_compute(cls, taxes, price_unit, date):
    res = []
    for _, group_taxes in groupby(taxes, key=cls._group_taxes):
        unit_price_variation = 0
        for tax in group_taxes:
            start_date = tax.start_date or datetime.date.min
            end_date = tax.end_date or datetime.date.max
            if not (start_date <= date <= end_date):
                continue
            if tax.type != 'none':
                value = tax._process_tax(price_unit)
                res.append(value)
                if tax.update_unit_price:
                    unit_price_variation += value['amount']
            if len(tax.childs):
                res.extend(cls._unit_compute(tax.childs, price_unit, date))
        price_unit += unit_price_variation
    return res
def _reverse_unit_compute(cls, price_unit, taxes, date):
    rate, amount = 0, 0
    update_unit_price = False
    unit_price_variation_amount = 0
    unit_price_variation_rate = 0
    for _, group_taxes in groupby(taxes, key=cls._group_taxes):
        group_taxes = list(group_taxes)
        g_rate, g_amount = cls._reverse_rate_amount(group_taxes, date)
        if update_unit_price:
            g_amount += unit_price_variation_amount * g_rate
            g_rate += unit_price_variation_rate * g_rate
        g_update_unit_price = any(t.update_unit_price for t in group_taxes)
        update_unit_price |= g_update_unit_price
        if g_update_unit_price:
            unit_price_variation_amount += g_amount
            unit_price_variation_rate += g_rate
        rate += g_rate
        amount += g_amount
    return (price_unit - amount) / (1 + rate)
def check_period_closed(cls, moves):
    Period = Pool().get('stock.period')
    for company, moves in groupby(moves, lambda m: m.company):
        periods = Period.search([
                ('state', '=', 'closed'),
                ('company', '=', company.id),
                ], order=[('date', 'DESC')], limit=1)
        if periods:
            period, = periods
            for move in moves:
                date = (move.effective_date if move.effective_date
                    else move.planned_date)
                if date and date < period.date:
                    cls.raise_user_error('period_closed', {
                            'move': move.rec_name,
                            'period': period.rec_name,
                            })
def get_sessions(users, name):
    Session = Pool().get('ir.session')
    now = datetime.datetime.now()
    timeout = datetime.timedelta(
        seconds=config.getint('session', 'timeout'))
    result = dict((u.id, 0) for u in users)
    with Transaction().set_user(0):
        for sub_ids in grouped_slice(users):
            sessions = Session.search([
                    ('create_uid', 'in', sub_ids),
                    ], order=[('create_uid', 'ASC')])

            def filter_(session):
                timestamp = session.write_date or session.create_date
                return abs(timestamp - now) < timeout
            result.update(dict((i, len(list(g)))
                    for i, g in groupby(ifilter(filter_, sessions),
                        attrgetter('create_uid.id'))))
    return result
def get_action(cls, menus, name):
    pool = Pool()
    actions = dict((m.id, None) for m in menus)
    with Transaction().set_context(active_test=False):
        menus = cls.browse(menus)
    action_keywords = sum((list(m.action_keywords) for m in menus), [])
    key = lambda k: k.action.type
    action_keywords.sort(key=key)
    for type, action_keywords in groupby(action_keywords, key=key):
        action_keywords = list(action_keywords)
        for action_keyword in action_keywords:
            model = action_keyword.model
            actions[model.id] = '%s,-1' % type
        Action = pool.get(type)
        action2keyword = {k.action.id: k for k in action_keywords}
        with Transaction().set_context(active_test=False):
            factions = Action.search([
                    ('action', 'in', action2keyword.keys()),
                    ])
        for action in factions:
            model = action2keyword[action.id].model
            actions[model.id] = str(action)
    return actions
def attempt_naive_pov(self):
    p1 = self._find_naive_leaks()
    p2 = self._find_naive_leaks()

    leaked = dict()
    for si in p1:
        if si in p2:
            li = list(set(p2[si]).intersection(set(p1[si])))
            if len(li) > 0:
                for lb in li:
                    leaked[lb] = si

    # find four contiguous
    consecutive_groups = [ ]
    for _, g in groupby(enumerate(sorted(leaked)), lambda (i, x): i - x):
        consecutive_groups.append(map(itemgetter(1), g))
def get_largest_consecutive(self):
    # extra work here because we need to be confident about the bytes
    ss = self.state.copy()
    ss.add_constraints(self.minimized_ast == ss.se.BVV(ss.se.eval(self.minimized_ast, cast_to=str)))

    leaked_bytes = [ ]
    for byte in self.possibly_leaked_bytes:
        if self._confident_byte(ss, byte):
            leaked_bytes.append(byte)

    leaked_bytes = sorted(set(leaked_bytes))

    consec_bytes = [ ]
    # find consecutive leaked bytes
    for _, g in groupby(enumerate(leaked_bytes), lambda (i, x): i - x):
        consec_bytes.append(map(itemgetter(1), g))
def group_by_type(self, select_types: List[str] = None) -> 'EventGroupList':
    """
    Groups events by type

    Attributes
    ----------
    select_types
        A list of types for which to select groups in the resulting
        EventGroupList. If no types are specified, all resulting groups
        will be selected.

    Returns
    -------
    An EventGroupList partitioned by type
    """
    if select_types is None:
        select_types = []
    groups = [EventList(list(group), end=self.end)
              for index, group in groupby(self, key=attrgetter('__class__'))]
    if not select_types:
        selected_groups = groups
    else:
        selected_groups = [group for group in groups if group.type in select_types]
    return EventGroupList(groups, selected=selected_groups)
def unique_for_country_code(self, country_code):
    shipping = self.filter(
        Q(country_code=country_code) |
        Q(country_code=ANY_COUNTRY))
    shipping = shipping.order_by('shipping_method_id')
    shipping = shipping.values_list('shipping_method_id', 'id', 'country_code')
    grouped_shipping = groupby(shipping, itemgetter(0))
    any_country = ANY_COUNTRY

    ids = []

    for shipping_method_id, method_values in grouped_shipping:
        method_values = list(method_values)
        # if there is any country choice and specific one remove any country choice
        if len(method_values) == 2:
            method = [val for val in method_values
                      if val[2] != any_country][0]
        else:
            method = method_values[0]
        ids.append(method[1])
    return self.filter(id__in=ids)
def unique_for_country_code(self, country_code):
    shipping = self.filter(
        Q(country_code=country_code) |
        Q(country_code=ANY_COUNTRY))
    shipping = shipping.order_by('shipping_method_id')
    shipping = shipping.values_list(
        'shipping_method_id', 'id', 'country_code')
    grouped_shipping = groupby(shipping, itemgetter(0))
    any_country = ANY_COUNTRY

    ids = []

    for shipping_method_id, method_values in grouped_shipping:
        method_values = list(method_values)
        # if there is any country choice and specific one remove any
        # country choice
        if len(method_values) == 2:
            method = [val for val in method_values
                      if val[2] != any_country][0]
        else:
            method = method_values[0]
        ids.append(method[1])
    return self.filter(id__in=ids)
def create_intervaltrees(genes):
    genome = dict()
    file_handle = open(genes)
    next(file_handle)  # skip header
    for chromosome, lines in groupby(file_handle, lambda l: l.split()[0]):
        chromosome_intervaltree = IntervalTree()
        for line in lines:
            start, end, region_type, _, name = line.split()[1:6]
            start, end = int(start), int(end)
            chromosome_intervaltree[start:end] = (start, name, region_type)
        genome[chromosome] = chromosome_intervaltree
    return genome
def getAttributesDeclarationXML(self):
    """ generate attributes declaration XML """
    # return lxml etree elements
    allAttributesXML = []
    if len(self) > 0:
        # iterate on node and then edge attributes
        for attClass, atts in self.iteritems():
            # group by mode
            key_mode = lambda att: att["mode"]
            atts_sorted_by_mode = sorted(atts.values(), key=key_mode, reverse=True)
            for mode, atts in itertools.groupby(atts_sorted_by_mode, key_mode):
                # generate one attributes element per mode
                attributesXML = etree.Element("attributes")
                attributesXML.set("class", attClass)
                attributesXML.set("mode", mode)
                # generate attribute elements in id order
                for att in sorted(atts, key=lambda att: att["id"]):
                    attributeXML = etree.SubElement(attributesXML, "attribute")
                    attributeXML.set("id", str(att["id"]))
                    attributeXML.set("title", att["title"])
                    attributeXML.set("type", att["type"])
                    if att["defaultValue"]:
                        etree.SubElement(attributeXML, "default").text = att["defaultValue"]
                allAttributesXML.append(attributesXML)
    return allAttributesXML
def ids_to_string(ids_list):
    """Converts lists of integer IDs to text"""
    sorted_ids = sorted(ids_list)
    ranges = []
    for key, group in groupby(enumerate(sorted_ids), lambda x: x[0] - x[1]):
        group = list(map(itemgetter(1), group))
        if len(group) > 1:
            ranges.append([group[0], group[-1]])
        else:
            ranges.append(group[0])

    parsed = []
    for r in ranges:
        if isinstance(r, list):
            parsed.append('%d-%d' % (r[0], r[1]))
        else:
            parsed.append(str(r))
    return '[%s]' % ','.join(parsed)
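For reference, a small standalone demonstration of the consecutive-run idiom used above (groupby over enumerate with an index-minus-value key); the sample input here is made up.

from itertools import groupby

sorted_ids = [1, 2, 3, 7, 9, 10]
runs = []
# Consecutive integers share the same (index - value) difference,
# so each run of consecutive IDs ends up in its own group.
for _, group in groupby(enumerate(sorted_ids), lambda x: x[0] - x[1]):
    runs.append([g[1] for g in group])
print(runs)  # [[1, 2, 3], [7], [9, 10]]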
def update_request_states_from_pond_blocks(pond_blocks):
    '''Update the states of requests and user_requests given a set of recently
    changed pond blocks.'''
    blocks_with_tracking_nums = [pb for pb in pond_blocks if pb['molecules'][0]['tracking_num']]
    sorted_blocks_with_tracking_nums = sorted(blocks_with_tracking_nums,
                                              key=lambda x: x['molecules'][0]['tracking_num'])
    blocks_by_tracking_num = itertools.groupby(sorted_blocks_with_tracking_nums,
                                               lambda x: x['molecules'][0]['tracking_num'])
    now = timezone.now()
    states_changed = False

    for tracking_num, blocks in blocks_by_tracking_num:
        sorted_blocks_by_request = sorted(blocks, key=lambda x: x['molecules'][0]['request_num'])
        blocks_by_request_num = {
            int(k): list(v)
            for k, v in itertools.groupby(sorted_blocks_by_request,
                                          key=lambda x: x['molecules'][0]['request_num'])}
        user_request = UserRequest.objects.prefetch_related('requests').get(pk=tracking_num)
        ur_expired = user_request.max_window_time < now
        requests = user_request.requests.all()
        for request in requests:
            if request.id in blocks_by_request_num:
                states_changed |= update_request_state(request,
                                                       blocks_by_request_num[request.id],
                                                       ur_expired)
        states_changed |= update_user_request_state(user_request)

    return states_changed
def bestPairs(pairdata, reportall):
    best = []
    for k, v in itertools.groupby(pairdata, key=lambda x: x['pair']):
        allcalls = list(v)
        nonNA = [r for r in allcalls if r['verdict'] != 'na']
        if not nonNA:
            if reportall:
                best.append(allcalls[0])
            continue
        verdict = nonNA[-1]['verdict']
        if verdict == 'MZ twins':
            nonNA.sort(key=lambda x: (-round(x['MZp'], 1), x['percentile']))
        elif verdict == 'Parent-child' or reportall:
            nonNA.sort(key=lambda x: (-round(x['POp'], 1), x['percentile']))
        else:
            continue
        best.append(nonNA[0])
    return pairwise_table(best)
def bestGenders(genderdata):
    best = []
    sortfun = lambda x: (round(x['Xhetp'], 1), x['percentile'])
    for k, v in itertools.groupby(genderdata, key=lambda x: x['sample']):
        allcalls = list(v)
        noNA = [r for r in allcalls if r['gender'] != 'na']
        if not noNA:
            best.append(allcalls[0])
            continue
        verdicts = {r['gender'] for r in noNA}
        if 'Male' in verdicts and 'Female' in verdicts:
            # if both genders are called, choose the last one
            noQ = [r for r in noNA if r['gender'] != '?']
            best.append(noQ[-1])
            continue
        if verdicts == {'?'}:
            verd = '?'
        else:
            verd = list(verdicts.difference({'?'}))[0]
        use = sorted([r for r in noNA if r['gender'] == verd], key=sortfun)
        best.append(use[0])
    return gender_table(best)
async def convert(self, message, unit_values):
    converted = [(uv, system_convert(uv)) for uv in unit_values]
    output = ['{} = {}'.format(uv, conv) for uv, conv in converted]

    for t, uvs in groupby(converted, key=lambda uvs: type(uvs[0].unit)):
        values = list(map(lambda x: x[0], uvs))
        if len(values) >= 2:
            summed = sum_units(*values)
            converted_summed = system_convert(summed)
            output.append(
                '{} total: {} = {}'
                .format(t.__name__.lower(), summed, converted_summed)
            )

    await self.send_message(
        message.channel,
        'Converted units\n{}'
        .format(f.code_block(output)),
        delete_after = 60
    )
async def edited_messages(self, message, user_id, count=10):
    with self.transaction() as trans:
        trans.execute(q.last_edited_logs, dict(
            author_id = user_id,
            limit     = count
        ))
        results = trans.fetchall()

    grouped = groupby(results, key=lambda row: row[0])
    messages = [
        ' ? '.join([
            '{}{}'.format(c[1], ' '.join(c[2]))
            for c in reversed(list(contents))
        ])
        for _, contents in grouped
    ]

    await self.send_message(
        message.channel,
        'last **{}** edited messages from <@{}>:\n{}'
        .format(len(messages), user_id, '\n'.join(messages)),
        delete_after = 30
    )
def list_voices(access_key, secret_key, voice_language, voice_gender):
    """List available Ivona voices"""
    try:
        ivona_api = IvonaAPI(access_key, secret_key)
    except (ValueError, IvonaAPIException) as e:
        raise click.ClickException("Something went wrong: {}".format(repr(e)))

    click.echo("Listing available voices...")

    voices_list = ivona_api.get_available_voices(
        language=voice_language,
        gender=voice_gender,
    )

    # Group voices by language
    voices_dict = dict()
    data = sorted(voices_list, key=lambda x: x['Language'])
    for k, g in groupby(data, key=lambda x: x['Language']):
        voices_dict[k] = list(g)

    for ln, voices in voices_dict.items():
        voice_names = [v['Name'] for v in voices]
        click.echo("{}: {}".format(ln, ', '.join(voice_names)))

    click.secho("All done", fg='green')
def get_forums(query_result, user):
    """Returns a tuple which contains the category and the forums as list.
    This is the counterpart for get_categories_and_forums and especially
    useful when you just need the forums for one category.

    For example::
        (<Category 2>,
          [(<Forum 3>, None),
          (<Forum 4>, None)])

    :param query_result: A tuple (KeyedTuple) with all categories and forums

    :param user: The user object is needed because a signed out user does not
                 have the ForumsRead relation joined.
    """
    it = itertools.groupby(query_result, operator.itemgetter(0))

    if user.is_authenticated:
        for key, value in it:
            forums = key, [(item[1], item[2]) for item in value]
    else:
        for key, value in it:
            forums = key, [(item[1], None) for item in value]

    return forums
def group_battles(self, cache):
    list = sorted(cache.values(), key=get_match_id)
    grouped = groupby(list, lambda x: x['match_id'])
    groups = []
    for key, group in grouped:
        matches = []
        battle = {}
        first = True
        for item in group:
            if first:
                first = False
                battle['guild'] = item['op_guild']
                battle['type'] = item['type']
                battle['match_id'] = item['match_id']
            matches.append(item)
        battle['matches'] = matches
        groups.append(battle)
    return groups
def group_articles_by_status(articles):
    """
    Group articles by publish status

    :param articles: Iterable of Article objects
    :returns: Iterable like itertools.groupby with a key as the publish_status
              and a list of articles for that status
    """
    def status_key(a):
        if a.publish_status == PUBLISHED:
            cnt = 1
        elif a.publish_status == IN_REVIEW:
            cnt = 2
        elif a.publish_status == DRAFT:
            cnt = 3
        else:
            cnt = 4
        return cnt

    sorted_by_status = sorted(articles, key=status_key)
    return itertools.groupby(sorted_by_status, key=lambda a: a.publish_status)
def get_entry_objects(entry_list):
    """
    Retrieve a list of (entry, data_obj) pairs.
    """
    # Grouping these together like this just to minimize the number of calls
    # to get_database_table.
    author_table = dh.get_database_table('authors')
    for table_name, group in it.groupby(entry_list, key=lambda x: x.table):
        table = dh.get_database_table(table_name)
        for entry_obj in group:
            data_obj = table[entry_obj.data_id]

            # Retrieve the author objects as well
            author_objs = [author_table[author_id]
                           for author_id in data_obj.author_ids]

            yield (entry_obj, data_obj, author_objs)
def natural_sort_key(cls, value):
    """
    This is a sort key to do a "natural" lexicographic sort, the string is
    broken up into segments of strings and numbers, so that, e.g. `'Str 2'`
    will be sorted before `'Str 15'`.

    :param value: The book name as it will be sorted.

    :return: Returns a book name tokenized such that it can be sorted.
    """
    o = itertools.groupby(value, key=str.isdigit)
    o = ((k, ''.join(g)) for k, g in o)
    o = ((int(v) if k else v) for k, v in o)
    return tuple(o)
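A brief usage sketch of the tokenizing idea above, rewritten here as a module-level function (outside the class it was taken from) purely for illustration.

import itertools

def natural_sort_key(value):
    # Split the string into runs of digits and non-digits, then turn the
    # digit runs into ints so numeric segments compare numerically.
    o = itertools.groupby(value, key=str.isdigit)
    o = ((k, ''.join(g)) for k, g in o)
    return tuple(int(v) if k else v for k, v in o)

names = ['Str 15', 'Str 2', 'Str 100']
print(sorted(names, key=natural_sort_key))
# ['Str 2', 'Str 15', 'Str 100']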
def listenSsl(self, site, ports, ignore=[]):
    privateKeyFile = open(self.options.sslPrivateKey, "r")
    privateKey = privateKeyFile.read()
    privateKeyFile.close()

    certificateFile = open(self.options.sslCertificate)
    certificate = certificateFile.read()
    certificateFile.close()

    import twisted.internet.ssl as ssl
    cert = ssl.PrivateCertificate.loadPEM(privateKey + certificate)
    contextFactory = cert.options()

    import itertools
    listenPorts = map(lambda x: x[0], itertools.groupby(sorted(ports)))
    for port in listenPorts:
        if port not in ignore:
            reactor.listenSSL(port, site, contextFactory)
def _generate_endpoints(self, targets):
    def _target_to_port(item):
        _, (listen_port, target_port) = item
        return {'port': target_port, 'name': str(listen_port)}

    port_with_addrs = [
        (p, [e[0] for e in grp])
        for p, grp in itertools.groupby(
            sorted(targets.items()), _target_to_port)]
    return {
        'subsets': [
            {
                'addresses': [
                    {
                        'ip': ip,
                        'targetRef': {
                            'kind': k_const.K8S_OBJ_POD,
                            'name': ip,
                            'namespace': 'default'
                        }
                    }
                    for ip in addrs
                ],
                'ports': [port]
            }
            for port, addrs in port_with_addrs
        ]
    }
def __str__(self):
    lines = []
    self.errors.sort(key=lambda e: e.order)
    for cls, errors_of_cls in groupby(self.errors, lambda e: e.__class__):
        lines.append(cls.head)
        lines.extend(e.body() for e in errors_of_cls)
    if lines:
        return '\n'.join(lines)
def validate_msdos(module, partitions):
    """Validate limitations of MSDOS partition table"""
    p_types = [p['type'] for p in partitions]
    # NOTE(pas-ha) no more than 4 primary
    if p_types.count('primary') > 4:
        module.fail_json("Can not create more than 4 primary partitions "
                         "on a MSDOS partition table.")
    if 'extended' in p_types:
        # NOTE(pas-ha) only single extended
        if p_types.count('extended') > 1:
            module.fail_json("Can not create more than single extended "
                             "partition on a MSDOS partition table.")
        allowed = ['primary', 'extended']
        if 'logical' in p_types:
            allowed.append('logical')
        # NOTE(pas-ha) this produces list with subsequent duplicates removed
        if [k for k, g in itertools.groupby(p_types)] != allowed:
            module.fail_json("Incorrect partitions order: for MSDOS, "
                             "all primary, single extended, all logical")
    elif 'logical' in p_types:
        # NOTE(pas-ha) logical has sense only with extended
        module.fail_json("Logical partition w/o extended one on MSDOS "
                         "partition table")

    # TODO(pas-ha) add more validation, e.g.
    # - add idempotency: first check the already existing partitions
    #   and do not run anything unless really needed, and only what's needed
    #   - if only change tags - use specific command
    #   - allow fuzziness in partition sizes when alignment is 'optimal'
    # - estimate and validate available space
    # - support more units
    # - support negative units?
def render_template(docs, template):
    grouped_docs = {f: list(sorted(list(dl), key=lambda d: d['name']))
                    for f, dl in groupby(docs, lambda d: d['file'])}
    template = JINJA_ENV.get_template(template)
    rendered = template.render(documentation=grouped_docs,
                               trim_blocks=True,
                               lstrip_blocks=True)
    return rendered
def isPermutation(self, values):
    from itertools import groupby

    frequencies = [len(list(group)) for key, group in groupby(values)]
    print(frequencies)
    for frequency in frequencies:
        if frequency != 1:
            return False
    return True
def create_structure_ers_from_relations(relations):
    """This function gets structured entity relationships.

    Args:
        relations (list): List of (:class:`FieldPath`, :class:`FieldPath`)

    Returns:
        Structured ER dict. For example:
        {'database_name': {'table_name': {'field_name': ['foreign_database_table_field']}}}

    A way it might be used is
        >>> print create_structure_ers_from_relations([(FieldPath('db', 'ac', 'id'), FieldPath('db', 'bc', 'id'))])
        {'db': {'ac': {'id': ['db.bc.id']}, 'bc': {'id': ['db.ac.id']}}}
    """
    relations.extend([_[::-1] for _ in relations])  # add reverse relations
    relations = sorted(list(set([tuple(_) for _ in relations])),
                       key=lambda _: _[0].db)  # remove duplicates
    dbs = {}
    for db_key, tb_grp in groupby(relations, key=lambda _: _[0].db):  # group by db name
        if db_key == '':
            continue
        tbs = {}
        for tb_key, fd_grp in groupby(sorted(list(tb_grp), key=lambda _: _[0].tb),
                                      key=lambda _: _[0].tb):
            fds = {}
            for fd_key, foreign_grp in groupby(sorted(list(fd_grp), key=lambda _: _[0].fd),
                                               key=lambda _: _[0].fd):
                fds[fd_key] = sorted([str(_[1]) for _ in list(foreign_grp)])
            tbs[tb_key] = fds
        dbs[db_key] = tbs
    return dbs
def compute_largest_specs(history_specs):
    """
    Maps a Frequency to the largest HistorySpec at that frequency from an
    iterable of HistorySpecs.
    """
    return {key: max(group, key=lambda f: f.bar_count)
            for key, group in groupby(
                sorted(history_specs, key=freq_str_and_bar_count),
                key=lambda spec: spec.frequency)}


# tuples to store a change to the shape of a HistoryContainer
def _to_html(row_col_dict, **kwargs):
    '''
    Args:
        row_col_dict: dict with (0, 0, 0, 0) : (Value, Style)
    '''

    def wrap_tr(offsets):
        s = []
        nesting_level = row_col_dict[offsets[0]].nesting_level
        for offset in offsets:
            row_span = offset.end_row - offset.start_row + 1
            col_span = offset.end_col - offset.start_col + 1
            value = row_col_dict[offset].value
            style = row_col_dict[offset].style_wrapper.user_style
            style = HTMLWriter.style_to_str(style)
            td_attr = dict(rowspan=row_span, colspan=col_span, style=style)
            if nesting_level > row_col_dict[offset].nesting_level:
                # we have encountered a nested table
                inner_html = HTMLWriter._to_html(value)
            else:
                inner_html = value
            td = HTMLWriter._wrap_table_element('td', td_attr, inner_html)
            s.extend(td)
        tr = HTMLWriter._wrap_table_element('tr', {}, ''.join(s))
        return tr

    trs = []
    for _, offsets in groupby(sorted(row_col_dict), key=lambda x: (x[0])):
        trs.append(wrap_tr(list(offsets)))

    table_attrs = kwargs or dict()
    return HTMLWriter._wrap_table_element('table', table_attrs, ''.join(trs))
def get_unannotated_intervals(self):
    """ Return a list of Annotation objects corresponding to unannotated regions on the contig """
    unannotated_intervals = []
    annotation_indicator = np.zeros(len(self.sequence))

    for annotation in self.annotations:
        annotation_indicator[annotation.contig_match_start:annotation.contig_match_end] = 1

    interval_start = 0

    for annotated, region_iter in itertools.groupby(annotation_indicator, lambda x: x == 1):
        region = list(region_iter)

        if not annotated:
            feature = vdj_reference.create_dummy_feature(display_name='UNANNOTATED',
                                                         region_type='UNANNOTATED',
                                                         sequence=None)
            unannotated_intervals.append(Annotation(feature=feature,
                                                    cigar=None,
                                                    score=0,
                                                    annotation_length=len(region),
                                                    annotation_match_start=0,
                                                    annotation_match_end=len(region),
                                                    contig_match_start=interval_start,
                                                    contig_match_end=interval_start + len(region),
                                                    mismatches=[],
                                                    ))

        interval_start += len(region)

    return unannotated_intervals
def load_cell_contigs_from_json(json_file, reference_path, group_key,
                                require_high_conf=True):
    """Returns a list of CellContig objects based on annotations in a json.

    The json is assumed to contain a list of AnnotatedContigs (in dict form).
    The contigs are sorted and grouped by group_key and each such group is
    put into a CellContig object.

    group_key must be 'barcode' or 'clonotype'
    """

    assert group_key in set(['barcode', 'clonotype'])
    annotations = load_contig_list_from_json(open(json_file), reference_path)

    cell_contigs = []

    key_func = lambda x: x.__getattribute__(group_key)
    anno_iter = itertools.groupby(sorted(annotations, key=key_func), key=key_func)

    for clonotype_name, contig_annotations in anno_iter:

        contigs = []
        for new_contig in contig_annotations:
            # Note, for consensus contigs is_cell=None
            if new_contig.is_cell is not False \
               and (new_contig.high_confidence or not require_high_conf):
                contigs.append(new_contig)

        if len(contigs) > 0:
            cell_contigs.append(CellContigs(clonotype_name, contigs))

    return cell_contigs
def iter_tables(all_stylediff_pairs,  # type: List[StyleDiffSourcePairs]
                enc='utf-8',          # type: str
                numhunks=1,           # type: int
                numlines=2,           # type: int
                wrapcolumn=0,         # type: int
                ccmode=CC_PROCESSES   # type: str
                ):
    # type: (...) -> Iterator[Tuple[List[str], int, int]]

    def left_diff(sdp):
        # type: (StyleDiffSourcePairs) -> str
        return '\n'.join(set([sdtexts[1] for sdtexts in sdp.keys()]))

    def sdkeys(item):
        # type: (StyleDiffSourcePairs) -> List[bytes]
        return list(item.keys())

    idx = 0
    grouped_sdpairs = itertools.groupby(all_stylediff_pairs, left_diff)
    groups = []  # type: List[CallArgs]
    grouped_sdp = sorted([(key, list(pairs)) for key, pairs in grouped_sdpairs])
    for sdleft, stylediff_pairs in grouped_sdp:
        args_lists = []
        for sdpairs in sorted(stylediff_pairs, key=sdkeys):
            for from_to_texts, pairs in sorted(sdpairs.items()):
                args_lists.append((from_to_texts, pairs, numhunks, numlines,
                                   wrapcolumn, idx, enc))
                idx += 1
        grouparg = (args_lists, ), {}  # type: CallArgs
        groups.append(grouparg)

    for tidx, tables in enumerate(iter_parallel(calc_diff_groups, groups,
                                                ccmode=ccmode)):
        yield tables, tidx, len(groups)
def condense_option_values(formatter, styles, condensed):
    # type: (CodeFormatter, Iterable[Style], bool) -> List[Style]
    # Use idx to prevent sorted from comparing unorderable dicts.
    triplets = [(keypaths(style), idx, style) for idx, style in enumerate(styles)]
    triplets = sorted(triplets)
    pairs = [(kp, style) for kp, idx, style in triplets]
    if condensed:
        equivalents = []
        for kpaths, kp_styles in itertools.groupby(pairs, operator.itemgetter(0)):
            styles = [kps[1] for kps in kp_styles]
            for style in group_consecutive(formatter, styles, condensed):
                equivalents.append(style)
    else:
        equivalents = [style for _, style in pairs]
    return equivalents