我们从Python开源项目中，提取了以下47个代码示例，用于说明如何使用requests.Response()。
def get_response_and_time(self, key, default=(None, None)):
    """Look up a cached ``(response, timestamp)`` pair.

    ``key`` is first tried directly against ``self.responses``; when it
    is absent there it is translated through ``self.keys_map`` and the
    translated key is used instead.

    :param key: key of the resource
    :param default: value returned when neither lookup succeeds
    :returns: tuple ``(response, timestamp)`` where the response has been
              passed through :meth:`restore_response`, or ``default``
    """
    cache = self.responses
    try:
        # Fall back to the alias table only when the key is not a direct hit.
        lookup_key = key if key in cache else self.keys_map[key]
        stored, stamp = cache[lookup_key]
    except KeyError:
        return default
    return self.restore_response(stored), stamp
def put(self, url, data, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
    """Send an HTTP PUT request by delegating to ``self.request``.

    :param str url: URL of the HTTP request.
    :param bytes | None data: Binary data to send in the request body.
    :param dict | None headers: Additional headers for the HTTP request.
    :param int ok_status_code: (Optional) Expected status code for the HTTP response.
    :param True | False auto_renew: (Optional) Forwarded unchanged to
        ``self.request`` (documented upstream as auto-recovery from an
        expired token or Internet failure).
    :rtype: requests.Response
    """
    request_kwargs = {'proxies': self.proxies, 'data': data}
    if headers is not None:
        request_kwargs['headers'] = headers
    return self.request(
        'put',
        url,
        params=request_kwargs,
        ok_status_code=ok_status_code,
        auto_renew=auto_renew,
    )
def save_response(self, key, response):
    """Store ``response`` in the cache under ``key``.

    The ``X-Auth-Token`` header, when present, is removed from the
    headers of the request attached to the response before it is stored.
    Note that this mutates the caller's ``response.request.headers``.

    :param key: key for this response
    :param response: response to save
    """
    request_headers = response.request.headers
    # Strip the auth token so it is not kept alongside the cached response.
    if 'X-Auth-Token' in request_headers:
        del request_headers['X-Auth-Token']
    self.responses[key] = response
def get_response(self, key, default=None):
    """Return the object cached for ``key``, or ``default`` if there is none.

    ``key`` is tried directly against ``self.responses`` first and, when
    absent, translated through ``self.keys_map``. The stored object is
    returned exactly as it was saved.

    :param key: key of the resource
    :param default: value returned when neither lookup succeeds
    :returns: the cached object, or ``default``
    """
    cache = self.responses
    try:
        resolved = key if key in cache else self.keys_map[key]
        return cache[resolved]
    except KeyError:
        return default
def restore_response(self, response, seen=None):
    """Build a fresh ``requests.Response`` from a stored response object.

    Every attribute named in ``self._response_attrs`` is copied across
    (missing attributes become ``None``), and the history chain is
    rebuilt recursively.

    :param response: the stored object to copy attributes from
    :param seen: memo mapping ``id(original)`` to the rebuilt response, so
                 an object reachable more than once is rebuilt only once
    :returns: the rebuilt :class:`requests.Response`
    """
    if seen is None:
        seen = {}
    marker = id(response)
    if marker in seen:
        return seen[marker]

    restored = requests.Response()
    for attr_name in self._response_attrs:
        setattr(restored, attr_name, getattr(response, attr_name, None))
    # presumably ``raw`` is one of the copied attributes -- verify against
    # ``_response_attrs``; a ``None`` raw would fail here.
    restored.raw._cached_content_ = restored.content

    # Register before recursing so a cycle through ``history`` terminates.
    seen[marker] = restored
    restored.history = tuple(
        self.restore_response(previous, seen) for previous in response.history
    )
    return restored
def test_request_api_server_auth_recover(self, options, config, request, text):
    """A 401 followed by successful responses ends with a token-bearing call.

    The first mocked response is unauthorized and the next two are OK;
    the final call must carry the token taken from the mocked auth body.
    """
    token = "xyztoken"

    driver = self._get_driver(options, config)
    driver._apiinsecure = False
    driver._use_api_certs = False

    unauthorized = requests.Response()
    unauthorized.status_code = requests.codes.unauthorized
    authorized = requests.Response()
    authorized.status_code = requests.codes.ok

    text.return_value = json.dumps({'access': {'token': {'id': token}}})
    request.side_effect = [unauthorized, authorized, authorized]

    driver._request_api_server("/URL")

    request.assert_called_with('/URL', data=None,
                               headers={'X-AUTH-TOKEN': token})
def test_request_backend(self, options, config):
    """``_request_backend`` returns the relayed status code and a message dict."""
    ok_response = requests.Response()
    ok_response.status_code = requests.codes.ok

    driver = self._get_driver(options, config)
    driver._relay_request = mock.MagicMock()
    driver._relay_request.return_value = ok_response

    code, message = driver._request_backend(
        mock.MagicMock(), {}, 'object', 'TEST')

    self.assertEqual(requests.codes.ok, code)
    self.assertEqual({'message': None}, message)
    driver._relay_request.assert_called()
async def test_asend_returns_appropriate_sorna_response(mocker, req_params, mock_sorna_aresp):
    """Check that ``Request.asend`` wraps the aiohttp reply for every allowed method.

    For each method in ``Request._allowed_methods`` the matching
    ``aiohttp.ClientSession`` method is patched with a coroutine mock
    whose return value comes from the ``mock_sorna_aresp`` fixture. The
    resulting ``Response`` is compared against the fixture's
    configuration.

    The function awaits, so it must be a coroutine function; a plain
    ``def`` containing ``await`` does not compile.
    NOTE(review): running it under pytest presumably needs an asyncio
    plugin/marker configured elsewhere -- confirm.
    """
    req = Request(**req_params)
    methods = Request._allowed_methods
    for method in methods:
        req.method = method

        mock_reqfunc = mocker.patch.object(
            aiohttp.ClientSession, method.lower(),
            new_callable=asynctest.CoroutineMock
        )
        mock_reqfunc.return_value, conf = mock_sorna_aresp

        resp = await req.asend()

        assert isinstance(resp, Response)
        assert resp.status == conf['status']
        assert resp.reason == conf['reason']
        assert resp.content_type == conf['content_type']
        body = await conf['read']()
        assert resp.content_length == len(body)
        assert resp.text() == body.decode()
        assert resp.json() == json.loads(body.decode())
def _parse_response_content_for_predictions(self, question_number, response): """ Parses the json representation of the docs from the HTTP response and returns it as list predictions with scores (and confidences if they exist) :param str question_number: used as qid :param requests.Response response: :return: list of feature vectors :rtype: list(list(str)) """ response_content = response.json() results = [] if response_content['response']['numFound'] > 0: for doc in response_content['response']['docs']: if "ranker.confidence" in doc: conf_score = doc["ranker.confidence"] else: conf_score = None results.append(Prediction(qid=question_number, aid=doc[self.DOC_ID_FIELD_NAME], rank_score=doc['score'], conf_score=conf_score)) else: self.logger.warn('Empty response: %s' % vars(response)) return results
def _parse_response_content_for_predictions(self, question_number, response): """ Parses the json representation of the docs from the HTTP response and returns it as list predictions with scores (and confidences if they exist) :param str question_number: used as qid :param requests.Response response: :return: list of feature vectors :rtype: list(list(str)) """ response_content = response.json() results = [] if response_content["matching_results"] > 0: for doc in response_content['results']: results.append(Prediction(qid=question_number, aid=doc[DOC_ID_FIELD_NAME], rank_score=doc['score'], conf_score=None)) else: self.logger.warn('Empty response: %s' % vars(response)) return results
def get(self, url, params=None, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
    """Send an HTTP GET request by delegating to ``self.request``.

    :param str url: URL of the HTTP request.
    :param dict[str, T] | None params: (Optional) Dictionary to construct query string.
    :param dict | None headers: (Optional) Additional headers for the HTTP request.
    :param int ok_status_code: (Optional) Expected status code for the HTTP response.
    :param True | False auto_renew: (Optional) Forwarded unchanged to
        ``self.request`` (documented upstream as auto-recovery from an
        expired token or Internet failure).
    :rtype: requests.Response
    """
    request_kwargs = {'proxies': self.proxies}
    # Only forward the optional pieces the caller actually supplied.
    for name, value in (('params', params), ('headers', headers)):
        if value is not None:
            request_kwargs[name] = value
    return self.request(
        'get',
        url,
        request_kwargs,
        ok_status_code=ok_status_code,
        auto_renew=auto_renew,
    )
def post(self, url, data=None, json=None, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
    """Send an HTTP POST request by delegating to ``self.request``.

    When ``json`` is given it is sent and ``data`` is ignored; otherwise
    ``data`` is sent (even when it is ``None``).

    :param str url: URL of the HTTP request.
    :param dict | None data: (Optional) Data in POST body of the request.
    :param dict | None json: (Optional) Dictionary sent as JSON content in the POST body.
    :param dict | None headers: (Optional) Additional headers for the HTTP request.
    :param int ok_status_code: (Optional) Expected status code for the HTTP response.
    :param True | False auto_renew: (Optional) Forwarded unchanged to
        ``self.request`` (documented upstream as auto-recovery from an
        expired token or Internet failure).
    :rtype: requests.Response
    """
    request_kwargs = {'proxies': self.proxies}
    if json is None:
        request_kwargs['data'] = data
    else:
        request_kwargs['json'] = json
    if headers is not None:
        request_kwargs['headers'] = headers
    return self.request(
        'post',
        url,
        request_kwargs,
        ok_status_code=ok_status_code,
        auto_renew=auto_renew,
    )
def mixin_request_id(self, resp):
    """Record a request id on ``self.request_id``.

    :param resp: either a ``Response`` object, whose headers are searched
                 for a request id, or any other value (for example a
                 string or ``None``), which is stored as-is
    """
    if not isinstance(resp, Response):
        # Anything that is not a Response is taken to be the id itself.
        self.request_id = resp
        return

    headers = resp.headers
    # First header that yields a truthy value wins; None if none do.
    self.request_id = (headers.get('openstack-request-id') or
                       headers.get('x-openstack-request-id') or
                       headers.get('x-compute-request-id'))
def __init__(self, data):
    """Create a canned response from ``data``.

    :param data: either a dict with optional ``status_code`` (default
                 200), ``text`` and ``headers`` entries, or any other
                 value, which is used directly as the status code
    """
    super(TestResponse, self).__init__()
    self._content_consumed = True

    if not isinstance(data, dict):
        self.status_code = data
        return

    self.status_code = data.get('status_code', 200)
    # Fake the text attribute to streamline Response creation
    text = data.get('text', "")
    is_structured = isinstance(text, (dict, list))
    if is_structured:
        # Structured text is serialised and advertised as JSON.
        self._content = json.dumps(text)
        default_headers = {"Content-Type": "application/json"}
    else:
        self._content = text
        default_headers = {}
    if six.PY3 and isinstance(self._content, six.string_types):
        self._content = self._content.encode('utf-8', 'strict')
    # Explicit headers win; an empty or missing value falls back to the default.
    self.headers = data.get('headers') or default_headers
def test_request_retry(self, mock_request):
    """Tests that two connection errors will be handled"""

    class FlakyTransport(object):
        """Raises on the first two calls, then answers with a 204."""
        i = 0

        def connection_error(self, *args, **kwargs):
            self.i += 1
            if self.i >= 3:
                reply = requests.Response()
                reply.status_code = 204
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyTransport().connection_error

    client = InfluxDBClient(database='db')
    client.write_points(self.dummy_points)
def test_request_retry_raises(self, mock_request):
    """Tests that three connection errors will not be handled"""

    class FlakyTransport(object):
        """Raises on the first three calls, then answers with a 200."""
        i = 0

        def connection_error(self, *args, **kwargs):
            self.i += 1
            if self.i >= 4:
                reply = requests.Response()
                reply.status_code = 200
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyTransport().connection_error

    client = InfluxDBClient(database='db')
    with self.assertRaises(requests.exceptions.ConnectionError):
        client.write_points(self.dummy_points)
def test_add_channel_failed_create_channel(mock_staff_client, mocker):
    """If client.channels.create fails an exception should be raised"""
    server_error = Response()
    server_error.status_code = statuses.HTTP_500_INTERNAL_SERVER_ERROR

    create_mock = mock_staff_client.channels.create
    create_mock.return_value.raise_for_status.side_effect = HTTPError(response=server_error)

    with pytest.raises(ChannelCreationException) as ex:
        api.add_channel(
            Search.from_dict({}),
            "title",
            "name",
            "public_description",
            "channel_type",
            123,
            456,
        )

    assert ex.value.args[0] == "Error creating channel name"
    create_mock.return_value.raise_for_status.assert_called_with()
    assert create_mock.call_count == 1
    # Nothing may be persisted when channel creation fails.
    assert PercolateQuery.objects.count() == 0
    assert Channel.objects.count() == 0
def test_send_batch_400_no_raise(self, mock_post):
    """
    Test that if raise_for_status is False we don't raise an exception for a 400 response
    """
    mock_post.return_value = Mock(
        spec=Response,
        status_code=HTTP_400_BAD_REQUEST,
        json=mocked_json()
    )
    chunk_size = 10
    recipient_tuples = [
        ("{0}@example.com".format(letter), None)
        for letter in string.ascii_letters
    ]
    assert len(recipient_tuples) == 52

    with override_settings(MAILGUN_RECIPIENT_OVERRIDE=None):
        resp_list = MailgunClient.send_batch(
            'email subject',
            'email body',
            recipient_tuples,
            chunk_size=chunk_size,
            raise_for_status=False,
        )

    # 52 recipients in chunks of 10 -> 6 requests, each answered with a 400.
    assert len(resp_list) == 6
    for resp in resp_list:
        assert resp.status_code == HTTP_400_BAD_REQUEST
    assert mock_post.call_count == 6
    assert mock_post.return_value.raise_for_status.called is False
def test_financial_aid_email_error(self, raise_for_status, mock_post):
    """
    Test that send_financial_aid_email handles errors correctly
    """
    bad_request = Mock(
        spec=Response,
        status_code=HTTP_400_BAD_REQUEST,
        json=mocked_json(),
    )
    mock_post.return_value = bad_request

    response = MailgunClient.send_financial_aid_email(
        self.staff_user_profile.user,
        self.financial_aid,
        'email subject',
        'email body',
        raise_for_status=raise_for_status,
    )

    # raise_for_status() must be invoked exactly when the flag asks for it.
    assert response.raise_for_status.called is raise_for_status
    assert response.status_code == HTTP_400_BAD_REQUEST
    assert response.json() == {}
def post_settings(greeting_text):
    """Set the greeting text and then the START button.

    Two POST requests are sent to the same thread-settings URL. The
    START button payload is always 'USER_START'. Only the response of
    the second (button) request is returned; the greeting response is
    discarded.

    :param str greeting_text: Desired Greeting Text (160 chars).
    :return: ``requests.Response`` of the START button request
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN

    # Set the greeting texts
    greeting_settings = {
        'setting_type': 'greeting',
        'greeting': {'text': greeting_text},
    }
    requests.post(url, headers=HEADER, data=json.dumps(greeting_settings))

    # Set the start button
    button_settings = {
        'setting_type': 'call_to_actions',
        'thread_state': 'new_thread',
        'call_to_actions': [{'payload': 'USER_START'}],
    }
    return requests.post(url, headers=HEADER, data=json.dumps(button_settings))
def post_start_button(payload='START'):
    """Set the Get Started button through the thread-settings endpoint
    (/docs/messenger-platform/thread-settings/get-started-button).

    :param str payload: Desired postback payload (Default START).
    :return: ``requests.Response`` of the POST request
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN
    button_settings = {
        "setting_type": "call_to_actions",
        "thread_state": "new_thread",
        "call_to_actions": [{"payload": payload}],
    }
    return requests.post(url, headers=HEADER, data=json.dumps(button_settings))
def post_domain_whitelisting(whitelisted_domains, domain_action_type='add'):
    """Set the whitelisted domains for the Messenger Extension
    (/docs/messenger-platform/thread-settings/domain-whitelisting).

    :param list whitelisted_domains: Domains to be whitelisted.
    :param str domain_action_type: Action to run `add/remove` (Default add).
    :return: ``requests.Response`` of the POST request
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN
    settings_payload = {
        'setting_type': 'domain_whitelisting',
        'whitelisted_domains': whitelisted_domains,
        'domain_action_type': domain_action_type,
    }
    return requests.post(url, headers=HEADER, data=json.dumps(settings_payload))
def typing(fbid, sender_action):
    """Send a sender action (typing indicator / mark seen) to a user
    (/docs/messenger-platform/send-api-reference/sender-actions).

    :param str fbid: User id to display action.
    :param str sender_action: `typing_off/typing_on/mark_seen`.
    :return: ``requests.Response`` of the POST request
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    action_payload = {
        'recipient': {'id': fbid},
        'sender_action': sender_action,
    }
    return requests.post(url, headers=HEADER, data=json.dumps(action_payload))


# Send API Content Type
def post_text_message(fbid, message):
    """Send a plain text message
    (/docs/messenger-platform/send-api-reference/text-message).

    No length check is performed here.
    NOTE(review): the earlier notes disagreed on the text limit (230 vs
    320 chars) -- confirm against the platform documentation.

    :param str fbid: User id to send the text.
    :param str message: Text to be displayed for the user.
    :return: ``requests.Response`` of the POST request
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    message_payload = {
        'recipient': {'id': fbid},
        'message': {'text': message},
    }
    return requests.post(url, headers=HEADER, data=json.dumps(message_payload))
def post_attachment(fbid, media_url, file_type):
    """Send a media attachment
    (/docs/messenger-platform/send-api-reference/contenttypes).

    :param str fbid: User id to send the attachment to.
    :param str media_url: Url of a hosted media.
    :param str file_type: image/audio/video/file.
    :return: ``requests.Response`` of the POST request
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    message_payload = {
        'recipient': {'id': fbid},
        'message': {
            "attachment": {"type": file_type, "payload": {"url": media_url}},
        },
    }
    return requests.post(url, headers=HEADER, data=json.dumps(message_payload))
def post_start_button(payload='START'):
    """Set the **Get Started button** through the messenger-profile endpoint.

    :usage:

        >>> payload = 'GET_STARTED'
        >>> response = fbbotw.post_start_button(payload=payload)

    :param str payload: Desired postback payload (Default 'START').
    :return: ``requests.Response`` of the POST request
    :facebook docs: /docs/messenger-platform/messenger-profile/get-started-button
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    profile_payload = {'get_started': {'payload': payload}}
    return requests.post(url, headers=HEADER, data=json.dumps(profile_payload))
def post_domain_whitelist(whitelisted_domains):
    """Set the whitelisted domains for the Messenger Extension.

    :usage:

        >>> # Define the array of domains to be whitelisted (Max 10)
        >>> whitelisted_domains = [
                "https://myfirst_domain.com",
                "https://another_domain.com"
            ]
        >>> fbbotw.post_domain_whitelist(
                whitelisted_domains=whitelisted_domains
            )

    :param list whitelisted_domains: Domains to be whitelisted (Max 10).
    :return: ``requests.Response`` of the POST request
    :facebook docs: /docs/messenger-platform/messenger-profile/domain-whitelisting
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    profile_payload = {'whitelisted_domains': whitelisted_domains}
    return requests.post(url, headers=HEADER, data=json.dumps(profile_payload))
def post_account_linking_url(account_linking_url):
    """Set the **linking_url** used to connect the user with your business login.

    :usage:

        >>> my_callback_linking_url = "https://my_business_callback.com"
        >>> response = fbbotw.post_account_linking_url(
                account_linking_url=my_callback_linking_url
            )

    :param str account_linking_url: URL to the account linking OAuth flow.
    :return: ``requests.Response`` of the POST request
    :facebook docs: /docs/messenger-platform/messenger-profile/account-linking-url
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    profile_payload = {'account_linking_url': account_linking_url}
    return requests.post(url, headers=HEADER, data=json.dumps(profile_payload))
def response(cls, r):
    """Unwrap the data carried by the API response *r*.

    The JSON body is an envelope with a ``meta`` and a ``response``
    entry; the ``response`` entry is returned.

    :param r: the HTTP response from an API call
    :type r: :class:`requests.Response`
    :returns: API response data
    :rtype: json
    :raises errors.ApiError: when the body is not valid JSON (raised with
        *r*), or when ``meta`` carries a truthy ``errors`` entry (raised
        with the ``meta`` dict)
    """
    try:
        envelope = r.json()
    except ValueError:
        raise errors.ApiError(r)

    meta = envelope['meta']
    if meta.get("errors"):
        raise errors.ApiError(meta)
    return envelope["response"]
def post(self, endpoint, query=None, body=None, data=None):
    """Send a POST request to armada.

    If both body and data are specified, body will be used.

    :param string endpoint: URL string following hostname and API prefix
    :param dict query: dict of k, v parameters to add to the query string
    :param string body: string to use as the request body.
    :param data: Something json.dumps(s) can serialize.
    :return: A requests.Response object
    """
    target = '{}{}'.format(self.base_url, endpoint)
    self.logger.debug("Sending POST with armada_client session")

    if body is None:
        # No explicit body: let the session serialise ``data`` as JSON.
        self.logger.debug("Sending POST with JSON body: \n%s" % str(data))
        return self._session.post(
            target, params=query, json=data, timeout=3600)

    self.logger.debug("Sending POST with explicit body: \n%s" % body)
    return self._session.post(
        target, params=query, data=body, timeout=3600)
def post(self, endpoint, query=None, body=None, data=None):
    """Send a POST request to Drydock.

    If both body and data are specified, body will be used.

    :param string endpoint: The URL string following the hostname and API prefix
    :param dict query: A dict of k, v parameters to add to the query string
    :param string body: A string to use as the request body. Will be treated as raw
    :param data: Something json.dumps(s) can serialize. Result will be used as the request body
    :return: A requests.Response object
    """
    self.logger.debug("Sending POST with drydock_client session")

    if body is None:
        # No explicit body: let the session serialise ``data`` as JSON.
        self.logger.debug("Sending POST with JSON body: \n%s" % str(data))
        return self.__session.post(
            self.base_url + endpoint, params=query, json=data, timeout=10)

    self.logger.debug("Sending POST with explicit body: \n%s" % body)
    return self.__session.post(
        self.base_url + endpoint, params=query, data=body, timeout=10)
def test_store_companies_house_profile_in_session_saves_in_session(
    mock_retrieve_profile, client
):
    """The retrieved profile is stored in the session and marks it modified."""
    profile = {
        'date_of_creation': '2000-10-10',
        'company_name': 'Example corp',
        'company_status': 'active',
        'company_number': '01234567',
        'registered_office_address': {'foo': 'bar'}
    }
    profile_response = Response()
    profile_response.status_code = http.client.OK
    profile_response.json = lambda: profile
    mock_retrieve_profile.return_value = profile_response

    session = client.session
    helpers.store_companies_house_profile_in_session(session, '01234567')

    mock_retrieve_profile.assert_called_once_with(number='01234567')
    assert session[helpers.COMPANIES_HOUSE_PROFILE_SESSION_KEY] == profile
    assert session.modified is True
async def telljoke(self, ctx: commands.Context) -> None:
    """Respond with a random joke scraped from theoatmeal.com.

    The function awaits bot calls, so it must be a coroutine function; a
    plain ``def`` containing ``await`` does not compile.

    :param ctx: command context; its message channel receives the typing
                indicator
    """
    # TODO: get more joke formats (or a better source)
    # Send typing as the request can take some time.
    await self.bot.send_typing(ctx.message.channel)

    # Build a new request object
    req: grequests.AsyncRequest = grequests.get('http://theoatmeal.com/djtaf/', timeout=1)
    # Send new request
    res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)

    # Since we only have one request we can just fetch the first one.
    page = res[0] if res else None
    if page is None:
        # grequests.map leaves None in place of a failed request (unless
        # the exception handler returns something else). Presumably the
        # handler already reported the failure -- verify.
        return

    # page.content is the html in bytes
    soup = BeautifulSoup(page.content.decode(page.encoding), 'html.parser')
    joke_ul = soup.find(id='joke_0')
    joke = joke_ul.find('h2', {'class': 'part1'}).text.lstrip() + '\n' + joke_ul.find(id='part2_0').text
    await self.bot.say(joke)
async def randomfact(self, ctx: commands.Context) -> None:
    """Respond with a random fact scraped from unkno.com.

    The function awaits bot calls, so it must be a coroutine function; a
    plain ``def`` containing ``await`` does not compile.

    :param ctx: command context; its message channel receives the typing
                indicator
    """
    # Send typing as the request can take some time.
    await self.bot.send_typing(ctx.message.channel)

    # Build a new request object
    req: grequests.AsyncRequest = grequests.get('http://www.unkno.com/', timeout=1)
    # Send new request
    res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)

    # Since we only have one request we can just fetch the first one.
    page = res[0] if res else None
    if page is None:
        # grequests.map leaves None in place of a failed request (unless
        # the exception handler returns something else). Presumably the
        # handler already reported the failure -- verify.
        return

    # page.content is the html in bytes
    soup = BeautifulSoup(page.content.decode(page.encoding), 'html.parser')
    await self.bot.say(soup.find(id='content').text)
def test_push_pact(mock_put):
    """``push_pact`` PUTs the pact file's JSON content to the expected URL."""
    created = Response()
    created.status_code = CREATED
    mock_put.return_value = created

    with open(PACT_FILE_PATH) as data_file:
        pact = json.load(data_file)

    broker_client = BrokerClient(broker_url=settings.PACT_BROKER_URL)
    broker_client.push_pact(
        provider=PROVIDER,
        consumer=CONSUMER,
        pact_file=PACT_FILE_PATH,
        consumer_version=CONSUMER_VERSION
    )[0]

    mock_put.assert_called_once_with(
        EXPECTED_PUSH_PACT_URL,
        json=pact,
        auth=None
    )
def test_tag_consumer(mock_put):
    """``tag_consumer`` PUTs to the expected tag URL with a JSON content type."""
    created = Response()
    created.status_code = CREATED
    mock_put.return_value = created

    broker_client = BrokerClient(broker_url=settings.PACT_BROKER_URL)
    broker_client.tag_consumer(
        provider=PROVIDER,
        consumer=CONSUMER,
        consumer_version=CONSUMER_VERSION,
        tag=TAG
    )[0]

    mock_put.assert_called_once_with(
        EXPECTED_TAG_CONSUMER_URL,
        headers={'Content-Type': 'application/json'},
        auth=None
    )
def send(self, req, **kwargs):
    """Open ``req.url`` with ``urlopen`` and wrap the result in a Response.

    On success the opened object becomes ``resp.raw``, its
    ``content-length`` header (if any) is copied across and the status
    is 200. Failures are mapped to status codes instead of being raised:
    EACCES -> 403, ENOENT -> 404, anything else -> 400.

    :param req: request object; only its ``url`` attribute is read
    :param kwargs: accepted and ignored
    :returns: the populated :class:`requests.Response`
    """
    resp = requests.Response()
    try:
        resp.raw = urlopen(req.url)
        length = resp.raw.headers.get('content-length')
        if length is not None:
            resp.headers['Content-Length'] = length
    except IOError as e:
        status_by_errno = {
            errno.EACCES: requests.codes.forbidden,
            errno.ENOENT: requests.codes.not_found,
        }
        resp.status_code = status_by_errno.get(
            e.errno, requests.codes.bad_request)
    except OSError:
        resp.status_code = requests.codes.bad_request
    else:
        resp.status_code = requests.codes.ok
    return resp
def test_it(self):
    """The rate-limit message and the env-var hint are both printed to stderr."""
    import sys
    import mock
    import requests
    from ci_diff_helper import _github

    remaining = '17'
    rate_limit = '60'
    rate_reset = '1475953149'

    response = requests.Response()
    response.headers[_github._RATE_REMAINING_HEADER] = remaining
    response.headers[_github._RATE_LIMIT_HEADER] = rate_limit
    response.headers[_github._RATE_RESET_HEADER] = rate_reset

    with mock.patch('six.print_') as mocked:
        self._call_function_under_test(response)

        expected_msg = _github._RATE_LIMIT_TEMPLATE.format(
            remaining, rate_limit, rate_reset)
        self.assertEqual(mocked.call_count, 2)
        mocked.assert_any_call(expected_msg, file=sys.stderr)
        mocked.assert_any_call(_github._GH_ENV_VAR_MSG, file=sys.stderr)
def test_match_not_found(self):
    """Check if the server send an empty match if not found."""
    # Cache miss, and the API handler fails with a "404" LoLException.
    self.service.cache_manager.find_match.return_value = None
    not_found = LoLException("404", requests.Response())
    self.service.riot_api_handler.get_match.side_effect = not_found

    match_request = service_pb2.MatchRequest(id=4242,
                                             region=constants_pb2.EUW)
    response = self.stub.Match(match_request)

    self.assertEqual(response, match_pb2.MatchReference())
def test_request_api_server_authorized(self, options, config, request):
    """An OK response is returned as-is and the call is made with verify=False."""
    ok_response = requests.Response()
    ok_response.status_code = requests.codes.ok
    request.return_value = ok_response

    driver = self._get_driver(options, config)
    driver._apiinsecure = True

    url = "/URL"
    result = driver._request_api_server(url)

    self.assertEqual(ok_response, result)
    request.assert_called_with(url, data=None, headers=mock.ANY,
                               verify=False)
def test_request_api_server_auth_failed(self, options, config, request):
    """A persistent 401 from the API server surfaces as RuntimeError."""
    unauthorized = requests.Response()
    unauthorized.status_code = requests.codes.unauthorized
    request.return_value = unauthorized

    driver = self._get_driver(options, config)
    driver._apiinsecure = False
    driver._use_api_certs = False

    self.assertRaises(RuntimeError, driver._request_api_server, "/URL")
def test_request_api_server_authn(self, options, config, request):
    """``_request_api_server_authn`` returns the response and uses verify=False."""
    ok_response = requests.Response()
    ok_response.status_code = requests.codes.ok
    request.return_value = ok_response

    driver = self._get_driver(options, config)
    driver._apiinsecure = True

    url = "/URL"
    result = driver._request_api_server_authn(url)

    self.assertEqual(ok_response, result)
    request.assert_called_with(url, data=None, headers=mock.ANY,
                               verify=False)
def test_vnc_api_is_authenticated_unauthorized(self, config, request):
    """A 401 from the API server makes ``vnc_api_is_authenticated`` return True."""
    api_server = mock.MagicMock()
    config.APISERVER = api_server
    api_server.use_ssl = False
    api_server.api_server_ip = "localhost"
    api_server.api_server_port = "8082"

    unauthorized = requests.Response()
    unauthorized.status_code = requests.codes.unauthorized
    request.return_value = unauthorized

    result = utils.vnc_api_is_authenticated()

    request.assert_called_with(mock.ANY)
    self.assertEqual(True, result)
def test_vnc_api_is_authenticated_invalid(self, config, request):
    """A server error from the API server is raised as ``HTTPError``."""
    api_server = mock.MagicMock()
    config.APISERVER = api_server
    api_server.use_ssl = True
    api_server.api_server_ip = "localhost"
    api_server.api_server_port = "8082"

    failure = requests.Response()
    failure.status_code = requests.codes.server_error
    request.return_value = failure

    self.assertRaises(requests.exceptions.HTTPError,
                      utils.vnc_api_is_authenticated)
    request.assert_called_with(mock.ANY)
def process(self, http_response):
    # type: (requests.Response) -> PrestoStatus
    """Turn an HTTP response into a ``PrestoStatus`` and update session state.

    Steps, in order:

    1. A non-OK response is handed to ``self.raise_response_error``.
    2. The body is decoded as UTF-8 JSON; an ``error`` entry is converted
       with ``self._process_error`` and raised.
    3. Session properties named in the clear-session header are removed
       from ``self._client_session.properties``; key/value pairs from the
       set-session header are stored there.
    4. ``self._next_uri`` is updated from the body's ``nextUri``.

    :param http_response: the HTTP response to process
    :returns: a ``PrestoStatus`` built from the JSON body
    :raises: whatever ``self._process_error`` produces when the body
             contains ``error``; ``KeyError`` if ``id``, ``stats`` or
             ``infoUri`` is missing from the body
    """
    if not http_response.ok:
        self.raise_response_error(http_response)

    http_response.encoding = 'utf-8'
    response = http_response.json()
    logger.debug('HTTP {}: {}'.format(http_response.status_code, response))
    if 'error' in response:
        raise self._process_error(response['error'])

    # The session headers live on the HTTP response. ``response`` is the
    # decoded JSON dict and has no ``headers`` attribute.
    headers = http_response.headers

    if constants.HEADER_CLEAR_SESSION in headers:
        for prop in get_header_values(
            headers,
            constants.HEADER_CLEAR_SESSION,
        ):
            self._client_session.properties.pop(prop, None)

    if constants.HEADER_SET_SESSION in headers:
        for key, value in get_session_property_values(
            headers,
            constants.HEADER_SET_SESSION,
        ):
            self._client_session.properties[key] = value

    self._next_uri = response.get('nextUri')

    return PrestoStatus(
        id=response['id'],
        stats=response['stats'],
        info_uri=response['infoUri'],
        next_uri=self._next_uri,
        rows=response.get('data', []),
        columns=response.get('columns'),
    )