我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用traceback.format_stack()。
def signal_manager(self):
    """
    Deprecated accessor that returns ``self.signals``.

    Every access logs a deprecation warning followed by the most recent
    stack frames, so the offending caller can be located.

    .. deprecated:: 0.5.0
        Use :func:`self.context.signals` instead in your apps.

    :return: Signal manager (global).
    :rtype: pyplanet.core.events.manager._SignalManager
    """
    deprecation = DeprecationWarning(
        'DEPRECATED: The usage of \'self.instance.signal_manager\' in apps is deprecated, '
        'use \'self.context.signals\' instead!'
    )
    logger.warning(deprecation)

    # Only the innermost 7 frames are logged to keep the output short.
    recent_frames = traceback.format_stack(limit=7)
    logger.warning('\n'.join(recent_frames))

    return self.signals
def tryLock(self, timeout=None, id=None):
    """
    Try to acquire the underlying QMutex without blocking indefinitely.

    :param timeout: forwarded to ``QtCore.QMutex.tryLock`` when not None.
    :param id: optional debug label recorded instead of a stack trace.
    :return: whatever ``QtCore.QMutex.tryLock`` returned.
    """
    call_args = (self,) if timeout is None else (self, timeout)
    locked = QtCore.QMutex.tryLock(*call_args)

    if not (self.debug and locked):
        return locked

    # Debug bookkeeping: remember who took the lock, guarded by self.l.
    self.l.lock()
    try:
        if id is None:
            # Drop the last entry (this frame) so the record ends at the caller.
            owner = ''.join(traceback.format_stack()[:-1])
        else:
            owner = " " + str(id)
        self.tb.append(owner)
    finally:
        self.l.unlock()
    return locked
def _do_get(self):
    """
    Hand out the single connection, creating it on first use.

    Raises AssertionError when the connection is already checked out; if a
    checkout traceback was stored, it is appended to the error message.
    When ``self._store_traceback`` is set, the current stack is recorded as
    the checkout traceback.
    """
    if self._checked_out:
        suffix = ''
        if self._checkout_traceback:
            trimmed = chop_traceback(self._checkout_traceback)
            suffix = ' at:\n%s' % ''.join(trimmed)
        raise AssertionError("connection is already checked out" + suffix)

    if not self._conn:
        self._conn = self._create_connection()

    self._checked_out = True
    if self._store_traceback:
        self._checkout_traceback = traceback.format_stack()

    return self._conn
def __init__(self, *args):
    """
    Initialise the exception and log its message with a traceback.

    The first positional argument (if any) is used as the user message;
    otherwise a generic message is used. If an exception is currently being
    handled, its formatted traceback is logged. Otherwise a traceback is
    synthesised from the current call stack (excluding this frame), ending
    with ``<ClassName>: <message>``.

    :param args: exception arguments; stored unchanged on ``self.args``.
    """
    # Local import: the file's import block is not visible from here.
    import sys

    # Build the message from the user. "%s" formatting lets a non-string
    # first argument be logged instead of raising TypeError on concatenation.
    if args:
        user_msg = "%s" % (args[0],)
    else:
        user_msg = "An error has occurred"

    # traceback.format_exc() does not raise when no exception is active (it
    # returns a "None"-style line), so the active exception is checked
    # explicitly instead of relying on try/except.
    if sys.exc_info()[0] is not None:
        tb = traceback.format_exc()
    else:
        # No active exception: use the stack prior to this frame and
        # present it as if this exception had been raised there.
        tb = ''.join([
            "Traceback (most recent call last):\n",
            ''.join(traceback.format_stack()[:-1]),
            self.__class__.__name__ + ": " + user_msg,
            '\n',
        ])

    # Build the complete log message and log it.
    log_msg = user_msg + "\n" + tb
    self.log.error(log_msg)

    # Store the args.
    self.args = args
def handle_SIGUSR1(self, signum, frame):
    """
    SIGUSR1 handler: forward the signal to AVE children and dump a trace.

    The signal is re-sent to every child whose process name starts with
    ``'ave-'``. Then a file named ``<date>-<proc_name>-<pid>`` is written
    under ``<self.home>/.ave/hickup`` containing the formatted stack, the
    locals and the globals of ``frame``.

    :param signum: signal number (unused).
    :param frame: the interrupted frame whose state is dumped.
    """
    # Propagate the signal to children, but only if they are AVE processes.
    for pid in self.get_children():
        child_name = get_proc_name(pid)
        if child_name.startswith('ave-'):
            os.kill(pid, signal.SIGUSR1)

    # Make the dump directory if it doesn't exist.
    hickup_dir = os.path.join(self.home, '.ave', 'hickup')
    try:
        os.makedirs(hickup_dir)
    except OSError as e:  # "as" form is valid on Python 2.6+ and Python 3
        if e.errno != errno.EEXIST:
            self.log('ERROR: could not create %s: %s' % (hickup_dir, e))
            return

    # Create the trace file.
    date = time.strftime('%Y%m%d-%H%M%S')
    file_name = '%s-%s-%d' % (date, self.proc_name, os.getpid())
    path = os.path.join(hickup_dir, file_name)
    with open(path, 'w') as f:
        f.write('stack:\n%s' % ''.join(traceback.format_stack(frame)))
        f.write('locals:\n%s\n' % frame.f_locals)
        f.write('globals:\n%s' % frame.f_globals)
def __init__(self, time, func, args, kw, cancel, reset, seconds=None):
    """
    Store the scheduling details of a delayed call.

    @param time: Seconds from the epoch at which to call C{func}.
    @param func: The callable to call.
    @param args: The positional arguments to pass to the callable.
    @param kw: The keyword arguments to pass to the callable.
    @param cancel: A callable which will be called with this DelayedCall
        before cancellation.
    @param reset: A callable which will be called with this DelayedCall
        after changing this DelayedCall's scheduled execution time. The
        callable should adjust any necessary scheduling details to ensure
        this DelayedCall is invoked at the new appropriate time.
    @param seconds: If provided, a no-argument callable which will be used
        to determine the current time any time that information is needed.
    """
    self.time = time
    self.func = func
    self.args = args
    self.kw = kw

    self.resetter = reset
    self.canceller = cancel
    self.seconds = seconds

    self.cancelled = 0
    self.called = 0
    self.delayed_time = 0

    if self.debug:
        # Record the creating stack, minus the last two entries.
        creation_stack = traceback.format_stack()
        self.creator = creation_stack[:-2]
def _startRunCallbacks(self, result):
    """
    Record ``result``, mark this object as called and run the callbacks.

    If a timeout call is pending it is cancelled (best effort) and the
    ``timeoutCall`` attribute is deleted before the callbacks run.

    :param result: the value stored on ``self.result``.
    :raises AlreadyCalledError: if this object was already called. In debug
        mode the error carries the recorded debug tracebacks.
    """
    if self.called:
        if self.debug:
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            extra = "\n" + self._debugInfo._getDebugTracebacks()
            raise AlreadyCalledError(extra)
        raise AlreadyCalledError

    if self.debug:
        if self._debugInfo is None:
            self._debugInfo = DebugInfo()
        # Drop the last two stack entries so the record ends at the code
        # that triggered the call.
        self._debugInfo.invoker = traceback.format_stack()[:-2]

    self.called = True
    self.result = result

    if self.timeoutCall:
        try:
            self.timeoutCall.cancel()
        except Exception:
            # Best effort: a failed cancel must not stop the callbacks.
            # KeyboardInterrupt/SystemExit are deliberately not swallowed.
            pass
        del self.timeoutCall

    self._runCallbacks()
def format_error(e):
    """
    Convert an error value into an ``error.Error`` with a readable message.

    Twisted ``Failure`` objects are unwrapped to their ``value``. Strings
    are used as-is; anything else is rendered as a formatted traceback.
    """
    # errback automatically wraps everything in a Twisted Failure
    if isinstance(e, failure.Failure):
        e = e.value

    if isinstance(e, str):
        err_string = e
    elif six.PY2:
        err_string = traceback.format_exc(e).rstrip()
    else:
        rendered = traceback.format_exception(type(e), e, e.__traceback__)
        err_string = ''.join(rendered).rstrip()

    if err_string == 'None':
        # Reasonable heuristic for exceptions that were created by hand:
        # report the caller's stack entry next to the exception text.
        caller_entry = traceback.format_stack()[-2]
        err_string = '{}\n {}'.format(e, caller_entry)

    # Quick and dirty hack for now.
    err_string = err_string.replace(
        'Connection to the other side was lost in a non-clean fashion',
        'Connection to the other side was lost in a non-clean fashion (HINT: this generally actually means we got a connection refused error. Check that the remote is actually running.)',
    )
    return error.Error(err_string)
def debugmsg(msg, msgTupleLambda=None):
    '''
        debugmsg - Print a debug message to stderr, prefixed with "DEBUG: "
            and suffixed with a newline.

        NOTE(review): the original documentation says output depends on
        IS_DEVELOPER_DEBUG (set in the code; changing it after import has no
        effect), but no such check is visible inside this function. Confirm
        where that gate lives.

        @param msg <str> - The message, optionally containing %-style
            format specifiers.

        @param msgTupleLambda <None/callable> - A no-argument callable
            returning the value(s) to interpolate into #msg. When None
            (default), #msg is written as-is, without interpolation.

        Don't create your message like:

            debugmsg( "Order(id=%s) has %d pepperoni pizzas" %( myOrder.id, len( [ topping for pizza in getPizzas() for topping in pizza.toppings() if 'pepperoni' in topping ] ) ) )

        Which would evaluate that comprehension whether or not debug mode is
        on. Instead, use a lambda to return that same tuple, so that code is
        only evaluated when #debugmsg calls your lambda:

            debugmsg( "Order(id=%s) has %d pepperoni pizzas", lambda : ( myOrder.id, len( [ topping for pizza in getPizzas() for topping in pizza.toppings() if 'pepperoni' in topping ] ) ) )

        Passing a tuple/list directly still works, but writes a warning with
        the caller's stack to stderr first.
    '''
    if msgTupleLambda is None:
        # No arguments supplied: emit the message verbatim.
        text = msg
    else:
        if isinstance(msgTupleLambda, (tuple, list)):
            sys.stderr.write('''WARNING: debugmsg called with a tuple/list for msgTupleLambda.\n\tShould be a lambda that returns the tuple, so when IS_DEVELOPER_DEBUG=False it is not evaluated. \n\n''' + ''.join(traceback.format_stack()[:-1]) + "\n\n" )
            msgTuple = msgTupleLambda
        else:
            msgTuple = msgTupleLambda()

        # The % operator treats a list as a single value; normalise it to a
        # tuple so every element fills one format specifier.
        if isinstance(msgTuple, list):
            msgTuple = tuple(msgTuple)
        text = msg % msgTuple

    sys.stderr.write('DEBUG: %s\n' % (text, ))
def requeue(self, exception=None):
    """
    Requeue the current task through ``self._on_requeue``.

    The current call stack is captured as text and passed along as the
    exception traceback. When no exception is supplied, a ``WorkerRequeue``
    instance is used.
    """
    current = self.current_task
    stack_text = ''.join(traceback.format_stack())
    exception = exception or WorkerRequeue()

    self._on_requeue(
        task=current,
        exception=exception,
        exception_traceback=stack_text,
        args=current['args'],
        kwargs=current['kwargs'],
    )
def _warning(*var_text):
    """
    Prints a warning message to standard out.

    All arguments are converted with ``str`` and joined, each followed by a
    single space, after the ``'WARNING: '`` prefix. Nothing is printed when
    ``config['suppress_warnings']`` is truthy.
    """
    text = 'WARNING: ' + ''.join(str(var) + ' ' for var in var_text)
    if not config['suppress_warnings']:
        print(text)
    # A filtered stack-trace print (skipping ANNarchy-internal frames) used
    # to be sketched here as commented-out code; it is not executed.
def exit_gracefully(self, signum, frame):
    '''
    Exit the consumer gracefully.

    While a message is being processed, only flag ``self.shutting_down``.
    Otherwise log the stack of ``frame``, close every partition writer,
    commit the consumer and exit the process with status 0.
    '''
    if self.message_processing:
        self.logger.info("Graceful shutdown of consumer " +
                         str(self.consumer_id) + " started....")
        self.shutting_down = True
        return

    separator = "-----------------------------------------------"
    self.logger.info("Fast shutdown available ... exiting (signum %d)" % (signum))
    self.logger.info("Print stack trace. Don't panic!")
    self.logger.info(separator)
    # Each stack entry spans several lines; log them one line at a time.
    for entry in traceback.format_stack(frame):
        for text_line in entry.split("\n"):
            self.logger.info(text_line)
    self.logger.info(separator)

    for part in self.partitions:
        self.partitions[part].writer.close()
    self.consumer.commit()
    sys.exit(0)
def setup_thread_debugger():
    """Set up a thread debugger for the pytest session.

    This function, based on http://stackoverflow.com/a/133384, is meant to
    add a debugging facility to pytest that allows debugging deadlocks that
    may sometimes occur. It registers a SIGUSR1 handler that opens an
    interactive console.
    """

    def debug(signum, frame):
        """Interrupt running process and provide a python prompt for
        interactive debugging."""
        # Allow access to the frame object; globals are added next and
        # locals last, so a later update wins on a name clash.
        namespace = {'_frame': frame}
        namespace.update(frame.f_globals)
        namespace.update(frame.f_locals)

        console = code.InteractiveConsole(namespace)
        banner = (
            "Signal received : entering python shell.\nTraceback:\n"
            + ''.join(traceback.format_stack(frame))
        )
        console.interact(banner)

    # Register handler
    signal.signal(signal.SIGUSR1, debug)
def log(msg, msgx='', level=logging.INFO, need_tb=False):
    """
    Logging in UCC Config Module.

    Does nothing once ``LOGGING_STOPPED`` is truthy. The record is always
    emitted with ``exc_info=1``.

    :param msg: message content
    :param msgx: detail info, appended after ``' - '`` when non-empty.
    :param level: logging level
    :param need_tb: if true, append the current call stack to the message
    :return: None
    """
    if LOGGING_STOPPED:
        return

    detail = (' - ' + msgx) if msgx else ''
    content = 'UCC Config Module: %s%s' % (msg, detail)
    if need_tb:
        call_stack = ''.join(traceback.format_stack())
        content = '%s\r\n%s' % (content, call_stack)

    stulog.logger.log(level, content, exc_info=1)
def timeout_response() -> chalice.Response:
    """
    Produce a chalice Response object that indicates a timeout.

    The JSON body carries the formatted stack of every running thread
    except the current one, keyed by thread id.
    """
    own_ident = threading.get_ident()

    trace_dump = {}
    for thread_id, frame in sys._current_frames().items():
        if thread_id == own_ident:
            continue
        trace_dump[thread_id] = traceback.format_stack(frame)

    problem = {
        'status': requests.codes.gateway_timeout,
        'code': "timed_out",
        'title': "Timed out processing request.",
        'traces': trace_dump,
    }
    body = json.dumps(problem)
    return chalice.Response(
        status_code=problem['status'],
        headers={"Content-Type": "application/problem+json"},
        body=body,
    )
def tryLock(self, timeout=None, id=None):
    """ Try to lock the mutex.

      In debug mode a successful lock is recorded in ``self.tb``: either
      the debug id, or the caller's stack when no id is given.

      @param timeout int: give up after this many milliseconds
      @param id: debug id
      @return bool: whether locking succeeded
    """
    if timeout is not None:
        locked = QtCore.QMutex.tryLock(self, timeout)
    else:
        locked = QtCore.QMutex.tryLock(self)

    if locked and self.debug:
        # self.l guards the bookkeeping list.
        self.l.lock()
        try:
            if id is not None:
                record = " " + str(id)
            else:
                # Leave out the last stack entry, which is this method.
                record = ''.join(traceback.format_stack()[:-1])
            self.tb.append(record)
        finally:
            self.l.unlock()
    return locked
def lock(self, id=None):
    """ Lock mutex. Will try again every 5 seconds.

      In debug mode every failed attempt logs the total waiting time, the
      current stack, and the last recorded lock owner from ``self.tb``.

      @param id: debug id, forwarded to ``tryLock``
    """
    attempts = 0
    waitTime = 5000  # in ms
    while not self.tryLock(waitTime, id):
        attempts += 1
        if not self.debug:
            continue
        self.l.lock()
        try:
            # '{:.1f}' is required: a bare '{:.1}' formats 5.0 as '5e+00'.
            logger.debug('Waiting for mutex lock ({:.1f} sec). '
                         'Traceback follows:'.format(attempts * waitTime / 1000.))
            logger.debug(''.join(traceback.format_stack()))
            if len(self.tb) > 0:
                logger.debug('Mutex is currently locked from: {0}\n'
                             ''.format(self.tb[-1]))
            else:
                logger.debug('Mutex is currently locked from [???]')
        finally:
            self.l.unlock()
def run(self):
    """
    Start the periodic tasks, install a SIGUSR1 stack dumper and run the
    reactor.

    The main task and the flush-states task restart themselves from their
    errbacks after logging the failure. ``self.stop`` is registered to run
    before reactor shutdown.
    """
    def report(failure):
        # Log the exception value, plus the traceback when frames exist.
        logger.exception(failure.value)
        if failure.frames:
            tb_lines = format_tb(failure.getTracebackObject())
            logger.critical(str("").join(tb_lines))

    def restart_main(failure):
        report(failure)
        self.task.start(interval=0).addErrback(restart_main)

    def restart_flush_states(failure):
        report(failure)
        flushing = self._flush_states_task.start(interval=300)
        flushing.addErrback(restart_flush_states)

    def dump_stack(sig, frame):
        logger.critical("Signal received: printing stack trace")
        logger.critical(str("").join(format_stack(frame)))

    self.task.start(interval=0).addErrback(restart_main)
    self._logging_task.start(interval=30)
    self._flush_states_task.start(interval=300).addErrback(restart_flush_states)

    signal(SIGUSR1, dump_stack)
    reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
    reactor.run()
def _startRunCallbacks(self, result):
    """
    Record ``result``, mark this object as called and run the callbacks.

    A repeated call is silently ignored once when
    ``self._suppressAlreadyCalled`` is set (the flag is then cleared);
    otherwise it raises ``AlreadyCalledError``, carrying the recorded debug
    tracebacks when in debug mode.
    """
    if self.called:
        if self._suppressAlreadyCalled:
            self._suppressAlreadyCalled = False
            return
        if not self.debug:
            raise AlreadyCalledError
        if self._debugInfo is None:
            self._debugInfo = DebugInfo()
        raise AlreadyCalledError("\n" + self._debugInfo._getDebugTracebacks())

    if self.debug:
        if self._debugInfo is None:
            self._debugInfo = DebugInfo()
        # Remember who invoked us, minus the last two stack entries.
        stack = traceback.format_stack()
        self._debugInfo.invoker = stack[:-2]

    self.called = True
    self.result = result
    self._runCallbacks()
def get_file(file_id, api=None):
    """
    Look up a file by id, returning None when it is missing.

    Returns None immediately for a None ``file_id``. When no ``api`` is
    given, ``system_util.pillar_api()`` supplies one. A missing file is
    logged as a warning together with the request URL and the two innermost
    stack entries ending at the caller.
    """
    # TODO: remove this function and just use the Pillar SDK directly.
    if file_id is None:
        return None

    api = system_util.pillar_api() if api is None else api

    try:
        found = File.find(file_id, api=api)
    except ResourceNotFound:
        # The traceback starts in this frame; step back to the caller.
        caller_frame = sys.exc_info()[2].tb_frame.f_back
        stack_lines = traceback.format_stack(f=caller_frame, limit=2)
        log.warning('File %s not found, but requested from %s\n%s',
                    file_id, request.url, ''.join(stack_lines))
        return None
    return found
def debug(self, truesock_func, *a, **kw):
    """
    Forward a socket method call to the real socket.

    For HTTP sockets the call is treated as unexpected and a RuntimeError
    with the current stack is raised. Without a real socket,
    ``UnmockedError`` is raised. Otherwise the named method of
    ``self.truesock`` is called with the given arguments and its result
    returned.
    """
    if self.is_http:
        frame = inspect.stack()[0][0]
        lines = [utf8(entry) for entry in traceback.format_stack(frame)]
        issue_hint = ("Please open an issue at "
                      "'https://github.com/gabrielfalcao/HTTPretty/issues'")
        message = [
            "HTTPretty intercepted and unexpected socket method call.",
            issue_hint,
            "And paste the following traceback:\n",
            "".join(decode_utf8(lines)),
        ]
        raise RuntimeError("\n".join(message))

    if not self.truesock:
        raise UnmockedError()

    real_method = getattr(self.truesock, truesock_func)
    return real_method(*a, **kw)
def execute_insert_sql(self, *args, **kwargs):
    """
    Run the wrapped insert and record the query for Snoopy.

    The call stack, model name, rendered SQL and start time are collected
    before the real ``_snoopy_execute_insert_sql`` runs. The end time is
    added and the record stored even if that call raises.
    """
    model = self.query.model
    query_dict = {
        'query': [],
        'query_type': QUERY_TYPE_WRITE,
        'traceback': traceback.format_stack(),
        'model': "%s.%s" % (model.__module__, model.__name__),
        'start_time': datetime.datetime.now(),
    }

    rendered = query_dict['query']
    for sql, params in self.as_sql():
        rendered.append(sql % params)

    try:
        return self._snoopy_execute_insert_sql(*args, **kwargs)
    finally:
        # This gets called just before the `return`
        query_dict['end_time'] = datetime.datetime.now()
        SnoopyRequest.record_query_data(query_dict)
def _captureThreadStacks(self):
    """Capture the stacks for all currently running threads.

    Threads known to ``threading.enumerate`` are described by their repr;
    any other thread id is described as ``<*Unknown* id>``.

    :return: A list of ``(thread-description, stack)`` tuples. See
        `traceback.format_stack` for the format of ``stack``.
    """
    known_threads = {}
    for thread in threading.enumerate():
        known_threads[thread.ident] = thread

    def describe(ident):
        thread = known_threads.get(ident)
        if thread is None:
            return "<*Unknown* %d>" % ident
        return repr(thread)

    captured = []
    for ident, frame in sys._current_frames().items():
        captured.append((describe(ident), traceback.format_stack(frame)))
    return captured