Python itertools 模块,groupby() 实例源码


项目:bibcure    作者:bibcure    | 项目源码 | 文件源码
def update_bibs_in(grouped_bibs, db_abbrev):
    """Interactively abbreviate journal names, one journal group at a time.

    The user is asked how to proceed: ``y`` applies ``update_in`` to every
    journal group, ``m`` applies ``manual_update_in`` to every group and
    ``n`` leaves the groups untouched.  Any other answer repeats the prompt.

    :param grouped_bibs: iterable of entries indexable by ``"journal"``.
        ``groupby`` only merges adjacent items, so entries of one journal
        must already be next to each other.
    :param db_abbrev: passed through unchanged to the update helpers.
    :returns: one flat list holding the items of every processed group.
    """
    actions = {
        "y": lambda items: [update_in(bibs, db_abbrev) for bibs in items],
        "m": lambda items: [manual_update_in(bibs, db_abbrev) for bibs in items],
        "n": lambda items: items,
    }

    # Group once, before prompting: a repeated prompt must never have to
    # re-read ``grouped_bibs``, which may be a one-shot iterator.
    grouped_by_journal = [
        list(items)
        for _, items in groupby(grouped_bibs, lambda i: i["journal"])
    ]

    # Keep asking until one of the known answers is given.
    while True:
        print("\n ")
        action = input("Abbreviate everthing?" +
                       "y(yes, automatic)/m(manual)/n(do nothing)")
        if action in actions:
            break

    updated_bibs = actions[action](grouped_by_journal)
    # Flatten the per-journal results; unlike reduce() this also works when
    # there are no groups at all.
    return [bib for group in updated_bibs for bib in group]
项目:table-compositor    作者:InvestmentSystems    | 项目源码 | 文件源码
def _build_tree(index, indices, level=0):
        # NOTE(review): this excerpt is incomplete. The two prose lines below
        # are docstring text whose quote delimiters are missing, the leaf-node
        # loop has no body, and the non-leaf statements appear without a
        # preceding ``else:``. Nothing visible here ever adds to ``nodes``.
        Build a tree of IndexNode that is a tree representtion of
        pandas multi-index
        # Group consecutive index tuples by their first element.
        grps = groupby(indices, key=lambda x: x[0])
        nodes = []
        for k, g in grps:
            g = list(g)
            # One-element tuples mean there is no deeper level to recurse into.
            if len(g[0]) == 1:
                # leaf node
                for i in g:
                # Strip the first element and recurse one level deeper.
                next_level = [i[1:] for i in g]
                children = IndexNode._build_tree(
                    index, next_level, level + 1)
                parent = IndexNode(value=index.levels[level][k])
        return nodes
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def iter_by_qname(in_genome_bam, in_trimmed_bam):
    """Iterate through two BAM read streams by qname simultaneously.

    Consecutive reads sharing a ``qname`` are collected into one group per
    stream, and the groups of both streams are walked in lockstep.  The
    trimmed stream is assumed to contain every qname of the genome stream,
    in the same order.

    :param in_genome_bam: iterable of reads exposing a ``qname`` attribute.
    :param in_trimmed_bam: second iterable of such reads, or ``None``.
    :returns: generator of ``(qname, genome_reads, trimmed_reads)`` where the
        last two items are lists; ``trimmed_reads`` is empty when
        ``in_trimmed_bam`` is ``None``.
    :raises AssertionError: if a trimmed group's qname differs from the
        genome group's qname.
    """
    # ``izip_longest`` (Python 2) was renamed ``zip_longest`` in Python 3.
    zip_longest = getattr(itertools, "zip_longest", None) or itertools.izip_longest

    genome_bam_iter = itertools.groupby(in_genome_bam, key=lambda read: read.qname)

    if in_trimmed_bam is None:
        trimmed_bam_iter = iter(())
    else:
        trimmed_bam_iter = itertools.groupby(in_trimmed_bam, key=lambda read: read.qname)

    for (genome_qname, genome_reads), trimmed_tuple in zip_longest(genome_bam_iter,
                                                                   trimmed_bam_iter):
        # A missing trimmed group shows up as the fill value None.
        trimmed_qname, trimmed_reads = trimmed_tuple or (None, [])
        # Materialise both groups before the groupby iterators advance.
        genome_reads = list(genome_reads)
        trimmed_reads = list(trimmed_reads)

        assert (in_trimmed_bam is None) or trimmed_qname == genome_qname
        yield (genome_qname, genome_reads, trimmed_reads)
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def groupby(self, keys, contiguous=False):
            # NOTE(review): excerpt is incomplete — the ``try:`` that pairs with
            # the ``except`` below is missing and the first argument of
            # ``sorted(`` has been lost, so this text does not parse as-is.
            # No visible statement assigns ``data`` when ``contiguous`` is true.
            keys = listwrap(keys)
            get_key = jx_expression_to_function(keys)
            # Unless told the data is already contiguous, sort by the grouping
            # key so itertools.groupby sees equal keys next to each other.
            if not contiguous:
                data = sorted(, key=get_key)

            def _output():
                # Lazily yield one (group, rows) pair per run of equal keys.
                for g, v in itertools.groupby(data, get_key):
                    group = Data()
                    # Pair each requested key with the matching element of g.
                    for k, gg in zip(keys, g):
                        group[k] = gg
                    yield (group, wrap(list(v)))

            return _output()
        except Exception as e:
            Log.error("Problem grouping", e)
项目:sketch-components    作者:ibhubs    | 项目源码 | 文件源码
def segments(self, precision=0):
        """Return a list of segments, each segment is ended by a MoveTo.

        A segment is the concatenation of what ``item.segments(precision)``
        returns for every consecutive non-MoveTo item of ``self.items``.

        NOTE(review): the reviewed source had lost the end of the
        ``isinstance`` call and the statement that fills ``ret``; both are
        restored here from the surrounding comments — confirm upstream.
        """
        ret = []
        # group items separated by MoveTo
        for moveTo, group in itertools.groupby(self.items,
                                               lambda x: isinstance(x, MoveTo)):
            # Use only non MoveTo item
            if not moveTo:
                # Generate segments for each relevant item
                seg = [x.segments(precision) for x in group]
                # Merge all segments into one
                ret.append(list(itertools.chain.from_iterable(seg)))

        return ret
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def hamming_numbers():
    """Return an endless, lazy iterator over the Hamming numbers.

    Hamming numbers (also called "regular" or "5-smooth" numbers) are the
    values 2**i * 3**j * 5**k for non-negative integers i, j and k, produced
    here in increasing order without repeats.

    The sequence is defined in terms of itself: every term is 1, or 2, 3 or
    5 times an earlier term.  The stream is therefore fed back into its own
    definition through ``tee``.
    """

    def feedback():
        # ``stream`` is only bound further down; the name is resolved when
        # the first value is requested, which happens after the return.
        for value in stream:
            yield value

    def scaled(factor, source):
        # Lazy multiples of an earlier copy of the sequence.
        return (factor * x for x in source)

    # Four independent views of the same sequence: one for the caller and
    # one per prime factor.
    result, by_two, by_three, by_five = tee(feedback(), 4)
    ordered = merge(scaled(2, by_two), scaled(3, by_three), scaled(5, by_five))
    seeded = chain([1], ordered)            # the sequence starts at 1
    # Equal neighbours (e.g. 6 = 2*3 = 3*2) collapse into a single key.
    stream = (key for key, _ in groupby(seeded))

    return result
项目:python-spider    作者:naginoasukara    | 项目源码 | 文件源码
def map_reduce(i, mapper, reducer):
        """Run a small in-memory MapReduce over the mapping ``i``.

        :param i: mapping of input keys to input values.
        :param mapper: callable ``mapper(key, value)`` returning an iterable
            of ``(intermediate_key, intermediate_value)`` pairs.
        :param reducer: callable ``reducer(intermediate_key, values)`` where
            ``values`` is the list of intermediate values for that key.
        :return: list with one reducer result per distinct intermediate key,
            in ascending intermediate-key order.
        """
        # Map phase: collect every (intermediate_key, intermediate_value).
        intermediate = []
        for (key, value) in i.items():
            intermediate.extend(mapper(key, value))

        # Shuffle phase: groupby only merges adjacent items, so the pairs are
        # sorted by intermediate key first.
        first = lambda pair: pair[0]
        grouped = itertools.groupby(sorted(intermediate, key=first), key=first)

        # Reduce phase: hand each key its list of values, in sorted key order
        # (no reliance on dict ordering).
        return [reducer(intermediate_key, [value for _, value in pairs])
                for intermediate_key, pairs in grouped]
项目:pyrsss    作者:butala    | 项目源码 | 文件源码
def nan_interpolate(df):
    """Fill NaN gaps in every column of *df* by interpolation.

    NOTE(review): in the reviewed source the ``if`` that tests for NaNs had
    no statement left under it (only a commented-out warning), which is a
    syntax error.  A default ``Series.interpolate()`` fill is assumed here,
    based on the function name — confirm against the upstream project.

    :param df: pandas DataFrame; columns containing NaNs are replaced in
        place by their interpolated version.
    :returns: the same DataFrame object.
    """
    nan_counts = df.isnull().sum()
    for col in df.columns:
        # Leave columns without NaNs untouched.
        if nan_counts[col]:
            df[col] = df[col].interpolate()
    return df
项目:aiosparql    作者:aio-libs    | 项目源码 | 文件源码
def _output_triples(self):
        """Yield the text fragments that serialise the items of ``self``.

        Items are grouped by ``self._group_key``.  A group whose first item
        is a tuple is written as ``s p o`` followed by `` ;``-separated
        ``p o`` continuation lines and closed with `` .``; a group whose
        first item is a ``Node`` is written as ``str(item)``.

        NOTE(review): the reviewed source had lost the bodies of
        ``if item is None:`` and ``if o is None:``.  The first is a no-op
        here; the second is restored as "skip this pair" — confirm upstream.

        :raises AssertionError: if a subject or predicate is ``None``.
        """
        item = None
        for s, group in groupby(self, self._group_key):
            assert s is not None, "subject not defined"
            # Close the previous group; nothing to close before the first one.
            if isinstance(item, tuple):
                yield " .\n\n"
            elif isinstance(item, Node):
                yield "\n\n"
            item = next(group)
            if isinstance(item, tuple):
                s, p, o = item
                yield "%s %s %s" % (s, p, escape_any(o))
                for _, p, o in group:
                    assert p is not None, "predicate not defined"
                    if o is None:
                        continue
                    yield " ;\n"
                    yield "    %s %s" % (p, escape_any(o))
            elif isinstance(item, Node):
                yield str(item)
        # Terminate the final triple group.
        if isinstance(item, tuple):
            yield " ."
项目:pheweb    作者:statgen    | 项目源码 | 文件源码
def _order_refalt_lexicographically(self, variants):
        """Yield ``variants`` with ties on (chrom, pos) ordered by (ref, alt).

        While streaming, verify that chromosomes arrive in the order given
        by ``self._get_chrom_index`` and that positions never decrease
        within a chromosome.

        :param variants: iterable of mappings with the keys ``'chrom'``,
            ``'pos'``, ``'ref'`` and ``'alt'``.
        :raises PheWebError: if a chromosome or a position is out of order.
        """
        last_chrom_index = -1
        last_pos = -1
        by_locus = itertools.groupby(variants, key=lambda v: (v['chrom'], v['pos']))
        for (chrom, pos), same_locus in by_locus:
            this_chrom_index = self._get_chrom_index(chrom)

            if this_chrom_index < last_chrom_index:
                raise PheWebError(
                    "The chromosomes in your file appear to be in the wrong order.\n" +
                    "The required order is: {!r}\n".format(chrom_order_list) +
                    "But in your file, the chromosome {!r} came after the chromosome {!r}\n".format(
                        chrom, chrom_order_list[last_chrom_index]))

            # Positions are only comparable within one chromosome.
            if this_chrom_index == last_chrom_index and pos < last_pos:
                raise PheWebError(
                    "The positions in your file appear to be in the wrong order.\n" +
                    "In your file, the position {!r} came after the position {!r} on chromsome {!r}\n".format(
                        pos, last_pos, chrom))

            last_chrom_index = this_chrom_index
            last_pos = pos

            ordered_ties = sorted(same_locus, key=lambda v: (v['ref'], v['alt']))
            for variant in ordered_ties:
                yield variant
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def __register__(cls, module_name):
        # NOTE(review): excerpt is incomplete — the model name passed to
        # ``pool.get('')``, the arguments of ``cursor.execute`` and the
        # ``else:`` before ``value = None`` are missing, so this text does
        # not parse as-is.
        pool = Pool()
        Property = pool.get('')
        TableHandler = backend.get('TableHandler')
        cursor = Transaction().cursor
        table = cls.__table__()

        super(Party, cls).__register__(module_name)

        table_h = TableHandler(cursor, cls, module_name)
        # Only act when the table has a 'lang' column.
        if table_h.column_exist('lang'):
            cursor.execute(*, table.lang,
            # Fetched rows are grouped by their second field; each row is
            # unpacked as an (id, lang) pair below.
            for lang_id, group in groupby(cursor.fetchall(), lambda r: r[1]):
                ids = [id_ for id_, _ in group]
                if lang_id is not None:
                    value = '%s,%s' % (cls.lang.model_name, lang_id)
                    value = None
                # Store the value as the 'lang' property of every id in the group.
                Property.set('lang', cls.__name__, ids, value)
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def _unit_compute(cls, taxes, price_unit, date):
        """Collect the processed tax values that apply on ``date``.

        Taxes are handled in the consecutive groups produced by
        ``cls._group_taxes``.  Within a group every tax sees the same
        ``price_unit``; the amounts of taxes flagged ``update_unit_price``
        are added to ``price_unit`` before the next group is processed.
        Child taxes are computed recursively with the group's price.

        NOTE(review): the reviewed source had lost the open-ended date
        defaults, the ``continue`` for out-of-range taxes and the statement
        storing ``value`` in ``res``; they are restored here as
        ``datetime.date.min`` / ``datetime.date.max``, ``continue`` and
        ``res.append(value)`` — confirm upstream.

        :param taxes: iterable of tax records.
        :param price_unit: unit price the taxes are applied to.
        :param date: date used to filter taxes by validity period.
        :returns: list of the values returned by ``tax._process_tax``.
        """
        import datetime

        res = []
        for _, group_taxes in groupby(taxes, key=cls._group_taxes):
            unit_price_variation = 0
            for tax in group_taxes:
                # A missing bound means the tax is open-ended on that side.
                start_date = tax.start_date or datetime.date.min
                end_date = tax.end_date or datetime.date.max
                if not (start_date <= date <= end_date):
                    continue
                if tax.type != 'none':
                    value = tax._process_tax(price_unit)
                    res.append(value)
                    if tax.update_unit_price:
                        unit_price_variation += value['amount']
                if len(tax.childs):
                    res.extend(cls._unit_compute(tax.childs, price_unit, date))
            # Later groups are computed on the updated price.
            price_unit += unit_price_variation
        return res
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def _reverse_unit_compute(cls, price_unit, taxes, date):
        """Return the untaxed unit price for a tax-included ``price_unit``.

        Taxes are walked in the consecutive groups given by
        ``cls._group_taxes``; ``cls._reverse_rate_amount`` supplies a
        ``(rate, amount)`` pair per group.  Once a group containing a tax
        flagged ``update_unit_price`` has been seen, every later group is
        compounded with the rate and amount accumulated from such groups.

        :returns: ``(price_unit - amount) / (1 + rate)`` using the totals
            over all groups.
        """
        total_rate = 0
        total_amount = 0
        compound_amount = 0
        compound_rate = 0
        compounding = False

        for _key, members in groupby(taxes, key=cls._group_taxes):
            members = list(members)
            grp_rate, grp_amount = cls._reverse_rate_amount(members, date)

            if compounding:
                # Both updates use the group's rate before it is adjusted.
                grp_amount += compound_amount * grp_rate
                grp_rate += compound_rate * grp_rate

            if any(t.update_unit_price for t in members):
                compounding = True
                compound_amount += grp_amount
                compound_rate += grp_rate

            total_rate += grp_rate
            total_amount += grp_amount

        return (price_unit - total_amount) / (1 + total_rate)
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def check_period_closed(cls, moves):
        # NOTE(review): excerpt is incomplete — the groupby key lambda, the
        # call that produces ``periods``, the company value, the right-hand
        # side of the date comparison and the closing brackets of the error
        # call are missing, so this text does not parse as-is.
        Period = Pool().get('stock.period')
        # Moves are processed per company; the loop rebinds ``moves`` to the
        # current company's group.
        for company, moves in groupby(moves, lambda m:
            # Query filters: closed periods of the company, newest date
            # first, at most one result.
            periods =[
                    ('state', '=', 'closed'),
                    ('company', '=',,
                    ], order=[('date', 'DESC')], limit=1)
            if periods:
                period, = periods
                for move in moves:
                    # Prefer the effective date, fall back to the planned date.
                    date = (move.effective_date if move.effective_date
                        else move.planned_date)
                    if date and date <
                        cls.raise_user_error('period_closed', {
                                'move': move.rec_name,
                                'period': period.rec_name,
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def get_sessions(users, name):
        # NOTE(review): excerpt is incomplete — the value assigned to ``now``,
        # the key expression of the initial ``result`` dict, the call that
        # produces ``sessions`` and the groupby key are missing, so this
        # text does not parse as-is.
        Session = Pool().get('ir.session')
        now =
        # Session timeout, read from the 'session' section of the config.
        timeout = datetime.timedelta(
            seconds=config.getint('session', 'timeout'))
        # Every user starts with a count of 0.
        result = dict((, 0) for u in users)
        with Transaction().set_user(0):
            for sub_ids in grouped_slice(users):
                sessions =[
                        ('create_uid', 'in', sub_ids),
                        ], order=[('create_uid', 'ASC')])

                def filter_(session):
                    # Keep sessions whose last write (or creation) is less
                    # than ``timeout`` away from ``now``.
                    timestamp = session.write_date or session.create_date
                    return abs(timestamp - now) < timeout
                # Count the remaining sessions per group key.
                result.update(dict((i, len(list(g)))
                        for i, g in groupby(ifilter(filter_, sessions),
        return result
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def get_action(cls, menus, name):
        # NOTE(review): excerpt is incomplete — several dictionary keys and
        # subscripts, the body of the ``action2keyword`` mapping and the call
        # that produces ``factions`` are missing, so this text does not
        # parse as-is.
        pool = Pool()
        # Every menu starts with no action.
        actions = dict((, None) for m in menus)
        with Transaction().set_context(active_test=False):
            menus = cls.browse(menus)
        # Flatten the action keywords of all menus into one list.
        action_keywords = sum((list(m.action_keywords) for m in menus), [])
        key = lambda k: k.action.type
        # groupby only merges adjacent items; the keywords are not sorted by
        # type here, so one type may occur in more than one group.
        for type, action_keywords in groupby(action_keywords, key=key):
            action_keywords = list(action_keywords)
            for action_keyword in action_keywords:
                model = action_keyword.model
                actions[] = '%s,-1' % type

            Action = pool.get(type)
            action2keyword = { k for k in action_keywords}
            with Transaction().set_context(active_test=False):
                factions =[
                        ('action', 'in', action2keyword.keys()),
            for action in factions:
                model = action2keyword[].model
                actions[] = str(action)
        return actions
项目:colorguard    作者:mechaphish    | 项目源码 | 文件源码
def attempt_naive_pov(self):
        """Collect byte indices leaked in two runs and group consecutive ones.

        ``self._find_naive_leaks()`` is called twice; an index counts as
        leaked only if it is reported under the same key by both calls.

        NOTE(review): the reviewed excerpt ends right after
        ``consecutive_groups`` is built; whatever the function does with the
        groups afterwards is not visible and is therefore not reproduced.
        """
        p1 = self._find_naive_leaks()
        p2 = self._find_naive_leaks()

        leaked = dict()
        for si in p1:
            if si in p2:
                # Keep only the values reported by both runs for this key.
                for lb in set(p2[si]).intersection(set(p1[si])):
                    leaked[lb] = si

        # find four contiguous
        consecutive_groups = [ ]
        # Within a run of consecutive integers, position minus value is
        # constant, so it serves as the grouping key.  A one-argument lambda
        # is used because tuple parameters were removed in Python 3.
        for _, g in groupby(enumerate(sorted(leaked)), lambda ix: ix[0] - ix[1]):
            # Materialise now: a groupby group is invalid once the outer
            # iterator advances, and map() is lazy on Python 3.
            consecutive_groups.append(list(map(itemgetter(1), g)))
项目:colorguard    作者:mechaphish    | 项目源码 | 文件源码
def get_largest_consecutive(self):
        # NOTE(review): excerpt is incomplete — the right-hand side of the
        # constraint, the body of ``if self._confident_byte(...)`` and
        # everything after the last loop are missing. The tuple-parameter
        # lambda below is Python 2 only syntax.
        # extra work here because we need to be confident about the bytes

        ss = self.state.copy()
        ss.add_constraints(self.minimized_ast ==, cast_to=str)))

        leaked_bytes = [ ]
        for byte in self.possibly_leaked_bytes:
            if self._confident_byte(ss, byte):

        # De-duplicate and order the byte indices.
        leaked_bytes = sorted(set(leaked_bytes))

        consec_bytes = [ ]
        # find consecutive leaked bytes
        # (position minus value is constant within a run of consecutive ints)
        for _, g in groupby(enumerate(leaked_bytes), lambda (i, x): i-x):
            consec_bytes.append(map(itemgetter(1), g))
项目:mugen    作者:scherroman    | 项目源码 | 文件源码
def group_by_type(self, select_types: List[str] = None) -> 'EventGroupList':
        """
        Groups events by type

        Consecutive events of the same class form one group.

        Parameters
        ----------
        select_types
            A list of types for which to select groups in the resulting EventGroupList.
            If no types are specified, all resulting groups will be selected.

        Returns
        -------
        An EventGroupList partitioned by type
        """
        if select_types is None:
            select_types = []

        groups = [EventList(list(group), end=self.end) for index, group in groupby(self, key=attrgetter('__class__'))]
        if not select_types:
            selected_groups = groups
        else:
            selected_groups = [group for group in groups if group.type in select_types]

        return EventGroupList(groups, selected=selected_groups)
项目:planet-b-saleor    作者:planet-b    | 项目源码 | 文件源码
def unique_for_country_code(self, country_code):
        """Return one row per shipping method for ``country_code``.

        Rows for the given country and rows for ``ANY_COUNTRY`` are both
        candidates.  When a method has two candidate rows, the one that is
        not the any-country row is kept.

        NOTE(review): the reviewed source had lost the second ``Q(...)``
        operand, the ``else:`` branch marker and the statement that records
        the chosen id; they are restored here from the surrounding code and
        comment — confirm upstream.
        """
        shipping = self.filter(
            Q(country_code=country_code) |
            Q(country_code=ANY_COUNTRY))
        # Ordering by method id makes rows of one method adjacent for groupby.
        shipping = shipping.order_by('shipping_method_id')
        shipping = shipping.values_list('shipping_method_id', 'id', 'country_code')
        grouped_shipping = groupby(shipping, itemgetter(0))
        any_country = ANY_COUNTRY

        ids = []

        for shipping_method_id, method_values in grouped_shipping:
            method_values = list(method_values)
            # if there is any country choice and specific one remove any country choice
            if len(method_values) == 2:
                method = [val for val in method_values if val[2] != any_country][0]
            else:
                method = method_values[0]
            # Index 1 of a row is its 'id'.
            ids.append(method[1])
        return self.filter(id__in=ids)
项目:planet-b-saleor    作者:planet-b    | 项目源码 | 文件源码
def unique_for_country_code(self, country_code):
        """Return one row per shipping method for ``country_code``.

        Rows for the given country and rows for ``ANY_COUNTRY`` are both
        candidates.  When a method has two candidate rows, the one that is
        not the any-country row is kept.

        NOTE(review): the reviewed source had lost the second ``Q(...)``
        operand, the ``else:`` branch marker and the statement that records
        the chosen id; they are restored here from the surrounding code and
        comment — confirm upstream.
        """
        shipping = self.filter(
            Q(country_code=country_code) |
            Q(country_code=ANY_COUNTRY))
        # Ordering by method id makes rows of one method adjacent for groupby.
        shipping = shipping.order_by('shipping_method_id')
        shipping = shipping.values_list(
            'shipping_method_id', 'id', 'country_code')
        grouped_shipping = groupby(shipping, itemgetter(0))
        any_country = ANY_COUNTRY

        ids = []

        for shipping_method_id, method_values in grouped_shipping:
            method_values = list(method_values)
            # if there is any country choice and specific one remove any
            # country choice
            if len(method_values) == 2:
                method = [val for val in method_values
                          if val[2] != any_country][0]
            else:
                method = method_values[0]
            # Index 1 of a row is its 'id'.
            ids.append(method[1])
        return self.filter(id__in=ids)
项目:chip_seq_pipeline    作者:biocore-ntnu    | 项目源码 | 文件源码
def create_intervaltrees(genes):
    """Build one ``IntervalTree`` per chromosome from a whitespace-separated file.

    The first line of the file is skipped as a header.  On every other line
    the first field is the chromosome and fields 2-6 are read as start, end,
    region type, an ignored field and name.

    Lines of one chromosome must be contiguous: ``groupby`` starts a new
    group whenever the chromosome changes, and a chromosome seen again later
    replaces its earlier tree.

    :param genes: path of the file to read.
    :returns: dict mapping chromosome to an ``IntervalTree`` whose intervals
        ``[start:end]`` carry ``(start, name, region_type)``.
    """
    genome = dict()

    # The context manager closes the file even if a line fails to parse.
    with open(genes) as file_handle:
        next(file_handle, None)  # skip header; an empty file yields no trees

        for chromosome, lines in groupby(file_handle, lambda l: l.split()[0]):
            chromosome_intervaltree = IntervalTree()
            for line in lines:
                start, end, region_type, _, name = line.split()[1:6]
                start, end = int(start), int(end)
                chromosome_intervaltree[start:end] = (start, name, region_type)

            genome[chromosome] = chromosome_intervaltree

    return genome
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def getAttributesDeclarationXML(self) :
        """ generate attributes declaration XML """
        # NOTE(review): this excerpt parses but is incomplete —
        # ``allAttributesXML`` and ``atts_sorted_by_mode`` are used without
        # any visible assignment, and nothing visible collects the built
        # ``attributesXML`` elements.
        # return lxml etree element
        if len(self)>0 :
            # iter on node and then edge atts
            # (``iteritems`` exists on Python 2 dicts only)
            for attClass,atts in self.iteritems() :
                # group by mode
                key_mode=lambda att : att["mode"]
                # The loop rebinds ``atts`` to the attributes of one mode.
                for mode,atts in itertools.groupby(atts_sorted_by_mode,key_mode)  :
                    # generate on attributes by mode
                    attributesXML = etree.Element("attributes")
                    # generate attribute by id order
                    for att in sorted(atts,key=lambda att: att["id"]) :
                        attributeXML=etree.SubElement(attributesXML, "attribute")
                        # A <default> child is only emitted for a truthy default value.
                        if att["defaultValue"] :
                            etree.SubElement(attributeXML, "default").text=att["defaultValue"]
        return allAttributesXML
项目:pybibtex    作者:rasbt    | 项目源码 | 文件源码
def ids_to_string(ids_list):
    """Converts lists of integer IDs to text.

    Runs of consecutive IDs are collapsed into ``first-last``; an ID without
    a consecutive neighbour is written on its own.  Example:
    ``[3, 1, 2, 5]`` becomes ``'[1-3,5]'``.

    :param ids_list: iterable of integers, in any order.
    :returns: the bracketed, comma-separated text form.
    """
    parsed = []
    # Within a run of consecutive integers, position minus value is constant,
    # so it serves as the grouping key.
    for _, run in groupby(enumerate(sorted(ids_list)), lambda x: x[0] - x[1]):
        run = [value for _, value in run]
        if len(run) > 1:
            parsed.append('%d-%d' % (run[0], run[-1]))
        else:
            # Isolated IDs must not be dropped from the output.
            parsed.append('%d' % run[0])

    return '[%s]' % ','.join(parsed)
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def update_request_states_from_pond_blocks(pond_blocks):
    '''Update the states of requests and user_requests given a set of recently changed pond blocks.'''
    # NOTE(review): excerpt is incomplete — the value assigned to ``now`` and
    # the request key used for the ``blocks_by_request_num`` lookups are
    # missing, so this text does not parse as-is.
    # Only blocks whose first molecule has a truthy tracking number are used.
    blocks_with_tracking_nums = [pb for pb in pond_blocks if pb['molecules'][0]['tracking_num']]
    # Sort by the same key that groupby uses so equal tracking numbers are adjacent.
    sorted_blocks_with_tracking_nums = sorted(blocks_with_tracking_nums, key=lambda x: x['molecules'][0]['tracking_num'])
    blocks_by_tracking_num = itertools.groupby(sorted_blocks_with_tracking_nums, lambda x: x['molecules'][0]['tracking_num'])
    now =
    states_changed = False

    for tracking_num, blocks in blocks_by_tracking_num:
        # Within one tracking number, index the blocks by integer request number.
        sorted_blocks_by_request = sorted(blocks, key=lambda x: x['molecules'][0]['request_num'])
        blocks_by_request_num = {int(k): list(v) for k, v in itertools.groupby(sorted_blocks_by_request, key=lambda x: x['molecules'][0]['request_num'])}
        user_request = UserRequest.objects.prefetch_related('requests').get(pk=tracking_num)
        ur_expired = user_request.max_window_time < now
        requests = user_request.requests.all()
        for request in requests:
            if in blocks_by_request_num:
                states_changed |= update_request_state(request, blocks_by_request_num[], ur_expired)
        states_changed |= update_user_request_state(user_request)

    # True if any of the update helpers returned a truthy value.
    return states_changed
项目:VCFped    作者:magnusdv    | 项目源码 | 文件源码
def bestPairs(pairdata, reportall):
    # NOTE(review): excerpt is incomplete — the body of ``if reportall:`` and
    # every statement that adds to ``best`` are missing, so this text does
    # not parse as-is.
    best = []
    # groupby only merges adjacent rows; rows of one pair are presumably
    # already contiguous in ``pairdata`` — verify against the caller.
    for k,v in itertools.groupby(pairdata, key=lambda x: x['pair']):
        allcalls = list(v)
        # Rows whose verdict is 'na' are excluded.
        nonNA = [r for r in allcalls if r['verdict'] != 'na']
        if not nonNA:
            if reportall: 
        # The verdict of the last non-'na' row selects the sort order.
        verdict = nonNA[-1]['verdict'] 
        if verdict == 'MZ twins':
            # Highest rounded MZp first, ties broken by ascending percentile.
            nonNA.sort(key=lambda x: (-round(x['MZp'], 1), x['percentile']))
        elif verdict == 'Parent-child' or reportall:
            # Highest rounded POp first, ties broken by ascending percentile.
            nonNA.sort(key=lambda x: (-round(x['POp'], 1), x['percentile']))

    return pairwise_table(best)
项目:VCFped    作者:magnusdv    | 项目源码 | 文件源码
def bestGenders(genderdata):
    # NOTE(review): excerpt is incomplete — the body of ``if not noNA:``, the
    # statements following ``noQ``, the ``else:`` before the second ``verd``
    # assignment and every statement that adds to ``best`` are missing, so
    # this text does not parse as-is.
    best = []
    # Sort key: rounded Xhetp, then percentile.
    sortfun = lambda x: (round(x['Xhetp'], 1), x['percentile'])
    for k,v in itertools.groupby(genderdata, key=lambda x: x['sample']):
        allcalls = list(v)
        # Rows whose gender is 'na' are excluded.
        noNA = [r for r in allcalls if r['gender'] != 'na']
        if not noNA: 
        verdicts = {r['gender'] for r in noNA}
        if 'Male' in verdicts and 'Female' in verdicts:
            # if both genders are called, choose the last one
            noQ = [r for r in noNA if r['gender'] != '?']
        if verdicts == {'?'}:
            verd = '?'
            verd = list(verdicts.difference({'?'}))[0]
        # Rows matching the chosen verdict, ordered by ``sortfun``.
        use = sorted([r for r in noNA if r['gender'] == verd], key=sortfun)

    return gender_table(best)
项目:globibot    作者:best-coloc-ever    | 项目源码 | 文件源码
def convert(self, message, unit_values):
        # NOTE(review): excerpt is incomplete — the call receiving the
        # '{} total' line and the arguments/closing bracket of
        # ``send_message`` are missing. ``await`` is used, so the original
        # definition was presumably ``async def`` — confirm upstream.
        # Pair every value with its converted counterpart.
        converted = [(uv, system_convert(uv)) for uv in unit_values]
        output = ['{} = {}'.format(uv, conv) for uv, conv in converted]

        # Adjacent values whose unit has the same type are grouped together.
        for t, uvs in groupby(converted, key=lambda uvs: type(uvs[0].unit)):
            values = list(map(lambda x: x[0], uvs))

            # A total is only computed for groups of two or more values.
            if len(values) >= 2:
                summed = sum_units(*values)
                converted_summed = system_convert(summed)
                    '{} total: {} = {}'
                        .format(t.__name__.lower(), summed, converted_summed)

        await self.send_message(
            'Converted units\n{}'
            delete_after = 60
项目:globibot    作者:best-coloc-ever    | 项目源码 | 文件源码
def edited_messages(self, message, user_id, count=10):
        # NOTE(review): excerpt is incomplete — several closing brackets and
        # the first argument of ``send_message`` are missing. ``await`` is
        # used, so the original definition was presumably ``async def`` —
        # confirm upstream.
        with self.transaction() as trans:
            trans.execute(q.last_edited_logs, dict(
                author_id = user_id,
                limit     = count
            results = trans.fetchall()
            # Adjacent rows sharing the same first column form one group.
            grouped = groupby(results, key=lambda row: row[0])

            # One output line per group: the group's rows in reverse order,
            # each formatted from columns 1 and 2.
            messages = [
                ' ? '.join([
                    '{}{}'.format(c[1], ' '.join(c[2]))
                    for c in reversed(list(contents))
                for _, contents in grouped

            await self.send_message(
                'last **{}** edited messages from <@{}>:\n{}'
                    .format(len(messages), user_id, '\n'.join(messages)),
                delete_after = 30
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def getAttributesDeclarationXML(self) :
        """ generate attributes declaration XML """
        # NOTE(review): this excerpt parses but is incomplete —
        # ``allAttributesXML`` and ``atts_sorted_by_mode`` are used without
        # any visible assignment, and nothing visible collects the built
        # ``attributesXML`` elements.
        # return lxml etree element
        if len(self)>0 :
            # iter on node and then edge atts
            # (``iteritems`` exists on Python 2 dicts only)
            for attClass,atts in self.iteritems() :
                # group by mode
                key_mode=lambda att : att["mode"]
                # The loop rebinds ``atts`` to the attributes of one mode.
                for mode,atts in itertools.groupby(atts_sorted_by_mode,key_mode)  :
                    # generate on attributes by mode
                    attributesXML = etree.Element("attributes")
                    # generate attribute by id order
                    for att in sorted(atts,key=lambda att: att["id"]) :
                        attributeXML=etree.SubElement(attributesXML, "attribute")
                        # A <default> child is only emitted for a truthy default value.
                        if att["defaultValue"] :
                            etree.SubElement(attributeXML, "default").text=att["defaultValue"]
        return allAttributesXML
项目:ivona-speak    作者:Pythonity    | 项目源码 | 文件源码
def list_voices(access_key, secret_key, voice_language, voice_gender):
    """List available Ivona voices"""
    # NOTE(review): excerpt is incomplete — the ``try:`` that pairs with the
    # ``except`` below and the arguments of ``get_available_voices(`` are
    # missing, so this text does not parse as-is.
        ivona_api = IvonaAPI(access_key, secret_key)
    except (ValueError, IvonaAPIException) as e:
        # Re-raised as a ClickException carrying the original error's repr.
        raise click.ClickException("Something went wrong: {}".format(repr(e)))

    click.echo("Listing available voices...")

    voices_list = ivona_api.get_available_voices(

    # Group voices by language
    voices_dict = dict()
    # Sorted by the same key that groupby uses, so each language forms one group.
    data = sorted(voices_list, key=lambda x: x['Language'])
    for k, g in groupby(data, key=lambda x: x['Language']):
        voices_dict[k] = list(g)

    # Print one "language: name, name, ..." line per language.
    for ln, voices in voices_dict.items():
        voice_names = [v['Name'] for v in voices]
        click.echo("{}: {}".format(ln, ', '.join(voice_names)))

    click.secho("All done", fg='green')
项目:pyt    作者:python-security    | 项目源码 | 文件源码
def get_forums(query_result, user):
    """Returns a tuple which contains the category and the forums as list.
    This is the counterpart for get_categories_and_forums and especially
    useful when you just need the forums for one category.

    For example::
        (<Category 2>,
          [(<Forum 3>, None),
          (<Forum 4>, None)])

    Rows are grouped by their first element; if ``query_result`` contains
    more than one group, only the last group is returned.

    :param query_result: A tuple (KeyedTuple) with all categories and forums

    :param user: The user object is needed because a signed out user does not
                 have the ForumsRead relation joined.

    :raises UnboundLocalError: if ``query_result`` is empty.
    """
    it = itertools.groupby(query_result, operator.itemgetter(0))

    if user.is_authenticated:
        # Signed-in users: pair each forum with the third row element.
        for key, value in it:
            forums = key, [(item[1], item[2]) for item in value]
    else:
        # Signed-out users: there is no third element to pair with.
        for key, value in it:
            forums = key, [(item[1], None) for item in value]

    return forums
项目:SWProxy-plugins    作者:lstern    | 项目源码 | 文件源码
def group_battles(self, cache):
        """Group the cached entries into one battle dict per ``match_id``.

        Each battle carries ``guild`` (the entry's ``op_guild``), ``type``
        and ``match_id`` taken from the first entry of its group, plus
        ``matches``.

        NOTE(review): in the reviewed source nothing was ever added to
        ``matches`` or ``groups``, so the function always returned ``[]``.
        Storing every entry of the group in ``matches`` is assumed — confirm
        upstream.

        :param cache: mapping whose values are indexable by ``'op_guild'``,
            ``'type'`` and ``'match_id'``.
        :returns: list of battle dicts in ``get_match_id`` sort order.
        """
        # Sort first so groupby sees the entries of one match next to each other.
        ordered = sorted(cache.values(), key=get_match_id)
        groups = []
        for _, group in groupby(ordered, lambda x: x['match_id']):
            matches = list(group)
            first = matches[0]  # groupby never yields an empty group
            groups.append({
                'guild': first['op_guild'],
                'type': first['type'],
                'match_id': first['match_id'],
                'matches': matches,
            })
        return groups
项目:guides-cms    作者:pluralsight    | 项目源码 | 文件源码
def group_articles_by_status(articles):
    """
    Group articles by publish status

    Articles are ordered PUBLISHED, IN_REVIEW, DRAFT, then everything else,
    and grouped by their ``publish_status``.

    :param articles: Iterable of Article objects
    :returns: ``itertools.groupby`` iterator yielding the publish_status and
              an iterator over the articles for that status
    """

    def status_key(a):
        # Rank used only for ordering; any other status sorts last.
        if a.publish_status == PUBLISHED:
            cnt = 1
        elif a.publish_status == IN_REVIEW:
            cnt = 2
        elif a.publish_status == DRAFT:
            cnt = 3
        else:
            cnt = 4

        return cnt

    sorted_by_status = sorted(articles, key=status_key)

    return itertools.groupby(sorted_by_status, key=lambda a: a.publish_status)
项目:audio-feeder    作者:pganssle    | 项目源码 | 文件源码
def get_entry_objects(entry_list):
    """
    Yield an ``(entry, data_obj, author_objs)`` triple for every entry.

    :param entry_list: iterable of entries exposing ``table`` and ``data_id``.
    :returns: generator of triples, where ``data_obj`` is the row of the
        entry's table at ``data_id`` and ``author_objs`` is the list of
        author rows for ``data_obj.author_ids``.
    """
    # Grouping these together like this just to minimize the number of calls
    # to get_database_table.
    author_table = dh.get_database_table('authors')

    for table_name, group in it.groupby(entry_list, key=lambda x: x.table):
        table = dh.get_database_table(table_name)

        for entry_obj in group:
            data_obj = table[entry_obj.data_id]

            # Retrieve the author objects as well
            author_objs = [author_table[author_id] for author_id in data_obj.author_ids]

            yield (entry_obj, data_obj, author_objs)
项目:audio-feeder    作者:pganssle    | 项目源码 | 文件源码
def natural_sort_key(cls, value):
        """
        This is a sort key to do a "natural" lexicographic sort, the string is
        broken up into segments of strings and numbers, so that, e.g. `'Str 2'`
        will be sorted before `'Str 15'`.

        :param value:
            The book name as it will be sorted.

        :return:
            Returns a book name tokenized such that it can be sorted: a tuple
            alternating between ``str`` and ``int`` segments.
        """
        # ``isdecimal`` rather than ``isdigit``: characters such as '²' count
        # as digits but cannot be converted by int().
        o = itertools.groupby(value, key=str.isdecimal)
        o = ((k, ''.join(g)) for k, g in o)
        o = ((int(v) if k else v) for k, v in o)

        return tuple(o)
项目:guernsey    作者:ingnil    | 项目源码 | 文件源码
def listenSsl(self, site, ports, ignore=()):
        """Start an SSL listener for ``site`` on every distinct port.

        The private key and certificate are read from the paths in
        ``self.options.sslPrivateKey`` and ``self.options.sslCertificate``
        and concatenated into one PEM string.

        NOTE(review): the reviewed source had lost the right-hand sides of
        the ``privateKey`` and ``certificate`` assignments; reading the whole
        file is assumed — confirm upstream.

        :param site: passed to ``reactor.listenSSL`` for each port.
        :param ports: iterable of port numbers; duplicates are ignored.
        :param ignore: ports that must not be opened.
        """
        import twisted.internet.ssl as ssl

        # Context managers close both files as soon as they have been read.
        with open(self.options.sslPrivateKey, "r") as privateKeyFile:
            privateKey = privateKeyFile.read()
        with open(self.options.sslCertificate) as certificateFile:
            certificate = certificateFile.read()

        cert = ssl.PrivateCertificate.loadPEM(privateKey + certificate)
        contextFactory = cert.options()

        # Distinct ports in ascending order.
        for port in sorted(set(ports)):
            if port not in ignore:
                reactor.listenSSL(port, site, contextFactory)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def _generate_endpoints(self, targets):
        """Build an endpoints dict with one subset per port group.

        NOTE(review): the reviewed source had lost the bracket-only lines of
        the returned literal; the nesting below is restored from the
        remaining keys and comprehension clauses — confirm upstream.

        :param targets: mapping of IP address to a
            ``(listen_port, target_port)`` pair.
        :returns: ``{'subsets': [...]}`` where every subset lists the
            addresses of one group and that group's single port entry.
        """
        def _target_to_port(item):
            _, (listen_port, target_port) = item
            return {'port': target_port, 'name': str(listen_port)}

        # Items are sorted by IP, so only IP-adjacent targets that share a
        # port end up in the same group.
        port_with_addrs = [
            (p, [e[0] for e in grp])
            for p, grp in itertools.groupby(
                sorted(targets.items()), _target_to_port)]
        return {
            'subsets': [
                {
                    'addresses': [
                        {
                            'ip': ip,
                            'targetRef': {
                                'kind': k_const.K8S_OBJ_POD,
                                'name': ip,
                                'namespace': 'default'
                            }
                        }
                        for ip in addrs
                    ],
                    'ports': [port]
                }
                for port, addrs in port_with_addrs
            ]
        }
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def __str__(self):
        """Return the bodies of all errors, one per line.

        ``self.errors`` is sorted in place by each error's ``order`` and
        walked class by class.  An empty error list gives an empty string;
        returning ``None`` from ``__str__`` would make ``str()`` raise
        ``TypeError``.
        """
        lines = []
        self.errors.sort(key=lambda e: e.order)
        # Adjacent errors of the same class are handled as one group.
        for _cls, errors_of_cls in groupby(self.errors, lambda e: e.__class__):
            lines.extend(e.body() for e in errors_of_cls)
        return '\n'.join(lines)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def validate_msdos(module, partitions):
    """Validate limitations of MSDOS partition table.

    Every violation is reported through ``module.fail_json``:

    * more than 4 primary partitions;
    * more than one extended partition;
    * with an extended partition present, any order other than all primary,
      then the extended one, then all logical;
    * logical partitions without an extended partition.

    :param module: object exposing ``fail_json(message)``.
    :param partitions: sequence of mappings with a ``'type'`` key.
    """
    p_types = [p['type'] for p in partitions]
    # NOTE(pas-ha) no more than 4 primary
    if p_types.count('primary') > 4:
        module.fail_json("Can not create more than 4 primary partitions "
                         "on a MSDOS partition table.")
    if 'extended' in p_types:
        # NOTE(pas-ha) only single extended
        if p_types.count('extended') > 1:
            module.fail_json("Can not create more than single extended "
                             "partition on a MSDOS partition table.")
        allowed = ['primary', 'extended']
        if 'logical' in p_types:
            # Logical partitions, when present, must come last.
            allowed.append('logical')

        # NOTE(pas-ha) this produces list with subsequent duplicates
        # removed
        if [k for k, g in itertools.groupby(p_types)] != allowed:
            module.fail_json("Incorrect partitions order: for MSDOS, "
                             "all primary, single extended, all logical")
    elif 'logical' in p_types:
        # NOTE(pas-ha) logical has sense only with extended
        module.fail_json("Logical partition w/o extended one on MSDOS "
                         "partition table")

# TODO(pas-ha) add more validation, e.g.
# - add idempotency: first check the already existing partitions
#   and do not run anything unless really needed, and only what's needed
#   - if only change tags - use specific command
#   - allow fuzziness in partition sizes when alignment is 'optimal'
# - estimate and validate available space
# - support more units
# - support negative units?
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def __str__(self):
    """Render the body of every collected error, one per line.

    ``self.errors`` is sorted in place by each error's ``order`` attribute,
    then walked in runs of consecutive errors that share a class, and the
    result of ``body()`` is collected for every error.

    Returns:
        The collected bodies joined by newlines, or an empty string when
        there are no errors.
    """
    self.errors.sort(key=lambda e: e.order)
    lines = []
    # NOTE(review): the group key (the error class) is not used below;
    # confirm whether a per-class heading was meant to be emitted here.
    for _cls, errors_of_cls in groupby(self.errors, lambda e: e.__class__):
        lines.extend(e.body() for e in errors_of_cls)
    # __str__ must always return a str: falling off the end would return
    # None and make str(obj) raise TypeError when nothing was collected.
    return '\n'.join(lines)
项目:lydoc    作者:Cecca    | 项目源码 | 文件源码
def render_template(docs, template):
    """Group *docs* by their ``'file'`` entry and render them with a template.

    Each group is sorted by its ``'name'`` entry, and the resulting mapping
    is passed to the template as ``documentation``. The template is looked
    up by name through ``JINJA_ENV``.
    """
    # NOTE(review): groupby only merges *adjacent* items. If docs is not
    # already ordered by 'file', a later run for the same file replaces the
    # earlier one in this dict - confirm that callers pre-sort docs.
    grouped_docs = {f: list(sorted(list(dl), key=lambda d: d['name']))
                    for f, dl in groupby(docs, lambda d: d['file'])}
    # The parameter is rebound from the template name to the template object.
    template = JINJA_ENV.get_template(template)
    # NOTE(review): the render() call below is cut off in this excerpt; its
    # remaining arguments and closing parenthesis are missing.
    rendered = template.render(documentation=grouped_docs,
    return rendered
项目:8-Queens    作者:miguelarauj1o    | 项目源码 | 文件源码
def isPermutation(self, values):
    """Return True when no value occurs more than once in *values*.

    The previous implementation counted run lengths with ``groupby`` on the
    unsorted input, so only *adjacent* duplicates were detected and e.g.
    ``[1, 2, 1]`` was accepted. Every occurrence is now taken into account.

    Args:
        values: iterable of values to check.

    Returns:
        bool: True if all values are distinct (an empty input counts as
        distinct), False otherwise.
    """
    items = list(values)
    try:
        return len(set(items)) == len(items)
    except TypeError:
        # Unhashable elements cannot go into a set; compare pairwise.
        return not any(
            left == right
            for pos, left in enumerate(items)
            for right in items[pos + 1:]
        )
项目:mysql-er    作者:StefanLim0    | 项目源码 | 文件源码
def create_structure_ers_from_relations(relations):
    """Build a structured entity-relationship mapping from field relations.

    Every relation is treated as bidirectional: the reversed pair is added
    and duplicates are dropped before grouping. The input list itself is
    left untouched (it used to be extended in place).

    Args:
        relations (list): List of (:class:`FieldPath`, :class:`FieldPath`)
            pairs. Each path is read through its ``db``, ``tb`` and ``fd``
            attributes and must be hashable.

    Returns:
        dict: Structured ER dict of the form
        ``{'database_name': {'table_name': {'field_name': [foreign, ...]}}}``
        where each foreign entry is ``str()`` of the related path and each
        list is sorted.

    Example:
        Relating ``db.ac.id`` to ``db.bc.id`` yields an entry for table
        ``ac`` pointing at ``bc`` and an entry for table ``bc`` pointing at
        ``ac``, both under the ``'db'`` key.
    """
    def db_of(pair):
        return pair[0].db

    def tb_of(pair):
        return pair[0].tb

    def fd_of(pair):
        return pair[0].fd

    # Work on a private, de-duplicated copy and add each reversed pair.
    pairs = {tuple(rel) for rel in relations}
    pairs |= {pair[::-1] for pair in pairs}

    dbs = {}
    for db_key, tb_grp in groupby(sorted(pairs, key=db_of), key=db_of):
        if db_key == '':
            # NOTE(review): the body of this guard was missing in the
            # source; skipping paths without a database name is assumed -
            # confirm.
            continue
        tbs = {}
        for tb_key, fd_grp in groupby(sorted(tb_grp, key=tb_of), key=tb_of):
            fds = {}
            for fd_key, foreign_grp in groupby(sorted(fd_grp, key=fd_of), key=fd_of):
                fds[fd_key] = sorted(str(pair[1]) for pair in foreign_grp)
            tbs[tb_key] = fds
        dbs[db_key] = tbs
    return dbs
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def compute_largest_specs(history_specs):
    """
    Maps a Frequency to the largest HistorySpec at that frequency from an
    iterable of HistorySpecs.

    "Largest" means the spec with the greatest ``bar_count`` among those
    sharing the same ``frequency``.

    NOTE(review): the specs are sorted with ``freq_str_and_bar_count``
    before grouping; that key is presumably ordered by frequency first so
    that equal frequencies end up adjacent for ``groupby`` - confirm
    against its definition.
    """
    ordered_specs = sorted(history_specs, key=freq_str_and_bar_count)
    return {
        frequency: max(same_freq, key=lambda spec: spec.bar_count)
        for frequency, same_freq in groupby(
            ordered_specs, key=lambda spec: spec.frequency)
    }

# tuples to store a change to the shape of a HistoryContainer
项目:table-compositor    作者:InvestmentSystems    | 项目源码 | 文件源码
def _to_html(row_col_dict, **kwargs):
        # NOTE(review): the next line reads like docstring text whose quotes
        # were lost in this excerpt; as written it is not valid Python.
        row_col_dict: dict with (0, 0, 0, 0) : (Value, Style)
        # Helper: build one <tr> element from the cells found at `offsets`.
        def wrap_tr(offsets):
            s = []
            # Nesting level of the first cell, compared against each cell below.
            nesting_level = row_col_dict[offsets[0]].nesting_level
            for offset in offsets:
                # NOTE(review): row_span is computed but not used in the
                # visible code (only colspan is put into td_attr).
                row_span = offset.end_row - offset.start_row + 1
                col_span = offset.end_col - offset.start_col + 1
                value = row_col_dict[offset].value
                style = row_col_dict[offset].style_wrapper.user_style
                style = HTMLWriter.style_to_str(style)

                td_attr = dict(
                    colspan=col_span, style=style)
                if nesting_level > row_col_dict[offset].nesting_level:
                    # we have encountered a nested table
                    inner_html = HTMLWriter._to_html(value)
                    # NOTE(review): an `else:` seems to be missing before the
                    # next line; as written the rendered nested table is
                    # immediately overwritten by the raw value.
                    inner_html = value
                # NOTE(review): td is never added to `s` in the visible code,
                # so the joined row content below would be empty.
                td = HTMLWriter._wrap_table_element('td', td_attr, inner_html)
            tr = HTMLWriter._wrap_table_element('tr', {}, ''.join(s))
            return tr

        trs = []
        # Keys are sorted and grouped by their first component.
        # NOTE(review): the body of this loop is missing in this excerpt.
        for _, offsets in groupby(sorted(row_col_dict), key=lambda x: (x[0])):

        # Any keyword arguments become the attributes of the outer element.
        table_attrs = kwargs or dict()
        # NOTE(review): the call below is cut off in this excerpt.
        return HTMLWriter._wrap_table_element(
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def get_unannotated_intervals(self):
        """ Return a list of Annotation objects corresponding to unannotated regions on the contig """

        unannotated_intervals = []
        # One entry per position of self.sequence; 0 until marked below.
        annotation_indicator = np.zeros(len(self.sequence))

        # Mark the half-open range [contig_match_start, contig_match_end)
        # of every annotation with 1.
        for annotation in self.annotations:
            annotation_indicator[annotation.contig_match_start:annotation.contig_match_end] = 1

        # Start position of the run currently being visited.
        interval_start = 0

        # Walk the indicator as consecutive runs of marked / unmarked positions.
        for annotated, region_iter in itertools.groupby(annotation_indicator, lambda x: x == 1):
            region = list(region_iter)

            if not annotated:
                # NOTE(review): this call is cut off in this excerpt (further
                # arguments and the closing parenthesis are missing), and
                # nothing is appended to unannotated_intervals in the
                # visible code.
                feature = vdj_reference.create_dummy_feature(display_name='UNANNOTATED',
                                                        contig_match_end=interval_start + len(region),

            # Advance to the first position of the next run.
            interval_start += len(region)

        return unannotated_intervals
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def load_cell_contigs_from_json(json_file, reference_path, group_key, require_high_conf=True):
    """Returns a list of CellContig objects based on annotations in a json.

    The json is assumed to contain a list of AnnotatedContigs (in dict form).
    The contigs are sorted and grouped by group_key and each such group is put
    into a CellContig object.

    group_key must be 'barcode' or 'clonotype'

    Contigs whose ``is_cell`` is False are dropped, and so are contigs that
    are not ``high_confidence`` unless ``require_high_conf`` is False. Groups
    left without any contig are skipped.
    """
    assert group_key in set(['barcode', 'clonotype'])

    def key_func(contig):
        return getattr(contig, group_key)

    # Close the file deterministically; sorting inside the ``with`` block
    # makes sure the annotations are fully read while it is still open.
    with open(json_file) as handle:
        annotations = sorted(
            load_contig_list_from_json(handle, reference_path), key=key_func)

    cell_contigs = []
    for clonotype_name, contig_annotations in itertools.groupby(annotations, key=key_func):
        # Note, for consensus contigs is_cell=None
        # NOTE(review): the body of the filtering ``if`` was missing in the
        # source; collecting the matching contigs is assumed - confirm.
        contigs = [
            new_contig for new_contig in contig_annotations
            if new_contig.is_cell is not False
            and (new_contig.high_confidence or not require_high_conf)
        ]

        if contigs:
            cell_contigs.append(CellContigs(clonotype_name, contigs))

    return cell_contigs
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def iter_tables(all_stylediff_pairs,  # type: List[StyleDiffSourcePairs]
                enc='utf-8',          # type: str
                numhunks=1,           # type: int
                numlines=2,           # type: int
                wrapcolumn=0,         # type: int
                ccmode=CC_PROCESSES   # type: str
    # NOTE(review): the closing "):" of the signature is missing in this
    # excerpt.
    # type: (...) -> Iterator[Tuple[List[str], int, int]]

    # Grouping key: the distinct second elements of the keys of sdp, joined
    # by newlines.
    def left_diff(sdp):
        # type: (StyleDiffSourcePairs) -> str
        return '\n'.join(set([sdtexts[1] for sdtexts in sdp.keys()]))

    # Sort key: the keys of item as a list.
    def sdkeys(item):
        # type: (StyleDiffSourcePairs) -> List[bytes]
        return list(item.keys())

    # Running counter passed along with every argument tuple.
    idx = 0
    # NOTE(review): groupby only merges adjacent items with the same key;
    # all_stylediff_pairs is not sorted by left_diff here - confirm that
    # callers provide it in a suitable order.
    grouped_sdpairs = itertools.groupby(all_stylediff_pairs, left_diff)
    groups = []  # type: List[CallArgs]
    # Materialize each group and order the groups by key.
    grouped_sdp = sorted([(key, list(pairs)) for key, pairs in grouped_sdpairs])
    for sdleft, stylediff_pairs in grouped_sdp:
        args_lists = []
        for sdpairs in sorted(stylediff_pairs, key=sdkeys):
            for from_to_texts, pairs in sorted(sdpairs.items()):
                # NOTE(review): the append call below is cut off in this
                # excerpt (its remaining arguments and closing parentheses
                # are missing).
                args_lists.append((from_to_texts, pairs, numhunks, numlines, wrapcolumn, idx,
                idx += 1
        # Positional-args tuple plus empty keyword dict for one group.
        # NOTE(review): grouparg is never added to `groups` in the visible
        # code, so `groups` would stay empty as written.
        grouparg = (args_lists, ), {}  # type: CallArgs
    # Yield each result together with its index and the total group count.
    for tidx, tables in enumerate(iter_parallel(calc_diff_groups, groups, ccmode=ccmode)):
        yield tables, tidx, len(groups)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def condense_option_values(formatter, styles, condensed):
    # type: (CodeFormatter, Iterable[Style], bool) -> List[Style]
    """Return the styles ordered by their key paths, optionally condensed.

    The styles are sorted by ``keypaths(style)``, with their original
    position as tie-breaker. When ``condensed`` is false the styles are
    returned in that order. Otherwise styles sharing the same key paths are
    handed to ``group_consecutive`` and its results are collected.

    NOTE(review): in the source the inner loop had no body and the
    ``else:`` branch was missing, so the condensed result was always
    overwritten; appending each yielded style and returning the plain list
    only in the non-condensed case is assumed - confirm.
    """
    # Use idx to prevent sorted to look at unorderable dicts.
    triplets = sorted(
        (keypaths(style), idx, style) for idx, style in enumerate(styles))
    pairs = [(kp, style) for kp, _idx, style in triplets]
    if not condensed:
        return [style for _, style in pairs]

    equivalents = []  # type: List[Style]
    for _kpaths, kp_styles in itertools.groupby(pairs, operator.itemgetter(0)):
        same_keys = [kps[1] for kps in kp_styles]
        equivalents.extend(group_consecutive(formatter, same_keys, condensed))
    return equivalents