Python requests 模块,codes() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用requests.codes()

项目:etcd3-gateway    作者:dims    | 项目源码 | 文件源码
def post(self, *args, **kwargs):
    """Issue an HTTP POST through ``self.session`` and decode the JSON reply.

    :param args: positional arguments forwarded to ``self.session.post``
    :param kwargs: keyword arguments forwarded to ``self.session.post``
    :return: the value of ``response.json()`` when the status is OK
    :raises: the class registered in ``_EXCEPTIONS_BY_CODE`` for the
        response status; ``exceptions.Etcd3Exception`` for any other
        non-OK status; ``exceptions.ConnectionTimeoutError`` on a requests
        timeout; ``exceptions.ConnectionFailedError`` on a requests
        connection error
    """
    try:
        response = self.session.post(*args, **kwargs)
        status = response.status_code
        # Statuses with a dedicated exception type take precedence over
        # the generic "anything but OK" check below.
        if status in _EXCEPTIONS_BY_CODE:
            error_type = _EXCEPTIONS_BY_CODE[status]
            raise error_type(response.reason)
        if status != requests.codes['ok']:
            raise exceptions.Etcd3Exception(response.reason)
    except requests.exceptions.Timeout as timeout_error:
        # Timeout is handled before ConnectionError on purpose: the order of
        # these clauses decides which wrapper a connect-timeout ends up in.
        raise exceptions.ConnectionTimeoutError(
            six.text_type(timeout_error))
    except requests.exceptions.ConnectionError as connection_error:
        raise exceptions.ConnectionFailedError(
            six.text_type(connection_error))
    return response.json()
项目:python-trustpilot    作者:trustpilot    | 项目源码 | 文件源码
def request_new_access_token(session):
    """Request a client-credentials token and store it on ``session``.

    POSTs to ``<token_issuer_host>/v1/<token_issuer_path>`` with a Basic
    ``Authorization`` header built from ``session.api_key`` and
    ``session.api_secret``.

    :param session: object providing ``token_issuer_host``,
        ``token_issuer_path``, ``api_key`` and ``api_secret``; on success its
        ``access_token`` attribute is overwritten
    :return: the new access token, or ``None`` when the response is falsy or
        its status is not OK
    """
    def _b64_ascii(text):
        # ASCII text -> base64 -> ASCII text, as required for header values.
        return base64.b64encode(text.encode("ascii")).decode("ascii")

    url = "{token_issuer_host}/v1/{token_issuer_path}".format(
        token_issuer_host=session.token_issuer_host,
        token_issuer_path=session.token_issuer_path,
    )
    credentials = session.api_key + ":" + session.api_secret
    headers = {
        "Authorization": "Basic {}".format(_b64_ascii(credentials)),
        "X-Access": "{}".format(_b64_ascii('{"type":"Full"}')),
    }
    form = {"grant_type": "client_credentials"}

    response = requests.post(url=url, headers=headers, data=form)
    if not response or response.status_code != requests.codes["ok"]:
        return None

    session.access_token = response.json()["access_token"]
    return session.access_token
项目:itp-energy-forecast-with-enertiv    作者:821760408-sp    | 项目源码 | 文件源码
def request(self, method, url, params=None, data=None, files=None):
    """Send an authenticated request through ``self.session``.

    The request carries an ``Authorization: Bearer <self.access_token>``
    header and uses ``self.timeout`` as its timeout.

    :param method: HTTP method name passed to ``self.session.request``
    :param url: target URL
    :param params: query parameters; ``None`` is treated as an empty dict
    :param data: request body values; ``None`` is treated as an empty dict
    :param files: files to upload, forwarded unchanged
    :return: the response object, with its content already read
    :raises Timeout: re-raised after logging when the request times out
    :raises LoginException: when the response status is 403 (forbidden)
    """
    # Avoid shared mutable default arguments; None stands for "empty".
    params = {} if params is None else params
    post_values = {} if data is None else data
    headers = {"Authorization": "Bearer %s" % self.access_token}
    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept for backward compatibility; confirm this is intentional.
        response = self.session.request(method, url, params=params,
                                        data=post_values, files=files,
                                        headers=headers,
                                        timeout=self.timeout, verify=False)
    except Timeout as e:
        # Log the real method instead of a hard-coded 'POST'.
        logger.warning('%s for %s timed out: %s', method, url, e)
        raise

    if response.status_code == requests.codes['forbidden']:
        # Interpolate the message here: exception constructors do not apply
        # %-formatting to extra positional arguments the way loggers do.
        raise LoginException(
            "forbidden status code received on %s %s %s"
            % (method, response.status_code, response.text))
    elif response.status_code != requests.codes['ok']:
        # Any other non-OK status drops the cached token.
        self.access_token = None
        logger.error("raised exception, resetting token %s",
                     response.status_code)
    # read content to let it know we are done with
    response.content
    return response
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def transcribe(self, fp):
    """Transcribe the audio read from ``fp`` and return upper-cased guesses.

    The audio bytes are sent with ``self._get_response``. If the reply is
    401 (unauthorized), ``self._token`` is cleared and the request is
    repeated once.

    Arguments:
    fp -- readable file-like object containing the audio data

    Returns a list of hypotheses in upper case, ordered from highest to
    lowest confidence, or an empty list when the request or the parsing of
    its response fails.
    """
    audio = fp.read()
    response = self._get_response(audio)
    if response.status_code == requests.codes['unauthorized']:
        # Request token invalid, retry once with a new token
        self._logger.warning(
            'OAuth access token invalid, generating a new one and retrying...')
        self._token = None
        response = self._get_response(audio)

    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError:
        self._logger.critical('Request failed with response: %r',
                              response.text, exc_info=True)
        return []
    except requests.exceptions.RequestException:
        self._logger.critical('Request failed.', exc_info=True)
        return []

    try:
        recognition = response.json()['Recognition']
        status = recognition['Status']
        if status != 'OK':
            # Reuse the ValueError handler to report a non-OK status.
            raise ValueError(status)
        scored = [(entry['Hypothesis'], entry['Confidence'])
                  for entry in recognition['NBest']]
    except ValueError as e:
        self._logger.debug('Recognition failed with status: %s', e.args[0])
        return []
    except KeyError:
        self._logger.critical('Cannot parse response.', exc_info=True)
        return []

    # Most confident hypothesis first.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    transcribed = [pair[0].upper() for pair in scored]
    self._logger.info('Transcribed: %r', transcribed)
    return transcribed
项目:iota.lib.py    作者:iotaledger    | 项目源码 | 文件源码
def send_request(self, payload, **kwargs):
    # type: (dict, dict) -> dict
    """Encode ``payload`` as JSON, send it to ``self.node_url`` and interpret the reply.

    Entries of ``self.DEFAULT_HEADERS`` are added to ``kwargs['headers']``
    only for header names the caller did not supply. The response is
    interpreted with ``codes['ok']`` as the only expected status.
    """
    headers = kwargs.setdefault('headers', {})
    for name, default in iteritems(self.DEFAULT_HEADERS):
        headers.setdefault(name, default)

    send = self._send_http_request
    # Use a custom JSON encoder that knows how to convert Tryte values.
    encoded_payload = JsonEncoder().encode(payload)
    response = send(payload=encoded_payload, url=self.node_url, **kwargs)

    expected_statuses = {codes['ok']}
    return self._interpret_response(response, payload, expected_statuses)
项目:j2f    作者:jasper2fork    | 项目源码 | 文件源码
def transcribe(self, fp):
    """Transcribe the audio read from ``fp`` and return upper-cased guesses.

    The audio bytes are sent with ``self._get_response``. If the reply is
    401 (unauthorized), ``self._token`` is cleared and the request is
    repeated once.

    Arguments:
    fp -- readable file-like object containing the audio data

    Returns a list of hypotheses in upper case, ordered from highest to
    lowest confidence, or an empty list when the request or the parsing of
    its response fails.
    """
    audio = fp.read()
    response = self._get_response(audio)
    if response.status_code == requests.codes['unauthorized']:
        # Request token invalid, retry once with a new token
        self._logger.warning(
            'OAuth access token invalid, generating a new one and retrying...')
        self._token = None
        response = self._get_response(audio)

    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError:
        self._logger.critical('Request failed with response: %r',
                              response.text, exc_info=True)
        return []
    except requests.exceptions.RequestException:
        self._logger.critical('Request failed.', exc_info=True)
        return []

    try:
        recognition = response.json()['Recognition']
        status = recognition['Status']
        if status != 'OK':
            # Reuse the ValueError handler to report a non-OK status.
            raise ValueError(status)
        scored = [(entry['Hypothesis'], entry['Confidence'])
                  for entry in recognition['NBest']]
    except ValueError as e:
        self._logger.debug('Recognition failed with status: %s', e.args[0])
        return []
    except KeyError:
        self._logger.critical('Cannot parse response.', exc_info=True)
        return []

    # Most confident hypothesis first.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    transcribed = [pair[0].upper() for pair in scored]
    self._logger.info('Transcribed: %r', transcribed)
    return transcribed
项目:usmqe-setup    作者:usmqe    | 项目源码 | 文件源码
def login(api_url, username, password):
    """Log in to the tendrl server and return the decoded JSON reply.

    POSTs the JSON-encoded credentials to ``<api_url>login``. When the
    status is not OK, ``MODULE.fail_json`` is called with the request URL
    and the submitted data as ``meta``.
    """
    credentials = {"username": username, "password": password}
    reply = requests.post(
        "{}login".format(api_url),
        data=json.dumps(credentials))

    if reply.status_code == requests.codes["ok"]:
        return reply.json()

    failure_meta = {"url": reply.url, "data": credentials}
    MODULE.fail_json(
        msg="Could not login with these credentials.",
        meta=failure_meta)
项目:plex-for-kodi-mod    作者:mrclemds    | 项目源码 | 文件源码
def wasNotFound(self):
    """Return True when a current response exists and its status is 404 (not found)."""
    response = self.currentResponse
    if response is None:
        return False
    return response.status_code == requests.codes.not_found
项目:puckfetcher    作者:andrewmichaud    | 项目源码 | 文件源码
def _feedparser_parse_with_options(self) -> Tuple[feedparser.FeedParserDict, "UpdateResult"]:
    """
    Perform a feedparser parse, providing arguments (like etag) we might want it to use.

    Calls ``self.parser`` with the URL, the stored etag and the stored last-modified time
    (as a time tuple, or None), then stores the etag / modified values from the result.

    Returns a ``(parsed, result)`` pair:
    ``(None, UpdateResult.FAILURE)`` when the result has no status and is flagged bozo,
    ``(None, UpdateResult.UNNEEDED)`` when the status is 304 (not modified),
    ``(parsed, UpdateResult.SUCCESS)`` otherwise.
    """
    if self.feed_state.last_modified is not None:
        last_mod = self.feed_state.last_modified.timetuple()
    else:
        last_mod = None

    # NOTE - this naming is a bit confusing here - parser is really a thing you call with
    # arguments to get a feedparser result.
    # Maybe better called parser-generator, or parse-performer or something?
    parsed = self.parser(self.url, self.feed_state.etag, last_mod)

    # Keep the previous etag when the new result does not carry one.
    self.feed_state.etag = parsed.get("etag", self.feed_state.etag)
    self.feed_state.store_last_modified(parsed.get("modified_parsed", None))

    # Detect bozo errors (malformed RSS/ATOM feeds).
    if "status" not in parsed and parsed.get("bozo", None) == 1:
        # NOTE: Feedparser documentation indicates that you can always call getMessage, but
        # it's possible for feedparser to spit out a URLError, which doesn't have getMessage.
        # Catch this case.
        # The attribute name must not include "()", otherwise hasattr() is always False
        # and getMessage() is never used.
        if hasattr(parsed.bozo_exception, "getMessage"):
            msg = parsed.bozo_exception.getMessage()

        else:
            msg = repr(parsed.bozo_exception)

        LOG.info(f"Unable to retrieve feed for {self.metadata['name']} from {self.url}.")
        LOG.debug(f"Update failed because bozo exception {msg} occurred.")
        return (None, UpdateResult.FAILURE)

    elif parsed.get("status") == requests.codes["NOT_MODIFIED"]:
        LOG.debug("No update to feed, nothing to do.")
        return (None, UpdateResult.UNNEEDED)

    else:
        return (parsed, UpdateResult.SUCCESS)
项目:itp-energy-forecast-with-enertiv    作者:821760408-sp    | 项目源码 | 文件源码
def request(self, method, url, data=None, files=None):
    """Send a CSRF-protected request through ``self.session``.

    Logs in first when the session has no ``sessionid`` cookie (or when
    reading the cookies raises ``CookieConflictError``), then resets the
    cookie jar to just ``csrftoken`` and ``sessionid`` before sending.

    :param method: HTTP method name passed to ``self.session.request``
    :param url: target URL
    :param data: dict of form values; it is copied, never modified.
        ``None`` is treated as an empty dict
    :param files: files to upload, forwarded unchanged
    :return: the response object, with its content already read
    :raises Timeout: re-raised after logging when the request times out
    :raises LoginException: when the response status is 403 (forbidden)
    """
    # Work on a copy: the original wrote the CSRF token into the caller's
    # dict and, worse, into the shared mutable default argument.
    post_values = dict(data) if data else {}
    try:
        if 'sessionid' not in self.session.cookies:
            self.login()
    except CookieConflictError as e:
        logger.warning("CookieConflictError %s. Relogging in", e)
        self.login()
    # Set the body token only after the login check so that, should login()
    # change self.csrf_token, body, cookie and header all carry one value.
    post_values['csrfmiddlewaretoken'] = self.csrf_token

    # Rebuild the cookie jar with exactly the two cookies sent below.
    sessionid = self.session.cookies['sessionid']
    self.session.cookies.clear()
    self.session.cookies['csrftoken'] = self.csrf_token
    self.session.cookies['sessionid'] = sessionid
    headers = {
        "X-CSRFToken": self.csrf_token,
        "Referer": "%s://%s" % (self.server_protocol, self.server_host),
    }
    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept for backward compatibility; confirm this is intentional.
        response = self.session.request(method, url, data=post_values,
                                        files=files, headers=headers,
                                        timeout=self.timeout, verify=False)
    except Timeout as e:
        logger.warning('%s for %s timed out: %s', method, url, e)
        raise

    if response.status_code == requests.codes['forbidden']:
        raise LoginException(
            "forbidden status code received on %s %s %s"
            % (method, response.status_code, response.text))
    elif response.status_code != requests.codes['ok']:
        # Any other non-OK status drops the cached CSRF token.
        self.csrf_token = None
        logger.error("raised exception, resetting token %s",
                     response.status_code)
    # read content to let it know we are done with
    response.content
    return response
项目:dingdang-robot    作者:wzpan    | 项目源码 | 文件源码
def transcribe(self, fp):
    """
    Performs STT via the Google Speech API, transcribing an audio file and
    returning the recognised alternatives in upper case.

    Arguments:
    fp -- readable file object holding the .wav audio to be transcribed

    Returns an empty list when the API key or language is missing, when the
    HTTP request fails, or when the response cannot be parsed; otherwise a
    tuple of upper-cased transcripts.
    """
    if not self.api_key:
        self._logger.critical(
            'API key missing, transcription request aborted.')
        return []
    if not self.language:
        self._logger.critical(
            'Language info missing, transcription request aborted.')
        return []

    # The wave reader is only used to learn the frame rate; the audio that
    # gets posted is whatever fp.read() returns afterwards.
    wav_reader = wave.open(fp, 'rb')
    frame_rate = wav_reader.getframerate()
    wav_reader.close()
    audio = fp.read()

    headers = {'content-type': 'audio/l16; rate=%s' % frame_rate}
    reply = self._http.post(self.request_url, data=audio, headers=headers)
    try:
        reply.raise_for_status()
    except requests.exceptions.HTTPError:
        self._logger.critical('Request failed with http status %d',
                              reply.status_code)
        if reply.status_code == requests.codes['forbidden']:
            self._logger.warning(
                'Status 403 is probably caused by an invalid Google API key.')
        return []

    reply.encoding = 'utf-8'
    try:
        # We cannot simply use reply.json() because Google sends invalid
        # json (i.e. multiple json objects, separated by newlines), so only
        # the text after the first newline is decoded.
        tail = reply.text.strip().split('\n', 1)[-1]
        decoded = json.loads(tail)
        if len(decoded['result']) == 0:
            # Response result is empty
            raise ValueError('Nothing has been transcribed.')
        alternatives = decoded['result'][0]['alternative']
        transcripts = [alt['transcript'] for alt in alternatives]
    except ValueError as e:
        self._logger.warning('Empty response: %s', e.args[0])
        return []
    except (KeyError, IndexError):
        self._logger.warning('Cannot parse response.', exc_info=True)
        return []

    # Convert all results to uppercase
    results = tuple(text.upper() for text in transcripts)
    self._logger.info('Transcribed: %r', results)
    return results
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def transcribe(self, fp):
    """
    Performs STT via the Watson Speech-to-Text API, transcribing an audio
    file and returning the recognised alternatives in upper case.

    Arguments:
    fp -- readable file object holding the .wav audio to be transcribed

    Returns an empty list when the username or password is missing, when
    the HTTP request fails, or when the response cannot be parsed; otherwise
    a tuple of stripped, upper-cased transcripts.
    """
    if not self.username:
        self._logger.critical('Username missing, transcription request aborted.')
        return []
    if not self.password:
        self._logger.critical('Password missing, transcription request aborted.')
        return []

    # The wave reader is only used to learn the frame rate; the audio that
    # gets posted is whatever fp.read() returns afterwards.
    wav_reader = wave.open(fp, 'rb')
    frame_rate = wav_reader.getframerate()
    wav_reader.close()
    audio = fp.read()

    content_type = 'audio/l16; rate={0}; channels=1'.format(frame_rate)
    headers = {'content-type': content_type}
    reply = self._http.post(
        'https://stream.watsonplatform.net/speech-to-text/api/v1/recognize?continuous=true',
        data=audio, headers=headers, auth=(self.username, self.password)
    )
    try:
        reply.raise_for_status()
    except requests.exceptions.HTTPError:
        self._logger.critical('Request failed with http status %d',
                              reply.status_code)
        if reply.status_code == requests.codes['forbidden']:
            self._logger.warning('Status 403 is probably caused by invalid credentials.')
        return []

    reply.encoding = 'utf-8'
    # Stays an empty list unless the parsing below completes.
    results = []
    try:
        decoded = reply.json()
        if not decoded['results']:
            raise ValueError('Nothing has been transcribed.')
        alternatives = decoded['results'][0]['alternatives']
        results = tuple(alt['transcript'].strip().upper()
                        for alt in alternatives)
        self._logger.info('Transcribed: %r', results)
    except ValueError as e:
        self._logger.critical('Empty response: %s', e.args[0])
    except (KeyError, IndexError):
        self._logger.critical('Cannot parse response.', exc_info=True)

    return results
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def transcribe(self, fp):
    """
    Performs STT via the Google Speech API, transcribing an audio file and
    returning the recognised alternatives in upper case.

    Arguments:
    fp -- readable file object holding the .wav audio to be transcribed

    Returns an empty list when the API key or language is missing, when the
    HTTP request fails, or when the response cannot be parsed; otherwise a
    tuple of upper-cased transcripts.
    """
    if not self.api_key:
        self._logger.critical(
            'API key missing, transcription request aborted.')
        return []
    if not self.language:
        self._logger.critical(
            'Language info missing, transcription request aborted.')
        return []

    # The wave reader is only used to learn the frame rate; the audio that
    # gets posted is whatever fp.read() returns afterwards.
    wav_reader = wave.open(fp, 'rb')
    frame_rate = wav_reader.getframerate()
    wav_reader.close()
    audio = fp.read()

    headers = {'content-type': 'audio/l16; rate=%s' % frame_rate}
    reply = self._http.post(self.request_url, data=audio, headers=headers)
    try:
        reply.raise_for_status()
    except requests.exceptions.HTTPError:
        self._logger.critical('Request failed with http status %d',
                              reply.status_code)
        if reply.status_code == requests.codes['forbidden']:
            self._logger.warning(
                'Status 403 is probably caused by an invalid Google API key.')
        return []

    reply.encoding = 'utf-8'
    try:
        # We cannot simply use reply.json() because Google sends invalid
        # json (i.e. multiple json objects, separated by newlines), so only
        # the text after the first newline is decoded.
        tail = reply.text.strip().split('\n', 1)[-1]
        decoded = json.loads(tail)
        if len(decoded['result']) == 0:
            # Response result is empty
            raise ValueError('Nothing has been transcribed.')
        alternatives = decoded['result'][0]['alternative']
        transcripts = [alt['transcript'] for alt in alternatives]
    except ValueError as e:
        self._logger.warning('Empty response: %s', e.args[0])
        return []
    except (KeyError, IndexError):
        self._logger.warning('Cannot parse response.', exc_info=True)
        return []

    # Convert all results to uppercase
    results = tuple(text.upper() for text in transcripts)
    self._logger.info('Transcribed: %r', results)
    return results
项目:j2f    作者:jasper2fork    | 项目源码 | 文件源码
def transcribe(self, fp):
    """
    Performs STT via the Google Speech API, transcribing an audio file and
    returning the recognised alternatives in upper case.

    Arguments:
    fp -- readable file object holding the .wav audio to be transcribed

    Returns an empty list when the API key or language is missing, when the
    HTTP request fails, or when the response cannot be parsed; otherwise a
    tuple of upper-cased transcripts.
    """
    if not self.api_key:
        self._logger.critical(
            'API key missing, transcription request aborted.')
        return []
    if not self.language:
        self._logger.critical(
            'Language info missing, transcription request aborted.')
        return []

    # The wave reader is only used to learn the frame rate; the audio that
    # gets posted is whatever fp.read() returns afterwards.
    wav_reader = wave.open(fp, 'rb')
    frame_rate = wav_reader.getframerate()
    wav_reader.close()
    audio = fp.read()

    headers = {'content-type': 'audio/l16; rate=%s' % frame_rate}
    reply = self._http.post(self.request_url, data=audio, headers=headers)
    try:
        reply.raise_for_status()
    except requests.exceptions.HTTPError:
        self._logger.critical('Request failed with http status %d',
                              reply.status_code)
        if reply.status_code == requests.codes['forbidden']:
            self._logger.warning(
                'Status 403 is probably caused by an invalid Google API key.')
        return []

    reply.encoding = 'utf-8'
    try:
        # We cannot simply use reply.json() because Google sends invalid
        # json (i.e. multiple json objects, separated by newlines), so only
        # the text after the first newline is decoded.
        tail = reply.text.strip().split('\n', 1)[-1]
        decoded = json.loads(tail)
        if len(decoded['result']) == 0:
            # Response result is empty
            raise ValueError('Nothing has been transcribed.')
        alternatives = decoded['result'][0]['alternative']
        transcripts = [alt['transcript'] for alt in alternatives]
    except ValueError as e:
        self._logger.warning('Empty response: %s', e.args[0])
        return []
    except (KeyError, IndexError):
        self._logger.warning('Cannot parse response.', exc_info=True)
        return []

    # Convert all results to uppercase
    results = tuple(text.upper() for text in transcripts)
    self._logger.info('Transcribed: %r', results)
    return results
项目:puckfetcher    作者:andrewmichaud    | 项目源码 | 文件源码
def get_feed(self, attempt_count: int=0) -> "UpdateResult":
    """Get RSS structure for this subscription. Return status code indicating result.

    Gives up with ``UpdateResult.FAILURE`` when ``attempt_count`` exceeds
    ``MAX_RECURSIVE_ATTEMPTS`` or when the subscription has no URL. Otherwise parses the
    feed, inspects the HTTP status, and calls itself again (with ``attempt_count + 1``) when
    the status handler answers ``UpdateResult.ATTEMPT_AGAIN``. On success the parsed feed is
    loaded into ``self.feed_state``.
    """
    if attempt_count > MAX_RECURSIVE_ATTEMPTS:
        LOG.debug(f"Too many recursive attempts ({attempt_count}) to get feed for sub "
                  f"{self.metadata['name']}, canceling.")
        return UpdateResult.FAILURE

    if self.url is None or self.url == "":
        LOG.debug(f"URL {self.url} is empty, cannot get feed for sub " +
                  f"{self.metadata['name']}.")
        return UpdateResult.FAILURE

    LOG.info(f"Getting entries (attempt {attempt_count}) for {self.metadata['name']} " +
             f"from {self.url}.")

    (parsed, code) = self._feedparser_parse_with_options()
    if code == UpdateResult.UNNEEDED:
        LOG.info("We have the latest feed, nothing to do.")
        return code

    elif code != UpdateResult.SUCCESS:
        LOG.info(f"Feedparser parse failed ({code}), aborting.")
        return code

    LOG.debug("Feedparser parse succeeded.")

    # Detect some kinds of HTTP status codes signaling failure.
    code = self._handle_http_codes(parsed)
    if code == UpdateResult.ATTEMPT_AGAIN:
        LOG.debug("Transient HTTP error, attempting again.")
        # Read temp_url before recursing so the URL saved by the status handler can be put
        # back once the retry has finished.
        temp = self.temp_url
        code = self.get_feed(attempt_count=attempt_count + 1)
        if temp is not None:
            self.url = temp

    elif code != UpdateResult.SUCCESS:
        LOG.debug(f"Ran into HTTP error ({code}), aborting.")

    else:
        self.feed_state.load_rss_info(parsed)

    return code
项目:puckfetcher    作者:andrewmichaud    | 项目源码 | 文件源码
def _handle_http_codes(self, parsed: feedparser.FeedParserDict) -> "UpdateResult":
    """
    Given feedparser parse result, determine if parse succeeded, and what to do about that.

    Returns ``UpdateResult.SUCCESS`` when there is no status or the status is 200,
    ``UpdateResult.FAILURE`` for 404, 401 and 410 (the stored URL is cleared for 401/410),
    and ``UpdateResult.ATTEMPT_AGAIN`` for redirects and every other status. Redirects
    replace ``self.url`` with ``parsed.get("href")``; temporary redirects first save the
    current URL in ``self.temp_url``.
    """
    # Feedparser gives no status if you feedparse a local file.
    if "status" not in parsed:
        LOG.debug("No HTTP status in parse result, treating as OK.")
        return UpdateResult.SUCCESS

    status = parsed["status"]
    result = UpdateResult.SUCCESS
    if status == requests.codes["NOT_FOUND"]:
        LOG.error(f"Saw status {status}, unable to retrieve feed text for "
                  f"{self.metadata['name']}."
                  f"\nStored URL {self.url} for {self.metadata['name']} will be preserved "
                  f"and checked again on next attempt."
                 )

        result = UpdateResult.FAILURE

    elif status in (requests.codes["UNAUTHORIZED"], requests.codes["GONE"]):
        LOG.error(f"Saw status {status}, unable to retrieve feed text for "
                  f"{self.metadata['name']}."
                  f"\nClearing stored URL {self.url} for {self.metadata['name']}."
                  f"\nPlease provide new URL and authorization for subscription "
                  f"{self.metadata['name']}."
                 )

        self.url = None
        result = UpdateResult.FAILURE

    # Handle redirecting errors
    elif status in (requests.codes["MOVED_PERMANENTLY"],
                    requests.codes["PERMANENT_REDIRECT"]):
        LOG.warning(f"Saw status {status} indicating permanent URL change."
                    f"\nChanging stored URL {self.url} for {self.metadata['name']} to "
                    f"{parsed.get('href')} and attempting get with new URL."
                   )

        self.url = parsed.get("href")
        result = UpdateResult.ATTEMPT_AGAIN

    elif status in (requests.codes["FOUND"], requests.codes["SEE_OTHER"],
                    requests.codes["TEMPORARY_REDIRECT"]):
        LOG.warning(f"Saw status {status} indicating temporary URL change."
                    f"\nAttempting with new URL {parsed.get('href')}."
                    f"\nStored URL {self.url} for {self.metadata['name']} will be unchanged.")

        # Remember the current URL before switching to the redirect target.
        self.temp_url = self.url
        self.url = parsed.get("href")
        result = UpdateResult.ATTEMPT_AGAIN

    elif status != requests.codes["OK"]:
        LOG.warning(f"Saw '{status}'. Retrying retrieve for {self.metadata['name']} " +
                    f"at {self.url}.")
        result = UpdateResult.ATTEMPT_AGAIN

    else:
        LOG.debug("Saw status 200. Success!")

    return result