我们从Python开源项目中，提取了以下10个代码示例，用于说明如何使用werkzeug.exceptions.ServiceUnavailable()。
def test_failed_ssh_connection_gives_503_exception(self, mock_run):
    """Check that a failed SSH connection surfaces as a 503 error.

    At least some of the standard exceptions raised when setting up an SCP
    connection in Paramiko do not get converted to HTTP error responses by
    Flask. These checks ensure that a ServiceUnavailable (503) exception,
    which Flask does pass on to the client, is raised instead.

    ``mock_run`` is not used directly here; presumably it is injected by a
    patch decorator outside this view -- TODO confirm.
    """
    job = new_job5()
    manager = JIM(job)
    expected_message = "Unable to connect to backend compute resource"

    def assert_service_unavailable(operation, *args):
        # A bare try/except would pass silently when nothing is raised, so
        # the "no exception" path has to fail explicitly.
        try:
            operation(*args)
        except ServiceUnavailable as e:
            # Two distinct exception instances are never equal under the
            # default identity comparison, so compare the description.
            assert e.description == expected_message
        else:
            raise AssertionError(
                "ServiceUnavailable was not raised by %r" % (operation,))

    # Check correct Flask exception raised when updating status
    assert_service_unavailable(manager.update_job_status)

    # Check correct Flask exception raised when executing actions
    for action in ['RUN', 'SETUP', 'CANCEL', 'PROGRESS', 'DATA']:
        # Pass the loop variable, not the literal string 'action'.
        assert_service_unavailable(manager.trigger_action_script, action)
def _insert_with_random_id(self, obj, fields):
    """Insert an entity with randomly assigned ID(s), retrying on collision.

    On every attempt each attribute named in ``fields`` is set on ``obj`` to
    a fresh value from ``self._get_random_id()`` and the object is inserted
    inside a new session. At most ``MAX_INSERT_ATTEMPTS`` attempts are made.

    Args:
        obj: The entity to insert; it is mutated via ``setattr``.
        fields: Names of the attributes that receive random IDs.

    Returns:
        Whatever ``self.insert_with_session`` returns for the successful
        attempt.

    Raises:
        IntegrityError: Re-raised when the failure is not a uniqueness
            violation.
        ServiceUnavailable: When every attempt hit a uniqueness violation.
    """
    all_tried_ids = []
    for _ in range(0, MAX_INSERT_ATTEMPTS):
        tried_ids = {}
        for field in fields:
            rand_id = self._get_random_id()
            tried_ids[field] = rand_id
            setattr(obj, field, rand_id)
        all_tried_ids.append(tried_ids)
        try:
            with self.session() as session:
                return self.insert_with_session(session, obj)
        except IntegrityError as e:
            # Prefer the ``message`` attribute the original code read; fall
            # back to str(e) where the attribute does not exist.
            message = getattr(e, 'message', None)
            if message is None:
                message = str(e)
            # SQLite and MySQL variants of the error message, respectively.
            if 'UNIQUE constraint failed' in message or 'Duplicate entry' in message:
                logging.warning('Failed insert with %s: %s', tried_ids, message)
            else:
                raise
    # We were unable to insert a participant (unlucky). Throw an error.
    logging.warning(
        'Giving up after %d insert attempts, tried %s.',
        MAX_INSERT_ATTEMPTS, all_tried_ids)
    raise ServiceUnavailable(
        'Giving up after %d insert attempts.' % MAX_INSERT_ATTEMPTS)
def test_broken_url(self):
    """A broken public key URL must be reported as ServiceUnavailable."""
    token = template.copy()
    token['public_key_url'] = 'broken url'
    with self.assertRaises(ServiceUnavailable) as raised:
        validate_gamecenter_token(token)
    description = raised.exception.description
    self.assertIn("The server is temporarily unable", description)
def test_broken_url(self):
    """A broken key URL must be reported as ServiceUnavailable."""
    payload = {'ticket': self.ticket, 'appid': 123}
    with self.assertRaises(ServiceUnavailable) as raised:
        run_ticket_validation(payload, key_url='http://localhost:1/')
    description = raised.exception.description
    self.assertIn("The server is temporarily unable", description)
def fetch_url(url, error_title, expire=None):
    """Fetch the contents of 'url' and cache the result in Redis if possible.

    The cache is only used when an application context is active and ``g``
    has a ``redis`` attribute. The URL is passed through
    ``_aws_s3_sign_url`` before fetching (if 'url' points to S3, it will be
    signed using implicit AWS credentials).

    Args:
        url: The URL to fetch; also used to build the cache key.
        error_title: Text placed in front of the warning that is logged when
            the fetch fails.
        expire: Expiry passed to the cache; a falsy value means 3600.

    Returns:
        The cached content, or the body of the HTTP response.

    Raises:
        ServiceUnavailable: (503) if the request fails or the response
            status is not 200. The real reason is logged.
    """
    expire = expire or 3600  # Cache for one hour.
    cache_key = "urlget:" + url
    redis = None
    content = None
    if _app_ctx_stack.top and hasattr(g, "redis"):
        redis = g.redis
        content = redis.get(cache_key)

    if not content:
        signed_url = _aws_s3_sign_url(url)
        # NOTE(review): the signed URL is written to the log below; if the
        # signature is carried in the query string this may expose it --
        # confirm whether logging 'url' would be preferable.
        # NOTE(review): no timeout is passed to requests.get, so a stalled
        # server can block this call indefinitely.
        try:
            ret = requests.get(signed_url)
        except requests.exceptions.RequestException as e:
            # error_title is passed as an argument rather than concatenated
            # into the format string, so a '%' in it cannot break formatting.
            log.warning("%sUrl '%s' can't be fetched. %s",
                        error_title, signed_url, e)
            raise ServiceUnavailable()
        if ret.status_code != 200:
            log.warning("%sUrl '%s' can't be fetched. Status code %s",
                        error_title, signed_url, ret.status_code)
            raise ServiceUnavailable()
        content = ret.content
        if redis:
            redis.set(cache_key, content, expire=expire)
    return content
def _ssh_connection(self):
    """Create an ``ssh`` connection from this object's connection settings.

    Returns:
        The object returned by ``ssh(...)``.

    Raises:
        ServiceUnavailable: If anything goes wrong while the connection is
            being created, so that the API client receives an HTTP error.
    """
    try:
        return ssh(
            self.hostname,
            self.username,
            self.port,
            private_key_path=self.private_key_path,
            private_key_string=self.private_key_string,
            debug=True,
        )
    except Exception:
        # Replace the low-level failure with an HTTP-aware exception.
        unavailable = ServiceUnavailable(
            description="Unable to connect to backend compute resource")
        raise unavailable
def test_aborter(self):
    """Check which exception ``abort`` and custom ``Aborter``s raise."""
    # (expected exception, status code, extra positional arguments)
    default_cases = [
        (exceptions.BadRequest, 400, ()),
        (exceptions.Unauthorized, 401, ()),
        (exceptions.Forbidden, 403, ()),
        (exceptions.NotFound, 404, ()),
        (exceptions.MethodNotAllowed, 405, (['GET', 'HEAD'],)),
        (exceptions.NotAcceptable, 406, ()),
        (exceptions.RequestTimeout, 408, ()),
        (exceptions.Gone, 410, ()),
        (exceptions.LengthRequired, 411, ()),
        (exceptions.PreconditionFailed, 412, ()),
        (exceptions.RequestEntityTooLarge, 413, ()),
        (exceptions.RequestURITooLarge, 414, ()),
        (exceptions.UnsupportedMediaType, 415, ()),
        (exceptions.UnprocessableEntity, 422, ()),
        (exceptions.InternalServerError, 500, ()),
        (exceptions.NotImplemented, 501, ()),
        (exceptions.BadGateway, 502, ()),
        (exceptions.ServiceUnavailable, 503, ()),
    ]
    abort = exceptions.abort
    for expected, status, extra_args in default_cases:
        self.assert_raises(expected, abort, status, *extra_args)

    # A mapping given positionally: 404 is no longer known, 1 is.
    replaced = exceptions.Aborter({1: exceptions.NotFound})
    self.assert_raises(LookupError, replaced, 404)
    self.assert_raises(exceptions.NotFound, replaced, 1)

    # A mapping given as ``extra``: both 404 and 1 are known.
    extended = exceptions.Aborter(extra={1: exceptions.NotFound})
    for status in (404, 1):
        self.assert_raises(exceptions.NotFound, extended, status)
def test_insert_duplicate_participant_id_give_up(self):
    """Inserting gives up with ServiceUnavailable when IDs keep colliding."""
    existing = Participant()
    with random_ids([1, 2]):
        self.dao.insert(existing)

    # Each attempt is fed the pair (1, attempt index); the leading 1 repeats
    # a value already consumed by the first insert.
    colliding_ids = [
        value
        for attempt in range(0, MAX_INSERT_ATTEMPTS)
        for value in (1, attempt)
    ]

    duplicate = Participant()
    with random_ids(colliding_ids):
        with self.assertRaises(ServiceUnavailable):
            self.dao.insert(duplicate)
def test_insert_duplicate_biobank_id_give_up(self):
    """Inserting gives up with ServiceUnavailable when IDs keep colliding."""
    existing = Participant()
    with random_ids([1, 2]):
        self.dao.insert(existing)

    # Each attempt is fed the pair (attempt index + 2, 2); the trailing 2
    # repeats a value already consumed by the first insert.
    colliding_ids = [
        value
        for attempt in range(0, MAX_INSERT_ATTEMPTS)
        for value in (attempt + 2, 2)
    ]

    duplicate = Participant()
    with random_ids(colliding_ids):
        with self.assertRaises(ServiceUnavailable):
            self.dao.insert(duplicate)
def invoke(self, stub, method_name, request, metadata, retry=1):
    """
    Invoke a gRPC call to the remote server and return the response.

    This is a generator that delivers its result through ``returnValue``;
    presumably it is wrapped by an inlineCallbacks-style decorator outside
    this view -- TODO confirm.

    :param stub: Reference to the *_pb2 service stub
    :param method_name: The method name inside the service stub
    :param request: The request protobuf message
    :param metadata: [(str, str), (str, str), ...]
    :param retry: Number of further attempts made after a reconnect when
        the server reports UNAVAILABLE
    :return: The response protobuf message and returned trailing metadata
    :raises ServiceUnavailable: when not connected, or when the server
        reports UNAVAILABLE and no retry succeeded
    """
    if not self.connected:
        raise ServiceUnavailable()

    try:
        method = getattr(stub(self.channel), method_name)
        response, rendezvous = method.with_call(request, metadata=metadata)
        returnValue((response, rendezvous.trailing_metadata()))

    # NOTE(review): grpc._channel._Rendezvous is a private grpc name and may
    # not exist in other grpc versions -- confirm against the pinned version.
    except grpc._channel._Rendezvous as e:
        code = e.code()
        if code == grpc.StatusCode.UNAVAILABLE:
            # Report the failure to the caller as a 503 instead of the raw
            # gRPC error.
            e = ServiceUnavailable()

            if self.connected:
                # Mark the connection as down, reconnect, then retry.
                self.connected = False
                yield self.connect()
                if retry > 0:
                    response = yield self.invoke(stub, method_name,
                                                 request, metadata,
                                                 retry=retry - 1)
                    returnValue(response)

        elif code in (
                grpc.StatusCode.NOT_FOUND,
                grpc.StatusCode.INVALID_ARGUMENT,
                grpc.StatusCode.ALREADY_EXISTS):
            pass  # don't log error, these occur naturally

        else:
            log.exception(e)

        # Reached whenever no response was returned above.
        raise e

# Below is an adaptation of Google's MessageToDict() which includes
# protobuf options extensions