我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用cProfile.run()。
def main():
    """Profile the statement ``test_emo()`` with cProfile.

    The profile report is printed by ``cProfile.run`` itself, ordered by the
    ``'time'`` sort key. Always returns ``None``.
    """
    statement = 'test_emo()'
    cProfile.run(statement, sort='time')
def __init__(self, commdir):
    """Set up the launch/archive directories under ``commdir``.

    This code is run when starting a new instance, and not when the instance
    is unpickled.

    :param commdir: base communication directory; the ``launch`` and
        ``archive`` sub-directories are created beneath it if missing.
    :raises Exception: if the launch directory cannot be created, or if the
        archive directory already contains entries (in-process jobs).
    """
    self._file_ext = '.launch.txt'
    self._job_by_jobId = {}
    self._commdir = commdir
    self._launch_dir = os.path.join(self._commdir, 'launch')
    self._archive_dir = os.path.join(self._commdir, 'archive')

    try:
        os.makedirs(self._launch_dir, exist_ok=True)
    except OSError as err:
        # Only filesystem failures are translated (a bare ``except`` would
        # also trap KeyboardInterrupt); the original error is kept as the
        # cause so the real reason is still visible in the traceback.
        raise Exception('Failed to create launchdir; communicationDirBase may be read-only') from err
    os.makedirs(self._archive_dir, exist_ok=True)

    existing_launchfiles = os.listdir(self._launch_dir)
    if existing_launchfiles:
        print('Starting with ' + str(len(existing_launchfiles)) + ' launched but unprocessed pipelines.')
    else:
        print('Starting with 0 pipelines')

    # A non-empty archive directory is treated as a fatal start-up condition.
    existing_archivefiles = os.listdir(self._archive_dir)
    if existing_archivefiles:
        raise Exception('starting with ' + str(len(existing_archivefiles)) + ' in-process jobs... either these should be purged from:' + self._archive_dir + ' or things shoud be restarted from the persisted state')
def run():
    """Load ``benchmark.json``, walk it with ``empty_test`` and dump the result.

    The document is read with ``pf.load``, passed through
    ``doc.walk(action=empty_test, doc=doc)`` and written to ``panflute.json``
    followed by a trailing newline. Progress messages are printed to stdout.
    """
    source_path = 'benchmark.json'
    target_path = 'panflute.json'

    print('\nLoading JSON...')
    with open(source_path, encoding='utf-8') as src:
        doc = pf.load(src)

    print('\nApplying trivial filter...')
    doc = doc.walk(action=empty_test, doc=doc)

    print('Dumping JSON...')
    with open(target_path, mode='w', encoding='utf-8') as dst:
        pf.dump(doc, dst)
        dst.write('\n')

    print(' - Done!')
def __str__(self):
    """Return the human-readable timing summary.

    Uses the plus-minus sign (U+00B1) when ``sys.stdout`` reports an encoding
    that can encode it, and falls back to the ASCII ``+-`` otherwise.
    """
    pm = '+-'
    encoding = getattr(sys.stdout, 'encoding', None)
    if encoding:
        try:
            u'\xb1'.encode(encoding)
        except (UnicodeError, LookupError, TypeError):
            # Best effort: the encoding cannot represent the sign, is unknown,
            # or is not a usable encoding name -> keep the ASCII fallback.
            pass
        else:
            pm = u'\xb1'
    return (
        u"{mean} {pm} {std} per loop (mean {pm} std. dev. of {runs} run{run_plural}, {loops} loop{loop_plural} each)"
        .format(
            pm=pm,
            runs=self.repeat,
            loops=self.loops,
            loop_plural="" if self.loops == 1 else "s",
            run_plural="" if self.repeat == 1 else "s",
            mean=_format_time(self.average, self._precision),
            std=_format_time(self.stdev, self._precision))
    )
def profile_func(call_func_str, stats_path="prof.txt"):
    """Profile a statement with cProfile and print its stats sorted by time.

    Example::

        def f():
            d = AndroidDevice("192.168.1.120")
            d.swipe_position(650, 700, 50, 700, 30)
            d.swipe_position(130, 800, 850, 800, 50)

        profile_func("f()")

    :param call_func_str: statement string handed to ``cProfile.run``.
    :param stats_path: file the raw profile data is written to and then read
        back from; defaults to ``"prof.txt"`` as before.
    :return: None
    """
    import cProfile
    import pstats

    cProfile.run(call_func_str, stats_path)
    pstats.Stats(stats_path).sort_stats("time").print_stats()
def __init__(self):
    """Initialise the flags, message list, rates and cycle start time."""
    # set self.start_time to the UNIX time of the start
    # of a UTC minute.
    now = int(time.time())
    self.start_time = now - time.gmtime(now).tm_sec
    # NOTE(review): the original snippet ends with a dangling
    # "seconds per cycle" comment; the statement it described is not visible.

    self.jrate = 11025 // 2   # sample rate for processing (FFT &c)
    self.jblock = 4096 // 2   # samples per symbol

    self.msgs = []
    self.msgs_lock = threading.Lock()

    self.done = False
    self.verbose = False
    self.enabled = True  # True -> run process(); False -> don't
def run_norm_cdf(lets_be_rational_version):
    """Time ``norm_cdf`` over the first ``TestCases`` entries of ``_z``.

    :param lets_be_rational_version: object exposing a ``norm_cdf(z)`` callable.
    :return: elapsed time in seconds for the whole loop.
    """
    # time.clock() was removed in Python 3.8; perf_counter() is the
    # replacement intended for measuring short durations.
    start = time.perf_counter()
    for i in range(TestCases):
        z = _z[i]
        actual = lets_be_rational_version.norm_cdf(z)
        # print(z, " norm_cdf = ", actual)
    end = time.perf_counter()
    return end - start

# profile.run("run_black(py_lets_be_rational)")
# profile.run("run_implied_volatility_from_a_transformed_rational_guess(py_lets_be_rational)")
# profile.run("run_implied_volatility_from_a_transformed_rational_guess_with_limited_iterations(py_lets_be_rational)")
# profile.run("run_normalised_black(py_lets_be_rational)")
# profile.run("run_normalised_black_call(py_lets_be_rational)")
# profile.run("run_normalised_vega(py_lets_be_rational)")
# profile.run("run_normalised_implied_volatility_from_a_transformed_rational_guess(py_lets_be_rational)")
# profile.run("run_normalised_implied_volatility_from_a_transformed_rational_guess_with_limited_iterations(py_lets_be_rational)")
# profile.run("run_norm_cdf(py_lets_be_rational)")
def _run_get_on_all_columns(self):
    """Call ``self._store.get`` for every key/column pair and print the duration.

    The pairs come from ``self._get_key_column_lookup()``. The reported column
    count is the number of columns of the first key (0 when the lookup is
    empty, which previously raised).
    """
    column_lookup = self._get_key_column_lookup()
    # list(...) works whether keys() returns a list (Python 2) or a view.
    keys = list(column_lookup.keys())

    start_time = datetime.now()
    for key in keys:
        for col in column_lookup[key]:
            self._store.get(key, col)
    total_time = datetime.now() - start_time

    columns_per_key = len(column_lookup[keys[0]]) if keys else 0
    # Single-argument print(...) is valid on both Python 2 and Python 3.
    print("Took %s to run get on all %s keys and %s columns." % (
        total_time, len(keys), columns_per_key))
def main_loop():
    """Run ``main()`` forever, optionally writing one cProfile file per turn.

    With ``profiling`` left at ``False`` this simply loops over ``main()``.
    When it is switched to ``True``, old files in the ``stats`` directory are
    removed and every ``main()`` call is profiled into
    ``stats/mybot-<turnCounter + 1>.stats``.
    """
    # Flip to True to profile each turn (the original assigned True and then
    # immediately overwrote it with False).
    profiling = False
    #import objgraph

    if not profiling:
        while True:
            main()
    else:
        import glob  # only needed on the profiling path

        stats_dir = "stats"
        # os.path.exists()/os.remove() do not expand "*", so the original
        # clean-up never matched anything; expand the wildcard explicitly.
        for stale in glob.glob(os.path.join(stats_dir, "*")):
            if os.path.isfile(stale):
                os.remove(stale)

        while True:
            # os.path.join avoids the invalid "\m" escape of the original
            # literal and works on non-Windows platforms too.
            currpath = os.path.join(stats_dir, "mybot-%s.stats" % (turnCounter + 1))
            cProfile.run('main()', currpath)
            # with open("bot." + "debug", "a") as f:
            #     logger.debug("Baselining at %s" % getframeinfo(currentframe()).lineno)
            #     objgraph.show_growth(limit=20,file=f)
def main_loop():
    """Run ``main()`` forever, optionally writing one cProfile file per turn.

    With ``profiling`` left at ``False`` this simply loops over ``main()``.
    When it is switched to ``True``, old files in the ``stats`` directory are
    removed and every ``main()`` call is profiled into
    ``stats/mybot-<turnCounter + 1>.stats``.
    """
    # Flip to True to profile each turn (the original assigned True and then
    # immediately overwrote it with False).
    profiling = False
    #import objgraph

    if not profiling:
        while True:
            main()
    else:
        import glob  # only needed on the profiling path

        stats_dir = "stats"
        # os.path.exists()/os.remove() do not expand "*", so the original
        # clean-up never matched anything; expand the wildcard explicitly.
        for stale in glob.glob(os.path.join(stats_dir, "*")):
            if os.path.isfile(stale):
                os.remove(stale)

        while True:
            # os.path.join avoids the invalid "\m" escape of the original
            # literal and works on non-Windows platforms too.
            currpath = os.path.join(stats_dir, "mybot-%s.stats" % (turnCounter + 1))
            cProfile.run('main()', currpath)
            # with open("bot." + "debug", "a") as f:
            #     logger.error("Baselining at %s" % getframeinfo(currentframe()).lineno)
            #     objgraph.show_growth(limit=20,file=f)
def main_loop():
    """Run ``main()`` forever, optionally writing one cProfile file per turn.

    With ``profiling`` left at ``False`` this simply loops over ``main()``.
    When it is switched to ``True``, old files in the ``stats`` directory are
    removed and every ``main()`` call is profiled into
    ``stats/mybot-<turnCounter + 1>.stats``.
    """
    # Flip to True to profile each turn (the original assigned True and then
    # immediately overwrote it with False).
    profiling = False
    #import objgraph

    if not profiling:
        while True:
            main()
    else:
        import glob  # only needed on the profiling path

        stats_dir = "stats"
        # os.path.exists()/os.remove() do not expand "*", so the original
        # clean-up never matched anything; expand the wildcard explicitly.
        for stale in glob.glob(os.path.join(stats_dir, "*")):
            if os.path.isfile(stale):
                os.remove(stale)

        while True:
            # os.path.join avoids the invalid "\m" escape of the original
            # literal and works on non-Windows platforms too.
            currpath = os.path.join(stats_dir, "mybot-%s.stats" % (turnCounter + 1))
            cProfile.run('main()', currpath)
            # with open("bot." + "debug", "a") as f:
            #     # logger.debug("Baselining at %s" % getframeinfo(currentframe()).lineno)
            #     objgraph.show_growth(limit=20,file=f)
def main_loop():
    """Run ``main()`` forever, keeping profiles of the current and last turn.

    Before each profiled call the previous ``mybot-currturn.stats`` (if any)
    is moved to ``mybot-lastturn.stats``, then ``main()`` is profiled into
    ``mybot-currturn.stats``. Both files live in the ``stats`` directory.
    """
    # os.path.join avoids the invalid "\m" escape of the original literals
    # and works on non-Windows platforms too.
    currpath = os.path.join('stats', 'mybot-currturn.stats')
    lastpath = os.path.join('stats', 'mybot-lastturn.stats')

    while True:
        profiling = True
        # profiling = False
        if not profiling:
            main()
            continue

        if os.path.exists(currpath):
            # os.replace overwrites an existing destination, so no separate
            # remove-then-rename step is needed.
            os.replace(currpath, lastpath)
        cProfile.run('main()', currpath)
        # if "pypy" not in sys.executable:
        # else:
        #     with open(currpath, "w") as f:
        #         vmprof.enable(f.fileno(), period=0.00099, memory=False)
        #         main()
        #         vmprof.disable()

#cProfile.run('main_loop()', 'stats\mybot.stats')
def main():
    """Dispatch to ``run`` based on the command-line arguments.

    * no argument: print ``help``;
    * ``<path>``: ``run(path, "NONE", -1)``;
    * ``<path> <debug>``: ``run(path, debug, -1)``;
    * ``<path> <debug> <max_cycles>``: ``run(path, debug, int(max_cycles))``.
    """
    argc = len(sys.argv)
    if argc <= 1:
        print(help)
        return

    path = os.path.abspath(sys.argv[1])
    if argc <= 2:
        run(path, "NONE", -1)
        return

    debug = sys.argv[2]
    max_cycles = int(sys.argv[3]) if argc > 3 else -1
    run(path, debug, max_cycles)
def profile(code, name='profile_run', sort='cumulative', num=30):
    """Profile ``code`` with cProfile and print the top entries.

    The raw data is written to the file ``name``, loaded back with
    ``pstats.Stats``, sorted by ``sort`` and the first ``num`` entries are
    printed. The ``Stats`` object is returned.
    """
    cProfile.run(code, name)
    report = pstats.Stats(name)
    report.sort_stats(sort).print_stats(num)
    return report

#### Code for listing (nearly) all objects in the known universe
#### http://utcc.utoronto.ca/~cks/space/blog/python/GetAllObjects

# Recursively expand slist's objects
# into olist, using seen to track
# already processed objects.
def findNew(self, regex):
    """Look up ``regex`` among ``self.newRefs`` via ``self.findTypes``.

    These are the objects that were considered 'new' when the last diff()
    was run.
    """
    candidates = self.newRefs
    return self.findTypes(candidates, regex)
def findPersistent(self, regex):
    """Look up ``regex`` among ``self.persistentRefs`` via ``self.findTypes``.

    These are the objects that were considered 'persistent' when the last
    diff() was run.
    """
    candidates = self.persistentRefs
    return self.findTypes(candidates, regex)
def start(self, interval=None):
    """Launch ``self.run`` in a daemon thread.

    :param interval: when not ``None``, stored in ``self.interval`` first.
    """
    if interval is not None:
        self.interval = interval

    self._stop = False
    worker = threading.Thread(target=self.run)
    worker.daemon = True
    self.thread = worker
    worker.start()
def run(self):
    """Print the stack of every other thread until ``self._stop`` is True.

    Each pass checks ``self._stop`` under ``self.lock``, prints the frames
    reported by ``sys._current_frames()`` (skipping the calling thread) and
    then sleeps for ``self.interval`` seconds.
    """
    while True:
        with self.lock:
            if self._stop is True:
                return

        print("\n============= THREAD FRAMES: ================")
        own_ident = threading.current_thread().ident
        for thread_id, frame in sys._current_frames().items():
            if thread_id != own_ident:
                print("<< thread %d >>" % thread_id)
                traceback.print_stack(frame)
        print("===============================================\n")

        time.sleep(self.interval)
def test_cAddScalar():
    """Run the ``cAddScalar`` OpenCL kernel on random data and print a check value.

    Builds the program from ``cl_subroutine.cAddScalar.R``, applies the kernel
    with the scalar ``0.1+0.1j`` to 128 random complex64 values on the device
    and prints the difference between the first device and host elements.
    """
    dtype = numpy.complex64

    try:
        device = pyopencl.get_platforms()[1].get_devices()
    except Exception:
        # Best-effort fallback to the first platform when the second one is
        # unavailable; no longer swallows KeyboardInterrupt/SystemExit.
        device = pyopencl.get_platforms()[0].get_devices()

    # NOTE(review): ``dtype.size`` is read from the scalar type itself, not
    # from a dtype instance -- confirm this is the intended type-size argument.
    print('using cl device=', device, device[0].max_work_group_size,
          device[0].max_compute_units,
          pyopencl.characterize.get_simd_group_size(device[0], dtype.size))

    ctx = pyopencl.Context(device)  # pyopencl.create_some_context()
    queue = pyopencl.CommandQueue(ctx)
    wavefront = pyopencl.characterize.get_simd_group_size(device[0], dtype.size)

    # B = routine(wavefront)
    import cl_subroutine.cAddScalar
    prg = pyopencl.Program(ctx, cl_subroutine.cAddScalar.R).build()

    AddScalar = prg.cAddScalar
    AddScalar.set_scalar_arg_dtypes(cl_subroutine.cAddScalar.scalar_arg_dtypes)

    # indata= numpy.arange(0,128).astype(dtype)
    indata = (numpy.random.randn(128,) + numpy.random.randn(128,) * 1.0j).astype(dtype)
    indata_g = pyopencl.array.to_device(queue, indata)
    scal = 0.1 + 0.1j
    AddScalar(queue, (128,), None, scal, indata_g.data)
    print(-indata[0] + indata_g.get()[0])

# if __name__ == '__main__':
#     import cProfile
# #     cProfile.run('benchmark()')
#     test_init()
#     test_cAddScalar()
#     cProfile.run('test_init()')
def run(code):
    """Execute ``code`` through ``lang_ast.AST`` and return the resulting stack.

    When ``args.hex`` is set, ``code`` is first stripped of spaces and
    hex-decoded. A "RUNNING" line with the code and its length is written to
    stderr before execution.
    """
    if args.hex:
        compact = code.replace(b" ", b"")
        code = codecs.decode(compact, 'hex_codec')

    banner = "RUNNING: {} ({} bytes)\n".format(repr(code), len(code))
    sys.stderr.write(banner)

    tree = lang_ast.AST()
    tree.setup(bytearray(code), first=True)
    return tree.run()
def run_file(filename):
    """Read ``filename`` as bytes and return the result of ``run`` on them."""
    with open(filename, "rb") as source:
        raw = source.read()
        # run() is invoked while the file is still open, as before.
        return run(raw)
def __setstate__(self, state):
    """Restore pickled state and repair jobs whose directory status diverged.

    This code is run when an instance is created by unpickling, and not when
    it is initially created.

    For every job whose module-directory status differs from the recorded
    status:

    * ``AboutToFail`` / ``AboutToAbort``: the status is reset to the job id
      so the transition is retried;
    * ``Pass`` / ``Fail`` / the job id itself: left alone;
    * ``AboutToPass``: files are moved from the module directory back into
      the job directory (skipping ``pipette.*``) and the status is reset;
    * anything else raises ``Exception``.

    :param state: attribute dict to merge into ``self.__dict__``.
    """
    self.__dict__.update(state)
    # roll back jobs that were sent into some state? make these transitions
    # tolerant of being called twice in a row?
    # how do jobs get purged that finished?
    for jobId in self._status_by_jobId:
        directory_status = self._get_module_directory_status(jobId)
        if directory_status == self._status_by_jobId[jobId]:
            continue

        if directory_status in ('AboutToFail', 'AboutToAbort'):
            # just try again
            self._set_module_directory_status(jobId, jobId)
        elif directory_status in ('Pass', 'Fail', jobId):
            pass
        elif directory_status in ('AboutToPass',):
            # move all the stuff back into the jobdir and try again
            jobdir = self._jobDirectory_by_jobId[jobId]
            moddir = self._moduleDirectory_by_jobId[jobId]
            if not os.path.exists(jobdir):
                os.mkdir(jobdir)
            # The files to move live in moddir; the original listed jobdir,
            # which either found nothing or named files missing from moddir.
            for fn in os.listdir(moddir):
                if fn.startswith('pipette.'):
                    continue
                os.rename(os.path.join(moddir, fn), os.path.join(jobdir, fn))
            # _after_ the files are moved, update the status
            self._set_module_directory_status(jobId, jobId)
        else:
            raise Exception('Module directory left in an unrepairable state: ' +
                            self._moduleDirectory_by_jobId[jobId])
def main():
    """Profile ``very_slow()`` into ``prof.txt`` and print stats sorted by time."""
    import pstats

    stats_file = 'prof.txt'
    cProfile.run('very_slow()', stats_file)
    pstats.Stats(stats_file).sort_stats('time').print_stats()
def __init__(self, shell):
    """Initialise the magics object for ``shell``.

    When the module-level ``profile`` is ``None``, ``self.prun`` is replaced
    by ``self.profile_missing_notice``.
    """
    super(ExecutionMagics, self).__init__(shell)

    # Default execution function used to actually run user code.
    self.default_runner = None

    profiler_missing = profile is None
    if profiler_missing:
        self.prun = self.profile_missing_notice
def _run_with_timing(run, nruns):
    """
    Call `run` and report CPU and wall-clock timings on stdout.

    Parameters
    ----------
    run : callable
        Any callable object which takes no argument.
    nruns : int
        Number of times to execute `run`.
    """
    wall_start = time.time()

    # ``None`` marks the single-run case, which has its own report layout.
    repeats = None if nruns == 1 else range(nruns)

    cpu_start = clock2()
    if repeats is None:
        run()
    else:
        for _ in repeats:
            run()
    cpu_end = clock2()

    user_time = cpu_end[0] - cpu_start[0]
    system_time = cpu_end[1] - cpu_start[1]

    print("\nIPython CPU timings (estimated):")
    if repeats is None:
        print(" User : %10.2f s." % user_time)
        print(" System : %10.2f s." % system_time)
    else:
        print("Total runs performed:", nruns)
        print(" Times : %10s %10s" % ('Total', 'Per run'))
        print(" User : %10.2f s, %10.2f s." % (user_time, user_time / nruns))
        print(" System : %10.2f s, %10.2f s." % (system_time, system_time / nruns))

    wall_end = time.time()
    print("Wall time: %10.2f s." % (wall_end - wall_start))
def capture(self, line, cell):
    """run the cell, capturing stdout, stderr, and IPython's rich display() calls."""
    args = magic_arguments.parse_argstring(self.capture, line)

    # Each "no_*" flag disables capturing of the corresponding stream.
    want_stdout = not args.no_stdout
    want_stderr = not args.no_stderr
    want_display = not args.no_display

    with capture_output(want_stdout, want_stderr, want_display) as captured:
        self.shell.run_cell(cell)

    # Optionally expose the captured output under the requested name.
    if args.output:
        self.shell.user_ns[args.output] = captured
def testbench():
    """Run ``test_run`` over a grid of plaintext/key lengths and save the scores.

    For every plaintext length and key length in the configured ranges,
    ``test_run`` is executed ``iterations`` times. Each run's wall-clock
    duration is appended to ``testbench_times.txt`` and the average score per
    grid cell is written as a tab-separated table to ``testbench.txt``.
    """
    plaintext_iter = range(plaintext_len_min,
                           plaintext_len_max + plaintext_len_step,
                           plaintext_len_step)
    key_len_iter = range(key_len_min, key_len_max + key_len_step, key_len_step)

    # The leading empty row is kept so the output file layout is unchanged.
    result = [[]]
    total_progress = len(plaintext_iter) * len(key_len_iter) * iterations
    progress = 0
    average_time = 0

    print("Running Testbench")
    start_time = datetime.now()
    result.append([0] + list(key_len_iter))

    # ``with`` guarantees the timing file is closed even if a run raises.
    with open('testbench_times.txt', 'w') as time_file:
        for plaintext_len in plaintext_iter:
            same_plaintext_len_results = [plaintext_len]
            for key_len in key_len_iter:
                single_result = 0
                for iteration in range(iterations):
                    start_run_time = time.time()
                    score = test_run(plaintext_len, key_len)
                    single_result += score
                    end_run_time = time.time()
                    time_file.write(str(end_run_time - start_run_time) + '\n')
                    average_time += end_run_time - start_run_time
                    progress += 1
                    update_progress(
                        progress / float(total_progress + 1),
                        status="keylength: %d, textlength: %d " % (key_len, plaintext_len))
                same_plaintext_len_results.append(float(single_result) / iterations)
            result.append(same_plaintext_len_results)

        end_time = datetime.now()
        update_progress(1, done='Completed in %s hours ' % str(end_time - start_time).rsplit('.')[0])
        print('average time per break run: %.1f' % (average_time / total_progress))

    with open('testbench.txt', 'w') as table_file:
        table_file.writelines('\t'.join(str(j) for j in i) + '\n' for i in result)
def generate_random_points(ref_radius, total_points):
    """Return x, y coordinate lists of random points inside a circle.

    The circle is centred at (0, 0). An angle is drawn uniformly from
    [0, 2*pi) and a radius as ``ref_radius * sqrt(u)`` with ``u`` uniform in
    [0, 1), both as NumPy arrays of length ``total_points``.

    :param ref_radius: The random point lies between 0 and this radius.
    :param total_points: total number of random points to be created
    :return: x and y coordinates as Python lists

    .. todo:: Refactor! Move the function to a module like gameutilities.py
    """
    # Bind the NumPy callables to local names once (avoids repeated
    # attribute lookups), as the original implementation did.
    uniform, sqrt, cos, sin, pi = (
        np.random.uniform, np.sqrt, np.cos, np.sin, np.pi)

    # The angles are drawn before the radii, so the random stream is
    # consumed in the same order as before.
    angles = uniform(0.0, 2.0 * pi, total_points)
    radii = ref_radius * sqrt(uniform(0.0, 1.0, total_points))

    xs = radii * cos(angles)
    ys = radii * sin(angles)

    # Convert the NumPy arrays to plain Python lists for the caller.
    return xs.tolist(), ys.tolist()
def generate_random_points(ref_radius, total_points):
    """Return x, y coordinate arrays of random points inside a circle.

    The circle is centred at (0, 0). An angle is drawn uniformly from
    [0, 2*pi) and a radius as ``ref_radius * sqrt(u)`` with ``u`` uniform in
    [0, 1), both as NumPy arrays of length ``total_points``.

    :param ref_radius: The random point lies between 0 and this radius.
    :param total_points: total number of random points to be created
    :return: x and y coordinates as NumPy arrays

    .. todo:: Refactor! Move the function to a module like gameutilities.py
    """
    # Bind the NumPy callables to local names once (avoids repeated
    # attribute lookups), as the original implementation did.
    uniform, sqrt, cos, sin, pi = (
        np.random.uniform, np.sqrt, np.cos, np.sin, np.pi)

    # The angles are drawn before the radii, so the random stream is
    # consumed in the same order as before.
    angles = uniform(0.0, 2.0 * pi, total_points)
    radii = ref_radius * sqrt(uniform(0.0, 1.0, total_points))

    # The arrays are returned directly (no conversion to Python lists), to
    # be consumed by the GoldHunt.find_coins method.
    return radii * cos(angles), radii * sin(angles)
def main():
    """Profile ``pack_test()`` into the file ``result`` and print four reports.

    The reports are, in order: default sort (-1), sorted by name, the top 10
    by cumulative time, and the ``pack_test`` entries within the top 50%
    sorted by tottime/cumtime.
    """
    import cProfile
    import pstats

    # cProfile.run("pack_test()")
    cProfile.run("pack_test()", "result")
    # >python -m cProfile myscript.py -o result

    p = pstats.Stats("result")
    stripped_reports = (
        (-1, ()),
        ("name", ()),
        ("cumulative", (10,)),
    )
    for sort_key, restrictions in stripped_reports:
        p.strip_dirs().sort_stats(sort_key).print_stats(*restrictions)
    p.sort_stats('tottime', 'cumtime').print_stats(.5, 'pack_test')
def show_gui():
    """Start the GTK interface and run its main loop.

    Returns ``EXIT_CODES["requirements"]`` (after logging a report) when the
    GTK dependency check fails, otherwise ``None`` once the main loop ends.
    """
    deps_gtk = GuiCommon.requirements_details_gtk()
    report_gtk = GuiCommon.get_dependency_report(deps_gtk, prefix="\t")

    if not GuiCommon.check_dependencies(deps_gtk):
        full_report = [
            "PyCAM dependency problem",
            "Error: Failed to load the GTK interface.",
            "Details:",
            report_gtk,
            "",
            "Detailed list of requirements: %s" % GuiCommon.REQUIREMENTS_LINK,
        ]
        log.critical(os.linesep.join(full_report))
        return EXIT_CODES["requirements"]

    # Imported only after the dependency check has passed.
    from pycam.Gui.Project import ProjectGui

    event_manager = get_event_handler()
    gui = ProjectGui(event_manager)

    # initialize plugins
    plugin_manager = pycam.Plugins.PluginManager(core=event_manager)
    plugin_manager.import_plugins()

    # some more initialization
    gui.reset_preferences()
    # TODO: preferences are not loaded until the new format is stable
    # self.load_preferences()

    # tell the GUI to empty the "undo" queue
    gui.clear_undo_states()
    event_manager.emit_event("notify-initialization-finished")

    # open the GUI
    get_mainloop(use_gtk=True).run()

    # no error -> return no error code
    return None
def _profile_():
    """Profile ``_fulltest_()``, preferring cProfile over the ``profile`` module."""
    try:
        import cProfile as profile
    except ImportError:
        # Only a missing cProfile triggers the fallback; other errors
        # (including KeyboardInterrupt) are no longer swallowed.
        import profile
    profile.run('_fulltest_()')
    #profile.run('_filetest_()')
def got_protocol(p):
    """Schedule ``p.send_ping`` through ``call_later`` after a delay of 1."""
    # this needs to be lower than the deferLater in `run`
    ping_delay = 1
    call_later(ping_delay, p.send_ping)
def _run():
    """Build a ``Config`` from the parsed ``args`` and hand it to ``run``."""
    config = Config(
        args.port,
        args.n,
        args.t,
        args.population,
        args.test,
        args.value,
        args.failure,
        args.tx_rate,
        args.fan_out,
        args.validate,
        args.ignore_promoter,
        args.auto_byzantine,
    )
    run(config, args.broadcast, args.discovery)
def stats_run(npart, nproc, ndim, periodic=False, overwrite=False,
              display=False, suppress_final_output=False):
    r"""Get timing stats using :package:`cProfile`.

    Args:
        npart (int): Number of particles.
        nproc (int): Number of processors.
        ndim (int): Number of dimensions.
        periodic (bool, optional): If True, the domain is assumed to be
            periodic. Defaults to False.
        overwrite (bool, optional): If True, the existing file for this
            set of input parameters is overwritten. Defaults to False.
        display (bool, optional): If True, display the profile results.
            Defaults to False.
        suppress_final_output (bool, optional): If True, the final output
            from spawned MPI processes is suppressed. This is mainly for
            timing purposes. Defaults to False.

    Returns:
        The ``pstats.Stats`` object when ``display`` is True, otherwise the
        name of the stats file.

    """
    suffix = ("_periodic" if periodic else "") + \
             ("_noout" if suppress_final_output else "")
    fname_stat = 'stat_{}part_{}proc_{}dim{}.txt'.format(
        npart, nproc, ndim, suffix)

    # Profile only when asked to overwrite or when no stats file exists yet.
    if overwrite or not os.path.isfile(fname_stat):
        command = (
            "from cykdtree.tests import run_test; "
            "run_test({}, {}, nproc={}, periodic={}, suppress_final_output={})"
        ).format(npart, ndim, nproc, periodic, suppress_final_output)
        cProfile.run(command, fname_stat)

    if not display:
        return fname_stat

    p = pstats.Stats(fname_stat)
    p.sort_stats('time').print_stats(10)
    return p
def time_run(npart, nproc, ndim, nrep=1, periodic=False, leafsize=10,
             suppress_final_output=False):
    r"""Get running times using :package:`time`.

    Args:
        npart (int): Number of particles.
        nproc (int): Number of processors.
        ndim (int): Number of dimensions.
        nrep (int, optional): Number of times the run should be performed to
            get an average. Defaults to 1.
        periodic (bool, optional): If True, the domain is assumed to be
            periodic. Defaults to False.
        leafsize (int, optional): The maximum number of points that should be
            in any leaf in the tree. Defaults to 10.
        suppress_final_output (bool, optional): If True, the final output
            from spawned MPI processes is suppressed. This is mainly for
            timing purposes. Defaults to False.

    Returns:
        tuple: Mean and standard deviation of the ``nrep`` wall-clock times.

    """
    durations = np.empty(nrep, 'float')
    for rep in range(nrep):
        started = time.time()
        run_test(npart, ndim, nproc=nproc,
                 periodic=periodic, leafsize=leafsize,
                 suppress_final_output=suppress_final_output)
        finished = time.time()
        durations[rep] = finished - started
    return np.mean(durations), np.std(durations)
def master(options):
    """
    Start of the master process.

    Optionally serves the current directory over HTTP on ``options.port``,
    opens the listening socket on ``options.wsport`` (without reading from
    it), spawns ``options.workers`` worker processes that inherit that socket
    and then runs the reactor.
    """
    if not options.silence:
        # Single-argument print(...) is valid on both Python 2 and Python 3.
        print("Master started on PID %s" % os.getpid())

    # start embedded Web server if asked for (this only runs on master)
    ##
    if options.port:
        webdir = File(".")
        web = Site(webdir)
        web.log = lambda _: None  # disable annoyingly verbose request logging
        reactor.listenTCP(options.port, web)

    # we just need some factory like thing .. it won't be used on master anyway
    # for actual socket accept
    ##
    factory = Factory()

    # create socket, bind and listen ..
    port = reactor.listenTCP(options.wsport, factory, backlog=options.backlog)

    # .. but immediately stop reading: we only want to accept on workers, not master
    port.stopReading()

    # fire off background workers
    ##
    for i in range(options.workers):

        args = [executable, "-u", __file__, "--fd", str(port.fileno()), "--cpuid", str(i)]

        # pass on cmd line args to worker ..
        args.extend(sys.argv[1:])

        reactor.spawnProcess(
            None, executable, args,
            childFDs={0: 0, 1: 1, 2: 2, port.fileno(): port.fileno()},
            env=os.environ)

    reactor.run()
def process(self, buf, eof, tm0):
    """Buffer ``buf`` and process the buffered samples in 60-second steps.

    ``buf`` is appended to ``self.ssamples``. While at least
    ``60 * self.inrate`` samples are buffered (and, unless ``eof`` is set, at
    least ``(votewin + 1) * 60 * self.inrate``), the buffered samples are
    band-pass filtered, resampled from ``self.inrate`` to ``self.lorate``
    and passed to ``self.process1``; then the first ``60 * self.inrate``
    samples are dropped.

    :param buf: new samples to append.
    :param eof: compared against ``False`` to decide whether the larger
        ``votewin`` window must be filled before processing.
    :param tm0: time of the start of ``buf``; shifted back to the start of
        the buffered samples.
    """
    global filterorder, votewin

    # correct back to start of self.ssamples[]
    tm0 -= self.ssampleslen / float(self.inrate)

    self.ssamples.append(buf)
    self.ssampleslen += len(buf)

    while True:
        too_short = self.ssampleslen < 60 * self.inrate
        if too_short:
            break
        # ``eof == False`` is kept verbatim: it is not the same as ``not eof``.
        window_unfilled = (eof == False
                           and self.ssampleslen < (votewin + 1) * 60 * self.inrate)
        if window_unfilled:
            break

        samples = numpy.concatenate(self.ssamples)
        # The buffer attributes are cleared while this chunk is processed.
        self.ssamples = None
        self.ssampleslen = None

        half_width = self.filterwidth / 2
        coeffs = weakutil.butter_bandpass(self.center - half_width,
                                          self.center + half_width,
                                          self.inrate, filterorder)
        filtered = scipy.signal.lfilter(coeffs[0], coeffs[1], samples)

        # down-sampling makes everything run much faster.
        # XXX perhaps sacrificing fine alignment?
        down = weakutil.resample(filtered, self.inrate, self.lorate)
        self.process1(down, tm0)

        trim = 60 * self.inrate
        remaining = samples[trim:]
        self.ssamples = [remaining]
        self.ssampleslen = len(remaining)
        tm0 += trim / float(self.inrate)