Python six.moves.http_client 模块,OK 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 six.moves.http_client.OK。

项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_run_with_existing_remote_build_container(self, skipper_runner_run_mock, requests_get_mock):
        """Run a command when the build container tag is already listed.

        ``requests_get_mock`` is set to return an OK response whose JSON tag
        list contains 'build-container-tag'; the runner mock must then be
        called exactly once with the fully qualified registry image name.
        """
        tag_listing = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb', 'build-container-tag']
        }
        response_factory = mock.MagicMock(spec='requests.Response')
        fake_response = response_factory.return_value
        fake_response.json.return_value = tag_listing
        fake_response.status_code = http_client.OK
        requests_get_mock.return_value = fake_response

        command = ['ls', '-l']
        self._invoke_cli(
            global_params=self.global_params,
            subcmd='run',
            subcmd_params=command
        )

        skipper_runner_run_mock.assert_called_once_with(
            command,
            fqdn_image='registry.io:5000/build-container-image:build-container-tag',
            environment=[],
            interactive=False,
            name=None,
            net='host',
            volumes=None,
            workdir=None,
            use_cache=False)
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def do_GET(self):
        """Respond to a GET request with a static completion page.

        The query string of the requested path is parsed into a dict and
        stored on ``self.server.query_params``. The page always reports
        completion, since an error in the flow cannot be detected here.
        """
        self.send_response(http_client.OK)
        self.send_header("Content-type", "text/html")
        self.end_headers()

        # Everything after the first '?' (the whole path if there is none).
        raw_query = self.path.split('?', 1)[-1]
        self.server.query_params = dict(urllib.parse.parse_qsl(raw_query))

        page_chunks = (
            b"<html><head><title>Authentication Status</title></head>",
            b"<body><p>The authentication flow has completed.</p>",
            b"</body></html>",
        )
        for chunk in page_chunks:
            self.wfile.write(chunk)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _detect_gce_environment():
    """Check whether this process is running on Google Compute Engine.

    Returns:
        Boolean, True only when the metadata URI answers with an OK status
        and the desired metadata flavor header; False otherwise, including
        when the request raises ``socket.error``.
    """
    # NOTE: A short explicit timeout is used on purpose. Resolving an unknown
    #       host can take 20-30 seconds on some networks; the short timeout
    #       avoids that wait at the cost of a possible false negative when
    #       the metadata resolution on GCE is unusually slow.
    http = transport.get_http_object(timeout=GCE_METADATA_TIMEOUT)
    try:
        response, _ = transport.request(
            http, _GCE_METADATA_URI, headers=_GCE_HEADERS)
        if response.status != http_client.OK:
            return False
        flavor = response.get(_METADATA_FLAVOR_HEADER)
        return flavor == _DESIRED_METADATA_FLAVOR
    except socket.error:  # socket.timeout or socket.error(64, 'Host is down')
        logger.info('Timeout attempting to reach GCE metadata service.')
        return False
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def do_GET(self):
        """Respond to a GET request with a static completion page.

        The query component of the requested path is parsed and stored on
        ``self.server.query_params``. The page always reports completion,
        since an error in the flow cannot be detected here.
        """
        self.send_response(http_client.OK)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        query_string = urllib.parse.urlparse(self.path).query
        self.server.query_params = _helpers.parse_unique_urlencoded(
            query_string)

        page_chunks = (
            b'<html><head><title>Authentication Status</title></head>',
            b'<body><p>The authentication flow has completed.</p>',
            b'</body></html>',
        )
        for chunk in page_chunks:
            self.wfile.write(chunk)
项目:ironic-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def _list_request(self, resource, permanent=False, headers=None,
                      extra_headers=False, **kwargs):
        """Fetch the list of objects of the given resource type.

        :param resource: The name of the REST resource, e.g., 'nodes'.
        :param permanent: Passed through to ``_get_uri`` when building the
                          URI.
        :param headers: List of headers to use in request.
        :param extra_headers: Specify whether to use headers.
        :param **kwargs: Parameters for the request; when present they are
                         URL-encoded into the query string.
        :returns: A tuple with the server response and deserialized JSON list
                 of objects

        """
        uri = self._get_uri(resource, permanent=permanent)
        if kwargs:
            uri = "%s?%s" % (uri, urllib.urlencode(kwargs))

        response, payload = self.get(uri, headers=headers,
                                     extra_headers=extra_headers)
        self.expected_success(http_client.OK, response.status)

        return response, self.deserialize(payload)
项目:ironic-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def _show_request(self,
                      resource,
                      uuid=None,
                      permanent=False,
                      **kwargs):
        """Fetch a single object of the given resource type.

        :param resource: The name of the REST resource.
        :param uuid: Unique identifier of the object in UUID format.
        :param permanent: Passed through to ``_get_uri`` when building the
                          URI.
        :param **kwargs: If it contains 'uri', that value is requested as-is
                         instead of a URI built from the other arguments.
        :returns: A tuple with the server response and the deserialized body.

        """
        uri = (kwargs['uri'] if 'uri' in kwargs
               else self._get_uri(resource, uuid=uuid, permanent=permanent))
        response, payload = self.get(uri)
        self.expected_success(http_client.OK, response.status)

        return response, self.deserialize(payload)
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def test_token_info(self):
        """Refresh GCE credentials and check the token with the token info API."""
        credentials = gce.AppAssertionCredentials([])
        http = transport.get_http_object()

        # No token before the first refresh; one must exist afterwards.
        self.assertIsNone(credentials.access_token)
        credentials.refresh(http)
        self.assertIsNotNone(credentials.access_token)

        # Ask the token info endpoint about the freshly obtained token.
        encoded_query = urllib.parse.urlencode(
            {'access_token': credentials.access_token})
        token_uri = oauth2client.GOOGLE_TOKEN_INFO_URI + '?' + encoded_query
        response, content = transport.request(http, token_uri)
        self.assertEqual(response.status, http_client.OK)

        payload = json.loads(content.decode('utf-8'))
        self.assertEqual(payload['access_type'], 'offline')
        self.assertLessEqual(int(payload['expires_in']), 3600)
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def test_has_credentials_in_storage(self, OAuth2Credentials):
        """A view wrapped by ``oauth_enabled`` sees credentials from storage."""
        request = self.factory.get('/test')
        request.session = mock.Mock()

        stored_credentials = mock.Mock(
            scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
        stored_credentials.has_scopes.return_value = True
        stored_credentials.invalid = False
        stored_credentials.scopes = set()
        OAuth2Credentials.from_json.return_value = stored_credentials

        @decorators.oauth_enabled
        def test_view(request):
            return http.HttpResponse('test')

        response = test_view(request)
        self.assertEqual(response.status_code, http_client.OK)
        self.assertEqual(response.content, b'test')
        self.assertTrue(request.oauth.has_credentials())
        self.assertIsNotNone(request.oauth.http)
        actual_scopes = request.oauth.scopes
        expected_scopes = set(django.conf.settings.GOOGLE_OAUTH2_SCOPES)
        self.assertSetEqual(actual_scopes, expected_scopes)
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def test_specified_scopes(self, dictionary_storage_mock):
        """With an extra scope requested, the view reports no credentials."""
        request = self.factory.get('/test')
        request.session = mock.Mock()

        stored_credentials = mock.Mock(
            scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
        stored_credentials.has_scopes = mock.Mock(return_value=True)
        stored_credentials.is_valid = True
        dictionary_storage_mock.get.return_value = stored_credentials

        @decorators.oauth_enabled(scopes=['additional-scope'])
        def test_view(request):
            return http.HttpResponse('hello world')  # pragma: NO COVER

        response = test_view(request)
        self.assertEqual(response.status_code, http_client.OK)
        self.assertIsNotNone(request.oauth)
        has_credentials = request.oauth.has_credentials()
        self.assertFalse(has_credentials)
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def test_get_token_success(self, now):
        """Check the token and expiry returned for an OK metadata response.

        The expiry is expected to equal ``datetime.datetime.min`` plus the
        100 seconds given as ``expires_in`` in the mocked response.
        """
        token_response = json.dumps({'access_token': 'a', 'expires_in': 100})
        http = request_mock(http_client.OK, 'application/json', token_response)

        token, expiry = _metadata.get_token(http=http)

        self.assertEqual(token, 'a')
        expected_expiry = (
            datetime.datetime.min + datetime.timedelta(seconds=100))
        self.assertEqual(expiry, expected_expiry)
        # Verify mocks.
        now.assert_called_once_with()
        self.assertEqual(http.requests, 1)
        self.assertEqual(http.uri, EXPECTED_URL + '/token')
        self.assertEqual(http.method, 'GET')
        self.assertIsNone(http.body)
        self.assertEqual(http.headers, _metadata.METADATA_HEADERS)
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def test_exchange_using_authorization_header(self):
        """Exchange a code while passing an explicit Authorization header.

        Checks that the header value is forwarded unchanged with the token
        request and that ``client_secret`` is left out of the POST body.
        """
        # Must be a plain string: a trailing comma after the literal would
        # silently turn the header value into a one-element tuple.
        auth_header = 'Basic Y2xpZW50X2lkKzE6c2Vjexc_managerV0KzE='
        flow = client.OAuth2WebServerFlow(
            client_id='client_id+1',
            authorization_header=auth_header,
            scope='foo',
            redirect_uri=client.OOB_CALLBACK_URN,
            user_agent='unittest-sample/1.0',
            revoke_uri='dummy_revoke_uri',
        )
        http = http_mock.HttpMockSequence([
            ({'status': http_client.OK}, b'access_token=SlAV32hkKG'),
        ])

        credentials = flow.step2_exchange(code='some random code', http=http)
        self.assertEqual('SlAV32hkKG', credentials.access_token)

        self.assertEqual(len(http.requests), 1)
        test_request = http.requests[0]
        # Did we pass the Authorization header?
        self.assertEqual(test_request['headers']['Authorization'], auth_header)
        # Did we omit client_secret from POST body?
        self.assertNotIn('client_secret', test_request['body'])
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def __init__(self, headers=None, data=None):
        """HttpMock constructor.

        Args:
            headers: dict, header to return with response. When omitted, a
                     dict holding only an OK status is used.
            data: stored unchanged on ``self.data``.
        """
        if headers is None:
            headers = {'status': http_client.OK}
        self.data = data
        self.response_headers = headers
        # Request-related attributes start out empty (each was previously
        # assigned here once, except ``headers`` which was assigned twice).
        self.headers = None
        self.uri = None
        self.method = None
        self.body = None
        self.requests = 0
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def _detect_gce_environment():
    """Check whether this process is running on Google Compute Engine.

    Returns:
        Boolean, True only when the metadata URI answers with an OK status
        and the desired metadata flavor header; False otherwise, including
        when the request raises ``socket.error``.
    """
    # NOTE: A short explicit timeout is used on purpose. Resolving an unknown
    #       host can take 20-30 seconds on some networks; the short timeout
    #       avoids that wait at the cost of a possible false negative when
    #       the metadata resolution on GCE is unusually slow.
    http = transport.get_http_object(timeout=GCE_METADATA_TIMEOUT)
    try:
        response, _ = transport.request(
            http, _GCE_METADATA_URI, headers=_GCE_HEADERS)
        if response.status != http_client.OK:
            return False
        flavor = response.get(_METADATA_FLAVOR_HEADER)
        return flavor == _DESIRED_METADATA_FLAVOR
    except socket.error:  # socket.timeout or socket.error(64, 'Host is down')
        logger.info('Timeout attempting to reach GCE metadata service.')
        return False
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def do_GET(self):
        """Respond to a GET request with a static completion page.

        The query component of the requested path is parsed and stored on
        ``self.server.query_params``. The page always reports completion,
        since an error in the flow cannot be detected here.
        """
        self.send_response(http_client.OK)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        query_string = urllib.parse.urlparse(self.path).query
        self.server.query_params = _helpers.parse_unique_urlencoded(
            query_string)

        page_chunks = (
            b'<html><head><title>Authentication Status</title></head>',
            b'<body><p>The authentication flow has completed.</p>',
            b'</body></html>',
        )
        for chunk in page_chunks:
            self.wfile.write(chunk)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def test_has_credentials_in_storage(self, OAuth2Credentials):
        """A view wrapped by ``oauth_enabled`` sees credentials from storage."""
        request = self.factory.get('/test')
        request.session = mock.Mock()

        stored_credentials = mock.Mock(
            scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
        stored_credentials.has_scopes.return_value = True
        stored_credentials.invalid = False
        stored_credentials.scopes = set()
        OAuth2Credentials.from_json.return_value = stored_credentials

        @decorators.oauth_enabled
        def test_view(request):
            return http.HttpResponse('test')

        response = test_view(request)
        self.assertEqual(response.status_code, http_client.OK)
        self.assertEqual(response.content, b'test')
        self.assertTrue(request.oauth.has_credentials())
        self.assertIsNotNone(request.oauth.http)
        actual_scopes = request.oauth.scopes
        expected_scopes = set(django.conf.settings.GOOGLE_OAUTH2_SCOPES)
        self.assertSetEqual(actual_scopes, expected_scopes)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def test_specified_scopes(self, dictionary_storage_mock):
        """With an extra scope requested, the view reports no credentials."""
        request = self.factory.get('/test')
        request.session = mock.Mock()

        stored_credentials = mock.Mock(
            scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
        stored_credentials.has_scopes = mock.Mock(return_value=True)
        stored_credentials.is_valid = True
        dictionary_storage_mock.get.return_value = stored_credentials

        @decorators.oauth_enabled(scopes=['additional-scope'])
        def test_view(request):
            return http.HttpResponse('hello world')  # pragma: NO COVER

        response = test_view(request)
        self.assertEqual(response.status_code, http_client.OK)
        self.assertIsNotNone(request.oauth)
        has_credentials = request.oauth.has_credentials()
        self.assertFalse(has_credentials)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def test_has_credentials_in_storage(self, UserOAuth2):
        """A view behind ``oauth_required`` is served when credentials exist."""
        request = self.factory.get('/test')
        request.session = mock.Mock()

        @decorators.oauth_required
        def test_view(request):
            return http.HttpResponse("test")

        # The patched UserOAuth2 hands back an object that claims to hold
        # credentials.
        user_oauth = mock.Mock()
        UserOAuth2.return_value = user_oauth
        user_oauth.has_credentials.return_value = True

        response = test_view(request)
        status, content = response.status_code, response.content
        self.assertEqual(status, http_client.OK)
        self.assertEqual(content, b"test")
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def test_get_token_success(self, now):
        """Check the token and expiry returned for an OK metadata response.

        The expiry is expected to equal ``datetime.datetime.min`` plus the
        100 seconds given as ``expires_in`` in the mocked response.
        """
        token_response = json.dumps({'access_token': 'a', 'expires_in': 100})
        http = request_mock(http_client.OK, 'application/json', token_response)

        token, expiry = _metadata.get_token(http=http)

        self.assertEqual(token, 'a')
        expected_expiry = (
            datetime.datetime.min + datetime.timedelta(seconds=100))
        self.assertEqual(expiry, expected_expiry)
        # Verify mocks.
        now.assert_called_once_with()
        self.assertEqual(http.requests, 1)
        self.assertEqual(http.uri, EXPECTED_URL + '/token')
        self.assertEqual(http.method, 'GET')
        self.assertIsNone(http.body)
        self.assertEqual(http.headers, _metadata.METADATA_HEADERS)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def test_exchange_with_pkce(self):
        """With PKCE enabled, the request body carries the code verifier."""
        token_response = (
            {'status': http_client.OK}, b'access_token=SlAV32hkKG')
        http = http_mock.HttpMockSequence([token_response])
        flow = client.OAuth2WebServerFlow(
            'client_id+1',
            scope='foo',
            redirect_uri='http://example.com',
            pkce=True,
            code_verifier=self.good_verifier)
        flow.step2_exchange(code='some random code', http=http)

        # Exactly one request is expected, and it must include the verifier.
        self.assertEqual(len(http.requests), 1)
        sent_request = http.requests[0]
        expected_fragment = 'code_verifier={0}'.format(
            self.good_verifier.decode())
        self.assertIn(expected_fragment, sent_request['body'])
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def test_exchange_using_authorization_header(self):
        """Exchange a code while passing an explicit Authorization header.

        Checks that the header value is forwarded unchanged with the token
        request and that ``client_secret`` is left out of the POST body.
        """
        # Must be a plain string: a trailing comma after the literal would
        # silently turn the header value into a one-element tuple.
        auth_header = 'Basic Y2xpZW50X2lkKzE6c2Vjexc_managerV0KzE='
        flow = client.OAuth2WebServerFlow(
            client_id='client_id+1',
            authorization_header=auth_header,
            scope='foo',
            redirect_uri=client.OOB_CALLBACK_URN,
            user_agent='unittest-sample/1.0',
            revoke_uri='dummy_revoke_uri',
        )
        http = http_mock.HttpMockSequence([
            ({'status': http_client.OK}, b'access_token=SlAV32hkKG'),
        ])

        credentials = flow.step2_exchange(code='some random code', http=http)
        self.assertEqual('SlAV32hkKG', credentials.access_token)

        self.assertEqual(len(http.requests), 1)
        test_request = http.requests[0]
        # Did we pass the Authorization header?
        self.assertEqual(test_request['headers']['Authorization'], auth_header)
        # Did we omit client_secret from POST body?
        self.assertNotIn('client_secret', test_request['body'])
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def __init__(self, headers=None, data=None):
        """HttpMock constructor.

        Args:
            headers: dict, header to return with response. When omitted, a
                     dict holding only an OK status is used.
            data: stored unchanged on ``self.data``.
        """
        if headers is None:
            headers = {'status': http_client.OK}
        self.data = data
        self.response_headers = headers
        # Request-related attributes start out empty (each was previously
        # assigned here once, except ``headers`` which was assigned twice).
        self.headers = None
        self.uri = None
        self.method = None
        self.body = None
        self.requests = 0
项目:valence    作者:openstack    | 项目源码 | 文件源码
def test_allocate_node_conflict(self, mock_request, mock_get_url):
        """Test allocate resource conflict when compose node"""
        mock_get_url.return_value = '/redfish/v1/Nodes'

        # Responses are handed out in order: the nodes root answers OK, then
        # the allocation attempt answers CONFLICT.
        nodes_root_ok = fakes.mock_request_get(
            fakes.fake_nodes_root(), http_client.OK)
        allocation_conflict = fakes.mock_request_get(
            fakes.fake_allocate_node_conflict(), http_client.CONFLICT)
        mock_request.side_effect = [nodes_root_ok, allocation_conflict]

        with self.assertRaises(exception.RedfishException) as context:
            redfish.compose_node({"name": "test_node"})

        detail_text = str(context.exception.detail)
        self.assertTrue("There are no computer systems available for this "
                        "allocation request." in detail_text)
项目:valence    作者:openstack    | 项目源码 | 文件源码
def test_set_boot_source_success(self, mock_get_url, mock_request):
        """Test successfully setting the boot source of a composed node"""
        mock_get_url.return_value = '/redfish/v1/Nodes'
        # The node detail lookup answers OK; the follow-up action answers
        # NO_CONTENT.
        mock_request.side_effect = [
            fakes.mock_request_get(fakes.fake_node_detail(), http_client.OK),
            fakes.mock_request_get({}, http_client.NO_CONTENT),
        ]

        boot_settings = {"Boot": {"Enabled": "Once", "Target": "Hdd"}}
        result = redfish.set_boot_source("1", boot_settings)

        detail = ("The boot source of composed node has been set to "
                  "'{0}' with enabled state '{1}' successfully."
                  .format("Hdd", "Once"))
        expected = exception.confirmation(
            confirm_code="Set Boot Source of Composed Node",
            confirm_detail=detail)

        self.assertEqual(expected, result)
项目:valence    作者:openstack    | 项目源码 | 文件源码
def test_podmanager_create(self, pstatus_mock, plist_mock, pcreate_mock):
        """POSTing a pod manager definition answers with an OK status."""
        pstatus_mock.return_value = constants.PODM_STATUS_ONLINE
        plist_mock.return_value = []

        basic_auth = {
            "type": "basic",
            "auth_items": {"username": "xxxxxxx", "password": "xxxxxxx"},
        }
        values = {
            "name": "podm_name",
            "url": "https://10.240.212.123",
            "authentication": [basic_auth],
        }

        # The create mock returns the request values plus an online status.
        created = copy.deepcopy(values)
        created['status'] = constants.PODM_STATUS_ONLINE
        pcreate_mock.return_value = created

        response = self.app.post('/v1/pod_managers',
                                 content_type='application/json',
                                 data=json.dumps(values))
        self.assertEqual(http_client.OK, response.status_code)
项目:valence    作者:openstack    | 项目源码 | 文件源码
def test_compose_request_using_properties(self, mock_compose, mock_conn):
        """POST a compose request described by memory/processor properties."""
        memory = {"capacity_mib": "4000", "type": "DDR3"}
        processor = {"model": "Intel", "total_cores": "4"}
        req = {
            "name": "test_request",
            "podm_id": "test-podm",
            "properties": {"memory": memory, "processor": processor},
        }
        mock_compose.return_value = req

        resp = self.app.post('/v1/nodes',
                             content_type='application/json',
                             data=json.dumps(req))

        # The connection mock must be called with the request's podm_id.
        mock_conn.assert_called_once_with('test-podm')
        self.assertEqual(http_client.OK, resp.status_code)
项目:valence    作者:openstack    | 项目源码 | 文件源码
def show_cpu_details(cpu_url):
    """Fetch the details of a single processor.

    :param cpu_url: relative redfish url to processor,
                    e.g /redfish/v1/Systems/1/Processors/1.
    :returns: dict of processor detail.
    :raises: exception.RedfishException if the response status is not OK.
    """
    resp = send_request(cpu_url)
    if resp.status_code != http_client.OK:
        # Raise exception if don't find processor
        raise exception.RedfishException(resp.json(),
                                         status_code=resp.status_code)

    # (output key, field name read from the response body)
    field_map = (
        ("instruction_set", "InstructionSet"),
        ("model", "Model"),
        ("speed_mhz", "MaxSpeedMHz"),
        ("total_core", "TotalCores"),
    )
    body = resp.json()
    return {key: body.get(field) for key, field in field_map}
项目:valence    作者:openstack    | 项目源码 | 文件源码
def show_ram_details(ram_url):
    """Fetch the details of a single memory module.

    :param ram_url: relative redfish url to memory,
                    e.g /redfish/v1/Systems/1/Memory/1.
    :returns: dict of memory detail.
    :raises: exception.RedfishException if the response status is not OK.
    """
    resp = send_request(ram_url)
    if resp.status_code != http_client.OK:
        # Raise exception if don't find memory
        raise exception.RedfishException(resp.json(),
                                         status_code=resp.status_code)

    # (output key, field name read from the response body)
    field_map = (
        ("data_width_bit", "DataWidthBits"),
        ("speed_mhz", "OperatingSpeedMhz"),
        ("total_memory_mb", "CapacityMiB"),
    )
    body = resp.json()
    return {key: body.get(field) for key, field in field_map}
项目:python-valenceclient    作者:openstack    | 项目源码 | 文件源码
def test_http_request_client_success(self):
        """``json_request`` surfaces the OK status of a mocked response."""
        client = http.HTTPClient(valence_url="http://localhost/")
        canned_response = utils.mockSessionResponse(
            {'content-type': 'test/plain'},
            'Body',
            version=1,
            status_code=http_client.OK)

        with mock.patch.object(client, 'session',
                               autospec=True) as mock_session:
            mock_session.request.side_effect = iter([canned_response])
            # Only the response is checked; the body part is not used.
            response, _ = client.json_request('GET', '/redfish/v1/Nodes')

        self.assertEqual(http_client.OK, response.status_code)
项目:column    作者:vmware    | 项目源码 | 文件源码
def test_get_run_by_id(self):
        """Look up runs by id: unknown ids give NOT_FOUND, created runs OK."""
        response = self.app.get('/runs/1234')
        self.assertEqual(http_client.NOT_FOUND, response.status_code)

        pb = 'tests/fixtures/playbooks/hello_world.yml'
        response = self.app.post(
            '/runs',
            data=json.dumps(dict(playbook_path=pb,
                                 inventory_file='localhost,',
                                 options={'connection': 'local'})),
            content_type='application/json')
        # Check the status before touching the body, so that a failed POST
        # is reported as a status mismatch rather than a parsing/KeyError.
        self.assertEqual(http_client.OK, response.status_code)
        run_id = json.loads(response.data)['id']

        response = self.app.get('/runs/{}'.format(run_id))
        self.assertEqual(http_client.OK, response.status_code)
        self._wait_for_run_complete(run_id)
项目:column    作者:vmware    | 项目源码 | 文件源码
def test_get_run_list(self):
        """A freshly created run shows up in the list returned by /runs."""
        pb = 'tests/fixtures/playbooks/hello_world.yml'
        run_request = dict(playbook_path=pb,
                           inventory_file='localhost,',
                           options={'connection': 'local'})
        response = self.app.post(
            '/runs',
            data=json.dumps(run_request),
            content_type='application/json')
        run_id = json.loads(response.data)['id']
        self.assertEqual(http_client.OK, response.status_code)

        response = self.app.get('/runs')
        listed_runs = json.loads(response.data)
        # Stops at the first entry whose id matches the created run.
        found = any(item['id'] == run_id for item in listed_runs)
        self.assertEqual(True, found)
        self._wait_for_run_complete(run_id)
项目:column    作者:vmware    | 项目源码 | 文件源码
def test_delete_running_job(self):
        """Deleting a run right after creating it leaves it in ABORTED state."""
        pb = 'tests/fixtures/playbooks/hello_world_with_sleep.yml'
        run_request = dict(playbook_path=pb,
                           inventory_file='localhost,',
                           options={'connection': 'local',
                                    'subset': None})
        response = self.app.post(
            '/runs',
            data=json.dumps(run_request),
            content_type='application/json')
        res_dict = json.loads(response.data)
        self.assertEqual(http_client.OK, response.status_code)

        run_url = '/runs/{}'.format(res_dict['id'])
        response = self.app.delete(run_url)
        self.assertEqual(http_client.OK, response.status_code)

        # Re-read the run; the id used below comes from this second body.
        response = self.app.get(run_url)
        res_dict = json.loads(response.data)
        self.assertEqual('ABORTED', res_dict['state'])
        self._wait_for_run_complete(res_dict['id'])
项目:deb-python-proliantutils    作者:openstack    | 项目源码 | 文件源码
def validate_href(image_href):
        """Validate an HTTP image reference with a HEAD request.

        :param image_href: Image reference.
        :raises: exception.ImageRefValidationFailed if HEAD request failed or
            returned response code not equal to 200.
        :returns: Response to HEAD request.
        """
        try:
            head_response = requests.head(image_href)
            status = head_response.status_code
            if status == http_client.OK:
                return head_response
            failure = ("Got HTTP code %s instead of 200 in response to "
                       "HEAD request." % status)
            raise exception.ImageRefValidationFailed(
                image_href=image_href, reason=failure)
        except requests.RequestException as e:
            # The request exception itself is passed on as the reason.
            raise exception.ImageRefValidationFailed(image_href=image_href,
                                                     reason=e)
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def test_get_state(self):
        """Test getting an instance state."""
        self.impl.get.return_value = {
            'name': 'foo.bar#0000000001', 'host': 'baz1', 'state': 'running'
        }

        resp = self.client.get('/state/foo.bar#0000000001')
        # The body arrives as bytes chunks; join and decode before parsing.
        body = json.loads(b''.join(resp.response).decode())
        expected_state = {
            'name': 'foo.bar#0000000001', 'oom': None, 'signal': None,
            'expires': None, 'when': None, 'host': 'baz1',
            'state': 'running', 'exitcode': None
        }
        self.assertEqual(body, expected_state)
        self.assertEqual(resp.status_code, http_client.OK)
        self.impl.get.assert_called_with('foo.bar#0000000001')

        # When the implementation returns nothing, NOT_FOUND is expected.
        self.impl.get.return_value = None
        resp = self.client.get('/state/foo.bar#0000000002')
        self.assertEqual(resp.status_code, http_client.NOT_FOUND)
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def test_get_server_list(self):
        """Test getting a list of servers."""
        server_list = [
            {'cell': 'foo', 'traits': [], '_id': 'server1', 'data': []},
            {'cell': 'bar', 'traits': [], '_id': 'server2', 'data': []}
        ]
        self.impl.list.return_value = server_list

        resp = self.client.get('/server/')

        # Join the body chunks as bytes and decode before parsing: joining
        # them with a str separator fails on Python 3 when they are bytes.
        # Comparing parsed JSON also avoids depending on key order/spacing.
        body = json.loads(b''.join(resp.response).decode())
        self.assertEqual(body, server_list)
        self.assertEqual(resp.status_code, http_client.OK)
        self.impl.list.assert_called_with(None, None)

        # Each query string must reach the implementation as
        # (cell, partition), with None for a filter that was not given.
        filtered_cases = (
            ('/server/?cell=foo', ('foo', None)),
            ('/server/?partition=baz', (None, 'baz')),
            ('/server/?cell=foo&partition=baz', ('foo', 'baz')),
        )
        for url, expected_args in filtered_cases:
            resp = self.client.get(url)
            self.assertEqual(resp.status_code, http_client.OK)
            self.impl.list.assert_called_with(*expected_args)
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def test_replace_singular(self, mock_utcnow):
        """Replacing the description updates it and sets ``updated_at``.

        ``mock_utcnow`` is pinned to a fixed time, which the returned
        ``updated_at`` must match once its tzinfo is stripped.
        """
        description = 'server-new-description'
        test_time = datetime.datetime(2000, 1, 1, 0, 0)
        mock_utcnow.return_value = test_time

        patch_ops = [{'path': '/description',
                      'value': description, 'op': 'replace'}]
        response = self.patch_json('/servers/%s' % self.server.uuid,
                                   patch_ops,
                                   headers=self.headers)
        self.assertEqual('application/json', response.content_type)
        self.assertEqual(http_client.OK, response.status_code)

        result = self.get_json('/servers/%s' % self.server.uuid,
                               headers=self.headers)
        self.assertEqual(description, result['description'])
        parsed_updated_at = timeutils.parse_isotime(result['updated_at'])
        return_updated_at = parsed_updated_at.replace(tzinfo=None)
        self.assertEqual(test_time, return_updated_at)
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def test_replace_multi(self):
        """Replacing one metadata key leaves the other keys untouched."""
        extra = {"foo1": "bar1", "foo2": "bar2", "foo3": "bar3"}
        server_uuid = uuidutils.generate_uuid()
        server = utils.create_test_server(name='test1', uuid=server_uuid,
                                          extra=extra)
        new_value = 'new value'
        patch_ops = [{'path': '/metadata/foo2',
                      'value': new_value, 'op': 'replace'}]
        response = self.patch_json('/servers/%s' % server.uuid,
                                   patch_ops,
                                   headers=self.headers)
        self.assertEqual('application/json', response.content_type)
        self.assertEqual(http_client.OK, response.status_code)
        result = self.get_json('/servers/%s' % server.uuid,
                               headers=self.headers)

        # Apply the same replacement locally to get the expected metadata.
        extra["foo2"] = new_value
        self.assertEqual(extra, result['metadata'])
项目:mogan    作者:openstack    | 项目源码 | 文件源码
def test_remove_singular(self):
        """PATCH-remove the description and check nothing else changed."""
        server = utils.create_test_server(
            name='test2', uuid=uuidutils.generate_uuid(), extra={'a': 'b'})
        server_url = '/servers/%s' % server.uuid

        removal = [{'path': '/description', 'op': 'remove'}]
        response = self.patch_json(server_url, removal, headers=self.headers)
        self.assertEqual('application/json', response.content_type)
        self.assertEqual(http_client.OK, response.status_code)

        fetched = self.get_json('/servers/%s' % server.uuid,
                                headers=self.headers)
        # The removed field reads back as None ...
        self.assertIsNone(fetched['description'])

        # ... while the identifier and metadata are left untouched.
        self.assertEqual(server.uuid, fetched['uuid'])
        self.assertEqual(server.extra, fetched['metadata'])
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def _get_service_account_email(http_request=None):
    """Fetch the service account email from the metadata endpoint.

    Args:
        http_request: callable, (Optional) a callable with the same call
                      signature as httplib2.Http.request. When omitted, the
                      ``request`` method of a new ``httplib2.Http()`` is
                      used.

    Returns:
        tuple, ``(None, email)`` when the endpoint answers 200 OK, where
        ``email`` is the body passed through ``_from_bytes``. For any other
        status the pair is ``(response, content)`` with the content exactly
        as the request callable returned it.
    """
    request = httplib2.Http().request if http_request is None else http_request
    response, payload = request(
        _DEFAULT_EMAIL_METADATA, headers={'Metadata-Flavor': 'Google'})
    if response.status != http_client.OK:
        # Hand the failed response back so the caller can inspect it.
        return response, payload
    return None, _from_bytes(payload)
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def do_GET(self):
        """Answer a GET request and record its query parameters.

        Sends a 200 response with an HTML content type, stores the parsed
        query parameters on ``self.server.query_params`` and writes a short
        page saying the authentication flow has completed. The parameters
        are not inspected, so the page is the same whether or not the flow
        actually succeeded.
        """
        self.send_response(http_client.OK)
        self.send_header("Content-type", "text/html")
        self.end_headers()

        # Everything after the first '?' (or the whole path if there is
        # none) is treated as the query string.
        raw_query = self.path.split('?', 1)[-1]
        self.server.query_params = dict(urllib.parse.parse_qsl(raw_query))

        page_chunks = (
            b"<html><head><title>Authentication Status</title></head>",
            b"<body><p>The authentication flow has completed.</p>",
            b"</body></html>",
        )
        for chunk in page_chunks:
            self.wfile.write(chunk)
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def _get_service_account_email(http_request=None):
    """Ask the metadata endpoint for the service account email.

    Args:
        http_request: callable, (Optional) a callable matching the call
                      signature of httplib2.Http.request. Defaults to the
                      ``request`` method of a freshly built
                      ``httplib2.Http()``.

    Returns:
        tuple, a pair ``(response, value)``. On a 200 OK answer the first
        entry is None and the second is the body converted with
        ``_from_bytes``. On any other status the first entry is the
        response object and the second is the unconverted body.
    """
    if http_request is None:
        http_request = httplib2.Http().request

    metadata_headers = {'Metadata-Flavor': 'Google'}
    response, body = http_request(_DEFAULT_EMAIL_METADATA,
                                  headers=metadata_headers)

    succeeded = response.status == http_client.OK
    if succeeded:
        return None, _from_bytes(body)
    return response, body
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def remote_image_exist(registry, image, tag):
    """Report whether ``tag`` is listed for ``image`` on ``registry``.

    The tag list URL is built from ``IMAGE_TAGS_URL`` and fetched with
    certificate verification turned off (urllib3 warnings are silenced
    first). Any status other than 200 OK is reported as "does not exist".

    Returns:
        bool, True when ``tag`` appears in the ``'tags'`` entry of the JSON
        response, False otherwise.
    """
    urllib3.disable_warnings()
    tags_url = IMAGE_TAGS_URL % {'registry': registry, 'image': image}
    response = requests.get(url=tags_url, verify=False)

    if response.status_code == http_client.OK:
        return tag in response.json()['tags']
    return False
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _do_revoke(self, http_request, token):
        """Revoke ``token`` and drop the stored credential, if there is one.

        Args:
            http_request: callable, a callable that matches the method
                          signature of httplib2.Http.request, used to send
                          the revoke request.
            token: A string used as the token to be revoked. Can be either
                   an access_token or refresh_token.

        Raises:
            TokenRevokeError: If the revoke request does not come back with
                              a 200 OK. The message is the ``error`` field
                              of the JSON body when one can be read, and a
                              generic "Invalid response" text otherwise.
        """
        logger.info('Revoking token')
        revoke_uri = _update_query_params(self.revoke_uri, {'token': token})
        resp, content = http_request(revoke_uri)

        if resp.status != http_client.OK:
            message = 'Invalid response %s.' % resp.status
            try:
                payload = json.loads(_from_bytes(content))
                if 'error' in payload:
                    message = payload['error']
            except (TypeError, ValueError):
                # Body is not usable JSON; keep the generic message.
                pass
            raise TokenRevokeError(message)

        self.invalid = True
        if self.store:
            self.store.delete()
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _do_retrieve_scopes(self, http_request, token):
        """Load the authorized scopes for ``token`` into ``self.scopes``.

        Args:
            http_request: callable, a callable that matches the method
                          signature of httplib2.Http.request, used to query
                          the token info endpoint.
            token: A string used as the token to identify the credentials
                   to the provider.

        Raises:
            Error: When the endpoint does not answer 200 OK. The message is
                   the ``error_description`` field of the JSON body when
                   one can be read, and a generic "Invalid response" text
                   otherwise.
        """
        logger.info('Refreshing scopes')
        info_uri = _update_query_params(
            self.token_info_uri, {'access_token': token, 'fields': 'scope'})
        resp, content = http_request(info_uri)
        body = _from_bytes(content)

        if resp.status != http_client.OK:
            message = 'Invalid response %s.' % (resp.status,)
            try:
                details = json.loads(body)
                if 'error_description' in details:
                    message = details['error_description']
            except (TypeError, ValueError):
                # Body is not usable JSON; keep the generic message.
                pass
            raise Error(message)

        # A missing 'scope' entry is treated as an empty scope string.
        granted = json.loads(body).get('scope', '')
        self.scopes = set(util.string_to_scopes(granted))
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _detect_gce_environment():
    """Determine if the current environment is Compute Engine.

    Sends a single GET to the metadata host and checks that the reply is a
    200 OK carrying the expected metadata-flavor header value. The
    connection is always closed before returning.

    Returns:
        Boolean indicating whether or not the current environment is Google
        Compute Engine. False is returned when the request fails with a
        socket error, when the status is not 200 OK, or when the header
        does not match.
    """
    # NOTE: The explicit ``timeout`` is a workaround. The underlying
    #       issue is that resolving an unknown host on some networks will take
    #       20-30 seconds; making this timeout short fixes the issue, but
    #       could lead to false negatives in the event that we are on GCE, but
    #       the metadata resolution was particularly slow. The latter case is
    #       "unlikely".
    connection = six.moves.http_client.HTTPConnection(
        _GCE_METADATA_HOST, timeout=1)

    try:
        headers = {_METADATA_FLAVOR_HEADER: _DESIRED_METADATA_FLAVOR}
        connection.request('GET', '/', headers=headers)
        response = connection.getresponse()
        if response.status != http_client.OK:
            # Return an explicit False instead of falling off the end of
            # the function with None, so callers always get a boolean.
            return False
        return (response.getheader(_METADATA_FLAVOR_HEADER) ==
                _DESIRED_METADATA_FLAVOR)
    except socket.error:  # socket.timeout or socket.error(64, 'Host is down')
        logger.info('Timeout attempting to reach GCE metadata service.')
        return False
    finally:
        connection.close()
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def verify_id_token(id_token, audience, http=None,
                    cert_uri=ID_TOKEN_VERIFICATION_CERTS):
    """Verify a signed JWT id_token against certificates fetched over HTTP.

    Args:
        id_token: string, A Signed JWT.
        audience: string, The audience 'aud' that the token should be for.
        http: httplib2.Http, instance to use to make the HTTP request. When
              None, the module-level ``_cached_http`` is used.
        cert_uri: string, URI of the certificates in JSON format to
                  verify the JWT against.

    Returns:
        Whatever ``crypt.verify_signed_jwt_with_certs`` returns for the
        token, the downloaded certificates and the audience.

    Raises:
        VerifyJwtTokenError: if the certificate request does not answer
                             200 OK.
        Errors from ``_RequireCryptoOrDie()`` and from
        ``crypt.verify_signed_jwt_with_certs`` propagate unchanged.
    """
    _RequireCryptoOrDie()
    client = _cached_http if http is None else http

    resp, content = client.request(cert_uri)
    if resp.status != http_client.OK:
        raise VerifyJwtTokenError('Status code: %d' % resp.status)

    certs = json.loads(_from_bytes(content))
    return crypt.verify_signed_jwt_with_certs(id_token, certs, audience)
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _refresh(self, http_request):
        """Fetch a new access_token from the metadata endpoint.

        The request URI is ``META`` with its ``{?scope}`` placeholder
        replaced by a URL-quoted ``scope`` query built from ``self.scope``.

        Args:
            http_request: callable, a callable that matches the method
                          signature of httplib2.Http.request, used to make
                          the refresh request.

        Raises:
            HttpAccessTokenRefreshError: When the endpoint does not answer
                                         200 OK, or when the 200 OK body
                                         cannot be parsed as JSON.
        """
        scope_query = '?scope=%s' % urllib.parse.quote(self.scope, '')
        uri = META.replace('{?scope}', scope_query)
        response, content = http_request(
            uri, headers={'Metadata-Flavor': 'Google'})
        content = _from_bytes(content)

        if response.status != http_client.OK:
            if response.status == http_client.NOT_FOUND:
                # Add a hint about the likely cause of a 404.
                content += (' This can occur if a VM was created'
                            ' with no service account or scopes.')
            raise HttpAccessTokenRefreshError(content, status=response.status)

        try:
            token_content = json.loads(content)
        except Exception as e:
            raise HttpAccessTokenRefreshError(str(e),
                                              status=response.status)
        self.access_token = token_content['access_token']
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def _update_image_meta_v2(conn, extra_headers, properties, patch_path):
    # NOTE(sirp): There is some confusion around OVF. Here's a summary
    # of where we currently stand:
    #   1. OVF as a container format is misnamed. We really should be
    #      using OVA since that is the name for the container format;
    #      OVF is the standard applied to the manifest file contained
    #      within.
    #   2. We're currently uploading a vanilla tarball. In order to be
    #      OVF/OVA compliant, we'll need to embed a minimal OVF
    #      manifest as the first file.
    body = [
        {"path": "/container_format", "value": "ovf", "op": "add"},
        {"path": "/disk_format", "value": "vhd", "op": "add"},
        {"path": "/visibility", "value": "private", "op": "add"}]

    headers = {'Content-Type': 'application/openstack-images-v2.1-json-patch'}
    headers.update(**extra_headers)

    for key, value in properties.items():
        prop = {"path": "/%s" % key.replace('_', '-'),
                "value": str(value),
                "op": "add"}
        body.append(prop)
    body = json.dumps(body)

    conn.request('PATCH', patch_path, body=body, headers=headers)
    resp = conn.getresponse()
    resp.read()

    if resp.status == httplib.OK:
        return
    logging.error("Image meta was not updated. Status: %s, Reason: %s" % (
        resp.status, resp.reason))
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def after(self, state):
        """Cut Python tracebacks out of the faultstring of error responses.

        Responses with an empty body or a status from 200 up to (but not
        including) 400 are left alone, as is everything when the
        ``debug_tracebacks_in_api`` option is set. Otherwise, if the JSON
        body's ``faultstring`` contains a traceback marker, the text from
        the marker onward is removed and the JSON body is written back.
        """
        response = state.response

        # Omit empty body. Some errors may not have body at this level yet.
        if not response.body:
            return

        # Status codes in the range 200 (OK) to 399 (400 = BAD_REQUEST) are
        # not an error, so there is nothing to strip.
        if http_client.OK <= response.status_int < http_client.BAD_REQUEST:
            return

        json_body = response.json
        # Do not remove traceback when traceback config is set
        if cfg.CONF.debug_tracebacks_in_api:
            return

        traceback_marker = 'Traceback (most recent call last):'
        faultstring = json_body.get('faultstring')
        if not faultstring or traceback_marker not in faultstring:
            return

        # Keep only the text before the marker, minus trailing whitespace.
        message, _unused = faultstring.split(traceback_marker, 1)
        json_body['faultstring'] = message.rstrip()
        # Replace the whole json. Cannot change original one because it's
        # generated on the fly.
        response.json = json_body
项目:python-ecsclient    作者:EMCECS    | 项目源码 | 文件源码
def test_list_nodes(self, mock_get_token):
        """Listing nodes issues an authenticated GET and returns the JSON."""
        nodes_url = 'https://127.0.0.1:4443/vdc/nodes'
        fake_token = 'FAKE-TOKEN-123'
        mock_get_token.return_value = fake_token
        self.requests_mock.register_uri('GET', nodes_url,
                                        status_code=http_client.OK,
                                        json=self.returned_json)

        response = self.client.node.list()

        # Check the outgoing request first, then the decoded payload.
        sent = self.requests_mock.last_request
        self.assertEqual(sent.method, 'GET')
        self.assertEqual(sent.url, nodes_url)
        self.assertEqual(sent.headers['x-sds-auth-token'], fake_token)
        self.assertEqual(response, self.returned_json)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _do_revoke(self, http, token):
        """Revoke ``token`` and drop the stored credential, if there is one.

        The revoke URI is first requested with the default method; if the
        server answers 405 Method Not Allowed, the request is repeated as a
        POST with the token in a URL-encoded body.

        Args:
            http: an object to be used to make HTTP requests.
            token: A string used as the token to be revoked. Can be either
                   an access_token or refresh_token.

        Raises:
            TokenRevokeError: If the final revoke request does not come
                              back with a 200 OK. The message is the
                              ``error`` field of the JSON body when one can
                              be read, and a generic "Invalid response"
                              text otherwise.
        """
        logger.info('Revoking token')
        query_params = {'token': token}
        revoke_uri = _helpers.update_query_params(
            self.revoke_uri, query_params)

        resp, content = transport.request(http, revoke_uri)
        if resp.status == http_client.METHOD_NOT_ALLOWED:
            # Retry the same URI as a POST carrying the token in the body.
            resp, content = transport.request(
                http, revoke_uri, method='POST',
                body=urllib.parse.urlencode(query_params))

        if resp.status != http_client.OK:
            message = 'Invalid response {0}.'.format(resp.status)
            try:
                payload = json.loads(_helpers._from_bytes(content))
                if 'error' in payload:
                    message = payload['error']
            except (TypeError, ValueError):
                # Body is not usable JSON; keep the generic message.
                pass
            raise TokenRevokeError(message)

        self.invalid = True
        if self.store:
            self.store.delete()