我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用sys.setprofile()。
def restore_profiler():
    """Reinstall the profiler that was saved for the calling thread.

    When a typechecking profiler is active (e.g. one created by
    pytypes.set_global_typechecked_profiler()), it has to be restored
    every time a TypeCheckError is caught, and the call has to come from
    the thread that raised the error; otherwise the typechecking profiler
    ends up implicitly disabled.

    Switching pytypes to warning mode is an alternative: no call to this
    function is needed then (unless filterwarnings("error") or similar is
    in effect).

    A warning is issued when a profiler is still installed on entry and
    when nothing was saved for the calling thread.
    """
    thread_id = threading.current_thread().ident
    if sys.getprofile() is not None:
        warn("restore_profiler: Current profile is not None!")
    if thread_id in _saved_profilers:
        # Reinstall first, then forget the saved entry.
        sys.setprofile(_saved_profilers[thread_id])
        del _saved_profilers[thread_id]
    else:
        warn("restore_profiler: No saved profiler for calling thread!")
def start(self):
    """Install this object as the profiling hook of the current thread.

    The hook that was active beforehand is remembered in
    ``_previous_profiler``. When ``all_threads`` is set, the object is
    additionally registered through ``threading.setprofile`` so that
    threads started later pick it up (already running threads are not
    affected).

    Raises:
        RuntimeError: if the checker is already running or is in the
            middle of starting up.
    """
    if self._active:
        raise RuntimeError('type checker already running')
    if self._pending:
        raise RuntimeError('type checker already starting up')
    self._pending = True

    # Remember whatever hook is installed right now, then take over.
    self._previous_profiler = sys.getprofile()
    self._set_caller_level_shift(0)
    sys.setprofile(self)

    if self.all_threads:
        # Default hook for threads created from now on.
        self._previous_thread_profiler = threading._profile_hook
        threading.setprofile(self)

    self._active = True
    self._pending = False
def stop(self):
    """Uninstall this object as profiling hook and reinstate the old one.

    Does nothing unless the checker is active and no start/stop is
    currently pending. A warning is issued when the hook found in place
    is not the one this object installed.
    """
    if not self._active or self._pending:
        return
    self._pending = True

    installed = sys.getprofile()
    if installed is self:
        previous = self._previous_profiler
        sys.setprofile(previous)
        if previous is not None and isinstance(previous, TypeAgent):
            previous._set_caller_level_shift(0)
    elif installed is not None or not self._cleared:
        # Someone else replaced the hook; an empty hook is only expected
        # when this agent was explicitly cleared.
        warn('the system profiling hook has changed unexpectedly')

    if self.all_threads:
        if threading._profile_hook is self:
            threading.setprofile(self._previous_thread_profiler)
        else:  # pragma: no cover
            warn('the threading profiling hook has changed unexpectedly')

    self._active = False
    self._pending = False
def _trace_cmdline(self, *args, **kwargs):
    """Run ``self.func`` and return the value of its local ``cmd_line``.

    A temporary profile hook stores a copy of the frame locals on every
    Python-level ``return`` event in ``self._locals``. The copy left
    behind after ``self.func`` finishes is read back for ``cmd_line``
    (presumably the locals of ``self.func`` itself, since its own return
    is the last one observed).

    Args:
        *args: positional arguments forwarded to ``self.func``.
        **kwargs: keyword arguments forwarded to ``self.func``.

    Returns:
        The value bound to the local variable ``cmd_line``.

    Raises:
        KeyError: if the captured locals contain no ``cmd_line``.
    """
    def tracer(frame, event, arg):
        if event == 'return':
            self._locals = frame.f_locals.copy()

    # Remember the hook that is active now so it can really be put back;
    # unconditionally installing None would silently disable an outer
    # profiler (coverage tools, other tracers, nested calls).
    previous_profiler = sys.getprofile()
    sys.setprofile(tracer)
    try:
        # The return value is irrelevant, only the locals are of interest.
        self.func(*args, **kwargs)
    finally:
        sys.setprofile(previous_profiler)
    return self._locals['cmd_line']
def __bootstrap(self):
    """Thread entry point: register the thread, install hooks, call run().

    The thread is moved from ``threading._limbo`` to ``threading._active``,
    the trace/profile hooks registered with the ``threading`` module are
    installed for this thread, and ``run()`` is executed. On the way out
    the thread is marked as stopped and removed from ``threading._active``.
    """
    try:
        self._set_ident()
        self._Thread__started.set()
        # Use the lock as a context manager so it is released even if the
        # bookkeeping below raises (a manual acquire/release pair would
        # leave it held and block every other thread start).
        with threading._active_limbo_lock:
            threading._active[self._Thread__ident] = self
            del threading._limbo[self]

        if threading._trace_hook:
            sys.settrace(threading._trace_hook)
        if threading._profile_hook:
            sys.setprofile(threading._profile_hook)

        try:
            self.run()
        finally:
            self._Thread__exc_clear()
    finally:
        with threading._active_limbo_lock:
            self._Thread__stop()
            try:
                del threading._active[threading._get_ident()]
            except Exception:
                # Best-effort cleanup: the entry may already be gone.
                pass
def start():
    """
    Turn on profiling.

    Prints a message and returns if profiling is already active. Runs the
    default ``setup()`` when no setup has been done yet.

    Raises
    ------
    RuntimeError
        If some other profile function is already installed. The check is
        made before any module state is touched, so a failed start leaves
        profiling cleanly inactive and can simply be retried.
    """
    global _profile_start

    if _profile_start is not None:
        print("profiling is already active.")
        return

    if not _profile_setup:
        setup()  # just do a default setup

    # Validate before mutating: raising after _profile_start / _call_stack
    # were updated would make every later start() claim that profiling is
    # already active although no hook was ever installed.
    if sys.getprofile() is not None:
        raise RuntimeError("another profile function is already active.")

    _profile_start = etime()
    _call_stack.append(('$total', _profile_start, None))
    if '$total' not in _inst_data:
        _inst_data['$total'] = [None, 0., 0]

    sys.setprofile(_instance_profile_callback)
def stop():
    """
    Turn off profiling.

    Does nothing when profiling is not active. Otherwise the profile hook
    is removed, the top entry of the call stack is dropped and the elapsed
    time and call count of the ``'$total'`` record are updated.
    """
    global _profile_total, _profile_start

    if _profile_start is None:
        return

    sys.setprofile(None)
    _call_stack.pop()

    _profile_total += etime() - _profile_start
    total_record = _inst_data['$total']
    total_record[1] = _profile_total
    total_record[2] += 1

    _profile_start = None
def register_request(request, settings):
    """Initialise the per-request snoopy state and start the profilers.

    Stores the request, a fresh data record and the given settings on
    ``_snoopy_request``, records the installed Django apps and the app
    root, then enables cProfile and/or the builtin profile hook depending
    on the ``USE_CPROFILE`` / ``USE_BUILTIN_PROFILER`` settings.
    """
    request_record = {
        'request': request.path,
        'method': request.method,
        'queries': [],
        'profiler_traces': [],
        'custom_attributes': {},
        'start_time': datetime.datetime.now()
    }

    _snoopy_request.request = request
    _snoopy_request.data = request_record
    _snoopy_request.settings = settings
    _snoopy_request.current_function_key = [None, None]

    from django.conf import settings as django_settings
    _snoopy_request.relevant_apps = tuple(django_settings.INSTALLED_APPS)
    _snoopy_request.app_root = get_app_root()

    if _snoopy_request.settings.get('USE_CPROFILE'):
        c_profiler = cProfile.Profile()
        _snoopy_request.profiler = c_profiler
        _snoopy_request.profiler.enable()

    if _snoopy_request.settings.get('USE_BUILTIN_PROFILER'):
        sys.setprofile(SnoopyRequest.profile)
def register_response(response):
    """Stop the profilers and finalise the data record of the request.

    Removes the builtin profile hook when it was enabled, stamps the end
    time and total duration, and, when cProfile was enabled, stores its
    cumulative-sorted report under ``'profiler_result'`` (filtered through
    ``clean_profiler_result`` unless ``CPROFILE_SHOW_ALL_FUNCTIONS`` is
    set). Returns the data record.
    """
    if _snoopy_request.settings.get('USE_BUILTIN_PROFILER'):
        sys.setprofile(None)

    record = _snoopy_request.data
    record['end_time'] = datetime.datetime.now()
    record['total_request_time'] = record['end_time'] - record['start_time']

    if _snoopy_request.settings.get('USE_CPROFILE'):
        _snoopy_request.profiler.disable()

        report_stream = StringIO.StringIO()
        stats = pstats.Stats(_snoopy_request.profiler, stream=report_stream)
        stats = stats.sort_stats('cumulative')
        stats.print_stats()

        report = report_stream.getvalue()
        if not _snoopy_request.settings.get('CPROFILE_SHOW_ALL_FUNCTIONS'):
            report = clean_profiler_result(report)
        record['profiler_result'] = report

    return record
def runctx(self, cmd, globals, locals):
    """Execute ``cmd`` in the given namespaces with the profiler active.

    ``self.dispatcher`` is installed as the profile hook for the duration
    of the execution and removed afterwards, also when ``cmd`` raises.

    Args:
        cmd: code to execute (passed to ``exec``).
        globals: global namespace for the execution.
        locals: local namespace for the execution.

    Returns:
        ``self``.
    """
    self.set_cmd(cmd)
    sys.setprofile(self.dispatcher)
    try:
        # Call form instead of the ``exec cmd in g, l`` statement: the
        # statement is a syntax error on Python 3, while the parenthesised
        # three-item form behaves identically on Python 2.
        exec(cmd, globals, locals)
    finally:
        sys.setprofile(None)
    return self

# This method is more useful to profile a single function call.
def setprofile(func):
    """Store ``func`` as the module-wide profile hook ``_profile_hook``."""
    global _profile_hook

    _profile_hook = func
def profile_on():
    """Reset the collected statistics and install ``profiler`` everywhere.

    The hook is registered both with ``threading`` (for threads started
    later) and with ``sys`` (for the current thread).
    """
    global p_stats, p_start_time

    p_stats = {}
    p_start_time = time()

    hook = profiler
    threading.setprofile(hook)
    sys.setprofile(hook)
def profile_off():
    """Remove the profile hook from ``threading`` and from ``sys``."""
    for module in (threading, sys):
        module.setprofile(None)
def setprofile(func):
    """Register a profile function for threads started via this module.

    For every such thread, ``func`` is handed to sys.setprofile() before
    the thread's run() method gets called.
    """
    global _profile_hook

    _profile_hook = func
def start_profiler():
    """Create a fresh global state and install all profiling hooks.

    Hooks are installed for greenlet switches, for the current thread and
    for threads started later through the ``threading`` module.
    """
    global _state

    _state = GlobalState()

    # The state of the current greenlet has to exist before any handler
    # is installed.
    here = sys._getframe(0)
    active_greenlet = greenlet.getcurrent()  # pylint: disable=no-member
    _state.last = ensure_thread_state(active_greenlet, here)

    greenlet.settrace(greenlet_profiler)  # pylint: disable=no-member
    sys.setprofile(thread_profiler)
    threading.setprofile(thread_profiler)
def stop_profiler():
    """Remove the profiling hooks installed for threads and greenlets.

    ``_state`` is intentionally left untouched so the collected data stays
    available to the user until the next session starts.
    """
    # The order matters: unregistering in any other order would add extra
    # measurements at the end.
    sys.setprofile(None)
    threading.setprofile(None)
    greenlet.settrace(None)  # pylint: disable=no-member
def trace_calls(
    logger: CallTraceLogger,
    code_filter: Optional[CodeFilter] = None,
    sample_rate: Optional[int] = None,
) -> Iterator[None]:
    """Trace calls made while the enclosed block of code runs.

    A ``CallTracer`` built from the arguments is installed as the profile
    hook; on exit the previously installed hook is put back and the logger
    is flushed, also when the block raises.
    """
    previous_profiler = sys.getprofile()
    tracer = CallTracer(logger, code_filter, sample_rate)
    sys.setprofile(tracer)
    try:
        yield
    finally:
        sys.setprofile(previous_profiler)
        logger.flush()
def runctx(self, cmd, globals, locals):
    """Profile the execution of ``cmd`` in the supplied namespaces.

    The dispatcher of this object acts as profile hook while ``cmd`` runs;
    the hook is cleared again afterwards, even if ``cmd`` raises. Returns
    this object.
    """
    self.set_cmd(cmd)
    profile_hook = self.dispatcher
    sys.setprofile(profile_hook)
    try:
        exec(cmd, globals, locals)
    finally:
        sys.setprofile(None)
    return self

# This method is more useful to profile a single function call.
def test_getdefaultencoding(self):
    """sys.getdefaultencoding() rejects arguments and returns a str."""
    with self.assertRaises(TypeError):
        sys.getdefaultencoding(42)
    # Only the type can be verified; the user may have changed the value.
    encoding = sys.getdefaultencoding()
    self.assertIsInstance(encoding, str)

# testing sys.settrace() is done in test_sys_settrace.py
# testing sys.setprofile() is done in test_sys_setprofile.py
def setUp(self):
    """Start every test without any profile hook installed."""
    sys.setprofile(None)
def tearDown(self):
    """Remove whatever profile hook the test left installed."""
    sys.setprofile(None)
def callback(self, frame, event, arg):
    """Hook for sys.setprofile()/sys.settrace(): dispatch on ``event``.

    The handler registered for ``event`` in ``self.dispatch`` is called
    with this object and the frame; ``arg`` is ignored.
    """
    handler = self.dispatch[event]
    handler(self, frame)
def capture_events(callable, p=None):
    """Run ``callable`` under a profile hook and return the events seen.

    Args:
        callable: object handed to ``protect`` together with ``p``.
        p: watcher whose ``callback`` is installed as the profile hook and
            whose ``get_events()`` supplies the result; a new
            ``HookWatcher`` is created when omitted.

    Returns:
        The recorded events without the first and the last entry
        (presumably the ones caused by installing and removing the hook
        itself).
    """
    if p is None:
        p = HookWatcher()
    # Disable the garbage collector. This prevents __del__s from showing up in
    # traces.
    old_gc = gc.isenabled()
    gc.disable()
    try:
        sys.setprofile(p.callback)
        try:
            protect(callable, p)
        finally:
            # Always uninstall the hook; otherwise an exception escaping
            # the guarded call would leave it active for unrelated code.
            sys.setprofile(None)
    finally:
        if old_gc:
            gc.enable()
    return p.get_events()[1:-1]
def test_getdefaultencoding(self):
    """sys.getdefaultencoding() rejects arguments and returns a str.

    The checks only run when unicode support is available.
    """
    if not test.test_support.have_unicode:
        return
    self.assertRaises(TypeError, sys.getdefaultencoding, 42)
    # Only the type can be verified; the user may have changed the value.
    encoding = sys.getdefaultencoding()
    self.assertIsInstance(encoding, str)

# testing sys.settrace() is done in test_sys_settrace.py
# testing sys.setprofile() is done in test_sys_setprofile.py
def Start(self):
    """Install this object's ``OnEvent`` method as the profile hook."""
    on_event = self.OnEvent
    sys.setprofile(on_event)
def Stop(self, path):
    """Remove the profile hook and write the collected events to ``path``.

    Only the process whose pid equals ``self.pid`` writes the file; any
    other process returns right after removing the hook. A short summary
    is written to stderr before the file is created.

    Args:
        path: destination file; it is opened in text write mode.
    """
    sys.setprofile(None)

    # Only one process should write out the file!
    if os.getpid() != self.pid:
        return

    # TODO:
    # - report number of events?
    # - report number of bytes?
    # sys.stderr.write instead of ``print >>sys.stderr``: the chevron form
    # is Python 2-only and raises TypeError on Python 3, while write()
    # produces the same output on both.
    sys.stderr.write('num_events: %d\n' % self.num_events)
    sys.stderr.write('Writing to %r\n' % path)
    with open(path, 'w') as f:
        f.write(self.event_strs.getvalue())
def test_setget(self):
    """sys.getprofile() hands back the object given to sys.setprofile()."""
    def noop_hook(*args):
        pass

    sys.setprofile(noop_hook)
    installed = sys.getprofile()
    self.assertIs(installed, noop_hook)