我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用profile.Profile()。
def __call__(self, *args, **kw):
    """Call the wrapped function, profiling this single invocation.

    Every invocation increments ``self.ncalls``.  While ``self.skip`` is
    positive the call is counted in ``self.skipped`` and run unprofiled.
    A call made while ``FuncProfile.in_profiler`` is set (a call nested
    inside an already profiled one, e.g. recursion) is run unprofiled as
    well.  Any other call runs under a fresh ``cProfile.Profile`` whose
    data is added to ``self.stats``; when ``self.immediate`` is true the
    stats are printed and reset right after the call.

    Returns whatever ``self.fn`` returns.
    """
    self.ncalls += 1

    must_skip = self.skip > 0
    if must_skip:
        self.skip -= 1
        self.skipped += 1
    if must_skip or FuncProfile.in_profiler:
        # Either a skipped call or a nested/recursive one: run it bare.
        return self.fn(*args, **kw)

    # One profiler object cannot be reused across calls to accumulate
    # stats, so a new one is created here and merged into self.stats.
    call_profiler = cProfile.Profile()
    try:
        FuncProfile.in_profiler = True
        return call_profiler.runcall(self.fn, *args, **kw)
    finally:
        FuncProfile.in_profiler = False
        self.stats.add(call_profiler)
        if self.immediate:
            self.print_stats()
            self.reset_stats()
def profiled(f, outputFile):
    """Return a wrapper around *f* that profiles every call.

    On every interpreter except Python 2.4 the call runs under
    ``profile.Profile``: the raw stats are written to *outputFile* and a
    report is printed.  A ``SystemExit`` raised by *f* is swallowed so the
    profile is still reported; in that case the wrapper returns ``None``.

    On Python 2.4 the ``hotshot`` profiler is used instead (the original
    code notes that ``profile`` is broken there); it logs to *outputFile*
    and the top 100 entries, sorted by cumulative time, are printed.

    :param f: callable to profile.
    :param outputFile: path the profiling data is written to.
    :return: a callable accepting the same arguments as *f*.
    """
    def _(*args, **kwargs):
        if sys.version_info[0:2] != (2, 4):
            import profile
            prof = profile.Profile()
            # Pre-bound so the return below is valid even when f() exits
            # through SystemExit and never produces a value.
            result = None
            try:
                result = prof.runcall(f, *args, **kwargs)
            except SystemExit:
                # Deliberate: still dump and print what was collected.
                pass
            prof.dump_stats(outputFile)
            prof.print_stats()
            return result
        else:
            # use hotshot, profile is broken in 2.4
            import hotshot.stats
            prof = hotshot.Profile(outputFile)
            try:
                return prof.runcall(f, *args, **kwargs)
            finally:
                stats = hotshot.stats.load(outputFile)
                stats.strip_dirs()
                stats.sort_stats('cum')  # 'time'
                stats.print_stats(100)
    return _
def server_main(cooker, func, *args):
    """Run ``func(*args)`` between the cooker's pre/post serve hooks.

    ``cooker.pre_serve()`` is called first and ``cooker.post_serve()``
    after *func* has returned.  When ``cooker.configuration.profile`` is
    true the call runs under a profiler (``cProfile`` if importable,
    otherwise ``profile``); the raw data is written to ``profile.log`` and
    handed to ``bb.utils.process_profilelog``.

    :param cooker: object providing ``pre_serve``, ``post_serve`` and
        ``configuration.profile``.
    :param func: callable to execute.
    :param args: positional arguments forwarded to *func*.
    :return: the value returned by *func*.
    """
    cooker.pre_serve()

    if cooker.configuration.profile:
        # Only a missing module should trigger the fallback; a bare
        # ``except`` would also swallow KeyboardInterrupt/SystemExit.
        try:
            import cProfile as profile
        except ImportError:
            import profile
        prof = profile.Profile()

        ret = prof.runcall(func, *args)

        prof.dump_stats("profile.log")
        bb.utils.process_profilelog("profile.log")
        print("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
    else:
        ret = func(*args)

    cooker.post_serve()

    return ret
def run(self):
    """Execute ``self.realrun()``, under a profiler if ``self.profile`` is set.

    Without profiling this simply calls ``self.realrun()``.  With
    profiling, ``cProfile`` is used when importable (``profile``
    otherwise) and the stats are always dumped, even if ``realrun``
    raises, to ``profile-parse-<process name>.log`` where the process name
    comes from ``multiprocessing.current_process()``.
    """
    if not self.profile:
        self.realrun()
        return

    # Only a missing module should trigger the fallback; a bare ``except``
    # would also swallow KeyboardInterrupt/SystemExit.
    try:
        import cProfile as profile
    except ImportError:
        import profile
    prof = profile.Profile()
    try:
        prof.runcall(self.realrun)
    finally:
        logfile = "profile-parse-%s.log" % multiprocessing.current_process().name
        prof.dump_stats(logfile)
def __init__(self):
    """Set up the analyzer's callbacks and internal cell state.

    Logs a start-up message, initialises the ``ProtocolAnalyzer`` base,
    registers ``self.__rrc_filter`` as a source callback and creates the
    status/history/config containers plus the state machine.
    """
    self.log_info("Initialing WcdmaRrcAnalyzer..")

    ProtocolAnalyzer.__init__(self)

    # Packet filter: every source message is passed to __rrc_filter.
    self.add_source_callback(self.__rrc_filter)

    # Internal state.
    self.__config = {}  # cell_id -> WcdmaRrcConfig()
    self.__history = {}  # cell history: timestamp -> WcdmaRrcStatus()
    self.__status = WcdmaRrcStatus()  # current cell status

    self.state_machine = self.create_state_machine()

    # FIXME: change the timestamp
    self.__history[0] = self.__config

    # Temporary structure for holding the config.
    self.__config_tmp = WcdmaRrcConfig()

    # NOTE: profile support is currently switched off here; the original
    # carried a disabled ``Profile(WcdmaRrcProfileHierarchy())`` setup.
def __init__(self):
    """
    Set up the analyzer.

    After initialising the ``Analyzer`` base, this wires together:
        - the profile, built from ``self.create_profile_hierarchy()``
        - the state machine, built from ``self.create_state_machine()``
          and ``self.init_protocol_state``
        - a source callback (``self.__update_state``) that keeps the
          state up to date
    """
    Analyzer.__init__(self)

    hierarchy = self.create_profile_hierarchy()
    self.profile = Profile(hierarchy)

    machine_spec = self.create_state_machine()
    self.state_machine = StateMachine(machine_spec, self.init_protocol_state)

    # update state dynamics
    self.add_source_callback(self.__update_state)
def _exec_main(parser, values):
    """Parse the command line and run ``_main`` plainly, under pdb or profiled.

    Arguments from the ``SCONSFLAGS`` environment variable (split on
    whitespace) are placed in front of ``sys.argv[1:]`` before parsing.

    * ``"pdb"`` in a list-valued ``options.debug``: run under ``pdb``.
    * ``options.profile_file`` set: run under ``profile.Profile`` and
      always dump the stats to that file, even if ``_main`` raises.
    * otherwise: call ``_main(parser)`` directly.
    """
    env_flags = os.environ.get('SCONSFLAGS', '').split()
    options, args = parser.parse_args(env_flags + sys.argv[1:], values)

    debug = options.debug
    if isinstance(debug, list) and "pdb" in debug:
        import pdb
        pdb.Pdb().runcall(_main, parser)
        return

    if not options.profile_file:
        _main(parser)
        return

    # Per the original note, the project's compat layer arranges for this
    # import to provide cProfile when it is available.
    from profile import Profile

    profiler = Profile()
    try:
        profiler.runcall(_main, parser)
    finally:
        profiler.dump_stats(options.profile_file)
def run(self, reactor):
    """
    Profile C{reactor.run} with the standard C{profile} module.

    If C{self.saveStats} is true the raw stats are dumped to
    C{self.profileOutput}; otherwise the printed report is appended to
    that file by temporarily pointing C{sys.stdout} at it.  C{sys.stdout}
    is restored even if printing fails.
    """
    try:
        import profile
    except ImportError as e:
        self._reportImportError("profile", e)

    profiler = profile.Profile()
    profiler.runcall(reactor.run)

    if self.saveStats:
        profiler.dump_stats(self.profileOutput)
        return

    # print_stats() writes to sys.stdout, so swap it for the output file.
    originalStdout = sys.stdout
    sys.stdout = open(self.profileOutput, 'a')
    try:
        profiler.print_stats()
    finally:
        reportFile, sys.stdout = sys.stdout, originalStdout
        reportFile.close()
def run(self, reactor):
    """
    Profile C{reactor.run} with C{cProfile}.

    If C{self.saveStats} is true the raw stats are dumped to
    C{self.profileOutput}; otherwise a report with directory names
    stripped and the default (C{-1}) sort order is written to that file,
    replacing any previous content.
    """
    try:
        import cProfile
        import pstats
    except ImportError as e:
        self._reportImportError("cProfile", e)

    profiler = cProfile.Profile()
    profiler.runcall(reactor.run)

    if self.saveStats:
        profiler.dump_stats(self.profileOutput)
        return

    with open(self.profileOutput, 'w') as stream:
        report = pstats.Stats(profiler, stream=stream)
        report.strip_dirs().sort_stats(-1).print_stats()
def test_profilePrintStatsError(self):
    """
    If printing the stats blows up, C{sys.stdout} must still be put back
    to the object it referred to before the profiler ran.
    """
    class ErroneousProfile(profile.Profile):
        # Stand-in profiler whose report step always fails.
        def print_stats(self):
            raise RuntimeError("Boom")

    self.patch(profile, "Profile", ErroneousProfile)

    options = twistd.ServerOptions()
    options["profile"] = self.mktemp()
    options["profiler"] = "profile"

    appProfiler = app.AppProfiler(options)
    fakeReactor = DummyReactor()

    stdoutBefore = sys.stdout
    self.assertRaises(RuntimeError, appProfiler.run, fakeReactor)
    self.assertIs(sys.stdout, stdoutBefore)
def _profileWithoutGarbageLeak(cmd, filename):
    """Profile the statement *cmd* the way ``profile.run`` does, then drop
    the profiler's ``dispatcher`` attribute.

    The original author notes that the ``profile`` module otherwise leaks
    memory; deleting ``dispatcher`` is their workaround.

    :param cmd: statement string passed to ``Profile.run``.
    :param filename: path to dump the stats to, or ``None`` to print them.
    :return: the result of ``print_stats(-1)`` when printing, else ``None``.
    """
    # profile is not installed with every Python, so it is imported here
    # rather than at module scope.
    import profile

    # ---- mirrors profile.run ----
    profiler = profile.Profile()
    try:
        profiler = profiler.run(cmd)
    except SystemExit:
        pass

    printed = None
    if filename is None:
        printed = profiler.print_stats(-1)
    else:
        profiler.dump_stats(filename)
    # -----------------------------

    # eliminate the garbage leak
    del profiler.dispatcher
    return printed
def runprofile(mainfunction, output, timeout = 60):
    """Profile *mainfunction* in a killable thread for *timeout* seconds.

    Returns ``False`` straight away when the module-level ``noprofiler``
    flag equals ``True``.  Otherwise the profiler is calibrated, a
    5-second countdown is printed, ``profile.run(mainfunction, output)``
    is started in a ``KThread``, and the thread is killed after *timeout*
    seconds so the profiler can finish and save its results.

    :param mainfunction: statement passed to ``profile.run``.
    :param output: file the profile results are saved to.
    :param timeout: seconds to let the profiled code run before killing it.
    :return: ``True`` once the run is finished, ``False`` if profiling is
        unavailable.
    """
    if noprofiler == True:
        print('ERROR: profiler and/or pstats library missing ! Please install it (probably package named python-profile) before running a profiling !')
        return False

    def profileb3():
        # Entry point executed inside the killable thread.
        profile.run(mainfunction, output)

    print('=> SAVING MODE\n\n')
    print('Calibrating the profiler...')
    calibrateprofile()

    print('Initializing the profiler...')
    # A killable thread is used because the profiled function must end for
    # the profiler to write its results.
    profiling_thread = KThread(target=profileb3)

    print('Will now run the profiling and terminate it in %s seconds. Results will be saved in %s' % (str(timeout), str(output)))
    print('\nCountdown:')
    for remaining in range(5, 0, -1):
        print(str(remaining))
        time.sleep(1)
    print('0\nStarting to profile...')

    profiling_thread.start()
    # Let the profiled code run, then stop it so the profiler can finish.
    time.sleep(float(timeout))
    print('\n\nFinishing the profile and saving to the file %s' % str(output))
    profiling_thread.kill()

    print('=> Profile done ! Exiting...')
    return True
def main(self):
    """Run ``self._main()``, optionally profiled, and print the elapsed time.

    Profiling is switched on when the first command-line argument starts
    with ``prof`` (case-insensitive) or when ``self._shouldProfile`` is
    already set; the stats are then dumped to ``<ClassName>.pstats`` in the
    current directory.  If ``self._shouldRunTestSuite`` is true the
    ``TestDataTable`` suite is run first.

    Output uses single-argument ``print(...)`` calls, which produce the
    same text on Python 2 and Python 3.
    """
    if len(sys.argv) > 1 and sys.argv[1].lower().startswith('prof'):
        self._shouldProfile = True
    if self._shouldRunTestSuite:
        from TestDataTable import main
        main()
    start = time.time()
    if self._shouldProfile:
        prof = Profile()
        prof.runcall(self._main)
        filename = '%s.pstats' % self.__class__.__name__
        prof.dump_stats(filename)
        print('Wrote %s' % filename)
    else:
        self._main()
    duration = time.time() - start
    print('%.1f secs' % duration)
def main(self):
    """Run ``self._main()``, optionally profiled, and print the elapsed time.

    Profiling is switched on when the first command-line argument starts
    with ``prof`` (case-insensitive) or when ``self._shouldProfile`` is
    already set; the stats are then dumped to ``<ClassName>.pstats`` in the
    current directory.  If ``self._shouldRunTestSuite`` is true the
    ``TestCSVParser`` suite is run first.

    Output uses single-argument ``print(...)`` calls, which produce the
    same text on Python 2 and Python 3.
    """
    if len(sys.argv) > 1 and sys.argv[1].lower().startswith('prof'):
        self._shouldProfile = True
    if self._shouldRunTestSuite:
        from TestCSVParser import main
        main()
    start = time.time()
    if self._shouldProfile:
        prof = Profile()
        prof.runcall(self._main)
        filename = '%s.pstats' % self.__class__.__name__
        prof.dump_stats(filename)
        print('Wrote %s' % filename)
    else:
        self._main()
    duration = time.time() - start
    print('%.1f secs' % duration)
def run(frameworks, number, do_profile):
    """Benchmark the ``app.main`` WSGI callable of each framework directory.

    For every name in *frameworks* the function changes into
    ``<cwd>/<name>``, imports ``app`` from there, times *number* calls with
    ``timeit`` and profiles one call to count total calls and distinct
    functions.  A framework whose import raises ``ImportError`` is reported
    as not installed.  The working directory in effect on entry is restored
    before returning, even if a benchmark raises.

    Side effect kept from the original: ``sys.path[0]`` is set to ``'.'``
    so that ``app`` resolves relative to the current framework directory.

    :param frameworks: iterable of directory names, one per framework.
    :param number: number of calls timed per framework.
    :param do_profile: when true, also print the 10 most time-consuming
        entries of a profile of the timed run.
    """
    print("Benchmarking frameworks:", ', '.join(frameworks))
    sys.path[0] = '.'
    path = os.getcwd()
    print(" ms rps tcalls funcs")
    try:
        for framework in frameworks:
            os.chdir(os.path.join(path, framework))
            try:
                main = __import__('app', None, None, ['main']).main
                # NOTE: ``f`` and ``number`` are looked up by name (through
                # locals()) in the profiled statement strings below, so
                # these local names must not be renamed.
                f = lambda: list(main(environ.copy(), start_response))
                elapsed = timeit(f, number=number)
                st = Stats(profile.Profile().runctx(
                    'f()', globals(), locals()))
                print("%-11s %6.0f %7.0f %7d %6d" % (framework, 1000 * elapsed,
                      number / elapsed, st.total_calls, len(st.stats)))
                if do_profile:
                    st = Stats(profile.Profile().runctx(
                        'timeit(f, number=number)', globals(), locals()))
                    st.strip_dirs().sort_stats('time').print_stats(10)
                # Forget this framework's ``app`` so the next one is
                # imported fresh.
                del sys.modules['app']
            except ImportError:
                print("%-15s not installed" % framework)
    finally:
        # Do not leave the process inside the last framework directory.
        os.chdir(path)
def __call__(self, environ, start_response):
    """Handle one WSGI request while profiling the wrapped application.

    The wrapped app (``self._app``) is run to completion under a
    ``Profile`` and its whole body is buffered.  When
    ``self._profile_dir`` is set, the stats are dumped to a file named
    ``<METHOD>.<dotted path or 'root'>.<elapsed>ms.<timestamp>.prof`` in
    that directory; otherwise a report (sorted by ``self._sort_by``,
    limited by ``self._restrictions``) is written to ``self._stream``.

    A request without ``PATH_INFO`` (WSGI lets servers omit it when it is
    empty) is treated as the root path instead of raising.

    :param environ: WSGI environment dict.
    :param start_response: WSGI ``start_response`` callable.
    :return: a one-element list holding the complete response body.
    """
    response_body = []

    def catching_start_response(status, headers, exc_info=None):
        start_response(status, headers, exc_info)
        # Data passed to the legacy write() callable is buffered too.
        return response_body.append

    def runapp():
        appiter = self._app(environ, catching_start_response)
        try:
            response_body.extend(appiter)
        finally:
            # close() must run even when iterating the app fails.
            if hasattr(appiter, 'close'):
                appiter.close()

    p = Profile()
    start = time.time()
    p.runcall(runapp)
    body = b''.join(response_body)
    elapsed = time.time() - start

    if self._profile_dir is not None:
        path_info = environ.get('PATH_INFO') or ''
        prof_filename = os.path.join(self._profile_dir, '%s.%s.%06dms.%d.prof' % (
            environ['REQUEST_METHOD'],
            path_info.strip('/').replace('/', '.') or 'root',
            elapsed * 1000.0,
            time.time()
        ))
        p.dump_stats(prof_filename)
    else:
        stats = Stats(p, stream=self._stream)
        stats.sort_stats(*self._sort_by)

        self._stream.write('-' * 80)
        self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
        stats.print_stats(*self._restrictions)
        self._stream.write('-' * 80 + '\n\n')

    return [body]
def reset_stats(self):
    """Discard accumulated statistics and zero the call counters.

    ``self.stats`` becomes an empty ``pstats.Stats`` and both
    ``self.ncalls`` and ``self.skipped`` are set back to 0.
    """
    # Per the original note, ``self.Profile`` is deliberately not used
    # here because pstats.Stats() fails with it.
    empty_profile = profile.Profile()
    self.stats = pstats.Stats(empty_profile)
    self.ncalls = 0
    self.skipped = 0
def runWithProfiler(reactor, config):
    """Run reactor under standard profiler.

    If ``profile`` cannot be imported, the failure is logged and the
    process exits with an explanatory message.  Otherwise
    ``reactor.run`` is profiled and, depending on ``config['savestats']``,
    the raw stats are dumped to ``config['profile']`` or the printed
    report is appended to that file.

    ``sys.stdout`` is redirected only for the duration of the report and
    is restored even if printing raises.

    :param reactor: object whose ``run`` method is profiled.
    :param config: mapping providing ``'savestats'`` and ``'profile'``.
    """
    try:
        import profile
    except ImportError as e:
        s = "Failed to import module profile: %s" % e
        s += (
            " This is most likely caused by your operating system not including"
            " profile.py due to it being non-free. Either do not use the option"
            " --profile, or install profile.py; your operating system vendor may"
            " provide it in a separate package. "
        )
        traceback.print_exc(file=log.logfile)
        log.msg(s)
        log.deferr()
        sys.exit('\n' + s + '\n')

    p = profile.Profile()
    p.runcall(reactor.run)
    if config['savestats']:
        p.dump_stats(config['profile'])
    else:
        # print_stats() reports on sys.stdout, so point it at the output
        # file for the duration of the call.
        tmp, sys.stdout = sys.stdout, open(config['profile'], 'a')
        try:
            p.print_stats()
        finally:
            sys.stdout, tmp = tmp, sys.stdout
            tmp.close()
def runWithHotshot(reactor, config):
    """Run reactor under hotshot profiler.

    If ``hotshot`` cannot be imported, the failure is logged and the
    process exits with an explanatory message.  Otherwise
    ``reactor.run`` is profiled with hotshot logging straight to
    ``config['profile']``.  With ``config['savestats']`` set nothing more
    is done; otherwise the log is loaded and replaced, in the same file,
    by a printed report with directories stripped and the default (``-1``)
    sort order.

    ``sys.stdout`` is redirected only for the duration of the report and
    is restored even if printing raises.

    :param reactor: object whose ``run`` method is profiled.
    :param config: mapping providing ``'savestats'`` and ``'profile'``.
    """
    try:
        import hotshot.stats
    except ImportError as e:
        s = "Failed to import module hotshot: %s" % e
        s += (
            " This is most likely caused by your operating system not including"
            " profile.py due to it being non-free. Either do not use the option"
            " --profile, or install profile.py; your operating system vendor may"
            " provide it in a separate package. "
        )
        traceback.print_exc(file=log.logfile)
        log.msg(s)
        log.deferr()
        sys.exit('\n' + s + '\n')

    # this writes stats straight out
    p = hotshot.Profile(config["profile"])
    p.runcall(reactor.run)
    if config["savestats"]:
        # stats are automatically written to file, nothing to do
        return
    else:
        s = hotshot.stats.load(config["profile"])
        s.strip_dirs()
        s.sort_stats(-1)
        # The report replaces the raw log, which has already been loaded.
        tmp, sys.stdout = sys.stdout, open(config['profile'], 'w')
        try:
            s.print_stats()
        finally:
            sys.stdout, tmp = tmp, sys.stdout
            tmp.close()
def load(self):
    """Replay the hotshot log ``self._logfn`` into a ``Profile`` and return
    the resulting ``pstats.Stats``.

    ENTER and EXIT events are fed to the profiler's call/return
    dispatchers together with the time accumulated since the previous
    such event; other events only contribute their time delta.
    """
    # The profiler's own timer must never be consulted during replay, so
    # it is replaced with one that does not work.
    replay = Profile()
    replay.get_time = _brokentimer

    pending = 0
    for what, (filename, lineno, funcname), tdelta in hotshot.log.LogReader(self._logfn):
        if tdelta > 0:
            pending += tdelta

        # The log counts microseconds while profile/pstats work in
        # seconds, hence the scaling below (calibration issues are
        # ignored for now).
        if what == ENTER:
            frame = self.new_frame(filename, lineno, funcname)
            replay.trace_dispatch_call(frame, pending * .000001)
            pending = 0
        elif what == EXIT:
            frame = self.pop_frame()
            replay.trace_dispatch_return(frame, pending * .000001)
            pending = 0
        # no further work for line events

    assert not self._stack
    return pstats.Stats(replay)
def __call__(self, environ, start_response):
    """Handle one WSGI request while profiling the wrapped application.

    The wrapped app (``self._app``) is run to completion under a
    ``Profile`` and its whole body is buffered.  When
    ``self._profile_dir`` is set, the stats are dumped to a file named
    ``<METHOD>.<dotted path or 'root'>.<elapsed>ms.<timestamp>.prof`` in
    that directory; otherwise a report (sorted by ``self._sort_by``,
    limited by ``self._restrictions``) is written to ``self._stream``.

    The body chunks are joined as bytes (``b''`` equals ``''`` on
    Python 2, and WSGI bodies are bytes on Python 3).  A request without
    ``PATH_INFO`` (WSGI lets servers omit it when it is empty) is treated
    as the root path instead of raising.

    :param environ: WSGI environment dict.
    :param start_response: WSGI ``start_response`` callable.
    :return: a one-element list holding the complete response body.
    """
    response_body = []

    def catching_start_response(status, headers, exc_info=None):
        start_response(status, headers, exc_info)
        # Data passed to the legacy write() callable is buffered too.
        return response_body.append

    def runapp():
        appiter = self._app(environ, catching_start_response)
        try:
            response_body.extend(appiter)
        finally:
            # close() must run even when iterating the app fails.
            if hasattr(appiter, 'close'):
                appiter.close()

    p = Profile()
    start = time.time()
    p.runcall(runapp)
    body = b''.join(response_body)
    elapsed = time.time() - start

    if self._profile_dir is not None:
        path_info = environ.get('PATH_INFO') or ''
        prof_filename = os.path.join(self._profile_dir, '%s.%s.%06dms.%d.prof' % (
            environ['REQUEST_METHOD'],
            path_info.strip('/').replace('/', '.') or 'root',
            elapsed * 1000.0,
            time.time()
        ))
        p.dump_stats(prof_filename)
    else:
        stats = Stats(p, stream=self._stream)
        stats.sort_stats(*self._sort_by)

        self._stream.write('-' * 80)
        self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
        stats.print_stats(*self._restrictions)
        self._stream.write('-' * 80 + '\n\n')

    return [body]
def __call__(self, environ, start_response):
    """Handle one WSGI request while profiling the wrapped application.

    The wrapped app (``self._app``) is run to completion under a
    ``Profile`` and its whole body is buffered.  When
    ``self._profile_dir`` is set, the stats are dumped to a file named
    ``<METHOD>.<dotted path or 'root'>.<elapsed>ms.<timestamp>.prof`` in
    that directory; otherwise a report (sorted by ``self._sort_by``,
    limited by ``self._restrictions``) is written to ``self._stream``.

    A request without ``PATH_INFO`` (WSGI lets servers omit it when it is
    empty) is treated as the root path instead of raising.

    :param environ: WSGI environment dict.
    :param start_response: WSGI ``start_response`` callable.
    :return: a one-element list holding the complete response body.
    """
    response_body = []

    def catching_start_response(status, headers, exc_info=None):
        start_response(status, headers, exc_info)
        # Data passed to the legacy write() callable is buffered too.
        return response_body.append

    def runapp():
        appiter = self._app(environ, catching_start_response)
        try:
            response_body.extend(appiter)
        finally:
            # close() must run even when iterating the app fails.
            if hasattr(appiter, 'close'):
                appiter.close()

    p = Profile()
    start = time.time()
    p.runcall(runapp)
    body = b''.join(response_body)
    elapsed = time.time() - start

    if self._profile_dir is not None:
        path_info = environ.get('PATH_INFO') or ''
        prof_filename = os.path.join(self._profile_dir, '%s.%s.%06dms.%d.prof' % (
            environ['REQUEST_METHOD'],
            path_info.strip('/').replace('/', '.') or 'root',
            elapsed * 1000.0,
            time.time()
        ))
        p.dump_stats(prof_filename)
    else:
        stats = Stats(p, stream=self._stream)
        stats.sort_stats(*self._sort_by)

        self._stream.write('-' * 80)
        self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
        stats.print_stats(*self._restrictions)
        self._stream.write('-' * 80 + '\n\n')

    return [body]
def profile_session(bias=None):
    """Generator that profiles whatever runs while it is suspended.

    The code before ``yield`` enables a ``cProfile.Profile``; the code
    after it prints the top 15 entries by cumulative time and then by
    internal time.  The profiler is always disabled when the generator is
    resumed or closed, even if an exception is thrown in at the ``yield``;
    in that case no report is printed and the exception propagates.

    NOTE(review): presumably wrapped with ``contextlib.contextmanager`` by
    a decorator outside this block -- confirm.

    :param bias: value stored on the profiler's ``bias`` attribute.  When
        ``None`` it is measured with five runs of
        ``profile.Profile().calibrate(100000)`` and the minimum is used.
    """
    # Per the original note: only one greenlet runs in this test, so
    # Python's standard profiler is adequate.
    import cProfile
    import profile
    import pstats

    if bias is None:
        bias_profiler = profile.Profile()
        runs = [bias_profiler.calibrate(100000) for _ in range(5)]
        bias = min(runs)

        # the bias should be consistent, otherwise we need to increase the
        # number of iterations
        msg = 'Bias calculations: {}, used bias: {}'.format(runs, bias)
        print(msg)

    profiler = cProfile.Profile()
    profiler.bias = bias
    profiler.enable()
    try:
        yield
    finally:
        # Never leave the profiler active if the profiled code fails.
        profiler.disable()

    profiler.create_stats()

    stats = pstats.Stats(profiler)
    stats.strip_dirs().sort_stats('cumulative').print_stats(15)
    stats.strip_dirs().sort_stats('time').print_stats(15)
def exec_task(fn, task, d, profile = False):
    """Execute a task through ``_exec_task``, optionally under a profiler.

    The task is run quietly (no error logging here) when its
    ``quieterrors`` flag is set in *d*.  With *profile* true the call runs
    under ``cProfile`` (``profile`` if that is not importable); the stats
    are dumped to ``profile-<PN>-<task>.log`` and passed to
    ``bb.utils.process_profilelog``.

    Any ``Exception`` is caught: unless quiet, the failure and traceback
    are logged; a ``TaskFailed`` event is fired and 1 is returned.

    :param fn: forwarded to ``_exec_task`` unchanged.
    :param task: task name.
    :param d: datastore providing ``getVarFlag`` and ``getVar``.
    :param profile: when true, profile the task execution.
    :return: the result of ``_exec_task``, or 1 on failure.
    """
    try:
        quieterr = False
        if d.getVarFlag(task, "quieterrors", False) is not None:
            quieterr = True

        if profile:
            profname = "profile-%s.log" % (d.getVar("PN", True) + "-" + task)
            # From here on ``profile`` names the profiler module, not the
            # flag.  Only a missing module should trigger the fallback; a
            # bare ``except`` would also swallow KeyboardInterrupt.
            try:
                import cProfile as profile
            except ImportError:
                import profile
            prof = profile.Profile()
            ret = prof.runcall(_exec_task, fn, task, d, quieterr)
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)

            return ret
        else:
            return _exec_task(fn, task, d, quieterr)

    except Exception:
        from traceback import format_exc
        if not quieterr:
            logger.error("Build of %s failed" % (task))
            logger.error(format_exc())
            failedevent = TaskFailed(task, None, d, True)
            event.fire(failedevent, d)
        return 1