我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用traceback.extract_tb()。
def recover_stack(self, exc):
    """Turn an exception triple into a list of ``Frame`` objects.

    ``exc`` is a ``(type, value, traceback)`` tuple.  At most 25 traceback
    entries are examined.  If any entry's file is an agent frame (according
    to ``self.agent.frame_cache``) the whole stack is discarded and ``None``
    is returned; entries whose file is a system frame are skipped.
    """
    _exc_type, _exc_value, tb = exc
    collected = []
    for entry in traceback.extract_tb(tb, 25):
        filename, lineno, func_name = entry[0], entry[1], entry[2]

        if self.agent.frame_cache.is_agent_frame(filename):
            return None

        if self.agent.frame_cache.is_system_frame(filename):
            continue
        collected.append(Frame(func_name, filename, lineno))
    return collected
def test_element_count_validation(self):
    """
    Check that an oversized set collection is rejected with ValidationError.
    """
    # Keep retrying the 65535-element insert until it goes through; timeouts
    # are logged (or checked against the host address) and then retried.
    created = False
    while not created:
        try:
            TestSetModel.create(text_set={str(uuid4()) for _ in range(65535)})
            created = True
        except WriteTimeout:
            exc_type, exc_value, exc_tb = sys.exc_info()
            log.warning("{0}: {1} Backtrace: {2}".format(
                exc_type.__name__, exc_value, traceback.extract_tb(exc_tb)))
            del exc_tb
        except OperationTimedOut:
            # This will happen if the host is remote
            self.assertFalse(CASSANDRA_IP.startswith("127.0.0."))

    # One element more must be refused.
    too_many = {str(uuid4()) for _ in range(65536)}
    self.assertRaises(ValidationError, TestSetModel.create, text_set=too_many)
def _query(self, session, keyspace, count, consistency_level=ConsistencyLevel.ONE):
    """Issue the same SELECT ``count`` times and record each coordinator.

    Every statement is sent with ``session.execute_async`` and the returned
    future is handed to ``self.coordinator_stats.add_coordinator``.  A
    statement that fails with ``OperationTimedOut`` or ``ReadTimeout`` is
    logged and retried after a one second pause.

    :raises RuntimeError: when a single statement has failed more than 100
        times.
    """
    routing_key = struct.pack('>i', 0)
    for i in range(count):
        ss = SimpleStatement('SELECT * FROM cf WHERE k = 0',
                             consistency_level=consistency_level,
                             routing_key=routing_key)
        tries = 0
        while True:
            if tries > 100:
                raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(ss))
            try:
                self.coordinator_stats.add_coordinator(session.execute_async(ss))
                break
            except (OperationTimedOut, ReadTimeout):
                ex_type, ex, tb = sys.exc_info()
                # Logger.warn is a deprecated alias; use warning() instead.
                log.warning("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
                # Drop the traceback reference so the frame is not kept alive.
                del tb
                tries += 1
                time.sleep(1)
def _insert(self, session, keyspace, count=12, consistency_level=ConsistencyLevel.ONE):
    """Switch to ``keyspace`` and run the INSERT statement concurrently.

    The statement is passed to ``execute_concurrent_with_args`` together
    with a list of ``count`` ``None`` parameter entries.  Timeouts and write
    failures are logged and the whole batch is retried.

    :raises RuntimeError: after 100 failed attempts.
    """
    session.execute('USE %s' % keyspace)
    ss = SimpleStatement('INSERT INTO cf(k, i) VALUES (0, 0)', consistency_level=consistency_level)

    tries = 0
    while tries < 100:
        try:
            execute_concurrent_with_args(session, ss, [None] * count)
            return
        except (OperationTimedOut, WriteTimeout, WriteFailure):
            ex_type, ex, tb = sys.exc_info()
            # Logger.warn is a deprecated alias; use warning() instead.
            log.warning("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
            # Drop the traceback reference so the frame is not kept alive.
            del tb
            tries += 1

    raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(ss))
def remove_cluster():
    """Remove the module-level ``CCM_CLUSTER`` and reset it to ``None``.

    Does nothing when ``USE_CASS_EXTERNAL`` or ``KEEP_TEST_CLUSTER`` is set,
    or when there is no cluster.  An ``OSError`` from ``remove()`` is logged
    and the removal is retried after a one second pause.

    :raises RuntimeError: after 100 failed attempts.
    """
    if USE_CASS_EXTERNAL or KEEP_TEST_CLUSTER:
        return

    global CCM_CLUSTER
    if CCM_CLUSTER:
        log.debug("Removing cluster {0}".format(CCM_CLUSTER.name))
        tries = 0
        while tries < 100:
            try:
                CCM_CLUSTER.remove()
                CCM_CLUSTER = None
                return
            except OSError:
                ex_type, ex, tb = sys.exc_info()
                # Logger.warn is a deprecated alias; use warning() instead.
                log.warning("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
                # Drop the traceback reference so the frame is not kept alive.
                del tb
                tries += 1
                time.sleep(1)

        raise RuntimeError("Failed to remove cluster after 100 attempts")
def execute_until_pass(session, query):
    """Execute ``query`` on ``session``, retrying on timeouts and failures.

    :returns: the result of ``session.execute(query)``, or ``None`` when the
        server reports ``ConfigurationException``, ``AlreadyExists`` or
        ``InvalidRequest`` (treated as "already created/dropped").
    :raises RuntimeError: after 100 attempts that ended in a timeout or a
        read/write failure.
    """
    tries = 0
    while tries < 100:
        try:
            return session.execute(query)
        except (ConfigurationException, AlreadyExists, InvalidRequest):
            # Logger.warn is a deprecated alias; use warning() instead.
            log.warning("Received already exists from query {0} not exiting".format(query))
            # keyspace/table was already created/dropped
            return
        except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure):
            ex_type, ex, tb = sys.exc_info()
            log.warning("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
            # Drop the traceback reference so the frame is not kept alive.
            del tb
            tries += 1

    raise RuntimeError("Failed to execute query after 100 attempts: {0}".format(query))
def execute_with_long_wait_retry(session, query, timeout=30):
    """Execute ``query`` with a per-request ``timeout``, retrying on failure.

    :returns: the result of ``session.execute(query, timeout=timeout)``, or
        ``None`` when the server reports ``ConfigurationException`` or
        ``AlreadyExists`` (treated as "already created/dropped").
    :raises RuntimeError: after 10 attempts that ended in a timeout or a
        read/write failure.
    """
    tries = 0
    while tries < 10:
        try:
            return session.execute(query, timeout=timeout)
        except (ConfigurationException, AlreadyExists):
            # Logger.warn is a deprecated alias; use warning() instead.
            log.warning("Received already exists from query {0} not exiting".format(query))
            # keyspace/table was already created/dropped
            return
        except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure):
            ex_type, ex, tb = sys.exc_info()
            log.warning("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
            # Drop the traceback reference so the frame is not kept alive.
            del tb
            tries += 1

    # The loop above gives up after 10 tries, so report that number.
    raise RuntimeError("Failed to execute query after 10 attempts: {0}".format(query))
def __init__(self, msg='', ex=None):
    """
    Store the message and the stack that led to this error.

    :param msg: error message
    :type msg: string
    :param ex: exception causing this error (optional)
    :type ex: exception
    """
    self.msg = msg
    assert not isinstance(msg, Exception)

    self.stack = []
    if ex:
        if not msg:
            # No explicit message: fall back to the text of the cause.
            self.msg = str(ex)
        # A WafError cause already carries a stack; anything else is read
        # from the exception currently being handled.
        self.stack = ex.stack if isinstance(ex, WafError) else traceback.extract_tb(sys.exc_info()[2])

    # Append the current call stack, leaving out this constructor's frame.
    self.stack.extend(traceback.extract_stack()[:-1])
    self.verbose_msg = ''.join(traceback.format_list(self.stack))
def test_future_traceback(self):
    # Checks that the traceback entry recorded where the ZeroDivisionError
    # is raised inside ``f`` is still present in the traceback observed by
    # the code that yields ``f()``.
    @return_future
    @gen.engine
    def f(callback):
        yield gen.Task(self.io_loop.add_callback)
        try:
            1 / 0
        except ZeroDivisionError:
            # Remember the first traceback entry at the point of failure,
            # then let the exception continue to propagate.
            self.expected_frame = traceback.extract_tb(
                sys.exc_info()[2], limit=1)[0]
            raise
    try:
        yield f()
        self.fail("didn't get expected exception")
    except ZeroDivisionError:
        tb = traceback.extract_tb(sys.exc_info()[2])
        self.assertIn(self.expected_frame, tb)

# The following series of classes demonstrate and test various styles
# of use, with and without generators and futures.
def format_exception(bfn, ex):
    """Write a report about ``ex`` to stderr and exit with status 1.

    ``bfn`` is the path of the file that was being processed; its absolute
    path is included in the report header.  Must run while the exception is
    being handled, because the traceback is read from ``sys.exc_info()``.
    """
    import traceback, linecache
    exinfo = traceback.format_exception_only(ex.__class__, ex)
    if ex.__class__ == SyntaxError:
        # Drop the first formatted line; a one-line header is written instead.
        exinfo = exinfo[1:]
        lineno = ex.lineno
        # NOTE(review): content is always empty, so the last field of the
        # header below is blank - confirm whether a source line was intended.
        content = ''
        sys.stderr.write('Error while processing %s:%s\n\t%s\n' % (os.path.abspath(bfn), lineno, content.strip()))
    else:
        exec_line = None
        exloc = traceback.extract_tb(sys.exc_info()[2])
        # Find the last traceback entry that has no source text available.
        for idx, entry in enumerate(exloc):
            if entry[3] is None:
                exec_line = idx
        if exec_line is not None:
            # Prepend an entry that points at bfn (source text read through
            # linecache) and keep only the entries from exec_line onwards.
            exloc = [(bfn, exloc[exec_line][1], '', linecache.getline(bfn, exloc[exec_line][1]))] + exloc[exec_line:]
        sys.stderr.write('Error while processing %s\n' % os.path.abspath(bfn))
        sys.stderr.write(str.join('', traceback.format_list(exloc)))
    sys.stderr.write(str.join('', exinfo))
    sys.exit(1)
def check_errors(fn):
    """Wrap ``fn`` so that the file responsible for a failure is recorded.

    When ``fn`` raises, the exception info is stored in the module-level
    ``_exception``, the offending file name (the exception's ``filename``
    attribute, or else the file of the last traceback entry) is added to
    ``_error_files`` if missing, and the exception is re-raised.  The
    wrapper does not return the result of ``fn``.
    """
    def wrapper(*args, **kwargs):
        global _exception
        try:
            fn(*args, **kwargs)
        except Exception:
            _exception = sys.exc_info()
            exc_value, exc_tb = _exception[1], _exception[2]

            filename = getattr(exc_value, 'filename', None)
            if filename is None:
                # get the filename from the last item in the stack
                filename = traceback.extract_tb(exc_tb)[-1][0]

            if filename not in _error_files:
                _error_files.append(filename)

            raise

    return wrapper
def __init__(self, *msg):
    """Build the error text from ``msg`` and terminate via ``sys.exit``.

    All positional arguments are converted with ``str`` and concatenated.
    The reported line number and file come from the traceback of the
    exception currently being handled; when there is none (the lookup
    raises ``AttributeError``) the caller's frame is used instead.  Only the
    last path component of the file name is kept.  The text is stored as the
    single element of ``self.args`` and ``sys.exit(self)`` is called.
    """
    message = "".join(str(arg) for arg in msg)
    try:
        ln = sys.exc_info()[-1].tb_lineno
        source_file = traceback.extract_tb(sys.exc_info()[-1])
        if isinstance(source_file, list):
            source_file = source_file[0].filename
    except AttributeError:
        caller = inspect.currentframe().f_back
        ln = caller.f_lineno
        source_file = inspect.getframeinfo(caller).filename
    if source_file is not None:
        # Raw string: split on runs of "/" or "\" without relying on the
        # invalid "\/" escape sequence.
        source_file = re.split(r"[\\/]+", source_file)[-1]
    self.args = ("Error ({3}:{1}): {2}".format(type(self), ln, message, source_file),)
    sys.exit(self)

# def __init__(self, *args, **kwargs):
#     RuntimeError.__init__(self, args, kwargs)
def print_exception(exc_type, exc_value, exc_tb):
    """Print an exception with the debugger's own frames trimmed away.

    Traceback entries whose file matches one of ``DONT_DEBUG`` are removed
    from both ends of the traceback.  The header line goes through
    ``print``, the remaining entries are written to ``sys.stderr`` and the
    exception itself is written to ``sys.stdout``.
    """
    entries = traceback.extract_tb(exc_tb)

    def _from_debugger(entry):
        frame_file = path.normcase(entry[0])
        return any(is_same_py_file(frame_file, f) for f in DONT_DEBUG)

    # remove debugger frames from the top and bottom of the traceback
    for end in (0, -1):
        while entries and _from_debugger(entries[end]):
            del entries[end]

    # print the traceback
    if entries:
        print('Traceback (most recent call last):')
        for chunk in traceback.format_list(entries):
            sys.stderr.write(chunk)

    # print the exception
    for chunk in traceback.format_exception_only(exc_type, exc_value):
        sys.stdout.write(chunk)
def abort_on_exception(func):  # noqa
    """
    This function decorator wraps the run() method of a thread
    so that any exceptions in that thread will be logged and
    cause the threads 'abort' signal to be emitted with the exception
    as an argument. This way all exception handling can occur
    on the main thread.

    Note that the entire sys.exc_info() tuple is passed out, this
    allows the current traceback to be used in the other thread.

    The wrapper returns the wrapped function's result, or ``None`` when an
    exception was caught.
    """
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # The first positional argument is the object that emits 'aborted'.
            thread_object = args[0]
            exc_info = sys.exc_info()
            filename, line_num, func_name, _text = traceback.extract_tb(exc_info[2])[-1]
            logger.error('Exception Thrown from [%s] on line [%s] via function [%s]'
                         % (filename, line_num, func_name))
            # Exceptions only have a ``message`` attribute on Python 2 (or
            # when a class defines one); fall back to the exception itself.
            logger.error('Exception type %s: %s'
                         % (e.__class__.__name__, getattr(e, 'message', e)))
            thread_object.emit('aborted', exc_info)
    return wrapper
def import_symbol(import_path, setting_name):
    """
    Resolve a dotted path such as ``package.module.Name`` to the object.

    Raises ``ImproperlyConfigured`` (mentioning ``setting_name``) when the
    module or the attribute does not exist.  An ``ImportError`` that comes
    from deeper inside the imported module is re-raised unchanged.
    """
    mod_name, class_name = import_path.rsplit('.', 1)

    try:
        return getattr(import_module(mod_name), class_name)
    except ImportError:
        frames = traceback.extract_tb(sys.exc_info()[2])
        raised_deeper = len(frames) > 1 and any('importlib' not in f[0] for f in frames[1:])
        if raised_deeper:
            raise  # import error is a level deeper.
        raise ImproperlyConfigured("{0} does not point to an existing class: {1}".format(setting_name, import_path))
    except AttributeError:
        raise ImproperlyConfigured("{0} does not point to an existing class: {1}".format(setting_name, import_path))
def user_exception(self, frame, info):
    """This function is called if an exception occurs,
    but only if we are to stop at or just below this level.

    Sends an 'exception' notification through ``self.pipe`` and then starts
    the interaction for ``frame``.  Nothing happens while the debugger is
    still waiting for the main file or for a breakpoint.
    """
    if self._wait_for_mainpyfile or self._wait_for_breakpoint:
        return
    extype, exvalue, trace = info
    # the traceback object cannot be pickled, so send text and tuples instead
    formatted = ''.join(traceback.format_exception(extype, exvalue, trace))
    # plain tuples keep the payload identical for FrameSummary-based Pythons
    entries = list(map(tuple, traceback.extract_tb(trace)))
    title = traceback.format_exception_only(extype, exvalue)[0]
    # send an Exception notification
    payload = (title, extype.__name__, repr(exvalue), entries, formatted)
    self.pipe.send({'method': 'exception', 'args': payload, 'id': None})
    self.interaction(frame)
def rigify_report_exception(operator, exception):
    """Report ``exception`` through ``operator.report`` as an INFO message.

    The name of the module in which the exception currently being handled
    was raised is included in the report, unless that name starts with a
    double underscore.
    """
    import traceback
    import sys
    import os

    # find the module name where the error happened
    # hint, this is the metarig type!
    active_tb = sys.exc_info()[2]
    source_path = traceback.extract_tb(active_tb)[-1][0]
    module_name = os.path.splitext(os.path.basename(source_path))[0]

    if module_name.startswith("__"):
        headline = "Incorrect armature..."
    else:
        headline = "Incorrect armature for type '%s'" % module_name

    # menus are drawn upside down, so the exception text goes first
    operator.report({'INFO'}, '\n'.join([exception.message, headline]))
def get_root_neighbors(device):
    """Connect to ``device`` and return its neighbors.

    The parser class is looked up in ``class_mapping`` by
    ``device.device_class`` and instantiated with the device name and
    ``config.credentials``.

    :returns: the value of ``get_neighbors()`` when the connection succeeds,
        otherwise ``None`` (connection refused or any exception, which is
        printed together with its traceback entries).
    """
    # Bound up front so the ``finally`` clause is safe even when the parser
    # lookup or construction fails before an object exists.
    device_obj = None
    try:
        parser = class_mapping.get(device.device_class)
        if not parser:
            raise RuntimeError('No parser found for {}'.format(device))
        device_obj = parser(device.device_name, config.credentials)
        if device_obj.connect():
            neighbors = device_obj.get_neighbors()
            device_obj.disconnect()
            return neighbors
    except Exception as e:
        print("****{}**** failed connecting to root. Error ****{}****".format(device.device_name, e))
        exc_type, exc_value, exc_traceback = sys.exc_info()
        print(repr(traceback.extract_tb(exc_traceback)))
    finally:
        if device_obj is not None and device_obj.is_connected:
            device_obj.disconnect()
def showtraceback(self):
    """Display the exception that just occurred.

    We remove the first stack item because it is our own code.

    The output is written by self.write(), below.  The exception is also
    stored in ``sys.last_type``, ``sys.last_value`` and
    ``sys.last_traceback``.
    """
    try:
        exc_type, value, tb = sys.exc_info()
        sys.last_type = exc_type
        sys.last_value = value
        sys.last_traceback = tb
        tblist = traceback.extract_tb(tb)
        del tblist[:1]
        lines = traceback.format_list(tblist)
        if lines:
            lines.insert(0, "Traceback (most recent call last):\n")
        lines.extend(traceback.format_exception_only(exc_type, value))
    finally:
        # Drop the traceback references held by this frame.
        tblist = tb = None

    # An explicit loop: map() is lazy on Python 3 and would write nothing.
    for line in lines:
        self.write(line)
def get_error_repr(exc_info):
    """Summarise an exception triple as text.

    ``exc_info`` is a ``(type, value, traceback)`` tuple.  Returns
    ``(exc_type, exc_obj, trace_string, file_location)`` where
    ``trace_string`` lists the traceback entries located under one of the
    paths returned by ``get_source_paths()``.
    """
    exc_type, exc_obj, exc_tb = exc_info
    Trace = traceback.extract_tb(exc_tb)
    trace_string = ""
    Indent = ""
    Text = ""
    for file_name, line_no, func_name, text in Trace:
        path, module_name = os.path.split(file_name)
        # only print exceptions from Coquery files:
        if any([path.startswith(x) for x in get_source_paths()]):
            # NOTE(review): replace("<", "<") is a no-op as written; it looks
            # like an escaped form such as "&lt;" was intended - confirm.
            trace_string += "{} {}, line {}: {}\n".format(
                Indent, module_name, line_no, func_name.replace("<", "<"))
            # Each listed entry is indented further than the previous one.
            Indent += " "
    # Uses the loop variables of the last traceback entry.
    # NOTE(review): these names are unbound when the traceback is empty.
    file_location = "{}, line {}".format(file_name, line_no)
    if text:
        trace_string += "%s> %s\n" % (Indent[:-1], text)
    return (exc_type, exc_obj, trace_string, file_location)
def log_callstack_last(back_trace=False):
    """Return ``"file:line"`` for the last stack entry that is not filtered.

    With ``back_trace`` the entries come from the traceback of the exception
    currently being handled, otherwise from the current call stack without
    this function's own frame.  Entries whose function name is listed in
    ``BACKTRACE_FILTER_FUNC`` or ``BACKTRACE_FILTER_HIDE_CODE`` are ignored.
    Returns ``"empty"`` when no entry qualifies.  Every inspected entry is
    printed.
    """
    if back_trace:
        stack = traceback.extract_tb(sys.exc_info()[2])
    else:
        stack = traceback.extract_stack()[:-1]

    message = "empty"
    print("Parsing stack")
    for path, line, func, code in stack:
        print(path,func)
        if func in BACKTRACE_FILTER_FUNC or func in BACKTRACE_FILTER_HIDE_CODE:
            continue
        message = "%s:%s" % (os.path.split(path)[-1], line)
    return message
def test_concat_tb():
    """Check that concat_tb joins two tracebacks without altering them."""
    first = get_tb(raiser1)
    second = get_tb(raiser2)

    # These return a list of (filename, lineno, fn name, text) tuples
    # https://docs.python.org/3/library/traceback.html#traceback.extract_tb
    first_entries = extract_tb(first)
    second_entries = extract_tb(second)

    # Both concatenation orders
    joined = concat_tb(first, second)
    assert extract_tb(joined) == first_entries + second_entries

    joined_reversed = concat_tb(second, first)
    assert extract_tb(joined_reversed) == second_entries + first_entries

    # Check degenerate cases
    assert extract_tb(concat_tb(None, first)) == first_entries
    assert extract_tb(concat_tb(first, None)) == first_entries
    assert concat_tb(None, None) is None

    # Make sure the original tracebacks didn't get mutated by mistake
    assert extract_tb(get_tb(raiser1)) == first_entries
    assert extract_tb(get_tb(raiser2)) == second_entries
def user_exception(self, frame, info):
    """This function is called if an exception occurs,
    but only if we are to stop at or just below this level.

    Sends an 'exception' notification through ``self.pipe`` and then starts
    the interaction for ``frame`` and ``info``.  Nothing happens while the
    debugger is still waiting for the main file or for a breakpoint.
    """
    if self._wait_for_mainpyfile or self._wait_for_breakpoint:
        return
    extype, exvalue, raw_tb = info
    # the traceback object cannot be pickled, so send extracted data instead
    formatted = ''.join(traceback.format_exception(extype, exvalue, raw_tb))
    entries = traceback.extract_tb(raw_tb)
    title = traceback.format_exception_only(extype, exvalue)[0]
    # send an Exception notification
    payload = (title, extype.__name__, exvalue, entries, formatted)
    self.pipe.send({'method': 'exception', 'args': payload, 'id': None})
    self.interaction(frame, info)
def restful(self):
    """Return a decorator that dispatches a request to a per-method handler.

    The decorated ``action`` must return a dict that maps request method
    names to callables.  The generated function picks the callable for
    ``env.request_method`` and calls it with the request args and vars.  A
    trailing ``.ext`` on the last arg is split off into ``self.extension``
    and used to set the response Content-Type.

    Raises ``HTTP(405)`` when no callable handler exists for the method and
    ``HTTP(400)`` when the handler call itself fails with a ``TypeError``.
    """
    def wrapper(action, self=self):
        def f(_action=action, _self=self, *a, **b):
            self.is_restful = True
            method = _self.env.request_method
            if len(_self.args) and '.' in _self.args[-1]:
                _self.args[-1], _, self.extension = self.args[-1].rpartition('.')
                current.response.headers['Content-Type'] = \
                    contenttype('.' + _self.extension.lower())
            rest_action = _action().get(method, None)
            if not (rest_action and method == method.upper()
                    and callable(rest_action)):
                raise HTTP(405, "method not allowed")
            try:
                return rest_action(*_self.args, **getattr(_self, 'vars', {}))
            except TypeError:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                # A single traceback entry means the call itself failed
                # (bad arguments) rather than code inside the handler.
                if len(traceback.extract_tb(exc_traceback)) == 1:
                    raise HTTP(400, "invalid arguments")
                else:
                    raise
        f.__doc__ = action.__doc__
        f.__name__ = action.__name__
        return f
    return wrapper
def log_traceback_from_info(exception_type, value, tb, dst=sys.stderr, skip=0):
    """Log a given exception nicely to 'dst', showing a traceback.

    dst  -- writeable file-like object
    skip -- number of traceback entries to omit from the top of the list

    For a SyntaxError only the formatted exception is written.  Otherwise
    one line per traceback entry follows, and finally the source text of the
    last entry under a "failing line:" heading.
    """
    for line in traceback.format_exception_only(exception_type, value):
        dst.write(line)
    if (not isinstance(exception_type, str) and
            issubclass(exception_type, SyntaxError)):
        return

    # dst.write() is used instead of the Python 2-only "print >>dst" form.
    dst.write('traceback (most recent call last):\n')
    text = None
    for filename, lineno, fnname, text in traceback.extract_tb(tb)[skip:]:
        if fnname == "?":
            fn_s = "<global scope>"
        else:
            fn_s = "(%s)" % fnname
        dst.write(" %s:%s %s\n" % (filename, lineno, fn_s))
    # After the loop ``text`` holds the source text of the last entry.
    if text is not None:
        dst.write("failing line:\n")
        dst.write("%s\n" % (text,))
def doTraceback(self, module):
    """Check that tracebacks through ``module.do_raise()`` show raise_src."""
    try:
        module.do_raise()
    except BaseException:
        # Skip this method's own frame and look at the next traceback entry.
        inner_tb = sys.exc_info()[2].tb_next
        expected_line = raise_src.strip()

        _, _, _, source = extract_tb(inner_tb, 1)[0]
        self.assertEqual(source, expected_line)

        _, _, _, source = extract_stack(inner_tb.tb_frame, 1)[0]
        self.assertEqual(source, expected_line)

        captured = io.StringIO()
        print_tb(inner_tb, 1, captured)
        self.assertTrue(captured.getvalue().endswith(raise_src))
    else:
        raise AssertionError("This ought to be impossible")
def print_exception():
    """Print the exception being handled to sys.stderr.

    The exception is also stored in ``sys.last_type``, ``sys.last_value``
    and ``sys.last_traceback``.  The extracted traceback entries are passed
    through ``cleanup_traceback`` with a list of file names to exclude
    before they are printed.
    """
    import linecache
    linecache.checkcache()
    flush_stdout()
    err_stream = sys.stderr
    exc_triple = sys.exc_info()
    exc_class, exc_value, exc_tb = exc_triple
    sys.last_type, sys.last_value, sys.last_traceback = exc_triple
    entries = traceback.extract_tb(exc_tb)
    print('Traceback (most recent call last):', file=err_stream)
    excluded_files = ("run.py", "rpc.py", "threading.py", "queue.py",
                      "RemoteDebugger.py", "bdb.py")
    cleanup_traceback(entries, excluded_files)
    traceback.print_list(entries, file=err_stream)
    for text in traceback.format_exception_only(exc_class, exc_value):
        print(text, end='', file=err_stream)
def display_exception_message(exception, lessonfile=None):
    """Call this function only inside an except clause.

    Shows an ``ExceptionDialog`` for ``exception``.  The dialog mentions
    ``lessonfile`` when one is given, and the file and line of the first
    entry of the active traceback.  If the exception has an
    ``m_nonwrapped_text`` attribute, that text is added as well.
    """
    sourcefile, lineno, func, code = traceback.extract_tb(sys.exc_info()[2])[0]
    # We can replace characters because we will only display the
    # file name, not open the file.  Only byte strings need decoding; a text
    # string has no decode() method on Python 3.
    if isinstance(sourcefile, bytes):
        sourcefile = sourcefile.decode(sys.getfilesystemencoding(), 'replace')
    m = ExceptionDialog(exception)
    if lessonfile:
        m.add_text(_("Please check the lesson file %s.") % lessonfile)
    if sourcefile:
        m.add_text(_('The exception was caught in\n"%(filename)s", line %(lineno)i.') % {'filename': sourcefile, 'lineno': lineno})
    if 'm_nonwrapped_text' in dir(exception):
        m.add_nonwrapped_text(exception.m_nonwrapped_text)
    m.run()
    m.destroy()
def getPreviousExceptions(limit=0):
    """Return a text report of the exception currently being handled.

    sys.exc_info() returns : type,value,traceback
    traceback.extract_tb(traceback) : returns (filename, line number, function name, text)

    The report starts with a header line, followed by the formatted
    traceback entries (the first entry is left out when there is more than
    one) and a final ``Type: value`` part.  An empty string is returned when
    no exception is being handled.  ``limit`` is currently not used.
    """
    try:
        exinfo = sys.exc_info()
        if exinfo[0] is None:
            return ''
        stack = traceback.format_tb(exinfo[2])
        # Skip the outermost entry unless it is the only one.
        first = 1 if len(stack) > 1 else 0
        return str('\n'.join([
            'Tracebacks (most recent call last):',
            ''.join(stack[first:]),
            ': '.join([str(exinfo[0].__name__), str(exinfo[1])]),
        ]))
    except Exception:
        # Building the report failed; fall back to the standard formatting.
        print('Aaaargh!')
        return traceback.format_exc()
def doTraceback(self, module):
    # Calls module.do_raise(), which is expected to raise, and checks that
    # the traceback entry below this method reports raise_src as its source
    # line through extract_tb, extract_stack and print_tb.
    try:
        module.do_raise()
    except:
        # tb_next skips the entry for this method's own frame.
        tb = sys.exc_info()[2].tb_next

        f,lno,n,line = extract_tb(tb, 1)[0]
        self.assertEqual(line, raise_src.strip())

        f,lno,n,line = extract_stack(tb.tb_frame, 1)[0]
        self.assertEqual(line, raise_src.strip())

        # print_tb output is captured in memory and checked at its end.
        s = StringIO.StringIO()
        print_tb(tb, 1, s)
        self.assertTrue(s.getvalue().endswith(raise_src))
    else:
        raise AssertionError("This ought to be impossible")
def print_exception():
    """Print the exception being handled to sys.stderr.

    The exception is also stored in ``sys.last_type``, ``sys.last_value``
    and ``sys.last_traceback``.  The extracted traceback entries are passed
    through ``cleanup_traceback`` with a list of file names to exclude
    before they are printed.
    """
    import linecache
    linecache.checkcache()
    flush_stdout()
    efile = sys.stderr
    typ, val, tb = excinfo = sys.exc_info()
    sys.last_type, sys.last_value, sys.last_traceback = excinfo
    tbe = traceback.extract_tb(tb)
    # write() is used instead of the Python 2-only "print>>efile" form.
    efile.write('\nTraceback (most recent call last):\n')
    exclude = ("run.py", "rpc.py", "threading.py", "Queue.py",
               "RemoteDebugger.py", "bdb.py")
    cleanup_traceback(tbe, exclude)
    traceback.print_list(tbe, file=efile)
    for line in traceback.format_exception_only(typ, val):
        efile.write(line)
def from_except(cls, name, tb, exception, error_html_getter=None, error_html_getter_params={}, **data):
    """Create an instance of ``cls`` describing ``exception``.

    A celery ``Retry`` exception (when celery can be imported) is re-raised
    instead of being recorded.  Otherwise the traceback entries are
    normalised, hashed and stored together with the exception class name,
    an HTML rendering and the ``repr`` of the exception arguments.
    ``error_html_getter`` defaults to the ``Reporter`` traceback HTML.
    """
    try:
        from celery.exceptions import Retry
    except ImportError:
        pass
    else:
        if isinstance(exception, Retry):
            six.reraise(Retry, exception, tb)

    stack_list = []
    for src_path, lineno, func_name, src_code in traceback.extract_tb(tb):
        stack_list.append((cls._normalize_file_python(src_path), lineno, func_name, src_code))

    if not error_html_getter:
        def error_html_getter(exc_type, exc_value, tb, **kwargs):
            return Reporter(exc_type, exc_value, tb).get_traceback_html()

    data[cls.name_property] = name
    exception_class = type(exception)
    res = cls(
        stack_hash=cls._get_stack_hash(stack_list),
        error_type=cls._get_except_cls_name(exception_class),
        error_html=error_html_getter(exc_type=exception_class, exc_value=exception, tb=tb,
                                     **error_html_getter_params),
        **data
    )
    res.stack_list = stack_list
    res.error_args = [repr(arg) for arg in exception.args]
    return res
def SConscript_exception(file=sys.stderr):
    """Print an exception stack trace just for the SConscript file(s).
    This will show users who have Python errors where the problem is,
    without cluttering the output with all of the internal calls leading
    up to where we exec the SConscript."""
    exc_type, exc_value, exc_tb = sys.exc_info()

    # Advance to the first traceback entry whose frame locals hold the marker.
    tb = exc_tb
    while tb and stack_bottom not in tb.tb_frame.f_locals:
        tb = tb.tb_next
    # We did not find our exec statement, so this was actually a bug
    # in SCons itself.  Show the whole stack.
    tb = tb or exc_tb

    stack = traceback.extract_tb(tb)
    try:
        type_name = exc_type.__name__
    except AttributeError:
        type_name = str(exc_type)
        if type_name.startswith("exceptions."):
            type_name = type_name[11:]

    file.write('%s: %s:\n' % (type_name, exc_value))
    for fname, line, func, text in stack:
        file.write(' File "%s", line %d:\n' % (fname, line))
        file.write(' %s\n' % text)
def find_deepest_user_frame(tb):
    """
    Find the deepest stack frame that is not part of SCons.

    Input is a "pre-processed" stack trace in the form
    returned by traceback.extract_tb() or traceback.extract_stack()

    Returns the last entry whose file name does not contain an ``SCons``
    path component, or the last entry overall when every entry belongs to
    SCons.  The input list is not modified.
    """
    scons_part = os.sep + 'SCons' + os.sep

    # Walk from the deepest entry outwards without reversing the caller's
    # list in place.
    for frame in reversed(tb):
        filename = frame[0]
        if scons_part not in filename:
            return frame
    return tb[-1]
def main():
    """Run ``start()`` and log any exception instead of propagating it.

    On failure a 'FAILED' line is logged, followed by the exception type and
    value, the line number of the traceback and the last traceback entry.
    """
    logging.info(' ** Starting ShakeCast Web Server ** ')
    try:
        start()
    except Exception as e:
        logging.info('FAILED')
        exc_tb = sys.exc_info()[2]
        # The last entry is where the exception originated.
        origin = traceback.extract_tb(exc_tb)[-1]
        report = '{}: {} - line: {}\nOriginated: {} {} {} {}'.format(
            type(e), e, exc_tb.tb_lineno, *origin)
        logging.info(report)
def main():
    """Create a ``Server``, start it and run its loop until it is stopped.

    Any exception is logged with its type and value, the line number of the
    traceback and the last traceback entry; it is not propagated.
    """
    logging.info(' ** Starting ShakeCast Server ** ')
    try:
        sc_server = Server()

        # start shakecast
        sc_server.start_shakecast()

        while sc_server.stop_server is False:
            sc_server.stop_loop = False
            sc_server.loop()
    except Exception as e:
        exc_tb = sys.exc_info()[2]
        # The last entry is where the exception originated.
        origin = traceback.extract_tb(exc_tb)[-1]
        report = '{}: {} - line: {}\nOriginated: {} {} {} {}'.format(
            type(e), e, exc_tb.tb_lineno, *origin)
        logging.info(report)
def main():
    """Run ``start()`` and log any exception instead of propagating it.

    The log entry holds the exception type and value, the line number of
    the traceback and the last traceback entry.
    """
    logging.info(' ** Starting ShakeCast Web Server ** ')
    try:
        start()
    except Exception as e:
        exc_tb = sys.exc_info()[2]
        # The last entry is where the exception originated.
        origin = traceback.extract_tb(exc_tb)[-1]
        report = '{}: {} - line: {}\nOriginated: {} {} {} {}'.format(
            type(e), e, exc_tb.tb_lineno, *origin)
        logging.info(report)