The following 8 code examples, extracted from open-source Python projects, illustrate how to use toolz.groupby().
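Before the project examples, here is a minimal sketch of toolz.groupby itself; the word list and key functions are made up for illustration:

from toolz import groupby

# groupby(key, seq) applies `key` to every element and returns a plain
# dict mapping each key to the list of elements that produced it.
words = ['apple', 'bee', 'cat', 'banana', 'ant']

groupby(len, words)
# {5: ['apple'], 3: ['bee', 'cat', 'ant'], 6: ['banana']}

groupby(lambda w: w[0], words)
# {'a': ['apple', 'ant'], 'b': ['bee', 'banana'], 'c': ['cat']}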
def __init__(self, restrictions):
    # A dict mapping each asset to its restrictions, which are sorted by
    # ascending order of effective_date
    self._restrictions_by_asset = {
        asset: sorted(
            restrictions_for_asset,
            key=lambda x: x.effective_date,
        )
        for asset, restrictions_for_asset
        in iteritems(groupby(lambda x: x.asset, restrictions))
    }
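The same group-then-sort pattern in isolation, with a toy Restriction type standing in for the project's; this sketch calls dict.items() directly rather than six's iteritems:

from collections import namedtuple
from toolz import groupby

Restriction = namedtuple('Restriction', ['asset', 'effective_date'])
restrictions = [
    Restriction('AAPL', 3), Restriction('AAPL', 1), Restriction('MSFT', 2),
]
# Group by asset, then sort each group by effective_date.
by_asset = {
    asset: sorted(group, key=lambda r: r.effective_date)
    for asset, group in groupby(lambda r: r.asset, restrictions).items()
}
# {'AAPL': [Restriction('AAPL', 1), Restriction('AAPL', 3)],
#  'MSFT': [Restriction('MSFT', 2)]}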
def split_next_and_previous_event_columns(self, requested_columns):
    """
    Split requested columns into columns that should load the next known
    value and columns that should load the previous known value.

    Parameters
    ----------
    requested_columns : iterable[BoundColumn]

    Returns
    -------
    next_cols, previous_cols : iterable[BoundColumn], iterable[BoundColumn]
        ``requested_columns``, partitioned into sub-sequences based on
        whether the column should produce values from the next event or
        the previous event.
    """
    def next_or_previous(c):
        if c in self.next_value_columns:
            return 'next'
        elif c in self.previous_value_columns:
            return 'previous'
        raise ValueError(
            "{c} not found in next_value_columns "
            "or previous_value_columns".format(c=c)
        )

    groups = groupby(next_or_previous, requested_columns)
    return groups.get('next', ()), groups.get('previous', ())
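One detail this example relies on: groupby only creates keys that at least one element actually mapped to, which is why the result is read back with .get and a default. A toy sketch with a made-up classifier:

from toolz import groupby

cols = ['a_next', 'b_next']
groups = groupby(
    lambda c: 'next' if c.endswith('_next') else 'previous', cols)

groups.get('next', ())      # ['a_next', 'b_next']
groups.get('previous', ())  # () -- no element produced a 'previous' key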
def get_zeroth_quarter_idx(self, stacked_last_per_qtr):
    """
    Filters for releases that are on or after each simulation date and
    determines the next quarter by picking out the upcoming release for
    each date in the index.

    Parameters
    ----------
    stacked_last_per_qtr : pd.DataFrame
        A DataFrame with index of calendar dates, sid, and normalized
        quarters with each row being the latest estimate for the row's
        index values, sorted by event date.

    Returns
    -------
    next_releases_per_date_index : pd.MultiIndex
        An index of calendar dates, sid, and normalized quarters, for only
        the rows that have a next event.
    """
    next_releases_per_date = stacked_last_per_qtr.loc[
        stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] >=
        stacked_last_per_qtr.index.get_level_values(SIMULATION_DATES)
    ].groupby(
        level=[SIMULATION_DATES, SID_FIELD_NAME],
        as_index=False,
        # Here we take advantage of the fact that `stacked_last_per_qtr` is
        # sorted by event date.
    ).nth(0)
    return next_releases_per_date.index
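Note that this example and the next use pandas' DataFrame.groupby rather than toolz.groupby. A minimal sketch of the .nth(0) idiom, with made-up data:

import pandas as pd

# Two rows for (date, sid) group ('2023-01-01', 65), assumed already
# sorted by event date, and one row for sid 66.
idx = pd.MultiIndex.from_tuples(
    [('2023-01-01', 65), ('2023-01-01', 65), ('2023-01-01', 66)],
    names=['dates', 'sid'])
df = pd.DataFrame(
    {'event_date': ['2023-01-05', '2023-01-09', '2023-01-06'],
     'value': [1.0, 2.0, 3.0]},
    index=idx)

# .nth(0) keeps the first row of each group: the next upcoming release
# when rows are sorted by event date. .nth(-1) would keep the last.
next_rows = df.groupby(level=['dates', 'sid']).nth(0)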
def get_zeroth_quarter_idx(self, stacked_last_per_qtr):
    """
    Filters for releases that are on or after each simulation date and
    determines the previous quarter by picking out the most recent release
    relative to each date in the index.

    Parameters
    ----------
    stacked_last_per_qtr : pd.DataFrame
        A DataFrame with index of calendar dates, sid, and normalized
        quarters with each row being the latest estimate for the row's
        index values, sorted by event date.

    Returns
    -------
    previous_releases_per_date_index : pd.MultiIndex
        An index of calendar dates, sid, and normalized quarters, for only
        the rows that have a previous event.
    """
    previous_releases_per_date = stacked_last_per_qtr.loc[
        stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] <=
        stacked_last_per_qtr.index.get_level_values(SIMULATION_DATES)
    ].groupby(
        level=[SIMULATION_DATES, SID_FIELD_NAME],
        as_index=False,
        # Here we take advantage of the fact that `stacked_last_per_qtr` is
        # sorted by event date.
    ).nth(-1)
    return previous_releases_per_date.index
def _message_handlers(self):
    def create(prio, h):
        h = List.wrap(h).apzip(_.message).map2(flip)
        return prio, Handlers(prio, Map(h))
    return Map(toolz.groupby(_.prio, self._handlers)).map(create)
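The _.prio expressions above come from a placeholder-style helper library used by that project; with only the standard library, the same attribute-keyed grouping might look like this (Handler is a stand-in class, not the project's):

from operator import attrgetter
from toolz import groupby

class Handler:
    def __init__(self, prio, message):
        self.prio = prio
        self.message = message

handlers = [Handler(1, 'a'), Handler(2, 'b'), Handler(1, 'c')]
by_prio = groupby(attrgetter('prio'), handlers)
# {1: [<prio-1 handlers>], 2: [<prio-2 handler>]}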
async def create_notifications(self, notifications):
    results = []

    # create non-existent users before creating notifications
    usernames = []
    for notification in notifications:
        usernames.append(notification['to_username'])
        usernames.append(notification.get('from_username'))
    usernames = set(u for u in usernames if u)
    results.append(await self.create_users(usernames))

    # group notifications by keys to allow multi-row inserts
    # grouped_notifications = toolz.groupby(lambda x: tuple(x.keys()),
    #                                       notifications)
    # logger.debug('create_notifications',
    #              notification_count=len(notifications),
    #              group_count=len(grouped_notifications.keys()))
    # futures = []

    wwwpoll_columns = set(c.name for c in wwwpoll_table.c._all_columns)
    async with self.async_engine.acquire() as conn:
        for n in notifications:
            results.append(await conn.execute(
                notifications_table.insert().values(**n)))
            n2 = toolz.keyfilter(lambda k: k in wwwpoll_columns, n)
            results.append(await conn.execute(
                wwwpoll_table.insert().values(**n2)))
    return all(results)

# notification retrieval methods
# pylint: disable=too-many-arguments,too-many-locals
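The commented-out grouping hints at batching rows that share a column set into one multi-row insert. A hypothetical sketch of that idea (the rows are made up):

import toolz

# Rows with identical column sets can share one multi-row INSERT.
rows = [
    {'to_username': 'alice', 'kind': 'reply'},
    {'to_username': 'bob', 'kind': 'reply'},
    {'to_username': 'carol', 'kind': 'mention', 'from_username': 'dave'},
]
grouped = toolz.groupby(lambda r: tuple(sorted(r)), rows)
# {('kind', 'to_username'): [<2 rows>],
#  ('from_username', 'kind', 'to_username'): [<1 row>]}
# Each group could then go through a single
# conn.execute(table.insert().values(group)) call.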
def lazy_proxy_dict(artifacts_or_ids, group_artifacts_of_same_name=False):
    """
    Takes a list of artifacts or artifact ids and returns a dictionary
    whose keys are the names of the artifacts. The values will be lazily
    loaded into proxies as requested.

    Parameters
    ----------
    artifacts_or_ids : collection of artifacts or artifact ids (strings)

    group_artifacts_of_same_name : bool (default: False)
        If set to True then artifacts of the same name will be grouped
        together in one list. When set to False an exception will be
        raised.
    """
    if isinstance(artifacts_or_ids, dict):
        artifacts = t.valmap(coerce_to_artifact, artifacts_or_ids)
        # The double lambda binds `a` at definition time; a bare
        # `lambda: a.proxy()` would close over the loop variable instead.
        lambdas = {name: (lambda a: lambda: a.proxy())(a)
                   for name, a in artifacts.items()}
        return lazy_dict(lambdas)

    # else we have a collection
    artifacts = coerce_to_artifacts(artifacts_or_ids)
    by_name = t.groupby(lambda a: a.name, artifacts)
    singles = t.valfilter(lambda l: len(l) == 1, by_name)
    multi = t.valfilter(lambda l: len(l) > 1, by_name)

    lambdas = {name: (lambda a: lambda: a.proxy())(a[0])
               for name, a in singles.items()}

    if group_artifacts_of_same_name and len(multi) > 0:
        lambdas = t.merge(
            lambdas,
            {name: (lambda artifacts:
                    (lambda: [a.proxy() for a in artifacts]))(artifacts)
             for name, artifacts in multi.items()})

    if not group_artifacts_of_same_name and len(multi) > 0:
        raise ValueError(
            """Only artifacts with distinct names can be used in a lazy_proxy_dict.
Offending names: {}
Use the option `group_artifacts_of_same_name=True` if you want a list of
proxies to be returned under the respective key.
""".format({n: len(a) for n, a in multi.items()}))

    return lazy_dict(lambdas)
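The groupby/valfilter split used above, shown in isolation; the artifact dicts are made up:

import toolz as t

by_name = t.groupby(lambda a: a['name'], [
    {'name': 'model', 'id': 1},
    {'name': 'model', 'id': 2},
    {'name': 'report', 'id': 3},
])
# Partition the groups by how many members they have.
singles = t.valfilter(lambda l: len(l) == 1, by_name)  # {'report': [...]}
multi = t.valfilter(lambda l: len(l) > 1, by_name)     # {'model': [.., ..]}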
def get_adjustments(self, zero_qtr_data, requested_qtr_data, last_per_qtr,
                    dates, assets, columns, **kwargs):
    """
    Creates an AdjustedArray from the given estimates data for the given
    dates.

    Parameters
    ----------
    zero_qtr_data : pd.DataFrame
        The 'time zero' data for each calendar date per sid.
    requested_qtr_data : pd.DataFrame
        The requested quarter data for each calendar date per sid.
    last_per_qtr : pd.DataFrame
        A DataFrame with a column MultiIndex of [self.estimates.columns,
        normalized_quarters, sid] that allows easily getting the timeline
        of estimates for a particular sid for a particular quarter.
    dates : pd.DatetimeIndex
        The calendar dates for which estimates data is requested.
    assets : pd.Int64Index
        An index of all the assets from the raw data.
    columns : list of BoundColumn
        The columns for which adjustments need to be calculated.
    kwargs :
        Additional keyword arguments that should be forwarded to
        `get_adjustments_for_sid` and to be used in computing adjustments
        for each sid.

    Returns
    -------
    col_to_all_adjustments : dict[int -> AdjustedArray]
        A dictionary of all adjustments that should be applied.
    """
    zero_qtr_data.sort_index(inplace=True)
    # Here we want to get the LAST record from each group of records
    # corresponding to a single quarter. This is to ensure that we select
    # the most up-to-date event date in case the event date changes.
    quarter_shifts = zero_qtr_data.groupby(
        level=[SID_FIELD_NAME, NORMALIZED_QUARTERS]
    ).nth(-1)

    col_to_all_adjustments = {}
    sid_to_idx = dict(zip(assets, range(len(assets))))
    quarter_shifts.groupby(level=SID_FIELD_NAME).apply(
        self.get_adjustments_for_sid,
        dates,
        requested_qtr_data,
        last_per_qtr,
        sid_to_idx,
        columns,
        col_to_all_adjustments,
        **kwargs
    )
    return col_to_all_adjustments