我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 requests.HTTPError()。
def start_websocket(self, run_in_background=False):
    """Open the streaming websocket and run its event loop.

    Ping settings are read from ``self.get_socket_params()`` (keys
    ``pythonPingInterval`` and ``pythonPingTimeout``). When that call fails
    with an HTTP error, or a key is missing, the defaults of 60 and 10 are
    used.

    Args:
        run_in_background: when ``True`` the loop runs in a daemon thread
            stored on ``self.ws_thread`` and this method returns
            immediately; otherwise the call blocks in ``run_forever``.
    """
    try:
        socket_params = self.get_socket_params()
    except requests.exceptions.HTTPError:
        print("Can't Access Socket Params")
        socket_params = {}

    ping_interval = socket_params.get('pythonPingInterval', 60)
    ping_timeout = socket_params.get('pythonPingTimeout', 10)

    url = self.config['socketEndpoint'].format(api_key=self.api_key,
                                               access_token=self.access_token)
    self.websocket = websocket.WebSocketApp(
        url,
        # One well-formed header line; the scheme and the token must be
        # separated by a space.
        header=['Authorization: Bearer ' + self.access_token],
        on_data=self._on_data,
        on_error=self._on_error,
        on_close=self._on_close)

    # The same keep-alive settings apply to both execution modes.
    run_kwargs = {'ping_interval': ping_interval, 'ping_timeout': ping_timeout}
    if run_in_background is True:
        self.ws_thread = threading.Thread(target=self.websocket.run_forever,
                                          kwargs=run_kwargs)
        self.ws_thread.daemon = True
        self.ws_thread.start()
    else:
        self.websocket.run_forever(**run_kwargs)
def sendRequest(ip, port, route, data=None, protocol="http"):
    """Send an HTTP request to ``{protocol}://{ip}:{port}{route}``.

    A POST carrying ``data`` is issued when ``data`` is not ``None``;
    otherwise a GET is issued.

    Returns:
        The response object returned by ``requests``.

    Raises:
        PipelineServiceError: when ``requests`` raises ``HTTPError``.
    """
    url = "{protocol}://{ip}:{port}{route}".format(protocol=protocol, ip=ip,
                                                   port=port, route=route)
    try:
        if data is None:
            return requests.get(url)
        return requests.post(url, data=data)
    except requests.HTTPError as e:
        raise PipelineServiceError("{reason}".format(reason=e))
def api_call_helper(self, name, http_method, params, data):
    """Build the URL for route ``name``, call it and unwrap the payload.

    The URL is ``config['host'] + config['routes'][name]``, formatted with
    ``params`` when those are given.

    Returns:
        The ``data`` member of the decoded JSON body.

    Raises:
        requests.HTTPError: when the HTTP status is not 200 or the ``code``
            field of the body is rejected by ``is_status_2xx``.
    """
    url = self.config['host'] + self.config['routes'][name]
    if params is not None:
        url = url.format(**params)

    response = self.api_call(url, http_method, data)
    if response.status_code != 200:
        raise requests.HTTPError(response.text)

    body = json.loads(response.text)
    if not is_status_2xx(body['code']):
        raise requests.HTTPError(response.text)
    return body['data']
def test_load_inexistent_tables_to_workspace(self):
    """
    Workspace endpoint raises HTTPError when mock loading inexistent table
    """
    load_url = ('https://connection.keboola.com/v2/storage/workspaces/'
                '78432/load')
    msg = '404 Client Error: Not Found for url: ' + load_url
    # The mocked endpoint answers by raising the prepared error.
    mocked_reply = responses.Response(method='POST', url=load_url,
                                      body=HTTPError(msg))
    responses.add(mocked_reply)

    with self.assertRaises(HTTPError) as error_context:
        self.ws.load_tables('78432', {"in.c-table.does_not_exist": "my-table"})
    assert error_context.exception.args[0] == msg
def test_api_call_raises_exception_connection_error(mock_requests, cloudflare):
    """Every listed ``requests`` exception surfaces as CloudFlareException."""
    exc = requests.exceptions
    failure_types = (
        exc.RequestException,
        exc.HTTPError,
        exc.ConnectionError,
        exc.ProxyError,
        exc.SSLError,
        exc.Timeout,
        exc.ConnectTimeout,
        exc.ReadTimeout,
        exc.URLRequired,
        exc.TooManyRedirects,
        exc.MissingSchema,
        exc.InvalidSchema,
        exc.InvalidURL,
        exc.ChunkedEncodingError,
        exc.ContentDecodingError,
        exc.StreamConsumedError,
        exc.RetryError,
    )
    for failure in failure_types:
        mock_requests.get.side_effect = failure('Some error')
        with pytest.raises(CloudFlareException):
            cloudflare._api_call('/foo')
def request_jira(client, url, method='GET', **kwargs):
    """Send a JWT-authorised JSON request to the client's Jira instance.

    The target is ``client.baseUrl`` (trailing slash removed) plus ``url``;
    extra keyword arguments are forwarded to ``requests.request``.

    Returns:
        The response object.

    Raises:
        requests.HTTPError: on an error status, re-raised with the response
            text as message and the response attached.
    """
    token = encode_token(method, url, app.config.get('ADDON_KEY'),
                         client.sharedSecret)
    headers = {
        "Authorization": 'JWT %s' % token,
        "Content-Type": "application/json",
    }
    target = client.baseUrl.rstrip('/') + url
    result = requests.request(method, target, headers=headers, **kwargs)
    try:
        result.raise_for_status()
    except requests.HTTPError as e:
        raise requests.HTTPError(e.response.text, response=e.response)
    return result
def _fetch_index_package_info(self, package_name, current_version):
    """Query the package index for ``package_name`` and parse the reply.

    :type package_name: str
    :type current_version: version.Version
    :returns: ``(False, message)`` when the request raises ``HTTPError`` or
        the response is not OK; otherwise whatever the parser selected by
        ``self.PYPI_API_TYPE`` returns.
    :raises NotImplementedError: for an unsupported ``PYPI_API_TYPE``.
    """
    package_canonical_name = package_name
    if self.PYPI_API_TYPE == 'simple_html':
        package_canonical_name = canonicalize_name(package_name)

    try:
        response = requests.get(
            self.PYPI_API_URL.format(package=package_canonical_name),
            timeout=15)
    except HTTPError as e:  # pragma: nocover
        # Exceptions have no ``message`` attribute on Python 3; str() works
        # on both major versions.
        return False, str(e)

    if not response.ok:  # pragma: nocover
        return False, 'API error: {}'.format(response.reason)

    if self.PYPI_API_TYPE == 'pypi_json':
        return self._parse_pypi_json_package_info(package_name, current_version, response)
    elif self.PYPI_API_TYPE == 'simple_html':
        return self._parse_simple_html_package_info(package_name, current_version, response)
    else:  # pragma: nocover
        raise NotImplementedError('This type of PYPI_API_TYPE type is not supported')
def get_character(self, character_id):
    """Return a chat-formatted line describing the character.

    The line holds the character name, its zkillboard URL and the result
    of ``self.get_corporation`` for the character's corporation.
    """
    try:
        character = self.eve.get_character(character_id)
    except requests.HTTPError as ex:
        if ex.response.status_code != 500:
            raise
        # Patch for character being unresolvable and ESI throwing internal
        # errors: temporarily stub the character to not break our behavior.
        character = {
            'name': 'Unknown character',
            'corporation_id': 98356193,
        }

    template = '**%s** (https://zkillboard.com/character/%d/) (%s)'
    return template % (
        character['name'],
        character_id,
        self.get_corporation(character['corporation_id']),
    )
def get(self, endpoint, args=None):
    """Wrapper for ``requests.get``.

    Args:
        endpoint: URL to query.
        args: dict which is converted to URL parameters.

    Returns:
        The decoded JSON body on status 200, or ``None`` on status 404.

    Raises:
        requests.HTTPError: for any other status; the response is attached
            to the exception.
    """
    answer = requests.get(endpoint, params=args)
    if answer.status_code == 200:
        return answer.json()

    # The error body is only used for logging. Decoding it must never mask
    # the HTTP failure, so fall back to the raw text when it is not JSON.
    try:
        body = answer.json()
    except ValueError:
        body = answer.text

    if answer.status_code == 404:
        self.logger.error("endpoint %s or resource not found", endpoint)
        if isinstance(body, dict):
            description = body.get("error_description")
        else:
            description = body
        self.logger.error("error description was: %s", description)
        return None

    self.logger.error("Error during get request for endpoint %s", endpoint)
    self.logger.error("Status code was %d", answer.status_code)
    self.logger.error("error message was: %s", body)
    raise requests.HTTPError(
        "%d error for endpoint %s" % (answer.status_code, endpoint),
        response=answer)
def ssidSuccessfullySet(self, ssid_str):
    """Try to set the SSID and report whether the device now returns it.

    Returns ``False`` when the PUT fails with an HTTP error or its result
    differs from ``self.SUCCESS_RESPONSE``; otherwise reads the SSID back
    and compares it with ``ssid_str``.
    """
    payload = json.dumps({"value": ssid_str})
    put_reply = requests.put(self.ADMIN_SSID_URL, auth=getAdminAuth(),
                             data=payload)
    try:
        # This assumes there's not a better method to test whether
        # the SSID was of a valid length...
        put_reply.raise_for_status()
    except requests.HTTPError:
        return False
    if put_reply.json()["result"] != self.SUCCESS_RESPONSE:
        return False

    # Finally, check whether it was actually set.
    get_reply = requests.get(self.ADMIN_SSID_URL, auth=getAdminAuth())
    get_reply.raise_for_status()
    current_ssid = get_reply.json()["result"][0]
    return current_ssid == ssid_str
def get_version(self):
    """Fetches the current version number of the Graph API being used.

    Returns:
        The ``facebook-api-version`` response header as a float, with the
        ``v`` prefix removed.

    Raises:
        GraphAPIError: when the request raises ``HTTPError`` or the version
            header is missing or cannot be converted.
    """
    args = {"access_token": self.access_token}
    try:
        response = requests.request("GET",
                                    "https://graph.facebook.com/" +
                                    self.version + "/me",
                                    params=args,
                                    timeout=self.timeout,
                                    proxies=self.proxies)
    except requests.HTTPError as e:
        # requests' HTTPError has no read() method; the server reply, when
        # there is one, is available as e.response.
        try:
            error_body = e.response.json()
        except (AttributeError, ValueError):
            error_body = str(e)
        raise GraphAPIError(error_body)

    try:
        headers = response.headers
        version = headers["facebook-api-version"].replace("v", "")
        return float(version)
    except Exception:
        raise GraphAPIError("API version number not available")
def test_categories_returns_200_and_decoded_json(self):
    """``categories`` returns decoded JSON with and without a category id."""
    provider_name = 'flinkster'
    category_id = '1000'

    def fetch(*args, **kwargs):
        # Turn an HTTP failure into a readable test failure.
        try:
            return self.api.categories(*args, **kwargs)
        except HTTPError as e:
            self.fail('Status Code Was %s - URL: %s!' % (e.response.status_code, e.request.url))

    # Assert querying by network only works as expected
    resp = fetch(provider_name)
    self.assertIsInstance(resp, dict)
    self.assertIn('items', resp)

    # Assert querying by network and category ID works as expected
    resp = fetch(provider_name, by_id=category_id)
    self.assertIsInstance(resp, (list, dict))
    self.assertNotIn('items', resp)
def get_time_totals_from_pond(timeallocation, start, end, too, recur=0):
    """
    Pond queries are too slow with large proposals and time out; this hack
    splits the query into four sub-ranges when the query raises HTTPError.

    Raises RecursionError once ``recur`` exceeds 3.
    """
    if recur > 3:
        raise RecursionError('Pond is timing out, too much recursion.')

    parts = []
    try:
        parts.append(query_pond(
            timeallocation.proposal.id,
            start,
            end,
            timeallocation.telescope_class,
            timeallocation.instrument_name,
            too
        ))
    except requests.HTTPError:
        logger.warning('We got a pond inception. Splitting further.')
        for sub_start, sub_end in split_time(start, end, 4):
            parts.append(get_time_totals_from_pond(
                timeallocation, sub_start, sub_end, too, recur=recur + 1))
    return sum(parts)
def login(self, username, password, require_ownership=False):
    """Post the credentials to ``self.login_url`` and return the first item
    of the JSON reply.

    Raises:
        OwnershipError: error reply whose message is
            ``'Insufficient membership'``.
        AuthError: error reply carrying any other message.
        requests.HTTPError: error reply without a usable message.
    """
    resp = self.session.post(
        self.login_url,
        params=dict(require_game_ownership=int(require_ownership)),
        data=dict(username=username, password=password)
    )

    # A body that is not valid JSON is tolerated here; only the decoding
    # error is caught, so unrelated failures are not hidden.
    try:
        payload = resp.json()
    except ValueError:
        payload = None

    try:
        resp.raise_for_status()
        return payload[0]
    except requests.HTTPError:
        if isinstance(payload, dict) and 'message' in payload:
            if payload['message'] == 'Insufficient membership':
                raise OwnershipError(payload['message'])
            raise AuthError(payload['message'])
        raise
def bucket_get(self, bucket_id):
    """Return the bucket object.

    See `API buckets: GET /buckets
    <https://storj.github.io/bridge/#!/buckets/get_buckets_id>`_

    Args:
        bucket_id (str): bucket unique identifier.

    Returns:
        (:py:class:`model.Bucket`): bucket, or ``None`` when the bridge
        answers "not found".

    Raises:
        BridgeError: on any other HTTP error.
    """
    self.logger.info('bucket_get(%s)', bucket_id)
    try:
        bucket_fields = self._request(method='GET',
                                      path='/buckets/%s' % bucket_id)
        return model.Bucket(**bucket_fields)
    except requests.HTTPError as e:
        if e.response.status_code != requests.codes.not_found:
            self.logger.error('bucket_get() error=%s', e)
            raise BridgeError()
        return None
def test_bucket_get_not_found(self):
    """Test Client.bucket_get() when bucket does not exist."""
    not_found = requests.HTTPError()
    not_found.response = mock.Mock(status_code=404)
    self.mock_request.side_effect = not_found

    result = self.client.bucket_get('inexistent')

    self.mock_request.assert_called_once_with(
        method='GET',
        path='/buckets/inexistent')
    assert result is None
def update_json():
    """Refresh the global ``data`` from the station service.

    Stores the first element of the JSON reply, then calls
    ``update_departures``. Calls ``quit_all`` on an HTTP or connection
    error.
    """
    global data
    try:
        # Alternative station: 'http://sickpi:3000/station/9100013' (Spittelmarkt)
        response = requests.get('http://sickpi:3000/station/9160523')  # Gotlindestr.
        first_entry = response.json()[0]
        # Round-trip through a JSON string, as the original did.
        data = json.loads(json.dumps(first_entry, indent=4))
        print('\nUPDATE JSON')
        update_departures()
    except (requests.HTTPError, requests.ConnectionError):
        print('\nERROR UPDATE JSON')
        quit_all()
def similar_fragments(self, fragment_id, cutoff, limit=1000):
    """Find similar fragments to query.

    Args:
        fragment_id (str): Query fragment identifier
        cutoff (float): Cutoff, similarity scores below cutoff are discarded.
        limit (int): Maximum number of hits. Default is 1000.

    Returns:
        list[dict]: Query fragment identifier, hit fragment identifier and
        similarity score

    Raises:
        requests.HTTPError: When the service answers with an error status,
            e.g. when fragment_id could not be found
    """
    path = '/fragments/{fragment_id}/similar'.format(fragment_id=fragment_id)
    query = {'cutoff': cutoff, 'limit': limit}
    response = requests.get(self.base_url + path, query)
    response.raise_for_status()
    return response.json()
def _fetch_fragments(self, idtype, ids):
    """Fetch fragments by identifier and convert their molblocks.

    Returns:
        tuple: ``(fragments, absent_identifiers)``. On a 404 reply both
        values come from the JSON body of the error response; otherwise
        ``absent_identifiers`` is empty.

    Raises:
        HTTPError: for any error status other than 404.
    """
    query = '/fragments?{idtype}={ids}'.format(idtype=idtype, ids=','.join(ids))
    url = self.base_url + query
    missing = []
    try:
        response = requests.get(url)
        response.raise_for_status()
    except HTTPError as e:
        if e.response.status_code != 404:
            raise e
        body = e.response.json()
        fragments = body['fragments']
        missing = body['absent_identifiers']
    else:
        fragments = response.json()

    # Convert molblock string to RDKit Mol object
    for fragment in fragments:
        molblock = fragment['mol']
        if molblock is not None:
            fragment['mol'] = MolFromMolBlock(molblock)
    return fragments, missing
def pharmacophores(self, fragment_ids):
    """Download the ``.phar`` text for every fragment identifier.

    Returns:
        list: one pharmacophore text per identifier, in input order.

    Raises:
        IncompletePharmacophores: when at least one identifier answered
            404; carries the absent identifiers and the partial list (with
            ``None`` for each absent entry).
        HTTPError: for any error status other than 404.
    """
    missing = []
    found = []
    for fragment_id in fragment_ids:
        url = self.base_url + '/fragments/{0}.phar'.format(fragment_id)
        try:
            response = requests.get(url)
            response.raise_for_status()
        except HTTPError as e:
            if e.response.status_code != 404:
                raise e
            found.append(None)
            missing.append(fragment_id)
        else:
            found.append(response.text)

    if missing:
        raise IncompletePharmacophores(missing, found)
    return found
def handle_trigger_event(self, sender, **data):
    """Send a Pushover notification for a trigger event.

    Reads ``source``, ``prediction`` and ``timestamp`` from ``data``.
    Delivery problems are logged and never raised.
    """
    # Pushover doesn't support images, so just send the event.
    notification_data = {
        "user": self.pushover_user,
        "token": self.pushover_token,
        "message": "Camera %s, event: %s" % (data['source'], data['prediction']),
        "timestamp": calendar.timegm(data['timestamp'].timetuple())
    }

    # Optionally, set the device.
    if self.pushover_device:
        notification_data['device'] = self.pushover_device

    try:
        r = requests.post("https://api.pushover.net/1/messages.json",
                          data=notification_data)
        if r.status_code != 200:
            logger.error("Failed to send notification, (%d): %s",
                         r.status_code, r.text)
    # ``except X as e`` is valid on Python 2.6+ and Python 3. The log
    # format needs a placeholder for the exception argument.
    except requests.ConnectionError as e:
        logger.error("Connection Error: %s", e)
    except requests.HTTPError as e:
        logger.error("HTTP Error: %s", e)
def raise_for_status(self, allow_redirects=True):
    """Raises stored :class:`HTTPError` or :class:`URLError`, if one occurred.

    Status 304 never raises. A stored error takes precedence over the
    status code; when a stored traceback exists the error is re-raised with
    it. 3xx statuses raise only when ``allow_redirects`` is false. The
    raised ``HTTPError`` has ``self`` attached as ``response``.
    """
    code = self.status_code
    if code == 304:
        return

    if self.error:
        if self.traceback:
            six.reraise(Exception, Exception(self.error),
                        Traceback.from_string(self.traceback).as_traceback())
        http_error = HTTPError(self.error)
    elif (code >= 300) and (code < 400) and not allow_redirects:
        http_error = HTTPError('%s Redirection' % (code))
    elif (code >= 400) and (code < 500):
        http_error = HTTPError('%s Client Error' % (code))
    elif (code >= 500) and (code < 600):
        http_error = HTTPError('%s Server Error' % (code))
    else:
        return

    http_error.response = self
    raise http_error
def _delete(self, model_instance):
    """
    Deletes a serialized SecretModel string from Custodia.

    :param model_instance: SecretModel instance to delete.
    :type model_instance: commissaire.model.SecretModel
    :raises StorageLookupError: if data lookup fails (404 Not Found)
    :raises requests.HTTPError: if the request fails (other than 404)
    """
    url = self._build_key_url(model_instance)
    try:
        response = self.session.request(
            'DELETE', url, timeout=self.CUSTODIA_TIMEOUT)
        response.raise_for_status()
    except requests.HTTPError as error:
        # Inspect the response attached to the exception, not the local
        # variable, which is unbound if request() itself raised.
        # XXX bool(response) defers to response.ok, which is a misfeature.
        #     Have to explicitly test "is not None" to know if the
        #     object is there.
        have_response = error.response is not None
        if have_response and error.response.status_code == 404:
            raise StorageLookupError(str(error), model_instance)
        raise
def connect(self):
    """Configure the GoGS client and resolve the authenticated user.

    Raises:
        ConnectionError: when the user lookup answers 401; the message
            depends on whether a private key is configured.
        HTTPError: any other HTTP failure is re-raised unchanged.
    """
    self.gg.setup(self.url_ro)
    self.gg.set_token(self._privatekey)
    self.gg.set_default_private(self.default_create_private)
    verify = self.session_certificate or not self.session_insecure
    self.gg.setup_session(verify, self.session_proxy)

    try:
        self.username = self.user  # Call to self.gg.authenticated_user()
    except HTTPError as err:
        unauthorized = (err.response is not None
                        and err.response.status_code == 401)
        if not unauthorized:
            raise err
        if not self._privatekey:
            raise ConnectionError('Could not connect to GoGS. '
                                  'Please configure .gitconfig '
                                  'with your gogs private key.') from err
        raise ConnectionError('Could not connect to GoGS. '
                              'Check your configuration and try again.') from err
def _comment_exists(self):
    """Report whether this comment can be fetched.

    Workaround: comment metadata cannot be read directly, so existence is
    probed by requesting the comment's reply data and treating an
    ``HTTPError`` as "does not exist".
    """
    try:
        # Comment's get_reply_data is used on purpose, because
        # ProgramCommentReply's get_reply_data is itself a workaround.
        # Consuming the iterable is what triggers the request.
        probe = Comment(self.id, self.get_program())
        list(probe.get_reply_data())
    except requests.HTTPError:
        return False
    else:
        return True
def get_metadata(self):
    """Returns a dictionary with information about this ``ProgramCommentReply``.

    Raises ``_CommentDoesntExistError`` when no reply in the parent thread
    has this reply's key.
    """
    # There is no known way to get comment reply metadata directly, so the
    # parent's reply thread is scanned for the entry with this key.
    matching = (entry for entry in self.get_parent().get_reply_data()
                if entry["key"] == self.id)
    for entry in matching:
        return entry

    # TODO: decide whether a more specific error fits better. It is unclear
    # whether get_reply_data() raises an HTTPError for a nonsense key, or
    # whether this point is reached mainly when the key belongs to a
    # ProgramComment rather than a ProgramCommentReply (TypeError might be
    # more appropriate then). Needs research into these edge cases.
    raise _CommentDoesntExistError(self)
def request(self, path, method='GET', **kwargs):
    """Call ``ENDPOINT_URL + path`` and return the decoded result.

    Returns:
        The parsed JSON body when the content type mentions ``json``,
        otherwise the response text.

    Raises:
        VoucherifyError: on a connection error, on an HTTP error that
            carries no response, or when the decoded result is a dict with
            a truthy ``error`` entry.
    """
    try:
        url = ENDPOINT_URL + path
        response = requests.request(
            method=method,
            url=url,
            headers=self.headers,
            timeout=self.timeout,
            **kwargs
        )
    except requests.HTTPError as e:
        # requests' HTTPError has no read(); the reply, if any, is attached
        # as e.response and is handled like a normal response below.
        response = e.response
        if response is None:
            raise VoucherifyError(e)
    except requests.ConnectionError as e:
        raise VoucherifyError(e)

    content_type = response.headers.get('content-type')
    if content_type and 'json' in content_type:
        result = response.json()
    else:
        result = response.text

    if isinstance(result, dict) and result.get('error'):
        raise VoucherifyError(result)
    return result
def get_from_chain(url_adder):
    """GET ``https://api.chain.com/v2/bitcoin/<url_adder>`` and return the
    decoded JSON body.

    A 429 reply is retried after a one-second pause; any other HTTP error
    is re-raised after printing the URL.
    """
    url = 'https://api.chain.com/v2/bitcoin/%s' % (url_adder)
    while True:
        try:
            r = requests.get(url, auth=(CHAIN_API_KEY, CHAIN_API_SECRET))
            r.raise_for_status()
        except requests.HTTPError as e:
            if r.status_code != 429:
                print("Request was to %s" % (url))
                raise e
            # Too many requests
            time.sleep(1)
        else:
            return json.loads(r.text)
def goto_course_list(self):
    """Go to the course picker.

    Waits up to 30 seconds for the root container, then clicks the brand
    logo. Raises HTTPError when the current URL does not contain 'tutor'.
    Exceptions from the wait or the click propagate unchanged.
    """
    root_present = expect.presence_of_element_located(
        (By.ID, 'ox-react-root-container')
    )
    WebDriverWait(self.driver, 30).until(root_present)

    if 'tutor' not in self.current_url():
        raise HTTPError('Not currently on an OpenStax Tutor webpage:' +
                        '%s' % self.current_url())

    self.find(By.CSS_SELECTOR, '.ui-brand-logo').click()
    self.page.wait_for_page_load()
def push(self):
    """
    Push data to API.

    The push counts as successful when the JSON reply has a non-empty
    ``vul_pdf`` entry. Network and decoding errors are logged.

    :return: push success or not
    """
    try:
        payload = {"info": json.dumps(self.post_data, ensure_ascii=False)}
        response = requests.post(url=self.api, data=payload)
        result = response.json()
        if result.get("vul_pdf", "") == "":
            logger.warning('[PUSH API] Push result error: {0}'.format(response.text))
            return False
        logger.info('[PUSH API] Push success!')
        return True
    except (requests.ConnectionError, requests.HTTPError) as error:
        logger.critical('[PUSH API] Network error: {0}'.format(str(error)))
        return False
    except ValueError as error:
        logger.critical('[PUSH API] Response error: {0}'.format(str(error)))
        return False
def test_submit_problem_web(self):
    """Web submission answers 403 outside the allowed publish window."""
    common.ensure_login()
    common.ensure_api_key()

    def assert_forbidden(*args):
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_web(*args)
        assert cm.exception.response.status_code == 403

    # publish_time is before the first publish time
    assert_forbidden(1475279990, 1451606400)

    # Correct publish_time
    self.submit_problem_web(1475280000, 1451606400)
    self.submit_problem_web(1480550400, 1451606400)

    # publish_time is after the last publish time
    assert_forbidden(1480550410, 1451606400)

    # publish_time is past
    assert_forbidden(1475280000, 1475280001)
def test_submit_problem_api(self):
    """API submission answers 403 outside the allowed publish window."""
    common.ensure_login()
    common.ensure_api_key()

    def assert_forbidden(*args):
        with self.assertRaises(requests.HTTPError) as cm:
            self.submit_problem_api(*args)
        assert cm.exception.response.status_code == 403

    # publish_time is before the first publish time
    assert_forbidden(1475279990, 1451606400)

    # Correct publish_time
    self.submit_problem_api(1475280000, 1451606400)
    self.submit_problem_api(1480550400, 1451606400)

    # publish_time is after the last publish time
    assert_forbidden(1480550410, 1451606400)

    # publish_time is past
    assert_forbidden(1475280000, 1475280001)
def api_delete(server_name, api, session_id):
    """Issue a DELETE to ``https://<server_name><API><api>``.

    Args:
        server_name: host part of the URL.
        api: path appended after the module-level ``API`` prefix.
        session_id: value sent as the ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object.

    Raises:
        TintriRequestsException: on any failure while sending the request.
    """
    # Header and URL for delete call
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.delete(url, headers=headers, verify=False)
    except requests.ConnectionError:
        # Same exception type as the sibling api_put/api_post helpers.
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # exc_info()[0] is an exception class and must be converted before
        # it can be concatenated with a string.
        raise TintriRequestsException("An unexpected error " + str(sys.exc_info()[0]) + " occurred.")

    return r


# PUT
def api_put(server_name, api, payload, session_id):
    """Issue a PUT with a JSON body to ``https://<server_name><API><api>``.

    Args:
        server_name: host part of the URL.
        api: path appended after the module-level ``API`` prefix.
        payload: object serialised with ``json.dumps`` as the request body.
        session_id: value sent as the ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object.

    Raises:
        TintriRequestsException: on any failure while sending the request.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.put(url, data=json.dumps(payload),
                         headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # exc_info()[0] is an exception class and must be converted before
        # it can be concatenated with a string.
        raise TintriRequestsException("An unexpected error " + str(sys.exc_info()[0]) + " occurred.")

    return r


# POST
def api_post(server_name, api, payload, session_id):
    """Issue a POST with a JSON body to ``https://<server_name><API><api>``.

    Args:
        server_name: host part of the URL.
        api: path appended after the module-level ``API`` prefix.
        payload: object serialised with ``json.dumps`` as the request body.
        session_id: value sent as the ``JSESSIONID`` cookie.

    Returns:
        The ``requests`` response object.

    Raises:
        TintriRequestsException: on any failure while sending the request.
    """
    headers = {'content-type': 'application/json',
               'cookie': 'JSESSIONID=' + session_id}
    url = 'https://' + server_name + API + api

    try:
        # Invoke the API.
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.post(url, data=json.dumps(payload),
                          headers=headers, verify=False)
    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception:
        # exc_info()[0] is an exception class and must be converted before
        # it can be concatenated with a string.
        raise TintriRequestsException("An unexpected error " + str(sys.exc_info()[0]) + " occurred.")

    return r


# Login.
def download_file(server_name, report_url, session_id, file_name):
    """Stream ``report_url`` into the local file ``file_name``.

    ``server_name`` and ``session_id`` are accepted but not used by this
    function.

    Raises:
        TintriRequestsException: on any failure, including a reply whose
            status is not 200 (see the note on the last handler).
    """
    headers = {'content-type': 'application/json'}

    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.get(report_url, headers=headers, verify=False, stream=True)
        # if HTTP Response is not 200 then raise an exception
        if r.status_code != 200:
            message = "The HTTP response for get call to the server is not 200."
            raise TintriApiException(message, r.status_code, report_url, "No Payload", r.text)

        # iter_content() yields bytes, so the file must be opened in binary
        # mode; text mode raises TypeError on Python 3.
        with open(file_name, 'wb') as file_h:
            for block in r.iter_content(4096):
                file_h.write(block)

    except requests.ConnectionError:
        raise TintriRequestsException("API Connection error occurred.")
    except requests.HTTPError:
        raise TintriRequestsException("HTTP error occurred.")
    except requests.Timeout:
        raise TintriRequestsException("Request timed out.")
    except Exception as e:
        # NOTE(review): this handler also catches the TintriApiException
        # raised above and re-labels it. Kept as-is so callers see the same
        # exception type as before; confirm whether that is intended.
        raise TintriRequestsException("An unexpected error: " + e.__str__())
def login(self):
    """Post the stored credentials to ``self.home_url``.

    Returns:
        tuple: ``(True, body)`` when the decoded JSON body contains
        ``'success'``, ``(False, body)`` when it does not, or
        ``(False, message)`` when sending the request failed.
    """
    login_params = {
        'j_username': GlobalVal.username,
        'j_password': GlobalVal.password,
        'validateCode': GlobalVal.secretCode
    }
    try:
        login_result = self.__session.post(self.home_url, data=login_params,
                                           headers=self.header, timeout=6)
    # DNS failure, refused connection, etc
    except ConnectionError:
        return False, 'error:ConnectionError.'
    # Timeout
    except TimeoutError:
        return False, 'error:TimeoutError.'
    # HTTPError
    except requests.HTTPError as err:
        # No response object was assigned when post() raised, so the status
        # must come from the exception itself (None when it carries none).
        status = getattr(err.response, 'status_code', None)
        return False, 'error:HTTPError:%s.' % status
    except Exception:
        return False, 'error:network disconnect.'

    # Almost everything is ok.
    body = login_result.json()
    if 'success' in body:
        return True, body
    return False, body
def test_handle_raises_error_if_status_is_400(api_call):
    """``handle`` lets the HTTPError of the mocked endpoint propagate."""
    mocked_failure = HTTPError()
    responses.add(api_call.method, api_call.url, body=mocked_failure)

    with raises(HTTPError):
        handle(api_call())


# Disabled test, kept for reference:
# @responses.activate
# @mark.parametrize('api_call', api_calls)
# def test_call_endpoint_with_query_parameters(api_call):
#     responses.add(
#         api_call.method,
#         api_call.url + '?spam=eggs&foo=bar',
#         match_querystring=False
#     )
#     params = dict(foo='bar', spam='eggs')
#     assert api_call(params=params).status_code == 200
def _handle_promises(self): """Collect all promises from S3 uploads. """ for stream, future in zip(self._streams, self._futures): exception = future.exception() if exception: raise exception response = future.result() if response.status_code != 200: message = 'Something went wrong uploading %s to S3: %s' log.error(message, response.url, response.text) raise HTTPError(message) self._responses.append(response) stream.close()
def setUp(self):
    """Patch ``logging.getLogger`` and ``requests.get`` with stand-ins,
    set ``CACHET_TOKEN`` and load ``config.yml``.

    After the configuration is built, ``Timeout``, ``ConnectionError`` and
    ``HTTPError`` are re-assigned on the ``requests`` module.
    """
    def getLogger(name):
        # Every logger request hands out a fresh mock, remembered on the
        # test case so assertions can inspect it.
        self.mock_logger = mock.Mock()
        return self.mock_logger

    def get(url, headers):
        # Stand-in for requests.get that always reports a healthy status.
        fake_response = mock.Mock()
        fake_response.ok = True
        fake_response.json = mock.Mock(return_value={'data': {'status': 1}})
        return fake_response

    sys.modules['logging'].getLogger = getLogger
    sys.modules['requests'].get = get

    self.env = EnvironmentVarGuard()
    self.env.set('CACHET_TOKEN', 'token2')

    self.configuration = Configuration('config.yml')

    for attr_name, exc_class in (('Timeout', Timeout),
                                 ('ConnectionError', ConnectionError),
                                 ('HTTPError', HTTPError)):
        setattr(sys.modules['requests'], attr_name, exc_class)
def get_album_json_thread(self):
    """Worker loop: take album ids from ``self.albumQueue`` and call
    ``self.get_album`` for each one with the cache disabled.

    The loop stops when ``xbmc.abortRequested`` or
    ``self.abortAlbumThreads`` is set, or when no id can be taken from the
    queue. Any other exception is printed and swallowed.
    """
    try:
        while not xbmc.abortRequested and not self.abortAlbumThreads:
            try:
                album_id = self.albumQueue.get_nowait()
            except:
                # Presumably the queue is empty -- any failure ends the worker.
                break
            try:
                self.get_album(album_id, withCache=False)
            except requests.HTTPError as e:
                r = e.response
                # Default message; replaced by the reply's reason and then
                # its 'userMessage' when those can be read.
                msg = _T(30505)
                try:
                    msg = r.reason
                    msg = r.json().get('userMessage')
                except:
                    pass
                log('Error getting Album ID %s' % album_id, xbmc.LOGERROR)
                if r.status_code == 429 and not self.abortAlbumThreads:
                    # First worker to see "too many requests" tells all
                    # workers to stop.
                    self.abortAlbumThreads = True
                    log('Too many requests. Aborting Workers ...', xbmc.LOGERROR)
                    # Calls the queue's private _init -- presumably to
                    # discard pending ids; verify.
                    self.albumQueue._init(9999)
                    # NOTE(review): nesting of this call inside the 429
                    # branch is reconstructed from a collapsed source line;
                    # confirm against the original file.
                    xbmcgui.Dialog().notification(plugin.name, msg, xbmcgui.NOTIFICATION_ERROR)
    except Exception, e:
        traceback.print_exc()
def http_get(self, s, host, port, timeout, appendix=''):
    """GET ``http://<host>:<port><appendix>`` through session ``s``.

    Returns:
        tuple: ``(response, None)`` on success, or ``(None, exception)``
        where the exception is a ``RequetsHostException`` naming the class
        of the ``requests`` error that occurred.
    """
    url = 'http://{}:{}{}'.format(host, port, appendix)
    try:
        r = s.get(url, timeout=timeout, verify=False)
    except requests.RequestException as err:
        # One handler covers timeouts, HTTP errors and every other
        # requests failure; only the error class name is reported.
        failure = RequetsHostException('{}:{} request error, msg: {}'
                                       .format(host, port, type(err).__name__))
        return None, failure
    return r, None
def _connect(self):
    """Run the ARI client for ``APPLICATION_NAME`` until it stops.

    Socket, websocket and HTTP errors are passed to
    ``self._connection_error``, except a broken pipe, which is only
    logged. A ``ValueError`` closes the client.
    """
    logger.debug('ARI client listening...')
    try:
        with self._running():
            self.client.run(apps=[APPLICATION_NAME])
    except socket.error as e:
        if e.errno != errno.EPIPE:
            self._connection_error(e)
        else:
            # bug in ari-py when calling client.close(): ignore it and stop
            logger.error('Error while listening for ARI events: %s', e)
            return
    except (WebSocketException, HTTPError) as e:
        self._connection_error(e)
    except ValueError:
        logger.warning('Received non-JSON message from ARI... disconnecting')
        self.client.close()
def _get_mongooseim_history(self, user_uuid, args):
    """Fetch a user's chat history from MongooseIM.

    ``args`` may carry ``participant_user_uuid`` (restricts the history to
    one participant), ``participant_server_uuid`` (defaults to
    ``self._xivo_uuid``) and ``limit``.

    Raises:
        MongooseIMException: when the client raises ``HTTPError``.
        MongooseIMUnreachable: on any other ``RequestException``.
    """
    participant_user = args.get('participant_user_uuid')
    participant_server = args.get('participant_server_uuid', self._xivo_uuid)
    limit = args.get('limit')
    user_jid = self._build_jid(self._xivo_uuid, user_uuid)

    try:
        if not participant_user:
            return self.mongooseim_client.get_user_history(user_jid, limit)
        participant_jid = self._build_jid(participant_server, participant_user)
        return self.mongooseim_client.get_user_history_with_participant(
            user_jid, participant_jid, limit)
    except HTTPError as e:
        raise MongooseIMException(self._xivo_uuid, e.response.status_code, e.response.reason)
    except RequestException as e:
        raise MongooseIMUnreachable(self._xivo_uuid, e)
def send_message(self, request_body, user_uuid=None):
    """Send a chat message through MongooseIM and publish a bus event.

    Sender and recipient are derived from ``request_body`` (and
    ``user_uuid``); ``alias`` and ``msg`` are read from ``request_body``.

    Raises:
        MongooseIMException: when the client raises ``HTTPError``.
        MongooseIMUnreachable: on any other ``RequestException``.
    """
    from_xivo_uuid, from_ = self._build_from(request_body, user_uuid)
    to_xivo_uuid, to = self._build_to(request_body)
    alias = request_body['alias']
    msg = request_body['msg']

    self.contexts.add(from_, to, to_xivo_uuid=to_xivo_uuid, alias=alias)

    sender_jid = self._build_jid(from_xivo_uuid, from_)
    recipient_jid = self._build_jid(to_xivo_uuid, to)
    try:
        self.mongooseim_client.send_message(sender_jid, recipient_jid, msg)
    except HTTPError as e:
        raise MongooseIMException(self._xivo_uuid, e.response.status_code, e.response.reason)
    except RequestException as e:
        raise MongooseIMUnreachable(self._xivo_uuid, e)

    # The event is published only after the message was sent.
    event = ChatMessageSent((from_xivo_uuid, from_),
                            (to_xivo_uuid, to),
                            alias,
                            msg)
    routing_headers = {'user_uuid:{uuid}'.format(uuid=from_): True}
    self._bus_publisher.publish(event, headers=routing_headers)
def raise_for_error(response):
    """Translate an error response into a LinkedIn exception.

    Returns ``None`` when the status is fine, or when the error reply has
    an empty body.

    Raises:
        LinkedInError: when the body is not usable JSON or has neither an
            ``error`` nor an ``errorCode`` entry.
        Exception: the class chosen by ``get_exception_for_error_code`` for
            the body's ``status`` when one of those entries is present.
    """
    try:
        response.raise_for_status()
    except (requests.HTTPError, requests.ConnectionError) as error:
        # Exceptions have no ``message`` attribute on Python 3; str() works
        # on both major versions.
        error_text = str(error)
        try:
            if len(response.content) == 0:
                # There is nothing we can do here since LinkedIn has neither
                # sent us a 2xx response nor a response content.
                return
            payload = response.json()
            if ('error' in payload) or ('errorCode' in payload):
                message = '%s: %s' % (payload.get('error', error_text),
                                      payload.get('message', 'Unknown Error'))
                error_code = payload.get('status')
                ex = get_exception_for_error_code(error_code)
                raise ex(message)
            else:
                raise LinkedInError(error_text)
        except (ValueError, TypeError):
            raise LinkedInError(error_text)
def _call(self, verb, url, data={}, params={}, headers={}):
    """Send a JSON request through ``self.session``.

    The dict defaults are never mutated here; they are kept so that the
    request body sent by default stays the same.

    Args:
        verb: HTTP method name.
        url: target URL.
        data: object sent as the JSON body.
        params: URL query parameters.
        headers: extra headers merged into the session headers.

    Returns:
        tuple: ``(response, status_code)`` when a reply was received, even
        one with an error status; ``(False, None)`` when the request timed
        out or failed before a reply arrived.
    """
    if headers:
        self.session.headers.update(headers)
    log.info('Call %s with data %s', url, data)

    resp = False
    status_code = None
    try:
        # The network call is inside the try block so the Timeout and
        # RequestException handlers below can actually fire.
        resp = self.session.request(verb, url, json=data, params=params)
        status_code = resp.status_code
        resp.raise_for_status()
    except requests.HTTPError:
        log.debug('Error occured, endpoint : %s, apikey : %s',
                  url, self.apikey)
        return resp, status_code
    except requests.Timeout:
        log.error('Request Timeout to %s', url)
        return False, status_code
    except requests.RequestException:
        log.error('Requests Error')
        return False, status_code
    return resp, status_code
def test_detail_inexsitent_workspace(self):
    """
    Workspace Endpoint raises HTTPError when mocking inexistent
    workspace detail
    """
    detail_url = 'https://connection.keboola.com/v2/storage/workspaces/1'
    msg = '404 Client Error: Not Found for url: ' + detail_url
    # The mocked endpoint answers by raising the prepared error.
    mocked_reply = responses.Response(method='GET', url=detail_url,
                                      body=HTTPError(msg))
    responses.add(mocked_reply)

    with self.assertRaises(HTTPError) as error_context:
        self.ws.detail('1')
    assert error_context.exception.args[0] == msg
def test_reset_password_for_inexistent_workspace(self):
    """
    Workspace endpoint raises HTTPError when mock resetting password for
    inexistent workspace
    """
    password_url = ('https://connection.keboola.com/v2/storage/workspaces/'
                    '1/password')
    msg = '404 Client Error: Not Found for url: ' + password_url
    # The mocked endpoint answers by raising the prepared error.
    mocked_reply = responses.Response(method='POST', url=password_url,
                                      body=HTTPError(msg))
    responses.add(mocked_reply)

    with self.assertRaises(HTTPError) as error_context:
        self.ws.reset_password('1')
    assert error_context.exception.args[0] == msg