The following 20 code examples, drawn from Python open source projects, illustrate how to use statistics.pstdev().
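As a quick orientation before the project code: statistics.pstdev() returns the population standard deviation (dividing by N), while statistics.stdev() is the sample version (dividing by N - 1). An optional second argument mu accepts a precomputed mean. A minimal demonstration:

import statistics

data = [2, 4, 4, 4, 5, 5, 7, 9]

# population standard deviation: sqrt(sum((x - mean)**2) / N)
print(statistics.pstdev(data))        # 2.0

# the sample version divides by N - 1, so it is slightly larger
print(statistics.stdev(data))         # 2.138...

# a precomputed mean can be passed to avoid recalculating it internally
mu = statistics.mean(data)
print(statistics.pstdev(data, mu))    # 2.0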
Example 1

import statistics


def PSTDEV(df, n, price='Close', mu=None):
    """
    Population standard deviation of data
    """
    pstdev_list = []
    i = 0
    while i < len(df[price]):
        if i + 1 < n:
            pstdev = float('NaN')
        else:
            start = i + 1 - n
            end = i + 1
            pstdev = statistics.pstdev(df[price][start:end], mu)
        pstdev_list.append(pstdev)
        i += 1
    return pstdev_list
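A possible invocation of the helper above; the prices are made up, and a plain dict of lists stands in for the DataFrame the function presumably receives:

df = {'Close': [10.0, 10.5, 10.2, 10.8, 11.0, 10.9]}  # hypothetical closing prices

# 3-period rolling population standard deviation; the first n - 1 entries are NaN
print(PSTDEV(df, n=3))
# [nan, nan, 0.205..., 0.244..., 0.339..., 0.081...]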
Example 2

import statistics


def diagnosticity(evaluations):
    """Return the diagnosticity of a piece of evidence given its evaluations against a set of hypotheses.

    :param evaluations: an iterable of iterables of Eval for a piece of evidence
    """
    # The "diagnosticity" needs to capture how well the evidence separates/distinguishes the hypotheses. If we don't
    # show a preference between consistent/inconsistent, STDDEV captures this intuition OK. However, in the future,
    # we may want to favor evidence for which hypotheses are inconsistent. Additionally, we may want to calculate
    # "marginal diagnosticity" which takes into account the rest of the evidence.
    # (1) calculate the consensus for each hypothesis
    # (2) map N/A to neutral because N/A doesn't help determine consistency of the evidence
    # (3) calculate the population standard deviation of the evidence. It's more reasonable to consider the set of
    #     hypotheses at a given time to be the population of hypotheses than as a "sample" (although it doesn't matter
    #     much because we're comparing across hypothesis sets of the same size)
    na_neutral = map(mean_na_neutral_vote, evaluations)  # pylint: disable=bad-builtin
    try:
        return statistics.pstdev(filter(None.__ne__, na_neutral))  # pylint: disable=bad-builtin
    except statistics.StatisticsError:
        return 0.0
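The core idea, independent of the Eval plumbing above: once each hypothesis has a consensus score, pstdev is zero when the evidence treats every hypothesis alike and grows as the evidence separates them. A sketch with made-up consensus values on a -2..2 consistency scale:

import statistics

uninformative = [0, 0, 0, 0]   # evidence rates every hypothesis the same
diagnostic = [2, -2, 2, -2]    # evidence sharply separates the hypotheses

print(statistics.pstdev(uninformative))  # 0.0 -> no diagnostic value
print(statistics.pstdev(diagnostic))     # 2.0 -> highly diagnostic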
Example 3

def pstdev(self):
    # population standard deviation of the recorded prices
    return statistics.pstdev(self.price)
Example 4

import os
import subprocess
import statistics

import click


def calculator(test, arg, n_runs, only_result, workpath=os.getcwd()):
    w = []
    for x in range(n_runs):
        if only_result:
            process = subprocess.Popen(
                arg, cwd=workpath, shell=True,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            process.wait()
        else:
            process = subprocess.Popen(arg, cwd=workpath, shell=True)
            process.wait()
        # each run rewrites Benchmark.txt, so collect its readings per iteration
        with open(os.path.join(workpath, "Benchmark.txt")) as f:
            for line in f.readlines():
                w.append(float(line))

    click.secho(
        "\nThe results of the benchmark are (all speeds in items/sec) : \n",
        bold=True)
    click.secho(
        "\nTest = '{0}' Iterations = '{1}'\n".format(test, n_runs),
        bold=True)
    click.secho(
        "\nMean : {0} Median : {1} Std Dev : {2}\n".format(
            statistics.mean(w),
            statistics.median(w),
            statistics.pstdev(w)),
        bold=True)

    os.remove(os.path.join(workpath, "Benchmark.txt"))
Example 5

import statistics


def getstdev(l, mean):
    """
    Return the population standard deviation of the values in the list,
    or NaN if the list is empty.
    """
    if len(l) > 0:
        return statistics.pstdev(l, mean)
    else:
        return float('nan')
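A small usage sketch; passing the precomputed mean as pstdev's mu argument saves an internal mean computation, which is presumably why the function takes it as a parameter:

values = [3.0, 4.0, 5.0]
mu = statistics.mean(values)
print(getstdev(values, mu))  # 0.816...
print(getstdev([], mu))      # nan: the empty case is handled explicitly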
Example 6

import sys


def stdev(arr):
    """
    Compute the standard deviation.
    """
    if sys.version_info >= (3, 0):
        import statistics
        return statistics.pstdev(arr)
    else:
        # Dependency on NumPy; numpy.std defaults to ddof=0, i.e. the
        # population standard deviation, matching statistics.pstdev
        try:
            import numpy
            return numpy.std(arr, axis=0)
        except ImportError:
            return 0.
Example 7

import statistics as stats


def main():
    print(stats.mean(range(6)))
    print(stats.median(range(6)))
    print(stats.median_low(range(6)))
    print(stats.median_high(range(6)))
    print(stats.median_grouped(range(6)))
    try:
        print(stats.mode(range(6)))
    except Exception as e:
        print(e)
    print(stats.mode(list(range(6)) + [3]))
    print(stats.pstdev(list(range(6)) + [3]))
    print(stats.stdev(list(range(6)) + [3]))
    print(stats.pvariance(list(range(6)) + [3]))
    print(stats.variance(list(range(6)) + [3]))
Example 8

import RPi.GPIO as GPIO


def __init__(self, dout_pin, pd_sck_pin, gain_channel_A=128, select_channel='A'):
    if (isinstance(dout_pin, int) and isinstance(pd_sck_pin, int)):  # check that both pins are integers
        self._pd_sck = pd_sck_pin  # init pd_sck pin number
        self._dout = dout_pin  # init data pin number
    else:
        raise TypeError('dout_pin and pd_sck_pin have to be pin numbers.\nI have got dout_pin: '
                        + str(dout_pin) +
                        ' and pd_sck_pin: ' + str(pd_sck_pin) + '\n')

    self._gain_channel_A = 0  # init to 0
    self._offset_A_128 = 0  # init offset for channel A and gain 128
    self._offset_A_64 = 0  # init offset for channel A and gain 64
    self._offset_B = 0  # init offset for channel B
    self._last_raw_data_A_128 = 0  # init last data to 0 for channel A and gain 128
    self._last_raw_data_A_64 = 0  # init last data to 0 for channel A and gain 64
    self._last_raw_data_B = 0  # init last data to 0 for channel B
    self._wanted_channel = ''  # init to empty string
    self._current_channel = ''  # init to empty string
    self._scale_ratio_A_128 = 1  # init to 1
    self._scale_ratio_A_64 = 1  # init to 1
    self._scale_ratio_B = 1  # init to 1
    self._debug_mode = False  # init debug mode to False
    self._pstdev_filter = True  # pstdev filter is ON by default

    GPIO.setmode(GPIO.BCM)  # set GPIO pin mode to BCM numbering
    GPIO.setup(self._pd_sck, GPIO.OUT)  # pin _pd_sck is output only
    GPIO.setup(self._dout, GPIO.IN)  # pin _dout is input only

    self.select_channel(select_channel)  # call select channel function
    self.set_gain_A(gain_channel_A)  # init gain for channel A

############################################################
# select_channel function evaluates if the desired channel #
# is valid and then sets the _wanted_channel.              #
# If it returns True then OK                               #
# INPUTS: channel ('A' | 'B')                              #
# OUTPUTS: BOOL                                            #
############################################################
Example 9

def get_current_channel(self):
    return self._current_channel

############################################################
# get pstdev filter status returns True if turned on.      #
# INPUTS: none                                             #
# OUTPUTS: BOOL                                            #
############################################################
Example 10

import statistics


def test_stdev(self):
    s = Stat(self.data)
    print(s)
    assert s.stdev == statistics.pstdev(self.data)
Example 11

import statistics as st


def __init__(self, data):
    self.mean = st.mean(data)
    self.median = st.median(data)
    self.stdev = st.pstdev(data)  # population standard deviation, despite the attribute name
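A quick usage sketch of the wrapper above (the class name Stat comes from the test in the previous example; the data is made up):

s = Stat([1.0, 2.0, 2.0, 4.0])
print(s.mean, s.median, s.stdev)  # 2.25 2.0 1.089...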
Example 12

import statistics


def pstdev(text):
    """
    Finds the population standard deviation of a space-separated list of numbers.

    Example::

        /pstdev 33 54 43 65 43 62
    """
    return format_output(statistics.pstdev(parse_numeric_list(text)))
Example 13

def setup():
    commands.add(mean)
    commands.add(median)
    commands.add(median_low)
    commands.add(median_high)
    commands.add(median_grouped)
    commands.add(mode)
    commands.add(pstdev)
    commands.add(pvariance)
    commands.add(stdev)
    commands.add(variance)
Example 14

import os
import statistics as s


def discover_benches(publish_or_subscribe):
    for f in os.listdir(os.curdir):
        if not f.startswith(publish_or_subscribe + '-'):
            continue
        with open(f) as fi:
            datalines = [l.split() for l in fi.readlines() if l[0].isdigit()]
        rpss = [float(d[1]) for d in datalines]
        rps_avg = s.mean(rpss)
        rps_stddev = s.pstdev(rpss)
        parts = f.split('-')
        size = int(parts[-1])
        yield ('-'.join(parts[1:-1]), size, rps_avg, rps_stddev)
Example 15

def print_summary_statistics(stats):
    from statistics import mean, median, pstdev
    print("benchmark min mean sd median max")
    for benchmark, samples in stats.items():
        if samples:
            metrics = [int(round(fn(samples), 0))
                       for fn in [min, mean, pstdev, median, max]]
        else:
            metrics = [0, 0, 0, 0, 0]
        print(benchmark, *metrics)
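A possible invocation with made-up samples; each row prints the five rounded metrics in the header's order, and the empty list exercises the all-zero fallback:

stats = {
    'parse': [102.4, 98.7, 101.1, 99.6],
    'render': [57.3, 55.8, 56.1],
    'empty': [],
}
print_summary_statistics(stats)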
Example 16

# Python 2 code: urllib.urlencode and urllib2 were reorganized into
# urllib.parse and urllib.request in Python 3
import statistics
import urllib
import urllib2


def uploadresult(test, w):
    commit = get_latest_commit('scrapy', 'scrapy')

    data = {
        'commitid': commit['html_url'].rsplit('/', 1)[-1],
        'branch': 'default',  # Always use default for trunk/master/tip
        'project': 'Scrapy-Bench',
        'executable': 'bench.py',
        'benchmark': test,
        'environment': get_env(),
        'result_value': statistics.mean(w),
    }

    data.update({
        'revision_date': current_date,  # Optional. Default is taken either
                                        # from VCS integration or from current date
        'result_date': current_date,  # Optional, default is current date
        'std_dev': statistics.pstdev(w),  # Optional. Default is blank
        # 'max': 4001.6,  # Optional. Default is blank
        # 'min': 3995.1,  # Optional. Default is blank
    })

    params = urllib.urlencode(data).encode("utf-8")
    response = "None"

    print("Saving result for executable %s, revision %s, benchmark %s" % (
        data['executable'], data['commitid'], data['benchmark']))

    f = urllib2.urlopen(CODESPEED_URL + 'result/add/', params)
    response = f.read()
    f.close()

    print("Server (%s) response: %s\n" % (CODESPEED_URL, response))
Example 17

import os
import subprocess
import statistics

import click


def calculator(
        test, arg, n_runs, only_result,
        upload_result=False, workpath=os.getcwd()):
    w = []
    for x in range(n_runs):
        if only_result:
            process = subprocess.Popen(
                arg, cwd=workpath, shell=True,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            process.wait()
        else:
            process = subprocess.Popen(arg, cwd=workpath, shell=True)
            process.wait()
        # each run rewrites Benchmark.txt, so collect its readings per iteration
        with open(os.path.join(workpath, "Benchmark.txt")) as f:
            for line in f.readlines():
                w.append(float(line))

    click.secho(
        "\nThe results of the benchmark are (all speeds in items/sec) : \n",
        bold=True)
    click.secho(
        "\nTest = '{0}' Iterations = '{1}'\n".format(test, n_runs),
        bold=True)
    click.secho(
        "\nMean : {0} Median : {1} Std Dev : {2}\n".format(
            statistics.mean(w),
            statistics.median(w),
            statistics.pstdev(w)),
        bold=True)

    if upload_result:
        codespeedinfo.uploadresult(test, w)

    os.remove(os.path.join(workpath, "Benchmark.txt"))
Example 18

import statistics as stat


def get_raw_data_mean(self, times=1):
    backup_channel = self._current_channel  # back up the current channel before reading, for later use
    backup_gain = self._gain_channel_A  # back up the gain of channel A
    if times > 0 and times < 100:  # check that times is in the required range
        data_list = []  # create empty list
        for i in range(times):  # read the requested number of times
            data_list.append(self._read())  # append every read value to the list

        if times > 2 and self._pstdev_filter:  # if times is > 2, filter the data
            data_pstdev = stat.pstdev(data_list)  # calculate population standard deviation of the data
            data_mean = stat.mean(data_list)  # calculate mean of the collected data
            max_num = data_mean + data_pstdev  # calculate max number which is within pstdev
            min_num = data_mean - data_pstdev  # calculate min number which is within pstdev
            filtered_data = []  # create new list for filtered data

            if data_pstdev <= 100:  # if pstdev is less than 100 it is OK
                self._save_last_raw_data(backup_channel, backup_gain, data_mean)  # save last data
                return data_mean  # just return the calculated mean

            for index, num in enumerate(data_list):  # pstdev is too large, so iterate through the list
                if (num > min_num and num < max_num):  # check if the number is within pstdev
                    filtered_data.append(num)  # then append it to the filtered data list

            if self._debug_mode:
                print('data_list: ' + str(data_list))
                print('filtered_data list: ' + str(filtered_data))
                print('pstdev data: ' + str(data_pstdev))
                print('pstdev filtered data: ' + str(stat.pstdev(filtered_data)))
                print('mean data_list: ' + str(stat.mean(data_list)))
                print('mean filtered_data: ' + str(stat.mean(filtered_data)))

            f_data_mean = stat.mean(filtered_data)  # calculate mean of the filtered data
            self._save_last_raw_data(backup_channel, backup_gain, f_data_mean)  # save last data
            return f_data_mean  # return mean of the filtered data
        else:
            data_mean = stat.mean(data_list)  # calculate mean of the list
            self._save_last_raw_data(backup_channel, backup_gain, data_mean)  # save last data
            return data_mean  # times was 2 or less, just return the mean
    else:
        raise ValueError('function "get_raw_data_mean" parameter "times" has to be in range 1 up to 99.\n'
                         'I have got: ' + str(times))

############################################################
# get_data_mean returns the average value of readings      #
# minus the offset for the particular channel which was    #
# read. If it returns False something is wrong; try debug  #
# mode.                                                    #
# INPUTS: times  # how many times to read data. Default 1  #
# OUTPUTS: INT | BOOL                                      #
############################################################
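The filtering step above is a general technique: when readings are noisy, discard those more than one population standard deviation from the mean and average the rest. A standalone sketch of the same idea, independent of the HX711 class; the threshold of 100 mirrors the hard-coded check above, and the empty-result fallback is an addition:

import statistics


def filtered_mean(readings, max_pstdev=100):
    """Mean of readings, dropping values more than one pstdev from the mean."""
    mean = statistics.mean(readings)
    spread = statistics.pstdev(readings)
    if spread <= max_pstdev:
        return mean  # spread already small enough; no filtering needed
    kept = [r for r in readings if mean - spread < r < mean + spread]
    return statistics.mean(kept) if kept else mean  # fall back if everything was dropped


# one simulated outlier among otherwise stable raw readings
print(filtered_mean([8400120, 8400980, 8399870, 8612000]))  # ~8400323.3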
Example 19

import statistics
from collections import defaultdict

import matplotlib.ticker
import numpy as np


def get_normalized_scores(entries, args):
    """Generate smoothed scores based on mean, stdev of that day's times."""
    times_by_date = defaultdict(dict)
    for e in entries:
        x = e.seconds
        # if x > 0:
        #     x = np.log(x)
        times_by_date[e.date][e.userid] = x
    sorted_dates = sorted(times_by_date.keys())

    # failures come with a heavier ranking penalty
    MAX_SCORE = 1.5
    FAILURE_PENALTY = -2

    def mk_score(mean, t, stdev):
        if t < 0:
            return FAILURE_PENALTY
        if stdev == 0:
            return 0
        score = (mean - t) / stdev
        return np.clip(score, -MAX_SCORE, MAX_SCORE)

    # scores are the stdev away from mean of that day
    scores = {}
    for date, user_times in times_by_date.items():
        times = [t for t in user_times.values() if t is not None]
        # make failures 1 minute worse than the worst time
        times = [t if t >= 0 else max(times) + 60 for t in times]
        q1, q3 = np.percentile(times, [25, 75])
        stdev = statistics.pstdev(times)
        o1, o3 = q1 - stdev, q3 + stdev
        times = [t for t in times if o1 <= t <= o3]
        mean = statistics.mean(times)
        stdev = statistics.pstdev(times, mean)
        scores[date] = {
            userid: mk_score(mean, t, stdev)
            for userid, t in user_times.items()
            if t is not None
        }

    new_score_weight = 1 - args.smooth
    running = defaultdict(list)
    weighted_scores = defaultdict(dict)
    for date in sorted_dates:
        for user, score in scores[date].items():
            old_score = running.get(user)
            new_score = score * new_score_weight + old_score * (1 - new_score_weight) \
                if old_score is not None else score
            weighted_scores[user][date] = running[user] = new_score

    ticker = matplotlib.ticker.MultipleLocator(base=0.25)
    formatter = matplotlib.ticker.ScalarFormatter(useOffset=False)
    return weighted_scores, ticker, formatter
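Two steps in the function above form a reusable pattern: trim outliers using quartiles widened by one pstdev, then score each value as a clipped z-score against the trimmed mean. A minimal sketch with made-up times, using the stdlib statistics.quantiles (Python 3.8+) in place of np.percentile; its cut points differ slightly from numpy's default interpolation:

import statistics


def clipped_zscores(times, max_score=1.5):
    """Z-scores against an outlier-trimmed mean/pstdev, clipped to +/- max_score."""
    q1, _, q3 = statistics.quantiles(times, n=4)
    spread = statistics.pstdev(times)
    lo, hi = q1 - spread, q3 + spread
    trimmed = [t for t in times if lo <= t <= hi]  # drop far-out values before scoring
    mean = statistics.mean(trimmed)
    stdev = statistics.pstdev(trimmed, mean)
    if stdev == 0:
        return [0.0] * len(times)
    # faster times score higher, mirroring (mean - t) / stdev above
    return [max(-max_score, min(max_score, (mean - t) / stdev)) for t in times]


print(clipped_zscores([42, 55, 61, 48, 300]))  # the 300 is an outlier, clipped to -1.5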
Example 20

import copy
import math
import operator
import os
import pickle
import statistics
from functools import reduce

import numpy as np


def findConcentrationAux(tagClassification):
    featList = vars(tagClassification['a1'][0])
    del featList['tE']
    del featList['tag']
    del featList['bBox']
    del featList['center']
    feats2Rec = copy.deepcopy(featList)
    del feats2Rec['Coord']
    del feats2Rec['Style']
    del feats2Rec['coG']

    # group the symbols by level: names ending in '1', '2' or '3'
    l1 = [symbol for symbol in tagClassification if symbol[-1] == '1']
    l2 = [symbol for symbol in tagClassification if symbol[-1] == '2']
    l3 = [symbol for symbol in tagClassification if symbol[-1] == '3']
    standevs = {}

    # recompute the features of any sample that still contains NaN values
    for fe in feats2Rec:
        for llet in tagClassification:
            for i in range(len(tagClassification[llet])):
                new = True
                for j in range(len(vars(tagClassification[llet][i])[fe])):
                    if math.isnan(vars(tagClassification[llet][i])[fe][j]) and new:
                        tagClassification[llet][i].computeFeatures()
                        new = False

    # encode the Style attribute numerically
    for symbol in tagClassification:
        for i in range(len(tagClassification[symbol])):
            if tagClassification[symbol][i].Style == 'diagonal':
                tagClassification[symbol][i].Style = [[1, 1]]
            elif tagClassification[symbol][i].Style == 'horizontal':
                tagClassification[symbol][i].Style = [[1, 2]]
            elif tagClassification[symbol][i].Style == 'vertical':
                tagClassification[symbol][i].Style = [[2, 1]]
            elif tagClassification[symbol][i].Style == 'closed':
                tagClassification[symbol][i].Style = [[2, 2]]

    for feature in featList:
        print(feature)

        def component(value, c):
            # select component c of an [x, y] pair; scalar features pass c=None
            return value if c is None else value[c]

        def within_spread(symbol, c):
            # normalized pstdev (pstdev / mean) of each feature position across
            # the samples of one symbol, averaged over the feature length
            length = len(vars(tagClassification[symbol][0])[feature])
            return float(np.nansum([
                statistics.pstdev([component(vars(mostra)[feature][i], c)
                                   for mostra in tagClassification[symbol]])
                / np.mean([component(vars(mostra)[feature][i], c)
                           for mostra in tagClassification[symbol]])
                for i in range(length)])) / length

        def group_spread(symbol, group, ref, c):
            # normalized pstdev of each feature position pooled over every symbol of
            # one level group; as in the original expression, the divisor uses the
            # current symbol's feature length while the range uses a reference symbol
            length = len(vars(tagClassification[symbol][0])[feature])
            return np.nansum([
                statistics.pstdev(reduce(operator.add,
                    [[component(vars(mostra)[feature][j], c)
                      for mostra in tagClassification[altSymbol]]
                     for altSymbol in group]))
                / np.mean(reduce(operator.add,
                    [[component(vars(mostra)[feature][j], c)
                      for mostra in tagClassification[altSymbol]]
                     for altSymbol in group]))
                for j in range(len(vars(tagClassification[ref][0])[feature]))]) / length

        def concentration(symbol, c=None):
            # within-symbol spread relative to the mean spread of the three groups
            across = float((group_spread(symbol, l1, 'a1', c)
                            + group_spread(symbol, l2, '+2', c)
                            + group_spread(symbol, l3, 'A3', c)) / 3)
            return within_spread(symbol, c) / across

        if feature == 'Coord' or feature == 'Style' or feature == 'coG':
            # [x, y] features: combine the two component ratios
            standevs[feature] = [math.sqrt(concentration(symbol, 0) ** 2
                                           + concentration(symbol, 1) ** 2)
                                 for symbol in tagClassification]
        else:
            standevs[feature] = [concentration(symbol)
                                 for symbol in tagClassification]

    print(standevs)
    if os.path.isfile('varStandarDevs.txt'):
        os.remove('varStandarDevs.txt')
    f = open('varStandarDevs.txt', 'wb')
    pickle.dump(standevs, f)
    f.close()

# PonderateByConcentration: returns the weighting of the features according to
# the standard deviations that it loads from the system