我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用six.moves.http_client.FOUND。
def test_callback_view(self): self.oauth2.storage = mock.Mock() with self.app.test_client() as client: with mock.patch( 'oauth2client.transport.get_http_object') as new_http: # Set-up mock. http = http_mock.HttpMock(data=DEFAULT_RESP) new_http.return_value = http # Run tests. state = self._setup_callback_state(client) response = client.get( '/oauth2callback?state={0}&code=codez'.format(state)) self.assertEqual(response.status_code, httplib.FOUND) self.assertIn('/return_url', response.headers['Location']) self.assertIn(self.oauth2.client_secret, http.body) self.assertIn('codez', http.body) self.assertTrue(self.oauth2.storage.put.called) # Check the mocks were called. new_http.assert_called_once_with()
def test_incremental_auth_exchange(self): self._create_incremental_auth_app() with mock.patch('oauth2client.transport.get_http_object') as new_http: # Set-up mock. new_http.return_value = http_mock.HttpMock(data=DEFAULT_RESP) # Run tests. with self.app.test_client() as client: state = self._setup_callback_state( client, return_url='/return_url', # Incremental auth scopes. scopes=['one', 'two']) response = client.get( '/oauth2callback?state={0}&code=codez'.format(state)) self.assertEqual(response.status_code, httplib.FOUND) credentials = self.oauth2.credentials self.assertTrue( credentials.has_scopes(['email', 'one', 'two'])) # Check the mocks were called. new_http.assert_called_once_with()
def test_incremental_auth_exchange(self): self._create_incremental_auth_app() with Http2Mock(): with self.app.test_client() as client: state = self._setup_callback_state( client, return_url='/return_url', # Incremental auth scopes. scopes=['one', 'two']) response = client.get( '/oauth2callback?state={0}&code=codez'.format(state)) self.assertEqual(response.status_code, httplib.FOUND) credentials = self.oauth2.credentials self.assertTrue( credentials.has_scopes(['email', 'one', 'two']))
def _handle_error(url, response): """Handle response status codes.""" handlers = { http_client.NOT_FOUND: NotFoundError( 'Resource not found: {}'.format(url) ), http_client.FOUND: AlreadyExistsError( 'Resource already exists: {}'.format(url) ), http_client.FAILED_DEPENDENCY: ValidationError(response), http_client.UNAUTHORIZED: NotAuthorizedError(response), http_client.BAD_REQUEST: BadRequestError(response), } if response.status_code in handlers: raise handlers[response.status_code]
def init(api, _cors, impl): """Configures REST handlers for allocation resource.""" namespace = webutils.namespace( api, __name__, 'Local nodeinfo redirect API.' ) @namespace.route('/<hostname>/<path:path>') class _NodeRedirect(restplus.Resource): """Redirects to local nodeinfo endpoint.""" def get(self, hostname, path): """Returns list of local instances.""" hostport = impl.get(hostname) if not hostport: return 'Host not found.', http_client.NOT_FOUND url = utils.encode_uri_parts(path) return flask.redirect('http://%s/%s' % (hostport, url), code=http_client.FOUND)
def _http_request(self, conn_url, method, **kwargs): """Send an http request with the specified characteristics Wrapper around request.Session.request to handle tasks such as setting headers and error handling. """ kwargs['headers'] = kwargs.get('headers', {}) kwargs['headers'].setdefault('User-agent', USER_AGENT) self.log_curl_request(method, conn_url, kwargs) body = kwargs.pop('body', None) headers = kwargs.pop('headers') conn_url = self._make_connection_url(conn_url) try: resp = self.session.request(method, conn_url, headers=headers, data=body, json=kwargs) except requests.exceptions.RequestException as e: msg = (_("Error has occured while handling request for " "%(url)s: %(e)s") % dict(url=conn_url, e=e)) if isinstance(e, ValueError): raise exc.ValidationError(msg) raise exc.ConnectionRefuse(msg) self.log_http_response(resp, resp.text) body_iter = six.StringIO(resp.text) if resp.status_code >= http_client.BAD_REQUEST: error_json = _extract_error_json(resp.text) raise exc.from_response(resp, error_json, method, conn_url) elif resp.status_code in (http_client.FOUND, http_client.USE_PROXY): return self._http_request(resp['location'], method, **kwargs) return resp, body_iter
def test_callback_view(self): self.oauth2.storage = mock.Mock() with self.app.test_client() as client: with Http2Mock() as http: state = self._setup_callback_state(client) response = client.get( '/oauth2callback?state={0}&code=codez'.format(state)) self.assertEqual(response.status_code, httplib.FOUND) self.assertIn('/return_url', response.headers['Location']) self.assertIn(self.oauth2.client_secret, http.body) self.assertIn('codez', http.body) self.assertTrue(self.oauth2.storage.put.called)
def test_get_302(self, resp_mock): """Test treadmill.restclient.get FOUND (302)""" resp_mock.return_value.status_code = http_client.FOUND with self.assertRaises(restclient.AlreadyExistsError): restclient.get('http://foo.com', '/')
def test_get_version_list_302(self, mock_get_client): req = webob.Request.blank('/v1') req.accept = "application/json" res = req.get_response(self.wsgi_app) self.assertEqual(http.FOUND, res.status_int) redirect_req = webob.Request.blank('/v1/') self.assertEqual(redirect_req.url, res.location)
def _request_unscoped_token(self): resp = self.saml2_client.send_service_provider_request( self.keystone_v3_endpoint, self.idp_id, self.protocol_id) self.assertEqual(http_client.OK, resp.status_code) saml2_authn_request = etree.XML(resp.content) relay_state = self._str_from_xml( saml2_authn_request, self.ECP_RELAY_STATE) sp_consumer_url = self._str_from_xml( saml2_authn_request, self.ECP_SERVICE_PROVIDER_CONSUMER_URL) # Perform the authn request to the identity provider resp = self.saml2_client.send_identity_provider_authn_request( saml2_authn_request, self.idp_url, self.username, self.password) self.assertEqual(http_client.OK, resp.status_code) saml2_idp_authn_response = etree.XML(resp.content) idp_consumer_url = self._str_from_xml( saml2_idp_authn_response, self.ECP_IDP_CONSUMER_URL) # Assert that both saml2_authn_request and saml2_idp_authn_response # have the same consumer URL. self.assertEqual(sp_consumer_url, idp_consumer_url) # Present the identity provider authn response to the service provider. resp = self.saml2_client.send_service_provider_saml2_authn_response( saml2_idp_authn_response, relay_state, idp_consumer_url) # Must receive a redirect from service provider to the URL where the # unscoped token can be retrieved. self.assertIn(resp.status_code, [http_client.FOUND, http_client.SEE_OTHER]) # We can receive multiple types of errors here, the response depends on # the mapping and the username used to authenticate in the Identity # Provider and also in the Identity Provider remote ID validation. # If everything works well, we receive an unscoped token. sp_url = resp.headers['location'] resp = ( self.saml2_client.send_service_provider_unscoped_token_request( sp_url)) self.assertEqual(http_client.CREATED, resp.status_code) self.assertIn('X-Subject-Token', resp.headers) self.assertNotEmpty(resp.json()) return resp
def test_required(self): @self.app.route('/protected') @self.oauth2.required def index(): return 'Hello' # No credentials, should redirect with self.app.test_client() as client: response = client.get('/protected') self.assertEqual(response.status_code, httplib.FOUND) self.assertIn('oauth2authorize', response.headers['Location']) self.assertIn('protected', response.headers['Location']) credentials = self._generate_credentials(scopes=self.oauth2.scopes) # With credentials, should allow with self.app.test_client() as client: with client.session_transaction() as session: session['google_oauth2_credentials'] = credentials.to_json() response = client.get('/protected') self.assertEqual(response.status_code, httplib.OK) self.assertIn('Hello', response.data.decode('utf-8')) # Expired credentials with refresh token, should allow. credentials.token_expiry = datetime.datetime(1990, 5, 28) with mock.patch('oauth2client.client._UTCNOW') as utcnow: utcnow.return_value = datetime.datetime(1990, 5, 29) with self.app.test_client() as client: with client.session_transaction() as session: session['google_oauth2_credentials'] = ( credentials.to_json()) response = client.get('/protected') self.assertEqual(response.status_code, httplib.OK) self.assertIn('Hello', response.data.decode('utf-8')) # Expired credentials without a refresh token, should redirect. credentials.refresh_token = None with mock.patch('oauth2client.client._UTCNOW') as utcnow: utcnow.return_value = datetime.datetime(1990, 5, 29) with self.app.test_client() as client: with client.session_transaction() as session: session['google_oauth2_credentials'] = ( credentials.to_json()) response = client.get('/protected') self.assertEqual(response.status_code, httplib.FOUND) self.assertIn('oauth2authorize', response.headers['Location']) self.assertIn('protected', response.headers['Location'])
def test_incremental_auth(self): self._create_incremental_auth_app() # No credentials, should redirect with self.app.test_client() as client: response = client.get('/one') self.assertIn('one', response.headers['Location']) self.assertEqual(response.status_code, httplib.FOUND) # Credentials for one. /one should allow, /two should redirect. credentials = self._generate_credentials(scopes=['email', 'one']) with self.app.test_client() as client: with client.session_transaction() as session: session['google_oauth2_credentials'] = credentials.to_json() response = client.get('/one') self.assertEqual(response.status_code, httplib.OK) response = client.get('/two') self.assertIn('two', response.headers['Location']) self.assertEqual(response.status_code, httplib.FOUND) # Starting the authorization flow should include the # include_granted_scopes parameter as well as the scopes. response = client.get(response.headers['Location'][17:]) q = urlparse.parse_qs( response.headers['Location'].split('?', 1)[1]) self.assertIn('include_granted_scopes', q) self.assertEqual( set(q['scope'][0].split(' ')), set(['one', 'email', 'two', 'three'])) # Actually call two() without a redirect. credentials2 = self._generate_credentials( scopes=['email', 'two', 'three']) with self.app.test_client() as client: with client.session_transaction() as session: session['google_oauth2_credentials'] = credentials2.to_json() response = client.get('/two') self.assertEqual(response.status_code, httplib.OK)
def test_incremental_auth(self): self._create_incremental_auth_app() # No credentials, should redirect with self.app.test_client() as client: response = client.get('/one') self.assertIn('one', response.headers['Location']) self.assertEqual(response.status_code, httplib.FOUND) # Credentials for one. /one should allow, /two should redirect. credentials = self._generate_credentials(scopes=['email', 'one']) with self.app.test_client() as client: with client.session_transaction() as session: session['google_oauth2_credentials'] = credentials.to_json() response = client.get('/one') self.assertEqual(response.status_code, httplib.OK) response = client.get('/two') self.assertIn('two', response.headers['Location']) self.assertEqual(response.status_code, httplib.FOUND) # Starting the authorization flow should include the # include_granted_scopes parameter as well as the scopes. response = client.get(response.headers['Location'][17:]) q = urlparse.parse_qs(response.headers['Location'].split('?', 1)[1]) self.assertIn('include_granted_scopes', q) self.assertEqual( set(q['scope'][0].split(' ')), set(['one', 'email', 'two', 'three'])) # Actually call two() without a redirect. credentials2 = self._generate_credentials( scopes=['email', 'two', 'three']) with self.app.test_client() as client: with client.session_transaction() as session: session['google_oauth2_credentials'] = credentials2.to_json() response = client.get('/two') self.assertEqual(response.status_code, httplib.OK)
def _http_request(self, url, method, **kwargs): kwargs.setdefault('user_agent', USER_AGENT) kwargs.setdefault('auth', self.auth) if isinstance(self.endpoint_override, six.string_types): kwargs.setdefault( 'endpoint_override', _trim_endpoint_api_version(self.endpoint_override) ) if getattr(self, 'os_iotronic_api_version', None): kwargs['headers'].setdefault('X-OpenStack-Iotronic-API-Version', self.os_iotronic_api_version) endpoint_filter = kwargs.setdefault('endpoint_filter', {}) endpoint_filter.setdefault('interface', self.interface) endpoint_filter.setdefault('service_type', self.service_type) endpoint_filter.setdefault('region_name', self.region_name) resp = self.session.request(url, method, raise_exc=False, **kwargs) if resp.status_code == http_client.NOT_ACCEPTABLE: negotiated_ver = self.negotiate_version(self.session, resp) kwargs['headers']['X-OpenStack-Iotronic-API-Version'] = ( negotiated_ver) return self._http_request(url, method, **kwargs) if resp.status_code >= http_client.BAD_REQUEST: error_json = _extract_error_json(resp.content) # NOTE(vdrok): exceptions from iotronic controllers' _lookup # methods # are constructed directly by pecan instead of wsme, and contain # only description field raise exc.from_response(resp, (error_json.get('faultstring') or error_json.get('description')), error_json.get('debuginfo'), method, url) elif resp.status_code in (http_client.MOVED_PERMANENTLY, http_client.FOUND, http_client.USE_PROXY): # Redirected. Reissue the request to the new location. location = resp.headers.get('location') resp = self._http_request(location, method, **kwargs) elif resp.status_code == http_client.MULTIPLE_CHOICES: raise exc.from_response(resp, method=method, url=url) return resp