我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.raise_from()。
def assert_called_with(_mock_self, *args, **kwargs): """assert that the mock was called with the specified arguments. Raises an AssertionError if the args and keyword args passed in are different to the last call to the mock.""" self = _mock_self if self.call_args is None: expected = self._format_mock_call_signature(args, kwargs) raise AssertionError('Expected call: %s\nNot called' % (expected,)) def _error_message(cause): msg = self._format_mock_failure_message(args, kwargs) if six.PY2 and cause is not None: # Tack on some diagnostics for Python without __cause__ msg = '%s\n%s' % (msg, str(cause)) return msg expected = self._call_matcher((args, kwargs)) actual = self._call_matcher(self.call_args) if expected != actual: cause = expected if isinstance(expected, Exception) else None six.raise_from(AssertionError(_error_message(cause)), cause)
def test_context_suppression(self): try: try: raise Exception except: raise_from(ZeroDivisionError, None) except ZeroDivisionError as _: e = _ tb = sys.exc_info()[2] lines = self.get_report(e, tb) self.assertThat(lines, DocTestMatches("""\ Traceback (most recent call last): File "...traceback2/tests/test_traceback.py", line ..., in test_context_suppression raise_from(ZeroDivisionError, None) File "<string>", line 2, in raise_from ZeroDivisionError """, doctest.ELLIPSIS))
def test_cause_and_context(self): # When both a cause and a context are set, only the cause should be # displayed and the context should be muted. def inner_raise(): try: self.zero_div() except ZeroDivisionError as _e: e = _e try: xyzzy except NameError: raise_from(KeyError, e) def outer_raise(): inner_raise() # Marker blocks = boundaries.split(self.get_report(outer_raise)) self.assertEqual(len(blocks), 3) self.assertEqual(blocks[1], cause_message) self.check_zero_div(blocks[0]) self.assertIn('inner_raise() # Marker', blocks[2])
def test_cause_recursive(self): def inner_raise(): try: try: self.zero_div() except ZeroDivisionError as e: z = e raise_from(KeyError, e) except KeyError as e: raise_from(z, e) def outer_raise(): inner_raise() # Marker blocks = boundaries.split(self.get_report(outer_raise)) self.assertEqual(len(blocks), 3) self.assertEqual(blocks[1], cause_message) # The first block is the KeyError raised from the ZeroDivisionError self.assertIn('raise_from(KeyError, e)', blocks[0]) self.assertNotIn('1/0', blocks[0]) # The second block (apart from the boundary) is the ZeroDivisionError # re-raised from the KeyError self.assertIn('inner_raise() # Marker', blocks[2]) self.check_zero_div(blocks[2])
def import_figura_file(path): """ Import a figura config file (with no side affects). :param path: a python import path :return: a python module object :raise ConfigParsingError: if importing fails """ try: return _import_module_no_side_effects(path) except Exception as e: if six.PY2: # no exception chaining in python2 #raise ConfigParsingError('Failed parsing "%s": %r' % (path, e)), None, sys.exc_info()[2] # not a valid py3 syntax raise ConfigParsingError('Failed parsing config "%s": %r' % (path, e)) else: #raise ConfigParsingError('Failed parsing config "%s"' % path) from e # not a valid py2 syntax six.raise_from(ConfigParsingError('Failed parsing config "%s"' % path), e)
def _get_pwd(service=None): """ Returns the present working directory for the given service, or all services if none specified. """ def to_text(data): if six.PY2: # pragma: no cover data = data.decode(locale.getpreferredencoding(False)) return data parser = _get_env() if service: try: return to_text(utils.with_trailing_slash(parser.get('env', service))) except configparser.NoOptionError as e: six.raise_from(ValueError('%s is an invalid service' % service), e) return [to_text(utils.with_trailing_slash(value)) for name, value in parser.items('env')]
def listdir(self, path): self.check() path = relpath(path) if path: try: member = self._tar.getmember(path) except KeyError: six.raise_from(errors.ResourceNotFound(path), None) else: if not member.isdir(): six.raise_from(errors.DirectoryExpected(path), None) return [ basename(member.name) for member in self._tar if dirname(member.path) == path ]
def openbin(self, path, mode="r", buffering=-1, **options): self.check() path = relpath(normpath(path)) if 'w' in mode or '+' in mode or 'a' in mode: raise errors.ResourceReadOnly(path) try: member = self._tar.getmember(path) except KeyError: six.raise_from(errors.ResourceNotFound(path), None) if not member.isfile(): raise errors.FileExpected(path) rw = RawWrapper(self._tar.extractfile(member)) if six.PY2: # Patch nonexistent file.flush in Python2 def _flush(): pass rw.flush = _flush return rw
def test_raise_from(): try: try: raise Exception("blah") except Exception: ctx = sys.exc_info()[1] f = Exception("foo") six.raise_from(f, None) except Exception: tp, val, tb = sys.exc_info() if sys.version_info[:2] > (3, 0): # We should have done a raise f from None equivalent. assert val.__cause__ is None assert val.__context__ is ctx if sys.version_info[:2] >= (3, 3): # And that should suppress the context on the exception. assert val.__suppress_context__ # For all versions the outer exception should have raised successfully. assert str(val) == "foo"
def makedir(self, path, permissions=None, recreate=False): # noqa: D102 self.check() _permissions = permissions or Permissions(mode=0o755) _path = self.validatepath(path) try: info = self.getinfo(_path) except errors.ResourceNotFound: with self._lock: with convert_sshfs_errors('makedir', path): self._sftp.mkdir(_path, _permissions.mode) else: if (info.is_dir and not recreate) or info.is_file: six.raise_from(errors.DirectoryExists(path), None) return self.opendir(path)
def _handle_error(self, err): """ Handle execution error state and sleep the required amount of time. """ # Update latest cached error self.error = err # Defaults to false retry = True # Evaluate if error is legit or should be retried if self.error_evaluator: retry = self.error_evaluator(err) # If evalutor returns an error exception, just raise it if retry and isinstance(retry, Exception): raise_from(retry, self.error) # If retry evaluator returns False, raise original error and # stop the retry cycle if retry is False: raise err
def _literal_terminal_to_python_val(literal_terminal): """ Use the table of "coercer" functions to convert a terminal node from the parse tree to a Python value. """ token_type = literal_terminal.getSymbol().type token_text = literal_terminal.getText() if token_type in _TOKEN_TYPE_COERCERS: coercer = _TOKEN_TYPE_COERCERS[token_type] try: python_value = coercer(token_text) except Exception as e: six.raise_from(MatcherException(u"Invalid {}: {}".format( STIXPatternParser.symbolicNames[token_type], token_text )), e) else: raise MatcherInternalError(u"Unsupported literal type: {}".format( STIXPatternParser.symbolicNames[token_type])) return python_value
def exitStartStopQualifier(self, ctx): """ Consumes nothing Produces a (datetime, datetime) 2-tuple containing the start and stop times. """ start_str = _literal_terminal_to_python_val(ctx.StringLiteral(0)) stop_str = _literal_terminal_to_python_val(ctx.StringLiteral(1)) # If the language used timestamp literals here, this could go away... try: start_dt = _str_to_datetime(start_str) stop_dt = _str_to_datetime(stop_str) except ValueError as e: # re-raise as MatcherException. raise six.raise_from(MatcherException(*e.args), e) self.__push((start_dt, stop_dt), u"exitStartStopQualifier")
def handle_line(self, raw_line): try: line = raw_line.decode('utf-8') except UnicodeDecodeError as err: six.raise_from( MastodonMalformedEventError("Malformed UTF-8"), err ) if line.startswith(':'): self.handle_heartbeat() elif line == '': # end of event self._dispatch(self.event) self.event = {} else: key, value = line.split(': ', 1) # According to the MDN spec, repeating the 'data' key # represents a newline(!) if key in self.event: self.event[key] += '\n' + value else: self.event[key] = value
def har_input(paths): for path in paths: # According to the spec, HAR files are UTF-8 with an optional BOM. path = decode_path(path) with io.open(path, 'rt', encoding='utf-8-sig') as f: try: data = json.load(f) except ValueError as exc: six.raise_from( InputError(u'%s: bad HAR file: %s' % (path, exc)), exc) try: creator = data['log']['creator']['name'] for entry in data['log']['entries']: yield _process_entry(entry, creator, path) except (TypeError, KeyError) as exc: six.raise_from( InputError(u'%s: cannot understand HAR file: %r' % (path, exc)), exc)
def request(self, buf): try: self.sock.send(buf) except SSL.Error as exc: raise_from(Error("SSL Error"), exc) else: try: #peek_bytes = self.sock.recv(8, socket.MSG_PEEK) # peeky no worky?! peek_bytes = self.sock.recv(8) except SSL.WantReadError as exc: # SSL timeout does not work properly. If no data is available from server, # we'll get this error pass except SSL.Error as exc: raise #raise_from(Error("SSL Error"), exc) else: (ver, type_, length) = struct.unpack('>HHL', peek_bytes) return bytearray(peek_bytes + self.sock.recv(length))
def response(self): try: peek_bytes = self.sock.recv(8, socket.MSG_PEEK) except SSL.Error as exc: raise_from(Error("SSL Error"), exc) else: (ver, type_, length) = struct.unpack('>HHL', peek_bytes) return bytearray(peek_bytes + self.sock.recv(length))
def next(self, message, backoff_exchange_name): total_attempts = 0 for deadlettered in message.headers.get('x-death', ()): if deadlettered['exchange'] == backoff_exchange_name: total_attempts += int(deadlettered['count']) if self.limit and total_attempts >= self.limit: expired = Backoff.Expired( "Backoff aborted after '{}' retries (~{:.0f} seconds)".format( self.limit, self.max_delay / 1000 ) ) six.raise_from(expired, self) expiration = self.get_next_schedule_item(total_attempts) if self.random_sigma: randomised = int(random.gauss(expiration, self.random_sigma)) group_size = self.random_sigma / self.random_groups_per_sigma expiration = round_to_nearest(randomised, interval=group_size) # Prevent any negative values created by randomness expiration = abs(expiration) # store calculation results on self. self._next_expiration = expiration self._total_attempts = total_attempts return expiration
def entrypoint_retry(*args, **kwargs): """ Decorator to declare that an entrypoint can be retried on failure. For use with nameko_amqp_retry enabled entrypoints. :param retry_for: An exception class or tuple of exception classes. If the wrapped function raises one of these exceptions, the entrypoint will be retried until successful, or the `limit` number of retries is reached. :param limit: integer The maximum number of times the entrypoint can be retried before giving up (and raising a ``Backoff.Expired`` exception). If not given, the default `Backoff.limit` will be used. :param schedule: tuple of integers A tuple defining the number of milliseconds to wait between each retry. If not given, the default `Backoff.schedule` will be used. :param random_sigma: integer Standard deviation as milliseconds. If not given, the default `Backoff.random_sigma` will be used. :param random_groups_per_sigma: integer Random backoffs are rounded to nearest group. If not given, the default `Backoff.random_groups_per_sigma` will be used. """ retry_for = kwargs.get('retry_for') or Exception backoff_cls = backoff_factory(*args, **kwargs) @wrapt.decorator def wrapper(wrapped, instance, args, kwargs): try: return wrapped(*args, **kwargs) except retry_for as exc: six.raise_from(backoff_cls(), exc) return wrapper
def assert_has_calls(self, calls, any_order=False): """assert the mock has been called with the specified calls. The `mock_calls` list is checked for the calls. If `any_order` is False (the default) then the calls must be sequential. There can be extra calls before or after the specified calls. If `any_order` is True then the calls can be in any order, but they must all appear in `mock_calls`.""" expected = [self._call_matcher(c) for c in calls] cause = expected if isinstance(expected, Exception) else None all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls) if not any_order: if expected not in all_calls: six.raise_from(AssertionError( 'Calls not found.\nExpected: %r\n' 'Actual: %r' % (_CallList(calls), self.mock_calls) ), cause) return all_calls = list(all_calls) not_found = [] for kall in expected: try: all_calls.remove(kall) except ValueError: not_found.append(kall) if not_found: six.raise_from(AssertionError( '%r not all found in call list' % (tuple(not_found),) ), cause)
def assert_any_call(self, *args, **kwargs): """assert the mock has been called with the specified arguments. The assert passes if the mock has *ever* been called, unlike `assert_called_with` and `assert_called_once_with` that only pass if the call is the most recent one.""" expected = self._call_matcher((args, kwargs)) actual = [self._call_matcher(c) for c in self.call_args_list] if expected not in actual: cause = expected if isinstance(expected, Exception) else None expected_string = self._format_mock_call_signature(args, kwargs) six.raise_from(AssertionError( '%s call not found' % expected_string ), cause)
def raise_from(value, from_value): raise value
def test_cause(self): def inner_raise(): try: self.zero_div() except ZeroDivisionError as e: raise_from(KeyError, e) def outer_raise(): inner_raise() # Marker blocks = boundaries.split(self.get_report(outer_raise)) self.assertEqual(len(blocks), 3) self.assertEqual(blocks[1], cause_message) self.check_zero_div(blocks[0]) self.assertIn('inner_raise() # Marker', blocks[2])
def _find_spec_from_path(name, path=None): """Return the spec for the specified module. First, sys.modules is checked to see if the module was already imported. If so, then sys.modules[name].__spec__ is returned. If that happens to be set to None, then ValueError is raised. If the module is not in sys.modules, then sys.meta_path is searched for a suitable spec with the value of 'path' given to the finders. None is returned if no spec could be found. Dotted names do not have their parent packages implicitly imported. You will most likely need to explicitly import all parent packages in the proper order for a submodule to get the correct spec. """ if name not in sys.modules: return _find_spec(name, path) else: module = sys.modules[name] if module is None: return None try: spec = module.__spec__ except AttributeError: six.raise_from(ValueError('{}.__spec__ is not set'.format(name)), None) else: if spec is None: raise ValueError('{}.__spec__ is None'.format(name)) return spec
def find_spec(name, package=None): """Return the spec for the specified module. First, sys.modules is checked to see if the module was already imported. If so, then sys.modules[name].__spec__ is returned. If that happens to be set to None, then ValueError is raised. If the module is not in sys.modules, then sys.meta_path is searched for a suitable spec with the value of 'path' given to the finders. None is returned if no spec could be found. If the name is for submodule (contains a dot), the parent module is automatically imported. The name and package arguments work the same as importlib.import_module(). In other words, relative module names (with leading dots) work. """ fullname = resolve_name(name, package) if name.startswith('.') else name if fullname not in sys.modules: parent_name = fullname.rpartition('.')[0] if parent_name: # Use builtins.__import__() in case someone replaced it. parent = __import__(parent_name, fromlist=['__path__']) return _find_spec(fullname, parent.__path__) else: return _find_spec(fullname, None) else: module = sys.modules[fullname] if module is None: return None try: spec = module.__spec__ except AttributeError: six.raise_from(ValueError('{}.__spec__ is not set'.format(name)), None) else: if spec is None: raise ValueError('{}.__spec__ is None'.format(name)) return spec
def find_loader(name, path=None): """Return the loader for the specified module. This is a backward-compatible wrapper around find_spec(). This function is deprecated in favor of importlib.util.find_spec(). """ warnings.warn('Use importlib.util.find_spec() instead.', DeprecationWarning, stacklevel=2) try: loader = sys.modules[name].__loader__ if loader is None: raise ValueError('{}.__loader__ is None'.format(name)) else: return loader except KeyError: pass except AttributeError: six.raise_from(ValueError('{}.__loader__ is not set'.format(name)), None) spec = _find_spec(name, path) # We won't worry about malformed specs (missing attributes). if spec is None: return None if spec.loader is None: if spec.submodule_search_locations is None: raise _ImportError('spec for {} missing loader'.format(name), name=name) raise _ImportError('namespace packages do not have loaders', name=name) return spec.loader # This should be in all python >2.7
def raise_with_cause(exc_cls, message, *args, **kwargs): """Helper to raise + chain exceptions (when able) and associate a *cause*. NOTE(harlowja): Since in py3.x exceptions can be chained (due to :pep:`3134`) we should try to raise the desired exception with the given *cause* (or extract a *cause* from the current stack if able) so that the exception formats nicely in old and new versions of python. Since py2.x does **not** support exception chaining (or formatting) the exception class provided should take a ``cause`` keyword argument (which it may discard if it wants) to its constructor which can then be inspected/retained on py2.x to get *similar* information as would be automatically included/obtainable in py3.x. :param exc_cls: the exception class to raise (typically one derived from :py:class:`.CausedByException` or equivalent). :param message: the text/str message that will be passed to the exceptions constructor as its first positional argument. :param args: any additional positional arguments to pass to the exceptions constructor. :param kwargs: any additional keyword arguments to pass to the exceptions constructor. .. versionadded:: 1.6 """ if 'cause' not in kwargs: exc_type, exc, exc_tb = sys.exc_info() try: if exc is not None: kwargs['cause'] = exc finally: # Leave no references around (especially with regards to # tracebacks and any variables that it retains internally). del(exc_type, exc, exc_tb) six.raise_from(exc_cls(message, *args, **kwargs), kwargs.get('cause'))
def convert_version_to_int(version): """Convert a version to an integer. *version* must be a string with dots or a tuple of integers. .. versionadded:: 2.0 """ try: if isinstance(version, six.string_types): version = convert_version_to_tuple(version) if isinstance(version, tuple): return six.moves.reduce(lambda x, y: (x * 1000) + y, version) except Exception as ex: msg = _("Version %s is invalid.") % version six.raise_from(ValueError(msg), ex)
def _apirequest(self, method, url, *args, **kwargs): """ General request-response processing routine for dpr-api server. :return: Response -- requests.Response instance """ # TODO: doing this for every request is kinda awkward self._ensure_config() if not url.startswith('http'): # Relative url is given. Build absolute server url url = self.server + url methods = { 'POST': requests.post, 'PUT': requests.put, 'GET': requests.get, 'DELETE': requests.delete } headers = kwargs.pop('headers', {}) if self.token: headers.setdefault('Auth-Token', '%s' % self.token) response = methods.get(method)(url, *args, headers=headers, **kwargs) try: jsonresponse = response.json() except Exception as e: six.raise_from( JSONDecodeError(response, message='Failed to decode JSON response from server'), e) if response.status_code not in (200, 201): raise HTTPStatusError(response, message='Error %s\n%s' % ( response.status_code, jsonresponse.get('message') or jsonresponse.get('description'))) return response
def raise_from(cls, exc, path=None): obj = cls(str(exc), path) if six.PY3: six.raise_from(obj, exc) else: _, _, tb = sys.exc_info() six.reraise(cls, obj, tb)
def validate(self, obj): if self.key in obj: try: self.run_filters(obj) except ValueError as exc: FieldError.raise_from(exc, self.key) except FieldError as exc: exc.add_path_info(self.key) raise else: if not self.optional: raise FieldError('Is required', self.key)
def instantiate(self, value): if self.is_json_object: return self.type.fromjson(value) try: return self.type(value) except ValueError as exc: FieldError.raise_from(exc)
def call(func, args): """Call the function with args normalized and cast to the correct types. Args: func: The function to call. args: The arguments parsed by docopt. Returns: The return value of func. """ assert hasattr(func, '__call__'), 'Cannot call func: {}'.format( func.__name__) raw_func = ( func if isinstance(func, FunctionType) else func.__class__.__call__) hints = collections.defaultdict(lambda: Any, get_type_hints(raw_func)) argspec = _getargspec(raw_func) named_args = {} varargs = () for k, nk, v in _normalize(args): if nk == argspec.varargs: hints[nk] = Tuple[hints[nk], ...] elif nk not in argspec.args and argspec.varkw in hints: hints[nk] = hints[argspec.varkw] try: value = cast(hints[nk], v) except TypeError as e: _LOGGER.exception(e) six.raise_from(exc.InvalidCliValueError(k, v), e) if nk == argspec.varargs: varargs = value elif (nk in argspec.args or argspec.varkw) and ( nk not in named_args or named_args[nk] is None): named_args[nk] = value return func(*varargs, **named_args)
def _raise_wrapped_exception(exc): code = exc.error_code msg = '%s (cdb2api rc %d)' % (exc.error_message, code) if "null constraint violation" in msg: six.raise_from(NonNullConstraintError(msg), exc) # DRQS 86013831 six.raise_from(_EXCEPTION_BY_RC.get(code, OperationalError)(msg), exc)
def _propagate_swift_exceptions(func): """Bubbles all swift exceptions as `SwiftError` classes """ @wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except (swift_service.SwiftError, swift_exceptions.ClientException) as e: six.raise_from(_swiftclient_error_to_descriptive_exception(e), e) return wrapper
def _s3_client_call(self, method_name, *args, **kwargs): """ Creates a boto3 S3 ``Client`` object and runs ``method_name``. """ s3_client = _get_s3_client() method = getattr(s3_client, method_name) try: return method(*args, **kwargs) except botocore_exceptions.ClientError as e: six.raise_from(_parse_s3_error(e, **kwargs), e)
def _make_s3_transfer(self, method_name, config=None, *args, **kwargs): """ Creates a boto3 ``S3Transfer`` object for doing multipart uploads and downloads and executes the given method. """ transfer = _get_s3_transfer(config=config) method = getattr(transfer, method_name) try: return method(*args, **kwargs) except boto3_exceptions.S3UploadFailedError as e: six.raise_from(exceptions.FailedUploadError(str(e), e), e) except boto3_exceptions.RetriesExceededError as e: six.raise_from(exceptions.FailedDownloadError(str(e), e), e)
def _write_request(self, request_url, json_message, api_key=None): data = { 'data': base64.b64encode(json_message.encode('utf8')), 'verbose': 1, 'ip': 0, } if api_key: data.update({'api_key': api_key}) encoded_data = urllib.parse.urlencode(data).encode('utf8') try: request = urllib.request.Request(request_url, encoded_data) # Note: We don't send timeout=None here, because the timeout in urllib2 defaults to # an internal socket timeout, not None. if self._request_timeout is not None: response = urllib.request.urlopen(request, timeout=self._request_timeout).read() else: response = urllib.request.urlopen(request).read() except urllib.error.URLError as e: raise six.raise_from(MixpanelException(e), e) try: response = json.loads(response.decode('utf8')) except ValueError: raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(response)) if response['status'] != 1: raise MixpanelException('Mixpanel error: {0}'.format(response['error'])) return True
def _flush_endpoint(self, endpoint, api_key=None): buf = self._buffers[endpoint] while buf: batch = buf[:self._max_size] batch_json = '[{0}]'.format(','.join(batch)) try: self._consumer.send(endpoint, batch_json, api_key) except MixpanelException as orig_e: mp_e = MixpanelException(orig_e) mp_e.message = batch_json mp_e.endpoint = endpoint raise six.raise_from(mp_e, orig_e) buf = buf[self._max_size:] self._buffers[endpoint] = buf
def _wrap_http_exceptions(): """Reraise osc-lib exceptions with detailed messages.""" try: yield except ks_exceptions.HttpError as exc: detail = json.loads(exc.response.content)['errors'][0]['detail'] msg = detail.split('\n')[-1].strip() exc_class = _http_error_to_exc.get(exc.http_status, exceptions.CommandError) six.raise_from(exc_class(exc.http_status, msg), exc)