Python werkzeug.exceptions 模块,ServiceUnavailable() 实例源码

我们从Python开源项目中,提取了以下10个代码示例,用于说明如何使用werkzeug.exceptions.ServiceUnavailable()

项目:science-gateway-middleware    作者:alan-turing-institute    | 项目源码 | 文件源码
def test_failed_ssh_connection_gives_503_exception(self, mock_run):
        # At least some of the standard Exceptions raised when setting up
        # an SCP connection in Paramiko do not get magically converted to
        # HTTP error responses by Flask. These tests ensure that we get
        # a ServiceUnavailable (503) Exception that Flask will magically pass
        # on to the client.
        job = new_job5()
        manager = JIM(job)
        expected_message = "Unable to connect to backend compute resource"
        expected_exception = ServiceUnavailable(description=expected_message)
        # Check correct Flask exception raised when updating status
        try:
            manager.update_job_status()
        except Exception as e:
            assert e == expected_exception
        # Check correct Flask exception raised when executing actions
        for action in ['RUN', 'SETUP', 'CANCEL', 'PROGRESS', 'DATA']:
            try:
                manager.trigger_action_script('action')
            except Exception as e:
                assert e == expected_exception
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def _insert_with_random_id(self, obj, fields):
    """Attempts to insert an entity with randomly assigned ID(s) repeatedly until success
    or a maximum number of attempts are performed."""
    all_tried_ids = []
    for _ in range(0, MAX_INSERT_ATTEMPTS):
      tried_ids = {}
      for field in fields:
        rand_id = self._get_random_id()
        tried_ids[field] = rand_id
        setattr(obj, field, rand_id)
      all_tried_ids.append(tried_ids)
      try:
        with self.session() as session:
          return self.insert_with_session(session, obj)
      except IntegrityError, e:
        # SQLite and MySQL variants of the error message, respectively.
        if 'UNIQUE constraint failed' in e.message or 'Duplicate entry' in e.message:
          logging.warning('Failed insert with %s: %s', tried_ids, e.message)
        else:
          raise
    # We were unable to insert a participant (unlucky). Throw an error.
    logging.warning(
        'Giving up after %d insert attempts, tried %s.' % (MAX_INSERT_ATTEMPTS, all_tried_ids))
    raise ServiceUnavailable('Giving up after %d insert attempts.' % MAX_INSERT_ATTEMPTS)
项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def test_broken_url(self):
        # Verify that broken public key url is caught
        with self.assertRaises(ServiceUnavailable) as context:
            t = template.copy()
            t['public_key_url'] = 'broken url'
            validate_gamecenter_token(t)
        self.assertIn("The server is temporarily unable", context.exception.description)
项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def test_broken_url(self):
        # Verify that broken key url is caught
        with self.assertRaises(ServiceUnavailable) as context:
            run_ticket_validation({'ticket': self.ticket, 'appid': 123}, key_url='http://localhost:1/')
        self.assertIn("The server is temporarily unable", context.exception.description)
项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def fetch_url(url, error_title, expire=None):
    """
    Fetch contents of 'url' and cache the results in Redis if possible.
    Raises 503 - Service Unavailable if url cannot be fetched. The real
    reason is logged out.

    If 'url' points to S3, it will be signed using implicit AWS credentials.
    """
    expire = expire or 3600  # Cache for one hour.
    redis = None
    content = None
    if _app_ctx_stack.top and hasattr(g, "redis"):
        content = g.redis.get("urlget:" + url)
        redis = g.redis 

    if not content:
        signed_url = _aws_s3_sign_url(url)

        try:
            ret = requests.get(signed_url)
        except requests.exceptions.RequestException as e:            
            log.warning(error_title + "Url '%s' can't be fetched. %s", signed_url, e)
            raise ServiceUnavailable()
        if ret.status_code != 200:
            log.warning(error_title + "Url '%s' can't be fetched. Status code %s", signed_url, ret.status_code)
            raise ServiceUnavailable()
        content = ret.content
        if redis:
            g.redis.set("urlget:" + url, content, expire=expire)

    return content
项目:science-gateway-middleware    作者:alan-turing-institute    | 项目源码 | 文件源码
def _ssh_connection(self):
        try:
            connection = ssh(
                self.hostname, self.username, self.port,
                private_key_path=self.private_key_path,
                private_key_string=self.private_key_string,
                debug=True)
            return connection
        except Exception:
            # If connection cannot be made, raise a ServiceUnavailble
            # exception that will be passed to API client as a HTTP error
            raise(ServiceUnavailable(
                description="Unable to connect to backend compute resource"))
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_aborter(self):
        abort = exceptions.abort
        self.assert_raises(exceptions.BadRequest, abort, 400)
        self.assert_raises(exceptions.Unauthorized, abort, 401)
        self.assert_raises(exceptions.Forbidden, abort, 403)
        self.assert_raises(exceptions.NotFound, abort, 404)
        self.assert_raises(exceptions.MethodNotAllowed, abort, 405, ['GET', 'HEAD'])
        self.assert_raises(exceptions.NotAcceptable, abort, 406)
        self.assert_raises(exceptions.RequestTimeout, abort, 408)
        self.assert_raises(exceptions.Gone, abort, 410)
        self.assert_raises(exceptions.LengthRequired, abort, 411)
        self.assert_raises(exceptions.PreconditionFailed, abort, 412)
        self.assert_raises(exceptions.RequestEntityTooLarge, abort, 413)
        self.assert_raises(exceptions.RequestURITooLarge, abort, 414)
        self.assert_raises(exceptions.UnsupportedMediaType, abort, 415)
        self.assert_raises(exceptions.UnprocessableEntity, abort, 422)
        self.assert_raises(exceptions.InternalServerError, abort, 500)
        self.assert_raises(exceptions.NotImplemented, abort, 501)
        self.assert_raises(exceptions.BadGateway, abort, 502)
        self.assert_raises(exceptions.ServiceUnavailable, abort, 503)

        myabort = exceptions.Aborter({1: exceptions.NotFound})
        self.assert_raises(LookupError, myabort, 404)
        self.assert_raises(exceptions.NotFound, myabort, 1)

        myabort = exceptions.Aborter(extra={1: exceptions.NotFound})
        self.assert_raises(exceptions.NotFound, myabort, 404)
        self.assert_raises(exceptions.NotFound, myabort, 1)
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def test_insert_duplicate_participant_id_give_up(self):
    p = Participant()
    with random_ids([1, 2]):
      self.dao.insert(p)
    rand_ints = []
    for i in range(0, MAX_INSERT_ATTEMPTS):
      rand_ints.append(1)
      rand_ints.append(i)
    p2 = Participant()
    with random_ids(rand_ints):
      with self.assertRaises(ServiceUnavailable):
        self.dao.insert(p2)
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def test_insert_duplicate_biobank_id_give_up(self):
    p = Participant()
    with random_ids([1, 2]):
      self.dao.insert(p)
    rand_ints = []
    for i in range(0, MAX_INSERT_ATTEMPTS):
      rand_ints.append(i + 2)
      rand_ints.append(2)
    p2 = Participant()
    with random_ids(rand_ints):
      with self.assertRaises(ServiceUnavailable):
        self.dao.insert(p2)
项目:voltha    作者:opencord    | 项目源码 | 文件源码
def invoke(self, stub, method_name, request, metadata, retry=1):
        """
        Invoke a gRPC call to the remote server and return the response.
        :param stub: Reference to the *_pb2 service stub
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :param metadata: [(str, str), (str, str), ...]
        :return: The response protobuf message and returned trailing metadata
        """

        if not self.connected:
            raise ServiceUnavailable()

        try:
            method = getattr(stub(self.channel), method_name)
            response, rendezvous = method.with_call(request, metadata=metadata)
            returnValue((response, rendezvous.trailing_metadata()))

        except grpc._channel._Rendezvous, e:
            code = e.code()
            if code == grpc.StatusCode.UNAVAILABLE:
                e = ServiceUnavailable()

                if self.connected:
                    self.connected = False
                    yield self.connect()
                    if retry > 0:
                        response = yield self.invoke(stub, method_name,
                                                     request, metadata,
                                                     retry=retry - 1)
                        returnValue(response)

            elif code in (
                    grpc.StatusCode.NOT_FOUND,
                    grpc.StatusCode.INVALID_ARGUMENT,
                    grpc.StatusCode.ALREADY_EXISTS):

                pass  # don't log error, these occur naturally

            else:
                log.exception(e)

            raise e

    # Below is an adaptation of Google's MessageToDict() which includes
    # protobuf options extensions