The following 35 code examples, extracted from open-source Python projects, demonstrate how to use statsmodels.api.add_constant().
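As a quick orientation before the examples: add_constant attaches a column of ones to a design matrix so that the fitted model includes an intercept term (by default the column is prepended; pass prepend=False to append it instead). A minimal sketch, with illustrative values:

import numpy as np
import statsmodels.api as sm

x = np.array([1.0, 2.0, 3.0])          # a single regressor
X = sm.add_constant(x)                 # prepends a column of ones
# X == [[1., 1.],
#       [1., 2.],
#       [1., 3.]]
res = sm.OLS(np.array([2.0, 3.9, 6.1]), X).fit()
print(res.params)                      # [intercept, slope]
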
Example 1:

def build_model(y, x, add_constant=True):
    '''
    Build a linear regression model from the provided data

    Provided:
    y: a series or single-column dataframe holding our solution vector for linear regression
    x: a dataframe that runs parallel to y, with all the features for our linear regression
    add_constant: a boolean; if true, a constant column is added to our provided x data.
        Otherwise this method assumes you've done so already, or do not want one for some good reason

    Return:
    a linear regression model from StatsModels and the data which was used to
    train the model. If add_constant was true this will be a new dataframe,
    otherwise it will be x
    '''
    if add_constant:
        x = sm.add_constant(x)
    model = sm.OLS(y, x).fit()
    return (model, x)

Example 2:

def lm(data, xseq, **params):
    """
    Fit OLS / WLS if data has weight
    """
    X = sm.add_constant(data['x'])
    Xseq = sm.add_constant(xseq)

    try:
        model = sm.WLS(data['y'], X, weights=data['weight'])
    except KeyError:
        model = sm.OLS(data['y'], X)

    results = model.fit(**params['method_args'])
    data = pd.DataFrame({'x': xseq})
    data['y'] = results.predict(Xseq)

    if params['se']:
        alpha = 1 - params['level']
        prstd, iv_l, iv_u = wls_prediction_std(
            results, Xseq, alpha=alpha)
        data['se'] = prstd
        data['ymin'] = iv_l
        data['ymax'] = iv_u

    return data

Example 3:

def gls(data, xseq, **params):
    """
    Fit GLS
    """
    X = sm.add_constant(data['x'])
    Xseq = sm.add_constant(xseq)

    results = sm.GLS(data['y'], X).fit(**params['method_args'])
    data = pd.DataFrame({'x': xseq})
    data['y'] = results.predict(Xseq)

    if params['se']:
        alpha = 1 - params['level']
        prstd, iv_l, iv_u = wls_prediction_std(
            results, Xseq, alpha=alpha)
        data['se'] = prstd
        data['ymin'] = iv_l
        data['ymax'] = iv_u

    return data

Example 4:

def __get_tail_weighted_time_period(self):
    """ Method to approximate the weighted-average development age assuming
    exponential tail fit.

    Returns: float32
    """
    #n = self.triangle.ncol-1
    #y = self.f[:n]
    #x = np.array([i + 1 for i in range(len(y))])
    #ldf_reg = stats.linregress(x, np.log(y - 1))
    #time_pd = (np.log(self.f[n] - 1) - ldf_reg[1]) / ldf_reg[0]
    n = self.triangle.ncol - 1
    y = Series(self.f[:n])
    x = [num + 1 for num, item in enumerate(y)]
    y.index = x
    x = sm.add_constant((y.index)[y > 1])
    y = y[y > 1]
    ldf_reg = sm.OLS(np.log(y - 1), x).fit()
    time_pd = (np.log(self.f[n] - 1) - ldf_reg.params[0]) / ldf_reg.params[1]
    #tail_factor = np.exp(tail_model.params[0] + np.array([i+2 for i in range(n,n+100)]) * tail_model.params[1]).astype(float) + 1)
    return time_pd

Example 5:

def __get_MCL_model(self):
    modelsI = []
    modelsP = []
    for i in range(len(self.Incurred.data.columns)):
        modelsI.append(cl.WRTO(self.Incurred.data.iloc[:, i].dropna(),
                               self.Paid.data.iloc[:, i].dropna(),
                               w=1 / self.Incurred.data.iloc[:, i].dropna()))
        modelsP.append(cl.WRTO(self.Paid.data.iloc[:, i].dropna(),
                               self.Incurred.data.iloc[:, i].dropna(),
                               w=1 / self.Paid.data.iloc[:, i].dropna()))
    q_f = np.array([item.coefficient for item in modelsI])
    qinverse_f = np.array([item.coefficient for item in modelsP])
    rhoI_sigma = np.array([item.sigma for item in modelsI])
    rhoP_sigma = np.array([item.sigma for item in modelsP])
    #y = np.log(rhoI_sigma[:-1])
    #x = np.array([i + 1 for i in range(len(y))])
    #x = sm.add_constant(x)
    #OLS = sm.OLS(y,x).fit()
    #tailsigma = np.exp((x[:,1][-1]+ 1) * OLS.params[1] + OLS.params[0])
    return rhoI_sigma, rhoP_sigma, q_f, qinverse_f

Example 6:

def _check_wls(self, x, y, weights):
    with tm.assert_produces_warning(FutureWarning, check_stacklevel=False):
        result = ols(y=y, x=x, weights=1 / weights)

    combined = x.copy()
    combined['__y__'] = y
    combined['__weights__'] = weights
    combined = combined.dropna()

    endog = combined.pop('__y__').values
    aweights = combined.pop('__weights__').values
    exog = sm.add_constant(combined.values, prepend=False)

    sm_result = sm.WLS(endog, exog, weights=1 / aweights).fit()

    assert_almost_equal(sm_result.params, result._beta_raw)
    assert_almost_equal(sm_result.resid, result._resid_raw)

    self.checkMovingOLS('rolling', x, y, weights=weights)
    self.checkMovingOLS('expanding', x, y, weights=weights)

Example 7:

def checkOLS(self, exog, endog, x, y):
    reference = sm.OLS(endog, sm.add_constant(exog, prepend=False)).fit()

    with tm.assert_produces_warning(FutureWarning, check_stacklevel=False):
        result = ols(y=y, x=x)

    # check that sparse version is the same
    with tm.assert_produces_warning(FutureWarning, check_stacklevel=False):
        sparse_result = ols(y=y.to_sparse(), x=x.to_sparse())
    _compare_ols_results(result, sparse_result)

    assert_almost_equal(reference.params, result._beta_raw)
    assert_almost_equal(reference.df_model, result._df_model_raw)
    assert_almost_equal(reference.df_resid, result._df_resid_raw)
    assert_almost_equal(reference.fvalue, result._f_stat_raw[0])
    assert_almost_equal(reference.pvalues, result._p_value_raw)
    assert_almost_equal(reference.rsquared, result._r2_raw)
    assert_almost_equal(reference.rsquared_adj, result._r2_adj_raw)
    assert_almost_equal(reference.resid, result._resid_raw)
    assert_almost_equal(reference.bse, result._std_err_raw)
    assert_almost_equal(reference.tvalues, result._t_stat_raw)
    assert_almost_equal(reference.cov_params(), result._var_beta_raw)
    assert_almost_equal(reference.fittedvalues, result._y_fitted_raw)

    _check_non_raw_results(result)

Example 8:

def housing(request):
    result = HOUSING_RESULTS[request.param]
    keys = request.param.split('-')
    mod = MODELS[keys[0]]
    data = HOUSING_DATA
    endog = data.rent
    exog = sm.add_constant(data.pcturban)
    instd = data.hsngval
    instr = data[['faminc', 'region']]
    cov_opts = deepcopy(COV_OPTIONS[keys[1]])
    cov_opts['debiased'] = keys[2] == 'small'
    if keys[0] == 'gmm':
        weight_opts = deepcopy(COV_OPTIONS[keys[1]])
        weight_opts['weight_type'] = weight_opts['cov_type']
        del weight_opts['cov_type']
    else:
        weight_opts = {}

    model_result = mod(endog, exog, instd, instr, **weight_opts).fit(**cov_opts)
    return model_result, result

Example 9:

def test_multiple(data):
    dependent = data.set_index(['nr', 'year']).lwage
    exog = sm.add_constant(data.set_index(['nr', 'year'])[['expersq', 'married', 'union']])
    res = PanelOLS(dependent, exog, entity_effects=True, time_effects=True).fit()
    res2 = PanelOLS(dependent, exog, entity_effects=True).fit(cov_type='clustered', cluster_entity=True)
    exog = sm.add_constant(data.set_index(['nr', 'year'])[['married', 'union']])
    res3 = PooledOLS(dependent, exog).fit()
    exog = data.set_index(['nr', 'year'])[['exper']]
    res4 = RandomEffects(dependent, exog).fit()
    comp = compare([res, res2, res3, res4])
    assert len(comp.rsquared) == 4
    d = dir(comp)
    for value in d:
        if value.startswith('_'):
            continue
        getattr(comp, value)

Example 10:

def test_multiple_no_effects(data):
    dependent = data.set_index(['nr', 'year']).lwage
    exog = sm.add_constant(data.set_index(['nr', 'year'])[['expersq', 'married', 'union']])
    res = PanelOLS(dependent, exog).fit()
    exog = sm.add_constant(data.set_index(['nr', 'year'])[['married', 'union']])
    res3 = PooledOLS(dependent, exog).fit()
    exog = data.set_index(['nr', 'year'])[['exper']]
    res4 = RandomEffects(dependent, exog).fit()
    comp = compare(dict(a=res, model2=res3, model3=res4))
    assert len(comp.rsquared) == 3
    d = dir(comp)
    for value in d:
        if value.startswith('_'):
            continue
        getattr(comp, value)
    compare(OrderedDict(a=res, model2=res3, model3=res4))

Example 11:

def compute(self, method='logistic'):
    """
    Compute propensity score and measures of goodness-of-fit

    Parameters
    ----------
    method : str
        Propensity score estimation method. Either 'logistic' or 'probit'
    """
    predictors = sm.add_constant(self.covariates, prepend=False)
    if method == 'logistic':
        model = sm.Logit(self.treatment, predictors).fit(disp=False, warn_convergence=True)
    elif method == 'probit':
        model = sm.Probit(self.treatment, predictors).fit(disp=False, warn_convergence=True)
    else:
        raise ValueError('Unrecognized method')
    return model.predict()

Example 12:

def run_glm(X, y, model_name):
    """ Train the binomial/negative binomial GLM

    Args:
        X (np.array): scaled X.
        y (pd.df): four-column response table.

    Returns:
        sm.model: trained GLM model.
    """
    # Add const manually. sm.add_constant cannot add 1 for shape (1, n)
    X = np.c_[X, np.ones(X.shape[0])]
    if model_name == 'Binomial':
        logger.info('Building binomial GLM')
        # make two columns response (# success, # failure)
        y_binom = np.zeros((y.shape[0], 2), dtype=np.int_)
        y_binom[:, 0] = y.nMut
        y_binom[:, 1] = y.length * y.N - y.nMut
        glm = sm.GLM(y_binom, X, family=sm.families.Binomial())
    elif model_name == 'NegativeBinomial':
        logger.info('Building negative binomial GLM')
        # use nMut as response and length*N as exposure
        glm = sm.GLM(y.nMut.values, X,
                     family=sm.families.NegativeBinomial(),
                     exposure=y.length.values * y.N.values + 1)
    else:
        sys.stderr.write('Unknown GLM name {}. Must be Binomial or NegativeBinomial'.format(model_name))
        sys.exit(1)
    model = glm.fit()
    return model

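A note on the manual np.c_ workaround in the example above: with a single-row design matrix every column looks constant, and add_constant's default has_constant='skip' then returns the input unchanged, so no intercept column is added. If that reading of the behavior is right, has_constant='add' forces the column; the sketch below (illustrative values) contrasts the two:

import numpy as np
import statsmodels.api as sm

row = np.array([[0.2, 1.3, 0.7]])                      # shape (1, 3): a single observation
print(sm.add_constant(row).shape)                      # (1, 3): skipped, every column looks constant
print(sm.add_constant(row, has_constant='add').shape)  # (1, 4): constant column forced
print(np.c_[row, np.ones(row.shape[0])].shape)         # (1, 4): the example's manual append
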
Example 13:

def rlm(data, xseq, **params):
    """
    Fit RLM
    """
    X = sm.add_constant(data['x'])
    Xseq = sm.add_constant(xseq)

    results = sm.RLM(data['y'], X).fit(**params['method_args'])
    data = pd.DataFrame({'x': xseq})
    data['y'] = results.predict(Xseq)

    if params['se']:
        warnings.warn("Confidence intervals are not yet implemented "
                      "for RLM smoothing.")

    return data

Example 14:

def glm(data, xseq, **params):
    """
    Fit GLM
    """
    X = sm.add_constant(data['x'])
    Xseq = sm.add_constant(xseq)

    results = sm.GLM(data['y'], X).fit(**params['method_args'])
    data = pd.DataFrame({'x': xseq})
    data['y'] = results.predict(Xseq)

    if params['se']:
        # TODO: Depends on statsmodels > 0.7
        # https://github.com/statsmodels/statsmodels/pull/2151
        # https://github.com/statsmodels/statsmodels/pull/3406
        # Remove the try/except when a compatible version is released
        try:
            prediction = results.get_prediction(Xseq)
            ci = prediction.conf_int(1 - params['level'])
            data['ymin'] = ci[:, 0]
            data['ymax'] = ci[:, 1]
        except (AttributeError, TypeError):
            warnings.warn(
                "Cannot compute confidence intervals. "
                "Install the latest/development version of statsmodels.")

    return data

Example 15:

def get_tail_factor(self, colname_sep='-'):
    """Estimate tail factor, idea from Thomas Mack:
    Returns a tail factor based on an exponential fit to the LDFs. The fit is
    skipped if the product of the 2nd and 3rd to last LDF is < 1.0001, and is
    rejected if the estimated tail is larger than 2.0 or if the fit's p-value
    is > 0.05. This is currently representative of the R implementation of
    this package, but may be enhanced in the future to be p-value based.

    Parameters:
        colname_sep : str
            text to join the names of two adjacent columns representing the
            age-to-age factor column name.

    Returns: Pandas.Series of the tail factor.
    """
    LDF = np.array(self.get_LDF()[:self.triangle.ncol - 1])
    if self.tail == False:
        tail_factor = 1
    elif len(LDF[LDF > 1]) < 2:
        warn("Not enough factors larger than 1.0 to fit an exponential regression.")
        tail_factor = 1
    elif (LDF[-3] * LDF[-2] > 1.0001):
        y = Series(LDF)
        x = sm.add_constant((y.index + 1)[y > 1])
        y = LDF[LDF > 1]
        n, = np.where(LDF == y[-1])[0]
        tail_model = sm.OLS(np.log(y - 1), x).fit()
        tail_factor = np.product(
            np.exp(tail_model.params[0] +
                   np.array([i + 2 for i in range(n, n + 100)]) *
                   tail_model.params[1]).astype(float) + 1)
        if tail_factor > 2:
            warn("The estimated tail factor was bigger than 2 and has been reset to 1.")
            tail_factor = 1
        if tail_model.f_pvalue > 0.05:
            warn("The p-value of the exponential tail fit is insignificant and the tail has been set to 1.")
            tail_factor = 1
    else:
        warn("LDF[-3] * LDF[-2] is not greater than 1.0001 and the tail has been set to 1.")
        tail_factor = 1
    return Series(tail_factor,
                  index=[self.triangle.data.columns[-1] + colname_sep + 'Ult'])

Example 16:

def model_fit_and_test(TrainX, TrainY, TestX, TestY):
    def build_model(model_name):
        model = model_name()
        return model

    #for model_name in [LinearRegression, Ridge, Lasso, ElasticNet, KNeighborsRegressor, DecisionTreeRegressor, SVR, RandomForestRegressor, AdaBoostRegressor, GradientBoostingRegressor]:
    for model_name in [LinearRegression, ElasticNet]:
        model = build_model(model_name)
        model.fit(TrainX, TrainY)
        print(model_name)
        resid = model.predict(TestX) - TestY
        #print(resid)
        print("Residual sum of squares: %f" % np.mean(resid ** 2))
        #print(model.predict(TestX))
        #print(TestY)
        # Explained variance score: 1 is perfect prediction
        plt.scatter(model.predict(TestX), resid)
        plt.axhline(0, color='red')
        plt.xlabel('Predicted Values')
        plt.ylabel('Residuals')
        #plt.xlim([1, 50])
        plt.show()
        print('Variance score: %.2f' % model.score(TestX, TestY))

        from statsmodels.stats.stattools import jarque_bera
        _, pvalue, _, _ = jarque_bera(resid)
        print("Test Residuals Normal", pvalue)

        from statsmodels import regression, stats
        import statsmodels.api as sms
        import statsmodels.stats.diagnostic as smd
        # xs_with_constant = sms.add_constant(np.column_stack((X1, X2, X3, X4)))
        xs_with_constant = sms.add_constant(TestX)
        _, pvalue1, _, _ = stats.diagnostic.het_breuschpagan(resid, xs_with_constant)
        print("Test Heteroskedasticity", pvalue1)

        ljung_box = smd.acorr_ljungbox(resid, lags=10)
        #print("Lagrange Multiplier Statistics:", ljung_box[0])
        print("Test Autocorrelation P-values:", ljung_box[1])
        if any(ljung_box[1] < 0.05):
            print("The residuals are autocorrelated.")
        else:
            print("The residuals are not autocorrelated.")

Example 17:

def apply_half_life(self, time_series):
    lag = np.roll(time_series, 1)
    lag[0] = 0
    ret = time_series - lag
    ret[0] = 0

    # adds intercept terms to X variable for regression
    lag2 = sm.add_constant(lag)

    model = sm.OLS(ret, lag2)
    res = model.fit()

    self.half_life = -np.log(2) / res.params[1]

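For context on the half-life example above: regressing the one-step changes on the lagged level, dy_t = a + lambda * y_(t-1) + e_t, gives the mean-reversion speed lambda, and -ln(2)/lambda is the time for a deviation to decay by half. A self-contained sketch on synthetic data (all names and values illustrative):

import numpy as np
import statsmodels.api as sm

rng = np.random.default_rng(0)
y = np.zeros(500)
for t in range(1, 500):                   # simulate a mean-reverting AR(1) series
    y[t] = 0.9 * y[t - 1] + rng.normal()

dy = np.diff(y)                           # one-step changes
lagged = sm.add_constant(y[:-1])          # lagged level plus intercept column
lam = sm.OLS(dy, lagged).fit().params[1]  # mean-reversion speed (about -0.1 here)
print(-np.log(2) / lam)                   # half-life in steps, roughly ln(2)/0.1 = 6.9
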
Example 18:

def hedge_ratio(Y, X):
    # Look into using Kalman Filter to calculate the hedge ratio
    X = sm.add_constant(X)
    model = sm.OLS(Y, X).fit()
    return model.params[1]

Example 19:

def run_logistic_regression(df):
    # Logistic regression
    X = df['pageviews_cumsum']
    X = sm.add_constant(X)
    y = df['is_conversion']
    logit = sm.Logit(y, X)
    logistic_regression_results = logit.fit()
    print(logistic_regression_results.summary())
    return logistic_regression_results

Example 20:

def predict_probabilities(logistic_regression_results):
    # Predict the conversion probability for 0 up to 50 pageviews
    X = sm.add_constant(list(range(0, 50)))
    y_hat = logistic_regression_results.predict(X)
    df_hat = pd.DataFrame(list(zip(X[:, 1], y_hat)))
    df_hat.columns = ['X', 'y_hat']
    p_conversion_25_pageviews = df_hat.loc[25]['y_hat']
    print("")
    print("The probability of converting after 25 pageviews is {}".format(p_conversion_25_pageviews))

Example 21:

def run_logistic_regression(self):
    # Logistic regression
    X = self.df['pageviews_cumsum']
    X = sm.add_constant(X)
    y = self.df['is_conversion']
    logit = sm.Logit(y, X)
    self.logistic_regression_results = logit.fit()
    print(self.logistic_regression_results.summary())

Example 22:

def predict_probabilities(self):
    # Predict the conversion probability for 0 up to 50 pageviews
    X = sm.add_constant(list(range(0, 50)))
    y_hat = self.logistic_regression_results.predict(X)
    df_hat = pd.DataFrame(list(zip(X[:, 1], y_hat)))
    df_hat.columns = ['X', 'y_hat']
    print("")
    print("For example, the probability of converting after 25 pageviews is {}".format(df_hat.loc[25]['y_hat']))

Example 23:

def ols(self, x, y):
    xflatten = np.delete(x, [i * (x.shape[0] + 1) for i in range(x.shape[0])])
    yflatten = np.delete(y, [i * (y.shape[0] + 1) for i in range(y.shape[0])])
    xflatten = sm.add_constant(xflatten)
    model = sm.OLS(yflatten, xflatten)
    results = model.fit()
    print(results.summary())

Example 24:

def regression_apply(row, timepoints, weighted):
    """
    :py:meth:`pandas.DataFrame.apply` apply function for calculating
    enrichment using linear regression. If *weighted* is ``True`` perform
    weighted least squares; else perform ordinary least squares.

    Weights for weighted least squares are included in *row*.

    Returns a :py:class:`pandas.Series` containing regression coefficients,
    residuals, and statistics.
    """
    # retrieve log ratios from the row
    y = row[['L_{}'.format(t) for t in timepoints]]

    # re-scale the x's to fall within [0, 1]
    xvalues = [x / float(max(timepoints)) for x in timepoints]

    # perform the fit
    X = sm.add_constant(xvalues)  # fit intercept
    if weighted:
        W = row[['W_{}'.format(t) for t in timepoints]]
        fit = sm.WLS(y, X, weights=W).fit()
    else:
        fit = sm.OLS(y, X).fit()

    # re-format as a data frame row
    values = np.concatenate([fit.params,
                             [fit.bse['x1'], fit.tvalues['x1'], fit.pvalues['x1']],
                             fit.resid])
    index = ['intercept', 'slope', 'SE_slope', 't', 'pvalue_raw'] + \
            ['e_{}'.format(t) for t in timepoints]
    return pd.Series(data=values, index=index)

Example 25:

def data():
    return AttrDict(dep=SIMULATED_DATA.y_robust,
                    exog=add_constant(SIMULATED_DATA[['x3', 'x4', 'x5']]),
                    endog=SIMULATED_DATA[['x1', 'x2']],
                    instr=SIMULATED_DATA[['z1', 'z2']])

Example 26:

def construct_model(key):
    model, nendog, nexog, ninstr, weighted, var, other = key.split('-')
    var = var.replace('wmatrix', 'vce')
    mod = MODELS[model]
    data = SIMULATED_DATA
    endog = data[['x1', 'x2']] if '2' in nendog else data.x1
    exog = data[['x3', 'x4', 'x5']]
    instr = data[['z1', 'z2']] if '2' in ninstr else data.z1
    deps = {'vce(unadjusted)': data.y_unadjusted,
            'vce(robust)': data.y_robust,
            'vce(cluster cluster_id)': data.y_clustered,
            'vce(hac bartlett 12)': data.y_kernel}
    dep = deps[var]
    if 'noconstant' not in other:
        exog = sm.add_constant(data[['x3', 'x4', 'x5']])

    cov_opts = deepcopy(SIMULATED_COV_OPTIONS[var])
    cov_opts['debiased'] = 'small' in other

    mod_options = {}
    if 'True' in weighted:
        mod_options['weights'] = data.weights

    if model == 'gmm':
        mod_options.update(deepcopy(SIMULATED_COV_OPTIONS[var]))
        mod_options['weight_type'] = mod_options['cov_type']
        del mod_options['cov_type']
        mod_options['center'] = 'center' in other

    model_result = mod(dep, exog, endog, instr, **mod_options).fit(**cov_opts)
    if model == 'gmm' and 'True' in weighted:
        pytest.skip('Weighted GMM differs slightly')

    return model_result

Example 27:

def test_2sls_direct(data):
    mod = IV2SLS(data.dep, add_constant(data.exog), data.endog, data.instr)
    res = mod.fit()
    x = np.c_[add_constant(data.exog), data.endog]
    z = np.c_[add_constant(data.exog), data.instr]
    y = data.y
    xhat = z @ pinv(z) @ x
    params = pinv(xhat) @ y
    assert_allclose(res.params, params.ravel())
    # This is just a quick smoke check of results
    get_all(res)

Example 28:

def test_incorrect_type(data):
    dependent = data.set_index(['nr', 'year']).lwage
    exog = sm.add_constant(data.set_index(['nr', 'year'])[['expersq', 'married', 'union']])
    mod = PanelOLS(dependent, exog)
    res = mod.fit()
    mod2 = IV2SLS(mod.dependent.dataframe, mod.exog.dataframe, None, None)
    res2 = mod2.fit()
    with pytest.raises(TypeError):
        compare(dict(model1=res, model2=res2))

Example 29:

def fit_ols(y, x, idx=-1):
    ols = OLS(y, add_constant(x))
    results = ols.fit()
    return results.params.values[idx], results.cov_params().values[idx, idx]

Example 30:

def weak_instruments(self, n_sims=20):
    np.random.seed(1692)

    model = feedforward.FeedForwardModel(19, 1, dense_size=60, n_dense_layers=2)

    treatment_effects = []
    ols_betas, ols_ses = [], []
    old_corrs, new_corrs = [], []
    for _ in range(n_sims):
        df = self.treatment_gen.simulate_data(False)

        X = np.hstack((self.x, df['new_treat'].values[:, None]))
        Z = np.hstack((self.x, df['instrument'].values[:, None]))

        ols_beta, ols_se = self.fit_ols(df['treatment_effect'], X)
        ols_betas.append(ols_beta)
        ols_ses.append(ols_se)

        old_corr = df[['instrument', 'new_treat']].corr().values[0, 1]
        new_instrument, new_corr = model.fit_instruments(X, Z, df['treatment_effect'].values, batchsize=128)
        new_corrs.append(new_corr)
        old_corrs.append(old_corr)

        Z2 = Z.copy()
        Z2[:, -1] = new_instrument[:, 0]
        iv = IV2SLS(df['treatment_effect'].values.flatten(), add_constant(X), add_constant(Z2))

        model.reset_params()

    if new_corr:
        logger.info("Old corr: %.2f, New corr: %.2f", np.mean(old_corrs), np.mean(new_corrs))
    logger.info("Treatment effect (OLS): %.3f (%.4f)", np.mean(ols_betas), np.mean(ols_ses))
    logger.info("Treatment effect: %.3f (%.4f)", np.mean(treatment_effects), np.std(treatment_effects))

Example 31:

def slope_create(F0, F1):
    # Calculate slopes (OLS)
    slope, intercept, r_value, p_value, std_err = stats.linregress(F0, F1)
    # Slope with robust regression
    x = sm.add_constant(F0)
    y = F1
    rlm_results = sm.RLM(y, x, M=sm.robust.norms.HuberT()).fit()
    slope_r = rlm_results.params[-1]
    intercept_r = rlm_results.params[0]
    del x, y, rlm_results
    return slope, intercept, r_value, slope_r, intercept_r

Example 32:

def _residualNet(data, uselog=True, c=None, p=None, x=None, useaggregate=True,
                 numericalControls=[], categoricalControls=[]):
    '''
    Given the data on a bipartite network of the form source,target,flow

    Parameters
    ----------
    data : pandas.DataFrame
        Raw data. It has source,target,volume (trade, number of people etc.).
    c,p,x : str (optional)
        Labels of the columns in data used for source,target,volume.
        If not provided it will use the first, second, and third columns.
    numericalControls : list
        List of columns to use as numerical controls.
    categoricalControls : list
        List of columns to use as categorical controls.
    uselog : boolean (True)
        If True it will use the logarithm of the provided weight.
    useaggregate : boolean (True)
        If True it will calculate the aggregate of the volume on both sides
        (c and p) and use them as numerical controls.

    Returns
    -------
    net : pandas.DataFrame
        Table with c,p,x,x_res, where x_res is the residual of regressing x
        on the given control variables.
    '''
    c = data.columns.values[0] if c is None else c
    p = data.columns.values[1] if p is None else p
    x = data.columns.values[2] if x is None else x

    data_ = data[[c, p, x] + numericalControls + categoricalControls]
    if useaggregate:
        data_ = merge(data_, data.groupby(c).sum()[[x]].reset_index().rename(columns={x: x + '_' + c}))
        data_ = merge(data_, data.groupby(p).sum()[[x]].reset_index().rename(columns={x: x + '_' + p}))
        numericalControls += [x + '_' + c, x + '_' + p]
    if uselog:
        data_ = data_[data_[x] != 0]
        data_[x] = np.log10(data_[x])
        if useaggregate:
            data_[x + '_' + c] = np.log10(data_[x + '_' + c])
            data_[x + '_' + p] = np.log10(data_[x + '_' + p])

    _categoricalControls = []
    for var in set(categoricalControls):
        vals = list(set(data_[var]))
        for v in vals[1:]:
            _categoricalControls.append(var + '_' + str(v))
            data_[var + '_' + str(v)] = 0
            data_.loc[data_[var] == v, var + '_' + str(v)] = 1

    Y = data_[x].values
    X = data_[list(set(numericalControls)) + list(set(_categoricalControls))].values
    X = sm.add_constant(X)
    model = sm.OLS(Y, X).fit()
    data_[x + '_res'] = Y - model.predict(X)
    return data_[[c, p, x, x + '_res']]

Example 33:

def wt_plot(self, pdf):
    """
    Create a plot of the linear fit of the wild type variant.

    *pdf* is an open PdfPages instance.

    Only created for selections that use WLS or OLS scoring and have a wild
    type specified. Uses :py:func:`~plots.fit_axes` for the plotting.
    """
    logging.info("Creating wild type fit plot", extra={'oname': self.name})

    # get the data and calculate log ratios
    if "variants" in self.labels:
        wt_label = "variants"
    elif "identifiers" in self.labels:
        wt_label = "identifiers"
    data = self.store.select("/main/{}/counts".format(wt_label),
                             where='index = "{}"'.format(WILD_TYPE_VARIANT)).iloc[0]
    sums = self.store['/main/{}/counts'.format(wt_label)].sum(axis="index")  # sum of complete cases (N')
    yvalues = np.log(data + 0.5) - np.log(sums + 0.5)
    xvalues = [tp / float(max(self.timepoints)) for tp in self.timepoints]

    # fit the line
    X = sm.add_constant(xvalues)  # fit intercept
    if self.scoring_method == "WLS":
        W = 1 / (1 / (data + 0.5) + 1 / (sums + 0.5))
        fit = sm.WLS(yvalues, X, weights=W).fit()
    elif self.scoring_method == "OLS":
        fit = sm.OLS(yvalues, X).fit()
    else:
        raise ValueError('Invalid regression scoring method "{}" [{}]'.format(self.scoring_method, self.name))
    intercept, slope = fit.params
    slope_se = fit.bse['x1']

    # make the plot
    fig, ax = plt.subplots()
    fig.set_tight_layout(True)
    fit_axes(ax, xvalues, yvalues, slope, intercept, xlabels=self.timepoints)
    fit_axes_text(ax, cornertext="Slope {:3.2f}\nSE {:.1f}".format(slope, slope_se))
    ax.set_title("Wild Type Shape\n{}".format(self.name))
    ax.set_ylabel("Log Ratio (Complete Cases)")

    pdf.savefig(fig)
    plt.close(fig)

Example 34:

def test_linear_model_time_series(data):
    mod = TradedFactorModel(data.portfolios, data.factors)
    mod.fit()
    res = mod.fit()
    get_all(res)

    all_params = []
    all_tstats = []
    nobs, nport = data.portfolios.shape
    nf = data.factors.shape[1]
    e = np.zeros((nobs, (nport * (nf + 1))))
    x = np.zeros((nobs, (nport * (nf + 1))))
    factors = sm.add_constant(data.factors)
    loc = 0
    for i in range(data.portfolios.shape[1]):
        if isinstance(data.portfolios, pd.DataFrame):
            p = data.portfolios.iloc[:, i:(i + 1)]
        else:
            p = data.portfolios[:, i:(i + 1)]
        ols_res = _OLS(p, factors).fit(cov_type='robust', debiased=True)
        all_params.extend(list(ols_res.params))
        all_tstats.extend(list(ols_res.tstats))
        x[:, loc:(loc + nf + 1)] = factors
        e[:, loc:(loc + nf + 1)] = ols_res.resids.values[:, None]
        loc += nf + 1
        cov = res.cov.values[(nf + 1) * i:(nf + 1) * (i + 1),
                             (nf + 1) * i:(nf + 1) * (i + 1)]
        ols_cov = ols_res.cov.values
        assert_allclose(cov, ols_cov)

    assert_allclose(res.params.values.ravel(), np.array(all_params))
    assert_allclose(res.tstats.values.ravel(), np.array(all_tstats))
    assert_allclose(res.risk_premia, np.asarray(factors).mean(0)[1:])

    xpxi_direct = np.eye((nf + 1) * nport + nf)
    f = np.asarray(factors)
    fpfi = np.linalg.inv(f.T @ f / nobs)
    nfp1 = nf + 1
    for i in range(nport):
        st, en = i * nfp1, (i + 1) * nfp1
        xpxi_direct[st:en, st:en] = fpfi
    f = np.asarray(factors)[:, 1:]
    xe = np.c_[x * e, f - f.mean(0)[None, :]]
    xeex_direct = xe.T @ xe / nobs
    cov = xpxi_direct @ xeex_direct @ xpxi_direct / (nobs - nfp1)
    assert_allclose(cov, res.cov.values)

    alphas = np.array(all_params)[0::nfp1][:, None]
    alpha_cov = cov[0:(nfp1 * nport):nfp1, 0:(nfp1 * nport):nfp1]
    stat_direct = float(alphas.T @ np.linalg.inv(alpha_cov) @ alphas)
    assert_allclose(res.j_statistic.stat, stat_direct)
    assert_allclose(1.0 - stats.chi2.cdf(stat_direct, nport), res.j_statistic.pval)

Example 35:

def scatterColor(x0, y, w):
    """Creates scatter plot with points colored by variable.
    All input arrays must have matching lengths

    :param x0: x values to plot
    :type x0: list
    :param y: y values to plot
    :type y: list
    :param w: z values to plot

    :returns: plot; slope and intercept of the RLM best fit line shown on the plot

    .. warning:: all input arrays must have matching lengths and scalar values

    .. note:: See documentation at
        http://statsmodels.sourceforge.net/0.6.0/generated/statsmodels.robust.robust_linear_model.RLM.html
        for the RLM line
    """
    import matplotlib as mpl
    import matplotlib.cm as cm
    import statsmodels.api as sm
    from scipy.stats import linregress

    cmap = plt.cm.get_cmap('RdYlBu')
    norm = mpl.colors.Normalize(vmin=w.min(), vmax=w.max())
    m = cm.ScalarMappable(norm=norm, cmap=cmap)
    m.set_array(w)
    sc = plt.scatter(x0, y, label='', color=m.to_rgba(w))

    xa = sm.add_constant(x0)
    est = sm.RLM(y, xa).fit()
    r2 = sm.WLS(y, xa, weights=est.weights).fit().rsquared
    slope = est.params[1]

    x_prime = np.linspace(np.min(x0), np.max(x0), 100)[:, np.newaxis]
    x_prime = sm.add_constant(x_prime)
    y_hat = est.predict(x_prime)

    const = est.params[0]
    y2 = [i * slope + const for i in x0]

    lin = linregress(x0, y)
    x1 = np.arange(np.min(x0), np.max(x0), 0.1)
    y1 = [i * lin[0] + lin[1] for i in x1]
    y2 = [i * slope + const for i in x1]
    plt.plot(x1, y1, c='g',
             label='simple linear regression m = {:.2f} b = {:.0f}, r^2 = {:.2f}'.format(lin[0], lin[1], lin[2] ** 2))
    plt.plot(x1, y2, c='r',
             label='rlm regression m = {:.2f} b = {:.0f}, r2 = {:.2f}'.format(slope, const, r2))
    plt.legend()

    cbar = plt.colorbar(m)
    cbar.set_label('Julian Date')

    return slope, const