我们从 Python 开源项目中提取了以下 16 个代码示例，用于说明如何使用 requests.codes（这是一个状态码查找对象，而不是可调用的函数，可通过属性或键访问，例如 requests.codes.ok 或 requests.codes['ok']）。
def post(self, *args, **kwargs):
    """Send an HTTP POST through the shared session and decode the JSON reply.

    All positional and keyword arguments are forwarded unchanged to
    ``self.session.post``.

    :param args: positional arguments for ``self.session.post``
    :param kwargs: keyword arguments for ``self.session.post``
    :return: the response body as decoded by ``response.json()``
    :raises: the exception class registered in ``_EXCEPTIONS_BY_CODE`` for
        the response status code; ``exceptions.Etcd3Exception`` for any
        other status that is not ``ok``;
        ``exceptions.ConnectionTimeoutError`` when the request times out;
        ``exceptions.ConnectionFailedError`` when the connection fails
    """
    try:
        response = self.session.post(*args, **kwargs)
        status = response.status_code
        # A status with a dedicated exception class wins over the generic
        # "not ok" failure below.
        if status in _EXCEPTIONS_BY_CODE:
            raise _EXCEPTIONS_BY_CODE[status](response.reason)
        if status != requests.codes['ok']:
            raise exceptions.Etcd3Exception(response.reason)
    except requests.exceptions.Timeout as timeout_error:
        raise exceptions.ConnectionTimeoutError(
            six.text_type(timeout_error))
    except requests.exceptions.ConnectionError as connection_error:
        raise exceptions.ConnectionFailedError(
            six.text_type(connection_error))

    return response.json()
def request_new_access_token(session):
    """Request a token with the client-credentials grant and store it.

    The token endpoint is built from ``session.token_issuer_host`` and
    ``session.token_issuer_path``. The API key and secret are sent as a
    Basic ``Authorization`` header.

    :param session: object providing ``token_issuer_host``,
        ``token_issuer_path``, ``api_key``, ``api_secret`` and a writable
        ``access_token`` attribute
    :return: ``session.access_token``; it is only overwritten when the
        response is truthy and its status code is ``ok``, otherwise the
        previously stored value is returned
    """
    def _ascii_b64(text):
        # Base64-encode an ASCII string and hand the result back as text.
        return base64.b64encode(text.encode("ascii")).decode("ascii")

    token_url = "{token_issuer_host}/v1/{token_issuer_path}".format(
        token_issuer_host=session.token_issuer_host,
        token_issuer_path=session.token_issuer_path,
    )
    credentials = session.api_key + ":" + session.api_secret
    request_headers = {
        "Authorization": "Basic {}".format(_ascii_b64(credentials)),
        "X-Access": "{}".format(_ascii_b64('{"type":"Full"}')),
    }
    form_fields = {"grant_type": "client_credentials"}

    response = requests.post(
        url=token_url, headers=request_headers, data=form_fields)

    # The truthiness test is kept alongside the explicit status comparison.
    if response and response.status_code == requests.codes["ok"]:
        session.access_token = response.json()["access_token"]
    return session.access_token
def request(self, method, url, params=None, data=None, files=None):
    """Send a bearer-authenticated request through ``self.session``.

    :param method: HTTP method name handed to ``self.session.request``
    :param url: target URL
    :param params: query parameters; an empty dict is used when omitted
    :param data: request body values; an empty dict is used when omitted
    :param files: optional files argument, forwarded unchanged
    :return: the response object, including for non-``ok`` statuses other
        than ``forbidden``
    :raises Timeout: re-raised after being logged
    :raises LoginException: when the status code is ``forbidden``

    Side effect: any status that is neither ``ok`` nor ``forbidden`` resets
    ``self.access_token`` to ``None`` and is logged as an error.
    """
    # Build fresh containers per call instead of sharing mutable defaults.
    params = {} if params is None else params
    post_values = {} if data is None else data
    headers = {"Authorization": "Bearer %s" % self.access_token}

    try:
        # NOTE(review): verify=False turns off TLS certificate verification
        # for this call. Kept as in the original; confirm it is intended.
        response = self.session.request(
            method, url,
            params=params,
            data=post_values,
            files=files,
            headers=headers,
            timeout=self.timeout,
            verify=False,
        )
    except Timeout as e:
        # Log the method actually used rather than a hard-coded "POST".
        logger.warning('%s for %s timed out: %s', method, url, e)
        raise

    if response.status_code == requests.codes['forbidden']:
        # Interpolate the message here: exception constructors do not apply
        # %-formatting to extra positional arguments the way loggers do.
        raise LoginException(
            "forbidden status code received on %s %s %s"
            % (method, response.status_code, response.text))
    elif response.status_code != requests.codes['ok']:
        self.access_token = None
        logger.error("raised exception, resetting token %s",
                     response.status_code)

    # read content to let it know we are done with
    response.content
    return response
def transcribe(self, fp):
    """Send the audio read from *fp* for recognition and rank the results.

    Arguments:
    fp -- object whose ``read()`` returns the audio payload

    Returns a list of upper-cased hypotheses ordered by descending
    confidence, or an empty list when the request fails, the recognition
    status is not 'OK', or the response cannot be parsed.
    """
    audio = fp.read()
    response = self._get_response(audio)
    if response.status_code == requests.codes['unauthorized']:
        # Request token invalid, retry once with a new token
        self._logger.warning('OAuth access token invalid, generating a '
                             'new one and retrying...')
        self._token = None
        response = self._get_response(audio)

    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError:
        self._logger.critical('Request failed with response: %r',
                              response.text, exc_info=True)
        return []
    except requests.exceptions.RequestException:
        self._logger.critical('Request failed.', exc_info=True)
        return []

    try:
        recognition = response.json()['Recognition']
        if recognition['Status'] != 'OK':
            raise ValueError(recognition['Status'])
        scored = [(entry['Hypothesis'], entry['Confidence'])
                  for entry in recognition['NBest']]
    except ValueError as error:
        self._logger.debug('Recognition failed with status: %s',
                           error.args[0])
        return []
    except KeyError:
        self._logger.critical('Cannot parse response.', exc_info=True)
        return []

    # Highest confidence first; only the hypothesis text is returned.
    ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
    transcribed = [pair[0].upper() for pair in ranked]
    self._logger.info('Transcribed: %r', transcribed)
    return transcribed
def send_request(self, payload, **kwargs):
    # type: (dict, dict) -> dict
    """Encode *payload*, send it to ``self.node_url`` and interpret the reply.

    Entries of ``self.DEFAULT_HEADERS`` are added to ``kwargs['headers']``
    only where the caller has not already supplied that header. The
    remaining keyword arguments are forwarded to ``self._send_http_request``.

    Returns whatever ``self._interpret_response`` returns for the response,
    the original (unencoded) payload and the set containing the ``ok``
    status code.
    """
    request_headers = kwargs.setdefault('headers', {})
    for header_name, default_value in iteritems(self.DEFAULT_HEADERS):
        request_headers.setdefault(header_name, default_value)

    # Use a custom JSON encoder that knows how to convert Tryte values.
    encoded_payload = JsonEncoder().encode(payload)
    response = self._send_http_request(
        payload=encoded_payload,
        url=self.node_url,
        **kwargs
    )

    expected_statuses = {codes['ok']}
    return self._interpret_response(response, payload, expected_statuses)
def login(api_url, username, password):
    """Login to tendrl server.

    Posts the credentials as a JSON document to ``<api_url>login``.

    :param api_url: base URL; ``login`` is appended directly to it
    :param username: account name
    :param password: account password
    :return: the decoded JSON body when the status code is ``ok``;
        otherwise ``MODULE.fail_json`` is called with the request URL and
        the posted data
    """
    credentials = {"username": username, "password": password}
    reply = requests.post(
        "{}login".format(api_url),
        data=json.dumps(credentials),
    )

    if reply.status_code == requests.codes["ok"]:
        return reply.json()

    # NOTE(review): the failure metadata carries the posted data, which
    # includes the password. Confirm that exposing it is acceptable.
    failure_details = {"url": reply.url, "data": credentials}
    MODULE.fail_json(
        msg="Could not login with these credentials.",
        meta=failure_details)
def wasNotFound(self):
    """Return True when the current response has the ``not_found`` status.

    Returns False when ``self.currentResponse`` is None.
    """
    response = self.currentResponse
    if response is None:
        return False
    return response.status_code == requests.codes.not_found
def _feedparser_parse_with_options(self) -> Tuple[feedparser.FeedParserDict, "UpdateResult"]:
    """
    Perform a feedparser parse, providing arguments (like etag) we might want it to use.

    The stored etag and the time tuple of the stored last-modified value (or None when no
    value is stored) are passed to ``self.parser`` together with ``self.url``. The etag and
    last-modified information reported by the parse are written back to ``self.feed_state``.

    Returns a ``(parsed, result)`` pair:
      * ``(None, UpdateResult.FAILURE)`` when the parse has no status and reports a bozo error,
      * ``(None, UpdateResult.UNNEEDED)`` when the status is NOT_MODIFIED,
      * ``(parsed, UpdateResult.SUCCESS)`` otherwise.
    """
    stored_last_modified = self.feed_state.last_modified
    if stored_last_modified is not None:
        last_mod = stored_last_modified.timetuple()
    else:
        last_mod = None

    # NOTE - this naming is a bit confusing here - parser is really a thing you call with
    # arguments to get a feedparser result.
    # Maybe better called parser-generator, or parse-performer or something?
    parsed = self.parser(self.url, self.feed_state.etag, last_mod)

    self.feed_state.etag = parsed.get("etag", self.feed_state.etag)
    self.feed_state.store_last_modified(parsed.get("modified_parsed", None))

    # Detect bozo errors (malformed RSS/ATOM feeds).
    if "status" not in parsed and parsed.get("bozo", None) == 1:
        bozo_exception = parsed.bozo_exception
        # NOTE: Feedparser documentation indicates that you can always call getMessage, but
        # it's possible for feedparser to spit out a URLError, which doesn't have getMessage.
        # hasattr needs the bare attribute name: with "getMessage()" (parentheses included)
        # the check could never succeed and getMessage was never used.
        if hasattr(bozo_exception, "getMessage"):
            msg = bozo_exception.getMessage()
        else:
            msg = repr(bozo_exception)

        LOG.info(f"Unable to retrieve feed for {self.metadata['name']} from {self.url}.")
        LOG.debug(f"Update failed because bozo exception {msg} occurred.")
        return (None, UpdateResult.FAILURE)

    if parsed.get("status") == requests.codes["NOT_MODIFIED"]:
        LOG.debug("No update to feed, nothing to do.")
        return (None, UpdateResult.UNNEEDED)

    return (parsed, UpdateResult.SUCCESS)
def request(self, method, url, data=None, files=None):
    """Send a session- and CSRF-authenticated request via ``self.session``.

    Logs in first when the session has no ``sessionid`` cookie (or when
    reading the cookies raises ``CookieConflictError``), then rebuilds the
    cookie jar so that it holds exactly one ``csrftoken`` and one
    ``sessionid`` cookie.

    :param method: HTTP method name handed to ``self.session.request``
    :param url: target URL
    :param data: optional mapping of form values; it is copied, so neither
        the caller's mapping nor a shared default is modified
    :param files: optional files argument, forwarded unchanged
    :return: the response object, including for non-``ok`` statuses other
        than ``forbidden``
    :raises Timeout: re-raised after being logged
    :raises LoginException: when the status code is ``forbidden``

    Side effect: any status that is neither ``ok`` nor ``forbidden`` resets
    ``self.csrf_token`` to ``None`` and is logged as an error.
    """
    try:
        if 'sessionid' not in self.session.cookies:
            self.login()
    except CookieConflictError as e:
        logger.warning("CookieConflictError %s. Relogging in" % e)
        self.login()

    # Work on a copy: the original added the CSRF field to the dict that was
    # passed in (or to the shared ``{}`` default), leaking state across calls.
    post_values = dict(data) if data is not None else {}
    # Read the token only after the possible login above, so the form field
    # matches the header and cookie built below.
    # NOTE(review): presumably login() may refresh self.csrf_token; confirm.
    post_values['csrfmiddlewaretoken'] = self.csrf_token

    # Reset the jar so exactly one csrftoken/sessionid pair is sent.
    sessionid = self.session.cookies['sessionid']
    self.session.cookies.clear()
    self.session.cookies['csrftoken'] = self.csrf_token
    self.session.cookies['sessionid'] = sessionid

    headers = {
        "X-CSRFToken": self.csrf_token,
        "Referer": "%s://%s" % (self.server_protocol, self.server_host),
    }

    try:
        # NOTE(review): verify=False turns off TLS certificate verification
        # for this call. Kept as in the original; confirm it is intended.
        response = self.session.request(
            method, url,
            data=post_values,
            files=files,
            headers=headers,
            timeout=self.timeout,
            verify=False,
        )
    except Timeout as e:
        logger.warning('%s for %s timed out: %s', method, url, e)
        raise

    if response.status_code == requests.codes['forbidden']:
        raise LoginException(
            "forbidden status code received on %s %s %s"
            % (method, response.status_code, response.text))
    elif response.status_code != requests.codes['ok']:
        # Other failures are not raised; the token is dropped and the
        # response is still returned to the caller.
        self.csrf_token = None
        logger.error("raised exception, resetting token %s"
                     % response.status_code)

    # read content to let it know we are done with
    response.content
    return response
def transcribe(self, fp):
    """
    Performs STT via the Google Speech API, transcribing an audio file and
    returning the upper-cased transcripts.

    Arguments:
    fp -- an open binary file object containing WAV audio; it is handed to
          ``wave.open`` to read the frame rate and then read with
          ``fp.read()``

    Returns a tuple of upper-cased transcripts on success, or an empty list
    when the configuration is incomplete, the HTTP request fails, or the
    response is empty or cannot be parsed.
    """
    if not self.api_key:
        self._logger.critical('API key missing, transcription request '
                              'aborted.')
        return []
    elif not self.language:
        self._logger.critical('Language info missing, transcription '
                              'request aborted.')
        return []

    wav = wave.open(fp, 'rb')
    try:
        frame_rate = wav.getframerate()
    finally:
        # Release the wave reader even if reading the frame rate fails.
        wav.close()
    # NOTE(review): fp is read from wherever the wave reader left its
    # position. Presumably that yields the raw audio frames; confirm.
    data = fp.read()

    headers = {'content-type': 'audio/l16; rate=%s' % frame_rate}
    r = self._http.post(self.request_url, data=data, headers=headers)
    try:
        r.raise_for_status()
    except requests.exceptions.HTTPError:
        self._logger.critical('Request failed with http status %d',
                              r.status_code)
        if r.status_code == requests.codes['forbidden']:
            self._logger.warning('Status 403 is probably caused by an '
                                 'invalid Google API key.')
        return []

    r.encoding = 'utf-8'
    try:
        # We cannot simply use r.json() because Google sends invalid json
        # (i.e. multiple json objects, separated by newlines. We only want
        # the last one). Splitting from the right isolates that last
        # object even when more than two are sent; splitting from the left
        # returned everything after the first newline.
        last_object = r.text.strip().rsplit('\n', 1)[-1]
        response = json.loads(last_object)
        if not response['result']:
            # Response result is empty
            raise ValueError('Nothing has been transcribed.')
        results = [alt['transcript']
                   for alt in response['result'][0]['alternative']]
    except ValueError as e:
        self._logger.warning('Empty response: %s', e.args[0])
        results = []
    except (KeyError, IndexError):
        self._logger.warning('Cannot parse response.', exc_info=True)
        results = []
    else:
        # Convert all results to uppercase
        results = tuple(result.upper() for result in results)
        self._logger.info('Transcribed: %r', results)
    return results
def transcribe(self, fp):
    """
    Performs STT via the Watson Speech-to-Text API and returns the
    upper-cased, stripped transcripts.

    Arguments:
    fp -- object handed to ``wave.open`` to read the frame rate and then
          read with ``fp.read()``

    Returns a tuple of transcripts on success. Returns an empty list when
    the username or password is missing, the HTTP request fails, or the
    response is empty or cannot be parsed.
    """
    if not self.username:
        self._logger.critical('Username missing, transcription request aborted.')
        return []
    if not self.password:
        self._logger.critical('Password missing, transcription request aborted.')
        return []

    wav_reader = wave.open(fp, 'rb')
    sample_rate = wav_reader.getframerate()
    wav_reader.close()
    audio = fp.read()

    request_headers = {
        'content-type': 'audio/l16; rate={0}; channels=1'.format(sample_rate)
    }
    r = self._http.post(
        'https://stream.watsonplatform.net/speech-to-text/api/v1/recognize?continuous=true',
        data=audio,
        headers=request_headers,
        auth=(self.username, self.password)
    )
    try:
        r.raise_for_status()
    except requests.exceptions.HTTPError:
        self._logger.critical('Request failed with http status %d',
                              r.status_code)
        if r.status_code == requests.codes['forbidden']:
            self._logger.warning('Status 403 is probably caused by invalid credentials.')
        return []

    r.encoding = 'utf-8'
    # Stays an empty list unless the whole parse below succeeds.
    results = []
    try:
        payload = r.json()
        if not payload['results']:
            raise ValueError('Nothing has been transcribed.')
        alternatives = payload['results'][0]['alternatives']
        results = tuple(alt['transcript'].strip().upper()
                        for alt in alternatives)
        self._logger.info('Transcribed: %r', results)
    except ValueError as e:
        self._logger.critical('Empty response: %s', e.args[0])
    except (KeyError, IndexError):
        self._logger.critical('Cannot parse response.', exc_info=True)
    return results
def get_feed(self, attempt_count: int = 0) -> "UpdateResult":
    """
    Get RSS structure for this subscription. Return status code indicating result.

    Gives up with ``UpdateResult.FAILURE`` when ``attempt_count`` exceeds
    ``MAX_RECURSIVE_ATTEMPTS`` or when no URL is stored. Otherwise parses the feed, lets
    ``_handle_http_codes`` judge the HTTP status, and calls itself again with an incremented
    ``attempt_count`` when that judgement is ``UpdateResult.ATTEMPT_AGAIN``. On success the
    parsed feed is loaded into ``self.feed_state``.
    """
    if attempt_count > MAX_RECURSIVE_ATTEMPTS:
        # The two literals are concatenated, so the first needs its own trailing space.
        LOG.debug(f"Too many recursive attempts ({attempt_count}) to get feed for sub "
                  f"{self.metadata['name']}, canceling.")
        return UpdateResult.FAILURE

    if self.url is None or self.url == "":
        LOG.debug(f"URL {self.url} is empty, cannot get feed for sub "
                  f"{self.metadata['name']}.")
        return UpdateResult.FAILURE

    LOG.info(f"Getting entries (attempt {attempt_count}) for {self.metadata['name']} "
             f"from {self.url}.")

    (parsed, code) = self._feedparser_parse_with_options()
    if code == UpdateResult.UNNEEDED:
        LOG.info("We have the latest feed, nothing to do.")
        return code
    elif code != UpdateResult.SUCCESS:
        LOG.info(f"Feedparser parse failed ({code}), aborting.")
        return code

    LOG.debug("Feedparser parse succeeded.")

    # Detect some kinds of HTTP status codes signaling failure.
    code = self._handle_http_codes(parsed)
    if code == UpdateResult.ATTEMPT_AGAIN:
        LOG.debug("Transient HTTP error, attempting again.")
        # Capture the saved URL before recursing so it can be restored afterwards.
        # NOTE(review): self.temp_url is not cleared here. Presumably it is reset
        # elsewhere; otherwise a stale value would overwrite self.url. Confirm.
        temp = self.temp_url
        code = self.get_feed(attempt_count=attempt_count + 1)
        if temp is not None:
            self.url = temp
    elif code != UpdateResult.SUCCESS:
        LOG.debug(f"Ran into HTTP error ({code}), aborting.")
    else:
        self.feed_state.load_rss_info(parsed)

    return code
def _handle_http_codes(self, parsed: feedparser.FeedParserDict) -> "UpdateResult":
    """
    Given feedparser parse result, determine if parse succeeded, and what to do about that.

    Returns ``UpdateResult.SUCCESS`` when there is no status or the status is 200,
    ``UpdateResult.FAILURE`` for NOT_FOUND, UNAUTHORIZED and GONE, and
    ``UpdateResult.ATTEMPT_AGAIN`` for redirects and every other status.

    Side effects: UNAUTHORIZED/GONE clear ``self.url``; permanent redirects replace
    ``self.url`` with the parse result's ``href``; temporary redirects do the same after
    saving the old URL in ``self.temp_url``.
    """
    # Feedparser gives no status if you feedparse a local file.
    if "status" not in parsed:
        LOG.debug("Saw status 200 - OK, all is well.")
        return UpdateResult.SUCCESS

    status = parsed.get("status", 200)
    result = UpdateResult.SUCCESS

    if status == requests.codes["NOT_FOUND"]:
        # The literal ending in "preserved" needs a trailing space: adjacent literals are
        # concatenated as-is.
        LOG.error(f"Saw status {status}, unable to retrieve feed text for "
                  f"{self.metadata['name']}."
                  f"\nStored URL {self.url} for {self.metadata['name']} will be preserved "
                  f"and checked again on next attempt."
                  )
        result = UpdateResult.FAILURE

    elif status in (requests.codes["UNAUTHORIZED"], requests.codes["GONE"]):
        LOG.error(f"Saw status {status}, unable to retrieve feed text for "
                  f"{self.metadata['name']}."
                  f"\nClearing stored URL {self.url} for {self.metadata['name']}."
                  f"\nPlease provide new URL and authorization for subscription "
                  f"{self.metadata['name']}."
                  )
        self.url = None
        result = UpdateResult.FAILURE

    # Handle redirecting errors
    elif status in (requests.codes["MOVED_PERMANENTLY"],
                    requests.codes["PERMANENT_REDIRECT"]):
        LOG.warning(f"Saw status {status} indicating permanent URL change."
                    f"\nChanging stored URL {self.url} for {self.metadata['name']} to "
                    f"{parsed.get('href')} and attempting get with new URL."
                    )
        self.url = parsed.get("href")
        result = UpdateResult.ATTEMPT_AGAIN

    elif status in (requests.codes["FOUND"], requests.codes["SEE_OTHER"],
                    requests.codes["TEMPORARY_REDIRECT"]):
        LOG.warning(f"Saw status {status} indicating temporary URL change."
                    f"\nAttempting with new URL {parsed.get('href')}."
                    f"\nStored URL {self.url} for {self.metadata['name']} will be unchanged.")
        # Remember the stored URL before swapping in the redirect target.
        self.temp_url = self.url
        self.url = parsed.get("href")
        result = UpdateResult.ATTEMPT_AGAIN

    elif status != 200:
        LOG.warning(f"Saw '{status}'. Retrying retrieve for {self.metadata['name']} "
                    f"at {self.url}.")
        result = UpdateResult.ATTEMPT_AGAIN

    else:
        LOG.debug("Saw status 200. Success!")

    return result