我们从 Python 开源项目中提取了以下 17 个代码示例,用于说明如何使用 math.nan(注意:math.nan 是一个常量,而不是可调用的函数)。
def reccomandation(self, x):
    """Return the name of the best-scoring film that has no entry in ``self.films``.

    A one-row vector is filled with ``self.films[key]`` at position
    ``self.conv.renvoyer_index(key)`` and NaN everywhere else. Scores are
    computed as ``np.dot(x, self._theta.T)`` and the film whose slot is still
    NaN and whose score (column 0) is the highest is selected.

    :param x: left operand of the dot product with ``self._theta.T``;
        presumably one feature row per film -- TODO confirm with the caller.
    :return: ``self.conv.renvoyer_nom_index(i)`` for the selected index ``i``.
        If no NaN slot scores strictly above 0 the index stays 0, so film 0
        is returned regardless of whether it has an entry.
    """
    y = np.array([[math.nan] * nombre_films()])
    for key in self.films:
        y[0][self.conv.renvoyer_index(key)] = self.films[key]

    max_i = 0
    n_max = 0
    t = np.dot(x, self._theta.T)
    print(t)
    for i, el in enumerate(y[0]):
        if np.isnan(el) and t[i, 0] > n_max:
            # Update the running best first so the trace below shows the
            # score of the film being printed, not the previous best.
            n_max = t[i, 0]
            max_i = i
            print("film : ", self.conv.renvoyer_nom_index(i), "note :", n_max)
    print(t)
    print(self._theta)
    return self.conv.renvoyer_nom_index(max_i)
def __init__(self, srmfile, recording_interval):
    """Read one record from *srmfile* and populate this object's fields.

    Files with ``version < 7`` provide power/speed through
    ``self.compact_power_speed`` followed by two unsigned bytes (cadence,
    heart rate); altitude and temperature are set to NaN. Newer files store
    a 14-byte record whose values are assigned, in order, to the names in
    ``self.__slots__``; version 9 adds two 32-bit integers that are scaled
    into ``lat`` / ``lon``.

    :param srmfile: object exposing ``version`` and ``read(n)``.
    :param recording_interval: multiplier used to derive ``metres`` from
        ``kph`` (``metres = recording_interval * kph / 3.6``).
    """
    # Defaults for values the record may not carry.
    self.metres = nan
    self.lat = nan
    self.lon = nan

    if srmfile.version >= 7:
        fields = unpack('<HBBllh', srmfile.read(14))
        for slot, field in zip(self.__slots__, fields):
            setattr(self, slot, field)

        if srmfile.version == 9:
            raw_lat, raw_lon = unpack('<ll', srmfile.read(8))
            # Scale the signed 32-bit range onto +/-180.
            self.lat = raw_lat * 180 / 0x7fffffff
            self.lon = raw_lon * 180 / 0x7fffffff

        self.temp *= 0.1
        # Negative raw speeds are clamped to zero before rescaling.
        if self.kph < 0:
            self.kph = 0
        else:
            self.kph = self.kph * 3.6 / 1000
    else:
        self.watts, self.kph = self.compact_power_speed(srmfile)
        self.cad, self.hr = unpack('<BB', srmfile.read(2))
        self.alt = nan
        self.temp = nan

    self.metres = recording_interval * self.kph / 3.6
def predict(df, steps):
    """Fit an ARIMA model on *df* and forecast *steps* periods ahead.

    The frame is transposed and squeezed into a series; every year between
    the first and last index entries that has no observation gets a NaN
    entry dated 1 January of that year, and the gaps are then filled by
    ``interpolate()``. The model order comes from ``brute_search``.

    :param df: frame whose index entries expose ``.year``; presumably a
        single-column, date-indexed frame -- TODO confirm with the caller.
    :param steps: number of periods passed to ``model.forecast``.
    :return: tuple ``(forecast, end)`` where ``forecast`` is element 0 of
        ``model.forecast(steps=steps)`` and ``end`` is the last year in *df*.
    """
    print("started function")
    start = df.index[0].year
    end = df.index[-1].year
    years_captured = {idx.year for idx in df.index}

    s = df.T.squeeze()
    for year in range(start, end + 1):
        if year not in years_captured:
            # Series.set_value() was removed in pandas 1.0; assigning through
            # .loc with a label that does not exist yet appends the entry.
            s.loc[datetime.datetime(year=year, month=1, day=1)] = math.nan
    s.sort_index(inplace=True)
    s = s.interpolate()
    data = s.to_frame()
    print("loaded data")

    model_order = brute_search(data)
    model_order = tuple(int(elem) for elem in model_order)
    print("found model order")

    # NOTE(review): `fit(disp=0)` and indexing the forecast result with [0]
    # look like the legacy statsmodels ARIMA interface -- confirm against the
    # statsmodels version this project pins.
    model = sm.tsa.ARIMA(data, model_order).fit(disp=0)
    print("fit model")
    return model.forecast(steps=steps)[0], end
def predict(df, steps):
    """Fit an ARIMA model on *df* and forecast *steps* periods ahead.

    The frame is transposed and squeezed into a series; every year between
    the first and last index entries that has no observation gets a NaN
    entry dated 1 January of that year, and the gaps are then filled by
    ``interpolate()``. The model order comes from ``brute_search``.

    :param df: frame whose index entries expose ``.year``; presumably a
        single-column, date-indexed frame -- TODO confirm with the caller.
    :param steps: number of periods passed to ``model.forecast``.
    :return: tuple ``(forecast, end)`` where ``forecast`` is element 0 of
        ``model.forecast(steps=steps)`` and ``end`` is the last year in *df*.
    """
    start = df.index[0].year
    end = df.index[-1].year
    years_captured = {idx.year for idx in df.index}

    s = df.T.squeeze()
    for year in range(start, end + 1):
        if year not in years_captured:
            # Series.set_value() was removed in pandas 1.0; assigning through
            # .loc with a label that does not exist yet appends the entry.
            s.loc[datetime.datetime(year=year, month=1, day=1)] = math.nan
    s.sort_index(inplace=True)
    s = s.interpolate()
    data = s.to_frame()

    model_order = brute_search(data)
    model_order = tuple(int(elem) for elem in model_order)

    # NOTE(review): `fit(disp=0)` and indexing the forecast result with [0]
    # look like the legacy statsmodels ARIMA interface -- confirm against the
    # statsmodels version this project pins.
    model = sm.tsa.ARIMA(data, model_order).fit(disp=0)
    return model.forecast(steps=steps)[0], end
def theta(self, x, etapes):
    """Run ``etapes`` gradient steps on this object's theta, then return it.

    A one-row vector is built with ``self.films[key]`` stored at index
    ``self.conv.renvoyer_index(key)`` and NaN in every other slot; it is
    passed to ``etape_du_gradient`` together with a fixed 0.0001 factor
    (presumably the learning rate -- TODO confirm), the current theta and *x*.

    :param x: forwarded unchanged to ``etape_du_gradient``.
    :param etapes: number of gradient steps to perform.
    :return: the updated ``self._theta``.
    """
    known = np.full((1, nombre_films()), math.nan)
    for film in self.films:
        known[0][self.conv.renvoyer_index(film)] = self.films[film]

    remaining = etapes
    while remaining > 0:
        self._theta = etape_du_gradient(known, 0.0001, self._theta, x)
        remaining -= 1
    return self._theta
def none_to_nan(value):
    """Return NaN when *value* is ``None``; otherwise return *value* unchanged."""
    if value is None:
        return nan
    return value
def test_nan_to_none(self):
    """``nan_to_none`` turns NaN into ``None`` and returns 1 unchanged."""
    converted_nan = nan_to_none(nan)
    self.assertIsNone(converted_nan)

    converted_one = nan_to_none(1)
    self.assertEqual(1, converted_one)
def process_stats(self, stats: dict, time_periods: Sequence[str] = ('last1min', )) -> None:
    ''' Process dump1090 statistics into exported metrics.

    For each requested time period, every metric registered under
    ``self.metrics['stats']`` is set to the matching value from *stats*,
    labelled with ``time_period``. Metrics are grouped by section key; a
    falsy key means the value is read from the time period dict itself.
    Values that cannot be found are exported as NaN rather than aborting
    the update.

    :param stats: a dict containing dump1090 statistics data.
    :param time_periods: keys of *stats* to export. A period that is not
        present is logged and skipped.
    '''
    metrics = self.metrics['stats']

    for time_period in time_periods:
        try:
            tp_stats = stats[time_period]
        except KeyError:
            logger.exception(
                'Problem extracting time period: {}'.format(time_period))
            continue

        labels = dict(time_period=time_period)

        for key in metrics:
            if key:
                try:
                    d = tp_stats[key]
                except KeyError:
                    # A whole section can be absent; report it once and
                    # export NaN for its metrics instead of raising.
                    logger.warning(
                        "Problem extracting section '{}' from time period: {}".format(
                            key, time_period))
                    d = None
            else:
                d = tp_stats

            for name, metric in metrics[key].items():
                if d is None:
                    metric.set(labels, math.nan)
                    continue
                try:
                    value = d[name]
                    # 'accepted' values are in a list
                    if isinstance(value, list):
                        # An empty list carries no value to export.
                        value = value[0] if value else math.nan
                except KeyError:
                    # 'signal' and 'peak_signal' are not present if
                    # there are no aircraft.
                    if name not in ['peak_signal', 'signal']:
                        logger.warning(
                            "Problem extracting{}item '{}' from: {}".format(
                                ' {} '.format(key) if key else ' ', name, d))
                    value = math.nan
                metric.set(labels, value)
def analyze_lucene_index(self):
    """Collect the Lucene measurements into an ``IndexCharacteristics``.

    The total ingestion time is parsed from ``self.lucene_build_index_log``;
    for each entry of ``self.thread_counts`` the matching file in
    ``self.lucene_run_queries_log`` is read and handed to
    ``append_float_field`` for the QPS, MPS, MPQ, mean-query-latency and
    planning-overhead labels (presumably it extracts the number that follows
    each label -- verify against ``IndexCharacteristics``).

    :return: the populated ``IndexCharacteristics`` instance.
    :raises IndexError: if the build log has no "Ingested ... seconds" line.
    """
    results = IndexCharacteristics("Lucene", self.ingestion_thread_count, self.thread_counts)
    results.index_type = "Lucene"

    # Don't know how to determine bits per posting for Lucene.
    results.bits_per_posting = math.nan

    with open(self.lucene_build_index_log, 'r') as myfile:
        build_index_log = myfile.read()
    # Raw string avoids invalid-escape warnings; the optional fractional
    # group also accepts single-digit and integer durations, which the
    # former `\d+\.?\d+` pattern could not match.
    results.total_ingestion_time = float(
        re.findall(r"Ingested \d+ chunk files in (\d+(?:\.\d+)?) seconds.",
                   build_index_log)[0])

    for i, _ in enumerate(self.thread_counts):
        run_queries_log = self.lucene_run_queries_log[i]
        with open(run_queries_log, 'r') as myfile:
            data = myfile.read()
        results.append_float_field("qps", "QPS:", data)
        results.append_float_field("mps", "MPS:", data)
        results.append_float_field("mpq", "MPQ:", data)
        results.append_float_field("mean_query_latency", "Mean query latency:", data)
        results.append_float_field("planning_overhead", r"Planning overhead:", data)

    # Lucene false positive rate is always zero.
    results.false_positive_rate = 0
    results.false_negative_rate = 0

    return results


###########################################################################
#
# MG4J
#
###########################################################################
def analyze_mg4j_index(self):
    """Collect the MG4J measurements into an ``IndexCharacteristics``.

    Bits per posting is the size in bytes of the ``<basename>-text.pointers``
    file under ``self.mg4j_index_path``, times 8, divided by the
    ``postings=`` count found in the first run-queries log. For each entry
    of ``self.thread_counts`` the matching file in
    ``self.mg4j_run_queries_log`` is read and handed to
    ``append_float_field`` for the QPS, MPS, MPQ and mean-query-latency
    labels; the planning overhead is recorded as NaN.

    :return: the populated ``IndexCharacteristics`` instance.
    :raises IndexError: if the first log has no ``postings=`` entry.
    """
    results = IndexCharacteristics("MG4J", self.ingestion_thread_count, self.thread_counts)

    # Compute bits/posting.
    with open(self.mg4j_run_queries_log[0], 'r') as myfile:
        run_queries_log = myfile.read()
    # Raw string avoids invalid-escape warnings; the optional fractional
    # group also accepts a single-digit count, which the former
    # `\d+\.?\d+` pattern could not match.
    posting_count = float(re.findall(r"postings=(\d+(?:\.\d+)?)", run_queries_log)[0])
    pointers = os.path.join(self.mg4j_index_path, self.basename + "-text.pointers")
    results.bits_per_posting = os.path.getsize(pointers) / posting_count * 8.0

    # Need to annotate build log from Python since Java code doesn't print time.
    results.total_ingestion_time = math.nan

    for i, _ in enumerate(self.thread_counts):
        run_queries_log = self.mg4j_run_queries_log[i]
        with open(run_queries_log, 'r') as myfile:
            data = myfile.read()
        results.append_float_field("qps", "QPS:", data)
        results.append_float_field("mps", "MPS:", data)
        results.append_float_field("mpq", "MPQ:", data)
        results.append_float_field("mean_query_latency", "Mean query latency:", data)
        results.planning_overhead.append(math.nan)

    # MG4J false positive rate is always zero.
    results.false_positive_rate = 0
    results.false_negative_rate = 0

    return results


###########################################################################
#
# Partitioned Elias-Fano (PEF)
#
###########################################################################
def __init__(self, index_type, ingestion_thread_count, thread_counts):
    """Store the index identity and start every measurement as "unknown".

    Scalar measurements are initialised to NaN and the per-run series to
    empty lists.

    :param index_type: stored as ``self.index_type``.
    :param ingestion_thread_count: stored as ``self.ingestion_thread_count``.
    :param thread_counts: stored as ``self.thread_counts``.
    """
    self.index_type = index_type
    self.ingestion_thread_count = ingestion_thread_count
    self.thread_counts = thread_counts

    # Scalars that are filled in later by the analysis code.
    for scalar in ("bits_per_posting", "total_ingestion_time",
                   "false_positive_rate", "false_negative_rate"):
        setattr(self, scalar, math.nan)

    # One independent list per series.
    for series in ("qps", "mps", "mpq", "mean_query_latency", "planning_overhead"):
        setattr(self, series, [])
def __init__(self, gov2_directories, min_terms_per_document, max_terms_per_document):
    """Store the corpus selection parameters and start all counters at NaN.

    :param gov2_directories: stored as ``self.gov2_directors``.
    :param min_terms_per_document: stored under the same name.
    :param max_terms_per_document: stored under the same name.
    """
    # NOTE(review): the attribute is spelled `gov2_directors` while the
    # parameter is `gov2_directories`; kept as-is because other code may
    # read the attribute under this name -- confirm before renaming.
    self.gov2_directors = gov2_directories
    self.min_terms_per_document = min_terms_per_document
    self.max_terms_per_document = max_terms_per_document

    # Statistics that are not known until they are measured.
    for counter in ("documents", "terms", "postings", "bytes", "matches_per_query"):
        setattr(self, counter, math.nan)
def constant(constanttype):
    """Return the ``math`` constant named by *constanttype* (case-insensitive).

    Recognised names are ``pi``, ``e``, ``tau``, ``inf`` and ``nan``; any
    other name yields ``None``.
    """
    import math

    by_name = {
        'pi': math.pi,
        'e': math.e,
        'tau': math.tau,
        'inf': math.inf,
        'nan': math.nan,
    }
    return by_name.get(constanttype.lower())


# Find The Power Of A Number
def latex_performance_one(experiment_number, experiment, thread):
    """Print the five LaTeX table rows describing one experiment.

    Rows are QPS, fixed overhead, false positives, bits per posting and DQ
    (QPS divided by bits per posting). Each row lists the BF, PEF, MG4J and
    Lucene values in that order; the Lucene DQ cell is always NaN.

    :param experiment_number: zero-based number turned into the row-group
        letter (0 -> 'A', 1 -> 'B', ...).
    :param experiment: object providing ``analyze_bf_index``,
        ``analyze_lucene_index``, ``analyze_mg4j_index`` and
        ``analyze_pef_index``.
    :param thread: index into the per-run ``qps`` / ``planning_overhead``
        lists.
    """
    bf = experiment.analyze_bf_index()
    lucene = experiment.analyze_lucene_index()
    mg4j = experiment.analyze_mg4j_index()
    pef = experiment.analyze_pef_index()

    # Column order used by every row of the table.
    columns = (bf, pef, mg4j, lucene)

    whole_row = r"& {:<25} & {:>10,.0f} & {:>10,.0f} & {:>10,.0f} & {:>10,.0f} \\"
    decimal_row = r"& {:<25} & {:>10,.2f} & {:>10,.2f} & {:>10,.2f} & {:>10,.2f} \\"

    print(r" \multirow{{5}}{{*}}{{{}}}".format(chr(ord('A') + experiment_number)), end='')
    print(whole_row.format("QPS", *(c.qps[thread] for c in columns)))

    # Labels are raw strings: `\%` is not a valid escape in a normal literal.
    print(r" ", end='')
    print(decimal_row.format(
        r"Fixed overhead (\%)",
        *(c.planning_overhead[thread] * 100 for c in columns)))

    print(r" ", end='')
    print(decimal_row.format(
        r"False positives (\%)",
        *(c.false_positive_rate * 100 for c in columns)))

    print(r" ", end='')
    print(decimal_row.format(
        "Bits per posting",
        *(c.bits_per_posting for c in columns)))

    print(r" ", end='')
    print(whole_row.format(
        "DQ",
        bf.qps[thread] / bf.bits_per_posting,
        pef.qps[thread] / pef.bits_per_posting,
        mg4j.qps[thread] / mg4j.bits_per_posting,
        math.nan))
async def scheduler(tasks: Dict[Source, Task],
                    task_queue: TaskQueue,
                    result_queue: ResultQueue,
                    tree: TaskTree,
                    hashes: Dict[Filename, Hash],
                    changed_files: List[Source]) -> None:
    """Schedule tasks and handle compiled tasks.

    Sources start in a waiting set. On every pass, each waiting source with
    no ancestor that is still waiting or scheduled is pushed onto
    *task_queue* (ordered by negated ``tree.priority``) and moved to the
    scheduled set. One result is then awaited from *result_queue*; on
    success the source's hash is recorded and the hash of every module file
    it produced is recomputed. When a module hash differs from the stored
    one, all sources using that module are put back into the waiting set.
    The loop ends once nothing is waiting or scheduled.

    This must be a coroutine function because it awaits ``result_queue.get()``.

    :param tasks: per-source task; its ``args`` and ``source`` are sent to
        the workers.
    :param task_queue: queue receiving ``(-priority, source, args)`` tuples.
    :param result_queue: queue yielding ``(source, retcode, clock)`` tuples.
    :param tree: provides ``line_nums``, ``ancestors``, ``priority``,
        ``hashes``, ``src_mods`` and ``mod_uses``.
    :param hashes: stored hashes, updated in place.
    :param changed_files: sources that need compiling.
    :raises CompilationError: when a result carries a non-zero return code.
    """
    start = time.time()
    n_all_lines = sum(tree.line_nums[src] for src in changed_files)
    n_lines = 0
    waiting = set(changed_files)
    scheduled: Set[Source] = set()

    while True:
        blocking = waiting | scheduled
        for src in list(waiting):
            if not (blocking & tree.ancestors[src]):
                hashes.pop(src, None)  # if compilation gets interrupted
                task_queue.put_nowait((
                    -tree.priority[src],
                    src,
                    Args(tasks[src].args + (str(tasks[src].source),))
                ))
                scheduled.add(src)
                waiting.remove(src)

        # `or nan` keeps both ratios printable (as nan) when a denominator
        # is zero, e.g. when there are no changed files at all.
        sys.stdout.write(
            f' Progress: {len(waiting)} waiting, {len(scheduled)} scheduled, '
            f'{n_lines}/{n_all_lines} lines ({100*n_lines/(n_all_lines or nan):.1f}%), '
            f'ETA: {(time.time()-start)*n_all_lines/(n_lines or nan):.1f} s\r'
        )
        sys.stdout.flush()

        if not blocking:
            break

        src, retcode, clock = await result_queue.get()
        if retcode != 0:
            raise CompilationError(src, retcode)

        # NOTE(review): `clocks` is not defined in this function; presumably
        # a module-level list -- confirm.
        clocks.append((src, clock, tree.line_nums[src]))
        hashes[src] = tree.hashes[src]
        n_lines += tree.line_nums[src]
        scheduled.remove(src)
        pprint(f'Compiled {src}.')

        for mod in tree.src_mods[src]:
            modfile = mod + '.mod'
            modhash = get_hash(Path(modfile))
            if modhash != hashes.get(modfile):
                hashes[modfile] = modhash
                # A changed module invalidates every source that uses it.
                for user in tree.mod_uses[mod]:
                    assert user not in scheduled
                    hashes.pop(user, None)
                    if user not in waiting:
                        n_all_lines += tree.line_nums[user]
                        waiting.add(user)