我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 statistics.median()。
def get_median_problems_solved_per_user(eligible=True, scoring=True, user_breakdown=None):
    """Return the median number of problems solved per user.

    Args:
        eligible: Forwarded to ``get_team_member_solve_stats`` when no
            breakdown is supplied.
        scoring: When true, users with zero correct solves are left out of
            the median.
        user_breakdown: Optional nested mapping whose outer values are
            mappings of per-user stats. A user's stats may be ``None`` or
            lack a ``'correct'`` entry; both count as zero solves.

    Returns:
        ``statistics.median`` of the collected per-user solve counts.
    """
    if user_breakdown is None:
        user_breakdown = get_team_member_solve_stats(eligible)

    counts = []
    for team_stats in user_breakdown.values():
        for user_stats in team_stats.values():
            has_count = user_stats is not None and 'correct' in user_stats
            count = user_stats['correct'] if has_count else 0
            if count > 0 or not scoring:
                counts.append(count)
    return statistics.median(counts)
def MEDIAN(df, n, price='Close'):
    """
    Rolling median (middle value) of data

    Returns a list as long as ``df[price]``; positions with fewer than ``n``
    values available so far hold NaN, the rest hold the median of the last
    ``n`` values up to and including that position.
    """
    column = df[price]
    rolling = []
    for stop in range(1, len(column) + 1):
        if stop < n:
            # Window is not full yet.
            rolling.append(float('NaN'))
        else:
            rolling.append(statistics.median(column[stop - n:stop]))
    return rolling
def MEDIAN_LOW(df, n, price='Close'):
    """
    Rolling low median of data

    Returns a list as long as ``df[price]``; positions with fewer than ``n``
    values available so far hold NaN, the rest hold ``statistics.median_low``
    of the last ``n`` values up to and including that position.
    """
    column = df[price]
    rolling = []
    for stop in range(1, len(column) + 1):
        if stop < n:
            # Window is not full yet.
            rolling.append(float('NaN'))
        else:
            rolling.append(statistics.median_low(column[stop - n:stop]))
    return rolling
def MEDIAN_HIGH(df, n, price='Close'):
    """
    Rolling high median of data

    Returns a list as long as ``df[price]``; positions with fewer than ``n``
    values available so far hold NaN, the rest hold
    ``statistics.median_high`` of the last ``n`` values up to and including
    that position.
    """
    column = df[price]
    rolling = []
    for stop in range(1, len(column) + 1):
        if stop < n:
            # Window is not full yet.
            rolling.append(float('NaN'))
        else:
            rolling.append(statistics.median_high(column[stop - n:stop]))
    return rolling
def printWinSizeSummary(neighborTL):
    '''Given a list where index is genes and the values are neighbor
    genes, calculate the size of this window (via calcWinSize) for each
    gene, then print the rounded median, mean and standard deviation of
    those sizes. Nothing is returned.'''
    winL = [calcWinSize(neighborT,geneNames,geneInfoD) for neighborT in neighborTL]

    # Compute every statistic before printing so a failure prints nothing.
    summaryL = [
        (" median", statistics.median(winL)),
        (" mean", statistics.mean(winL)),
        (" stdev", statistics.stdev(winL)),
    ]
    for label,value in summaryL:
        print(label,round(value))

## mods for core stuff (requires changing functions, so we move them here)
def evaluate_and_update_max_score(self, t, episodes):
    """Evaluate the agent, record the results and track the best mean score.

    Runs ``eval_performance`` on ``self.env``/``self.agent``, passes one row
    of values (t, episodes, elapsed seconds, mean, median, stdev, max, min,
    followed by the second item of every ``self.agent.get_statistics()``
    entry) to ``record_stats``, and calls ``update_best_model`` when the
    mean exceeds ``self.max_score``.

    Returns:
        The mean score of this evaluation.
    """
    eval_stats = eval_performance(
        self.env,
        self.agent,
        self.n_runs,
        max_episode_len=self.max_episode_len,
        explorer=self.explorer,
        logger=self.logger,
    )
    elapsed = time.time() - self.start_time
    agent_values = tuple(entry[1] for entry in self.agent.get_statistics())

    mean = eval_stats['mean']
    summary = tuple(eval_stats[key] for key in ('median', 'stdev', 'max', 'min'))
    record_stats(self.outdir, (t, episodes, elapsed, mean) + summary + agent_values)

    if mean > self.max_score:
        update_best_model(self.agent, self.outdir, t, self.max_score, mean,
                          logger=self.logger)
        self.max_score = mean
    return mean
def evaluate_and_update_max_score(self, t, episodes, env, agent):
    """Evaluate ``agent`` on ``env``, record the results, update the shared best score.

    Passes one row of values (t, episodes, elapsed seconds, mean, median,
    stdev, max, min, followed by the second item of every
    ``agent.get_statistics()`` entry) to ``record_stats``. The comparison
    against ``self._max_score`` and its update happen while holding
    ``self._max_score.get_lock()``.

    Returns:
        The mean score of this evaluation.
    """
    eval_stats = eval_performance(
        env,
        agent,
        self.n_runs,
        max_episode_len=self.max_episode_len,
        explorer=self.explorer,
        logger=self.logger,
    )
    elapsed = time.time() - self.start_time
    agent_values = tuple(entry[1] for entry in agent.get_statistics())

    mean = eval_stats['mean']
    summary = tuple(eval_stats[key] for key in ('median', 'stdev', 'max', 'min'))
    record_stats(self.outdir, (t, episodes, elapsed, mean) + summary + agent_values)

    best = self._max_score
    with best.get_lock():
        previous_best = best.value
        if mean > previous_best:
            update_best_model(agent, self.outdir, t, previous_best, mean,
                              logger=self.logger)
            best.value = mean
    return mean
def runPutTest(testDataPath, testDataRangeStart, testDataRangeEnd, f):
    """Time ``f.putObject`` for every test object in the range and log the results.

    For each ``i`` in ``range(testDataRangeStart, testDataRangeEnd)`` the
    object at ``<testDataPath>/<i>`` is loaded with ``loadTestData`` and
    stored under the key ``str(i)``. A timestamp is taken after every put;
    the total runtime, mean time per object, and the median / standard
    deviation / mean of ``calculateTimeDeltas(times)`` are logged.
    """
    log.debug('running put tests...')
    timeStart = time.perf_counter()
    times = [time.perf_counter()]
    for i in range(testDataRangeStart, testDataRangeEnd):
        print(i)
        thisPath = '%s/%i' % (testDataPath, i)
        o = loadTestData(thisPath)
        f.putObject(o, str(i))
        times.append(time.perf_counter())
    timeEnd = time.perf_counter()

    totalRuntime = timeEnd - timeStart
    # One timestamp is appended per object on top of the initial one, so this
    # is the number of objects actually processed. Dividing by
    # testDataRangeEnd was only right when the range started at 0.
    objectCount = len(times) - 1
    meanPerObject = totalRuntime / objectCount if objectCount else float('nan')
    log.warning('RESULT (PUT): total test runtime: %s seconds, mean per object: %s' % (
        totalRuntime, meanPerObject))

    deltas = calculateTimeDeltas(times)
    log.critical('RESULT (PUT): median result: %s ' % statistics.median(deltas))
    log.critical('RESULT (PUT): standard deviation result: %s ' % statistics.stdev(deltas))
    log.critical('RESULT (PUT): mean result: %s ' % statistics.mean(deltas))
    # log.critical('RESULT (PUT): individual times: %s ' % (calculateTimeDeltas(times)))
def runGetTest(testDataPath, testDataRangeStart, testDataRangeEnd, f):
    """Time ``f.getObject`` for every test object in the range and log the results.

    For each ``i`` in ``range(testDataRangeStart, testDataRangeEnd)`` the
    object stored under ``str(i)`` is fetched and written to
    ``<testDataPath>/<i>`` with ``saveTestData``. A timestamp is taken after
    every iteration; the total runtime, mean time per object, and the
    median / standard deviation / mean of ``calculateTimeDeltas(times)`` are
    logged.
    """
    log.debug('running get tests...')
    timeStart = time.perf_counter()
    times = [time.perf_counter()]
    for i in range(testDataRangeStart, testDataRangeEnd):
        thisPath = '%s/%i' % (testDataPath, i)
        o = f.getObject(str(i))
        saveTestData(o, thisPath)
        times.append(time.perf_counter())
    timeEnd = time.perf_counter()

    totalRuntime = timeEnd - timeStart
    # One timestamp is appended per object on top of the initial one, so this
    # is the number of objects actually processed. Dividing by
    # testDataRangeEnd was only right when the range started at 0.
    objectCount = len(times) - 1
    meanPerObject = totalRuntime / objectCount if objectCount else float('nan')
    log.critical('RESULT (GET): total test runtime: %s seconds, mean per object: %s' % (
        totalRuntime, meanPerObject))

    deltas = calculateTimeDeltas(times)
    log.critical('RESULT (GET): median result: %s ' % statistics.median(deltas))
    log.critical('RESULT (GET): standard deviation result: %s ' % statistics.stdev(deltas))
    log.critical('RESULT (GET): mean result: %s ' % statistics.mean(deltas))
    # log.critical('RESULT (GET): individual times: %s ' % (calculateTimeDeltas(times)))
def runDeleteTest(testDataRangeStart, testDataRangeEnd, f):
    """Time ``f.deleteObject`` for every key in the range and log the results.

    For each ``i`` in ``range(testDataRangeStart, testDataRangeEnd)`` the
    object stored under ``str(i)`` is deleted. A timestamp is taken after
    every delete; the total runtime, mean time per object, and the
    median / standard deviation / mean of ``calculateTimeDeltas(times)`` are
    logged.
    """
    log.debug('running delete tests...')
    timeStart = time.perf_counter()
    times = [time.perf_counter()]
    for i in range(testDataRangeStart, testDataRangeEnd):
        f.deleteObject(str(i))
        times.append(time.perf_counter())
    timeEnd = time.perf_counter()

    totalRuntime = timeEnd - timeStart
    # One timestamp is appended per object on top of the initial one, so this
    # is the number of objects actually processed. Dividing by
    # testDataRangeEnd was only right when the range started at 0.
    objectCount = len(times) - 1
    meanPerObject = totalRuntime / objectCount if objectCount else float('nan')
    log.critical('RESULT (DELETE): total test runtime: %s seconds, mean per object: %s' % (
        totalRuntime, meanPerObject))

    deltas = calculateTimeDeltas(times)
    log.critical('RESULT (DELETE): median result: %s ' % statistics.median(deltas))
    log.critical('RESULT (DELETE): standard deviation result: %s ' % statistics.stdev(deltas))
    log.critical('RESULT (DELETE): mean result: %s ' % statistics.mean(deltas))
    # log.critical('RESULT (DELETE): individual times: %s ' % (calculateTimeDeltas(times)))

###############################################################################
###############################################################################
def eval_performance(rom, p_func, n_runs):
    """Play ``n_runs`` episodes on a fresh ALE environment and summarise the returns.

    Each episode creates ``ale.ALE(rom, treat_life_lost_as_terminal=False)``
    and, until the environment is terminal, feeds the preprocessed state to
    ``p_func`` and plays ``action_indices[0]`` of its output. The episode
    return is printed after each run.

    Returns:
        Tuple ``(mean, median, stdev)`` of the episode returns.
    """
    assert n_runs > 1, 'Computing stdev requires at least two runs'
    returns = []
    for run_idx in range(n_runs):
        env = ale.ALE(rom, treat_life_lost_as_terminal=False)
        episode_return = 0
        while not env.is_terminal:
            # Add a leading batch axis before wrapping the state.
            state = chainer.Variable(np.expand_dims(dqn_phi(env.state), 0))
            action = p_func(state).action_indices[0]
            episode_return += env.receive_action(action)
        returns.append(episode_return)
        print('test_{}:'.format(run_idx), episode_return)
    return (
        statistics.mean(returns),
        statistics.median(returns),
        statistics.stdev(returns),
    )
def eval_performance(process_idx, make_env, model, phi, n_runs):
    """Play ``n_runs`` test episodes with ``model`` and summarise the returns.

    Every run resets the model state, builds an environment with
    ``make_env(process_idx, test=True)`` and steps it until ``done``,
    playing ``action_indices[0]`` of the policy output returned by
    ``model.pi_and_v``. The episode return is printed after each run.

    Returns:
        Tuple ``(mean, median, stdev)`` of the episode returns.
    """
    assert n_runs > 1, 'Computing stdev requires at least two runs'
    returns = []
    for run_idx in range(n_runs):
        model.reset_state()
        env = make_env(process_idx, test=True)
        obs = env.reset()
        episode_return = 0
        done = False
        while not done:
            # Add a leading batch axis before wrapping the observation.
            state = chainer.Variable(np.expand_dims(phi(obs), 0))
            policy_out, _ = model.pi_and_v(state)
            action = policy_out.action_indices[0]
            obs, reward, done, info = env.step(action)
            episode_return += reward
        returns.append(episode_return)
        print('test_{}:'.format(run_idx), episode_return)
    return (
        statistics.mean(returns),
        statistics.median(returns),
        statistics.stdev(returns),
    )
def math_stats_calculations(point_map):
    """Summarise the points of every team listed in the global ``team_array``.

    Returns:
        Tuple of four strings: mean, median, standard deviation and variance
        of the teams' points, each rounded to two decimals.
    """
    point_array = [point_map[team] for team in team_array]

    # Order matters: it is the order of the returned tuple.
    stat_funcs = (
        statistics.mean,
        statistics.median,
        statistics.stdev,
        statistics.variance,
    )
    return tuple(str(round(func(point_array), 2)) for func in stat_funcs)

# Calls my function
def test_odd_number_repeated(self):
    # Grouped median on odd-length data in which the median value repeats.
    # Each case is (data, extra positional args for self.func, expected).
    exact_cases = [
        ([12, 13, 14, 14, 14, 15, 15], (), 14),
        ([12, 13, 14, 14, 14, 14, 15], (), 13.875),
        ([5, 10, 10, 15, 20, 20, 20, 20, 25, 25, 30], (5,), 19.375),
    ]
    for data, extra_args, expected in exact_cases:
        assert len(data)%2 == 1
        self.assertEqual(self.func(data, *extra_args), expected)

    # The last case has no exact float representation, so compare approximately.
    data = [16, 18, 18, 18, 18, 20, 20, 20, 22, 22, 22, 24, 24, 26, 28]
    assert len(data)%2 == 1
    self.assertApproxEqual(self.func(data, 2), 20.66666667, tol=1e-8)
def test_even_number_repeated(self):
    # Grouped median on even-length data in which the median value repeats.
    # Each case is (data, extra positional args for self.func, expected).
    approx_cases = [
        ([5, 10, 10, 15, 20, 20, 20, 25, 25, 30], (5,), 19.16666667),
        ([2, 3, 4, 4, 4, 5], (), 3.83333333),
    ]
    for data, extra_args, expected in approx_cases:
        assert len(data)%2 == 0
        self.assertApproxEqual(self.func(data, *extra_args), expected, tol=1e-8)

    exact_cases = [
        ([2, 3, 3, 4, 4, 4, 5, 5, 5, 5, 6, 6], 4.5),
        ([3, 4, 4, 4, 5, 5, 5, 5, 6, 6], 4.75),
    ]
    for data, expected in exact_cases:
        assert len(data)%2 == 0
        self.assertEqual(self.func(data), expected)
def temp_stat(temps):
    """ print temps followed by its mean, median, std dev, and variance """
    import statistics

    print(temps)
    # Each statistic is computed right before it is printed.
    report = (
        ("Mean: ", statistics.mean),
        ("Median: ", statistics.median),
        ("Standard deviation: ", statistics.stdev),
        ("Variance: ", statistics.variance),
    )
    for label, stat_func in report:
        print(label, stat_func(temps))

#%%
def temp_stat(temps):
    """ print temps followed by its mean, median, std dev, variance and mode

    If statistics.mode raises StatisticsError, the error text is printed
    instead of the mode.
    """
    import statistics

    print(temps)
    # Each statistic is computed right before it is printed.
    report = (
        ("Mean: ", statistics.mean),
        ("Median: ", statistics.median),
        ("Standard deviation: ", statistics.stdev),
        ("Variance: ", statistics.variance),
    )
    for label, stat_func in report:
        print(label, stat_func(temps))

    try:
        print("Mode: ", statistics.mode(temps))
    except statistics.StatisticsError as e:
        print("Mode error: ", e)

#%%
def test():
    """Tests the statistical functions.

    Checks ``mean``, ``median`` and ``mode`` against hand-computed values.
    Float results are compared with an absolute tolerance; without ``abs``
    a result that is too small would always pass.

    Raises:
        AssertionError if a test fails.
    """
    tolerance = 1e-6

    def close(actual, expected):
        return abs(actual - expected) <= tolerance

    testlist0 = [1, 2, 3, 4, 5]
    testlist1 = [1, 2, 3, 4, 5, 6]
    testlist2 = [2, 2, 3, 4, 4, 6]
    testlist3 = [2, 2, 3, 4, 5, 6, 7]

    # The mean of 1..5 is 3 (the earlier expectation of 5 was wrong).
    assert close(mean(testlist0), 3), mean(testlist0)
    assert close(mean(testlist1), 3.5), mean(testlist1)
    assert close(mean(testlist2), 21 / 6), mean(testlist2)
    assert close(mean(testlist3), 29 / 7), mean(testlist3)

    assert median(testlist0) == 3, median(testlist0)
    assert close(median(testlist1), 3.5), median(testlist1)
    assert close(median(testlist2), 3.5), median(testlist2)
    assert median(testlist3) == 4, median(testlist3)

    assert mode(testlist3) == 2, mode(testlist3)
def gsr_response(stream_id: uuid, start_time: datetime, end_time: datetime, label_attachment: str, label_off: str, CC_obj: CerebralCortex, config: dict) -> str:
    """
    Label a window of Galvanic skin response data as improper attachment or
    sensor-off-body, based on the median of its samples.

    :param stream_id: UUID of the stream to read
    :param start_time: start of the window
    :param end_time: end of the window
    :param label_attachment: returned when the median is below
        config["attachment_marker"]["improper_attachment"]
    :param label_off: returned when the median is above
        config["attachment_marker"]["gsr_off_body"]
    :param CC_obj: object whose get_datastream supplies the datapoints
    :param config: configuration holding the "attachment_marker" thresholds
    :return: one of the two labels, or None when neither threshold is crossed
    """
    datapoints = CC_obj.get_datastream(stream_id, start_time=start_time, end_time=end_time,
                                       data_type=DataSet.COMPLETE)
    samples = [dp.sample for dp in datapoints]
    window_median = stat.median(stat.array(samples))

    thresholds = config["attachment_marker"]
    if window_median < thresholds["improper_attachment"]:
        return label_attachment
    if window_median > thresholds["gsr_off_body"]:
        return label_off
    return None
def outlier_detection(window_data: list) -> list:
    """
    Remove outliers from a list of datapoints.

    This algorithm is a modified version of Chauvenet's criterion
    (https://en.wikipedia.org/wiki/Chauvenet's_criterion): a sample is kept
    only when its absolute deviation from the median is smaller than the
    standard deviation of the window.

    :param window_data: datapoints exposing a numeric ``sample`` attribute
    :return: the kept sample values as floats, in their original order
    :raises ValueError: when ``window_data`` is empty
    """
    if not window_data:
        raise ValueError("List is empty.")

    vals = [float(dp.sample) for dp in window_data]
    median = stat.median(vals)
    standard_deviation = stat.stdev(vals)

    # abs() must wrap the whole deviation. Applying it to the sample alone
    # let outliers far below the median slip through.
    return [val for val in vals if abs(val - median) < standard_deviation]
def graphRampTime(deltas, nocontribs, graphtitle, xtitle, filename):
    """Build a histogram of ``deltas`` and return it as an HTML div.

    The x-axis title is ``xtitle`` followed by the mean and median of
    ``deltas`` (in days), the number of entries in ``deltas``, and that
    number as a percentage of ``len(deltas) + len(nocontribs)``.
    ``filename`` is accepted but not used here.
    """
    data = [Histogram(x=deltas)]

    mean_days = '{:.2f}'.format(statistics.mean(deltas))
    median_days = '{:.2f}'.format(statistics.median(deltas))
    did_count = '{:,g}'.format(len(deltas))
    did_percent = '{:.2f}'.format(len(deltas)/(len(deltas)+len(nocontribs))*100)

    xaxis_title = (
        xtitle
        + '<br>Mean: ' + mean_days + ' days, '
        + 'Median: ' + median_days + ' days'
        + '<br>Number of contributors who did this: ' + did_count
        + '<br>Percentage of contributors who did this: ' + did_percent + '%'
    )
    layout = Layout(
        title=graphtitle,
        yaxis=dict(title='Number of contributors'),
        xaxis=dict(title=xaxis_title),
    )
    fig = Figure(data=data, layout=layout)
    return offline.plot(fig, show_link=False, include_plotlyjs=False, output_type='div')

# FIXME Maybe look for the word 'bot' in the user description?
def sync_gain_sliders(self):
    """Set each gain spin box to the middle entry of its configured gain list.

    For every gain kind present in the selected device's config (keys are
    ``self.rx_tx_prefix`` plus ``rf_gain`` / ``if_gain`` / ``baseband_gain``)
    the element at the median index of the list is written to the matching
    spin box and its ``valueChanged`` signal is emitted with that value.
    """
    conf = self.get_config_for_selected_device()
    prefix = self.rx_tx_prefix

    spin_box_by_suffix = (
        ("rf_gain", "spinBoxGain"),
        ("if_gain", "spinBoxIFGain"),
        ("baseband_gain", "spinBoxBasebandGain"),
    )
    for suffix, spin_box_name in spin_box_by_suffix:
        key = prefix + suffix
        if key not in conf:
            continue
        choices = conf[key]
        # For an even-length list the median index is x.5; int() truncates it.
        middle_value = choices[int(median(range(len(choices))))]
        spin_box = getattr(self.ui, spin_box_name)
        spin_box.setValue(middle_value)
        spin_box.valueChanged.emit(middle_value)
def eval_performance(rom, p_func, n_runs):
    """Play ``n_runs`` episodes on a fresh ALE environment and summarise the returns.

    Each episode creates ``ale.ALE(rom, treat_life_lost_as_terminal=False)``
    and, until the environment is terminal, passes ``util.dqn_phi(env.state)``
    to ``p_func`` and plays the action drawn by ``util.categorical_sample``
    from its output. The episode return is printed after each run.

    Returns:
        Tuple ``(mean, median, stdev)`` of the episode returns.
    """
    assert n_runs > 1, 'Computing stdev requires at least two runs'
    scores = []
    for i in range(n_runs):
        env = ale.ALE(rom, treat_life_lost_as_terminal=False)
        test_r = 0
        while not env.is_terminal:
            s = util.dqn_phi(env.state)
            pout = p_func(s)
            a = util.categorical_sample(pout)
            test_r += env.receive_action(a)
        scores.append(test_r)
        # The old print statement is a SyntaxError on Python 3. A single
        # parenthesised argument prints the same text on Python 2 and 3.
        print('test_ {} : {}'.format(i, test_r))
    mean = statistics.mean(scores)
    median = statistics.median(scores)
    stdev = statistics.stdev(scores)
    return mean, median, stdev
def _print(self):
    """Print statistics and other informational text.

    Reports how many prices were sourced and how long it took
    (``self.duration``), then the mean, median, mean +/- one standard
    deviation, and the standard deviation of ``self.prices``.
    """
    mean = statistics.mean(self.prices)
    median = statistics.median(self.prices)
    stdev = statistics.stdev(self.prices)
    high = mean + stdev
    low = mean - stdev
    # One field per line; the trailing backslash after the opening quotes
    # suppresses the leading newline and dedent() strips the indentation.
    print(dedent('''\
        Sourced %d prices in %.3f seconds
        Mean:\t$%.2f
        Median:\t$%.2f
        Hi/Lo:\t$%.2f/$%.2f
        StDev:\t%.2f
        ''' % (len(self.prices), self.duration, mean, median, high, low, stdev)))
def get_stats(self, metrics, lang=UNSPECIFIED_TRANSLATION, limit=100):
    """Extend the parent class's stats with mean, median, stdev and mode.

    All four entries start out as the placeholder ``'*'``. They are filled
    in order (mean, median, stdev, mode) from ``self.flatten_dataset(metrics)``;
    the first ``StatisticsError`` stops the process and leaves the remaining
    entries at the placeholder.
    """
    stats = super(NumField, self).get_stats(metrics, lang, limit)
    for name in ('median', 'mean', 'mode', 'stdev'):
        stats[name] = '*'

    def fresh_values():
        # Flatten anew for every statistic, exactly one call per use.
        return self.flatten_dataset(metrics)

    try:
        # mean and median need a non-empty dataset
        stats['mean'] = statistics.mean(fresh_values())
        stats['median'] = statistics.median(fresh_values())
        # stdev needs at least two values; the mean is reused as xbar
        stats['stdev'] = statistics.stdev(fresh_values(), xbar=stats['mean'])
        # mode needs a non-empty dataset and a unique mode
        stats['mode'] = statistics.mode(fresh_values())
    except statistics.StatisticsError:
        pass

    return stats
def getMedWeight(graph, node1, node2):
    """Return the median edge weight over the adjacency lists of two nodes.

    ``graph[node]`` is iterated as ``(neighbor, weight)`` pairs. A weight of
    exactly 1.1 is counted as 1. Returns 0 when neither node has any edge.
    """
    weights = [
        weight if weight != 1.1 else 1
        for node in (node1, node2)
        for (_, weight) in graph[node]
    ]
    if weights:
        return statistics.median(weights)
    return 0
def average(numbers, type='mean'):
    """Return a summary statistic of ``numbers``.

    Args:
        numbers: Data accepted by ``statistics.mean``; anything else is
            rejected up front.
        type: Case-insensitive operation name: ``'mean'``, ``'mode'``,
            ``'median'``, ``'min'``, ``'max'`` or ``'range'`` (max - min).

    Raises:
        RuntimeError: ``numbers`` cannot be averaged (code 0018) or ``type``
            is not a known operation (code 0003).
    """
    import statistics

    type = type.lower()

    # Validate the data once. Only ordinary errors are translated, so
    # KeyboardInterrupt / SystemExit are no longer swallowed.
    try:
        statistics.mean(numbers)
    except Exception as err:
        raise RuntimeError('An Error Has Occured: List Not Specified (0018)') from err

    operations = {
        'mean': statistics.mean,
        'mode': statistics.mode,
        'median': statistics.median,
        'min': min,
        'max': max,
        'range': lambda values: max(values) - min(values),
    }
    if type not in operations:
        raise RuntimeError('An Error Has Occured: You Entered An Invalid Operation (0003)')
    return operations[type](numbers)
def async_update(self):
    """Get the latest data and updates the states.

    For non-binary sensors, recomputes mean, median, stdev and variance of
    ``self.states`` (rounded to two decimals), plus total, min and max.
    Values that cannot be computed are set to ``STATE_UNKNOWN``.
    """
    # NOTE(review): the source this was recovered from had lost its
    # indentation; the min/max/total branch is assumed to sit inside the
    # ``not self.is_binary`` guard -- confirm against the upstream file.
    if not self.is_binary:
        try:
            self.mean = round(statistics.mean(self.states), 2)
            self.median = round(statistics.median(self.states), 2)
            self.stdev = round(statistics.stdev(self.states), 2)
            self.variance = round(statistics.variance(self.states), 2)
        except statistics.StatisticsError as err:
            # Raised when there are too few states; all four values are reset.
            _LOGGER.warning(err)
            self.mean = self.median = STATE_UNKNOWN
            self.stdev = self.variance = STATE_UNKNOWN
        if self.states:
            self.total = round(sum(self.states), 2)
            self.min = min(self.states)
            self.max = max(self.states)
        else:
            # min()/max() would raise on an empty sequence.
            self.min = self.max = self.total = STATE_UNKNOWN
def get_median_eligible_score():
    """Return the median of the ``'score'`` entries from ``get_all_team_scores()``."""
    scores = [entry['score'] for entry in get_all_team_scores()]
    return statistics.median(scores)
def get_median_problems_solved(eligible=True, scoring=True):
    """Return the median number of problems solved per team.

    Args:
        eligible: When true, ineligible teams are not requested
            (``show_ineligible`` is passed as ``not eligible``).
        scoring: When true, teams with no solved problems are left out of
            the median.

    Returns:
        ``statistics.median`` of the per-team solved-problem counts.
    """
    teams = api.team.get_all_teams(show_ineligible=(not eligible))
    # Look up each team's solved problems once; the lookup used to run twice
    # per team whenever ``scoring`` was set.
    solved_counts = (len(api.problem.get_solved_pids(tid=team['tid'])) for team in teams)
    return statistics.median(
        [count for count in solved_counts if not scoring or count > 0])
def print_stat(msg, times_taken):
    """Print ``msg`` with the mean, median and stdev of ``times_taken`` (two decimals)."""
    summary = (mean(times_taken), median(times_taken), stdev(times_taken))
    template = '{}: mean {:.2f} secs, median {:.2f} secs, stdev {:.2f}'
    print(template.format(msg, *summary))
def getIslandPositions(familyL,geneInfoD,strainNum2StrD,grID,mrcaNum,strain):
    '''Given a list of families (from a single island in a single
    strain), return a tuple of chrom, the median of the gene midpoints,
    the smallest gene start and the largest gene end. chrom is taken from
    the last gene visited; a warning is printed to stderr if the genes are
    not all on the same chromosome.
    '''
    chromL=[]
    islandMin=float('inf')
    islandMax=-float('inf')
    geneMidpointL=[]
    for fam,geneL in familyL:
        for gene in geneL:
            commonName,locusTag,descrip,chrom,start,end,strand=geneInfoD[gene]
            chromL.append(chrom)
            start = int(start)
            end = int(end)
            islandMin = min(islandMin,start)
            islandMax = max(islandMax,end)
            # Midpoint is a position on the chromosome. (end-start)/2 was
            # only half the gene's length.
            geneMidpointL.append((start+end)//2)

    # sanity check: all entries in chromL should be same
    if not all((c==chromL[0] for c in chromL)):
        print("Genes in island",grID,"at mrca",strainNum2StrD[mrcaNum],"in strain",strain,"are not all on the same chromosome.",file=sys.stderr)

    islandMedianMidpoint = statistics.median(geneMidpointL)

    return chrom,islandMedianMidpoint,islandMin,islandMax
def eval_performance(env, agent, n_runs, max_episode_len=None,
                     explorer=None, logger=None):
    """Run several evaluation episodes and summarise their scores.

    Args:
        env (Environment): Environment used for evaluation
        agent (Agent): Agent to evaluate.
        n_runs (int): Number of evaluation runs.
        max_episode_len (int or None): Passed through to
            ``run_evaluation_episodes``; if specified, episodes longer than
            this value will be truncated.
        explorer (Explorer): If specified, the given Explorer will be used
            for selecting actions.
        logger (Logger or None): If specified, the given Logger object will
            be used for logging results. If not specified, the default
            logger of this module will be used.
    Returns:
        Dict with keys ``mean``, ``median``, ``stdev``, ``max`` and ``min``.
        ``stdev`` is 0.0 when fewer than two runs were requested.
    """
    scores = run_evaluation_episodes(
        env, agent, n_runs,
        max_episode_len=max_episode_len,
        explorer=explorer,
        logger=logger)
    return {
        'mean': statistics.mean(scores),
        'median': statistics.median(scores),
        # A sample standard deviation needs at least two data points.
        'stdev': statistics.stdev(scores) if n_runs >= 2 else 0.0,
        'max': np.max(scores),
        'min': np.min(scores),
    }
def _avg_time_spent(history, value, window=100):
    """Record ``value`` in a fixed-size rolling history and return its median.

    Args:
        history: Two-item sequence ``[next_index, values]``. ``values`` is
            updated in place.
        value: New measurement to record.
        window: Maximum number of measurements kept; once reached, the
            oldest slots are overwritten in turn.

    Returns:
        Tuple ``([next_index, values], median_of_values)``.
    """
    index = history[0]
    avg_list = history[1]
    if index >= window:
        index = 0
    if index < len(avg_list):
        # Overwrite the existing slot. Inserting here made the list grow
        # without bound after the index wrapped around.
        avg_list[index] = value
    else:
        avg_list.insert(index, value)
    index += 1
    final_avg = statistics.median(avg_list)
    return [index, avg_list], final_avg
def get_edge_trade_size(self, side: OrderSide, order_type: OrderType, seconds_ago: int, edge_type: EdgeType, group_by_period: Optional[int] = None) -> float:
    """Return the trade size associated with the requested edge type.

    ``EdgeType.best`` yields 0, ``mean`` and ``median`` delegate to
    ``get_average_trade_size`` / ``get_median_trade_size``, and ``custom``
    uses a tenth of the average trade size. Returns ``None`` when the edge
    type is not recognised or the delegated call returns ``None``.
    """
    # NOTE(review): the source this was recovered from had lost its
    # indentation; the ``/ 10.`` scaling is assumed to apply to the
    # ``custom`` branch only -- confirm against the upstream file.
    qty = None
    if edge_type == EdgeType.best:
        qty = 0.
    elif edge_type == EdgeType.mean:
        qty = self.get_average_trade_size(side, order_type, seconds_ago, group_by_period)
    elif edge_type == EdgeType.median:
        qty = self.get_median_trade_size(side, order_type, seconds_ago, group_by_period)
    elif edge_type == EdgeType.custom:
        qty = self.get_average_trade_size(side, order_type, seconds_ago, group_by_period)
        if qty is not None:
            qty = qty / 10.
    return qty
def get_median_trade_size(self, side: OrderSide, order_type: OrderType, seconds_ago: int, group_by_period: Optional[int] = None) -> Optional[float]:
    """Return the median of ``self.get_trade_quantities(...)``, or ``None`` if it is empty."""
    quantities = self.get_trade_quantities(side, order_type, seconds_ago, group_by_period)
    return median(quantities) if len(quantities) != 0 else None
def get_consensus_time(hg, x) -> datetime:
    """Return the consensus time of event ``x``.

    The ``time`` of every event returned by ``get_events_for_consensus_time``
    is parsed and converted to whole seconds with ``time.mktime``; the result
    is the ``datetime`` for the (truncated) median of those values, or for
    timestamp 0 when there are no such events.
    """
    parsed_times = []
    for event in get_events_for_consensus_time(hg, x):
        parsed_times.append(dateutil.parser.parse(event.time))
    timestamps = list(int(time.mktime(moment.timetuple())) for moment in parsed_times)

    if timestamps:
        median_timestamp = int(median(timestamps))
    else:
        median_timestamp = 0
    return datetime.fromtimestamp(median_timestamp)
def get_events_for_consensus_time(hg, x) -> Set[Event]:
    """
    "set of each event z such that z is a self-ancestor of a round r unique famous witness, and x is an ancestor of z
    but not of the self-parent of z"

    For every famous witness of the round in which x was received, the
    witness's self-parent chain is walked backwards until an event is found
    that can see x while its own self-parent cannot (or the chain ends).

    :param hg: The hashgraph
    :param x: The event for which we want to calculate the median timestamp
    :return: the set of selected events, at most one per famous witness
    """
    result = set()
    # For all famous round r witnesses
    r = x.round_received
    for witness_id in hg.witnesses[r].values():
        witness = hg.lookup_table[witness_id]
        if witness.is_famous != Fame.TRUE:
            continue

        # Go through the self ancestors, starting at the witness's self-parent
        z = hg.lookup_table[witness.parents.self_parent]
        if z.parents.self_parent is not None:
            z_self_parent = hg.lookup_table[z.parents.self_parent]
            # Keep stepping back while z cannot see x, or while z's
            # self-parent still can (z is then not the earliest such event).
            while not event_can_see_event(hg, z, x) or event_can_see_event(hg, z_self_parent, x):
                z = hg.lookup_table[z.parents.self_parent]
                if z.parents.self_parent is None:
                    # Special case for the first event - this is not described in the paper
                    break
                else:
                    z_self_parent = hg.lookup_table[z.parents.self_parent]
            result.add(z)
        else:
            # Special case for the first event - this is not described in the paper
            result.add(z)
    return result
def update_stats(self, refresh=conf.STAT_REFRESH, med=median, count=conf.GRID[0] * conf.GRID[1]):
    """Rebuild the ``self.stats`` / ``self.counts`` text and reschedule itself.

    Collects ``total_seen``, ``visits``, ``after_spawn`` and ``speed`` from
    every worker, formats their min / max / median into ``self.stats``,
    refreshes the cache-size and coroutine counters, formats
    ``self.counts``, and finally asks ``LOOP`` to call this method again
    after ``refresh``.
    """
    delays, seen, visit_counts, speeds = [], [], [], []
    for worker in self.workers:
        delays.append(worker.after_spawn)
        seen.append(worker.total_seen)
        visit_counts.append(worker.visits)
        speeds.append(worker.speed)

    def spread(values):
        # (min, max, median) in the order the templates expect.
        return min(values), max(values), med(values)

    stats_template = (
        'Seen per worker: min {}, max {}, med {:.0f}\n'
        'Visits per worker: min {}, max {}, med {:.0f}\n'
        'Visit delay: min {:.1f}, max {:.1f}, med {:.1f}\n'
        'Speed: min {:.1f}, max {:.1f}, med {:.1f}\n'
        'Extra accounts: {}, CAPTCHAs needed: {}\n'
    )
    self.stats = stats_template.format(
        *spread(seen),
        *spread(visit_counts),
        *spread(delays),
        *spread(speeds),
        self.extra_queue.qsize(), self.captcha_queue.qsize()
    )

    self.sighting_cache_size = len(SIGHTING_CACHE.store)
    self.mystery_cache_size = len(MYSTERY_CACHE.store)
    self.update_coroutines_count()

    counts_template = (
        'Known spawns: {}, unknown: {}, more: {}\n'
        '{} workers, {} coroutines\n'
        'sightings cache: {}, mystery cache: {}, DB queue: {}\n'
    )
    self.counts = counts_template.format(
        len(spawns), len(spawns.unknown), spawns.cells_count,
        count, self.coroutines_count,
        len(SIGHTING_CACHE), len(MYSTERY_CACHE), len(db_proc)
    )

    LOOP.call_later(refresh, self.update_stats)
def format_stats(series, d, f=0):
    """Format the mean, median and stdev of ``series`` as one line.

    Each number is rendered as a fixed-point field of width ``d`` with ``f``
    decimals.
    """
    # NOTE(review): the '?' glyphs in the template look like symbols lost to
    # an encoding problem; they are kept as found -- confirm against upstream.
    mean, med, std = (
        statistics.mean(series),
        statistics.median(series),
        statistics.stdev(series),
    )
    return f"x {mean:{d}.{f}f} ? {med:{d}.{f}f} ?² {std:{d}.{f}f}"
def get_median(self, totals):
    """Return the median of ``totals``, or 0 when ``statistics.median`` rejects it as empty."""
    try:
        result = statistics.median(totals)
    except statistics.StatisticsError:
        result = 0
    return result