我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用sys._current_frames()。
def print_all_stacktraces():
    """Print the current stack trace of every thread to stdout.

    The dump is framed by ``*** STACKTRACE - START ***`` and
    ``*** STACKTRACE - END ***`` banners.  Each thread is introduced by a
    ``# ThreadID: <id> <name>`` header; a thread that ``threading`` does not
    list is shown with an empty name.
    """
    print("\n*** STACKTRACE - START ***\n")

    # Build the ident -> name table once instead of rescanning
    # threading.enumerate() for every frame the interpreter reports.
    thread_names = {t.ident: t.name for t in threading.enumerate()}

    code = []
    for thread_id, stack in sys._current_frames().items():
        code.append("\n# ThreadID: %s %s"
                    % (thread_id, thread_names.get(thread_id, '')))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                code.append(" %s" % (line.strip()))

    for line in code:
        print(line)
    print("\n*** STACKTRACE - END ***\n")
def process_sample(self, signal_frame, sample_time, main_thread_id):
    """Fold one sample of every thread's stack into ``self.profile``.

    Does nothing when ``self.profile`` is falsy.  For the thread whose id
    equals ``main_thread_id`` the frame passed in as ``signal_frame`` is used
    instead of the one reported by ``sys._current_frames()``.

    :param signal_frame: frame to use for the main thread.
    :param sample_time: value forwarded to ``increment()`` on the leaf node.
    :param main_thread_id: ident of the thread that ``signal_frame`` belongs to.
    """
    # NOTE(review): indentation was reconstructed from a whitespace-collapsed
    # source; confirm the nesting of the reference-clearing lines below.
    # NOTE(review): time.clock() was removed in Python 3.8; this block only
    # runs on interpreters that still provide it.
    if self.profile:
        start = time.clock()

        current_frames = sys._current_frames()
        items = current_frames.items()
        for thread_id, thread_frame in items:
            if thread_id == main_thread_id:
                thread_frame = signal_frame

            stack = self.recover_stack(thread_frame)
            if stack:
                # Walk from the outermost call down to the innermost one,
                # creating child nodes as needed.
                current_node = self.profile
                for frame in reversed(stack):
                    current_node = current_node.find_or_add_child(str(frame))
                current_node.increment(sample_time, 1)

            # Drop the local references to the frame objects.
            thread_id, thread_frame, stack = None, None, None

        items = None
        current_frames = None

        # Account for the time spent taking this sample.
        self.profile._overhead += (time.clock() - start)
def worker_int(worker):
    """Log a stack dump of every thread when the worker gets INT or QUIT.

    An info message announces the signal; the per-thread traces are emitted
    as one multi-line debug message on ``worker.log``.
    """
    worker.log.info("worker received INT or QUIT signal")

    ## get traceback info
    import sys
    import threading
    import traceback

    names_by_ident = {}
    for thread in threading.enumerate():
        names_by_ident[thread.ident] = thread.name

    report = []
    for ident, top_frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (names_by_ident.get(ident, ""), ident))
        for path, line_no, func_name, source in traceback.extract_stack(top_frame):
            report.append('File: "%s", line %d, in %s' % (path, line_no, func_name))
            if source:
                report.append(" %s" % (source.strip()))

    worker.log.debug("\n".join(report))
def sigquit_handler(sig, frame):
    r"""Print the stack trace of every thread to stdout.

    Helps debug deadlocks by printing stacktraces when this gets a SIGQUIT,
    e.g. ``kill -s QUIT <PID>`` or ``CTRL+\``.

    :param sig: signal number (not used).
    :param frame: interrupted frame (not used).
    """
    # The docstring is a raw string so that the backslash in "CTRL+\" is not
    # parsed as an (invalid) escape sequence.
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
    id_to_name = {th.ident: th.name for th in threading.enumerate()}
    code = []
    for thread_id, stack in sys._current_frames().items():
        code.append("\n# Thread: {}({})"
                    .format(id_to_name.get(thread_id, ""), thread_id))
        for filename, line_number, name, line in traceback.extract_stack(stack):
            code.append('File: "{}", line {}, in {}'
                        .format(filename, line_number, name))
            if line:
                code.append(" {}".format(line.strip()))
    print("\n".join(code))
def _get_current_traceback(self, thread):
    """Return a formatted traceback of where ``thread`` is currently executing.

    :param thread: the thread to inspect
    :type thread: Thread
    :returns: text that starts with a
        ``Traceback ( thread-<id> possibly hold at ):`` header followed by one
        ``File:`` line (and the stripped source line, when available) per frame
    :raises RuntimeError: if the interpreter reports no frame for
        ``thread.ident``
    """
    # sys._current_frames() is keyed by thread ident, so the frame can be
    # looked up directly instead of scanning every entry.
    stack = sys._current_frames().get(thread.ident)
    if stack is None:
        raise RuntimeError("thread not found")

    parts = ["Traceback ( thread-%d possibly hold at ):\n" % thread.ident]
    for filename, lineno, name, line in traceback.extract_stack(stack):
        parts.append(' File: "%s", line %d, in %s\n' % (filename, lineno, name))
        if line:
            parts.append(" %s\n" % (line.strip()))
    return "".join(parts)
def get_thread_traceback(thread):
    """Return a formatted traceback of where ``thread`` is currently executing.

    :param thread: the thread to inspect
    :type thread: Thread
    :returns: text that starts with a
        ``Traceback ( thread-<id> possibly hold at ):`` header followed by one
        ``File:`` line (and the stripped source line, when available) per frame
    :raises RuntimeError: if the interpreter reports no frame for
        ``thread.ident``
    """
    # The frames mapping is keyed by thread ident: look it up directly rather
    # than iterating over every thread.
    stack = sys._current_frames().get(thread.ident)
    if stack is None:
        raise RuntimeError("thread not found")

    parts = ["Traceback ( thread-%d possibly hold at ):\n" % thread.ident]
    for filename, lineno, name, line in traceback.extract_stack(stack):
        parts.append(' File: "%s", line %d, in %s\n' % (filename, lineno, name))
        if line:
            parts.append(" %s\n" % (line.strip()))
    return "".join(parts)
def enable_tracing(self):
    """ Enable tracing if it is disabled and debugged program is running,
    else do nothing.
    Do this on all threads but the debugger thread.

    :return: True if tracing has been enabled, False else.
    """
    # NOTE(review): indentation was reconstructed from a whitespace-collapsed
    # source; confirm that iksettrace._set_trace_on() is meant to run once
    # after the loop rather than once per thread.
    _logger.x_debug("enable_tracing()")
    #self.dump_tracing_state("before enable_tracing()")
    if not self.tracing_enabled and self.execution_started:
        # Restore or set the trace function on all existing frames apart from
        # the debugger's.
        threading.settrace(self._tracer)  # then enable on all threads to come
        for thr in threading.enumerate():
            if thr.ident != self.debugger_thread_ident:  # skip debugger thread
                # Walk from the thread's topmost frame back to its root and
                # install the tracer on every frame on the way.
                a_frame = sys._current_frames()[thr.ident]
                while a_frame:
                    a_frame.f_trace = self._tracer
                    a_frame = a_frame.f_back
        iksettrace._set_trace_on(self._tracer, self.debugger_thread_ident)
        self.tracing_enabled = True

    #self.dump_tracing_state("after enable_tracing()")
    return self.tracing_enabled
def timeout_response() -> chalice.Response:
    """
    Build the chalice Response that reports a request timeout.

    The JSON body carries the formatted stack of every running thread except
    the calling one, keyed by thread id, under ``traces``.
    """
    own_ident = threading.get_ident()

    trace_dump = {}
    for thread_id, frame in sys._current_frames().items():
        if thread_id == own_ident:
            continue
        trace_dump[thread_id] = traceback.format_stack(frame)

    status = requests.codes.gateway_timeout
    problem = {
        'status': status,
        'code': "timed_out",
        'title': "Timed out processing request.",
        'traces': trace_dump,
    }
    payload = json.dumps(problem)
    return chalice.Response(
        status_code=status,
        headers={"Content-Type": "application/problem+json"},
        body=payload,
    )
def setUpClass(cls):
    """Start a daemon watchdog thread that force-quits a locked-up test class.

    The watchdog polls every two seconds.  It returns quietly as soon as
    ``cls.__testing__`` is false.  If ``cls.MAX_TEST_TIME`` seconds pass since
    ``cls.__lockup_timestamp__`` first, it writes the stack of every other
    thread to stderr and terminates the process with ``os._exit(1)``.
    """
    cls.__lockup_timestamp__ = time.time()

    def check_twisted():
        while time.time() - cls.__lockup_timestamp__ < cls.MAX_TEST_TIME:
            time.sleep(2)
            # If the test class completed normally, exit
            if not cls.__testing__:
                return

        # If we made it here, there is a serious issue which we cannot recover from.
        # Most likely the Twisted threadpool got into a deadlock while shutting down.
        import os
        import traceback

        # sys.stderr.write() is used instead of the Python-2-only
        # "print >> sys.stderr" statement so this also works on Python 3.
        write = sys.stderr.write
        write("The test-suite locked up! Force quitting! Thread dump:\n")
        for tid, stack in sys._current_frames().items():
            if tid != threading.currentThread().ident:
                write("THREAD#%d\n" % tid)
                for line in traceback.format_list(traceback.extract_stack(stack)):
                    write("| %s\n" % line[:-1].replace('\n', '\n| '))
        # os._exit() skips the normal interpreter cleanup, so flush explicitly.
        sys.stderr.flush()
        os._exit(1)

    t = threading.Thread(target=check_twisted)
    t.daemon = True
    t.start()
def test_clear_threads_states_after_fork(self):
    # Issue #17094: check that threads states are cleared after fork()

    # start a bunch of threads
    workers = []
    for _ in range(16):
        worker = threading.Thread(target=lambda: time.sleep(0.3))
        workers.append(worker)
        worker.start()

    child_pid = os.fork()
    if child_pid == 0:
        # Child: only the forking thread's state may remain.  os._exit()
        # never returns, so the parent-only code below is not reached here.
        os._exit(0 if len(sys._current_frames()) == 1 else 1)

    _, status = os.waitpid(child_pid, 0)
    self.assertEqual(0, status)

    for worker in workers:
        worker.join()
def iter_thread_frames():
    """Yield ``(ident, frame)`` pairs for native threads and greenlet threads.

    Frames from ``sys._current_frames()`` are yielded first, except the one
    whose ident maps to ``MAIN_UUID`` in ``IDENT_TO_UUID``; that frame is kept
    aside.  Then every thread from ``threading.enumerate()`` that has a truthy
    ``_greenlet`` attribute is yielded with its greenlet frame, or with the
    kept-aside main frame when the greenlet has none.
    """
    main_thread_frame = None
    for ident, frame in sys._current_frames().items():
        if IDENT_TO_UUID.get(ident) != MAIN_UUID:
            yield ident, frame
        else:
            # the MainThread should be shown in its "greenlet" version
            main_thread_frame = frame

    for thread in threading.enumerate():
        if not getattr(thread, '_greenlet', None):
            # some in-between state, before greenlet started or after it died?...
            continue
        if thread._greenlet.gr_frame:
            yield thread.ident, thread._greenlet.gr_frame
            continue
        # a thread with greenlet but without gr_frame will be fetched from sys._current_frames
        # If we switch to another greenlet by the time we get there we will get inconsistent dup of threads.
        # TODO - make best-effort attempt to show coherent thread dump
        yield thread.ident, main_thread_frame
def threadDump(signum, frame):
    """Signal handler for dumping thread stack frames to stdout.

    Every ``print`` call below passes exactly one pre-formatted string, which
    produces the same output under the Python 2 print statement and the
    Python 3 print function.
    """
    # NOTE(review): indentation was reconstructed from a whitespace-collapsed
    # source; confirm that the closing rule is printed once after the loop.
    print("")
    print("App server has been signaled to attempt a thread dump.")
    print("")
    print("Thread stack frame dump at %s" % (asclocaltime(),))
    sys.stdout.flush()
    frames = sys._current_frames()
    print("")
    print("-" * 79)
    print("")
    for threadID in sorted(frames):
        frame = frames[threadID]
        print("Thread ID: %d (reference count = %d)" % (
            threadID, sys.getrefcount(frame)))
        print(''.join(traceback.format_list(traceback.extract_stack(frame))))
    print("-" * 79)
    sys.stdout.flush()
def get_full_thread_dump():
    """Returns a string containing a traceback for all threads"""
    stamp = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    names_by_ident = {thread.ident: thread.name for thread in threading.enumerate()}

    chunks = ["\n>>>> Begin stack trace (%s) >>>>\n" % stamp]
    for ident, top_frame in current_frames().items():
        chunks.append(
            "\n# ThreadID: %s (%s)\n" % (ident, names_by_ident.get(ident, "unknown")))
        for path, line_no, func_name, source in traceback.extract_stack(top_frame):
            chunks.append('File: "%s", line %d, in %s\n' % (path, line_no, func_name))
            if source:
                chunks.append(" %s\n" % (source.strip()))
    chunks.append("\n<<<< End stack trace <<<<\n\n")
    return "".join(chunks)
def _captureThreadStacks(self):
    """Capture the stacks for all currently running threads.

    :return: A list of ``(thread-description, stack)`` tuples. See
        `traceback.format_stack` for the format of ``stack``.  The
        description is the ``repr`` of the matching ``threading`` thread, or
        ``<*Unknown* ident>`` when ``threading`` does not list that ident.
    """
    known = {}
    for thread in threading.enumerate():
        known[thread.ident] = thread

    captured = []
    for ident, frame in sys._current_frames().items():
        if ident in known:
            label = repr(known[ident])
        else:
            label = "<*Unknown* %d>" % ident
        captured.append((label, traceback.format_stack(frame)))
    return captured
def _debug_lock_release(self):
    """Return a text dump of the lock owner's current stack.

    Returns an empty string when ``self._RLock__owner`` is falsy or when no
    frame is reported for the owner's ident.
    """
    owner = self._RLock__owner
    if not owner:
        return ""

    # Get stack of current owner, if lock is owned.
    owner_stack = sys._current_frames().get(owner.ident)
    if owner_stack is None:
        return ""

    pieces = ["Stack of owner:\n"]
    for filenm, lno, func, txt in traceback.extract_stack(owner_stack):
        pieces.append(" File: \"{0}\", line {1:d},in {2}".format(filenm, lno, func))
        if txt:
            pieces.append("\n {0}".format(txt.strip()))
        pieces.append("\n")
    return "".join(pieces)
def dump_stacks(self):
    '''
    Return the formatted stack of every thread known to ``threading``.

    This function is meant for debugging. Useful when a deadlock happens.
    Frames whose ident is not listed by ``threading.enumerate()`` are skipped.

    borrowed from: http://blog.ziade.org/2012/05/25/zmq-and-gevent-debugging-nightmares/
    '''
    # threads
    names_by_ident = {th.ident: th.name for th in threading.enumerate()}

    pieces = []
    for ident, frame in sys._current_frames().items():
        if ident in names_by_ident:
            pieces.extend((
                'Thread 0x%x (%s)\n' % (ident, names_by_ident[ident]),
                ''.join(traceback.format_stack(frame)),
                '\n',
            ))
    return ''.join(pieces)
def checkAddressSanitizerError(output, router, component):
    """Check ``output`` for an AddressSanitizer error report and log it.

    When an ``==<pid>==ERROR: AddressSanitizer: <type> `` marker is found, a
    notice is written to stderr.  If the text between the first and the last
    ``==<pid>==`` marker can be extracted, it is echoed to stderr and appended
    to ``/tmp/AddressSanitzer.txt`` together with the file name and function
    name of the frame two levels above this call.

    :param output: text to scan.
    :param router: router name used in the messages.
    :param component: component name used in the stderr notice.
    :returns: True if an AddressSanitizer error marker was found, else False.
    """
    # NOTE(review): indentation was reconstructed from a whitespace-collapsed
    # source; confirm that True is returned whenever the marker is found, even
    # if the detailed log cannot be extracted.
    addressSantizerError = re.search(
        r'(==[0-9]+==)ERROR: AddressSanitizer: ([^\s]*) ', output)
    if not addressSantizerError:
        return False

    sys.stderr.write("%s: %s triggered an exception by AddressSanitizer\n" % (router, component))
    # Sanitizer Error found in log
    pidMark = re.escape(addressSantizerError.group(1))
    addressSantizerLog = re.search('%s(.*)%s' % (pidMark, pidMark), output, re.DOTALL)
    if addressSantizerLog:
        # Take both values from the same frame, two levels up the *current*
        # thread's stack.  sys._current_frames().values()[0] is not
        # subscriptable on Python 3 and could pick another thread's frame.
        # Keep this inline: moving it into a helper changes the frame depth.
        callingFrame = sys._getframe(2)
        callingTest = os.path.basename(callingFrame.f_globals['__file__'])
        callingProc = callingFrame.f_code.co_name
        logLines = addressSantizerLog.group(1).splitlines()
        with open("/tmp/AddressSanitzer.txt", "a") as addrSanFile:
            sys.stderr.write('\n'.join(logLines) + '\n')
            addrSanFile.write("## Error: %s\n\n" % addressSantizerError.group(2))
            addrSanFile.write("### AddressSanitizer error in topotest `%s`, test `%s`, router `%s`\n\n" % (callingTest, callingProc, router))
            addrSanFile.write(' ' + '\n '.join(logLines) + '\n')
            addrSanFile.write("\n---------------\n")
    return True
def worker_int(worker):
    """Emit every thread's stack to the worker's debug log on INT or QUIT.

    The signal is first announced with an info message on ``worker.log``.
    """
    worker.log.info("worker received INT or QUIT signal")

    ## get traceback info
    import sys
    import threading
    import traceback

    thread_names = {th.ident: th.name for th in threading.enumerate()}

    def describe(ident, top_frame):
        # One header line, then "File:" / source-line entries for each frame.
        entries = ["\n# Thread: %s(%d)" % (thread_names.get(ident, ""), ident)]
        for path, line_no, func_name, source in traceback.extract_stack(top_frame):
            entries.append('File: "%s", line %d, in %s' % (path, line_no, func_name))
            if source:
                entries.append(" %s" % (source.strip()))
        return entries

    dump = []
    for ident, top_frame in sys._current_frames().items():
        dump.extend(describe(ident, top_frame))
    worker.log.debug("\n".join(dump))
def run(self):
    """Periodically print the stack of every other thread until stopped.

    Each round checks ``self._stop`` while holding ``self.lock`` and returns
    when it is ``True``.  Otherwise it prints the frames of all threads except
    the calling one and sleeps for ``self.interval`` seconds.
    """
    # NOTE(review): indentation was reconstructed from a whitespace-collapsed
    # source; confirm that the dump and the sleep happen outside the lock.
    # NOTE(review): if this class derives from threading.Thread, the ``_stop``
    # attribute may collide with Thread's private ``_stop`` -- verify.
    while True:
        with self.lock:
            if self._stop is True:
                return

        print("\n============= THREAD FRAMES: ================")
        for ident, frame in sys._current_frames().items():
            if ident != threading.current_thread().ident:
                print("<< thread %d >>" % ident)
                traceback.print_stack(frame)
        print("===============================================\n")

        time.sleep(self.interval)
def _print_nativethreads():
    """Print each thread id to stdout followed by that thread's stack.

    The stack itself is written by ``traceback.print_stack``; a blank line is
    printed after every thread.
    """
    frames_by_ident = sys._current_frames()
    for ident in frames_by_ident:
        print(ident)
        traceback.print_stack(frames_by_ident[ident])
        print()
def delete(self):
    """Terminate the running workloads and return the result as JSON.

    The current thread frames are logged at info level and printed to stdout
    first.  Any exception raised while terminating the workloads is turned
    into an ``abort(400, ...)`` that carries the exception text.
    """
    # Take one snapshot so the log line and the printed value agree, and use
    # the call form of print, which works on both Python 2 and Python 3.
    frames = sys._current_frames()
    self.logger.info("Threads: %s" % frames)
    print(frames)
    try:
        return jsonify({'Slaves': storperf.terminate_workloads()})
    except Exception as e:
        abort(400, str(e))
def worker_int(worker):
    """Write a per-thread stack dump to ``worker.log`` at debug level.

    An info message about the received INT or QUIT signal is logged first.
    """
    worker.log.info("worker received INT or QUIT signal")

    import sys
    import threading
    import traceback

    live_threads = threading.enumerate()
    id2name = dict(zip((th.ident for th in live_threads),
                       (th.name for th in live_threads)))

    lines = []
    add = lines.append
    for thread_id, top_frame in sys._current_frames().items():
        add("\n# Thread: %s(%d)" % (id2name.get(thread_id, ""), thread_id))
        for path, line_no, func_name, source in traceback.extract_stack(top_frame):
            add('File: "%s", line %d, in %s' % (path, line_no, func_name))
            if source:
                add(" %s" % (source.strip()))
    worker.log.debug("\n".join(lines))
def test_current_frames(self): have_threads = True try: import _thread except ImportError: have_threads = False if have_threads: self.current_frames_with_threads() else: self.current_frames_without_threads() # Test sys._current_frames() in a WITH_THREADS build.
def current_frames_without_threads(self):
    # Not much happens here: there is only one thread, with artificial
    # "thread id" 0, and its frame must be the frame of this very call.
    frames = sys._current_frames()
    self.assertEqual(len(frames), 1)
    self.assertIn(0, frames)
    only_frame = frames[0]
    self.assertTrue(only_frame is sys._getframe())
def test_getframe(self):
    # Bad argument counts and absurd depths must be rejected ...
    self.assertRaises(TypeError, sys._getframe, 42, 42)
    self.assertRaises(ValueError, sys._getframe, 2000000000)
    # ... and the frame of this call must run this method's code object.
    expected_code = SysModuleTest.test_getframe.im_func.func_code
    running_code = sys._getframe().f_code
    self.assertTrue(expected_code is running_code)

# sys._current_frames() is a CPython-only gimmick.
def test_current_frames(self): have_threads = True try: import thread except ImportError: have_threads = False if have_threads: self.current_frames_with_threads() else: self.current_frames_without_threads() # Test sys._current_frames() in a WITH_THREADS build.
def dumpstacks(self, data):
    """Return the stacks of all threads as one CRLF-joined string.

    ``data`` is accepted but not used.
    """
    import sys
    import threading
    import traceback

    names_by_ident = {th.ident: th.name for th in threading.enumerate()}

    lines = []
    for ident, top_frame in sys._current_frames().items():
        lines.append("\n# Thread: %s(%d)" % (names_by_ident.get(ident, ""), ident))
        for path, line_no, func_name, source in traceback.extract_stack(top_frame):
            lines.append('File: "%s", line %d, in %s' % (path, line_no, func_name))
            if source:
                lines.append(" %s" % (source.strip()))
    return "\r\n".join(lines)
def iter_frames(self, t):
    """Return ``[topmost frame]`` of thread ``t``, or ``[]`` if none is reported."""
    # sys._current_frames(): dictionary with thread id -> topmost frame
    top_frame = sys._current_frames().get(t.ident)
    return [] if top_frame is None else [top_frame]

# IFDEF CYTHON
# def create_db_frame(self, *args, **kwargs):
#     raise AssertionError('This method should not be called on cython (PyDbFrame should be used directly).')
# ELSE
# just create the db frame directly
def create_db_frame(self, args):
    """Build a ``PyDBFrame`` from ``args``, register it and return it.

    The last element of ``args`` is stored on the result as ``frame``.
    """
    # The frame must be cached as a weak-ref (we return the actual db frame,
    # which will be kept alive until its trace_dispatch method is not
    # referenced anymore).  That's a large workaround because:
    #   1. we can't have weak-references to python frame objects
    #   2. only from 2.5 onwards we have _current_frames support from the
    #      interpreter
    wrapper = PyDBFrame(args)
    wrapper.frame = args[-1]
    self._AddDbFrame(wrapper)
    return wrapper
def __str__(self):
    """Summarise the pydev state, step settings, kill flag and frame count."""
    template = 'State:%s Stop:%s Cmd: %s Kill:%s Frames:%s'
    fields = (
        self.pydev_state,
        self.pydev_step_stop,
        self.pydev_step_cmd,
        self.pydev_notify_kill,
        len(self.iter_frames(None)),
    )
    return template % fields

#=======================================================================================================================
# NOW, WE HAVE TO DEFINE WHICH THREAD INFO TO USE
# (whether we have to keep references to the frames or not)
# from version 2.5 onwards, we can use sys._current_frames to get a dict with the threads
# and frames, but to support other versions, we can't rely on that.
#=======================================================================================================================
def run(self):
    """Wait ten seconds, then write a dump of all thread stacks to stderr.

    For every frame returned by ``sys._current_frames()`` a section with the
    thread's name and daemon flag (or its id, when the name is unknown) and
    its stack entries is appended to the dump.  If the topmost frame has a
    ``self`` local, its ``str()`` is written to stderr right away.
    """
    time.sleep(10)

    thread_id_to_name = {}
    try:
        for t in threading.enumerate():
            thread_id_to_name[t.ident] = '%s (daemon: %s)' % (t.name, t.daemon)
    except Exception:
        # Best effort: unnamed threads are reported by id.  Only ordinary
        # errors are swallowed; KeyboardInterrupt and SystemExit propagate.
        pass

    stack_trace = [
        '===============================================================================',
        'pydev pyunit runner: Threads still found running after tests finished',
        '================================= Thread Dump =================================']

    for thread_id, stack in sys._current_frames().items():
        stack_trace.append('\n-------------------------------------------------------------------------------')
        stack_trace.append(" Thread %s" % thread_id_to_name.get(thread_id, thread_id))
        stack_trace.append('')

        if 'self' in stack.f_locals:
            sys.stderr.write(str(stack.f_locals['self']) + '\n')

        for filename, lineno, name, line in traceback.extract_stack(stack):
            stack_trace.append(' File "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                stack_trace.append(" %s" % (line.strip()))
    stack_trace.append('\n=============================== END Thread Dump ===============================')
    sys.stderr.write('\n'.join(stack_trace))
def test_getframe(self):
    # Bad argument counts and absurd depths must be rejected ...
    self.assertRaises(TypeError, sys._getframe, 42, 42)
    self.assertRaises(ValueError, sys._getframe, 2000000000)
    # ... and the frame of this call must run this method's code object.
    expected_code = SysModuleTest.test_getframe.__code__
    running_code = sys._getframe().f_code
    self.assertTrue(expected_code is running_code)

# sys._current_frames() is a CPython-only gimmick.
def dumpstacks(self):
    """Log the stack of every thread, one debug record per line.

    Each thread gets a ``# Thread: <name>(<id>)`` header record, followed by
    a ``File:`` record and, when available, a source-line record per frame.
    """
    debug = self.logger.debug
    names_by_ident = {th.ident: th.name for th in threading.enumerate()}

    for ident, top_frame in sys._current_frames().items():
        debug("# Thread: %s(%d)" % (names_by_ident.get(ident, ""), ident))
        for path, line_no, func_name, source in traceback.extract_stack(top_frame):
            debug('File: "%s", line %d, in %s' % (path, line_no, func_name))
            if source:
                debug(" %s" % (source.strip()))
def dump_tracing_state(self, context):
    """ A debug tool to dump all threads tracing state

    Prints the debugger's tracing flags, then, for every thread listed by
    ``threading.enumerate()``, each frame of its stack with the frame's
    ``f_trace`` value.  Frames equal to ``self.frame_beginning`` or to this
    function's own frame are flagged ``beginning`` / ``current``.

    :param context: free text shown in the header line.
    """
    # Every print() call passes one pre-formatted string, which gives the same
    # output under the Python 2 print statement and the Python 3 function.
    print("Dumping all threads Tracing state: (%s)" % context)
    print(" self.tracing_enabled=%s" % self.tracing_enabled)
    print(" self.execution_started=%s" % self.execution_started)
    print(" self.frame_beginning=%s" % self.frame_beginning)
    print(" self.debugger_thread_ident=%s" % self.debugger_thread_ident)
    for thr in threading.enumerate():
        is_current_thread = thr.ident == threading.current_thread().ident
        print(" Thread: %s, %s %s" % (thr.name, thr.ident, "<= Current*" if is_current_thread else ''))
        # .get(): a thread may have no frame in the snapshot (for example if
        # it finished meanwhile); skip its frames instead of raising KeyError.
        a_frame = sys._current_frames().get(thr.ident)
        while a_frame:
            flags = []
            if a_frame == self.frame_beginning:
                flags.append("beginning")
            if a_frame == inspect.currentframe():
                flags.append("current")
            if flags:
                flags_str = "**"+",".join(flags)
            else:
                flags_str = ""
            print(" => %s, %s:%s(%s) | %s %s" % (a_frame, a_frame.f_code.co_filename, a_frame.f_lineno, a_frame.f_code.co_name, a_frame.f_trace, flags_str))
            a_frame = a_frame.f_back
def stackdump(sig, frm):
    """Signal handler that prints the stack of every thread to stdout.

    :param sig: signal number (not used).
    :param frm: interrupted frame (not used).
    """
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# ThreadID: %s" % threadId)
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                code.append(" %s" % (line.strip()))
    # Call form with a single string: identical output under the Python 2
    # print statement and the Python 3 print function.
    print("\n".join(code))
def threads_handler():
    """Return ``(headers, body)`` for a plain-text dump of all thread stacks.

    The body starts with the number of threads found, followed by one
    ``Thread 0x<id>:`` section with the formatted stack per thread.
    """
    frames = sys._current_frames()

    sections = ['%d threads found\n\n' % len(frames)]
    for ident, frame in frames.items():
        formatted = ''.join(traceback.format_stack(frame))
        sections.append('Thread 0x%x:\n%s\n' % (ident, formatted))

    headers = {'Content-Type': 'text/plain'}
    return headers, ''.join(sections)