我们从 Python 开源项目中提取了以下 7 个代码示例，用于说明如何使用 profile.runctx()。
def run(self):
    """Run the IP test server on the asyncio event loop under the profiler.

    Builds an IP factory from ``self.q`` and ``self.iprange``, wraps it in a
    ``Test_Ip`` instance, schedules ``testip.Server()`` on the loop and then
    drives the loop until ``testip.SuccessStop()`` completes, collecting
    statistics with ``profile.runctx``.

    On ``KeyboardInterrupt`` / ``SystemExit`` the tester is asked to stop
    (``testip.stop()``) and the loop is run until ``SuccessStop()`` finishes
    again.  The loop is always closed before returning.
    """
    # Bind everything the handlers below touch *before* entering the try
    # block, so an interrupt during setup cannot turn into a NameError in
    # the except/finally clauses and hide the original exception.
    loop = asyncio.get_event_loop()
    testip = None
    try:
        ipfactory = get_ip.ipFactory(self.q, self.iprange)
        testip = Test_Ip(loop, ipfactory)
        loop.create_task(testip.Server())
        # The statement string is evaluated by the profiler, so the names
        # it uses are handed over explicitly.
        profile.runctx(
            "loop.run_until_complete(testip.SuccessStop())",
            globals(),
            {"loop": loop, "testip": testip})
    except (KeyboardInterrupt, SystemExit):
        # Nothing to shut down if we were interrupted before the tester
        # was created.
        if testip is not None:
            loop.create_task(testip.stop())
            loop.run_until_complete(testip.SuccessStop())
    finally:
        loop.close()
        print("Task exit")
def main():
    """Entry point used when hdlcc runs as a standalone program.

    Parses the command line, configures logging, forces the standalone
    project builder into single-threaded mode and then calls ``runner``.
    When ``runner_args.debug_profiling`` is set, the call is executed through
    ``profile.runctx`` with that value as the stats file name.  The elapsed
    wall-clock time is logged at the end.
    """
    started_at = time.time()
    runner_args = parseArguments()

    setupLogging(sys.stdout, runner_args.log_level)
    logging.root.setLevel(runner_args.log_level)
    # Quieten the chattier loggers.  'hdlcc.source_file' and
    # 'hdlcc.builders' are intentionally left at the root level.
    for logger_name, level in (('hdlcc.config_parser', logging.WARNING),
                               ('vunit.project', logging.ERROR)):
        logging.getLogger(logger_name).setLevel(level)

    # Threads are switched off entirely for standalone runs (ugly, but it
    # works around two problems):
    #   1) Interrupting is currently impossible with threads, because each
    #      source file is parsed on its own thread and there can be many of
    #      them; interrupting a single thread is not enough.
    #      See https://github.com/suoto/hdlcc/issues/19
    #   2) Profiling is meant to show the inner hdlcc calls, and with
    #      threads that information is lost, giving poor results.
    #      See https://github.com/suoto/hdlcc/issues/16
    # pylint: disable=protected-access
    StandaloneProjectBuilder._USE_THREADS = False
    # pylint: enable=protected-access

    if not runner_args.debug_profiling:
        runner(runner_args)
    else:
        profile.runctx(
            'runner(runner_args)',
            globals=globals(),
            locals={'runner_args': runner_args},
            filename=runner_args.debug_profiling,
            sort=-1)

    elapsed = time.time() - started_at
    _logger.info("Process took %.2fs", elapsed)
def profile(name, env, filename=None, verbose=False):
    """Profile 10000 calls of the benchmark called ``name``.

    Args:
        name: Benchmark name; passed to ``create_bench`` and used in the
            printed title and as a prefix of the stats file name.
        env: Passed through to ``create_bench`` unchanged.
        filename: Optional suffix for the stats file.  When truthy, stats
            are written to ``"<name>-<filename>"``; otherwise no file name
            is given to the profiler.
        verbose: When true, use ``pprofile`` instead of ``cProfile``.  If
            ``pprofile`` is not available a hint is printed and nothing is
            profiled.
    """
    if filename:
        filename = name + '-' + filename
        print('Profiling %s ==> %s' % (name, filename))
    else:
        # Normalise any falsy value (e.g. '') to None for the profilers.
        filename = None

    title = name + ' profile'
    rule = '=' * len(title)
    print()
    print(rule)
    print(title)
    print(rule)

    func = create_bench(name, env)
    gc.collect()
    code = 'for x in range(10000): func()'

    # runctx() takes (statement, globals, locals) in that order.  Passing a
    # dedicated locals mapping keeps the loop variable ``x`` out of the real
    # module globals, which the previously swapped arguments polluted.
    namespace = {'func': func}

    if verbose:
        if pprofile is None:
            print('pprofile not found. Please install pprofile and try again.')
            return
        pprofile.runctx(code, globals(), namespace, filename=filename)
    else:
        cProfile.runctx(code, globals(), namespace,
                        sort='tottime', filename=filename)
def quickProfile(name="unnamed"):
    """Return a decorator that times or profiles the decorated function.

    The decorator is a no-op (returns the function unchanged) unless the
    ``use-profiler`` config flag is set.  Otherwise every call is either

    * timed with ``globalClock`` and reported on stdout (``profile-debug``
      flag not set), or
    * run under the ``profile`` module, with the resulting ``pstats.Stats``
      object appended to ``base.stats[name]`` (``profile-debug`` flag set).

    In both modes the wrapper returns whatever the wrapped function returns.

    Args:
        name: Key under which detailed profiles are collected in
            ``base.stats``.
    """
    def profileDecorator(f):
        if not config.GetBool("use-profiler", 0):
            return f

        def _profiled(*args, **kArgs):
            # The config/base lookups must happen at call time because
            # base/simbase do not exist yet when PythonUtil is loaded.
            if not config.GetBool("profile-debug", 0):
                # Dumb wall-clock timing.
                st = globalClock.getRealTime()
                result = f(*args, **kArgs)
                s = globalClock.getRealTime() - st
                # Single parenthesised argument: prints identically under
                # Python 2 and Python 3.
                print("Function %s.%s took %s seconds"
                      % (f.__module__, f.__name__, s))
                return result

            import profile as prof
            import pstats

            # Detailed profile, stored in base.stats under ``name``.
            if not hasattr(base, "stats"):
                base.stats = {}
            if not base.stats.get(name):
                base.stats[name] = []

            # The statement assigns into the namespace so the wrapped
            # function's return value can be handed back to the caller.
            namespace = {'f': f, 'args': args, 'kArgs': kArgs}
            prof.runctx('result = f(*args, **kArgs)',
                        namespace, namespace, "t.prof")

            s = pstats.Stats("t.prof")
            s.strip_dirs()
            s.sort_stats("cumulative")
            base.stats[name].append(s)
            # 'result' is absent if the profiled call exited via SystemExit,
            # which profile.runctx swallows.
            return namespace.get('result')

        _profiled.__doc__ = f.__doc__
        return _profiled

    return profileDecorator
def _merge_good_ranges(file_name):
    """Merge queued ``GoodIpRange`` results into the ``key:value`` log file.

    Reads the existing ``int:int`` lines from ``file_name``, updates them
    with every dict currently waiting in ``GoodIpRange``, then rewrites the
    file, echoing each entry to stdout.
    """
    with open(file_name) as log:
        lines = log.readlines()

    ranges = {}
    for line in lines:
        # strip() instead of slicing off the last character: a final line
        # without a trailing newline must not lose its last digit.
        line = line.strip()
        if not line:
            continue
        fields = line.split(":")
        ranges[int(fields[0])] = int(fields[-1])

    while GoodIpRange.qsize() > 0:
        ranges.update(GoodIpRange.get())

    with open(file_name, "w") as log:
        for key, value in ranges.items():
            print(key, ":", value)
            log.write(str(key) + ":" + str(value) + "\n")


def _run_single_process():
    """Run one ``Test_Ip`` instance in this process under the profiler."""
    # Bound before the try block so the except/finally clauses can never
    # hit an unbound name if setup is interrupted.
    loop = asyncio.get_event_loop()
    testip = None
    try:
        q = Queue()
        q.put(ipHasFind)
        ipfactory = get_ip.ipFactory(q, iprange)
        testip = Test_Ip(loop, ipfactory)
        loop.create_task(testip.Server())
        profile.runctx(
            "loop.run_until_complete(testip.SuccessStop())",
            globals(),
            {"loop": loop, "testip": testip})
    except (KeyboardInterrupt, SystemExit):
        if testip is not None:
            loop.create_task(testip.stop())
            loop.run_until_complete(testip.SuccessStop())
    finally:
        loop.close()
        print("Task exit")


def main():
    """Start the IP scan in multi-process or single-process mode.

    With ``ProcessSum > 1`` it starts that many ``CheckProcess`` workers
    sharing one queue seeded with ``ipHasFind``, then appends every IP taken
    from ``ipList`` to ``ip.txt`` (``|``-separated) until interrupted.  On
    the way out, the pending ``GoodIpRange`` results are merged into
    ``find_log0.txt``.

    Otherwise a single ``Test_Ip`` instance is run in this process under
    ``profile.runctx``.

    Reads the module-level names ``ProcessSum``, ``iprange``, ``ipHasFind``,
    ``ipList`` and ``GoodIpRange``.
    """
    print("Process Sum:", ProcessSum)
    if ProcessSum > 1:
        q = Queue()
        q.put(ipHasFind)
        for i in range(ProcessSum):
            print("Start Process:", i)
            p = CheckProcess(q, iprange)
            p.start()
        try:
            with open("ip.txt", "w") as f:
                found = 0
                # Runs until the user interrupts the program.
                while True:
                    f.write(ipList.get() + "|")
                    found += 1
                    print("All Sucess Ip:%4d" % found)
        except (KeyboardInterrupt, SystemExit):
            print("main exited")
        finally:
            _merge_good_ranges("find_log0.txt")
    else:
        _run_single_process()
def execute(self, argv):
    """Parse ``argv`` and feed the selected input to ``self.parseStream``.

    Interactive mode reads lines from a ``>>> `` prompt and parses each one
    until EOF or Ctrl-C.  Otherwise exactly one stream is parsed, built
    from ``options.input`` if given, else from the file named by the single
    positional argument (unless it is ``-``), else from ``self.stdin``.
    That parse runs under cProfile/profile when ``options.profile`` is set
    (top 100 entries by time are printed), under hotshot when
    ``options.hotshot`` is set, and plainly otherwise.
    """
    options, args = self.parseOptions(argv)
    self.setUp(options)

    if options.interactive:
        while True:
            try:
                line = raw_input(">>> ")
            except (EOFError, KeyboardInterrupt):
                self.stdout.write("\nBye.\n")
                break
            self.parseStream(options, antlr3.ANTLRStringStream(line))
        return

    # Pick the input source.  The local names ``self``, ``options`` and
    # ``inStream`` are looked up by the profiled statement below via
    # locals(), so they must keep exactly these names.
    if options.input is not None:
        inStream = antlr3.ANTLRStringStream(options.input)
    elif len(args) == 1 and args[0] != '-':
        inStream = antlr3.ANTLRFileStream(
            args[0], encoding=options.encoding)
    else:
        inStream = antlr3.ANTLRInputStream(
            self.stdin, encoding=options.encoding)

    statement = 'self.parseStream(options, inStream)'

    if options.profile:
        # Prefer the C implementation when it is available.
        try:
            import cProfile as profile
        except ImportError:
            import profile

        profile.runctx(statement, globals(), locals(), 'profile.dat')

        import pstats
        stats = pstats.Stats('profile.dat')
        stats.strip_dirs()
        stats.sort_stats('time')
        stats.print_stats(100)

    elif options.hotshot:
        import hotshot

        profiler = hotshot.Profile('hotshot.dat')
        profiler.runctx(statement, globals(), locals())

    else:
        self.parseStream(options, inStream)