我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cProfile.runctx()。
def search(self):
    """Run the top-level search over every entry in ``self.data_handle.data_list``.

    A log writer and a data writer are created under ``self.out_dir``; both
    are named after the first entry's file name with any ``.h5``, ``.fits``
    or ``.fil`` text removed.  Each entry is then handed to
    ``self.search_data``, closed, and followed by a garbage collection.
    """
    logger.debug("Start searching...")
    logger.debug(self.get_info())

    first_entry = self.data_handle.data_list[0]
    out_dir = self.out_dir.rstrip('/')
    stem = first_entry.filename.split('/')[-1]
    for suffix in ('.h5', '.fits', '.fil'):
        stem = stem.replace(suffix, '')

    self.logwriter = file_writers.LogWriter('%s/%s.log' % (out_dir, stem))
    self.filewriter = file_writers.FileWriter('%s/%s.dat' % (out_dir, stem), first_entry.header)

    start_message = "Start ET search for %s" % first_entry.filename
    logger.info(start_message)
    self.logwriter.info(start_message)

    for index, target_data_obj in enumerate(self.data_handle.data_list):
        self.search_data(target_data_obj)
        # Disabled benchmark hook, kept for reference:
        # cProfile.runctx('self.search_data(target_data_obj)', globals(), locals(),
        #                 filename='profile_M%2.1f_S%2.1f_t%i' % (self.max_drift, self.snr, int(os.times()[-1])))

        # Release this entry before moving on so memory does not accumulate.
        self.data_handle.data_list[index].close()
        gc.collect()
def train_with_profiling(opts, dirs):
    """Run ``train(opts, dirs)`` under cProfile and print the top 30 entries.

    The raw statistics are dumped to a file named ``mainstats`` in the
    current directory; a report sorted by cumulative time is then rendered
    into an in-memory buffer and printed between two marker lines.

    Args:
        opts: Forwarded unchanged to ``train``.
        dirs: Forwarded unchanged to ``train``.
    """
    import cProfile
    import pstats
    try:
        from StringIO import StringIO  # Python 2
    except ImportError:
        from io import StringIO  # Python 3: the StringIO module no longer exists

    cProfile.runctx('train(opts, dirs)',
                    {'train': train, 'opts': opts, 'dirs': dirs}, {}, 'mainstats')

    # Render the report into a buffer so it can be printed as a single block.
    profiling_output = StringIO()
    try:
        p = pstats.Stats('mainstats', stream=profiling_output)
        # Only the 30 most expensive functions by cumulative time are reported.
        p.strip_dirs().sort_stats('cumulative').print_stats(30)
        print('---Profiling result follows---\n%s' % profiling_output.getvalue())
    finally:
        profiling_output.close()
    print('---End of profiling result---')
def main():
    """Read JSON from stdin, pass it to ``shapeops.union`` and print the result as JSON.

    When ``--profile`` appears among the command-line arguments, the union
    runs under cProfile and the statistics are written to ``shapeops.cprof``.
    """
    data = json.load(sys.stdin)

    if '--profile' not in sys.argv:
        result = shapeops.union(data)
    else:
        import cProfile
        # The profiled statement cannot return a value, so it appends to a list.
        collected = []
        cProfile.runctx(
            'ret.append(shapeops.union(data))',
            globals={'shapeops': shapeops},
            locals={'data': data, 'ret': collected},
            sort="cumtime",
            filename="shapeops.cprof")
        result = collected[0]

    print(json.dumps(result, indent=2, sort_keys=True))
def main(argv):
    """Run ``benchmark`` from the command line, optionally under cProfile.

    Positional arguments are not accepted: if any are given the help text
    is printed and the process exits with status 1.  When a profile file
    name is supplied via the options, the ``test_user`` account is deleted
    first and the benchmark statistics are dumped to that file.
    """
    parser = _create_option_parser()
    options, args = parser.parse_args(argv)

    if args:
        parser.print_help()
        sys.exit(1)

    if not options.filename:
        benchmark(options.n_sets, options.n_questions)
        return

    User.objects.filter(username='test_user').delete()
    # The statement is evaluated against locals(), so ``options`` must keep its name.
    cProfile.runctx('benchmark(options.n_sets, options.n_questions)',
                    globals(), locals(), options.filename)

#----------------------------------------------------------------------------#
def execute(self, context):
    """Run the export with this operator's settings and return ``{'FINISHED'}``.

    When ``self.run_in_profiler`` is set, ``export.save`` runs under cProfile.
    A ``BCryException`` is reported through ``bcPrint`` and the error dialog
    operator instead of propagating.
    """
    bcPrint(Configuration.rc_path, 'debug', True)

    try:
        config = Export.Config(config=self)

        if not self.run_in_profiler:
            export.save(config)
        else:
            import cProfile
            profiling_scope = {'export': export, 'config': config}
            cProfile.runctx('export.save(config)', {}, profiling_scope)

        self.filepath = '//'
    except exceptions.BCryException as error:
        bcPrint(error.what(), 'error')
        bpy.ops.screen.display_error('INVOKE_DEFAULT', message=error.what())

    return {'FINISHED'}
def execute(self, context):
    """Run the animation export with this operator's settings and return ``{'FINISHED'}``.

    When ``self.run_in_profiler`` is set, ``export_animations.save`` runs
    under cProfile.  A ``BCryException`` is reported through ``bcPrint`` and
    the error dialog operator instead of propagating.
    """
    bcPrint(Configuration.rc_path, 'debug')

    try:
        config = ExportAnimations.Config(config=self)

        if not self.run_in_profiler:
            export_animations.save(config)
        else:
            import cProfile
            profiling_scope = {
                'export_animations': export_animations,
                'config': config,
            }
            cProfile.runctx('export_animations.save(config)', {}, profiling_scope)

        self.filepath = '//'
    except exceptions.BCryException as error:
        bcPrint(error.what(), 'error')
        bpy.ops.screen.display_error('INVOKE_DEFAULT', message=error.what())

    return {'FINISHED'}
def main(arguments=None):
    """Parse the command line and dispatch to the chosen subcommand.

    One subparser is registered per entry of ``COMMANDS``; each command
    module supplies its own arguments and a ``main`` function.  With
    ``--profile`` the subcommand runs under cProfile and the statistics are
    written to ``igdiscover.prof``.  On Linux, total CPU time and peak
    memory of this process and its children are logged afterwards.

    Args:
        arguments: Argument list handed to the parser's ``parse_args``
            (default ``None``).
    """
    logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

    parser = HelpfulArgumentParser(description=__doc__, prog='igdiscover')
    parser.add_argument(
        '--profile', default=False, action='store_true',
        help='Save profiling information to igdiscover.prof')
    parser.add_argument(
        '--version', action='version', version='%(prog)s ' + __version__)

    subparsers = parser.add_subparsers()
    for name in COMMANDS:
        command_module = importlib.import_module('.' + name, 'igdiscover')
        # The second line of the module docstring serves as the short help text.
        short_help = command_module.__doc__.split('\n')[1]
        command_parser = subparsers.add_parser(
            name, help=short_help, description=command_module.__doc__)
        command_parser.set_defaults(func=command_module.main)
        command_module.add_arguments(command_parser)

    # ``args`` must keep this name: the profiled statement below refers to it.
    args = parser.parse_args(arguments)
    if not hasattr(args, 'func'):
        parser.error('Please provide the name of a subcommand to run')
    elif args.profile:
        import cProfile as profile
        profile.runctx('args.func(args)', globals(), locals(), filename='igdiscover.prof')
        logger.info('Wrote profiling data to igdiscover.prof')
    else:
        args.func(args)

    if sys.platform == 'linux':
        own_usage = resource.getrusage(resource.RUSAGE_SELF)
        child_usage = resource.getrusage(resource.RUSAGE_CHILDREN)
        memory_kb = own_usage.ru_maxrss + child_usage.ru_maxrss
        cpu_time = own_usage.ru_utime + own_usage.ru_stime + child_usage.ru_utime + child_usage.ru_stime
        cpu_time_s = format_duration(cpu_time)
        logger.info('CPU time {}. Maximum memory usage {:.3f} GB'.format(
            cpu_time_s, memory_kb / 1E6))
def profile(sourcecode, p_locals, p_globals):
    """Profile ``sourcecode`` with cProfile and print the 40 costliest entries.

    The statistics are dumped to a scratch file in the system temporary
    directory, loaded back, printed sorted by cumulative time, and the
    scratch file is removed again.

    Args:
        sourcecode: Statement (string or code object) to execute.
        p_locals: Mapping used as the local namespace of the statement.
        p_globals: Mapping used as the global namespace of the statement.
    """
    import cProfile
    import pstats

    # __file__ may be an absolute path; os.path.join would then discard the
    # temp directory, so only the base name is used for the scratch file.
    prof_filename = os.path.join(
        tempfile.gettempdir(), "%s.prof" % os.path.basename(__file__))
    try:
        # runctx expects (statement, globals, locals, filename) in that order.
        cProfile.runctx(sourcecode, p_globals, p_locals, prof_filename)
        p = pstats.Stats(prof_filename)
        p.sort_stats('cumulative').print_stats(40)
    finally:
        # Guarded so a failed dump cannot mask the original exception.
        if os.path.exists(prof_filename):
            os.remove(prof_filename)
def profile(num_elements=100000, parser="lxml"):
    """Profile ``bs4.BeautifulSoup`` on a document produced by ``rdoc``.

    The statistics are written to a temporary file, loaded, sorted by
    cumulative time, and up to 50 entries matching ``_html5lib|bs4`` are printed.

    Args:
        num_elements: Value forwarded to ``rdoc`` to build the input document.
        parser: Parser name passed as the second argument to ``BeautifulSoup``.
    """
    data = rdoc(num_elements)
    # One dict is used as both globals and locals of the profiled statement.
    namespace = dict(bs4=bs4, data=data, parser=parser)

    # The context manager closes the temporary file deterministically rather
    # than leaving cleanup to garbage collection.
    # NOTE(review): reopening an open NamedTemporaryFile by name is documented
    # as unsupported on Windows; behaviour there is unchanged by this edit.
    with tempfile.NamedTemporaryFile() as filehandle:
        filename = filehandle.name
        cProfile.runctx('bs4.BeautifulSoup(data, parser)', namespace, namespace, filename)
        # Stats reads the file in its constructor, so it is loaded before the
        # temporary file goes away.
        stats = pstats.Stats(filename)

    # stats.strip_dirs()
    stats.sort_stats("cumulative")
    stats.print_stats('_html5lib|bs4', 50)
def run(args):
    """Compress or decompress ``args.file`` into ``args.output``.

    ``args.compress`` selects ``stream_compress`` or ``stream_decompress``;
    both receive the open input and output streams together with
    ``args.framing`` and ``args.bytesize``.  Both files are closed when the
    function exits, including when the codec raises.

    Args:
        args: Object providing ``file``, ``output``, ``compress``,
            ``framing`` and ``bytesize`` attributes.
    """
    # Developer switch: set to True to run the codec under cProfile and print
    # the statistics sorted by internal time.
    _profile = False
    if _profile:
        import pstats, cProfile

    # ``fh_in``, ``fh_out`` and ``args`` keep their names because the profiled
    # statements below are evaluated against locals().
    with open(args.file, "rb") as fh_in, open(args.output, "wb") as fh_out:
        if _profile:
            if args.compress:
                statement = "stream_compress(fh_in, fh_out, args.framing, args.bytesize)"
            else:
                statement = "stream_decompress(fh_in, fh_out, args.framing, args.bytesize)"
            cProfile.runctx(statement, globals(), locals(), "Profile.prof")
            s = pstats.Stats("Profile.prof")
            s.strip_dirs().sort_stats("time").print_stats()
        elif args.compress:
            stream_compress(fh_in, fh_out, args.framing, args.bytesize)
        else:
            stream_decompress(fh_in, fh_out, args.framing, args.bytesize)
def world_profiled(timebase, eyes_are_alive, ipc_pub_url, ipc_sub_url, ipc_push_url, user_dir, version):
    """Run ``world`` under cProfile and render the statistics as a PNG call graph.

    Statistics are dumped to ``world.pstats``; a shell pipeline then feeds
    them through the bundled ``gprof2dot.py`` and ``dot`` to produce
    ``world_cpu_time.png``.  All parameters are forwarded unchanged to ``world``.
    """
    import cProfile
    import os
    import subprocess
    from world import world

    cProfile.runctx(
        "world(timebase, eyes_are_alive, ipc_pub_url,ipc_sub_url,ipc_push_url,user_dir,version)",
        {
            'timebase': timebase,
            'eyes_are_alive': eyes_are_alive,
            'ipc_pub_url': ipc_pub_url,
            'ipc_sub_url': ipc_sub_url,
            'ipc_push_url': ipc_push_url,
            'user_dir': user_dir,
            'version': version,
        },
        locals(),  # supplies ``world`` itself, which was imported locally
        "world.pstats")

    # Everything before the last 'pupil_src' in this file's absolute path.
    # NOTE(review): the command is passed to a shell unquoted, so a checkout
    # path containing spaces would presumably break it -- confirm.
    base_dir = os.path.abspath(__file__).rsplit('pupil_src', 1)[0]
    gprof2dot_loc = os.path.join(base_dir, 'pupil_src', 'shared_modules', 'gprof2dot.py')
    command = "".join((
        "python ",
        gprof2dot_loc,
        " -f pstats world.pstats | dot -Tpng -o world_cpu_time.png",
    ))
    subprocess.call(command, shell=True)
    print("created cpu time graph for world process. Please check out the png next to the world.py file")
def service_profiled(timebase, eyes_are_alive, ipc_pub_url, ipc_sub_url, ipc_push_url, user_dir, version):
    """Run ``service`` under cProfile and render the statistics as a PNG call graph.

    Statistics are dumped to ``service.pstats``; a shell pipeline then feeds
    them through the bundled ``gprof2dot.py`` and ``dot`` to produce
    ``service_cpu_time.png``.  All parameters are forwarded unchanged to ``service``.
    """
    import cProfile
    import os
    import subprocess
    from service import service

    cProfile.runctx(
        "service(timebase,eyes_are_alive,ipc_pub_url,ipc_sub_url,ipc_push_url,user_dir,version)",
        {
            'timebase': timebase,
            'eyes_are_alive': eyes_are_alive,
            'ipc_pub_url': ipc_pub_url,
            'ipc_sub_url': ipc_sub_url,
            'ipc_push_url': ipc_push_url,
            'user_dir': user_dir,
            'version': version,
        },
        locals(),  # supplies ``service`` itself, which was imported locally
        "service.pstats")

    # Everything before the last 'pupil_src' in this file's absolute path.
    # NOTE(review): the command is passed to a shell unquoted, so a checkout
    # path containing spaces would presumably break it -- confirm.
    base_dir = os.path.abspath(__file__).rsplit('pupil_src', 1)[0]
    gprof2dot_loc = os.path.join(base_dir, 'pupil_src', 'shared_modules', 'gprof2dot.py')
    command = "".join((
        "python ",
        gprof2dot_loc,
        " -f pstats service.pstats | dot -Tpng -o service_cpu_time.png",
    ))
    subprocess.call(command, shell=True)
    print("created cpu time graph for service process. Please check out the png next to the service.py file")
def eye_profiled(timebase, is_alive_flag, ipc_pub_url, ipc_sub_url, ipc_push_url, user_dir, version, eye_id, overwrite_cap_settings=None):
    """Run ``eye`` under cProfile and render the statistics as a PNG call graph.

    Statistics are dumped to ``eye<eye_id>.pstats``; a shell pipeline then
    feeds them through the bundled ``gprof2dot.py`` and ``dot`` to produce
    ``eye<eye_id>_cpu_time.png``.  All parameters are forwarded unchanged to ``eye``.
    """
    import cProfile
    import os
    import subprocess
    from eye import eye

    cProfile.runctx(
        "eye(timebase, is_alive_flag,ipc_pub_url,ipc_sub_url,ipc_push_url, user_dir, version, eye_id, overwrite_cap_settings)",
        {
            'timebase': timebase,
            'is_alive_flag': is_alive_flag,
            'ipc_pub_url': ipc_pub_url,
            'ipc_sub_url': ipc_sub_url,
            'ipc_push_url': ipc_push_url,
            'user_dir': user_dir,
            'version': version,
            'eye_id': eye_id,
            'overwrite_cap_settings': overwrite_cap_settings,
        },
        locals(),  # supplies ``eye`` itself, which was imported locally
        "eye{}.pstats".format(eye_id))

    # Everything before the last 'pupil_src' in this file's absolute path.
    # NOTE(review): the command is passed to a shell unquoted, so a checkout
    # path containing spaces would presumably break it -- confirm.
    base_dir = os.path.abspath(__file__).rsplit('pupil_src', 1)[0]
    gprof2dot_loc = os.path.join(base_dir, 'pupil_src', 'shared_modules', 'gprof2dot.py')
    # Only the pipeline tail is formatted; the script path is left untouched.
    pipeline_tail = " -f pstats eye{0}.pstats | dot -Tpng -o eye{0}_cpu_time.png".format(eye_id)
    subprocess.call("python " + gprof2dot_loc + pipeline_tail, shell=True)
    print("created cpu time graph for eye{} process. Please check out the png next to the eye.py file".format(eye_id))
def main():
    """Standalone entry point: configure logging, call ``runner`` and log the elapsed time.

    When ``runner_args.debug_profiling`` is set, ``runner`` is executed under
    the profiler and the statistics are written to that file name.
    """
    started_at = time.time()
    runner_args = parseArguments()

    setupLogging(sys.stdout, runner_args.log_level)
    logging.root.setLevel(runner_args.log_level)

    # Per-logger overrides.  Previously tried and currently disabled:
    #   'hdlcc.source_file' -> logging.WARNING
    #   'hdlcc.builders'    -> logging.INFO
    logger_levels = (
        ('hdlcc.config_parser', logging.WARNING),
        ('vunit.project', logging.ERROR),
    )
    for logger_name, level in logger_levels:
        logging.getLogger(logger_name).setLevel(level)

    # Running hdlcc with threads has two major drawbacks:
    # 1) Interrupting it is currently impossible because each source file is
    #    parsed on its own thread, and interrupting a single thread is not
    #    enough (see https://github.com/suoto/hdlcc/issues/19).
    # 2) When profiling, the interesting numbers are those of the inner hdlcc
    #    calls, and with threads that information is lost, giving poor
    #    results (see https://github.com/suoto/hdlcc/issues/16).
    # Threads are therefore disabled altogether for standalone runs.
    # pylint: disable=protected-access
    StandaloneProjectBuilder._USE_THREADS = False
    # pylint: enable=protected-access

    if not runner_args.debug_profiling:
        runner(runner_args)
    else:
        profile.runctx(
            'runner(runner_args)',
            globals=globals(),
            locals={'runner_args': runner_args},
            filename=runner_args.debug_profiling,
            sort=-1)

    finished_at = time.time()
    _logger.info("Process took %.2fs", (finished_at - started_at))
def profile(name, env, filename=None, verbose=False):
    """Profile 10000 calls of the benchmark created by ``create_bench(name, env)``.

    Args:
        name: Benchmark name; also used as a prefix for the stats file name.
        env: Forwarded to ``create_bench``.
        filename: If truthy, statistics are written to ``<name>-<filename>``;
            otherwise a title banner is printed and no file name is passed
            to the profiler.
        verbose: Use ``pprofile`` instead of ``cProfile``.  If ``pprofile``
            is ``None`` a hint is printed and nothing is profiled.
    """
    if filename:
        filename = name + '-' + filename
        print('Profiling %s ==> %s' % (name, filename))
    else:
        filename = None

        title = name + ' profile'
        print()
        print('=' * len(title))
        print(title)
        print('=' * len(title))

    func = create_bench(name, env)
    gc.collect()
    code = 'for x in range(10000): func()'

    # runctx takes (statement, globals, locals).  An explicit local namespace
    # keeps the loop variable ``x`` out of the module globals.
    namespace = {'func': func}

    if verbose:
        if pprofile is None:
            print('pprofile not found. Please install pprofile and try again.')
            return
        pprofile.runctx(code, globals(), namespace, filename=filename)
    else:
        cProfile.runctx(code, globals(), namespace, sort='tottime', filename=filename)
def _profile(fn, *args, **kw):
    """Call ``fn(*args, **kw)`` under cProfile and return timing, stats loader and result.

    Statistics are dumped to ``<fn.__name__>.prof`` in the current directory.

    Returns:
        A tuple ``(elapsed, load_stats, result)``: the wall-clock seconds
        spent in the profiled run, a zero-argument callable that loads the
        ``pstats.Stats`` and deletes the stats file, and the return value
        of ``fn``.
    """
    filename = "%s.prof" % fn.__name__

    def load_stats():
        st = pstats.Stats(filename)
        os.unlink(filename)
        return st

    # The result is read back from an explicit namespace dict.  Relying on
    # locals() breaks on Python 3.13+, where each locals() call in a function
    # returns an independent snapshot.
    namespace = {'fn': fn, 'args': args, 'kw': kw}

    began = time.time()
    cProfile.runctx('result = fn(*args, **kw)', globals(), namespace,
                    filename=filename)
    ended = time.time()

    return ended - began, load_stats, namespace['result']
def command(self):
    """Profile a single GET request against the test app and print the top entries.

    ``self.args[0]`` is the URL to request; ``self.args[1]``, if present, is
    sent as ``REMOTE_USER`` (default ``'visitor'``).  Statistics are written
    to ``ckan<url with '/' and '?' replaced by '.'>.profile`` and the top
    10% of lines, sorted by cumulative time, are printed.
    """
    self._load_config_into_test_app()

    import paste.fixture
    import cProfile
    import pstats
    import re
    import traceback

    url = self.args[0]
    if self.args[1:]:
        user = self.args[1]
    else:
        user = 'visitor'

    def profile_url(url):
        # Request errors are reported and swallowed so that the profile is
        # still written.  The doubled space after the colon reproduces the
        # original comma-separated print output exactly.
        try:
            self.app.get(url, status=[200], extra_environ={'REMOTE_USER': user})
        except paste.fixture.AppError:
            print('App error:  %s' % url.strip())
        except KeyboardInterrupt:
            raise
        except Exception:
            traceback.print_exc()
            print('Unknown error:  %s' % url.strip())

    output_filename = 'ckan%s.profile' % re.sub('[/?]', '.', url.replace('/', '.'))
    # Refer to the local ``url`` rather than pasting its text into the
    # statement, so quotes in the URL cannot break or alter the executed code.
    profile_command = "profile_url(url)"
    cProfile.runctx(profile_command, globals(), locals(), filename=output_filename)

    stats = pstats.Stats(output_filename)
    stats.sort_stats('cumulative')
    stats.print_stats(0.1)  # show only top 10% of lines
    print('Only top 10% of lines shown')
    print('Written profile to: %s' % output_filename)
def train_loop_with_profile(process_idx, counter, max_score, args, agent, env, start_time):
    """Run ``train_loop`` under cProfile, dumping stats to ``profile-<pid>.out``.

    All parameters are forwarded to ``train_loop`` by name through the
    profiled statement, which is evaluated against this function's locals.
    """
    import cProfile

    statement = ('train_loop(process_idx, counter, max_score, args, agent, env, '
                 'start_time)')
    # One output file per process, keyed by the process id.
    stats_path = 'profile-{}.out'.format(os.getpid())
    cProfile.runctx(statement, globals(), locals(), stats_path)
def train_loop_with_profile(process_idx, counter, make_env, max_score, args, agent, env, start_time, outdir):
    """Run ``train_loop`` under cProfile, dumping stats to ``profile-<pid>.out``.

    Every parameter except ``outdir`` is forwarded to ``train_loop`` by name
    through the profiled statement, which is evaluated against this
    function's locals.  ``outdir`` is accepted but not used here.
    """
    import cProfile

    statement = ('train_loop(process_idx, counter, make_env, max_score, args, '
                 'agent, env, start_time)')
    # One output file per process, keyed by the process id.
    stats_path = 'profile-{}.out'.format(os.getpid())
    cProfile.runctx(statement, globals(), locals(), stats_path)
def adagrad():
    """Fit ``theta`` with per-sample AdaGrad-style updates and return it.

    Makes 10 passes over ``train``.  For every sample a gradient is obtained
    via ``t.grad_features``, its element-wise square is added to a running
    total, and ``theta`` is moved against the gradient scaled by the inverse
    square root of that total.  Progress is printed along the way.
    """
    # Start at -0.1 everywhere except the first component, which starts at 1.0.
    theta = ones((features.num_features)) * -.1
    theta[0] = 1.0
    master_stepsize = 1e-1 # base step size (example value)
    fudge_factor = 1e-6 # keeps the denominator away from zero
    historical_grad = 0 # running sum of squared gradients
    for iteration in xrange(10):
        print "iteration", iteration
        for i, (x, y) in enumerate(train):
            print i
            theta_g = zeros_like(theta)
            # presumably fills theta_g in place with the gradient for this
            # sample -- TODO confirm against t.grad_features
            t.grad_features(x, y, i, theta, theta_g, features, threshold)
            historical_grad += theta_g * theta_g
            adjusted_grad = theta_g / (fudge_factor + np.sqrt(historical_grad))
            theta -= master_stepsize * adjusted_grad
        print f(theta)
    return theta

# Disabled profiling harness:
#cProfile.runctx("adagrad()", globals(), locals(), '.prof')
#s = pstats.Stats('.prof')
#s.strip_dirs().sort_stats('time').print_stats(30)
def _profile(filename, fn, *args, **kw):
    """Call ``fn(*args, **kw)`` under the profiler and return its result.

    A garbage collection is run first; the statistics are dumped to ``filename``.

    Args:
        filename: Path the profiler statistics are written to.
        fn: Callable to profile.
        *args: Positional arguments for ``fn``.
        **kw: Keyword arguments for ``fn``.

    Returns:
        Whatever ``fn`` returned.
    """
    gc.collect()

    # The result is read back from an explicit namespace dict.  Relying on
    # locals() breaks on Python 3.13+, where each locals() call in a function
    # returns an independent snapshot.
    namespace = {'fn': fn, 'args': args, 'kw': kw}
    profiler.runctx('result = fn(*args, **kw)', globals(), namespace,
                    filename=filename)
    return namespace['result']
def _live_profile(fn, *args, **kw):
    """Call ``fn(*args, **kw)`` under the profiler without writing a stats file.

    A garbage collection is run before timing starts.

    Returns:
        A tuple ``(elapsed, load_stats, result)``: the wall-clock seconds
        spent in the profiled run, a zero-argument callable returning a
        ``pstats.Stats()`` built with no arguments, and the return value of ``fn``.
    """
    def load_stats():
        return pstats.Stats()

    gc.collect()

    # The result is read back from an explicit namespace dict.  Relying on
    # locals() breaks on Python 3.13+, where each locals() call in a function
    # returns an independent snapshot.
    namespace = {'fn': fn, 'args': args, 'kw': kw}

    began = time.time()
    profiler.runctx('result = fn(*args, **kw)', globals(), namespace)
    ended = time.time()

    return ended - began, load_stats, namespace['result']
def main(_):
    """Build a ``Trainer`` from the configuration and run training.

    SIGINT is routed to ``trainer.stop_training()``.  When ``config.profile``
    is set, training runs under cProfile and the statistics are written to
    ``main.prof``.
    """
    config = create_config()
    # ``trainer`` must keep this name: the profiled statement refers to it.
    trainer = Trainer(config)

    def _on_interrupt(signum, frame):
        # Ask the trainer to stop on Ctrl-C.
        trainer.stop_training()

    signal.signal(signal.SIGINT, _on_interrupt)

    if not config.profile:
        trainer.train()
    else:
        import cProfile as profile
        profile.runctx('trainer.train()', globals(), locals(), 'main.prof')