我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 traceback.extract_stack()。
def error(*k, **kw):
    """
    Forward the arguments to ``log.error`` and, when ``verbose`` is greater
    than 2, log a second message listing the call stack that led here.

    The frame of this function itself is left out of the listing.
    """
    log.error(*k, **kw)
    if not verbose > 2:
        return

    # Drop the last entry: it is this wrapper, not the origin of the message.
    frames = traceback.extract_stack()[:-1]
    lines = []
    for path, lineno, func_name, text in frames:
        lines.append(' File "%s", line %d, in %s' % (path, lineno, func_name))
        if text:
            lines.append(' %s' % text.strip())
    if lines:
        log.error("\n".join(lines))
def worker_int(worker):
    """Log the current stack of every thread through ``worker.log``.

    An info record announces the signal; a single debug record then carries
    one section per thread (name and id) followed by its stack entries.

    :param worker: object exposing ``log.info`` and ``log.debug``
    """
    import sys
    import threading
    import traceback

    worker.log.info("worker received INT or QUIT signal")

    # Map thread ids to names so each section can be labelled.
    thread_names = {th.ident: th.name for th in threading.enumerate()}

    report = []
    for ident, top_frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (thread_names.get(ident, ""), ident))
        for path, lineno, func_name, text in traceback.extract_stack(top_frame):
            report.append('File: "%s", line %d, in %s' % (path, lineno, func_name))
            if text:
                report.append(" %s" % (text.strip()))
    worker.log.debug("\n".join(report))
def __init__(self, msg='', ex=None):
    """
    Store the message and the stack entries describing where the error
    comes from, and pre-format them into ``verbose_msg``.

    :param msg: error message
    :type msg: string
    :param ex: exception causing this error (optional)
    :type ex: exception
    """
    self.msg = msg
    assert not isinstance(msg, Exception)

    self.stack = []
    if ex:
        if not msg:
            self.msg = str(ex)
        if isinstance(ex, WafError):
            # Copy the list: the `+=` below extends in place, and sharing the
            # object would silently grow the wrapped error's own stack.
            self.stack = list(ex.stack)
        else:
            # Traceback of the exception currently being handled.
            self.stack = traceback.extract_tb(sys.exc_info()[2])
    # Append the current call stack, minus this constructor's own frame.
    self.stack += traceback.extract_stack()[:-1]
    self.verbose_msg = ''.join(traceback.format_list(self.stack))
def filter(self, record):
    """Throttle records that repeat from the same source line.

    Records are keyed by the line number of the last stack entry seen before
    a frame named 'warning'. Within ``conf.warning_threshold`` seconds of the
    stored timestamp, the first repeat passes, the second passes with
    "more " prefixed to its message, and later ones are dropped.

    :param record: log record; ``record.msg`` may be rewritten
    :return: 1 to keep the record, 0 to drop it
    """
    from config import conf

    threshold = conf.warning_threshold
    if not threshold > 0:
        # Throttling disabled: let everything through.
        return 1

    origin = None
    for _path, lineno, func_name, _text in traceback.extract_stack():
        if func_name == 'warning':
            break
        origin = lineno

    last_time, repeats = self.warning_table.get(origin, (0, 0))
    now = time.time()
    if now - last_time > threshold:
        # Window expired: start counting again from now.
        last_time, repeats = now, 0
    elif repeats >= 2:
        return 0
    else:
        repeats += 1
        if repeats == 2:
            record.msg = "more " + record.msg
    self.warning_table[origin] = (last_time, repeats)
    return 1
def __init__(self, *, loop=None):
    """Initialize the future.

    The optional ``loop`` argument explicitly sets the event loop object
    used by the future. When it is not provided, the loop returned by
    ``events.get_event_loop()`` is used.
    """
    self._loop = events.get_event_loop() if loop is None else loop
    self._callbacks = []
    if self._loop.get_debug():
        # In debug mode remember the stack of the caller creating the future.
        self._source_traceback = traceback.extract_stack(sys._getframe(1))
def first_spark_call():
    """
    Return a CallSite representing the first Spark call in the current call
    stack, or None when the stack is empty.

    A stack entry counts as a Spark frame when its file path starts with the
    directory of the file containing this function.
    """
    frames = traceback.extract_stack()
    if not frames:
        return None

    # The innermost entry is this function, so its directory is the prefix.
    spark_dir = os.path.dirname(frames[-1][0])

    first_idx = len(frames) - 1
    for idx, (path, _lineno, _func, _text) in enumerate(frames):
        if path.startswith(spark_dir):
            first_idx = idx
            break

    if first_idx == 0:
        # No caller outside the Spark directory: describe the frame itself.
        path, lineno, func_name, _text = frames[0]
        return CallSite(function=func_name, file=path, linenum=lineno)

    # Function name from the Spark frame, location from the frame calling it.
    _spath, _sline, spark_func, _stext = frames[first_idx]
    user_path, user_line, _ufunc, _utext = frames[first_idx - 1]
    return CallSite(function=spark_func, file=user_path, linenum=user_line)
def log_callstack_last(back_trace=False):
    """Return "file:line" for the last stack entry that is not filtered out.

    Entries whose function name is in BACKTRACE_FILTER_FUNC or
    BACKTRACE_FILTER_HIDE_CODE are ignored. Every entry's path and function
    name is printed while scanning.

    :param back_trace: when true, scan the traceback of the exception being
        handled instead of the current call stack (minus this frame)
    :return: "basename:lineno" of the last accepted entry, or "empty"
    """
    if back_trace:
        frames = traceback.extract_tb(sys.exc_info()[2])
    else:
        frames = traceback.extract_stack()[:-1]

    print("Parsing stack")
    location = "empty"
    for path, lineno, func_name, _text in frames:
        print(path, func_name)
        if func_name in BACKTRACE_FILTER_FUNC or func_name in BACKTRACE_FILTER_HIDE_CODE:
            continue
        # Later entries overwrite earlier ones, so the innermost one wins.
        location = "%s:%s" % (os.path.basename(path), lineno)
    return location
def filter(self, record):
    """Rate-limit log records repeated from the same source line.

    The key is the line number of the last stack entry preceding a frame
    named 'warning'. While the stored timestamp is no older than
    ``conf.warning_threshold`` seconds, at most two repeats are let through
    (the second one with "more " prepended to ``record.msg``).

    :param record: log record being filtered
    :return: 1 when the record should be emitted, 0 when it is suppressed
    """
    from .config import conf

    window = conf.warning_threshold
    if not window > 0:
        return 1

    key = None
    for entry in traceback.extract_stack():
        if entry[2] == 'warning':
            break
        key = entry[1]

    stamp, count = self.warning_table.get(key, (0, 0))
    current = time.time()
    if current - stamp > window:
        # First occurrence in a new window.
        stamp = current
        count = 0
    elif count >= 2:
        return 0
    else:
        count += 1
        if count == 2:
            record.msg = "more " + record.msg
    self.warning_table[key] = (stamp, count)
    return 1
def known(s):
    """Rule: look the sample up in the database.

    :param s: sample passed to ``db.known`` and ``db.sample_info_fetch``
    :return: a RuleResult carrying the stored result and reason when the
        database knows the sample (no further analysis), otherwise an
        unknown result that requests further analysis
    """
    # Innermost stack entry is this function; "name:line" tags the rule.
    _path, lineno, func_name, _text = traceback.extract_stack()[-1]
    position = "%s:%s" % (func_name, lineno)

    db = get_config().get_db_con()
    if not db.known(s):
        return RuleResult(position,
                          result=Result.unknown,
                          reason="Datei ist dem System noch nicht bekannt",
                          further_analysis=True)

    sample_info = db.sample_info_fetch(s)
    return RuleResult(position,
                      result=sample_info.get_result(),
                      reason=sample_info.reason,
                      further_analysis=False)
def file_larger_than(s, args):
    """Rule: compare the sample's file size with a byte limit.

    :param s: sample exposing ``file_size``
    :param args: mapping whose 'byte' entry is the size limit
    :return: a RuleResult that requests further analysis when the file is
        larger than the limit, otherwise an ignored result
    """
    # Innermost stack entry is this function; "name:line" tags the rule.
    _path, lineno, func_name, _text = traceback.extract_stack()[-1]
    position = "%s:%s" % (func_name, lineno)

    limit = args['byte']
    if s.file_size > limit:
        return RuleResult(position,
                          result=Result.unknown,
                          reason="Datei hat mehr als %d bytes" % limit,
                          further_analysis=True)
    return RuleResult(position,
                      result=Result.ignored,
                      reason="Datei ist nur %d bytes lang" % s.file_size,
                      further_analysis=False)
def cuckoo_score(s, args):
    """Rule: compare the sample's Cuckoo report score with a threshold.

    :param s: sample exposing ``cuckoo_report.score``
    :param args: mapping whose 'higher' entry is the threshold
    :return: a bad RuleResult when the score reaches the threshold,
        otherwise an unknown result that requests further analysis
    """
    # Innermost stack entry is this function; "name:line" tags the rule.
    _path, lineno, func_name, _text = traceback.extract_stack()[-1]
    position = "%s:%s" % (func_name, lineno)

    threshold = args['higher']
    if s.cuckoo_report.score >= threshold:
        return RuleResult(position,
                          result=Result.bad,
                          reason="Cuckoo score >= %s: %s" % (threshold, s.cuckoo_report.score),
                          further_analysis=False)
    return RuleResult(position,
                      result=Result.unknown,
                      reason="Cuckoo score < %s: %s" % (threshold, s.cuckoo_report.score),
                      further_analysis=True)
def doTraceback(self, module):
    """Check that traceback helpers report the source line of the raise.

    Calls ``module.do_raise()`` and verifies that ``extract_tb``,
    ``extract_stack`` and ``print_tb`` all show ``raise_src`` for the frame
    below this one.

    :param module: object exposing ``do_raise()``, which must raise
    """
    try:
        module.do_raise()
    except:
        # Skip this method's own traceback entry.
        tb = sys.exc_info()[2].tb_next

        _file, _lineno, _name, source_line = extract_tb(tb, 1)[0]
        self.assertEqual(source_line, raise_src.strip())

        _file, _lineno, _name, source_line = extract_stack(tb.tb_frame, 1)[0]
        self.assertEqual(source_line, raise_src.strip())

        buf = io.StringIO()
        print_tb(tb, 1, buf)
        self.assertTrue(buf.getvalue().endswith(raise_src))
    else:
        raise AssertionError("This ought to be impossible")
def _raise_typecheck_error(msg, is_return=False, value=None, received_type=None,
        expected_type=None, func=None):
    """Report a type-check failure as an exception or as a warning.

    With ``pytypes.warning_mode`` off, raises ``pytypes.ReturnTypeError`` or
    ``pytypes.InputTypeError``. With it on, issues the matching warning
    category via ``warn_explicit``, attributed to the stack entry selected
    by ``util._calc_traceback_list_offset``.

    :param msg: message for the error or warning
    :param is_return: selects the return-type variants instead of the
        input-type ones
    :param value, received_type, expected_type, func: accepted but not used
        by this function
    """
    if not pytypes.warning_mode:
        if is_return:
            raise pytypes.ReturnTypeError(msg)
        raise pytypes.InputTypeError(msg)

    import traceback
    stack = traceback.extract_stack()
    idx = util._calc_traceback_list_offset(stack)
    if is_return:
        category = pytypes.ReturnTypeWarning
    else:
        category = pytypes.InputTypeWarning
    # Entries are (filename, lineno, ...): attribute the warning to that spot.
    warn_explicit(msg, category, stack[idx][0], stack[idx][1])
def _warn_argname(msg, func, slf, clsm, cls=None, warn_tp=pytypes.exceptions.TypeWarning):
    """Issue an argument-name warning for ``func`` if such warnings are on.

    Does nothing unless ``pytypes.warn_argnames`` is set. The warning text
    is ``msg`` followed by the module, optional class name and function
    name; it is attributed to the stack entry selected by
    ``_calc_traceback_list_offset``.

    :param msg: leading text of the warning
    :param func: function the warning is about
    :param slf: truthy when ``func`` is treated as an instance method
    :param clsm: truthy when ``func`` is treated as a class method
    :param cls: class to name in the message; looked up from ``func`` when
        omitted and ``slf`` or ``clsm`` is set
    :param warn_tp: warning category passed to ``warn_explicit``
    """
    if not pytypes.warn_argnames:
        return

    if cls is not None:
        cls_name = cls.__name__
    elif slf or clsm:
        try:
            cls_name = get_class_that_defined_method(func).__name__
        except:
            cls_name = '<unknown class>'
    else:
        cls_name = None

    stack = traceback.extract_stack()
    idx = _calc_traceback_list_offset(stack)
    if cls_name is None:
        text = '%s: %s.%s'%(msg, func.__module__, func.__name__)
    else:
        text = '%s: %s.%s.%s'%(msg, func.__module__, cls_name, func.__name__)
    warn_explicit(text, warn_tp, stack[idx][0], stack[idx][1])
def sigquit_handler(sig, frame):
    r"""Print the current stack of every thread to stdout.

    Helps debug deadlocks by printing stacktraces when this gets a SIGQUIT,
    e.g. ``kill -s QUIT <PID>`` or CTRL+\.

    The docstring is a raw string so the backslash above is not parsed as
    an (invalid) escape sequence.

    :param sig: signal number (not used)
    :param frame: interrupted frame (not used)
    """
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))

    # Map thread ids to names so each section can be labelled.
    id_to_name = {th.ident: th.name for th in threading.enumerate()}

    code = []
    for thread_id, stack in sys._current_frames().items():
        code.append("\n# Thread: {}({})"
                    .format(id_to_name.get(thread_id, ""), thread_id))
        for filename, line_number, name, line in traceback.extract_stack(stack):
            code.append('File: "{}", line {}, in {}'
                        .format(filename, line_number, name))
            if line:
                code.append(" {}".format(line.strip()))
    print("\n".join(code))
def extract_address():
    """Return an address of the form
    '<instruction pointer>/<qualified function name>'.

    The instruction pointer is ``f_lasti`` of the frame two levels above
    this function. The qualified name joins, outermost first, the function
    names found walking up from that frame, stopping before any name that
    starts with '<' and after ``_current_function_name``. When
    ``_extract_target_of_assignment()`` returns a name, it becomes the last
    component.
    """
    frame = sys._getframe(2)
    ip = frame.f_lasti

    target = _extract_target_of_assignment()
    names = [] if target is None else [target]

    # Collected innermost-first; reversed when formatting.
    while frame is not None:
        name = frame.f_code.co_name
        if name.startswith('<'):
            break
        names.append(name)
        if name == _current_function_name:
            break
        frame = frame.f_back
    return "{}/{}".format(ip, '.'.join(names[::-1]))
def timeRemaining(seconds=0):
    """Tell a ``while`` loop whether its time budget is still open.

    Timers are stored in ``_timers`` keyed by the (filename, line number)
    of the calling line. The first call from a line, or a call with a
    different ``seconds`` value, (re)starts the timer.

    :param seconds: duration of the budget; 0 means no limit
    :return: True while time remains (always True for ``seconds == 0``),
        False once more than ``seconds`` have elapsed, at which point the
        timer is removed
    """
    if seconds == 0:
        return True

    now = time.time()
    # Second-to-last stack entry is the line that called this function.
    filename, line_no, _func, _text = traceback.extract_stack()[-2]
    if filename.startswith("<pyshell"):
        filename = "pyshell"
    key = (filename, line_no)

    if key not in _timers or _timers[key][1] != seconds:
        _timers[key] = (now, seconds)
        return True

    start, duration = _timers[key]
    if now - start > duration:
        del _timers[key]
        return False
    return True
def doTraceback(self, module):
    """Verify that traceback utilities show the raising source line.

    ``module.do_raise()`` is expected to raise; the entry below this method
    is then inspected with ``extract_tb``, ``extract_stack`` and
    ``print_tb`` and compared against ``raise_src``.

    :param module: object exposing ``do_raise()``
    """
    try:
        module.do_raise()
    except:
        # tb_next steps past this method's own traceback entry.
        tb = sys.exc_info()[2].tb_next
        expected_line = raise_src.strip()

        # Index 3 of an entry is the source text.
        self.assertEqual(extract_tb(tb, 1)[0][3], expected_line)
        self.assertEqual(extract_stack(tb.tb_frame, 1)[0][3], expected_line)

        out = StringIO.StringIO()
        print_tb(tb, 1, out)
        self.assertTrue(out.getvalue().endswith(raise_src))
    else:
        raise AssertionError("This ought to be impossible")
def _raw_log(self, logfn, message, exc_info):
    """Emit ``message`` through ``logfn``, optionally prefixed with the
    location and function name found three entries from the end of the stack.

    :param logfn: callable invoked as ``logfn(text, exc_info=exc_info)``
    :param message: text to log
    :param exc_info: forwarded unchanged to ``logfn``
    """
    # NOTE(review): nesting reconstructed from a flattened source; the
    # function name is assumed to be computed regardless of show_loc.
    location = ''
    func_label = ''
    stack = traceback.extract_stack()
    if len(stack) > 2:
        origin = stack[-3]
        if self.show_loc:
            location = '(%s:%d):' % (os.path.basename(origin[0]), origin[1])
        func_label = origin[2]
        if func_label != '<module>':
            cls_name = self.__class__.__name__
            # Qualify with the class name only for classes not named like Logger.
            if cls_name != Logger.__name__:
                func_label = cls_name + '.' + func_label
            func_label += '()'

    if not self.show_fcn:
        logfn(message, exc_info=exc_info)
        return
    logfn(location + func_label + ': ' + message, exc_info=exc_info)
def _get_current_traceback(self, thread ): '''???????????? :param thread: ???????? :type thread: Thread ''' for thread_id, stack in sys._current_frames().items(): if thread_id != thread.ident: continue tb = "Traceback ( thread-%d possibly hold at ):\n" % thread_id for filename, lineno, name, line in traceback.extract_stack(stack): tb += ' File: "%s", line %d, in %s\n' % (filename, lineno, name) if line: tb += " %s\n" % (line.strip()) return tb else: raise RuntimeError("thread not found")
def get_thread_traceback(thread):
    '''Return a text rendering of the current stack of ``thread``.

    :param thread: thread whose ``ident`` is looked up among the running frames
    :type thread: Thread
    :raises RuntimeError: when no running frame belongs to that thread
    '''
    for ident, top_frame in sys._current_frames().items():
        if ident == thread.ident:
            parts = ["Traceback ( thread-%d possibly hold at ):\n" % ident]
            for path, lineno, func_name, text in traceback.extract_stack(top_frame):
                parts.append(' File: "%s", line %d, in %s\n' % (path, lineno, func_name))
                if text:
                    parts.append(" %s\n" % (text.strip()))
            return "".join(parts)
    raise RuntimeError("thread not found")
def caller_trace(back=0):
    """
    Trace caller stack and save info into global dicts, which
    are printed automatically at the end of SCons execution.

    ``caller_bases`` counts hits per traced function; ``caller_dicts`` maps
    each call chain to the counts of the longer chains that reached it.

    :param back: number of extra caller levels to record
    """
    import traceback

    frames = traceback.extract_stack(limit=3 + back)
    frames.reverse()
    # After reversing, index 0 is this function and index 1 is the traced one.
    callee = frames[1][:3]
    caller_bases[callee] = caller_bases.get(callee, 0) + 1
    for frame in frames[2:]:
        chain = callee + frame[:3]
        counts = caller_dicts.setdefault(callee, {})
        counts[chain] = counts.get(chain, 0) + 1
        callee = chain

# print a single caller and its callers, if any
def get_interactive_console(thread_id, frame_id, frame, console_message):
    """Return the console cached for this thread/frame pair, creating a new
    one when the cached ids do not match.

    When a console is created, a "[Current context]" line describing
    ``frame`` is added to ``console_message``.

    :param thread_id: id compared against the cached thread id
    :param frame_id: id compared against the cached frame id
    :param frame: frame whose location is reported for a new console
    :param console_message: receiver exposing ``add_console_message``
    :rtype: DebugConsole
    """
    cache = InteractiveConsoleCache
    if cache.thread_id == thread_id and cache.frame_id == frame_id:
        return cache.interactive_console_instance

    cache.interactive_console_instance = DebugConsole()
    cache.thread_id = thread_id
    cache.frame_id = frame_id

    entries = traceback.extract_stack(frame, limit=1)
    if entries:
        top = entries[0]  # the only entry requested
        context_message = 'File "%s", line %s, in %s' % (top[0], top[1], top[2])
        console_message.add_console_message(CONSOLE_OUTPUT, "[Current context]: %s" % (context_message,))
    return cache.interactive_console_instance
def _worker(self, dictobj): """""" func = dictobj['func'] args = dictobj['args'] argv = dictobj['argv'] try: result = func(*args, **argv) except Exception as e: #print 'ecp occured' result = tuple([e, traceback.extract_stack()]) self.lock.acquire() self._executed_task_count = self._executed_task_count + 1 self._add_result_to_queue(result=result) self.lock.release()
def mute_errors(self, fn, *args, **kwargs):
    """Run ``fn`` and collect the errors it logs instead of recording them.

    ``BaseValidator.log_error`` is temporarily replaced by a collector and
    restored afterwards, even when ``fn`` raises. BaseValidator.log_error
    should not be overridden in children.

    :param fn: callable to run
    :param args: positional arguments for ``fn``
    :param kwargs: keyword arguments for ``fn``
    :return: tuple of (return value of ``fn``, set of caught errors)
    """
    muted = set()

    def patched_log_error(self, field, msg):
        # Same error object log_error would build, stored locally.
        stack = None
        if self.debug:
            stack = traceback.extract_stack()[:-1]
        muted.add(ValidatorLogError(msg, self._path + (field,), stack))

    original = BaseValidator.log_error
    try:
        BaseValidator.log_error = patched_log_error
        result = fn(*args, **kwargs)
    finally:
        BaseValidator.log_error = original
    return result, muted
def log_error(self, field, msg):
    """Add an error to the validator.

    This method should not be overridden in subclasses, as doing so is
    likely to break the error and warning coercion decorators.

    :param field: The field the error was raised on.
    :param msg: The message to associate with the error.
    :raises FailFastException: when ``self.fail_fast`` is set
    """
    # NOTE(review): nesting reconstructed from a flattened source; the
    # fail_fast check is assumed to run whether or not errors are collected.
    if self.collect_errors:
        stack = None
        if self.debug:
            # Call stack without this method's own frame.
            stack = traceback.extract_stack()[:-1]
        error = ValidatorLogError(msg, self._path + (field,), stack)
        if self.verbose:
            self._IIIFValidator.logger.error(str(error))
        self._errors.add(error)

    if self.fail_fast:
        raise FailFastException
def refresh():
    """Load override files and copy their public names into this module.

    For every entry of the current call stack, an OVERRIDE_FILE next to that
    entry's source file is a candidate; the bare OVERRIDE_FILE path is always
    tried last. Each candidate that exists is executed as a module and every
    attribute not starting with "__" is written into ``globals()``, so later
    files override earlier ones.
    """
    candidates = []
    for entry in traceback.extract_stack():
        candidate = os.path.join(os.path.dirname(entry[0]), OVERRIDE_FILE)
        if candidate not in candidates:
            # Prepend, so files next to inner frames are loaded first.
            candidates.insert(0, candidate)
    if OVERRIDE_FILE in candidates:
        candidates.remove(OVERRIDE_FILE)
    candidates.append(OVERRIDE_FILE)

    def import_path(path):
        # Execute the file at `path` as a module named like this one.
        if sys.version_info < (3, 5, 0):
            from importlib.machinery import SourceFileLoader
            return SourceFileLoader(__name__, path).load_module()
        import importlib.util
        spec = importlib.util.spec_from_file_location(__name__, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    for candidate in candidates:
        if os.path.isfile(candidate):
            loaded = import_path(candidate)
            overrides = {attr: getattr(loaded, attr)
                         for attr in dir(loaded) if not attr.startswith("__")}
            globals().update(overrides)
def blockingCallFromThread(reactor, f, *args, **kwargs):
    """
    Improved version of twisted's blockingCallFromThread that shows the
    complete stacktrace when an exception is raised on the reactor's thread.

    If being called from the reactor thread already, just return the result
    of execution of the callable.

    :param reactor: reactor exposing ``callFromThread``
    :param f: callable to run
    :param args: positional arguments for ``f``
    :param kwargs: keyword arguments for ``f``
    :return: the result of ``f``; a Failure result is logged and re-raised
    """
    if isInIOThread():
        return f(*args, **kwargs)

    queue = Queue.Queue()

    def _callFromThread():
        # Success or failure, the outcome is pushed onto the queue.
        deferred = defer.maybeDeferred(f, *args, **kwargs)
        deferred.addBoth(queue.put)

    reactor.callFromThread(_callFromThread)
    result = queue.get()
    if not isinstance(result, failure.Failure):
        return result

    other_thread_tb = traceback.extract_tb(result.getTracebackObject())
    this_thread_tb = traceback.extract_stack()
    logger.error("Exception raised on the reactor's thread %s: \"%s\".\n Traceback from this thread:\n%s\n"
                 " Traceback from the reactor's thread:\n %s",
                 result.type.__name__,
                 result.getErrorMessage(),
                 ''.join(traceback.format_list(this_thread_tb)),
                 ''.join(traceback.format_list(other_thread_tb)))
    result.raiseException()
    return result
def getCallerName():
    """Return '<file stem>_<function name>' for the frame two levels above
    this function (the caller of the caller).

    The file stem is the base name of that frame's source file up to its
    first '.'.
    """
    import traceback

    # Entry -1 is this function, -2 its caller, -3 the caller's caller.
    func_name = traceback.extract_stack()[-3][2]

    source_path = sys._getframe(2).f_code.co_filename
    stem = os.path.basename(source_path).split('.')[0]
    return stem + "_" + func_name
def assemble(*args, **kwargs):
    """Return whether all positional arguments are truthy.

    When ``is_allow`` is set, ``_getFunctionArgs`` is also called with the
    caller's file and line and this function's name; its result is not used.

    :param args: values to test
    :param kwargs: accepted but not used
    :return: result of ``np.array(args).all()``
    """
    stack = traceback.extract_stack()
    if len(stack) >= 2 and is_allow:
        # Entry -2 is the calling line; entry -1 is this function.
        caller_file, caller_line, _func, _text = stack[-2]
        own_name = stack[-1][-2]
        _getFunctionArgs(caller_file, caller_line, own_name)
    return np.array(args).all()