我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用traceback.format_exc()。
def _tcp_proc(self, addrinfo, task=None):
    """
    Internal use only.

    Accept loop for ``addrinfo.tcp_sock``. The task marks itself as a daemon,
    then hands every accepted connection to a new ``SysTask`` running
    ``self._tcp_conn_proc``. A failed accept is logged at debug level and the
    loop carries on; the loop ends only on ``GeneratorExit``.
    """
    task.set_daemon()
    listener = addrinfo.tcp_sock
    while True:
        try:
            conn, _peer = yield listener.accept()
        except ssl.SSLError as ssl_err:
            logger.debug('SSL connection failed: %s', str(ssl_err))
        except GeneratorExit:
            break
        except:
            # Any other failure: record the traceback and try the next accept.
            logger.debug(traceback.format_exc())
        else:
            SysTask(self._tcp_conn_proc, conn, addrinfo)
def line_reader(apipe, coro=None):
    """Print lines obtained from ``apipe`` until an empty read or a failure.

    This is a generator-based coroutine: it yields the result of
    ``apipe.readline()`` and treats the value sent back in as the line.
    Each non-empty line is decoded and printed.

    Args:
        apipe: object providing ``readline()``.
        coro: unused here; accepted for the caller's calling convention.

    Returns:
        The number of reads completed, delivered as the generator's return
        value. The final empty read that ends the loop is included in the
        count.
    """
    nlines = 0
    while True:
        try:
            line = yield apipe.readline()
        except:
            asyncoro.logger.debug('read failed')
            asyncoro.logger.debug(traceback.format_exc())
            break
        nlines += 1
        if not line:
            break
        print(line.decode())
    # 'return value' hands the value to the caller through StopIteration.
    # Raising StopIteration explicitly inside a generator is converted to
    # RuntimeError under PEP 479 (default since Python 3.7).
    return nlines

# asyncoro.logger.setLevel(asyncoro.Logger.DEBUG)
def peer_status(task):
    """Install, clear or reject the task that receives peer status messages.

    * ``task`` is a ``Task``: an ``Online`` ``PeerStatus`` is sent to it for
      every entry in ``_Peer.peers``; it becomes ``_Peer.status_task`` only if
      every send succeeded.
    * ``task`` is ``None``: ``_Peer.status_task`` is cleared.
    * anything else: a warning is logged and nothing changes.
    """
    _Peer._lock.acquire()
    # try/finally guarantees the lock is released even when something below
    # raises; otherwise every later caller would block forever.
    try:
        if isinstance(task, Task):
            # if there is another status_task, add or replace?
            for peer in _Peer.peers.itervalues():
                try:
                    task.send(PeerStatus(peer.location, peer.name, PeerStatus.Online))
                except:
                    logger.debug(traceback.format_exc())
                    break
            else:
                # Reached only when the loop finished without 'break'.
                _Peer.status_task = task
        elif task is None:
            _Peer.status_task = None
        else:
            logger.warning('invalid peer status task ignored')
    finally:
        _Peer._lock.release()
def terminate(self):
    """Tear down the polling state held by this object.

    Closes the command channel (only when ``cmd_write`` has a ``getsockname``
    attribute), unregisters every tracked descriptor from the poller, empties
    the descriptor table and the timeout list, terminates the poller if it
    offers ``terminate`` and finally drops all references.
    """
    write_end = self.cmd_write
    if hasattr(write_end, 'getsockname'):
        write_end.close()
        self.cmd_read.close()

    for tracked in self._fds.itervalues():
        try:
            self._poller.unregister(tracked._fileno)
        except:
            # Best effort: report the failure and continue with the rest.
            logger.warning('unregister of %s failed with %s',
                           tracked._fileno, traceback.format_exc())
        tracked._notifier = None

    self._fds.clear()
    self._timeouts = []

    if hasattr(self._poller, 'terminate'):
        self._poller.terminate()
    self._poller = None
    self.cmd_read = None
    self.cmd_write = None
def terminate(self):
    """Release everything this object holds for polling.

    The command channel is closed when ``cmd_write`` exposes ``getsockname``.
    Each tracked descriptor is unregistered from the poller (failures are only
    logged) and detached from this notifier. The descriptor table, timeouts,
    poller and command-channel references are then reset.
    """
    has_socket_channel = hasattr(self.cmd_write, 'getsockname')
    if has_socket_channel:
        self.cmd_write.close()
        self.cmd_read.close()

    for handle in self._fds.values():
        try:
            self._poller.unregister(handle._fileno)
        except:
            failure = traceback.format_exc()
            logger.warning('unregister of %s failed with %s', handle._fileno, failure)
        handle._notifier = None
    self._fds.clear()
    self._timeouts = []

    # Only some pollers provide an explicit terminate hook.
    if hasattr(self._poller, 'terminate'):
        self._poller.terminate()
    self._poller = None
    self.cmd_read = self.cmd_write = None
def build(self, file):
    """Execute the script ``file`` once inside ``self.user_functions``.

    ``self.load(file)`` is called first when nothing has been loaded yet.
    While the script runs, its directory is the working directory and is on
    ``sys.path``; both are restored afterwards. If the script raises, a
    shortened traceback is written to stderr and the process exits with
    status 1.

    Args:
        file: path of the script to execute.

    Raises:
        PermissionError: if a build has already completed on this object.
    """
    if self.built:
        raise PermissionError("You cannot build multiple times!")
    if not self.loaded:
        self.load(file)
    old = os.getcwd()
    # Resolve the directory once, before any chdir, so that the sys.path
    # append/remove and the chdir are guaranteed to use the same value.
    script_dir = os.path.dirname(os.path.abspath(file))
    sys.path.append(script_dir)  # for module import that aren't "include" call
    try:
        # 'with' closes the handle immediately instead of leaking it.
        with open(file, "rb") as source:
            content = source.read()
        os.chdir(script_dir)  # set the current working directory, for open() etc.
        exec(compile(content, file, 'exec'), self.user_functions)
    except Exception:
        print("An exception occured while building: ", file=sys.stderr)
        lines = traceback.format_exc().splitlines()
        # Final message first, then the frames (skipping the leading lines).
        print(" " + lines[-1], file=sys.stderr)
        for l in lines[3:-1]:
            print(l, file=sys.stderr)
        # sys.exit is always available; the bare exit() helper comes from
        # the optional 'site' module.
        sys.exit(1)
    os.chdir(old)
    sys.path.remove(script_dir)
    self.built = True
def load(self, file):
    """Execute the script ``file`` inside ``self.user_functions`` (load pass).

    Does nothing when ``self.loaded`` is already set. While the script runs,
    its directory is the working directory and is on ``sys.path``; both are
    restored afterwards. If the script raises, a shortened traceback is
    written to stderr and the process exits with status 1. On success
    ``self.loaded`` is set and ``self.mem_offset`` is reset to 0.

    Args:
        file: path of the script to execute.
    """
    if self.loaded:
        return
    # Resolve the directory once, before any chdir, so that the sys.path
    # append/remove and the chdir are guaranteed to use the same value.
    script_dir = os.path.dirname(os.path.abspath(file))
    sys.path.append(script_dir)  # for module import that aren't "include" call
    old = os.getcwd()
    try:
        # 'with' closes the handle immediately instead of leaking it.
        with open(file, "rb") as source:
            content = source.read()
        os.chdir(script_dir)  # set the current working directory, for open() etc.
        exec(compile(content, file, 'exec'), self.user_functions)
    except Exception:
        print("An exception occured while loading: ", file=sys.stderr)
        lines = traceback.format_exc().splitlines()
        # Final message first, then the frames (skipping the leading lines).
        print(" " + lines[-1], file=sys.stderr)
        for l in lines[3:-1]:
            print(l, file=sys.stderr)
        # sys.exit is always available; the bare exit() helper comes from
        # the optional 'site' module.
        sys.exit(1)
    os.chdir(old)
    sys.path.remove(script_dir)
    self.loaded = True
    self.mem_offset = 0
def include(self, incfile: str):
    """Execute another script, resolved relative to the current include path.

    ``self.current_path`` is extended with the directory part of ``incfile``
    for the duration of the call; that directory is added to ``sys.path`` and
    becomes the working directory while the script runs in
    ``self.user_functions``. If the script raises, a shortened traceback is
    written to stderr and the process exits with status 1. Afterwards the
    previous ``current_path`` is restored and made the working directory.

    Args:
        incfile: path of the script, relative to ``self.current_path``.
    """
    old = self.current_path
    self.current_path = os.path.join(old, os.path.dirname(incfile))
    path = os.path.join(self.current_path, os.path.basename(incfile))
    sys.path.append(self.current_path)
    try:
        # 'with' closes the handle immediately instead of leaking it.
        with open(path, "rb") as source:
            content = source.read()
        os.chdir(self.current_path)  # set the current working directory, for open() etc.
        exec(compile(content, path, 'exec'), self.user_functions)
    except Exception:
        print("An exception occured while building: ", file=sys.stderr)
        lines = traceback.format_exc().splitlines()
        # Final message first, then the frames (skipping the leading lines).
        print(" " + lines[-1], file=sys.stderr)
        for l in lines[3:-1]:
            print(l, file=sys.stderr)
        # sys.exit is always available; the bare exit() helper comes from
        # the optional 'site' module.
        sys.exit(1)
    sys.path.remove(self.current_path)
    os.chdir(old)
    self.current_path = old
def generate2():
    """
    Call an external Python 2 program to retrieve the AST symbols of that language version

    The helper script (``generate_str + WRITESYMS_CODE``) is written to a
    fresh temporary directory, run with ``python2``, and the directory is
    removed afterwards whether or not the run succeeded.

    :return: set containing one entry per line printed by the helper script
    """
    import subprocess as sp
    import shutil
    import sys
    import tempfile
    import traceback

    tempdir = tempfile.mkdtemp()
    # Named 'script_path' so the local does not shadow the tempfile module.
    script_path = os.path.join(tempdir, "py2_ast_code.py")
    py2_proc_out = ""
    try:
        with open(script_path, 'w') as py2code:
            py2code.write(generate_str + WRITESYMS_CODE)
        # Run only after the 'with' block has flushed and closed the script.
        py2_proc_out = sp.check_output(["python2", script_path]).decode()
    finally:
        try:
            shutil.rmtree(tempdir)
        except Exception:
            # Cleanup is best effort; a leftover directory is only reported.
            print("Warning: error trying to delete the temporal directory:", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
    return set(py2_proc_out.splitlines())
def setup_py(self):
    """Return the path to ``setup.py`` inside ``self.setup_py_dir``.

    ``self.source_dir`` must be set. Setuptools must be importable, otherwise
    ``InstallationError`` is raised with either an installation hint or the
    import traceback. On Python 2 a text path is encoded with the filesystem
    encoding before being returned.
    """
    assert self.source_dir, "No source dir for %s" % self

    try:
        import setuptools  # noqa
    except ImportError:
        not_installed = get_installed_version('setuptools') is None
        if not_installed:
            add_msg = "Please install setuptools."
        else:
            # Installed but failing to import: show why.
            add_msg = traceback.format_exc()
        # Setuptools is not available
        raise InstallationError(
            "Could not import setuptools which is required to "
            "install from a source distribution.\n%s" % add_msg
        )

    script = os.path.join(self.setup_py_dir, 'setup.py')

    # Python2 __file__ should not be unicode
    needs_encoding = six.PY2 and isinstance(script, six.text_type)
    if needs_encoding:
        script = script.encode(sys.getfilesystemencoding())

    return script
def internal_server_error(error):
    """Log ``error`` with the current traceback and build the 500 response.

    Without ``username`` in the session the client is redirected to the login
    page. Otherwise the error page is rendered, using the ``500`` and
    ``500_title`` session entries (and removing them) when both are present,
    or a generic reason and title when they are not.
    """
    logger.error(error)
    logger.error(traceback.format_exc())

    if "username" not in session:
        return redirect('/login/')

    has_custom_text = "500" in session and "500_title" in session
    if has_custom_text:
        # pop() both reads the value and removes the one-shot entry.
        reason = session.pop('500')
        title = session.pop('500_title')
    else:
        reason = '''The server encountered something unexpected that didn't allow it to complete the request. We apologize.You can go back to <a href="/dashboard/">dashboard</a> or <a href="/logout">log out</a>'''
        title = 'Internal Server Error'
    return render_template('error/500.html', mysession = session, reason = reason, title = title)
def main():
    """Module entry point: copy to the host when ``dest`` is given, otherwise
    copy from the host. Any exception is reported through ``exit_json`` with
    ``failed=True`` and the traceback as the message."""
    argument_spec = {
        'compress': {'default': True, 'type': 'bool'},
        'dest': {'type': 'str'},
        'mode': {'default': '0644', 'type': 'str'},
        'sha1': {'default': None, 'type': 'str'},
        'src': {'required': True, 'type': 'str'},
    }
    module = AnsibleModule(argument_spec)

    dest = module.params.get('dest')

    try:
        # The presence of 'dest' selects the copy direction.
        transfer = copy_to_host if dest else copy_from_host
        transfer(module)
    except Exception:
        module.exit_json(failed=True, changed=True,
                         msg=repr(traceback.format_exc()))


# import module snippets
def main():
    """Module entry point: run the sanity check named by the ``service``
    parameter against an operator cloud built from the module parameters.
    Any exception is reported through ``exit_json`` with ``failed=True`` and
    the traceback as the message."""
    spec = openstack_full_argument_spec(
        password={'required': True, 'no_log': True, 'type': 'str'},
        project={'required': True, 'type': 'str'},
        role={'required': True, 'type': 'str'},
        user={'required': True, 'type': 'str'},
        service={'required': True, 'type': 'str'},
    )
    module = AnsibleModule(argument_spec=spec)

    try:
        cloud = shade.operator_cloud(**module.params)
        # 'service' is removed from the params only after the cloud is built.
        check = getattr(SanityChecks, module.params.pop("service"))
        check(cloud)
        module.exit_json(changed=True)
    except Exception:
        module.exit_json(failed=True, changed=True,
                         msg=repr(traceback.format_exc()))


# import module snippets
def __init__(self, resource=None ):
    """Set up the manager and, when a resource is given, try to obtain the
    domain's EventChannelManager through it.

    Args:
        resource: optional object providing ``getDomainManager()``. When it is
            falsy, or the lookup fails, ``self._ecm`` stays ``None``.
    """
    self._mgr_lock = threading.Lock()
    self._ecm = None
    self._logger = logging.getLogger("ossie.events.Manager")
    self._logger.setLevel(logging.INFO)
    self._allow = True
    self._registrations=[]
    if resource :
        try:
            self._logger.debug("Requesting Domain Manager Access....")
            dom = resource.getDomainManager()
            self._logger.debug("Requesting EventChannelManager Access....")
            self._ecm = dom.getRef()._get_eventChannelMgr()
            self._logger.debug("Acquired reference to EventChannelManager")
        except:
            # Best effort: the manager remains usable without an
            # EventChannelManager, so the failure is only reported.
            # 'warning' is the documented method; 'warn' is a deprecated alias.
            self._logger.warning("EventChannelManager - unable to resolve DomainManager's EventChannelManager ")
def Subscriber(self, channel_name, registrationId=""):
    """Register for ``channel_name`` and return a subscriber for it.

    Args:
        channel_name: name of the event channel.
        registrationId: registration id passed to the EventChannelManager.

    Returns:
        An ``EM_Subscriber`` bound to the new registration, or ``None`` when
        no EventChannelManager is available or the registration failed (the
        failure is logged, not raised).
    """
    sub = None
    # 'with' covers the first log call as well, so the lock is released on
    # every exit path.
    with self._mgr_lock:
        self._logger.debug("Requesting Subscriber for Channel:" + str(channel_name) )
        try:
            if self._ecm:
                ereg = EventChannelManager.EventRegistration( channel_name = channel_name, reg_id = registrationId)
                self._logger.debug("Requesting Channel:" + str(channel_name) + " from Domain's EventChannelManager ")
                registration = self._ecm.registerResource( ereg )
                sub = EM_Subscriber( self, registration )
                self._logger.debug("Channel:" + str(channel_name) + " Reg-Id:" + str(registration.reg.reg_id))
                # Remember the registration on the manager.
                self._registrations.append( registration )
        except:
            self._logger.error("Unable to create Subscriber for Channel:" + str(channel_name ))
    return sub
def maker(name):
    """Build a logging method called ``name`` for a minimal logger class.

    The returned function prints ``"<asctime> <LEVEL> <message>"`` when the
    owning object's ``level`` is at or below the level for ``name``.
    ``'exception'`` is treated as ``ERROR`` and always appends the current
    traceback; other levels append it only when ``exc_info`` is truthy.
    """
    is_exception = name == 'exception'
    if is_exception:
        my_level = logging.ERROR
        altname = 'ERROR'
    else:
        my_level = getattr(logging, name.upper())
        altname = name.upper()

    def _log(self, msg, *args, **kwargs):
        want_traceback = kwargs.pop('exc_info', None) or is_exception
        if want_traceback:
            tb = '\n' + traceback.format_exc().strip()
        else:
            tb = ''
        if args:
            try:
                msg = msg % args
            except:
                # A bad format string must not break logging; report it and
                # fall through with the unformatted message.
                self.exception(
                    "Exception raised while formatting message:\n%s\n%r",
                    msg, args)
        msg += tb
        # todo: check level before printing
        if self.level <= my_level:
            print("%s %s %s"%(time.asctime(), altname, msg))

    _log.__name__ = name
    return _log
def _serve():
    """Serve handle requests forever.

    Each connection accepted on ``_listener`` supplies a
    ``(handle, destination_pid)`` pair. The handle is removed from ``_cache``,
    sent over the connection and closed. A failure is reported through
    ``sub_warning`` unless the interpreter is exiting; the loop then continues.
    """
    from .util import is_exiting, sub_warning

    while True:
        try:
            conn = _listener.accept()
            handle_wanted, destination_pid = conn.recv()
            _cache.remove(handle_wanted)
            send_handle(conn, handle_wanted, destination_pid)
            close(handle_wanted)
            conn.close()
        except:
            if is_exiting():
                continue
            import traceback
            rule = '-'*79
            sub_warning(
                'thread for sharing handles raised exception :\n' +
                rule + '\n' + traceback.format_exc() + rule
                )

#
# Functions to be used for pickling/unpickling objects with handles
#
def run(self):
    """Thread body: dispatch queued events to their callbacks forever.

    Each queue item is ``(callback, cb, event_type, event_data)``. The
    callback's own ``kwargs`` dict is extended in place with ``cb``,
    ``event_type`` and ``event_data`` before the call. A callback that raises
    ``Exception`` does not stop the loop; a record of the failure is appended
    to ``self._errors`` under ``self._error_lock``.
    """
    log.debug("starting event registry thread")
    while True:
        callback, cb, event_type, event_data = self._callback_queue.get()

        call_kwargs = callback["kwargs"]
        call_kwargs["cb"] = cb
        call_kwargs["event_type"] = event_type
        call_kwargs["event_data"] = event_data

        try:
            callback["func"](*callback["args"], **call_kwargs)
        except Exception:
            with self._error_lock:
                failure = {
                    "exception": traceback.format_exc(),
                    "timestamp": time.time(),
                    "callback_func": callback["func"].__name__,
                    "event_type": event_type,
                    "event_data": event_data,
                }
                self._errors.append(failure)
def LOGEXCEPTION(message):
    """Report ``message`` together with the exception being handled.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time, followed by the formatted
    traceback.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        report = '%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc())
        print(report)
        return
    LOGGER.exception(message)


#######################
## UTILITY FUNCTIONS ##
#######################

# Utility function to report best scores
# modified from a snippet taken from:
# http://scikit-learn.org/stable/auto_examples/model_selection/plot_randomized_search.html
def LOGEXCEPTION(message):
    """Report ``message`` together with the exception being handled.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time, followed by the formatted
    traceback.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        report = '%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc())
        print(report)
        return
    LOGGER.exception(message)


########################
## COLUMN DEFINITIONS ##
########################

# LC column definitions
# the first elem is the column description, the second is the format to use when
# writing a CSV LC column, the third is the type to use when parsing a CSV LC
# column
def LOGEXCEPTION(message):
    """Report ``message`` together with the exception being handled.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time, followed by the formatted
    traceback.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        report = '%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc())
        print(report)
        return
    LOGGER.exception(message)


###################
## USEFUL CONFIG ##
###################

# used to find HATIDs
def LOGEXCEPTION(message):
    """Report ``message`` together with the exception being handled.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with ``dtime.utcnow()``, followed by the formatted traceback.
    """
    # NOTE(review): 'dtime' is presumably this module's alias for
    # datetime.datetime - confirm against the file's imports.
    if not LOGGER:
        stamp = dtime.utcnow().isoformat()
        report = '%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc())
        print(report)
        return
    LOGGER.exception(message)


###############################################
## MAGIC CONSTANTS FOR 2MASS TO COUSINS/SDSS ##
###############################################

# converting from JHK to BVRI
def LOGEXCEPTION(message):
    """Report ``message`` together with the exception being handled.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time, followed by the formatted
    traceback.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        report = '%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc())
        print(report)
        return
    LOGGER.exception(message)


###################
## LOCAL IMPORTS ##
###################

# LC reading functions
def LOGEXCEPTION(message):
    """Report ``message`` together with the exception being handled.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time, followed by the formatted
    traceback.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        report = '%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc())
        print(report)
        return
    LOGGER.exception(message)


###################
## LOCAL IMPORTS ##
###################
def LOGEXCEPTION(message):
    """Report ``message`` together with the exception being handled.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time, followed by the formatted
    traceback.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        report = '%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc())
        print(report)
        return
    LOGGER.exception(message)


###########################
## FLARE MODEL FUNCTIONS ##
###########################
def LOGEXCEPTION(message):
    """Report ``message`` together with the exception being handled.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time, followed by the formatted
    traceback.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        report = '%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc())
        print(report)
        return
    LOGGER.exception(message)


########################
## SOME OTHER IMPORTS ##
########################
def LOGEXCEPTION(message):
    """Report ``message`` together with the exception being handled.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time, followed by the formatted
    traceback.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        report = '%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc())
        print(report)
        return
    LOGGER.exception(message)


#######################
## UTILITY FUNCTIONS ##
#######################

# this is from Tornado's source (MIT License):
# http://www.tornadoweb.org/en/stable/_modules/tornado/escape.html#squeeze
def LOGEXCEPTION(message):
    """Report ``message`` together with the exception being handled.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time, followed by the formatted
    traceback.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        report = '%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc())
        print(report)
        return
    LOGGER.exception(message)


###################
## FORM SETTINGS ##
###################
def retry_wrapper(func):
    """Decorator that calls ``func`` and retries it once in one specific case.

    The retry happens when ``func`` raises ``utils.SOSError`` with code
    ``HTTP_ERR`` and an error text containing ``401`` or the word ``cookie``
    (case-insensitive); ``CoprHDCLIDriver.AUTHENTICATED`` is reset to
    ``False`` before the second call. Every other exception is re-raised as
    ``utils.SOSError(SOS_FAILURE_ERR, ...)`` with the traceback embedded in
    the message.

    Args:
        func: the callable to wrap.

    Returns:
        A wrapper with the same call signature and metadata as ``func``.
    """
    import functools

    # wraps() keeps func's __name__/__doc__ visible on the wrapper.
    @functools.wraps(func)
    def try_and_retry(*args, **kwargs):
        retry = False
        try:
            return func(*args, **kwargs)
        except utils.SOSError as e:
            # if we got an http error and
            # the string contains 401 or if the string contains the word cookie
            if (e.err_code == utils.SOSError.HTTP_ERR and
                    ('401' in e.err_text or
                     'cookie' in e.err_text.lower())):
                retry = True
                CoprHDCLIDriver.AUTHENTICATED = False
            else:
                exception_message = "\nViPR Exception: %s\nStack Trace:\n%s" \
                    % (e.err_text, traceback.format_exc())
                raise utils.SOSError(utils.SOSError.SOS_FAILURE_ERR,"Exception is : "+exception_message)
        except Exception:
            exception_message = "\nGeneral Exception: %s\nStack Trace:\n%s" \
                % (sys.exc_info()[0], traceback.format_exc())
            raise utils.SOSError(utils.SOSError.SOS_FAILURE_ERR,"Exception is : "+exception_message)
        # The retry runs outside the try block, so a second failure
        # propagates unchanged.
        if retry:
            return func(*args, **kwargs)
    return try_and_retry
def __init__(self, *args):
    """Log the error as soon as the exception object is created.

    The log entry is the user message followed by a traceback: the traceback
    of the exception currently being handled when there is one, otherwise the
    call stack leading to this constructor, formatted like a traceback ending
    in this exception.

    Args:
        *args: exception arguments; ``args[0]``, when present, is used as the
            user message and is concatenated with text when building the log
            entry.
    """
    import sys

    # build the message from the user
    user_msg = args[0] if args else "An error has occurred"

    # get the traceback from the last error if it exists.
    # format_exc() does not raise when no exception is active (it returns a
    # placeholder string), so the active exception is checked explicitly.
    tb = None
    if sys.exc_info()[0] is not None:
        try:
            tb = traceback.format_exc()
        except Exception:
            tb = None

    # otherwise, get the tb prior to this frame and pretend its us
    if tb is None:
        tb = "Traceback (most recent call last):\n"
        tb += ''.join(traceback.format_stack()[:-1])
        tb += self.__class__.__name__ + ": " + user_msg
        tb += '\n'

    # build the complete log message
    log_msg = user_msg + "\n" + tb

    # and log it
    self.log.error(log_msg)

    # store the args
    self.args = args
def update():
    """Advance the simulation by one frame and redraw every ball.

    The next call is scheduled first. If anything fails, the scheduled call
    is cancelled, the canvas is cleared and the traceback is drawn in red at
    the centre of the screen.
    """
    # Bound before the try block so the handler can tell whether a callback
    # was actually scheduled.
    ident = None
    try:
        # int(): the delay is passed as whole milliseconds even when
        # 1000 / FPS is not an integer.
        ident = screen.after(int(1000 / FPS), update)
        for mutate in container, governor:
            for ball in balls:
                mutate(ball)
        # Collide each unordered pair of balls exactly once.
        for index, ball_1 in enumerate(balls[:-1]):
            for ball_2 in balls[index+1:]:
                ball_1.crash(ball_2)
        for ball in balls:
            ball.move(FPS)
        screen.delete(ALL)
        for ball in balls:
            x1 = ball.pos.x - BALL_RADIUS
            y1 = ball.pos.y - BALL_RADIUS
            x2 = ball.pos.x + BALL_RADIUS
            y2 = ball.pos.y + BALL_RADIUS
            screen.create_oval(x1, y1, x2, y2, fill=BALL_COLOR)
    except:
        if ident is not None:
            screen.after_cancel(ident)
        screen.delete(ALL)
        screen.create_text(SCREEN_WIDTH / 2, SCREEN_HEIGHT / 2,
                           text=format_exc(), font='Courier 10', fill='red')
def update():
    """Run physics and update screen.

    Applies every mutator to every ball, collides each pair once, moves the
    balls and redraws the items tagged ``'animate'``. The next call is
    scheduled from ``start``, the frame counter and ``FPS``. On any failure
    the canvas is cleared and the traceback is drawn in red.
    """
    global frame
    try:
        for apply_rule in wall, floor, gravity, friction, governor:
            for ball in balls:
                apply_rule(ball)
        for position, first in enumerate(balls[:-1]):
            for second in balls[position+1:]:
                first.crash(second)
        for ball in balls:
            ball.move(FPS)
        screen.delete('animate')
        for ball in balls:
            left = ball.pos.x - ball.rad
            top = ball.pos.y - ball.rad
            right = ball.pos.x + ball.rad
            bottom = ball.pos.y + ball.rad
            screen.create_oval(left, top, right, bottom,
                               fill=BALL_COLOR, tag='animate')
        frame += 1
        # Delay until the time at which the next frame is due.
        delay_ms = int((start + frame / FPS - time.clock()) * 1000)
        screen.after(delay_ms, update)
    except:
        screen.delete(Tkinter.ALL)
        screen.create_text(SCREEN_WIDTH / 2, SCREEN_HEIGHT / 2,
                           text=traceback.format_exc(), font='Courier 10',
                           fill='red', tag='animate')
def main(args=None):
    """Dispatch to the utility module named by the first argument.

    With no arguments, or an unrecognised leading option, the usage text is
    written to stderr; ``-h``/``--help`` writes the module docstring. Both
    cases exit with status 1, as does a utility module that cannot be
    imported. Otherwise the utility's ``main`` receives the remaining
    arguments.

    Args:
        args: argument list; defaults to ``sys.argv[1:]``.
    """
    if args is None:
        args = sys.argv[1:]

    if not len(args):
        sys.stderr.write(USAGE)
        sys.exit(1)

    first = args[0]
    if first in ("-h", "--help"):
        sys.stderr.write(__doc__ + "\n")
        sys.exit(1)
    if first[:1] == "-":
        sys.stderr.write(USAGE)
        sys.exit(1)

    module_name = args.pop(0)
    try:
        module = import_module(".utilities." + module_name, package="mjolnir")
    except ImportError:
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write("Could not find utility %s" % (module_name))
        sys.exit(1)

    module.main(args)
def git_reinstall():
    """Reinstall from source and restart services.

    If the openstack-origin-git config option was used to install openstack
    from source git repositories, then this action can be used to reinstall
    from updated git repositories, followed by a restart of services."""
    if git_install_requested():
        try:
            git_install(config('openstack-origin-git'))
            config_changed()
        except:
            # Record the traceback on the action before marking it failed.
            action_set({'traceback': traceback.format_exc()})
            action_fail('git-reinstall resulted in an unexpected error')
    else:
        action_fail('openstack-origin-git is not configured')
def catch_error(func):
    """Decorator that reports exceptions raised by ``func`` without re-raising.

    On success the wrapper returns whatever ``func`` returned. When ``func``
    raises ``Exception``, the traceback is printed to stdout, the exception
    and its formatted traceback are passed to ``stacktrace_handle`` and the
    wrapper returns ``None``.

    Args:
        func: the callable to wrap.

    Returns:
        A wrapper carrying the name and docstring of ``func``.
    """
    import functools

    # wraps() keeps func's metadata on the wrapper.
    @functools.wraps(func)
    def __tryexcept__(*args, **kwargs):
        try:
            # Pass the result through so decorating does not discard it.
            return func(*args, **kwargs)
        except Exception as expected:
            traceback.print_exc(file=sys.stdout)
            stacktrace = traceback.format_exc()
            exception = {
                'exception': expected,
                'stacktrace': stacktrace,
            }
            stacktrace_handle(exception)
    return __tryexcept__
def process(self):
    """ Main entry point. All archives are processed.

    Every archive from ``self.job.nodearchives()`` is attempted; a failure is
    recorded with ``self.adderror`` and does not stop the loop.
    ``self.archives_processed`` counts the archives that succeeded.

    Returns True only when no archive failed.
    """
    failures = 0
    self.archives_processed = 0

    for nodename, nodeidx, archive in self.job.nodearchives():
        try:
            self.processarchive(nodename, nodeidx, archive)
            self.archives_processed += 1
        except pmapi.pmErr as pcp_err:
            failures += 1
            #pylint: disable=not-callable
            detail = "{0}: pmapi.pmErr: {1}".format(archive, pcp_err.message())
            self.adderror("archive", detail)
        except Exception as err:
            failures += 1
            detail = "{0}: Exception: {1}. {2}".format(archive, str(err), traceback.format_exc())
            self.adderror("archive", detail)

    return failures == 0
def test_timeout(self):
    """Check that a ``gen_test`` timeout raises ``ioloop.TimeoutError`` and
    that its traceback contains the source line of the pending ``yield``."""
    # Set a short timeout and exceed it.
    @gen_test(timeout=0.1)
    def test(self):
        # NOTE: the assertion below matches the literal source text of this
        # line in the traceback, so it must not be reformatted.
        yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)

    # This can't use assertRaises because we need to inspect the
    # exc_info triple (and not just the exception object)
    try:
        test(self)
        self.fail("did not get expected exception")
    except ioloop.TimeoutError:
        # The stack trace should blame the add_timeout line, not just
        # unrelated IOLoop/testing internals.
        self.assertIn(
            "gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)",
            traceback.format_exc())

    # Set only when the expected TimeoutError path was taken (self.fail
    # raises otherwise).
    self.finished = True
def write_error(self, status_code, **kwargs):
    """Override to implement custom error pages.

    ``write_error`` may call `write`, `render`, `set_header`, etc
    to produce output as usual.

    If this error was caused by an uncaught exception (including
    HTTPError), an ``exc_info`` triple will be available as
    ``kwargs["exc_info"]``.  Note that this exception may not be
    the "current" exception for purposes of methods like
    ``sys.exc_info()`` or ``traceback.format_exc``.

    This default implementation sends the formatted traceback as plain text
    when the ``serve_traceback`` setting is on and ``exc_info`` was supplied,
    and a minimal HTML page with the status code and reason otherwise.
    """
    show_traceback = self.settings.get("serve_traceback") and "exc_info" in kwargs
    if show_traceback:
        # in debug mode, try to send a traceback
        self.set_header('Content-Type', 'text/plain')
        for chunk in traceback.format_exception(*kwargs["exc_info"]):
            self.write(chunk)
        self.finish()
        return

    fields = {
        "code": status_code,
        "message": self._reason,
    }
    self.finish("<html><title>%(code)d: %(message)s</title>"
                "<body>%(code)d: %(message)s</body></html>" % fields)