我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用six.moves.http_client.UNAUTHORIZED。
def _handle_error(url, response): """Handle response status codes.""" handlers = { http_client.NOT_FOUND: NotFoundError( 'Resource not found: {}'.format(url) ), http_client.FOUND: AlreadyExistsError( 'Resource already exists: {}'.format(url) ), http_client.FAILED_DEPENDENCY: ValidationError(response), http_client.UNAUTHORIZED: NotAuthorizedError(response), http_client.BAD_REQUEST: BadRequestError(response), } if response.status_code in handlers: raise handlers[response.status_code]
def test_request_refresh(self): credentials = mock.Mock(wraps=CredentialsStub()) final_response = make_response(status=http_client.OK) # First request will 401, second request will succeed. adapter = AdapterStub([ make_response(status=http_client.UNAUTHORIZED), final_response]) authed_session = google.auth.transport.requests.AuthorizedSession( credentials) authed_session.mount(self.TEST_URL, adapter) result = authed_session.request('GET', self.TEST_URL) assert result == final_response assert credentials.before_request.call_count == 2 assert credentials.refresh.called assert len(adapter.requests) == 2 assert adapter.requests[0].url == self.TEST_URL assert adapter.requests[0].headers['authorization'] == 'token' assert adapter.requests[1].url == self.TEST_URL assert adapter.requests[1].headers['authorization'] == 'token1'
def test_urlopen_refresh(self): credentials = mock.Mock(wraps=CredentialsStub()) final_response = ResponseStub(status=http_client.OK) # First request will 401, second request will succeed. http = HttpStub([ ResponseStub(status=http_client.UNAUTHORIZED), final_response]) authed_http = google.auth.transport.urllib3.AuthorizedHttp( credentials, http=http) authed_http = authed_http.urlopen('GET', 'http://example.com') assert authed_http == final_response assert credentials.before_request.call_count == 2 assert credentials.refresh.called assert http.requests == [ ('GET', self.TEST_URL, None, {'authorization': 'token'}, {}), ('GET', self.TEST_URL, None, {'authorization': 'token1'}, {})]
def test_get_new_token_should_throw_ecsclientexception_401(self): self.requests_mock.register_uri('GET', 'https://127.0.0.1:4443/login', status_code=http_client.UNAUTHORIZED) with super(testtools.TestCase, self).assertRaises(ECSClientException) as error: self.token_request.get_new_token() exception = error.exception self.assertEqual(exception.http_status, http_client.UNAUTHORIZED)
def test_token_validation_401(self, mock_get_existing_token, mock_get_new_token): self.requests_mock.register_uri('GET', 'https://127.0.0.1:4443/user/whoami', status_code=http_client.UNAUTHORIZED) mock_get_new_token.return_value = 'NEW-TOKEN-123' mock_get_existing_token.return_value = 'EXISTING-TOKEN-123' token = self.token_request.get_token() self.assertEqual(token, 'NEW-TOKEN-123')
def raise_for_response(method, url, response): """Raise a correct error class, if needed.""" if response.status_code < http_client.BAD_REQUEST: return elif response.status_code == http_client.NOT_FOUND: raise ResourceNotFoundError(method, url, response) elif response.status_code == http_client.BAD_REQUEST: raise BadRequestError(method, url, response) elif response.status_code in (http_client.UNAUTHORIZED, http_client.FORBIDDEN): raise AccessError(method, url, response) elif response.status_code >= http_client.INTERNAL_SERVER_ERROR: raise ServerSideError(method, url, response) else: raise HTTPError(method, url, response)
def test_token_refresh_store_expires_soon(self): # Tests the case where an access token that is valid when it is read # from the store expires before the original request succeeds. expiration = (datetime.datetime.utcnow() + datetime.timedelta(minutes=15)) credentials = self._create_test_credentials(expiration=expiration) storage = file_module.Storage(FILENAME) storage.put(credentials) credentials = storage.get() new_cred = copy.copy(credentials) new_cred.access_token = 'bar' storage.put(new_cred) access_token = '1/3w' token_response = {'access_token': access_token, 'expires_in': 3600} http = http_mock.HttpMockSequence([ ({'status': http_client.UNAUTHORIZED}, b'Initial token expired'), ({'status': http_client.UNAUTHORIZED}, b'Store token expired'), ({'status': http_client.OK}, json.dumps(token_response).encode('utf-8')), ({'status': http_client.OK}, b'Valid response to original request') ]) credentials.authorize(http) transport.request(http, 'https://example.com') self.assertEqual(credentials.access_token, access_token)
def test_token_refresh_stream_body(self): expiration = (datetime.datetime.utcnow() + datetime.timedelta(minutes=15)) credentials = self._create_test_credentials(expiration=expiration) storage = file_module.Storage(FILENAME) storage.put(credentials) credentials = storage.get() new_cred = copy.copy(credentials) new_cred.access_token = 'bar' storage.put(new_cred) valid_access_token = '1/3w' token_response = {'access_token': valid_access_token, 'expires_in': 3600} http = http_mock.HttpMockSequence([ ({'status': http_client.UNAUTHORIZED}, b'Initial token expired'), ({'status': http_client.UNAUTHORIZED}, b'Store token expired'), ({'status': http_client.OK}, json.dumps(token_response).encode('utf-8')), ({'status': http_client.OK}, 'echo_request_body') ]) body = six.StringIO('streaming body') credentials.authorize(http) _, content = transport.request(http, 'https://example.com', body=body) self.assertEqual(content, 'streaming body') self.assertEqual(credentials.access_token, valid_access_token)
def _credentials_refresh(self, credentials): http = http_mock.HttpMockSequence([ ({'status': http_client.OK}, b'{"access_token":"1/3w","expires_in":3600}'), ({'status': http_client.UNAUTHORIZED}, b''), ({'status': http_client.OK}, b'{"access_token":"3/3w","expires_in":3600}'), ({'status': http_client.OK}, 'echo_request_headers'), ]) http = credentials.authorize(http) _, content = transport.request(http, 'http://example.org') return content
def test_get_401(self, resp_mock): """Test treadmill.restclient.get UNAUTHORIZED (401)""" resp_mock.return_value.status_code = http_client.UNAUTHORIZED resp_mock.return_value.json.return_value = {} with self.assertRaises(restclient.NotAuthorizedError): restclient.get('http://foo.com', '/')
def test_sign_bytes_failure(self): request = make_request(http_client.UNAUTHORIZED) credentials = make_credentials() signer = iam.Signer( request, credentials, mock.sentinel.service_account_email) with pytest.raises(exceptions.TransportError): signer.sign('123')
def test_no_user_or_user_id(self): response = self.request.get_response(self.middleware) self.assertEqual(response.status_int, http.UNAUTHORIZED)
def get_versions(self, header=None, ccancel=None): """Query the repo for versions information. Returns a fileobject. If server returns 401 (Unauthorized) check for presence of X-IPkg-Error header and decode.""" requesturl = self.__get_request_url("versions/0/") fobj = self._fetch_url(requesturl, header, ccancel=ccancel, failonerror=False) try: # Bogus request to trigger # StreamingFileObj.__fill_buffer(), otherwise the # TransportProtoError won't be raised here. We can't # use .read() since this will empty the data buffer. fobj.getheader("octopus", None) except tx.TransportProtoError as e: if e.code == http_client.UNAUTHORIZED: exc_type, exc_value, exc_tb = sys.exc_info() try: e.details = self._analyze_server_error( fobj.getheader("X-IPkg-Error", None)) except: # If analysis fails, raise original # exception. if six.PY2: six.reraise(exc_value, None, exc_tb) else: raise exc_value raise return fobj
def check_resp_status_and_retry(resp, image_id, url): # Note(Jesse): This branch sorts errors into those that are permanent, # those that are ephemeral, and those that are unexpected. if resp.status in (httplib.BAD_REQUEST, # 400 httplib.UNAUTHORIZED, # 401 httplib.PAYMENT_REQUIRED, # 402 httplib.FORBIDDEN, # 403 httplib.METHOD_NOT_ALLOWED, # 405 httplib.NOT_ACCEPTABLE, # 406 httplib.PROXY_AUTHENTICATION_REQUIRED, # 407 httplib.CONFLICT, # 409 httplib.GONE, # 410 httplib.LENGTH_REQUIRED, # 411 httplib.PRECONDITION_FAILED, # 412 httplib.REQUEST_ENTITY_TOO_LARGE, # 413 httplib.REQUEST_URI_TOO_LONG, # 414 httplib.UNSUPPORTED_MEDIA_TYPE, # 415 httplib.REQUESTED_RANGE_NOT_SATISFIABLE, # 416 httplib.EXPECTATION_FAILED, # 417 httplib.UNPROCESSABLE_ENTITY, # 422 httplib.LOCKED, # 423 httplib.FAILED_DEPENDENCY, # 424 httplib.UPGRADE_REQUIRED, # 426 httplib.NOT_IMPLEMENTED, # 501 httplib.HTTP_VERSION_NOT_SUPPORTED, # 505 httplib.NOT_EXTENDED, # 510 ): raise PluginError("Got Permanent Error response [%i] while " "uploading image [%s] to glance [%s]" % (resp.status, image_id, url)) # Nova service would process the exception elif resp.status == httplib.NOT_FOUND: # 404 exc = XenAPI.Failure('ImageNotFound') raise exc # NOTE(nikhil): Only a sub-set of the 500 errors are retryable. We # optimistically retry on 500 errors below. elif resp.status in (httplib.REQUEST_TIMEOUT, # 408 httplib.INTERNAL_SERVER_ERROR, # 500 httplib.BAD_GATEWAY, # 502 httplib.SERVICE_UNAVAILABLE, # 503 httplib.GATEWAY_TIMEOUT, # 504 httplib.INSUFFICIENT_STORAGE, # 507 ): raise RetryableError("Got Ephemeral Error response [%i] while " "uploading image [%s] to glance [%s]" % (resp.status, image_id, url)) else: # Note(Jesse): Assume unexpected errors are retryable. If you are # seeing this error message, the error should probably be added # to either the ephemeral or permanent error list. raise RetryableError("Got Unexpected Error response [%i] while " "uploading image [%s] to glance [%s]" % (resp.status, image_id, url))
def test_authorize_401(self, utcnow): utcnow.return_value = T1_DATE http = http_mock.HttpMockSequence([ ({'status': http_client.OK}, b''), ({'status': http_client.UNAUTHORIZED}, b''), ({'status': http_client.OK}, b''), ]) self.jwt.authorize(http) transport.request(http, self.url) token_1 = self.jwt.access_token utcnow.return_value = T2_DATE response, _ = transport.request(http, self.url) self.assertEquals(response.status, http_client.OK) token_2 = self.jwt.access_token # Check the 401 forced a new token self.assertNotEqual(token_1, token_2) # Verify mocks. certs = {'key': datafile('public_cert.pem')} self.assertEqual(len(http.requests), 3) issued_at_vals = (T1, T1, T2) exp_vals = (T1_EXPIRY, T1_EXPIRY, T2_EXPIRY) for info, issued_at, exp_val in zip(http.requests, issued_at_vals, exp_vals): self.assertEqual(info['uri'], self.url) self.assertEqual(info['method'], 'GET') self.assertIsNone(info['body']) self.assertEqual(len(info['headers']), 1) bearer, token = info['headers'][b'Authorization'].split() self.assertEqual(bearer, b'Bearer') # To parse the token, skip the time check, since this # test intentionally has stale tokens. with mock.patch('oauth2client.crypt._verify_time_range', return_value=True): payload = crypt.verify_signed_jwt_with_certs( token, certs, audience=self.url) self.assertEqual(len(payload), 5) self.assertEqual(payload['iss'], self.service_account_email) self.assertEqual(payload['sub'], self.service_account_email) self.assertEqual(payload['iat'], issued_at) self.assertEqual(payload['exp'], exp_val) self.assertEqual(payload['aud'], self.url)