The following 50 code examples, extracted from open-source Python projects, illustrate how to use sys.exc_info().
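Before the extracted examples, a minimal sketch of the basic pattern may help (this sketch is not taken from any of the projects below; the helper name risky() is made up for illustration): inside an except block, sys.exc_info() returns the (type, value, traceback) triple of the exception currently being handled, which is typically formatted with the traceback module or passed along to re-raise/report machinery.

import sys
import traceback

def risky():
    # Deliberately raises ZeroDivisionError for demonstration
    return 1 / 0

try:
    risky()
except Exception:
    # Capture the active exception as a (type, value, traceback) tuple
    exc_type, exc_value, exc_tb = sys.exc_info()
    # Format the full traceback as a list of strings, e.g. for logging
    details = traceback.format_exception(exc_type, exc_value, exc_tb)
    print(''.join(details))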
def _async_recvfrom(self, *args):
    """Internal use only; use 'recvfrom' with 'yield' instead.

    Asynchronous version of socket recvfrom method.
    """
    def _recvfrom():
        try:
            buf = self._rsock.recvfrom(*args)
        except:
            self._read_fn = None
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_task.throw(*sys.exc_info())
        else:
            self._read_fn = None
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_task._proceed_(buf)

    if not self._scheduler:
        self._scheduler = Pycos.scheduler()
        self._notifier = self._scheduler._notifier
        self._register()
    self._read_task = Pycos.cur_task(self._scheduler)
    self._read_task._await_()
    self._read_fn = _recvfrom
    self._notifier.add(self, _AsyncPoller._Read)
def _async_send(self, *args):
    """Internal use only; use 'send' with 'yield' instead.

    Asynchronous version of socket send method.
    """
    def _send():
        try:
            sent = self._rsock.send(*args)
        except:
            self._write_fn = None
            self._notifier.clear(self, _AsyncPoller._Write)
            self._write_task.throw(*sys.exc_info())
        else:
            self._write_fn = None
            self._notifier.clear(self, _AsyncPoller._Write)
            self._write_task._proceed_(sent)

    if not self._scheduler:
        self._scheduler = Pycos.scheduler()
        self._notifier = self._scheduler._notifier
        self._register()
    self._write_task = Pycos.cur_task(self._scheduler)
    self._write_task._await_()
    self._write_fn = _send
    self._notifier.add(self, _AsyncPoller._Write)
def _async_sendto(self, *args):
    """Internal use only; use 'sendto' with 'yield' instead.

    Asynchronous version of socket sendto method.
    """
    def _sendto():
        try:
            sent = self._rsock.sendto(*args)
        except:
            self._write_fn = None
            self._notifier.clear(self, _AsyncPoller._Write)
            self._write_task.throw(*sys.exc_info())
        else:
            self._write_fn = None
            self._notifier.clear(self, _AsyncPoller._Write)
            self._write_task._proceed_(sent)

    if not self._scheduler:
        self._scheduler = Pycos.scheduler()
        self._notifier = self._scheduler._notifier
        self._register()
    self._write_task = Pycos.cur_task(self._scheduler)
    self._write_task._await_()
    self._write_fn = _sendto
    self._notifier.add(self, _AsyncPoller._Write)
def upload_output_xml(request):
    if request.method == 'POST':
        form = UploadOutputXmlForm(request.POST, request.FILES)
        print "HELLP"
        if form.is_valid():
            print "YES"
            try:
                handle_uploaded_file(request)
            except:
                tt, value, tb = sys.exc_info()
                print {'exception_value': value,
                       'value': tt,
                       'tb': traceback.format_exception(tt, value, tb)}
                return handler500(request)
            return HttpResponseRedirect(reverse('home'))
        else:
            return handler500(request)
    else:
        print "No"
        form = UploadOutputXmlForm()
    return render(request, 'report/upload_xml_file.html', {'form': form})
def handler500(request, template_name='500.html'):
    t = get_template(template_name)
    tt, value, tb = sys.exc_info()
    ctx = Context({'exception_value': value,
                   'value': tt,
                   'tb': traceback.format_exception(tt, value, tb)})
    return HttpResponseServerError(t.render(ctx))
def log(o, t, e=None):
    if e is None:
        print("{}: {}".format(type(o).__name__, t))
    else:
        print("{}: {} Exception:{!r}".format(type(o).__name__, t, e))
        import sys
        if hasattr(sys, 'print_exception'):
            sys.print_exception(e)
        else:
            import traceback
            traceback.print_exception(type(e), e, sys.exc_info()[2])
def report(service, testcase):
    def decorate(function):
        def get_result(code, message=None):
            result = Result()
            result.exit_code = code
            result.stderr = message
            result.command = None
            return result

        def new_f(*args, **kwargs):
            try:
                function_result = function(*args, **kwargs)
            except AssertionError:
                error_type, error_message, error_traceback = sys.exc_info()
                raise
            return function_result

        new_f.func_name = function.func_name
        return new_f
    return decorate
def __init__(self, file):
    self.file = file
    if file == '':
        self.infile = sys.stdin
    elif file.lower().startswith('http://') or file.lower().startswith('https://'):
        try:
            if sys.hexversion >= 0x020601F0:
                self.infile = urllib23.urlopen(file, timeout=5)
            else:
                self.infile = urllib23.urlopen(file)
        except urllib23.HTTPError:
            print('Error accessing URL %s' % file)
            print(sys.exc_info()[1])
            sys.exit()
    elif file.lower().endswith('.zip'):
        try:
            self.zipfile = zipfile.ZipFile(file, 'r')
            self.infile = self.zipfile.open(self.zipfile.infolist()[0], 'r', C2BIP3('infected'))
        except:
            print('Error opening file %s' % file)
            print(sys.exc_info()[1])
            sys.exit()
    else:
        try:
            self.infile = open(file, 'rb')
        except:
            print('Error opening file %s' % file)
            print(sys.exc_info()[1])
            sys.exit()
    self.ungetted = []
def run(self):
    data = self.getData()
    value = {
        data: {
            "type": self.data_type
        }
    }
    json_data = json.dumps(value)
    post_data = json_data.encode('utf-8')
    headers = {'Content-Type': 'application/json'}
    try:
        request = urllib2.Request('{}/hippocampe/api/v1.0/{}'.format(
            self.url, self.service), post_data, headers)
        response = urllib2.urlopen(request)
        report = json.loads(response.read())
        self.report(report)
    except urllib2.HTTPError:
        self.error("Hippocampe: " + str(sys.exc_info()[1]))
    except urllib2.URLError:
        self.error("Hippocampe: service is not available")
    except Exception as e:
        self.unexpectedError(e)
def resolve(self, s):
    """
    Resolve strings to objects using standard import and attribute
    syntax.
    """
    name = s.split('.')
    used = name.pop(0)
    try:
        found = self.importer(used)
        for frag in name:
            used += '.' + frag
            try:
                found = getattr(found, frag)
            except AttributeError:
                self.importer(used)
                found = getattr(found, frag)
        return found
    except ImportError:
        e, tb = sys.exc_info()[1:]
        v = ValueError('Cannot resolve %r: %s' % (s, e))
        v.__cause__, v.__traceback__ = e, tb
        raise v
def __load_layout(self, config):
    var = config.get_value('engine/replace-with-kanji-python', 'layout')
    if var is None or var.get_type_string() != 's':
        path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'layouts')
        path = os.path.join(path, 'roomazi.json')
        if var:
            config.unset('engine/replace-with-kanji-python', 'layout')
    else:
        path = var.get_string()
    logger.info("layout: %s", path)
    layout = roomazi.layout  # Use 'roomazi' as default
    try:
        with open(path) as f:
            layout = json.load(f)
    except ValueError as error:
        logger.error("JSON error: %s", error)
    except OSError as error:
        logger.error("Error: %s", error)
    except:
        logger.error("Unexpected error: %s %s", sys.exc_info()[0], sys.exc_info()[1])
    self.__to_kana = self.__handle_roomazi_layout
    if 'Type' in layout:
        if layout['Type'] == 'Kana':
            self.__to_kana = self.__handle_kana_layout
    return layout
def _importtestmodule(self):
    # we assume we are only called once per module
    importmode = self.config.getoption("--import-mode", default=True)
    try:
        # XXX patch pyimport in pytest._pytest.pythod.Module
        mod = _patch_pyimport(self.fspath, ensuresyspath=importmode)
    except SyntaxError:
        raise self.CollectError(
            _pytest._code.ExceptionInfo().getrepr(style="short"))
    except self.fspath.ImportMismatchError:
        e = sys.exc_info()[1]
        raise self.CollectError(
            "import file mismatch:\n"
            "imported module %r has this __file__ attribute:\n"
            "  %s\n"
            "which is not the same as the test file we want to collect:\n"
            "  %s\n"
            "HINT: remove __pycache__ / .pyc files and/or use a "
            "unique basename for your test file modules" % e.args
        )
    # print "imported test module", mod
    self.config.pluginmanager.consider_module(mod)
    return mod
def handle(self):
    try:
        data = self.request[0]
        clientAddr = self.client_address
        self.logger.debug("UDP packet from {0}:{1}, length {2}".format(
            clientAddr[0], clientAddr[1], len(data)))
        self.logger.debug("message hex : %s", binascii.hexlify(data))
        if data[0:4] == "IOT\xff":
            self.logger.debug("heartbeat packet - ignoring")
        elif data[0:4] == "IOT\0" and len(data) >= 88 and ((len(data) - 72) % 16) == 0:
            self.handleIotPacket(data, clientAddr)
        else:
            self.logger.warning("unknown packet - ignoring")
    except Exception as e:
        self.logger.exception(e)
    except:
        self.logger.error("error on handling incomming packet: {0} ".format(sys.exc_info()[0]))
def raise_error(self, exc, identity=None):
    if isinstance(exc, ApiUnauthorized):
        six.reraise(
            InvalidIdentity,
            InvalidIdentity(self.message_from_error(exc), identity=identity),
            sys.exc_info()[2]
        )
    elif isinstance(exc, ApiError):
        six.reraise(
            PluginError,
            PluginError(self.message_from_error(exc)),
            sys.exc_info()[2]
        )
    elif isinstance(exc, PluginError):
        raise
    else:
        self.logger.exception(six.text_type(exc))
        six.reraise(
            PluginError,
            PluginError(self.message_from_error(exc)),
            sys.exc_info()[2]
        )
def multi_stat_update(args, container_dir, filename):
    dict = {}
    try:
        pipe = os.popen("docker exec " + args.container + " cat " +
                        container_dir + "/" + filename + " 2>&1")
        for line in pipe:
            m = _STAT_RE.match(line)
            if m:
                dict[m.group(1)] = m.group(2)
        pipe.close()
        f = open(args.container + "/" + filename, "w")
        for key in dict.keys():
            f.write(key + " " + dict[key] + "\n")
        f.close()
    except Exception, e:
        debug(args.container + ": could not update " + filename)
        debug(str(sys.exc_info()))
    return dict
def __call__(self, *args, **kwargs):
    if self.nc_match:
        # Prevent malformed match functions from derailing the entire
        # notification process
        try:
            match = self.nc_match(*args, **kwargs)
        except:
            print >>sys.stderr, 'Exception in match function for notification %s:' % (repr(self.nc_func),)
            traceback.print_exception(*sys.exc_info())
            # Treat an exception in the function as a negative response
            match = False
        if not match:
            return None
    return self.nc_func(*args, **kwargs)
def import_string(dotted_path):
    """
    Import a dotted module path and return the attribute/class designated by the
    last name in the path. Raise ImportError if the import failed.
    """
    try:
        module_path, class_name = dotted_path.rsplit('.', 1)
    except ValueError:
        msg = "%s doesn't look like a module path" % dotted_path
        six.reraise(ImportError, ImportError(msg), sys.exc_info()[2])

    module = import_module(module_path)

    try:
        return getattr(module, class_name)
    except AttributeError:
        msg = 'Module "%s" does not define a "%s" attribute/class' % (
            module_path, class_name)
        six.reraise(ImportError, ImportError(msg), sys.exc_info()[2])
def finish_reservations(self):  # pragma: no cover
    """ The method will copy all reservations to the actual signals. (PRIVATE) """
    for sig_name, recs in self.reserved.items():
        for func, kwargs in recs:
            try:
                signal = self.get_signal(sig_name)
                signal.connect(func, **kwargs)
            except Exception as e:
                logging.warning('Signal not found: {}, {}'.format(
                    sig_name, e
                ), exc_info=sys.exc_info())

    for sig_name, recs in self.reserved_self.items():
        for func, slf in recs:
            try:
                signal = self.get_signal(sig_name)
                signal.set_self(func, slf)
            except Exception as e:
                logging.warning(str(e), exc_info=sys.exc_info())

    self.reserved = dict()
    self.reserved_self = dict()
def _execute_pipeline(self, connection, commands, raise_on_error):
    # build up all commands into a single request to increase network perf
    all_cmds = connection.pack_commands([args for args, _ in commands])
    connection.send_packed_command(all_cmds)

    response = []
    for args, options in commands:
        try:
            response.append(
                self.parse_response(connection, args[0], **options))
        except ResponseError:
            response.append(sys.exc_info()[1])

    if raise_on_error:
        self.raise_first_error(commands, response)
    return response
def connect(self):
    "Connects to the Redis server if not already connected"
    if self._sock:
        return
    try:
        sock = self._connect()
    except socket.error:
        e = sys.exc_info()[1]
        raise ConnectionError(self._error_message(e))

    self._sock = sock
    try:
        self.on_connect()
    except RedisError:
        # clean up after any error in on_connect
        self.disconnect()
        raise

    # run any user callbacks. right now the only internal callback
    # is for pubsub channel/pattern resubscription
    for callback in self._connect_callbacks:
        callback(self)
def default(self, line):
    if line[:1] == '!':
        line = line[1:]
    locals = self.curframe_locals
    globals = self.curframe.f_globals
    try:
        code = compile(line + '\n', '<stdin>', 'single')
        save_stdout = sys.stdout
        save_stdin = sys.stdin
        save_displayhook = sys.displayhook
        try:
            sys.stdin = self.stdin
            sys.stdout = self.stdout
            sys.displayhook = self.displayhook
            exec code in globals, locals
        finally:
            sys.stdout = save_stdout
            sys.stdin = save_stdin
            sys.displayhook = save_displayhook
    except:
        t, v = sys.exc_info()[:2]
        if type(t) == type(''):
            exc_type_name = t
        else:
            exc_type_name = t.__name__
        print >>self.stdout, '***', exc_type_name + ':', v
def r_open(self, file, mode='r', buf=-1):
    """Method called when open() is called in the restricted environment.

    The arguments are identical to those of the open() function, and a
    file object (or a class instance compatible with file objects)
    should be returned.  RExec's default behaviour is allow opening
    any file for reading, but forbidding any attempt to write a file.

    This method is implicitly called by code executing in the
    restricted environment.  Overriding this method in a subclass is
    used to change the policies enforced by a restricted environment.
    """
    mode = str(mode)
    if mode not in ('r', 'rb'):
        raise IOError, "can't open files for writing in restricted mode"
    return open(file, mode, buf)

# Restricted version of sys.exc_info()
def _get_value(self, action, arg_string):
    type_func = self._registry_get('type', action.type, action.type)
    if not _callable(type_func):
        msg = _('%r is not callable')
        raise ArgumentError(action, msg % type_func)

    # convert the value to the appropriate type
    try:
        result = type_func(arg_string)

    # ArgumentTypeErrors indicate errors
    except ArgumentTypeError:
        name = getattr(action.type, '__name__', repr(action.type))
        msg = str(_sys.exc_info()[1])
        raise ArgumentError(action, msg)

    # TypeErrors or ValueErrors also indicate errors
    except (TypeError, ValueError):
        name = getattr(action.type, '__name__', repr(action.type))
        msg = _('invalid %s value: %r')
        raise ArgumentError(action, msg % (name, arg_string))

    # return the converted value
    return result
def error_output(self, environ, start_response):
    """WSGI mini-app to create error output

    By default, this just uses the 'error_status', 'error_headers',
    and 'error_body' attributes to generate an output page.  It can
    be overridden in a subclass to dynamically generate diagnostics,
    choose an appropriate message for the user's preferred language, etc.

    Note, however, that it's not recommended from a security perspective to
    spit out diagnostics to any old user; ideally, you should have to do
    something special to enable diagnostic output, which is why we don't
    include any here!
    """
    start_response(self.error_status, self.error_headers[:], sys.exc_info())
    return [self.error_body]

# Pure abstract methods; *must* be overridden in subclasses
def handleError(self, record):
    """
    Handle errors which occur during an emit() call.

    This method should be called from handlers when an exception is
    encountered during an emit() call. If raiseExceptions is false,
    exceptions get silently ignored. This is what is mostly wanted
    for a logging system - most users will not care about errors in
    the logging system, they are more interested in application errors.
    You could, however, replace this with a custom handler if you wish.
    The record which was being processed is passed in to this method.
    """
    if raiseExceptions:
        ei = sys.exc_info()
        try:
            traceback.print_exception(ei[0], ei[1], ei[2], None, sys.stderr)
            sys.stderr.write('Logged from file %s, line %s\n' % (
                record.filename, record.lineno))
        except IOError:
            pass  # see issue 5971
        finally:
            del ei
def log(self, level, msg, *args, **kwargs):
    """
    Log 'msg % args' with the integer severity 'level'.

    To pass exception information, use the keyword argument exc_info with
    a true value, e.g.

    logger.log(level, "We have a %s", "mysterious problem", exc_info=1)
    """
    if not isinstance(level, int):
        if raiseExceptions:
            raise TypeError("level must be an integer")
        else:
            return
    if self.isEnabledFor(level):
        self._log(level, msg, args, **kwargs)
def _log(self, level, msg, args, exc_info=None, extra=None):
    """
    Low-level logging routine which creates a LogRecord and then calls
    all the handlers of this logger to handle the record.
    """
    if _srcfile:
        #IronPython doesn't track Python frames, so findCaller throws an
        #exception on some versions of IronPython. We trap it here so that
        #IronPython can use logging.
        try:
            fn, lno, func = self.findCaller()
        except ValueError:
            fn, lno, func = "(unknown file)", 0, "(unknown function)"
    else:
        fn, lno, func = "(unknown file)", 0, "(unknown function)"
    if exc_info:
        if not isinstance(exc_info, tuple):
            exc_info = sys.exc_info()
    record = self.makeRecord(self.name, level, fn, lno, msg, args, exc_info, func, extra)
    self.handle(record)
def __init__(self, group=None, target=None, name=None,
             args=(), kwargs=None, verbose=None):
    assert group is None, "group argument must be None for now"
    _Verbose.__init__(self, verbose)
    if kwargs is None:
        kwargs = {}
    self.__target = target
    self.__name = str(name or _newname())
    self.__args = args
    self.__kwargs = kwargs
    self.__daemonic = self._set_daemon()
    self.__ident = None
    self.__started = Event()
    self.__stopped = False
    self.__block = Condition(Lock())
    self.__initialized = True
    # sys.stderr is not stored in the class like
    # sys.exc_info since it can be changed between instances
    self.__stderr = _sys.stderr
def importfile(path):
    """Import a Python source file or compiled file given its path."""
    magic = imp.get_magic()
    file = open(path, 'r')
    if file.read(len(magic)) == magic:
        kind = imp.PY_COMPILED
    else:
        kind = imp.PY_SOURCE
    file.close()
    filename = os.path.basename(path)
    name, ext = os.path.splitext(filename)
    file = open(path, 'r')
    try:
        module = imp.load_module(name, file, path, (ext, 'r', kind))
    except:
        raise ErrorDuringImport(path, sys.exc_info())
    file.close()
    return module
def compact_traceback():
    t, v, tb = sys.exc_info()
    tbinfo = []
    if not tb:  # Must have a traceback
        raise AssertionError("traceback does not exist")
    while tb:
        tbinfo.append((
            tb.tb_frame.f_code.co_filename,
            tb.tb_frame.f_code.co_name,
            str(tb.tb_lineno)
        ))
        tb = tb.tb_next

    # just to be safe
    del tb

    file, function, line = tbinfo[-1]
    info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
    return (file, function, line), t, v, info
def _run_exitfuncs():
    """run any registered exit functions

    _exithandlers is traversed in reverse order so functions are executed
    last in, first out.
    """

    exc_info = None
    while _exithandlers:
        func, targs, kargs = _exithandlers.pop()
        try:
            func(*targs, **kargs)
        except SystemExit:
            exc_info = sys.exc_info()
        except:
            import traceback
            print >> sys.stderr, "Error in atexit._run_exitfuncs:"
            traceback.print_exc()
            exc_info = sys.exc_info()

    if exc_info is not None:
        raise exc_info[0], exc_info[1], exc_info[2]
def decrypt(self):
    """Decrypt decrypts the secret and returns the plaintext.

    Calling decrypt() may incur side effects such as a call to a remote
    service for decryption.
    """
    if not self._crypter:
        return b''
    try:
        plaintext = self._crypter.decrypt(self._ciphertext, **self._decrypt_params)
        return plaintext
    except Exception as e:
        exc_info = sys.exc_info()
        six.reraise(
            ValueError('Invalid ciphertext "%s", error: %s' % (self._ciphertext, e)),
            None,
            exc_info[2]
        )
def pushToIPFS(hstr, payload):
    ipfsRetryCount = 5  # WARC->IPFS attempts before giving up
    retryCount = 0
    while retryCount < ipfsRetryCount:
        try:
            httpHeaderIPFSHash = pushBytesToIPFS(bytes(hstr))
            payloadIPFSHash = pushBytesToIPFS(bytes(payload))
            if retryCount > 0:
                m = 'Retrying succeeded after {0} attempts'.format(retryCount)
                print(m)
            return [httpHeaderIPFSHash, payloadIPFSHash]
        except NewConnectionError as e:
            print('IPFS daemon is likely not running.')
            print('Run "ipfs daemon" in another terminal session.')
            sys.exit()
        except:
            attemptCount = '{0}/{1}'.format(retryCount + 1, ipfsRetryCount)
            logError('IPFS failed to add, ' +
                     'retrying attempt {0}'.format(attemptCount))
            # print(sys.exc_info())
            retryCount += 1

    return None  # Process of adding to IPFS failed
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding"""
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # OSError if ipfs not installed, redundant of below
        # subprocess.call(['ipfs', '--version'], stdout=open(devnull, 'wb'))

        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, exceptions.AttributeError):
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except:
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
def run_handlers(self, event):
    assert event in self.observers
    handlers = []
    instance_handlers = {
        'instance_canceled': self._on_cancel,
        'instance_failed': self._on_failed,
        'instance_finished': self._on_finish,
    }
    handlers += self.observers[event]
    handlers += instance_handlers.get(event, [])
    failures = 0
    for handler in handlers:
        try:
            handler(self)
        except:  # pylint: disable=bare-except
            failures += 1
            idc.Message("BAP> {0} failed because {1}\n".
                        format(self.action, str(sys.exc_info()[1])))
            traceback.print_exc()
    if failures != 0:
        idc.Warning("Some BAP handlers failed")
def cleanup_dir(tmpdir, keep_data_files=False, ignore_errors=False):
    if keep_data_files:
        return

    #Remove our tmpdir, but don't fail the test if it doesn't remove
    try:
        shutil.rmtree(tmpdir, ignore_errors=ignore_errors)
    except OSError as oe:
        error = ""
        if oe.errno:
            error = "%s: " % oe.errno
        if oe.strerror:
            error += oe.strerror
        if oe.filename:
            error += " (filename: %s)" % oe.filename
        log.warning("Unable to remove powstream temporary directory %s due to error reported by OS: %s" % (tmpdir, error))
    except:
        log.warning("Unable to remove powstream temporary directory %s: %s" % (tmpdir, sys.exc_info()[0]))

##
# Called by signal handlers to clean-up then exit
def cleanup_file(tmpfile, keep_data_files=False):
    if keep_data_files:
        return

    #Remove our tmpfile, but don't fail the test if it doesn't remove
    try:
        os.remove(tmpfile)
    except OSError as oe:
        error = ""
        if oe.errno:
            error = "%s: " % oe.errno
        if oe.strerror:
            error += oe.strerror
        if oe.filename:
            error += " (filename: %s)" % oe.filename
        log.warning("Unable to remove powstream temporary file %s due to error reported by OS: %s" % (tmpfile, error))
    except:
        log.warning("Unable to remove powstream temporary file %s: %s" % (tmpfile, sys.exc_info()[0]))

##
# Handles reporting errors in pscheduler format
def prettyIn(self, value):
    if not isinstance(value, str):
        try:
            return int(value)
        except:
            raise error.PyAsn1Error(
                'Can\'t coerce %r into integer: %s' % (value, sys.exc_info()[1])
            )
    r = self.__namedValues.getValue(value)
    if r is not None:
        return r
    try:
        return int(value)
    except:
        raise error.PyAsn1Error(
            'Can\'t coerce %r into integer: %s' % (value, sys.exc_info()[1])
        )
def _async_recv(self, bufsize, *args):
    """Internal use only; use 'recv' with 'yield' instead.

    Asynchronous version of socket recv method.
    """
    def _recv():
        try:
            buf = self._rsock.recv(bufsize, *args)
        except:
            self._read_fn = None
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_task.throw(*sys.exc_info())
        else:
            self._read_fn = None
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_task._proceed_(buf)

    if not self._scheduler:
        self._scheduler = Pycos.scheduler()
        self._notifier = self._scheduler._notifier
        self._register()
    self._read_task = Pycos.cur_task(self._scheduler)
    self._read_task._await_()
    self._read_fn = _recv
    self._notifier.add(self, _AsyncPoller._Read)
    if self._certfile and self._rsock.pending():
        try:
            buf = self._rsock.recv(bufsize, *args)
        except:
            self._read_fn = None
            self._notifier.clear(self, _AsyncPoller._Read)
            self._read_task.throw(*sys.exc_info())
        else:
            if buf:
                self._read_fn = None
                self._notifier.clear(self, _AsyncPoller._Read)
                self._read_task._proceed_(buf)
def _tasklet(self):
    while 1:
        item = self._task_queue.get(block=True)
        if item is None:
            self._task_queue.task_done()
            break
        task, target, args, kwargs = item
        try:
            val = target(*args, **kwargs)
            task._proceed_(val)
        except:
            task.throw(*sys.exc_info())
        finally:
            self._task_queue.task_done()
def Scan(directory, options, plugins):
    try:
        if os.path.isdir(directory):
            for entry in os.listdir(directory):
                Scan(os.path.join(directory, entry), options, plugins)
        else:
            ProcessFile(directory, options, plugins)
    except Exception as e:
        # print directory
        print(e)
        # print(sys.exc_info()[2])
        # print traceback.format_exc()

#function derived from: http://blog.9bplus.com/pdfidpy-output-to-json
def render_POST(self, request):
    """ Handle a request from the client. """
    script_env = {
        method: api_method(request, method)
        for method in request.sdata.api.fns
    }

    # Make get do auto-formatting for convenience, even though this
    # breaks if you try to use literal '{}' named arguments
    # @@@ reconsider whether this is at all a good idea
    def get_with_formatting(path, *args):
        return api_method(request, 'get')(path.format(*args))
    script_env['get'] = get_with_formatting

    script_env['re'] = re
    script_env['dumps'] = dumps
    script_env['defaultdict'] = defaultdict
    script_env['OrderedDict'] = OrderedDict

    buf = []

    def dummy_print(*args):
        if len(args) == 1 and (isinstance(args[0], list) or isinstance(args[0], dict)):
            buf.append(dumps(args[0], indent=4))
        else:
            buf.append(' '.join(map(str, args)))
    script_env['print'] = dummy_print

    def run_script(script):
        try:
            exec script in script_env
        except:
            exception_info = sys.exc_info()
            buf.extend(traceback.format_exception(*exception_info))
        request.sdata.log('got reply {}'.format(buf))
        request.sdata.add_to_push_queue('script', text=dumps(buf))

    script = request.args['script'][0]
    reactor.callInThread(run_script, script)
def spin(self):
    reconnect_delay = 1.0
    while not rospy.is_shutdown():
        try:
            rospy.loginfo("Connecting to SwiftNav Piksi on port %s" % self.piksi_port)
            self.connect_piksi()

            while not rospy.is_shutdown():
                rospy.sleep(0.05)
                if not self.piksi.is_alive():
                    raise IOError
                self.diag_updater.update()
                self.check_timeouts()

            break  # should only happen if rospy is trying to shut down
        except IOError as e:
            rospy.logerr("IOError")
            self.disconnect_piksi()
        except SystemExit as e:
            rospy.logerr("Unable to connect to Piksi on port %s" % self.piksi_port)
            self.disconnect_piksi()
        except:  # catch *all* exceptions
            e = sys.exc_info()[0]
            rospy.logerr("Uncaught error: %s" % repr(e))
            self.disconnect_piksi()
        rospy.loginfo("Attempting to reconnect in %fs" % reconnect_delay)
        rospy.sleep(reconnect_delay)