The following 37 code examples, extracted from open-source Python projects, illustrate how to use pandas.TimeGrouper().
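Note that TimeGrouper was deprecated (around pandas 0.21) and has since been removed from current pandas releases; pd.Grouper(freq=...) is the supported replacement. A minimal sketch of the equivalent call, using a made-up minute-frequency series purely for illustration:

import numpy as np
import pandas as pd

# Hypothetical minute-frequency series, used only for illustration.
idx = pd.date_range('2020-01-01', periods=120, freq='min')
s = pd.Series(np.random.randn(120), index=idx)

# Legacy spelling (old pandas): s.groupby(pd.TimeGrouper(freq='H')).mean()
# Current equivalent with pd.Grouper:
hourly_mean = s.groupby(pd.Grouper(freq='H')).mean()
print(hourly_mean)

The examples below are shown as they appear in their source projects and therefore use the legacy TimeGrouper spelling.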
def quiet_days(series, n_days=5, period='min'):
    """
    Given a pandas time series *series*, return a mapping between months and
    the *n_days* quietest days for that month. See (4) of Love and Gannon.
    The parameter *period* gives the sampling period (`'min'` = 1 measurement
    per minute).
    """
    quiet_day_map = {}
    delta_H_i = series.diff().abs().\
        groupby(PD.TimeGrouper(freq='D')).\
        filter(lambda x: x.isnull().sum() <= int(0.5 * SAMPLES[period])).\
        groupby(PD.TimeGrouper(freq='D')).mean()
    for month, delta_H_i_month in delta_H_i.groupby(PD.TimeGrouper(freq='M')):
        quiet_day_map[month] = delta_H_i_month.nsmallest(n_days).sort_index().index
    return quiet_day_map
def chebyshev_fit(series, quiet_day_map, deg=10):
    """
    ???
    """
    grouped_by_day = series.groupby(PD.TimeGrouper(freq='D'))
    x = []
    y = []
    for month_end in sorted(quiet_day_map):
        for quiet_day in quiet_day_map[month_end]:
            quiet_day_H = grouped_by_day.get_group(quiet_day)
            quiet_day_H = quiet_day_H[quiet_day_H.notnull()]
            x.extend([toJ2000(time_stamp.to_datetime())
                      for time_stamp in quiet_day_H.index])
            y.extend(quiet_day_H.values)
    return NP.polynomial.chebyshev.chebfit(x, y, deg, full=True)
def test_dates(self):
    for t in self.mock_tweets:
        self.feels_db.insert_tweet(t)
    self.assertEqual(len(self.feels_db.tweet_dates), 3)

    tweets = []
    with open(self.tweets_data_path) as tweets_file:
        lines = filter(None, (line.rstrip() for line in tweets_file))
        for line in lines:
            try:
                tweets.append(Tweet(json.loads(line)))
            except KeyError:
                pass
    for t in tweets:
        self.feels_db.insert_tweet(t)
    self.assertEqual(len(self.feels_db.tweet_dates), 105)

    df = self.feels_db.tweet_dates
    timebox = timedelta(seconds=60)
    second = timedelta(seconds=1)
    df = df.groupby(pd.TimeGrouper(freq=f'{int(timebox/second)}S')).size()
    df = df[df != 0]
    print(df)
    self.assertEqual(len(df), 3)
    self.assertEqual(df.iloc[0], 103)
def test_resample_basic(self):
    rng = date_range('1/1/2000 00:00:00', '1/1/2000 00:13:00', freq='min',
                     name='index')
    s = Series(np.random.randn(14), index=rng)
    result = s.resample('5min', closed='right', label='right').mean()

    exp_idx = date_range('1/1/2000', periods=4, freq='5min', name='index')
    expected = Series([s[0], s[1:6].mean(), s[6:11].mean(), s[11:].mean()],
                      index=exp_idx)
    assert_series_equal(result, expected)
    self.assertEqual(result.index.name, 'index')

    result = s.resample('5min', closed='left', label='right').mean()
    exp_idx = date_range('1/1/2000 00:05', periods=3, freq='5min',
                         name='index')
    expected = Series([s[:5].mean(), s[5:10].mean(), s[10:].mean()],
                      index=exp_idx)
    assert_series_equal(result, expected)

    s = self.series
    result = s.resample('5Min').last()
    grouper = TimeGrouper(Minute(5), closed='left', label='left')
    expect = s.groupby(grouper).agg(lambda x: x[-1])
    assert_series_equal(result, expect)
def test_resample_frame_basic(self):
    df = tm.makeTimeDataFrame()

    b = TimeGrouper('M')
    g = df.groupby(b)

    # check all cython functions work
    funcs = ['add', 'mean', 'prod', 'min', 'max', 'var']
    for f in funcs:
        g._cython_agg_general(f)

    result = df.resample('A').mean()
    assert_series_equal(result['A'], df['A'].resample('A').mean())

    result = df.resample('M').mean()
    assert_series_equal(result['A'], df['A'].resample('M').mean())

    df.resample('M', kind='period').mean()
    df.resample('W-WED', kind='period').mean()
def test_resample_ohlc(self):
    s = self.series

    grouper = TimeGrouper(Minute(5))
    expect = s.groupby(grouper).agg(lambda x: x[-1])
    result = s.resample('5Min').ohlc()

    self.assertEqual(len(result), len(expect))
    self.assertEqual(len(result.columns), 4)

    xs = result.iloc[-2]
    self.assertEqual(xs['open'], s[-6])
    self.assertEqual(xs['high'], s[-6:-1].max())
    self.assertEqual(xs['low'], s[-6:-1].min())
    self.assertEqual(xs['close'], s[-2])

    xs = result.iloc[0]
    self.assertEqual(xs['open'], s[0])
    self.assertEqual(xs['high'], s[:5].max())
    self.assertEqual(xs['low'], s[:5].min())
    self.assertEqual(xs['close'], s[4])
def test_apply_iteration(self):
    # #2300
    N = 1000
    ind = pd.date_range(start="2000-01-01", freq="D", periods=N)
    df = DataFrame({'open': 1, 'close': 2}, index=ind)
    tg = TimeGrouper('M')

    _, grouper, _ = tg._get_grouper(df)

    # Errors
    grouped = df.groupby(grouper, group_keys=False)
    f = lambda df: df['close'] / df['open']

    # it works!
    result = grouped.apply(f)
    self.assertTrue(result.index.equals(df.index))
def test_panel_aggregation(self):
    ind = pd.date_range('1/1/2000', periods=100)
    data = np.random.randn(2, len(ind), 4)

    wp = pd.Panel(data, items=['Item1', 'Item2'], major_axis=ind,
                  minor_axis=['A', 'B', 'C', 'D'])

    tg = TimeGrouper('M', axis=1)
    _, grouper, _ = tg._get_grouper(wp)
    bingrouped = wp.groupby(grouper)
    binagg = bingrouped.mean()

    def f(x):
        assert (isinstance(x, Panel))
        return x.mean(1)

    result = bingrouped.agg(f)
    tm.assert_panel_equal(result, binagg)
def test_fails_on_no_datetime_index(self):
    index_names = ('Int64Index', 'Index', 'Float64Index', 'MultiIndex')
    index_funcs = (tm.makeIntIndex, tm.makeUnicodeIndex, tm.makeFloatIndex,
                   lambda m: tm.makeCustomIndex(m, 2))
    n = 2
    for name, func in zip(index_names, index_funcs):
        index = func(n)
        df = DataFrame({'a': np.random.randn(n)}, index=index)
        with tm.assertRaisesRegexp(TypeError,
                                   "Only valid with DatetimeIndex, "
                                   "TimedeltaIndex or PeriodIndex, "
                                   "but got an instance of %r" % name):
            df.groupby(TimeGrouper('D'))

    # PeriodIndex gives a specific error message
    df = DataFrame({'a': np.random.randn(n)}, index=tm.makePeriodIndex(n))
    with tm.assertRaisesRegexp(TypeError,
                               "axis must be a DatetimeIndex, but "
                               "got an instance of 'PeriodIndex'"):
        df.groupby(TimeGrouper('D'))
def get_ts_data():
    requested_facts = text_input.value.split(",")[:8]
    requested_facts = [f.strip() for f in requested_facts]
    req_df = preprocessing.get_facts_from_list(df, requested_facts)
    ts = preprocessing.make_timeseries(req_df)
    ts.columns = ts.columns.droplevel(0)
    ts = ts.groupby(
        pd.TimeGrouper(freq=timegroupoptionsmapper[timegroup.active])).sum()
    return ts
def get_daily_gain(data, timezone=pytz.timezone('US/Pacific')):
    """Obtain the daily gain.

    Attributes
    ----------
    data: list
        The list of dictionaries where each dictionary gives information on
        one completed submission. It is the output of the `get_data` function
    timezone: pytz.timezone
        A valid pytz timezone. Default is the Pacific Standard Time, which is
        the one used by Udacity

    Returns
    -------
    A Pandas Series where the indices are the days and the values are the
    total gain in USD of that day. The time zone is the Pacific Standard Time.
    """
    date_price_data = np.array([(d['completed_at'], d['price']) for d in data])
    price_series = pd.Series(date_price_data[:, 1].astype(float),
                             index=pd.to_datetime(date_price_data[:, 0]))
    price_series = price_series.sort_index()

    # Convert timezone
    utc = pytz.utc
    price_series = price_series.tz_localize(utc).tz_convert(timezone)

    # Calculate the gain by day
    daily_gain = price_series.groupby(pd.TimeGrouper('D')).sum()
    return daily_gain
def test_custom_grouper(self):
    dti = DatetimeIndex(freq='Min', start=datetime(2005, 1, 1),
                        end=datetime(2005, 1, 10))
    s = Series(np.array([1] * len(dti)), index=dti, dtype='int64')

    b = TimeGrouper(Minute(5))
    g = s.groupby(b)

    # check all cython functions work
    funcs = ['add', 'mean', 'prod', 'ohlc', 'min', 'max', 'var']
    for f in funcs:
        g._cython_agg_general(f)

    b = TimeGrouper(Minute(5), closed='right', label='right')
    g = s.groupby(b)

    # check all cython functions work
    funcs = ['add', 'mean', 'prod', 'ohlc', 'min', 'max', 'var']
    for f in funcs:
        g._cython_agg_general(f)

    self.assertEqual(g.ngroups, 2593)
    self.assertTrue(notnull(g.mean()).all())

    # construct expected val
    arr = [1] + [5] * 2592
    idx = dti[0:-1:5]
    idx = idx.append(dti[-1:])
    expect = Series(arr, index=idx)

    # GH2763 - return input dtype if we can
    result = g.agg(np.sum)
    assert_series_equal(result, expect)

    df = DataFrame(np.random.rand(len(dti), 10),
                   index=dti, dtype='float64')
    r = df.groupby(b).agg(np.sum)

    self.assertEqual(len(r.columns), 10)
    self.assertEqual(len(r.index), 2593)
def test_resample_nunique(self):
    # GH 12352
    df = DataFrame({
        'ID': {pd.Timestamp('2015-06-05 00:00:00'): '0010100903',
               pd.Timestamp('2015-06-08 00:00:00'): '0010150847'},
        'DATE': {pd.Timestamp('2015-06-05 00:00:00'): '2015-06-05',
                 pd.Timestamp('2015-06-08 00:00:00'): '2015-06-08'}})
    r = df.resample('D')
    g = df.groupby(pd.Grouper(freq='D'))
    expected = df.groupby(pd.TimeGrouper('D')).ID.apply(
        lambda x: x.nunique())
    self.assertEqual(expected.name, 'ID')

    for t in [r, g]:
        result = r.ID.nunique()
        assert_series_equal(result, expected)

    # TODO
    # this should have name
    # https://github.com/pydata/pandas/issues/12363
    expected.name = None

    result = df.ID.resample('D').nunique()
    assert_series_equal(result, expected)

    result = df.ID.groupby(pd.Grouper(freq='D')).nunique()
    assert_series_equal(result, expected)
def test_count(self):
    self.ts[::3] = np.nan

    expected = self.ts.groupby(lambda x: x.year).count()

    grouper = TimeGrouper('A', label='right', closed='right')
    result = self.ts.groupby(grouper).count()
    expected.index = result.index
    assert_series_equal(result, expected)

    result = self.ts.resample('A').count()
    expected.index = result.index
    assert_series_equal(result, expected)
def test_groupby_with_empty(self):
    index = pd.DatetimeIndex(())
    data = ()
    series = pd.Series(data, index)
    grouper = pd.tseries.resample.TimeGrouper('D')
    grouped = series.groupby(grouper)
    assert next(iter(grouped), None) is None
def test_groupby_with_timegrouper(self):
    # GH 4161
    # TimeGrouper requires a sorted index
    # also verifies that the resultant index has the correct name
    import datetime as DT
    df_original = DataFrame({
        'Buyer': 'Carl Carl Carl Carl Joe Carl'.split(),
        'Quantity': [18, 3, 5, 1, 9, 3],
        'Date': [
            DT.datetime(2013, 9, 1, 13, 0),
            DT.datetime(2013, 9, 1, 13, 5),
            DT.datetime(2013, 10, 1, 20, 0),
            DT.datetime(2013, 10, 3, 10, 0),
            DT.datetime(2013, 12, 2, 12, 0),
            DT.datetime(2013, 9, 2, 14, 0),
        ]
    })

    # GH 6908 change target column's order
    df_reordered = df_original.sort_values(by='Quantity')

    for df in [df_original, df_reordered]:
        df = df.set_index(['Date'])

        expected = DataFrame(
            {'Quantity': np.nan},
            index=date_range('20130901 13:00:00',
                             '20131205 13:00:00',
                             freq='5D', name='Date', closed='left'))
        expected.iloc[[0, 6, 18], 0] = np.array(
            [24., 6., 9.], dtype='float64')

        result1 = df.resample('5D').sum()
        assert_frame_equal(result1, expected)

        df_sorted = df.sort_index()
        result2 = df_sorted.groupby(pd.TimeGrouper(freq='5D')).sum()
        assert_frame_equal(result2, expected)

        result3 = df.groupby(pd.TimeGrouper(freq='5D')).sum()
        assert_frame_equal(result3, expected)
def test_groupby_with_timegrouper_methods(self):
    # GH 3881
    # make sure API of timegrouper conforms
    import datetime as DT
    df_original = pd.DataFrame({
        'Branch': 'A A A A A B'.split(),
        'Buyer': 'Carl Mark Carl Joe Joe Carl'.split(),
        'Quantity': [1, 3, 5, 8, 9, 3],
        'Date': [
            DT.datetime(2013, 1, 1, 13, 0),
            DT.datetime(2013, 1, 1, 13, 5),
            DT.datetime(2013, 10, 1, 20, 0),
            DT.datetime(2013, 10, 2, 10, 0),
            DT.datetime(2013, 12, 2, 12, 0),
            DT.datetime(2013, 12, 2, 14, 0),
        ]
    })

    df_sorted = df_original.sort_values(by='Quantity', ascending=False)

    for df in [df_original, df_sorted]:
        df = df.set_index('Date', drop=False)
        g = df.groupby(pd.TimeGrouper('6M'))
        self.assertTrue(g.group_keys)
        self.assertTrue(isinstance(g.grouper, pd.core.groupby.BinGrouper))

        groups = g.groups
        self.assertTrue(isinstance(groups, dict))
        self.assertTrue(len(groups) == 3)
def backtest(self, data_frame):
    dfs = data_frame.groupby(pd.TimeGrouper(freq='D'))
    # only choose trading days
    dfs = [(d, df) for (d, df) in dfs if df.shape[0]]
    if sys.version_info[0] < 3:
        exit_dfs = [self._compute_daily_performance(daily_data)
                    for daily_data in dfs]
    else:
        exit_dfs = Parallel(n_jobs=-1, verbose=50)(
            delayed(self._compute_daily_performance)(daily_data)
            for daily_data in dfs)
    return pd.concat(exit_dfs)
def timeGroup(frame, step, file_csv):
    dfx = frame.set_index(['time'])
    dfx = dfx[dfx.status == 'accepted']
    grouper = dfx.groupby([pd.TimeGrouper(step), 'Miner'])
    dfTimeS = grouper['Miner'].count().unstack('Miner').fillna(0)
    # There are arguments for and against this. The final group is not
    # 'complete', so we will not include it.
    dfTimeS = dfTimeS[:-1]
    dfTimeS.to_csv(file_csv, encoding='utf-8')
    return dfTimeS

# Summary tables
def calculate_latest_coeffs(self):
    unit_topic_tmpl = "{campus}/{building}/{unit}/{point}"
    unit_points = [self.power_name]
    df = None

    # Get data
    unit = self.temp_unit
    for point in unit_points:
        if point == self.power_name:
            unit = self.power_unit
        unit_topic = unit_topic_tmpl.format(campus=self.site,
                                            building=self.building,
                                            unit=unit, point=point)
        result = self.vip.rpc.call('platform.historian',
                                   'query',
                                   topic=unit_topic,
                                   count=self.no_of_recs_needed,
                                   order="LAST_TO_FIRST").get(timeout=10000)
        df2 = pd.DataFrame(result['values'], columns=[self.ts_name, point])
        df2[self.ts_name] = pd.to_datetime(df2[self.ts_name])
        df2 = df2.groupby([pd.TimeGrouper(key=self.ts_name,
                                          freq=self.aggregate_freq)]).mean()
        # df2[self.ts_name] = df2[self.ts_name].apply(
        #     lambda dt: dt.replace(second=0, microsecond=0))
        df = df2 if df is None else pd.merge(df, df2, how='outer',
                                             left_index=True, right_index=True)

    # Calculate coefficients
    result_df = self.calculate_coeffs(df)

    # Publish coeffs to store
    # if coeffs is not None:
    #     self.save_coeffs(coeffs, subdevice)
def calculate_latest_coeffs(self):
    unit_topic_tmpl = "{campus}/{building}/{unit}/{point}"
    unit_points = [self.fan_power_name, self.static_pressure_name,
                   self.air_flow_rate_name]
    df = None

    for point in unit_points:
        unit_topic = unit_topic_tmpl.format(campus=self.site,
                                            building=self.building,
                                            unit=self.unit, point=point)
        result = self.vip.rpc.call('platform.historian',
                                   'query',
                                   topic=unit_topic,
                                   count=self.no_of_recs_needed,
                                   order="LAST_TO_FIRST").get(timeout=1000)
        df2 = pd.DataFrame(result['values'], columns=[self.ts_name, point])
        self.convert_units_to_SI(df2, point, result['metadata']['units'])
        df2[self.ts_name] = pd.to_datetime(df2[self.ts_name])
        df2 = df2.groupby([pd.TimeGrouper(key=self.ts_name,
                                          freq=self.aggregate_freq)]).mean()
        # df2[self.ts_name] = df2[self.ts_name].apply(
        #     lambda dt: dt.replace(second=0, microsecond=0))
        df = df2 if df is None else pd.merge(df, df2, how='outer',
                                             left_index=True, right_index=True)

    #print(df)
    coeffs = self.calculate_coeffs(df)

    # Publish coeffs to store
    if coeffs is not None:
        self.save_coeffs(coeffs)
def Calculate_Liquidity_Coeff(df_list):
    # list of lists of liq coeff per bond
    liq_arr_list = []
    #print 'df_list size: ', len(df_list)
    for df in df_list:
        if df.empty:
            continue
        # A temporary array for holding liquidity beta for each month
        liq_arr = [np.nan] * num_months_CONST
        #print df['cusip_id'][0]

        # Group dataframe on index by month
        for date, df_group in df.groupby(pd.TimeGrouper("M")):
            month = ''.join([str(date.month), str(date.year)])
            month_key = month_keys[month]

            # When there are some data in current month,
            if df_group.shape[0] > 0:
                # Run regression (as equation (2)) to get liquidity measure
                y, X = dmatrices('excess_return_1 ~ yld_pt + volume_and_sign',
                                 data=df_group, return_type='dataframe')
                #print date, X.shape
                mod = sm.OLS(y, X)
                res = mod.fit()

                # set specific months with liquidity factors
                # res.params(2) = liquidity coefficient
                liq_arr[month_key] = res.params[2]

        # store all liq coeff for each month per bond
        liq_arr_list.append(liq_arr)

    return liq_arr_list
def wip_chart(cfd_data, frequency="1W-MON", start_column=None,
              end_column=None, title=None, ax=None):
    if len(cfd_data.index) == 0:
        raise UnchartableData("Cannot draw WIP chart with no data")

    if start_column is None:
        start_column = cfd_data.columns[1]
    if end_column is None:
        end_column = cfd_data.columns[-1]

    if ax is None:
        fig, ax = plt.subplots()
    if title is not None:
        ax.set_title(title)

    wip_data = pd.DataFrame({'wip': cfd_data[start_column] - cfd_data[end_column]})

    groups = wip_data[['wip']].groupby(pd.TimeGrouper(frequency, label='left'))
    labels = [x[0].strftime("%d/%m/%Y") for x in groups]

    groups.boxplot(subplots=False, ax=ax, showmeans=True, return_type='axes')
    ax.set_xticklabels(labels, rotation=70, size='small')

    ax.set_xlabel("Week")
    ax.set_ylabel("WIP")

    return ax
def get_savings(data, column_debit='Debit', column_credit='Credit',
                dt_start=None, dt_end=None, aggregation_period='M'):
    """
    Consumes the checking account data and returns the monthly savings rate.

    Args:
        data (dataframe): The pandas dataframe containing at least a debit
            and a credit column.
        column_debit (str): The column name for the debit column.
        column_credit (str): The column name for the credit column.
        dt_start (str): The start date (specific if given '2012-11-11' or the
            month '2012-11') from which the savings should be calculated.
        dt_end (str): The end date (specific if given '2012-11-11' or the
            month '2012-11') to which the savings should be calculated.
        aggregation_period (str): Single string character like 'M' for month,
            specifying the period over which the savings are aggregated. A
            full specification can be found here:
            http://pandas.pydata.org/pandas-docs/stable/timeseries.html#timeseries-offset-aliases

    Returns:
        A pandas data frame with an additional 'Savings' column and the time
        difference between start and end represented with a single row for
        each aggregation interval that is not null.
    """
    if not isinstance(data.index, pd.DatetimeIndex):
        logging.getLogger().error(
            "A pandas DatetimeIndex is required for the given dataframe")
        return pd.DataFrame()

    # create a copy of the indexed original data frame
    aggregated = data[dt_start:dt_end][[column_debit, column_credit]].copy()
    aggregated = aggregated.groupby(pd.TimeGrouper(aggregation_period)).sum()
    aggregated = aggregated.fillna(0)
    aggregated['Savings'] = aggregated[column_credit] - aggregated[column_debit]
    return aggregated
def test_basicStats(self):
    filepath = RESOURCE_PATH + "\\unittest\\test_hb_basic01.csv"
    data = utils.loadIntradayData(filepath).set_index('datetime')
    stats = hbStats.groupByBasicStats(pd.TimeGrouper(freq='d'), data)
    self.assertEqual(stats.iloc[0]['count'], 16)
    self.assertEqual(stats.iloc[0]['max'], 70)
    self.assertEqual(stats.iloc[0]['min'], 50)
    self.assertEqual(stats.iloc[0]['mean'], 60)
def plotYearMonthStatsHb(data):
    #pd.groupby(b, by=[b.index.month, b.index.year])
    data.groupby(pd.TimeGrouper(freq='M')).mean().plot()
    sns.plt.show()
def avg_wl(self, numObs=50, avgtype='stdWL', grptype='bytime', grper='12M'):
    """Calculates standardized statistics for a list of stations or a huc
    from the USGS

    :param numObs: minimum observations per site required to include site in
        analysis; default is 50
    :param avgtype: averaging technique for site data; options are
        'avgDiffWL' and 'stdWL'; default is 'stdWL'
    :param grptype: way to group the averaged data; options are 'bytime' or
        'monthly' or user input; default 'bytime'
    :param grper: only used if 'bytime' called; defaults to '12M'; other
        times can be put in
    :return:
    """
    data = self.cleanGWL(self.data)
    # stationWL = pd.merge(siteinfo, data, on='site_no')

    data.reset_index(inplace=True)
    data.set_index(['datetime'], inplace=True)

    # get averages by year, month, and site number
    site_size = data.groupby('site_no').size()
    wl_long = data[data['site_no'].isin(
        list(site_size[site_size >= numObs].index.values))]
    siteList = list(wl_long.site_no.unique())
    for site in siteList:
        mean = wl_long.ix[wl_long.site_no == site, 'value'].mean()
        std = wl_long.ix[wl_long.site_no == site, 'value'].std()
        wl_long.ix[wl_long.site_no == site, 'avgDiffWL'] = \
            wl_long.ix[wl_long.site_no == site, 'value'] - mean
        wl_long.ix[wl_long.site_no == site, 'stdWL'] = \
            wl_long.ix[wl_long.site_no == site, 'avgDiffWL'] / std

    if grptype == 'bytime':
        grp = pd.TimeGrouper(grper)
    elif grptype == 'monthly':
        grp = wl_long.index.month
    else:
        grp = grptype

    wl_stats = wl_long.groupby([grp])[avgtype].agg(
        {'mean': np.mean,
         'median': np.median,
         'standard': np.std,
         'cnt': (lambda x: np.count_nonzero(~np.isnan(x))),
         'err_pls': (lambda x: np.mean(x) + (np.std(x) * 1.96)),
         'err_min': (lambda x: np.mean(x) - (np.std(x) * 1.96))})

    return wl_stats
def fetchbin(self, start=None, end=None, binsize=timedelta(seconds=60),
             empty=False):
    """
    Returns a generator that can be used to iterate over the tweet data
    based on ``binsize``.

    :param start: Query start date.
    :type start: datetime
    :param end: Query end date.
    :type end: datetime
    :param binsize: Time duration for each bin for tweet grouping.
    :type binsize: timedelta
    :param empty: Determines whether empty dataframes will be yielded.
    :type empty: boolean
    :returns: A dataframe along with time boundaries for the data.
    :rtype: tuple
    """
    second = timedelta(seconds=1)
    if start is None:
        start = self.start - second
    if end is None:
        end = self.end
    if start == self.start:
        start = start - second
    df = self.tweet_dates
    df = df.groupby(pd.TimeGrouper(freq=f'{int(binsize/second)}S')).size()
    df = df[df.index > start - binsize]
    if not empty:
        df = df[df != 0]
    conn = sqlite3.connect(self._db, detect_types=sqlite3.PARSE_DECLTYPES)
    c = conn.cursor()
    c.execute(
        "SELECT * FROM tweets WHERE created_at > ? AND created_at <= ?",
        (start, end)
    )
    for i in range(0, len(df)):
        frame = []
        if df.iloc[i] > 0:
            frame = pd.DataFrame.from_records(
                data=c.fetchmany(df.iloc[i]),
                columns=self.fields,
                index='created_at'
            )
        left = df.index[i].to_pydatetime()
        right = left + binsize
        if len(frame) > 0 or empty:
            yield TweetBin(frame, left, right)
    c.close()
def update(attrname, old, new):
    subset = get_subset(dictchooser.value)
    new_absolute_source = subset[0] \
        .ix[:, :top_n.value] \
        .groupby(pd.TimeGrouper(freq=timegroupoptionsmapper[timegroup.active])) \
        .sum().fillna(0)
    new_relative_source = subset[1] \
        .ix[:, :top_n.value] \
        .groupby(pd.TimeGrouper(freq=timegroupoptionsmapper[timegroup.active])) \
        .sum().fillna(0)

    for old, new in zip(abs_arrangement, new_absolute_source.columns.tolist()):
        old.title.text = new
    for old, new in zip(rel_arrangement, new_relative_source.columns.tolist()):
        old.title.text = new

    new_abs_sources = [ColumnDataSource(dict(date=new_absolute_source.index,
                                             y=new_absolute_source[l]))
                       for l in new_absolute_source.columns.tolist()]
    new_rel_sources = [ColumnDataSource(dict(date=new_relative_source.index,
                                             y=new_relative_source[l]))
                       for l in new_relative_source.columns.tolist()]

    for old, new in zip(abs_sources, new_abs_sources):
        old.data.update(new.data)
    for old, new in zip(rel_sources, new_rel_sources):
        old.data.update(new.data)

    new_abs_point_sources = [
        ColumnDataSource(dict(date=[new_absolute_source[l].idxmax()],
                              y=[new_absolute_source[l].max()],
                              text=[str(int(new_absolute_source[l].max()))]))
        for l in new_absolute_source.columns.tolist()]
    new_rel_point_sources = [
        ColumnDataSource(dict(date=[new_relative_source[l].idxmax()],
                              y=[new_relative_source[l].max()],
                              text=[str(int(new_relative_source[l].max()))]))
        for l in new_relative_source.columns.tolist()]

    for old, new in zip(abs_point_sources, new_abs_point_sources):
        old.data.update(new.data)
    for old, new in zip(rel_point_sources, new_rel_point_sources):
        old.data.update(new.data)

# Create Input controls
def run(tags, time_span, step):
    data_sets = [Data(time_span, step, t) for t in tags]
    data_frames = []
    step_fmt = period_map.get(step)
    for ss in data_sets:
        if not ss.df.empty:
            ss.df['start_time'] = pd.to_datetime(ss.df['start'])
            ss.df['end_time'] = pd.to_datetime(ss.df['end'])
            ss.df.drop('start', axis=1, inplace=True)  # Drop `start` column.
            ss.df.drop('end', axis=1, inplace=True)    # Drop `end` column.
            ss.df.drop('tags', axis=1, inplace=True)   # Drop `tags` column.
            ss.df['duration'] = (ss.df['end_time'] - ss.df['start_time'])
            ss.df['duration'] = (
                (ss.df['duration'] / np.timedelta64(1, 's')) / 60
            )
            ss.df['interval'] = ss.df.end_time.dt.to_period(step_fmt)
            ss.df = ss.df.set_index('interval')  # `interval` column as index.
            ss.df.drop('start_time', axis=1, inplace=True)  # Drop `start_time`.
            ss.df.drop('end_time', axis=1, inplace=True)    # Drop `end_time`.
            ss.df = ss.df.groupby(
                pd.TimeGrouper(step_fmt),
                level=0,
            ).aggregate(
                np.sum
            )
            ss.df.rename(columns={'duration': ss.tag}, inplace=True)
            data_frames.append(ss.df)
    result = pd.concat(data_frames, axis=1)
    if step_fmt == 'D':
        result = result.to_timestamp()  # `PeriodIndex` to `DatetimeIndex`.
        result = result.asfreq('D', fill_value=0)  # Fill missing days.
    plot = result.plot(kind='bar')
    plot.set_title('Minutes spent by {p}'.format(p=step))
    plot.set_xlabel('{p}s'.format(p=step))
    plot.set_ylabel('minutes')
    plt.show()
def check_downsampled_term(self, term):

    #      June 2014
    # Mo Tu We Th Fr Sa Su
    #                    1
    #  2  3  4  5  6  7  8
    #  9 10 11 12 13 14 15
    # 16 17 18 19 20 21 22
    # 23 24 25 26 27 28 29
    # 30
    all_sessions = self.nyse_sessions
    compute_dates = all_sessions[
        all_sessions.slice_indexer('2014-06-05', '2015-01-06')
    ]
    start_date, end_date = compute_dates[[0, -1]]

    pipe = Pipeline({
        'year': term.downsample(frequency='year_start'),
        'quarter': term.downsample(frequency='quarter_start'),
        'month': term.downsample(frequency='month_start'),
        'week': term.downsample(frequency='week_start'),
    })

    # Raw values for term, computed each day from 2014 to the end of the
    # target period.
    raw_term_results = self.run_pipeline(
        Pipeline({'term': term}),
        start_date=pd.Timestamp('2014-01-02', tz='UTC'),
        end_date=pd.Timestamp('2015-01-06', tz='UTC'),
    )['term'].unstack()

    expected_results = {
        'year': (raw_term_results
                 .groupby(pd.TimeGrouper('AS'))
                 .first()
                 .reindex(compute_dates, method='ffill')),
        'quarter': (raw_term_results
                    .groupby(pd.TimeGrouper('QS'))
                    .first()
                    .reindex(compute_dates, method='ffill')),
        'month': (raw_term_results
                  .groupby(pd.TimeGrouper('MS'))
                  .first()
                  .reindex(compute_dates, method='ffill')),
        'week': (raw_term_results
                 .groupby(pd.TimeGrouper('W', label='left'))
                 .first()
                 .reindex(compute_dates, method='ffill')),
    }

    results = self.run_pipeline(pipe, start_date, end_date)

    for frequency in expected_results:
        result = results[frequency].unstack()
        expected = expected_results[frequency]
        assert_frame_equal(result, expected)
def test_aggregate_normal(self):
    # check TimeGrouper's aggregation is identical as normal groupby

    n = 20
    data = np.random.randn(n, 4)
    normal_df = DataFrame(data, columns=['A', 'B', 'C', 'D'])
    normal_df['key'] = [1, 2, 3, 4, 5] * 4

    dt_df = DataFrame(data, columns=['A', 'B', 'C', 'D'])
    dt_df['key'] = [datetime(2013, 1, 1), datetime(2013, 1, 2),
                    datetime(2013, 1, 3), datetime(2013, 1, 4),
                    datetime(2013, 1, 5)] * 4

    normal_grouped = normal_df.groupby('key')
    dt_grouped = dt_df.groupby(TimeGrouper(key='key', freq='D'))

    for func in ['min', 'max', 'prod', 'var', 'std', 'mean']:
        expected = getattr(normal_grouped, func)()
        dt_result = getattr(dt_grouped, func)()
        expected.index = date_range(start='2013-01-01', freq='D',
                                    periods=5, name='key')
        assert_frame_equal(expected, dt_result)

    for func in ['count', 'sum']:
        expected = getattr(normal_grouped, func)()
        expected.index = date_range(start='2013-01-01', freq='D',
                                    periods=5, name='key')
        dt_result = getattr(dt_grouped, func)()
        assert_frame_equal(expected, dt_result)

    # GH 7453
    for func in ['size']:
        expected = getattr(normal_grouped, func)()
        expected.index = date_range(start='2013-01-01', freq='D',
                                    periods=5, name='key')
        dt_result = getattr(dt_grouped, func)()
        assert_series_equal(expected, dt_result)

    """
    for func in ['first', 'last']:
        expected = getattr(normal_grouped, func)()
        expected.index = date_range(start='2013-01-01', freq='D',
                                    periods=5, name='key')
        dt_result = getattr(dt_grouped, func)()
        assert_frame_equal(expected, dt_result)

    for func in ['nth']:
        expected = getattr(normal_grouped, func)(3)
        expected.index = date_range(start='2013-01-01', freq='D',
                                    periods=5, name='key')
        dt_result = getattr(dt_grouped, func)(3)
        assert_frame_equal(expected, dt_result)
    """
    # if TimeGrouper is used included, 'first','last' and 'nth' doesn't
    # work yet
def test_transform(self):
    data = Series(np.arange(9) // 3, index=np.arange(9))

    index = np.arange(9)
    np.random.shuffle(index)
    data = data.reindex(index)

    grouped = data.groupby(lambda x: x // 3)

    transformed = grouped.transform(lambda x: x * x.sum())
    self.assertEqual(transformed[7], 12)

    # GH 8046
    # make sure that we preserve the input order
    df = DataFrame(
        np.arange(6, dtype='int64').reshape(3, 2),
        columns=["a", "b"], index=[0, 2, 1])
    key = [0, 0, 1]
    expected = df.sort_index().groupby(key).transform(
        lambda x: x - x.mean()).groupby(key).mean()
    result = df.groupby(key).transform(
        lambda x: x - x.mean()).groupby(key).mean()
    assert_frame_equal(result, expected)

    def demean(arr):
        return arr - arr.mean()

    people = DataFrame(np.random.randn(5, 5),
                       columns=['a', 'b', 'c', 'd', 'e'],
                       index=['Joe', 'Steve', 'Wes', 'Jim', 'Travis'])
    key = ['one', 'two', 'one', 'two', 'one']
    result = people.groupby(key).transform(demean).groupby(key).mean()
    expected = people.groupby(key).apply(demean).groupby(key).mean()
    assert_frame_equal(result, expected)

    # GH 8430
    df = tm.makeTimeDataFrame()
    g = df.groupby(pd.TimeGrouper('M'))
    g.transform(lambda x: x - 1)

    # GH 9700
    df = DataFrame({'a': range(5, 10), 'b': range(5)})
    result = df.groupby('a').transform(max)
    expected = DataFrame({'b': range(5)})
    tm.assert_frame_equal(result, expected)
def calculate_latest_coeffs(self):
    unit_topic_tmpl = "{campus}/{building}/{unit}/{point}"
    topic_tmpl = "{campus}/{building}/{unit}/{subdevice}/{point}"
    unit_points = [self.out_temp_name, self.supply_temp_name]
    zone_points = [self.zone_temp_name, self.air_flow_rate_name]
    df = None

    for point in unit_points:
        unit_topic = unit_topic_tmpl.format(campus=self.site,
                                            building=self.building,
                                            unit=self.unit, point=point)
        result = self.vip.rpc.call('platform.historian',
                                   'query',
                                   topic=unit_topic,
                                   count=self.no_of_recs_needed,
                                   order="LAST_TO_FIRST").get(timeout=1000)
        df2 = pd.DataFrame(result['values'], columns=[self.ts_name, point])
        self.convert_units_to_SI(df2, point, result['metadata']['units'])
        df2[self.ts_name] = pd.to_datetime(df2[self.ts_name])
        df2 = df2.groupby([pd.TimeGrouper(key=self.ts_name,
                                          freq=self.aggregate_freq)]).mean()
        # df2[self.ts_name] = df2[self.ts_name].apply(
        #     lambda dt: dt.replace(second=0, microsecond=0))
        df = df2 if df is None else pd.merge(df, df2, how='outer',
                                             left_index=True, right_index=True)

    for subdevice in self.subdevices:
        for point in zone_points:
            # Query data from platform historian
            topic = topic_tmpl.format(campus=self.site,
                                      building=self.building,
                                      unit=self.unit,
                                      subdevice=subdevice, point=point)
            result = self.vip.rpc.call('platform.historian',
                                       'query',
                                       topic=topic,
                                       count=self.no_of_recs_needed,
                                       order="LAST_TO_FIRST").get(timeout=1000)

            # Merge new point data to df
            df2 = pd.DataFrame(result['values'], columns=[self.ts_name, point])
            self.convert_units_to_SI(df2, point, result['metadata']['units'])
            df2[self.ts_name] = pd.to_datetime(df2[self.ts_name])
            df2 = df2.groupby([pd.TimeGrouper(key=self.ts_name,
                                              freq=self.aggregate_freq)]).mean()
            # df2[self.ts_name] = df2[self.ts_name].apply(
            #     lambda dt: dt.replace(second=0, microsecond=0))
            df = pd.merge(df, df2, how='outer',
                          left_index=True, right_index=True)

        #print(df)
        coeffs = self.calculate_coeffs(df)

        # Publish coeffs to store
        if coeffs is not None:
            self.save_coeffs(coeffs, subdevice)
def id_to_member_mapping(fileobject, time_bins_size='1min', tz='US/Eastern'):
    """Creates a mapping from badge id to member, for each time bin, from
    proximity data file.

    Parameters
    ----------
    fileobject : file or iterable list of str
        The proximity data, as an iterable of JSON strings.

    time_bins_size : str
        The size of the time bins used for resampling. Defaults to '1min'.

    tz : str
        The time zone used for localization of dates. Defaults to 'US/Eastern'.

    Returns
    -------
    pd.Series :
        A mapping from badge id to member, indexed by datetime and id.
    """
    def readfile(fileobject):
        for line in fileobject:
            data = json.loads(line)['data']
            yield (data['timestamp'],
                   mac_address_to_id(data['badge_address']),
                   str(data['member']))

    df = pd.DataFrame(readfile(fileobject),
                      columns=['timestamp', 'id', 'member'])

    # Convert the timestamp to a datetime, localized in UTC
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s', utc=True) \
                       .dt.tz_localize('UTC').dt.tz_convert(tz)
    del df['timestamp']

    # Group by id and resample
    df = df.groupby([
        pd.TimeGrouper(time_bins_size, key='datetime'),
        'id'
    ]).first()

    df.sort_index(inplace=True)

    return df['member']
def voltages(fileobject, time_bins_size='1min', tz='US/Eastern'):
    """Creates a DataFrame of voltages, for each member and time bin.

    Parameters
    ----------
    fileobject : file or iterable list of str
        The proximity data, as an iterable of JSON strings.

    time_bins_size : str
        The size of the time bins used for resampling. Defaults to '1min'.

    tz : str
        The time zone used for localization of dates. Defaults to 'US/Eastern'.

    Returns
    -------
    pd.Series :
        Voltages, indexed by datetime and member.
    """
    def readfile(fileobject):
        for line in fileobject:
            data = json.loads(line)['data']
            yield (data['timestamp'],
                   str(data['member']),
                   float(data['voltage']))

    df = pd.DataFrame(readfile(fileobject),
                      columns=['timestamp', 'member', 'voltage'])

    # Convert the timestamp to a datetime, localized in UTC
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s', utc=True) \
                       .dt.tz_localize('UTC').dt.tz_convert(tz)
    del df['timestamp']

    # Group by member and resample, taking the mean voltage per bin
    df = df.groupby([
        pd.TimeGrouper(time_bins_size, key='datetime'),
        'member'
    ]).mean()

    df.sort_index(inplace=True)

    return df['voltage']
def member_to_badge_proximity(fileobject, time_bins_size='1min',
                              tz='US/Eastern'):
    """Creates a member-to-badge proximity DataFrame from a proximity data
    file.

    Parameters
    ----------
    fileobject : file or iterable list of str
        The proximity data, as an iterable of JSON strings.

    time_bins_size : str
        The size of the time bins used for resampling. Defaults to '1min'.

    tz : str
        The time zone used for localization of dates. Defaults to 'US/Eastern'.

    Returns
    -------
    pd.DataFrame :
        The member-to-badge proximity data.
    """
    def readfile(fileobject):
        for line in fileobject:
            data = json.loads(line)['data']

            for (observed_id, distance) in data['rssi_distances'].items():
                yield (
                    data['timestamp'],
                    str(data['member']),
                    int(observed_id),
                    float(distance['rssi']),
                    float(distance['count']),
                )

    df = pd.DataFrame(
        readfile(fileobject),
        columns=('timestamp', 'member', 'observed_id', 'rssi', 'count')
    )

    # Convert timestamp to datetime for convenience, and localize to UTC
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s', utc=True) \
                       .dt.tz_localize('UTC').dt.tz_convert(tz)
    del df['timestamp']

    # Group per time bins, member and observed_id,
    # and take the first value, arbitrarily
    df = df.groupby([
        pd.TimeGrouper(time_bins_size, key='datetime'),
        'member',
        'observed_id'
    ]).first()

    # Sort the data
    df.sort_index(inplace=True)

    return df