Python future.utils 模块,bytes_to_native_str() 实例源码

我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用future.utils.bytes_to_native_str()

项目:mobot    作者:JokerQyou    | 项目源码 | 文件源码
def do_POST(self):
        """Handle a webhook POST request and enqueue the update it carries.

        The request is first checked with ``_validate_post()`` and its content
        length obtained from ``_get_content_len()``. If either raises
        ``_InvalidPost``, an error response with the exception's ``http_code``
        is sent and nothing else happens. Otherwise the body is read,
        converted to a native string, answered with a 200 response, parsed as
        JSON into an ``Update`` and put on ``self.server.update_queue``.
        """
        self.logger.debug('Webhook triggered')
        try:
            self._validate_post()
            content_length = self._get_content_len()
        except _InvalidPost as invalid:
            # NOTE(review): if send_error() is the stock http.server
            # implementation it already terminates the headers -- confirm
            # whether the extra end_headers() call is really wanted.
            self.send_error(invalid.http_code)
            self.end_headers()
            return

        raw_body = self.rfile.read(content_length)
        payload = bytes_to_native_str(raw_body)

        # The 200 response goes out before the payload is parsed.
        self.send_response(200)
        self.end_headers()

        self.logger.debug('Webhook received data: ' + payload)

        update = Update.de_json(json.loads(payload), self.server.bot)

        self.logger.debug('Received Update with ID %d on Webhook' % update.update_id)
        self.server.update_queue.put(update)
项目:deluge-telegramer    作者:noam09    | 项目源码 | 文件源码
def do_POST(self):
        """Process an incoming webhook POST and queue the resulting update.

        Validation (``_validate_post``) and content-length lookup
        (``_get_content_len``) run first; an ``_InvalidPost`` raised by either
        is answered with ``send_error(<http_code>)`` and the request is
        dropped. On success the body is read from ``self.rfile``, turned into
        a native string, acknowledged with HTTP 200, decoded from JSON into an
        ``Update`` and pushed onto ``self.server.update_queue``.
        """
        self.logger.debug('Webhook triggered')
        try:
            self._validate_post()
            body_size = self._get_content_len()
        except _InvalidPost as post_error:
            # Invalid request: answer with the status code the exception carries.
            # NOTE(review): a standard send_error() already ends the headers;
            # verify that the following end_headers() is intentional.
            self.send_error(post_error.http_code)
            self.end_headers()
            return

        body_bytes = self.rfile.read(body_size)
        body_text = bytes_to_native_str(body_bytes)

        # Acknowledge receipt first; JSON parsing happens afterwards.
        self.send_response(200)
        self.end_headers()

        self.logger.debug('Webhook received data: ' + body_text)

        parsed = json.loads(body_text)
        update = Update.de_json(parsed, self.server.bot)

        self.logger.debug('Received Update with ID %d on Webhook' % update.update_id)
        self.server.update_queue.put(update)
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def hash_password(password, encoding=HashEncoding.BINARY, salt_size=SALT_SIZE):
    """Hash *password* with a freshly generated random salt.

    The salt is ``urandom(salt_size)``. The hash algorithm is read from the
    ``security.hash_algorithm`` config entry, falling back to ``'sha256'``.
    Text passwords are UTF-8 encoded; the password is fed to the hash first,
    then the salt.

    The result is always "salt followed by digest", rendered per *encoding*:

    * ``HashEncoding.BINARY``: raw salt bytes + raw digest bytes.
    * ``HashEncoding.HEX``: hex salt (native str) + hex digest.
    * ``HashEncoding.BASE64``: native str of the url-safe base64 salt
      followed by the url-safe base64 digest (each encoded separately).

    Raises ``ValueError`` for any other *encoding*.
    """
    # NOTE(review): this is a single salted hash pass, not a key-stretching
    # scheme; changing it would invalidate stored hashes, so it is only
    # flagged here.
    salt = urandom(salt_size)
    algorithm = config.get('security.hash_algorithm') or 'sha256'
    digest = hashlib.new(algorithm)
    secret = password.encode('utf-8') if isinstance(password, text_type) else password
    digest.update(secret)
    digest.update(salt)

    if encoding == HashEncoding.BINARY:
        return salt + digest.digest()
    if encoding == HashEncoding.HEX:
        salt_hex = bytes_to_native_str(hexlify(salt))
        return salt_hex + digest.hexdigest()
    if encoding == HashEncoding.BASE64:
        encoded = urlsafe_b64encode(salt) + urlsafe_b64encode(digest.digest())
        return bytes_to_native_str(encoded)
    raise ValueError()
项目:mobot    作者:JokerQyou    | 项目源码 | 文件源码
def n(b):
        """Wrap the byte string *b* in a read-only property.

        Reading the returned property issues a warning that ``telegram.Emoji``
        is being deprecated and yields ``bytes_to_native_str(b)``.
        """

        def _deprecated_value(_instance):
            # Warn on every access, then hand back the native-string form.
            warnings.warn("telegram.Emoji is being deprecated, please see https://git.io/v6DeB")
            return bytes_to_native_str(b)

        return property(fget=_deprecated_value)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_str_encode_decode_with_py2_str_arg(self):
        """Round-trip a string through encode/decode with a native-str codec name.

        The codec name is produced by ``utils.bytes_to_native_str(b'utf-8')``,
        i.e. a standard Py2 string as if ``unicode_literals`` weren't imported.
        """
        text = str(TEST_UNICODE_STR)
        encode_codec = utils.bytes_to_native_str(b'utf-8')
        encoded = text.encode(encode_codec)
        # Encoding must yield bytes, and never a str.
        self.assertTrue(isinstance(encoded, bytes))
        self.assertFalse(isinstance(encoded, str))

        decode_codec = utils.bytes_to_native_str(b'utf-8')
        decoded = encoded.decode(decode_codec)
        # Decoding must give back a str equal to the original text.
        self.assertTrue(isinstance(decoded, str))
        self.assertEqual(decoded, TEST_UNICODE_STR)
项目:deluge-telegramer    作者:noam09    | 项目源码 | 文件源码
def bytes_to_native_str(b, encoding=None):
    """Return ``native(b)``.

    *encoding* is accepted but ignored by this implementation.
    """
    # NOTE(review): presumably the variant used where the native string type
    # is already a byte string, so no decoding is needed -- confirm.
    converted = native(b)
    return converted
项目:nelson    作者:udacity    | 项目源码 | 文件源码
def login_for_jwt(self):
    """Log in interactively with the configured provider and return a JWT.

    Depending on ``self.id_provider`` ('udacity', 'gt' or 'developer') the
    user is prompted for credentials and the matching login helper is called
    on a fresh ``requests`` session. Any other provider value skips the login
    step. Afterwards ``<root_url>/auth_tokens`` is POSTed and the
    ``auth_token`` field of the JSON response is returned.

    Raises ``NelsonAuthenticationError`` when the login step fails with HTTP
    403; other ``HTTPError``s from the login step are re-raised unchanged.
    """
    try:
        session = requests.Session()
        json_headers = {'content-type':'application/json', 'accept': 'application/json'}
        session.headers.update(json_headers)

        # getpass is given a native-string prompt.
        password_prompt = bytes_to_native_str(b"Password :")

        if self.id_provider == 'udacity':
            print("Udacity Login required.")
            email = input('Email :')
            password = getpass.getpass(password_prompt)
            udacity_login(session, self.root_url, email, password)
        elif self.id_provider == 'gt':
            print("GT Login required.")
            username = input('Username :')
            password = getpass.getpass(password_prompt)
            gt_login(session, self.root_url, username, password)
        elif self.id_provider == 'developer':
            # Developer logins only need a username.
            print("Developer Login required.")
            username = input('Username :')
            developer_login(session, self.root_url, username)
    except requests.exceptions.HTTPError as err:
        if err.response.status_code != 403:
            raise err
        raise NelsonAuthenticationError("Authentication failed")

    token_response = session.post(self.root_url + '/auth_tokens')
    token_response.raise_for_status()

    return token_response.json()['auth_token']

#Helper functions for logins
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def cleanup_x509_text(txt):
    """Strip the delimiter lines from X.509 text and indent what remains.

    *txt* is a byte string. Empty lines, and lines that both start and end
    with ``----``, are dropped. Every remaining line is prefixed with two
    spaces, the lines are joined with a newline plus two more spaces, and the
    whole is wrapped in a leading two-space indent and a trailing newline
    before being converted with ``bytes_to_native_str``.
    """
    indented = []
    for line in txt.split(b'\n'):
        if not len(line):
            continue
        if line.startswith(b'----') and line.endswith(b'----'):
            # Delimiter line: not part of the payload.
            continue
        indented.append(b'  ' + line)
    body = b'\n  '.join(indented)
    return bytes_to_native_str(b'  ' + body + b'\n')
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def populate_random(random_file, random_templates=None, saml_info=None):
    """Generate the missing random values of *random_file* from templates.

    All files in *random_templates* are read into one parser, which is then
    combined with the current content of *random_file* (if it exists) through
    ``combine_ini``. Placeholders still present afterwards are replaced:

    * ``{saml_key}`` on an option whose upper-cased name contains ``SAML`` and
      ends with ``_PRIVATE_KEY``: a key from ``make_saml_key()``.
    * ``{randomN}``: base64 text of ``N`` random bytes, with ``0 < N < 100``.
    * ``{saml_cert}`` on a ``SAML..._PUBLIC_CERT`` option: a certificate from
      ``make_saml_cert()`` using the key with the same prefix and *saml_info*.

    The file is rewritten only when at least one value was generated.
    Existing values are meant to be kept as they are (presumably ensured by
    ``combine_ini`` -- not visible here).

    Returns the combined parser.
    """
    from base64 import b64encode
    from os import urandom
    from assembl.auth.make_saml import (
        make_saml_key, make_saml_cert, cleanup_x509_text)
    private_key_suffix = "_PRIVATE_KEY"
    public_cert_suffix = "_PUBLIC_CERT"

    base = ConfigParser(interpolation=None)
    assert random_templates, "Please give one or more templates"
    for template_path in random_templates:
        assert exists(template_path), "Cannot find template " + template_path
        base.read(template_path)
    existing = ConfigParser(interpolation=None)
    if exists(random_file):
        existing.read(random_file)
    combine_ini(base, existing)
    saml_keys = {}
    changed = False

    # First pass: private keys and plain random values.
    for section in base.sections():
        for option, current in base.items(section):
            option_upper = option.upper()
            # Too much knowledge of the option names, but hard to avoid.
            if "SAML" in option_upper and option_upper.endswith(private_key_suffix):
                prefix = option_upper[:-len(private_key_suffix)]
                if current != "{saml_key}":
                    # Already set: remember it for the certificate pass.
                    saml_keys[prefix] = current
                    continue
                key_text, generated_key = make_saml_key()
                key_text = cleanup_x509_text(key_text)
                base.set(section, option, key_text)
                saml_keys[prefix] = generated_key
                changed = True
            elif current.startswith('{random') and current.endswith("}"):
                # The digits between '{random' and '}' give the byte count.
                size = int(current[7:-1])
                assert 0 < size < 100
                random_text = bytes_to_native_str(b64encode(urandom(size)))
                base.set(section, option, random_text)
                changed = True

    # Second pass: certificates, done afterwards to be sure the keys are set.
    for section in base.sections():
        for option, current in base.items(section):
            option_upper = option.upper()
            if ("SAML" not in option_upper
                    or not option_upper.endswith(public_cert_suffix)
                    or current != '{saml_cert}'):
                continue
            assert saml_info
            prefix = option_upper[:-len(public_cert_suffix)]
            # If the key is not there, it IS a mismatch and an error.
            signing_key = saml_keys[prefix]
            cert_text, _ = make_saml_cert(signing_key, **saml_info)
            cert_text = cleanup_x509_text(cert_text)
            base.set(section, option, cert_text)
            changed = True

    if changed:
        with open(random_file, 'w') as out:
            base.write(out)
    return base