The following 49 code examples, extracted from open-source Python projects, demonstrate how to use pstats.Stats().
def load_with_profiler(context, filepath, *, global_matrix=None):
    """Run the X3D/WRL importer under cProfile and print the hottest 10%
    of entries, sorted by internal time."""
    import cProfile
    import pstats
    profiler = cProfile.Profile()
    profiler.runctx("load_web3d(context.scene, filepath, PREF_FLAT=True, "
                    "PREF_CIRCLE_DIV=16, global_matrix=global_matrix)",
                    globals(), locals())
    report = pstats.Stats(profiler)
    report.sort_stats("time")
    report.print_stats(0.1)
    # report.print_callers(0.1)
def enable_profiling():
    """Replace the module-level ``_profile_hook`` with a version that runs
    each hooked call under cProfile and appends a cumulative-time report to
    a per-pid, per-name log file under /tmp."""
    global _profile_hook
    import cProfile, pstats

    def _profile_hook(name, func, *args):
        prof = cProfile.Profile()
        prof.enable()
        try:
            return func(*args)
        finally:
            # Always snapshot and dump stats, even if func raised.
            prof.create_stats()
            with open('/tmp/mitogen.stats.%d.%s.log' % (os.getpid(), name), 'w') as fp:
                report = pstats.Stats(prof, stream=fp)
                report.sort_stats('cumulative')
                report.print_stats()
def process_fp(fp, p, sparse):
    """Feed file object *fp* through the expat parser *p*, flushing any
    pending bulk upload on *sparse*, optionally under cProfile.

    Returns a ``(start, end)`` pair of wall-clock timestamps.
    """
    if _PROFILE:
        profiler = cProfile.Profile()
        profiler.enable()
    begin_ts = time.time()
    try:
        p.ParseFile(fp)
    except xml.parsers.expat.ExpatError as err:
        app.logger.error("Bad XML: %r" % err)
        sparse.exceptions += 1
    # Bulk upload the remainder
    sparse._bulk_upload()
    end_ts = time.time()
    if _PROFILE:
        profiler.disable()
        buf = StringIO.StringIO()
        report = pstats.Stats(profiler, stream=buf).sort_stats('cumulative')
        report.print_stats()
        app.logger.info(buf.getvalue())
    return begin_ts, end_ts
def do_cprofile(func):
    """Decorator for profiling a function.

    Each call to the wrapped function is run under :class:`cProfile.Profile`;
    the 20 slowest entries (by internal time) are printed afterwards, and the
    wrapped function's return value is passed through unchanged.

    Fixes: ``profile.disable()`` previously sat inside the ``try`` body after
    the call, so it was skipped whenever *func* raised; it now runs in
    ``finally``.  Also applies ``functools.wraps`` so the wrapper keeps the
    wrapped function's name and docstring.
    """
    from functools import wraps

    @wraps(func)
    def profiled_func(*args, **kwargs):
        """Wrapper"""
        profile = cProfile.Profile()
        profile.enable()
        try:
            return func(*args, **kwargs)
        finally:
            # Stop the profiler even on exceptions, then report.
            profile.disable()
            stats = pstats.Stats(profile)
            stats.sort_stats("time").print_stats(20)
    return profiled_func
def process_profilelog(fn, pout=None):
    """Render a profile data file (or several) into a readable text report.

    Either call with a list of filenames and set *pout*, or with a single
    filename and optionally *pout*; by default the report is written next to
    the input as ``<fn>.processed``.  The report contains time-sorted stats,
    callers, and cumulative-sorted stats.
    """
    if not pout:
        pout = fn + '.processed'
    with open(pout, 'w') as stream:
        import pstats
        sources = fn if isinstance(fn, list) else [fn]
        stats = pstats.Stats(*sources, stream=stream)
        stats.sort_stats('time')
        stats.print_stats()
        stats.print_callers()
        stats.sort_stats('cumulative')
        stats.print_stats()
        stream.flush()

#
# Was present to work around multiprocessing pool bugs in python < 2.7.3
#
def do_profiling(cls):
    """Profile testfunc() with cls.profilerclass; return a list holding the
    elapsed wall time followed by one filtered report string per method in
    cls.methodnames."""
    results = []
    profiler = cls.profilerclass(timer, 0.001)
    started = timer()
    profiler.runctx("testfunc()", globals(), locals())
    results.append(timer() - started)
    for method in cls.methodnames:
        buf = StringIO()
        stats = pstats.Stats(profiler, stream=buf)
        stats.strip_dirs().sort_stats("stdname")
        getattr(stats, method)()
        lines = buf.getvalue().splitlines()
        mod_name = testfunc.__module__.rsplit('.', 1)[1]
        # Only compare against stats originating from the test file.
        # Prevents outside code (e.g., the io module) from causing
        # unexpected output.
        lines = [line.rstrip() for line in lines if mod_name in line]
        results.append('\n'.join(lines))
    return results
def test_calling_conventions(self):
    # Issue #5330: profile and cProfile wouldn't report C functions called
    # with keyword arguments. We test all calling conventions.
    stmts = [
        "max([0])",
        "max([0], key=int)",
        "max([0], **dict(key=int))",
        "max(*([0],))",
        "max(*([0],), key=int)",
        "max(*([0],), **dict(key=int))",
    ]
    for stmt in stmts:
        out = StringIO()
        profiler = self.profilerclass(timer, 0.001)
        profiler.runctx(stmt, globals(), locals())
        stats = pstats.Stats(profiler, stream=out)
        stats.print_stats()
        report = out.getvalue()
        self.assertIn(self.expected_max_output, report,
                      "Profiling {0!r} didn't report max:\n{1}".format(stmt, report))
def profile_func(call_func_str):
    """Profile the statement in *call_func_str*, printing time-sorted stats.

    Example::

        def f():
            d = AndroidDevice("192.168.1.120")
            d.swipe_position(650, 700, 50, 700, 30)
            d.swipe_position(130, 800, 850, 800, 50)
        profile_func("f()")

    :param call_func_str: source of the call to profile
    :return: None
    """
    import cProfile
    import pstats
    # Dump raw stats to prof.txt in the current directory, then report.
    cProfile.run(call_func_str, "prof.txt")
    pstats.Stats("prof.txt").sort_stats("time").print_stats()
def __bus_destroy_cb(self, bus=None):
    """Bus-destroy callback: tear down the engine factory, stop the main
    loop, and — when profiling was requested on the command line — print
    profiler reports for selected modules.

    Idempotent: a second invocation returns immediately via self.destroyed.
    """
    if DEBUG_LEVEL > 1:
        sys.stderr.write("IMApp.__bus_destroy_cb(bus=%s)\n" % bus)
    if self.destroyed:
        return
    print("finalizing:)")
    self.__factory.do_destroy()
    self.destroyed = True
    self.__mainloop.quit()
    if _ARGS.profile:
        # Stop the module-level profiler and print per-module reports,
        # 25 entries each, filtered by source-file name pattern.
        _PROFILE.disable()
        stats = pstats.Stats(_PROFILE)
        stats.strip_dirs()
        stats.sort_stats('cumulative')
        stats.print_stats('tabsqlite', 25)
        stats.print_stats('hunspell_suggest', 25)
        stats.print_stats('hunspell_table', 25)
        stats.print_stats('itb_emoji', 25)
def run_profile(func, sort_order="cumtime", count=1, strip_dir=True, name_filter=""):
    """sort_order : keywords 'ncalls', 'tottime', 'cumtime', 'filename'

    Decorator factory: the returned wrapper runs *func* ``count`` times under
    cProfile and returns the formatted stats report as a string (note: the
    wrapped function's own return value is discarded by design).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        def cmd():
            for _ in range(count):
                func(*args, **kwargs)
        profiler = cProfile.Profile()
        # Profile.runctx returns the profiler itself.
        profiled = profiler.runctx("cmd()", globals(), locals())
        stream = StringIO.StringIO()
        stats = pstats.Stats(profiled, stream=stream)
        if strip_dir:
            stats.strip_dirs()
        stats.sort_stats(sort_order)
        stats.print_stats(name_filter)
        return stream.getvalue()
    return wrapper
def print_stats(self, sort=-1):
    """Print this profiler's statistics with directory prefixes stripped,
    ordered by *sort* (pstats sort key, or legacy integer code)."""
    import pstats
    stats = pstats.Stats(self)
    stats.strip_dirs()
    stats.sort_stats(sort)
    stats.print_stats()
def Stats(*args):
    """Stub kept for backward compatibility: report generation moved out of
    this module.  Prints a pointer to :mod:`pstats` (with a terminal bell).

    Fixes: the body used the Python-2-only ``print`` statement, a syntax
    error on Python 3; a single-argument ``print(...)`` call behaves
    identically on both major versions.
    """
    print('Report generating functions are in the "pstats" module\a')
def main():
    """Command-line entry point: parse ``-o``/``-s`` options, then profile
    the given script with its arguments.

    Returns the OptionParser (useful for error reporting by the caller);
    exits with status 2 when no arguments are supplied.
    """
    usage = "profile.py [-o output_file_path] [-s sort] scriptfile [arg] ..."
    parser = OptionParser(usage=usage)
    parser.allow_interspersed_args = False
    parser.add_option('-o', '--outfile', dest="outfile",
                      help="Save stats to <outfile>", default=None)
    parser.add_option('-s', '--sort', dest="sort",
                      help="Sort order when printing to stdout, based on pstats.Stats class",
                      default=-1)

    if not sys.argv[1:]:
        parser.print_usage()
        sys.exit(2)

    options, args = parser.parse_args()
    # Make the profiled script see only its own argv.
    sys.argv[:] = args

    if args:
        progname = args[0]
        sys.path.insert(0, os.path.dirname(progname))
        with open(progname, 'rb') as fp:
            code = compile(fp.read(), progname, 'exec')
        globs = {
            '__file__': progname,
            '__name__': '__main__',
            '__package__': None,
        }
        runctx(code, globs, None, options.outfile, options.sort)
    else:
        parser.print_usage()
    return parser

# When invoked as main program, invoke the profiler on a script
def profile(sourcecode, p_locals, p_globals):
    """Profile *sourcecode* with cProfile and print the 40 slowest entries
    by cumulative time; the temporary stats file is always removed.

    NOTE(review): *p_locals* is forwarded in the globals position of
    ``cProfile.runctx`` and *p_globals* in the locals position — preserved
    as-is; confirm against callers before changing.
    """
    import cProfile
    import pstats
    prof_filename = os.path.join(tempfile.gettempdir(), "%s.prof" % __file__)
    try:
        cProfile.runctx(sourcecode, p_locals, p_globals, prof_filename)
        pstats.Stats(prof_filename).sort_stats('cumulative').print_stats(40)
    finally:
        os.remove(prof_filename)
def dump_stats(self):
    """Format the collected profile (sorted by self.sort_by, which may be a
    single key or a sequence of keys) and, if self.file_path is set, write
    the report there.  No-op when no profiler was installed."""
    if self._profiler is None:
        return
    buf = StringIO.StringIO()
    sort_keys = (self.sort_by,) if isinstance(self.sort_by, basestring) else self.sort_by
    pstats.Stats(self._profiler, stream=buf).sort_stats(*sort_keys).print_stats()
    if self.file_path is not None:
        with open(self.file_path, 'w') as f:
            f.write(buf.getvalue())
def profile(num_elements=100000, parser="lxml"):
    """Profile BeautifulSoup parsing a random document of *num_elements*
    elements with the given *parser*, printing up to 50 cumulative-time
    entries whose names match '_html5lib|bs4'."""
    handle = tempfile.NamedTemporaryFile()
    filename = handle.name
    data = rdoc(num_elements)
    ns = dict(bs4=bs4, data=data, parser=parser)
    cProfile.runctx('bs4.BeautifulSoup(data, parser)', ns, ns, filename)
    stats = pstats.Stats(filename)
    # stats.strip_dirs()
    stats.sort_stats("cumulative")
    stats.print_stats('_html5lib|bs4', 50)
def __call__(self, environ, start_response):
    """WSGI entry point: run the wrapped application under a profiler.

    The response body is captured so the request can be answered after
    profiling.  If a profile directory was configured, raw stats are dumped
    to a per-request .prof file; otherwise a sorted report is printed to
    the configured stream.
    """
    response_body = []

    def catching_start_response(status, headers, exc_info=None):
        # Forward to the real start_response, but hand the app our buffer's
        # append as the write() callable so writes are captured too.
        start_response(status, headers, exc_info)
        return response_body.append

    def runapp():
        appiter = self._app(environ, catching_start_response)
        response_body.extend(appiter)
        if hasattr(appiter, 'close'):
            appiter.close()

    p = Profile()
    start = time.time()
    p.runcall(runapp)
    body = b''.join(response_body)
    elapsed = time.time() - start

    if self._profile_dir is not None:
        # File name encodes method, dotted path, elapsed ms, and a timestamp.
        prof_filename = os.path.join(self._profile_dir,
                                     '%s.%s.%06dms.%d.prof' % (
                                         environ['REQUEST_METHOD'],
                                         environ.get('PATH_INFO').strip(
                                             '/').replace('/', '.') or 'root',
                                         elapsed * 1000.0,
                                         time.time()
                                     ))
        p.dump_stats(prof_filename)
    else:
        stats = Stats(p, stream=self._stream)
        stats.sort_stats(*self._sort_by)
        self._stream.write('-' * 80)
        self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
        stats.print_stats(*self._restrictions)
        self._stream.write('-' * 80 + '\n\n')

    return [body]
def tearDown(self):
    """After each test, print cumulative-time profile stats if a profiler
    was attached to this test case.

    Fixes: the guard tested ``hasattr(self, 'pr')`` but the body read
    ``self.profile``, so the report either never ran or could raise
    AttributeError; the check now matches the attribute actually used.
    """
    if hasattr(self, 'profile'):
        p = Stats(self.profile)
        p.strip_dirs()
        p.sort_stats('cumtime')
        p.print_stats()
def tearDown(self):
    """After each test, print cumulative-time profile stats if a profiler
    was attached, then drop the per-test ``level_stats`` accumulator.

    Fixes: the guard tested ``hasattr(self, 'pr')`` but the body read
    ``self.profile``; the check now matches the attribute actually used.
    (The ``del`` is kept inside the guard, matching the original layout —
    confirm against the test class if level_stats must always be cleared.)
    """
    if hasattr(self, 'profile'):
        p = Stats(self.profile)
        p.strip_dirs()
        p.sort_stats('cumtime')
        p.print_stats()
        del self.level_stats
def run(args):
    """Stream-compress or -decompress ``args.file`` into ``args.output``.

    Flip the local ``_profile`` flag to True to capture the operation in
    Profile.prof and print a time-sorted report.
    """
    _profile = False
    if _profile:
        import pstats, cProfile
    fh_in = open(args.file, "rb")
    fh_out = open(args.output, "wb")
    stmt = ("stream_compress(fh_in, fh_out, args.framing, args.bytesize)"
            if args.compress else
            "stream_decompress(fh_in, fh_out, args.framing, args.bytesize)")
    if _profile:
        cProfile.runctx(stmt, globals(), locals(), "Profile.prof")
        pstats.Stats("Profile.prof").strip_dirs().sort_stats("time").print_stats()
    elif args.compress:
        stream_compress(fh_in, fh_out, args.framing, args.bytesize)
    else:
        stream_decompress(fh_in, fh_out, args.framing, args.bytesize)
    fh_in.close()
    fh_out.close()
def reset_stats(self):
    """Reset accumulated profiler statistics."""
    self.ncalls = 0
    self.skipped = 0
    # Seed with an empty pure-Python profile.Profile: constructing
    # pstats.Stats from a bare cProfile.Profile (self.Profile) fails.
    self.stats = pstats.Stats(profile.Profile())
def prepare_results(number_of_results):
    """Load the stats file named by the module-level STATS_FILENAME and
    print the top *number_of_results* entries by cumulative time."""
    report = pstats.Stats(STATS_FILENAME)
    report.sort_stats('cumulative').print_stats(number_of_results)
def load(self):
    """Replay a hotshot log through a profile.Profile and return the
    resulting pstats.Stats.  (Python 2 only: hotshot was removed in 3.)"""
    # The timer selected by the profiler should never be used, so make
    # sure it doesn't work:
    p = Profile()
    p.get_time = _brokentimer
    log = hotshot.log.LogReader(self._logfn)
    taccum = 0
    for event in log:
        what, (filename, lineno, funcname), tdelta = event
        if tdelta > 0:
            taccum += tdelta
        # We multiply taccum to convert from the microseconds we
        # have to the seconds that the profile/pstats module work
        # with; this allows the numbers to have some basis in
        # reality (ignoring calibration issues for now).
        if what == ENTER:
            frame = self.new_frame(filename, lineno, funcname)
            p.trace_dispatch_call(frame, taccum * .000001)
            taccum = 0
        elif what == EXIT:
            frame = self.pop_frame()
            p.trace_dispatch_return(frame, taccum * .000001)
            taccum = 0
        # no further work for line events
    # Every ENTER must have been matched by an EXIT.
    assert not self._stack
    return pstats.Stats(p)
def print_stats(self, sort=-1):
    """Print this profiler's statistics, directory prefixes stripped,
    ordered by *sort* (pstats sort key or legacy integer code)."""
    import pstats
    report = pstats.Stats(self)
    report.strip_dirs()
    report.sort_stats(sort)
    report.print_stats()
def __init__(self, *filename):
    """Load profile data from *filename* in pstats format, falling back to
    the hotshot format on Python 2; exits with an error on Python 3 when
    the data is not valid pstats."""
    import pstats
    try:
        self.stats = pstats.Stats(*filename)
    except ValueError:
        # Not pstats-format data.  hotshot was removed in Python 3, so
        # there is no fallback reader there — report and bail out.
        if PYTHON_3:
            sys.stderr.write('error: failed to load %s\n' % ', '.join(filename))
            sys.exit(1)
        # Python 2 fallback: only the first file is consulted.
        import hotshot.stats
        self.stats = hotshot.stats.load(filename[0])
    self.profile = Profile()
    self.function_ids = {}
def _run_profiler(function, *args, **kwargs): """Run a profiler on the specified function.""" profiler = cProfile.Profile() profiler.enable() result = function(*args, **kwargs) profiler.disable() stats = pstats.Stats(profiler).sort_stats('cumtime') stats.print_stats() return result
def main(parser):
    """Profile one training run, either with torch's autograd profiler
    (opt['torch']) or with cProfile, printing the resulting report; with
    opt['debug'] set, drop into pdb afterwards for interactive inspection."""
    opt = parser.parse_args()
    if opt['torch']:
        with torch.autograd.profiler.profile() as prof:
            TrainLoop(parser).train()
        print(prof.total_average())

        sort_cpu = sorted(prof.key_averages(), key=lambda k: k.cpu_time)
        sort_cuda = sorted(prof.key_averages(), key=lambda k: k.cuda_time)

        # Helpers bound here so they are callable from the pdb prompt below.
        def cpu():
            for e in sort_cpu:
                print(e)

        def cuda():
            for e in sort_cuda:
                print(e)

        cpu()

        if opt['debug']:
            print('`cpu()` prints out cpu-sorted list, '
                  '`cuda()` prints cuda-sorted list')
            pdb.set_trace()
    else:
        pr = cProfile.Profile()
        pr.enable()
        TrainLoop(parser).train()
        pr.disable()
        s = io.StringIO()
        sortby = 'cumulative'
        ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
        ps.print_stats()
        print(s.getvalue())
        if opt['debug']:
            pdb.set_trace()
def profile(func):
    """Decorator: run each call to *func* under cProfile, print a
    tottime-sorted report afterwards (even on exceptions), and pass the
    return value through unchanged."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        prof = cProfile.Profile()
        try:
            return prof.runcall(func, *args, **kwargs)
        finally:
            pstats.Stats(prof).strip_dirs().sort_stats('tottime').print_stats()
    return wrapper
def profile_execute(self, pstat_file=None):
    """Execute this node group under cProfile; optionally dump raw stats to
    *pstat_file*, and write a cumulative-time report into a Blender text
    block named '<node group name> Profile'."""
    prof = cProfile.Profile()
    prof.enable()
    exec_node_group(self)
    prof.disable()

    if pstat_file is not None:
        prof.dump_stats(pstat_file)

    buf = io.StringIO()
    report = pstats.Stats(prof, stream=buf)
    report.strip_dirs()
    report.sort_stats('cumulative')
    report.print_stats()

    text_name = self.name + " Profile"
    texts = bpy.data.texts
    text = texts[text_name] if text_name in texts else texts.new(text_name)
    text.from_string(buf.getvalue())
def run_profiled(self, coeff_file=None, kgrid_tp="coarse", write_outputs=True):
    """Run self.run(...) under cProfile and print the 15 slowest entries,
    sorted by cumulative time, to the module-level STDOUT stream."""
    prof = cProfile.Profile()
    prof.runcall(lambda: self.run(coeff_file, kgrid_tp=kgrid_tp,
                                  write_outputs=write_outputs))
    report = Stats(prof, stream=STDOUT)
    report.strip_dirs()
    report.sort_stats('cumulative')
    report.print_stats(15)  # only the 15 slowest functions