我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 six.moves.http_client.OK。
def test_run_with_existing_remote_build_container(
        self, skipper_runner_run_mock, requests_get_mock):
    """Check ``run`` when the build container tag is listed remotely.

    The mocked registry answers 200 with a tag list that contains
    ``build-container-tag``; the runner must then be called exactly once
    with the registry-qualified image name.
    """
    registry_response = mock.MagicMock(spec='requests.Response').return_value
    registry_response.json.return_value = {
        'name': 'my_image',
        'tags': ['latest', 'aaaaaaa', 'bbbbbbb', 'build-container-tag'],
    }
    registry_response.status_code = http_client.OK
    requests_get_mock.return_value = registry_response

    command = ['ls', '-l']
    self._invoke_cli(
        global_params=self.global_params,
        subcmd='run',
        subcmd_params=command,
    )

    skipper_runner_run_mock.assert_called_once_with(
        command,
        fqdn_image='registry.io:5000/build-container-image:build-container-tag',
        environment=[],
        interactive=False,
        name=None,
        net='host',
        volumes=None,
        workdir=None,
        use_cache=False,
    )
def do_GET(self):
    """Handle a GET request.

    Replies 200 with a fixed HTML page and stores the request's query
    parameters, as a dict, on ``self.server.query_params``.

    Note that we can't detect if an error occurred.
    """
    self.send_response(http_client.OK)
    self.send_header("Content-type", "text/html")
    self.end_headers()
    # Parse the URL instead of splitting on '?': a path without a query
    # string must yield no parameters rather than being parsed as one.
    query_string = urllib.parse.urlparse(self.path).query
    self.server.query_params = dict(urllib.parse.parse_qsl(query_string))
    self.wfile.write(
        b"<html><head><title>Authentication Status</title></head>")
    self.wfile.write(
        b"<body><p>The authentication flow has completed.</p>")
    self.wfile.write(b"</body></html>")
def _detect_gce_environment():
    """Tell whether the code is running on Google Compute Engine.

    Returns:
        True when the metadata URI answers 200 and carries the expected
        metadata flavor header; False otherwise, including when the
        request fails with a socket error.
    """
    # NOTE: The explicit ``timeout`` is a workaround. The underlying
    #       issue is that resolving an unknown host on some networks will
    #       take 20-30 seconds; making this timeout short fixes the issue,
    #       but could lead to false negatives in the event that we are on
    #       GCE, but the metadata resolution was particularly slow. The
    #       latter case is "unlikely".
    http = transport.get_http_object(timeout=GCE_METADATA_TIMEOUT)
    try:
        response, _ = transport.request(
            http, _GCE_METADATA_URI, headers=_GCE_HEADERS)
        if response.status != http_client.OK:
            return False
        flavor = response.get(_METADATA_FLAVOR_HEADER)
        return flavor == _DESIRED_METADATA_FLAVOR
    except socket.error:  # socket.timeout or socket.error(64, 'Host is down')
        logger.info('Timeout attempting to reach GCE metadata service.')
        return False
def do_GET(self):
    """Handle a GET request.

    Sends a 200 HTML reply, parses the query component of the request
    path and keeps the result on ``self.server.query_params``.

    Note that we can't detect if an error occurred.
    """
    self.send_response(http_client.OK)
    self.send_header('Content-type', 'text/html')
    self.end_headers()

    query_string = urllib.parse.urlparse(self.path).query
    self.server.query_params = _helpers.parse_unique_urlencoded(query_string)

    # The page is emitted in the same three chunks as before.
    page_chunks = (
        b'<html><head><title>Authentication Status</title></head>',
        b'<body><p>The authentication flow has completed.</p>',
        b'</body></html>',
    )
    for chunk in page_chunks:
        self.wfile.write(chunk)
def _list_request(self, resource, permanent=False, headers=None,
                  extra_headers=False, **kwargs):
    """Get the list of objects of the specified type.

    :param resource: The name of the REST resource, e.g., 'nodes'.
    :param permanent: Forwarded to ``self._get_uri`` when building the URI.
    :param headers: List of headers to use in request.
    :param extra_headers: Specify whether to use headers.
    :param **kwargs: Parameters for the request; url-encoded into the
        query string when any are given.
    :returns: A tuple with the server response and deserialized JSON list
        of objects
    """
    uri = self._get_uri(resource, permanent=permanent)
    if kwargs:
        uri = "%s?%s" % (uri, urllib.urlencode(kwargs))

    resp, body = self.get(
        uri, headers=headers, extra_headers=extra_headers)
    self.expected_success(http_client.OK, resp.status)
    return resp, self.deserialize(body)
def _show_request(self, resource, uuid=None, permanent=False, **kwargs):
    """Gets a specific object of the specified type.

    :param resource: The name of the REST resource; used to build the URI
        unless ``uri`` is passed in ``kwargs``.
    :param uuid: Unique identifier of the object in UUID format.
    :param permanent: Forwarded to ``self._get_uri`` when building the URI.
    :param **kwargs: May carry ``uri``, which is then requested as is.
    :returns: A tuple with the server response and the deserialized body.
    """
    uri = (kwargs['uri'] if 'uri' in kwargs
           else self._get_uri(resource, uuid=uuid, permanent=permanent))
    resp, body = self.get(uri)
    self.expected_success(http_client.OK, resp.status)
    return resp, self.deserialize(body)
def test_token_info(self):
    """Refresh GCE credentials and check the token with the token info API."""
    credentials = gce.AppAssertionCredentials([])
    http = transport.get_http_object()

    # The access token is only populated by the first refresh.
    self.assertIsNone(credentials.access_token)
    credentials.refresh(http)
    self.assertIsNotNone(credentials.access_token)

    # Look the fresh token up through the token info endpoint.
    encoded_query = urllib.parse.urlencode(
        {'access_token': credentials.access_token})
    token_uri = oauth2client.GOOGLE_TOKEN_INFO_URI + '?' + encoded_query
    response, content = transport.request(http, token_uri)
    self.assertEqual(response.status, http_client.OK)

    payload = json.loads(content.decode('utf-8'))
    self.assertEqual(payload['access_type'], 'offline')
    self.assertLessEqual(int(payload['expires_in']), 3600)
def test_has_credentials_in_storage(self, OAuth2Credentials):
    """``oauth_enabled`` exposes stored, valid credentials on the request."""
    request = self.factory.get('/test')
    request.session = mock.Mock()

    stored_credentials = mock.Mock(
        scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
    stored_credentials.has_scopes.return_value = True
    stored_credentials.invalid = False
    # The scopes given to the constructor are replaced by an empty set.
    stored_credentials.scopes = set()
    OAuth2Credentials.from_json.return_value = stored_credentials

    @decorators.oauth_enabled
    def test_view(request):
        return http.HttpResponse('test')

    response = test_view(request)

    self.assertEqual(response.status_code, http_client.OK)
    self.assertEqual(response.content, b'test')
    self.assertTrue(request.oauth.has_credentials())
    self.assertIsNotNone(request.oauth.http)
    self.assertSetEqual(
        request.oauth.scopes,
        set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
def test_specified_scopes(self, dictionary_storage_mock):
    """``oauth_enabled`` with an extra scope reports no usable credentials."""
    request = self.factory.get('/test')
    request.session = mock.Mock()

    stored_credentials = mock.Mock(
        scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
    stored_credentials.has_scopes = mock.Mock(return_value=True)
    stored_credentials.is_valid = True
    dictionary_storage_mock.get.return_value = stored_credentials

    @decorators.oauth_enabled(scopes=['additional-scope'])
    def test_view(request):
        return http.HttpResponse('hello world')  # pragma: NO COVER

    response = test_view(request)

    self.assertEqual(response.status_code, http_client.OK)
    self.assertIsNotNone(request.oauth)
    self.assertFalse(request.oauth.has_credentials())
def test_get_token_success(self, now):
    """``get_token`` returns the token and an expiry offset from ``now``."""
    response_body = json.dumps({'access_token': 'a', 'expires_in': 100})
    http = request_mock(http_client.OK, 'application/json', response_body)

    token, expiry = _metadata.get_token(http=http)

    self.assertEqual(token, 'a')
    expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=100)
    self.assertEqual(expiry, expected_expiry)

    # Verify mocks.
    now.assert_called_once_with()
    self.assertEqual(http.requests, 1)
    self.assertEqual(http.uri, EXPECTED_URL + '/token')
    self.assertEqual(http.method, 'GET')
    self.assertIsNone(http.body)
    self.assertEqual(http.headers, _metadata.METADATA_HEADERS)
def test_exchange_using_authorization_header(self):
    """Exchange a code using a caller-supplied ``Authorization`` header.

    The header handed to the flow must be sent unchanged with the token
    request, and ``client_secret`` must be absent from the POST body.
    """
    # A trailing comma used to turn this value into a 1-tuple, so the
    # flow was never exercised with a real header string.
    # NOTE(review): the text after 'Basic ' does not look like valid
    # base64; it is only echoed back here, so it is left untouched --
    # confirm against upstream.
    auth_header = 'Basic Y2xpZW50X2lkKzE6c2Vjexc_managerV0KzE='
    flow = client.OAuth2WebServerFlow(
        client_id='client_id+1',
        authorization_header=auth_header,
        scope='foo',
        redirect_uri=client.OOB_CALLBACK_URN,
        user_agent='unittest-sample/1.0',
        revoke_uri='dummy_revoke_uri',
    )
    http = http_mock.HttpMockSequence([
        ({'status': http_client.OK}, b'access_token=SlAV32hkKG'),
    ])

    credentials = flow.step2_exchange(code='some random code', http=http)
    self.assertEqual('SlAV32hkKG', credentials.access_token)

    self.assertEqual(len(http.requests), 1)
    test_request = http.requests[0]
    # Did we pass the Authorization header?
    self.assertEqual(test_request['headers']['Authorization'], auth_header)
    # Did we omit client_secret from POST body?
    self.assertNotIn('client_secret', test_request['body'])
def __init__(self, headers=None, data=None):
    """HttpMock constructor.

    Args:
        headers: dict, header to return with response. Defaults to a dict
            holding only a 200 ``status``.
        data: kept unchanged on ``self.data``.
    """
    self.data = data
    self.response_headers = (
        {'status': http_client.OK} if headers is None else headers)

    # Request details and the request counter start out empty.
    self.headers = None
    self.uri = None
    self.method = None
    self.body = None
    self.requests = 0
def test_has_credentials_in_storage(self, UserOAuth2):
    """``oauth_required`` runs the view when credentials are available."""
    request = self.factory.get('/test')
    request.session = mock.Mock()

    @decorators.oauth_required
    def test_view(request):
        return http.HttpResponse("test")

    user_oauth = mock.Mock()
    UserOAuth2.return_value = user_oauth
    user_oauth.has_credentials.return_value = True

    response = test_view(request)

    self.assertEqual(response.status_code, http_client.OK)
    self.assertEqual(response.content, b"test")
def test_exchange_with_pkce(self):
    """With PKCE enabled the code verifier is sent in the token request."""
    http = http_mock.HttpMockSequence([
        ({'status': http_client.OK}, b'access_token=SlAV32hkKG'),
    ])
    flow = client.OAuth2WebServerFlow(
        'client_id+1',
        scope='foo',
        redirect_uri='http://example.com',
        pkce=True,
        code_verifier=self.good_verifier)

    flow.step2_exchange(code='some random code', http=http)

    self.assertEqual(len(http.requests), 1)
    sent_request = http.requests[0]
    expected_fragment = 'code_verifier={0}'.format(
        self.good_verifier.decode())
    self.assertIn(expected_fragment, sent_request['body'])
def test_allocate_node_conflict(self, mock_request, mock_get_url):
    """Composing a node raises when the allocation answers 409 Conflict."""
    mock_get_url.return_value = '/redfish/v1/Nodes'

    # First request: nodes root. Second request: allocation conflict.
    nodes_root_resp = fakes.mock_request_get(
        fakes.fake_nodes_root(), http_client.OK)
    allocation_conflict_resp = fakes.mock_request_get(
        fakes.fake_allocate_node_conflict(), http_client.CONFLICT)
    mock_request.side_effect = [nodes_root_resp, allocation_conflict_resp]

    with self.assertRaises(exception.RedfishException) as context:
        redfish.compose_node({"name": "test_node"})

    expected_message = ("There are no computer systems available for this "
                        "allocation request.")
    self.assertTrue(expected_message in str(context.exception.detail))
def test_set_boot_source_success(self, mock_get_url, mock_request):
    """Setting the boot source returns the expected confirmation."""
    mock_get_url.return_value = '/redfish/v1/Nodes'

    # First request: node detail (200). Second: the action (204).
    node_detail_resp = fakes.mock_request_get(
        fakes.fake_node_detail(), http_client.OK)
    action_resp = fakes.mock_request_get({}, http_client.NO_CONTENT)
    mock_request.side_effect = [node_detail_resp, action_resp]

    result = redfish.set_boot_source(
        "1", {"Boot": {"Enabled": "Once", "Target": "Hdd"}})

    detail = ("The boot source of composed node has been set to "
              "'{0}' with enabled state '{1}' successfully."
              .format("Hdd", "Once"))
    expected = exception.confirmation(
        confirm_code="Set Boot Source of Composed Node",
        confirm_detail=detail)
    self.assertEqual(expected, result)
def test_podmanager_create(self, pstatus_mock, plist_mock, pcreate_mock):
    """POSTing a new pod manager to ``/v1/pod_managers`` answers 200."""
    pstatus_mock.return_value = constants.PODM_STATUS_ONLINE
    plist_mock.return_value = []

    values = {
        "name": "podm_name",
        "url": "https://10.240.212.123",
        "authentication": [
            {
                "type": "basic",
                "auth_items": {
                    "username": "xxxxxxx",
                    "password": "xxxxxxx",
                },
            },
        ],
    }
    # The create mock returns the submitted values plus the online status.
    created = copy.deepcopy(values)
    created['status'] = constants.PODM_STATUS_ONLINE
    pcreate_mock.return_value = created

    response = self.app.post(
        '/v1/pod_managers',
        content_type='application/json',
        data=json.dumps(values))

    self.assertEqual(http_client.OK, response.status_code)
def test_compose_request_using_properties(self, mock_compose, mock_conn):
    """A compose request with ``properties`` answers 200 on ``/v1/nodes``."""
    memory_spec = {"capacity_mib": "4000", "type": "DDR3"}
    processor_spec = {"model": "Intel", "total_cores": "4"}
    req = {
        "name": "test_request",
        "podm_id": "test-podm",
        "properties": {
            "memory": memory_spec,
            "processor": processor_spec,
        },
    }
    mock_compose.return_value = req

    resp = self.app.post(
        '/v1/nodes',
        content_type='application/json',
        data=json.dumps(req))

    mock_conn.assert_called_once_with('test-podm')
    self.assertEqual(http_client.OK, resp.status_code)
def show_cpu_details(cpu_url):
    """Get processor details.

    :param cpu_url: relative redfish url to processor,
                    e.g /redfish/v1/Systems/1/Processors/1.
    :returns: dict of processor detail.
    :raises: exception.RedfishException when the response status is not
             200.
    """
    resp = send_request(cpu_url)
    if resp.status_code != http_client.OK:
        # Raise exception if don't find processor
        raise exception.RedfishException(resp.json(),
                                         status_code=resp.status_code)

    respdata = resp.json()
    # Result key -> field name in the redfish payload.
    field_map = (
        ("instruction_set", "InstructionSet"),
        ("model", "Model"),
        ("speed_mhz", "MaxSpeedMHz"),
        ("total_core", "TotalCores"),
    )
    return {key: respdata.get(field) for key, field in field_map}
def show_ram_details(ram_url):
    """Get memory details.

    :param ram_url: relative redfish url to memory,
                    e.g /redfish/v1/Systems/1/Memory/1.
    :returns: dict of memory detail.
    :raises: exception.RedfishException when the response status is not
             200.
    """
    resp = send_request(ram_url)
    if resp.status_code != http_client.OK:
        # Raise exception if don't find memory
        raise exception.RedfishException(resp.json(),
                                         status_code=resp.status_code)

    respdata = resp.json()
    # Result key -> field name in the redfish payload.
    field_map = (
        ("data_width_bit", "DataWidthBits"),
        ("speed_mhz", "OperatingSpeedMhz"),
        ("total_memory_mb", "CapacityMiB"),
    )
    return {key: respdata.get(field) for key, field in field_map}
def test_http_request_client_success(self):
    """``json_request`` hands back the 200 response from the session."""
    client = http.HTTPClient(valence_url="http://localhost/")
    canned_response = utils.mockSessionResponse(
        {'content-type': 'test/plain'},
        'Body',
        version=1,
        status_code=http_client.OK)

    with mock.patch.object(client, 'session',
                           autospec=True) as mock_session:
        # The patched session yields the canned response exactly once.
        mock_session.request.side_effect = iter([canned_response])
        response, body_iter = client.json_request(
            'GET', '/redfish/v1/Nodes')
        self.assertEqual(http_client.OK, response.status_code)
def test_get_run_by_id(self):
    """An unknown run id gives 404; a created run can be fetched by id."""
    response = self.app.get('/runs/1234')
    self.assertEqual(http_client.NOT_FOUND, response.status_code)

    # NOTE(review): 'localhosti,' may be a typo for 'localhost,' -- the
    # value is kept as found; confirm against the fixture.
    run_request = dict(
        playbook_path='tests/fixtures/playbooks/hello_world.yml',
        inventory_file='localhosti,',
        options={'connection': 'local'})
    response = self.app.post(
        '/runs',
        data=json.dumps(run_request),
        content_type='application/json')
    run_id = json.loads(response.data)['id']
    self.assertEqual(http_client.OK, response.status_code)

    response = self.app.get('/runs/{}'.format(run_id))
    self.assertEqual(http_client.OK, response.status_code)

    self._wait_for_run_complete(run_id)
def test_get_run_list(self):
    """A newly created run appears in the ``/runs`` listing."""
    run_request = dict(
        playbook_path='tests/fixtures/playbooks/hello_world.yml',
        inventory_file='localhost,',
        options={'connection': 'local'})
    response = self.app.post(
        '/runs',
        data=json.dumps(run_request),
        content_type='application/json')
    run_id = json.loads(response.data)['id']
    self.assertEqual(http_client.OK, response.status_code)

    response = self.app.get('/runs')
    listed_runs = json.loads(response.data)
    found = any(item['id'] == run_id for item in listed_runs)
    self.assertEqual(True, found)

    self._wait_for_run_complete(run_id)
def test_delete_running_job(self):
    """Deleting a run leaves it in the ``ABORTED`` state."""
    run_request = dict(
        playbook_path='tests/fixtures/playbooks/hello_world_with_sleep.yml',
        inventory_file='localhost,',
        options={'connection': 'local', 'subset': None})
    response = self.app.post(
        '/runs',
        data=json.dumps(run_request),
        content_type='application/json')
    created_run = json.loads(response.data)
    self.assertEqual(http_client.OK, response.status_code)

    run_url = '/runs/{}'.format(created_run['id'])
    response = self.app.delete(run_url)
    self.assertEqual(http_client.OK, response.status_code)

    response = self.app.get(run_url)
    fetched_run = json.loads(response.data)
    self.assertEqual('ABORTED', fetched_run['state'])

    # The id is taken from the fetched run, as in the original flow.
    self._wait_for_run_complete(fetched_run['id'])
def validate_href(image_href):
    """Validate HTTP image reference.

    :param image_href: Image reference.
    :raises: exception.ImageRefValidationFailed if HEAD request failed or
        returned response code not equal to 200.
    :returns: Response to HEAD request.
    """
    try:
        response = requests.head(image_href)
        status = response.status_code
        if status != http_client.OK:
            failure_reason = (
                "Got HTTP code %s instead of 200 in response to "
                "HEAD request." % status)
            raise exception.ImageRefValidationFailed(
                image_href=image_href, reason=failure_reason)
    except requests.RequestException as e:
        # Transport-level failures are reported as validation failures.
        raise exception.ImageRefValidationFailed(
            image_href=image_href, reason=e)
    return response
def test_get_state(self):
    """Test getting an instance state."""
    self.impl.get.return_value = {
        'name': 'foo.bar#0000000001',
        'host': 'baz1',
        'state': 'running',
    }

    resp = self.client.get('/state/foo.bar#0000000001')
    body = b''.join(resp.response).decode()
    expected_state = {
        'name': 'foo.bar#0000000001',
        'oom': None,
        'signal': None,
        'expires': None,
        'when': None,
        'host': 'baz1',
        'state': 'running',
        'exitcode': None,
    }
    self.assertEqual(json.loads(body), expected_state)
    self.assertEqual(resp.status_code, http_client.OK)
    self.impl.get.assert_called_with('foo.bar#0000000001')

    # When the implementation returns None the API answers 404.
    self.impl.get.return_value = None
    resp = self.client.get('/state/foo.bar#0000000002')
    self.assertEqual(resp.status_code, http_client.NOT_FOUND)
def test_get_server_list(self):
    """Test getting a list of servers.

    Checks the unfiltered listing body and status, then that the
    ``cell`` and ``partition`` query arguments are passed on to
    ``self.impl.list`` in that order.
    """
    server_list = [
        {'cell': 'foo', 'traits': [], '_id': 'server1', 'data': []},
        {'cell': 'bar', 'traits': [], '_id': 'server2', 'data': []}
    ]
    self.impl.list.return_value = server_list

    resp = self.client.get('/server/')
    # Join the chunks as bytes and compare parsed JSON: joining with a
    # text separator fails on byte chunks, and comparing serialized text
    # depends on the exact JSON formatting.
    body = b''.join(resp.response).decode()
    self.assertEqual(json.loads(body), server_list)
    self.assertEqual(resp.status_code, http_client.OK)
    self.impl.list.assert_called_with(None, None)

    filtered_cases = (
        ('/server/?cell=foo', ('foo', None)),
        ('/server/?partition=baz', (None, 'baz')),
        ('/server/?cell=foo&partition=baz', ('foo', 'baz')),
    )
    for url, expected_args in filtered_cases:
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, http_client.OK)
        self.impl.list.assert_called_with(*expected_args)
def test_replace_singular(self, mock_utcnow):
    """Replacing ``/description`` updates the value and ``updated_at``."""
    description = 'server-new-description'
    test_time = datetime.datetime(2000, 1, 1, 0, 0)
    mock_utcnow.return_value = test_time
    server_url = '/servers/%s' % self.server.uuid

    patch = [{'path': '/description', 'value': description,
              'op': 'replace'}]
    response = self.patch_json(server_url, patch, headers=self.headers)
    self.assertEqual('application/json', response.content_type)
    self.assertEqual(http_client.OK, response.status_code)

    result = self.get_json(server_url, headers=self.headers)
    self.assertEqual(description, result['description'])
    # The stored timestamp is compared without its timezone info.
    parsed_updated_at = timeutils.parse_isotime(result['updated_at'])
    return_updated_at = parsed_updated_at.replace(tzinfo=None)
    self.assertEqual(test_time, return_updated_at)
def test_replace_multi(self):
    """Replacing one metadata key leaves the other keys untouched."""
    extra = {"foo1": "bar1", "foo2": "bar2", "foo3": "bar3"}
    server = utils.create_test_server(
        name='test1', uuid=uuidutils.generate_uuid(), extra=extra)
    server_url = '/servers/%s' % server.uuid
    new_value = 'new value'

    patch = [{'path': '/metadata/foo2', 'value': new_value,
              'op': 'replace'}]
    response = self.patch_json(server_url, patch, headers=self.headers)
    self.assertEqual('application/json', response.content_type)
    self.assertEqual(http_client.OK, response.status_code)

    result = self.get_json(server_url, headers=self.headers)
    extra["foo2"] = new_value
    self.assertEqual(extra, result['metadata'])
def test_remove_singular(self):
    """Removing ``/description`` clears it and changes nothing else."""
    server = utils.create_test_server(
        name='test2', uuid=uuidutils.generate_uuid(), extra={'a': 'b'})
    server_url = '/servers/%s' % server.uuid

    patch = [{'path': '/description', 'op': 'remove'}]
    response = self.patch_json(server_url, patch, headers=self.headers)
    self.assertEqual('application/json', response.content_type)
    self.assertEqual(http_client.OK, response.status_code)

    result = self.get_json(server_url, headers=self.headers)
    self.assertIsNone(result['description'])

    # Assert nothing else was changed
    self.assertEqual(server.uuid, result['uuid'])
    self.assertEqual(server.extra, result['metadata'])
def _get_service_account_email(http_request=None):
    """Get the GCE service account email from the current environment.

    Args:
        http_request: callable, (Optional) a callable that matches the
                      method signature of httplib2.Http.request, used to
                      make the request to the metadata service. A fresh
                      ``httplib2.Http().request`` is used when omitted.

    Returns:
        tuple, A pair where the first entry is an optional response (from
        a failed request) and the second is service account email found
        (as a string). On failure the second entry is the raw response
        content.
    """
    request = httplib2.Http().request if http_request is None else http_request
    response, content = request(
        _DEFAULT_EMAIL_METADATA, headers={'Metadata-Flavor': 'Google'})
    if response.status != http_client.OK:
        return response, content
    return None, _from_bytes(content)
def remote_image_exist(registry, image, tag):
    """Return whether ``tag`` is listed for ``image`` in ``registry``.

    The tag list is fetched from the URL built from ``IMAGE_TAGS_URL``.
    Any non-200 answer counts as "not found", and so does a payload whose
    ``tags`` entry is missing or null.

    :param registry: value substituted for ``registry`` in the URL.
    :param image: value substituted for ``image`` in the URL.
    :param tag: tag to look for in the returned ``tags`` list.
    :returns: True if the tag is listed, False otherwise.
    """
    # NOTE(review): certificate verification is disabled for this request
    # and the matching urllib3 warning is silenced -- confirm this is
    # intended for the registries in use.
    urllib3.disable_warnings()
    url = IMAGE_TAGS_URL % dict(registry=registry, image=image)
    response = requests.get(url=url, verify=False)
    if response.status_code != http_client.OK:
        return False

    # Guard against a missing or null "tags" entry so the lookup answers
    # False instead of raising KeyError/TypeError.
    tags = response.json().get('tags') or []
    return tag in tags
def _do_revoke(self, http_request, token):
    """Revokes this credential and deletes the stored copy (if it exists).

    Args:
        http_request: callable, a callable that matches the method
                      signature of httplib2.Http.request, used to make the
                      revoke request.
        token: A string used as the token to be revoked. Can be either an
               access_token or refresh_token.

    Raises:
        TokenRevokeError: If the revoke request does not return with a
                          200 OK.
    """
    logger.info('Revoking token')
    token_revoke_uri = _update_query_params(
        self.revoke_uri, {'token': token})
    resp, content = http_request(token_revoke_uri)

    if resp.status != http_client.OK:
        error_msg = 'Invalid response %s.' % resp.status
        # Prefer the server-provided error when the body is JSON with one.
        try:
            d = json.loads(_from_bytes(content))
            if 'error' in d:
                error_msg = d['error']
        except (TypeError, ValueError):
            pass
        raise TokenRevokeError(error_msg)

    self.invalid = True
    if self.store:
        self.store.delete()
def _do_retrieve_scopes(self, http_request, token):
    """Retrieves the list of authorized scopes from the OAuth2 provider.

    On success the result is stored as a set on ``self.scopes``.

    Args:
        http_request: callable, a callable that matches the method
                      signature of httplib2.Http.request, used to make the
                      token info request.
        token: A string used as the token to identify the credentials to
               the provider.

    Raises:
        Error: When the request does not return 200, indicating that the
               access token is invalid.
    """
    logger.info('Refreshing scopes')
    token_info_uri = _update_query_params(
        self.token_info_uri, {'access_token': token, 'fields': 'scope'})
    resp, content = http_request(token_info_uri)
    content = _from_bytes(content)

    if resp.status != http_client.OK:
        error_msg = 'Invalid response %s.' % (resp.status,)
        # Prefer the provider's description when the body is JSON with one.
        try:
            d = json.loads(content)
            if 'error_description' in d:
                error_msg = d['error_description']
        except (TypeError, ValueError):
            pass
        raise Error(error_msg)

    token_info = json.loads(content)
    self.scopes = set(util.string_to_scopes(token_info.get('scope', '')))
def _detect_gce_environment():
    """Determine if the current environment is Compute Engine.

    Sends ``GET /`` to the metadata host with the metadata flavor header
    and checks that a 200 answer echoes the expected flavor.

    Returns:
        Boolean indicating whether or not the current environment is
        Google Compute Engine. A non-200 answer or a socket error gives
        False.
    """
    # NOTE: The explicit ``timeout`` is a workaround. The underlying
    #       issue is that resolving an unknown host on some networks will
    #       take 20-30 seconds; making this timeout short fixes the issue,
    #       but could lead to false negatives in the event that we are on
    #       GCE, but the metadata resolution was particularly slow. The
    #       latter case is "unlikely".
    connection = six.moves.http_client.HTTPConnection(
        _GCE_METADATA_HOST, timeout=1)

    try:
        headers = {_METADATA_FLAVOR_HEADER: _DESIRED_METADATA_FLAVOR}
        connection.request('GET', '/', headers=headers)
        response = connection.getresponse()
        if response.status != http_client.OK:
            # This case used to fall through and return None, which broke
            # the documented boolean contract.
            return False
        return (response.getheader(_METADATA_FLAVOR_HEADER) ==
                _DESIRED_METADATA_FLAVOR)
    except socket.error:  # socket.timeout or socket.error(64, 'Host is down')
        logger.info('Timeout attempting to reach GCE metadata service.')
        return False
    finally:
        connection.close()
def verify_id_token(id_token, audience, http=None,
                    cert_uri=ID_TOKEN_VERIFICATION_CERTS):
    """Verifies a signed JWT id_token.

    This function requires PyOpenSSL and because of that it does not work
    on App Engine.

    Args:
        id_token: string, A Signed JWT.
        audience: string, The audience 'aud' that the token should be for.
        http: httplib2.Http, instance to use to make the HTTP request.
              Callers should supply an instance that has caching enabled.
              The module-level cached instance is used when omitted.
        cert_uri: string, URI of the certificates in JSON format to
                  verify the JWT against.

    Returns:
        The deserialized JSON in the JWT.

    Raises:
        oauth2client.crypt.AppIdentityError: if the JWT fails to verify.
        CryptoUnavailableError: if no crypto library is available.
        VerifyJwtTokenError: if fetching the certificates does not
                             return 200.
    """
    _RequireCryptoOrDie()
    if http is None:
        http = _cached_http

    resp, content = http.request(cert_uri)
    if resp.status != http_client.OK:
        raise VerifyJwtTokenError('Status code: %d' % resp.status)

    certs = json.loads(_from_bytes(content))
    return crypt.verify_signed_jwt_with_certs(id_token, certs, audience)
def _refresh(self, http_request):
    """Refreshes the access_token.

    Skip all the storage hoops and just refresh using the API.

    Args:
        http_request: callable, a callable that matches the method
                      signature of httplib2.Http.request, used to make
                      the refresh request.

    Raises:
        HttpAccessTokenRefreshError: When the refresh fails.
    """
    scope_query = '?scope=%s' % urllib.parse.quote(self.scope, '')
    uri = META.replace('{?scope}', scope_query)
    response, content = http_request(
        uri, headers={'Metadata-Flavor': 'Google'})
    content = _from_bytes(content)

    if response.status != http_client.OK:
        if response.status == http_client.NOT_FOUND:
            content += (' This can occur if a VM was created'
                        ' with no service account or scopes.')
        raise HttpAccessTokenRefreshError(content, status=response.status)

    try:
        token_content = json.loads(content)
    except Exception as e:
        raise HttpAccessTokenRefreshError(str(e),
                                          status=response.status)
    self.access_token = token_content['access_token']
def _update_image_meta_v2(conn, extra_headers, properties, patch_path):
    """Send a JSON-patch ``PATCH`` request that adds image metadata.

    The patch always adds the container format, disk format and
    visibility, followed by one "add" operation per entry of
    ``properties`` (underscores in keys become dashes, values are
    stringified). A non-200 answer is logged as an error; nothing is
    returned either way.

    :param conn: connection object used to send the request and read the
        response.
    :param extra_headers: dict of headers merged over the content type.
    :param properties: dict of extra image properties to add.
    :param patch_path: request path for the ``PATCH`` call.
    """
    # NOTE(sirp): There is some confusion around OVF. Here's a summary
    # of where we currently stand:
    #   1. OVF as a container format is misnamed. We really should be
    #      using OVA since that is the name for the container format;
    #      OVF is the standard applied to the manifest file contained
    #      within.
    #   2. We're currently uploading a vanilla tarball. In order to be
    #      OVF/OVA compliant, we'll need to embed a minimal OVF
    #      manifest as the first file.
    patch_ops = [
        {"path": "/container_format", "value": "ovf", "op": "add"},
        {"path": "/disk_format", "value": "vhd", "op": "add"},
        {"path": "/visibility", "value": "private", "op": "add"},
    ]

    headers = {'Content-Type': 'application/openstack-images-v2.1-json-patch'}
    headers.update(**extra_headers)

    patch_ops.extend(
        {"path": "/%s" % key.replace('_', '-'),
         "value": str(value),
         "op": "add"}
        for key, value in properties.items())

    conn.request('PATCH', patch_path, body=json.dumps(patch_ops),
                 headers=headers)
    resp = conn.getresponse()
    resp.read()

    if resp.status != httplib.OK:
        logging.error("Image meta was not updated. Status: %s, Reason: %s" % (
            resp.status, resp.reason))
def after(self, state):
    """Strip tracebacks from the ``faultstring`` of error responses.

    Responses without a body, responses whose status is in the 200-399
    range, and every response while ``debug_tracebacks_in_api`` is set
    are left untouched.
    """
    response = state.response

    # Omit empty body. Some errors may not have body at this level yet.
    if not response.body:
        return

    # Do nothing if there is no error.
    # Status codes in the range 200 (OK) to 399 (400 = BAD_REQUEST) are
    # not an error.
    is_success = (
        http_client.OK <= response.status_int < http_client.BAD_REQUEST)
    if is_success:
        return

    json_body = state.response.json
    # Do not remove traceback when traceback config is set
    if cfg.CONF.debug_tracebacks_in_api:
        return

    traceback_marker = 'Traceback (most recent call last):'
    faultstring = json_body.get('faultstring')
    if faultstring and traceback_marker in faultstring:
        # Cut-off traceback.
        without_traceback = faultstring.split(traceback_marker, 1)[0]
        # Remove trailing newlines and spaces if any.
        json_body['faultstring'] = without_traceback.rstrip()
        # Replace the whole json. Cannot change original one because it's
        # generated on the fly.
        state.response.json = json_body
def test_list_nodes(self, mock_get_token):
    """Listing nodes sends an authenticated GET and returns the JSON."""
    nodes_url = 'https://127.0.0.1:4443/vdc/nodes'
    mock_get_token.return_value = 'FAKE-TOKEN-123'
    self.requests_mock.register_uri(
        'GET',
        nodes_url,
        status_code=http_client.OK,
        json=self.returned_json)

    response = self.client.node.list()

    last_request = self.requests_mock.last_request
    self.assertEqual(last_request.method, 'GET')
    self.assertEqual(last_request.url, nodes_url)
    self.assertEqual(last_request.headers['x-sds-auth-token'],
                     'FAKE-TOKEN-123')
    self.assertEqual(response, self.returned_json)
def _do_revoke(self, http, token):
    """Revokes this credential and deletes the stored copy (if it exists).

    The revoke URI is first requested with the token as a query
    parameter; if that answers 405 the request is repeated as a POST with
    the token in the body.

    Args:
        http: an object to be used to make HTTP requests.
        token: A string used as the token to be revoked. Can be either an
               access_token or refresh_token.

    Raises:
        TokenRevokeError: If the revoke request does not return with a
                          200 OK.
    """
    logger.info('Revoking token')
    query_params = {'token': token}
    token_revoke_uri = _helpers.update_query_params(
        self.revoke_uri, query_params)
    resp, content = transport.request(http, token_revoke_uri)

    if resp.status == http_client.METHOD_NOT_ALLOWED:
        resp, content = transport.request(
            http,
            token_revoke_uri,
            method='POST',
            body=urllib.parse.urlencode(query_params))

    if resp.status != http_client.OK:
        error_msg = 'Invalid response {0}.'.format(resp.status)
        # Prefer the server-provided error when the body is JSON with one.
        try:
            d = json.loads(_helpers._from_bytes(content))
            if 'error' in d:
                error_msg = d['error']
        except (TypeError, ValueError):
            pass
        raise TokenRevokeError(error_msg)

    self.invalid = True
    if self.store:
        self.store.delete()