# Python base64 module: urlsafe_b64encode() example source code


def produce(obj, pb, sep):
    """Flatten the populated fields of ``pb`` into string tokens appended to ``obj``.

    Each field reported by ``pb.ListFields()`` contributes one token per
    value, formatted as ``<field number><type code><value>``. Nested
    messages are expanded recursively and preceded by a
    ``<field number>m<token count>`` header.

    ``obj`` is mutated in place and also returned.
    """
    for ds, raw in pb.ListFields():
        is_repeated = ds.label == ds.LABEL_REPEATED
        for val in (raw if is_repeated else [raw]):
            if ds.cpp_type == ds.CPPTYPE_MESSAGE:
                start = len(obj)
                produce(obj, val, sep)
                # The header goes *before* the child tokens and records how
                # many of them were produced.
                header = '%dm%d' % (ds.number, len(obj) - start)
                obj.insert(start, header)
                # NOTE(review): message fields also reach the append below;
                # confirm whether a `continue` was intended here.
            elif ds.type == ds.TYPE_STRING:
                if sep == '!':
                    # '!' is the separator, so escape it (and the escape
                    # character '*') before percent-quoting.
                    escaped = val.replace('*', '*2A').replace('!', '*21')
                    val = quote(escaped, safe='~()*!.\'')
            elif ds.type == ds.TYPE_BYTES:
                encoded = urlsafe_b64encode(val)
                val = encoded.decode('ascii').strip('=')
            elif ds.type == ds.TYPE_BOOL:
                val = int(val)

            token = '%d%s%s' % (ds.number, types_enc[ds.type], val)
            obj.append(token)

    return obj
def generate_token(key, user_id, action_id='', when=None):
    """Generates a URL-safe token for the given user, action, time tuple.

    Args:
        key: secret key to use.
        user_id: the user ID of the authenticated user.
        action_id: a string identifier of the action they requested
                   authorization for.
        when: the time in seconds since the epoch at which the user was
              authorized for this action. If not set the current time is used.

    Returns:
        A string XSRF protection token.
    """
    import hmac

    # NOTE(review): the right-hand side of this assignment was truncated in
    # the source text; an HMAC keyed with `key` is assumed. MD5 is named
    # explicitly because it was hmac's historical default and Python 3.8+
    # requires a digestmod -- confirm against the original project.
    digester = hmac.new(_to_bytes(key, encoding='utf-8'), digestmod='md5')
    digester.update(_to_bytes(str(user_id), encoding='utf-8'))
    # Delimiters keep ("ab", "c") and ("a", "bc") from hashing identically.
    digester.update(DELIMITER)
    digester.update(_to_bytes(action_id, encoding='utf-8'))
    digester.update(DELIMITER)
    when = _to_bytes(str(when or int(time.time())), encoding='utf-8')
    # The timestamp must be covered by the MAC, otherwise the cleartext
    # copy appended below could be altered without detection.
    digester.update(when)
    digest = digester.digest()

    token = base64.urlsafe_b64encode(digest + DELIMITER + when)
    return token
def ssrlink(self, user, encode, muid):
        """Build an ``ssr://`` link for ``user``.

        :param user: mapping with 'port', 'method' and 'passwd' keys and
            optional 'protocol', 'obfs' and 'protocol_param' keys.
        :param encode: when truthy, the link body is base64url-encoded
            (without padding); otherwise it is returned in clear text.
        :param muid: port to match against the known rows when
            building the ``protoparam`` suffix, or ``None`` to skip it.
        :returns: the link as a string starting with ``ssr://``.
        """
        def _b64(text):
            # URL-safe base64 with the '=' padding removed.
            raw = base64.urlsafe_b64encode(common.to_bytes(text))
            return common.to_str(raw).replace("=", "")

        protocol = user.get('protocol', '').replace("_compatible", "")
        obfs = user.get('obfs', '').replace("_compatible", "")
        protocol_param = ''
        if muid is not None:
            protocol_param_ = user.get('protocol_param', '')
            param = protocol_param_.split('#')
            if len(param) == 2:
                # NOTE(review): the iterable of this loop was truncated in
                # the source text; `self.data.json` is assumed -- confirm
                # against the original project.
                for row in self.data.json:
                    if int(row['port']) == muid:
                        param = str(muid) + ':' + row['passwd']
                        protocol_param = '/?protoparam=' + _b64(param)
        link = "%s:%s:%s:%s:%s:%s" % (
            self.server_addr, user['port'], protocol, user['method'], obfs,
            _b64(user['passwd'])) + protocol_param
        return "ssr://" + (_b64(link) if encode else link)
def CreateMessage(sender, to, subject, message_text):
    """Create a message for an email.

    Args:
      sender: Email address of the sender.
      to: Email address of the receiver.
      subject: The subject of the email message.
      message_text: The text of the email message.

    Returns:
      An object containing a base64url encoded email object.
    """
    message = MIMEText(message_text)
    message['to'] = to
    message['from'] = sender
    message['subject'] = subject
    # urlsafe_b64encode() only accepts bytes on Python 3, and the value has
    # to be text again to be JSON-serializable in a request body.
    raw = base64.urlsafe_b64encode(message.as_string().encode('utf-8'))
    return {'raw': raw.decode('ascii')}
def generate_token_string(self, action=None):
        """Generate a hash of the given token contents that can be verified.

        :param action:
            A string representing the action that the generated hash is valid
            for. This string is usually a URL.
        :returns:
            A string containing the hash contents of the given `action` and the
            contents of the `XSRFToken`. Can be verified with
            `verify_token_string`. The string is base64 encoded so it is safe
            to use in HTML forms without escaping.
        """
        digest_maker = self._digest_maker()
        # NOTE(review): the body of this branch is missing from the source
        # text; restore it from the original project before use.
        if action:

        # NOTE(review): the argument list of this call is truncated as well.
        return base64.urlsafe_b64encode(
def _encrypt_from_parts(self, data, current_time, iv):
        """Encrypt and sign ``data``, returning a base64url-encoded token.

        The token layout is ``0x80 || 64-bit big-endian timestamp || iv ||
        AES-CBC ciphertext || HMAC-SHA256`` where the HMAC covers every
        preceding byte.

        :param data: plaintext; must be ``bytes``.
        :param current_time: integer timestamp packed into the token.
        :param iv: initialization vector for the CBC mode.
        :raises TypeError: if ``data`` is not ``bytes``.
        """
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes.")

        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(data) + padder.finalize()
        encryptor = Cipher(
            algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
        ).encryptor()
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()

        basic_parts = (
            b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
        )

        h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
        # Without this update the signature would not depend on the token
        # contents at all.
        h.update(basic_parts)
        hmac = h.finalize()
        return base64.urlsafe_b64encode(basic_parts + hmac)
def create_datakey(encryption_context, keyid, client=None):
    """Create a datakey from KMS.

    Returns a dict with the encrypted key under 'ciphertext' and the
    base64url-encoded key under 'plaintext'.
    """
    if not client:
        # NOTE(review): the expression that builds the default client was
        # truncated in the source text.
        client ='kms')
    # Fernet key; from spec and cryptography implementation, but using
    # random from KMS, rather than os.urandom:
    # NOTE(review): the arguments of the next two calls are missing from
    # the source text; restore them from the original project before use.
    key = base64.urlsafe_b64encode(
    response = client.encrypt(

    return {'ciphertext': response['CiphertextBlob'],
            'plaintext': key}
def send_email(msg_subj, msg_txt, msg_rcpt, i=0, count=0):
  """Send a plain-text email through the Gmail API service object.

  msg_subj, msg_txt and msg_rcpt are the subject, body and recipient; the
  sender is the user ID returned alongside the service object. i and count
  are passed through to the result-reporting helpers.
  """
  from email.mime.text import MIMEText
  userId, gmail = buildGAPIServiceObject(API.GMAIL, _getValueFromOAuth(u'email'))
  if not gmail:
    # NOTE(review): this branch had no body in the source text; returning
    # early is assumed, since nothing can be sent without a service object.
    return
  msg = MIMEText(msg_txt)
  msg[u'Subject'] = msg_subj
  msg[u'From'] = userId
  msg[u'To'] = msg_rcpt
  # NOTE(review): the saved action is never used in the visible code; the
  # statements that set and restore it appear to be missing.
  action = Act.Get()
  try:
    callGAPI(gmail.users().messages(), u'send',
             userId=userId, body={u'raw': base64.urlsafe_b64encode(msg.as_string())}, fields=u'')
    entityActionPerformed([Ent.RECIPIENT, msg_rcpt, Ent.MESSAGE, msg_subj], i, count)
  except googleapiclient.errors.HttpError as e:
    entityActionFailedWarning([Ent.RECIPIENT, msg_rcpt, Ent.MESSAGE, msg_subj], str(e), i, count)
# Write a CSV file
def code_verifier(n_bytes=64):
    """Generates a 'code_verifier' as described in section 4.1 of RFC 7636.

    This is a 'high-entropy cryptographic random string' that will be
    impractical for an attacker to guess.

    Args:
        n_bytes: integer between 32 and 96, inclusive. default: 64
            number of bytes of entropy to include in verifier.

    Returns:
        Bytestring, representing urlsafe base64-encoded random data.

    Raises:
        ValueError: if the encoded verifier is shorter than 43 or longer
            than 128 characters.
    """
    verifier = base64.urlsafe_b64encode(os.urandom(n_bytes)).rstrip(b'=')
    # minimum length of 43 characters and a maximum length of 128 characters.
    # 31 bytes encode to only 42 characters, so the real lower bound is 32.
    if len(verifier) < 43:
        raise ValueError("Verifier too short. n_bytes must be > 31.")
    if len(verifier) > 128:
        raise ValueError("Verifier too long. n_bytes must be < 97.")
    return verifier
def code_challenge(verifier):
    """Creates a 'code_challenge' as described in section 4.2 of RFC 7636
    by taking the sha256 hash of the verifier and then urlsafe
    base64-encoding it.

    Args:
        verifier: bytestring, representing a code_verifier as generated by
            code_verifier().

    Returns:
        Bytestring, representing a urlsafe base64-encoded sha256 hash digest,
            without '=' padding.
    """
    digest = hashlib.sha256(verifier).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b'=')
# Project: GAMADV-XTD    Author: taers232c    | project source | file source
def generate_token(key, user_id, action_id='', when=None):
    """Generates a URL-safe token for the given user, action, time tuple.

    Args:
        key: secret key to use.
        user_id: the user ID of the authenticated user.
        action_id: a string identifier of the action they requested
                   authorization for.
        when: the time in seconds since the epoch at which the user was
              authorized for this action. If not set the current time is used.

    Returns:
        A string XSRF protection token.
    """
    import hmac

    # NOTE(review): the right-hand side of this assignment was truncated in
    # the source text; an HMAC keyed with `key` is assumed. MD5 is named
    # explicitly because it was hmac's historical default and Python 3.8+
    # requires a digestmod -- confirm against the original project.
    digester = hmac.new(_helpers._to_bytes(key, encoding='utf-8'),
                        digestmod='md5')
    digester.update(_helpers._to_bytes(str(user_id), encoding='utf-8'))
    # Delimiters keep ("ab", "c") and ("a", "bc") from hashing identically.
    digester.update(DELIMITER)
    digester.update(_helpers._to_bytes(action_id, encoding='utf-8'))
    digester.update(DELIMITER)
    when = _helpers._to_bytes(str(when or int(time.time())), encoding='utf-8')
    # The timestamp must be covered by the MAC, otherwise the cleartext
    # copy appended below could be altered without detection.
    digester.update(when)
    digest = digester.digest()

    token = base64.urlsafe_b64encode(digest + DELIMITER + when)
    return token
def client_encode(self, buf):
        """Wrap the first outgoing data in an HTTP/1.1 ``h2c`` upgrade request.

        The first call returns a GET request whose ``HTTP2-Settings`` header
        carries ``buf`` base64url-encoded. All data passed in is accumulated
        in ``self.send_buffer``; once ``self.has_recv_header`` is set the
        buffer is flushed as-is, and from then on ``buf`` passes through
        untouched.
        """
        if self.raw_trans_sent:
            return buf
        self.send_buffer += buf
        if not self.has_sent_header:
            port = b''
            if self.server_info.port != 80:
                port = b':' + common.to_bytes(str(self.server_info.port))
            self.has_sent_header = True
            # NOTE(review): the fallback operand was truncated in the source
            # text; `self.server_info.host` is assumed -- confirm against
            # the original project.
            host = self.server_info.param or self.server_info.host
            http_head = b"GET / HTTP/1.1\r\n"
            http_head += b"Host: " + host + port + b"\r\n"
            http_head += b"Connection: Upgrade, HTTP2-Settings\r\nUpgrade: h2c\r\n"
            http_head += b"HTTP2-Settings: " + base64.urlsafe_b64encode(buf) + b"\r\n"
            return http_head + b"\r\n"
        if self.has_recv_header:
            ret = self.send_buffer
            self.send_buffer = b''
            self.raw_trans_sent = True
            return ret
        return b''
def send(To, Subject, Body, Cc=[], Bcc=[], html=False, files=[]):
    """Send an email

    To, Cc and Bcc are iterables of address strings (they are joined with
    ', '); Body is attached as 'html' or 'plain' text depending on `html`;
    `files` is an iterable of paths opened in binary mode.
    """
    # NOTE(review): Cc, Bcc and files use mutable default arguments; they are
    # only read in the visible code, but `None` defaults would be safer.
    subtype = 'html' if html else 'plain'
    message = MIMEMultipart()
    message['To'] = ', '.join(To)
    message['Subject'] = Subject
    message['Cc'] = ', '.join(Cc)
    message['Bcc'] = ', '.join(Bcc)

    message.attach(MIMEText(Body, subtype))

    for f in files:
        with open(f, "rb") as In:
            # NOTE(review): the first argument (the file contents) was
            # truncated in the source text.
            part = MIMEApplication(, Name=basename(f))
            part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
            # NOTE(review): `part` is never attached to `message` in the
            # visible code; a line appears to be missing here.

    # NOTE(review): on Python 3 urlsafe_b64encode() requires bytes, while
    # as_string() returns text.
    message = {'raw': base64.urlsafe_b64encode(message.as_string())}

    credentials = oauth2client.file.Storage(CREDENTIALS_PATH).get()
    Http = credentials.authorize(httplib2.Http())
    # NOTE(review): the callable that builds the service object was
    # truncated in the source text.
    service ='gmail', 'v1', http=Http)

    message = service.users().messages().send(userId='me', body=message).execute()
def encode(to_encode):
    """
    Encode an arbitrary string as a base64 that is safe to put as a URL query value.

    urllib.quote and urllib.quote_plus do not have any effect on the
    result of querystringsafe_base64.encode.

    :param (str, bytes) to_encode:
    :rtype: str
    :return: a string that is safe to put as a value in an URL query
        string - like base64, except characters ['+', '/', '='] are
        replaced with ['-', '_', '.'] consequently
    """
    encoded = urlsafe_b64encode(to_encode).replace(b'=', b'.')
    if PY2:
        # On Python 2 the native `str` type is already a byte string.
        return encoded
    return encoded.decode()
def _encrypt_from_parts(self, data, current_time, iv):
        """Encrypt and sign ``data``, returning a base64url-encoded token.

        The token layout is ``0x80 || 64-bit big-endian timestamp || iv ||
        AES-CBC ciphertext || HMAC-SHA256`` where the HMAC covers every
        preceding byte.

        :param data: plaintext; must be ``bytes``.
        :param current_time: integer timestamp packed into the token.
        :param iv: initialization vector for the CBC mode.
        :raises TypeError: if ``data`` is not ``bytes``.
        """
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes.")

        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(data) + padder.finalize()
        encryptor = Cipher(
            algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
        ).encryptor()
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()

        basic_parts = (
            b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext
        )

        h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
        # Without this update the signature would not depend on the token
        # contents at all.
        h.update(basic_parts)
        hmac = h.finalize()
        return base64.urlsafe_b64encode(basic_parts + hmac)
def create_message(sender, to, subject, message_text, style='html'):
    """Create a message for an email.

    Args:
      sender: Email address of the sender.
      to: Email address of the receiver.
      subject: The subject of the email message.
      message_text: The text of the email message.
      style: MIME subtype passed to MIMEText (default 'html').

    Returns:
      An object containing a base64url encoded email object.
    """
    if python_version == 2:
        message_text = unicode(message_text).encode('utf-8')
    message = MIMEText(message_text, _subtype=style)
    message['to'] = to
    message['from'] = sender
    message['subject'] = subject
    message = message.as_string()
    if python_version == 2:
        message = base64.urlsafe_b64encode(message)
    else:
        # NOTE(review): this branch was truncated in the source text. On
        # Python 3 the text has to be encoded before base64 and the result
        # decoded again to stay JSON-serializable; UTF-8 is assumed.
        message = base64.urlsafe_b64encode(
            message.encode('utf-8')).decode('utf-8')
    return {'raw': message}
def make_id():
    """Return a short unique id: a random UUID, base64url-encoded without padding."""
    raw = uuid4().bytes
    encoded = urlsafe_b64encode(raw)
    return encoded.rstrip(b'=')
def get_hash(self, data, hasher=None):
        """
        Get the hash of some data, using a particular hash algorithm, if
        specified.

        :param data: The data to be hashed.
        :type data: bytes
        :param hasher: The name of a hash implementation, supported by hashlib,
                       or ``None``. Examples of valid values are ``'sha1'``,
                       ``'sha224'``, ``'sha384'``, ``'sha256'``, ``'md5'`` and
                       ``'sha512'``. If no hasher is specified, the ``hasher``
                       attribute of the instance is used. If the hasher is
                       determined to be ``None``, MD5 is used as the hashing
                       algorithm.
        :returns: The hash of the data. If a hasher was explicitly specified,
                  the returned hash will be prefixed with the specified hasher
                  followed by '='.
        :rtype: str
        """
        if hasher is None:
            hasher = self.hasher
        if hasher is None:
            # No algorithm configured anywhere: fall back to unprefixed MD5.
            hash_func = hashlib.md5
            prefix = ''
        else:
            hash_func = getattr(hashlib, hasher)
            # Prefix with the algorithm actually used; an explicitly passed
            # hasher may differ from self.hasher.
            prefix = '%s=' % hasher
        digest = hash_func(data).digest()
        digest = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
        return '%s%s' % (prefix, digest)
def get_hash(self, data, hash_kind=None):
        """Hash ``data`` and return ``(hash_kind, digest)``.

        :param data: bytes to hash.
        :param hash_kind: name of a hashlib algorithm; defaults to
            ``self.hash_kind``.
        :returns: the algorithm name and the digest, base64url-encoded
            without '=' padding.
        :raises DistlibException: if hashlib has no such algorithm.
        """
        if hash_kind is None:
            hash_kind = self.hash_kind
        try:
            hasher = getattr(hashlib, hash_kind)
        except AttributeError:
            raise DistlibException('Unsupported hash algorithm: %r' % hash_kind)
        result = hasher(data).digest()
        result = base64.urlsafe_b64encode(result).rstrip(b'=').decode('ascii')
        return hash_kind, result
def rehash(path, algo='sha256', blocksize=1 << 20):
    """Return (hash, length) for path using hashlib.new(algo).

    The hash is formatted as ``<algo>=<base64url digest without padding>``
    and the length is the file size in bytes.
    """
    import hashlib

    h = hashlib.new(algo)
    length = 0
    with open(path, 'rb') as f:
        for block in read_chunks(f, size=blocksize):
            length += len(block)
            h.update(block)
    # Label the digest with the algorithm actually used instead of a fixed
    # 'sha256=' (identical for the default algo).
    digest = algo + '=' + urlsafe_b64encode(
        h.digest()
    ).decode('latin1').rstrip('=')
    return (digest, length)
# Project: python-    Author: secondtonone1    | project source | file source
def urlsafe_b64encode(data):
    """Encode ``data`` with the URL-safe base64 alphabet and drop trailing '=' padding."""
    padded = base64.urlsafe_b64encode(data)
    pad_char = binary('=')
    return padded.rstrip(pad_char)
# Project: python-    Author: secondtonone1    | project source | file source
def digest(self):
        """Return the accumulated hash as text.

        MD5 hashes are returned as a plain hex digest; every other hash type
        is returned as ``<hashtype>=<urlsafe base64 digest>``.
        """
        if self.hashtype != 'md5':
            raw = self.hash.digest()
            return self.hashtype + '=' + native(urlsafe_b64encode(raw))
        return self.hash.hexdigest()
def get_hash(self, data, hasher=None):
        """
        Get the hash of some data, using a particular hash algorithm, if
        specified.

        :param data: The data to be hashed.
        :type data: bytes
        :param hasher: The name of a hash implementation, supported by hashlib,
                       or ``None``. Examples of valid values are ``'sha1'``,
                       ``'sha224'``, ``'sha384'``, ``'sha256'``, ``'md5'`` and
                       ``'sha512'``. If no hasher is specified, the ``hasher``
                       attribute of the instance is used. If the hasher is
                       determined to be ``None``, MD5 is used as the hashing
                       algorithm.
        :returns: The hash of the data. If a hasher was explicitly specified,
                  the returned hash will be prefixed with the specified hasher
                  followed by '='.
        :rtype: str
        """
        if hasher is None:
            hasher = self.hasher
        if hasher is None:
            # No algorithm configured anywhere: fall back to unprefixed MD5.
            hash_func = hashlib.md5
            prefix = ''
        else:
            hash_func = getattr(hashlib, hasher)
            # Prefix with the algorithm actually used; an explicitly passed
            # hasher may differ from self.hasher.
            prefix = '%s=' % hasher
        digest = hash_func(data).digest()
        digest = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
        return '%s%s' % (prefix, digest)
def get_hash(self, data, hash_kind=None):
        """Hash ``data`` and return ``(hash_kind, digest)``.

        :param data: bytes to hash.
        :param hash_kind: name of a hashlib algorithm; defaults to
            ``self.hash_kind``.
        :returns: the algorithm name and the digest, base64url-encoded
            without '=' padding.
        :raises DistlibException: if hashlib has no such algorithm.
        """
        if hash_kind is None:
            hash_kind = self.hash_kind
        try:
            hasher = getattr(hashlib, hash_kind)
        except AttributeError:
            raise DistlibException('Unsupported hash algorithm: %r' % hash_kind)
        result = hasher(data).digest()
        result = base64.urlsafe_b64encode(result).rstrip(b'=').decode('ascii')
        return hash_kind, result
def rehash(path, algo='sha256', blocksize=1 << 20):
    """Return (hash, length) for path using hashlib.new(algo).

    The hash is formatted as ``<algo>=<base64url digest without padding>``
    and the length is the file size in bytes.
    """
    import hashlib

    h = hashlib.new(algo)
    length = 0
    with open(path, 'rb') as f:
        for block in read_chunks(f, size=blocksize):
            length += len(block)
            h.update(block)
    # Label the digest with the algorithm actually used instead of a fixed
    # 'sha256=' (identical for the default algo).
    digest = algo + '=' + urlsafe_b64encode(
        h.digest()
    ).decode('latin1').rstrip('=')
    return (digest, length)
def urlsafe_b64encode(data):
    """Encode ``data`` with the URL-safe base64 alphabet and drop trailing '=' padding."""
    padded = base64.urlsafe_b64encode(data)
    pad_char = binary('=')
    return padded.rstrip(pad_char)
# Project: my-first-blog    Author: AnkurBegining    | project source | file source
def digest(self):
        """Return the accumulated hash as text.

        MD5 hashes are returned as a plain hex digest; every other hash type
        is returned as ``<hashtype>=<urlsafe base64 digest>``.
        """
        if self.hashtype != 'md5':
            raw = self.hash.digest()
            return self.hashtype + '=' + native(urlsafe_b64encode(raw))
        return self.hash.hexdigest()
def start(self, url_state=None):
        """
        Starts the OAuth 2 authorization process.

        This function builds an "authorization URL".  You should redirect your
        user's browser to this URL, which will give them an opportunity to
        grant your app access to their Dropbox account.  When the user
        completes this process, they will be automatically redirected to the
        ``redirect_uri`` you passed in to the constructor.

        This function will also save a CSRF token to
        ``session[csrf_token_session_key]`` (as provided to the constructor).
        This CSRF token will be checked on :meth:`finish()` to prevent request
        forgery.

        :param str url_state: Any data that you would like to keep in the URL
            through the authorization process.  This exact value will be
            returned to you by :meth:`finish()`.
        :return: The URL for a page on Dropbox's website.  This page will let
            the user "approve" your app, which gives your app permission to
            access the user's Dropbox account. Tell the user to visit this URL
            and approve your app.
        """
        csrf_token = base64.urlsafe_b64encode(os.urandom(16)).decode('ascii')
        state = csrf_token
        if url_state is not None:
            # The caller's state rides along after the token, '|'-separated.
            state += "|" + url_state
        self.session[self.csrf_token_session_key] = csrf_token

        return self._get_authorize_url(self.redirect_uri, state)
def pack(self, message, ascii_encoded=False):
        """Packs a message for transport as described in y/cep342.

        Use :func:`unpack` to decode the packed message.

        Args:
            message (data_pipeline.message.Message): The message to pack
            ascii_encoded (Optional[bool]): Set to True if message is not valid ASCII

        Returns:
            bytes: Avro byte string prepended by magic envelope version byte

        The initial "magic byte" is meant to specify the envelope schema version.
        See y/cep342 for details.  In other words, the version number of the current
        schema is the null byte.  In the event we need to add additional envelope
        versions, we'll use this byte to identify it.

        In addition, the "magic byte" is used as a protocol to encode the serialized
        message in base64. See DATAPIPE-1350 for more detail. This option has been
        added because as of now, yelp_clog only supports sending valid ASCII strings.
        Producer/Consumer registration will make use of this to instead send base64
        encoded strings.
        """
        # NOTE(review): bytes(0) is '0' on Python 2 and b'' on Python 3 --
        # neither is the null byte the docstring describes. Left unchanged
        # because it defines the wire format; confirm before porting.
        msg = bytes(0) + self._avro_string_writer.encode(message.avro_repr)

        if ascii_encoded:
            return self.ASCII_MAGIC_BYTE + base64.urlsafe_b64encode(msg)
        else:
            return msg
def _pack_up(credentials):
    """Serialize ``credentials`` to JSON and return it base64url-encoded as bytes."""
    return base64.urlsafe_b64encode(
        bytes(json.dumps(credentials), encoding="UTF-8")
    )
# Project: oscars2016    Author: 0x0ece    | project source | file source
def _urlsafe_b64encode(raw_bytes):
    """Return ``raw_bytes`` base64url-encoded, with trailing '=' padding removed.

    The input is first passed through ``_to_bytes`` with UTF-8 encoding.
    """
    as_bytes = _to_bytes(raw_bytes, encoding='utf-8')
    encoded = base64.urlsafe_b64encode(as_bytes)
    return encoded.rstrip(b'=')
def default_username_algo(email):
    """Derive an opaque username from ``email``.

    Returns the SHA-1 digest of the address, base64url-encoded without '='
    padding. Text input is encoded as UTF-8 before hashing.
    """
    # store the username as a base64 encoded sha1 of the email address
    # this protects against data leakage because usernames are often
    # treated as public identifiers (so we can't use the email address).
    import hashlib

    if not isinstance(email, bytes):
        email = email.encode('utf-8')
    # NOTE(review): the argument of this call was truncated in the source
    # text; it is rebuilt from the comment above. Stripping the padding is
    # an assumption -- confirm against the original project.
    return base64.urlsafe_b64encode(
        hashlib.sha1(email).digest()).rstrip(b'=')
# Project: graphene-custom-directives    Author: ekampf    | project source | file source
def process(value, directive, root, args, context, info):
        """Return ``str(value)`` base64url-encoded, or ``None`` for a falsy value.

        The result is text on Python 3 and the raw encoder output otherwise.
        Only ``value`` is used; the remaining arguments are ignored.
        """
        if value:
            encoded = base64.urlsafe_b64encode(str(value).encode('ascii'))
            if six.PY3:
                return encoded.decode('ascii')
            return encoded
        return None
def start(self, url_state=None):
        """
        Starts the OAuth 2 authorization process.

        This function builds an "authorization URL".  You should redirect your user's browser to
        this URL, which will give them an opportunity to grant your app access to their Dropbox
        account.  When the user completes this process, they will be automatically redirected to
        the ``redirect_uri`` you passed in to the constructor.

        This function will also save a CSRF token to ``session[csrf_token_session_key]`` (as
        provided to the constructor).  This CSRF token will be checked on :meth:`finish()` to
        prevent request forgery.

        :param str url_state:
            Any data that you would like to keep in the URL through the
            authorization process.  This exact value will be returned to you by :meth:`finish()`.
        :return:
            The URL for a page on Dropbox's website.  This page will let the user "approve"
            your app, which gives your app permission to access the user's Dropbox account.
            Tell the user to visit this URL and approve your app.
        """
        csrf_token = base64.urlsafe_b64encode(os.urandom(16))  # PY3: Returns bytes
        if not isinstance(csrf_token, str):
            csrf_token = csrf_token.decode('utf-8')
        state = csrf_token
        if url_state is not None:
            # The caller's state rides along after the token, '|'-separated.
            state += "|" + url_state
        self.session[self.csrf_token_session_key] = csrf_token

        return self._get_authorize_url(self.redirect_uri, state)
def start(self, url_state=None):
        """
        Starts the OAuth 2 authorization process.

        This function builds an "authorization URL".  You should redirect your
        user's browser to this URL, which will give them an opportunity to
        grant your app access to their Dropbox account.  When the user
        completes this process, they will be automatically redirected to the
        ``redirect_uri`` you passed in to the constructor.

        This function will also save a CSRF token to
        ``session[csrf_token_session_key]`` (as provided to the constructor).
        This CSRF token will be checked on :meth:`finish()` to prevent request
        forgery.

        :param str url_state: Any data that you would like to keep in the URL
            through the authorization process.  This exact value will be
            returned to you by :meth:`finish()`.
        :return: The URL for a page on Dropbox's website.  This page will let
            the user "approve" your app, which gives your app permission to
            access the user's Dropbox account. Tell the user to visit this URL
            and approve your app.
        """
        csrf_token = base64.urlsafe_b64encode(os.urandom(16)).decode('ascii')
        state = csrf_token
        if url_state is not None:
            # The caller's state rides along after the token, '|'-separated.
            state += "|" + url_state
        self.session[self.csrf_token_session_key] = csrf_token

        return self._get_authorize_url(self.redirect_uri, state)
def get_hash(self, data, hasher=None):
        """
        Get the hash of some data, using a particular hash algorithm, if
        specified.

        :param data: The data to be hashed.
        :type data: bytes
        :param hasher: The name of a hash implementation, supported by hashlib,
                       or ``None``. Examples of valid values are ``'sha1'``,
                       ``'sha224'``, ``'sha384'``, ``'sha256'``, ``'md5'`` and
                       ``'sha512'``. If no hasher is specified, the ``hasher``
                       attribute of the instance is used. If the hasher is
                       determined to be ``None``, MD5 is used as the hashing
                       algorithm.
        :returns: The hash of the data. If a hasher was explicitly specified,
                  the returned hash will be prefixed with the specified hasher
                  followed by '='.
        :rtype: str
        """
        if hasher is None:
            hasher = self.hasher
        if hasher is None:
            # No algorithm configured anywhere: fall back to unprefixed MD5.
            hash_func = hashlib.md5
            prefix = ''
        else:
            hash_func = getattr(hashlib, hasher)
            # Prefix with the algorithm actually used; an explicitly passed
            # hasher may differ from self.hasher.
            prefix = '%s=' % hasher
        digest = hash_func(data).digest()
        digest = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
        return '%s%s' % (prefix, digest)
def get_hash(self, data, hash_kind=None):
        """Hash ``data`` and return ``(hash_kind, digest)``.

        :param data: bytes to hash.
        :param hash_kind: name of a hashlib algorithm; defaults to
            ``self.hash_kind``.
        :returns: the algorithm name and the digest, base64url-encoded
            without '=' padding.
        :raises DistlibException: if hashlib has no such algorithm.
        """
        if hash_kind is None:
            hash_kind = self.hash_kind
        try:
            hasher = getattr(hashlib, hash_kind)
        except AttributeError:
            raise DistlibException('Unsupported hash algorithm: %r' % hash_kind)
        result = hasher(data).digest()
        result = base64.urlsafe_b64encode(result).rstrip(b'=').decode('ascii')
        return hash_kind, result
def rehash(path, algo='sha256', blocksize=1<<20):
    """Return (hash, length) for path using hashlib.new(algo).

    The hash is formatted as ``<algo>=<base64url digest without padding>``
    and the length is the file size in bytes.
    """
    import hashlib

    h = hashlib.new(algo)
    length = 0
    with open(path, 'rb') as f:
        # Read fixed-size blocks until read() returns b'' at end of file.
        for block in iter(lambda: f.read(blocksize), b''):
            length += len(block)
            h.update(block)
    # Label the digest with the algorithm actually used instead of a fixed
    # 'sha256=' (identical for the default algo).
    digest = algo + '=' + urlsafe_b64encode(h.digest()).decode('latin1').rstrip('=')
    return (digest, length)
def base64_encode(string):
    """base64 encodes a single bytestring (and is tolerant to getting
    called with a unicode string).
    The resulting bytestring is safe for putting into URLs.
    """
    string = want_bytes(string)
    # '=' only ever appears as trailing padding in base64 output, so
    # strip() removes exactly the padding.
    return base64.urlsafe_b64encode(string).strip(b'=')
def get_hash(self, data, hasher=None):
        """
        Get the hash of some data, using a particular hash algorithm, if
        specified.

        :param data: The data to be hashed.
        :type data: bytes
        :param hasher: The name of a hash implementation, supported by hashlib,
                       or ``None``. Examples of valid values are ``'sha1'``,
                       ``'sha224'``, ``'sha384'``, ``'sha256'``, ``'md5'`` and
                       ``'sha512'``. If no hasher is specified, the ``hasher``
                       attribute of the instance is used. If the hasher is
                       determined to be ``None``, MD5 is used as the hashing
                       algorithm.
        :returns: The hash of the data. If a hasher was explicitly specified,
                  the returned hash will be prefixed with the specified hasher
                  followed by '='.
        :rtype: str
        """
        if hasher is None:
            hasher = self.hasher
        if hasher is None:
            # No algorithm configured anywhere: fall back to unprefixed MD5.
            hash_func = hashlib.md5
            prefix = ''
        else:
            hash_func = getattr(hashlib, hasher)
            # Prefix with the algorithm actually used; an explicitly passed
            # hasher may differ from self.hasher.
            prefix = '%s=' % hasher
        digest = hash_func(data).digest()
        digest = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
        return '%s%s' % (prefix, digest)
def get_hash(self, data, hash_kind=None):
        """Hash ``data`` and return ``(hash_kind, digest)``.

        :param data: bytes to hash.
        :param hash_kind: name of a hashlib algorithm; defaults to
            ``self.hash_kind``.
        :returns: the algorithm name and the digest, base64url-encoded
            without '=' padding.
        :raises DistlibException: if hashlib has no such algorithm.
        """
        if hash_kind is None:
            hash_kind = self.hash_kind
        try:
            hasher = getattr(hashlib, hash_kind)
        except AttributeError:
            raise DistlibException('Unsupported hash algorithm: %r' % hash_kind)
        result = hasher(data).digest()
        result = base64.urlsafe_b64encode(result).rstrip(b'=').decode('ascii')
        return hash_kind, result
def rehash(path, algo='sha256', blocksize=1 << 20):
    """Return (hash, length) for path using hashlib.new(algo).

    The hash is formatted as ``<algo>=<base64url digest without padding>``
    and the length is the file size in bytes.
    """
    import hashlib

    h = hashlib.new(algo)
    length = 0
    with open(path, 'rb') as f:
        for block in read_chunks(f, size=blocksize):
            length += len(block)
            h.update(block)
    # Label the digest with the algorithm actually used instead of a fixed
    # 'sha256=' (identical for the default algo).
    digest = algo + '=' + urlsafe_b64encode(
        h.digest()
    ).decode('latin1').rstrip('=')
    return (digest, length)
def GitHub_login(*, username=None, password=None, OTP=None, headers=None):
    """
    Login to GitHub.

    If no username, password, or OTP (2-factor authentication code) are
    provided, they will be requested from the command line.

    Returns a dict of kwargs that can be passed to functions that require
    authenticated connections to GitHub.
    """
    if not username:
        username = input("What is your GitHub username? ")

    if not password:
        password = getpass("Enter the GitHub password for {username}: ".format(username=username))

    headers = headers or {}

    if OTP:
        headers['X-GitHub-OTP'] = OTP

    auth = HTTPBasicAuth(username, password)

    # NOTE(review): the request URL is empty in the source text; it appears
    # to have been stripped during extraction.
    r = requests.get('', auth=auth, headers=headers)
    if r.status_code == 401:
        # The X-GitHub-OTP response header is read to decide whether a
        # two-factor code has to be requested.
        two_factor = r.headers.get('X-GitHub-OTP')
        if two_factor:
            auth_header = base64.urlsafe_b64encode(bytes(username + ':' + password, 'utf8')).decode()
            login_kwargs = {'auth': None, 'headers': {'Authorization': 'Basic {}'.format(auth_header)}}
            # NOTE(review): the `try:` block that uses `login_kwargs` and
            # the body of this handler are missing from the source text.
            except requests.exceptions.HTTPError:
            print("A two-factor authentication code is required:", two_factor.split(';')[1].strip())
            OTP = input("Authentication code: ")
            # Retry with the code the user just entered.
            return GitHub_login(username=username, password=password, OTP=OTP, headers=headers)

        raise AuthenticationFailed("invalid username or password")

    return {'auth': auth, 'headers': headers}
def base64string():
    """Return the form field 's' base64url-encoded, as JSON ``{"rt": ...}``.

    A missing field is treated as an empty string.
    """
    from flask import request, jsonify
    from base64 import urlsafe_b64encode
    # Avoid shadowing the builtin `str`, and default to '' so a missing
    # field does not crash the encoder.
    text = request.form.get('s', '')
    # urlsafe_b64encode() needs bytes; decode the result so it can be
    # serialized as JSON.
    encoded = urlsafe_b64encode(text.encode('utf-8')).decode('ascii')
    return jsonify(rt=encoded)
def embed_real_url_to_embedded_url(real_url_raw, url_mime, escape_slash=False):
    """
    Move the query string of a URL into its path.

    The query is base64url-encoded (zlib-compressed first when longer than
    128 bytes, marked by a 'z' label) and appended to the path between
    salt markers, followed by an extension looked up from ``url_mime``.
    URLs without a query string are returned unchanged.

    NOTE(review): the original docstring was garbled in the source text. It
    referred to `cdn_redirect_encode_query_str_into_url` and to
    `extract_real_url_from_embedded_url()`, presumably the matching config
    option and the inverse function -- confirm.

    :param real_url_raw: the URL to rewrite.
    :param url_mime: key into ``mime_to_use_cdn`` selecting the extension.
    :param escape_slash: set when slashes in the URL are escaped as ``\\/``;
        the result is then passed through ``s_esc``.
    :rtype: str
    """
    if escape_slash:
        real_url = real_url_raw.replace(r'\/', '/')
    else:
        real_url = real_url_raw
    url_sp = urlsplit(real_url)
    if not url_sp.query:  # no query, needn't rewrite
        return real_url_raw

    byte_query = url_sp.query.encode()
    if len(byte_query) > 128:  # long query: compress before encoding
        gzip_label = 'z'  # marks the embedded query as compressed
        byte_query = zlib.compress(byte_query)
    else:
        gzip_label = ''

    b64_query = base64.urlsafe_b64encode(byte_query).decode()
    mixed_path = url_sp.path + '_' + _url_salt + gzip_label + '_.' \
                 + b64_query \
                 + '._' + _url_salt + '_.' + mime_to_use_cdn[url_mime]
    result = urlunsplit((url_sp.scheme, url_sp.netloc, mixed_path, '', ''))

    if escape_slash:
        result = s_esc(result)
    return result
def __str__(self):
    """Encodes this Key as an opaque string.

    Returns a string representation of this key, suitable for use in HTML,
    URLs, and other similar use cases. If the entity's key is incomplete,
    raises a BadKeyError.

    Unfortunately, this string encoding isn't particularly compact, and its
    length varies with the length of the path. If you want a shorter identifier
    and you know the kind and parent (if any) ahead of time, consider using just
    the entity's id or name.

    Returns:
      string
    """
    try:
      # Reuse the cached encoding when one has already been computed.
      if self._str is not None:
        return self._str
    except AttributeError:
      # _str has not been set on this instance yet.
      pass
    if (self.has_id_or_name()):
      encoded = base64.urlsafe_b64encode(self.__reference.Encode())
      self._str = encoded.replace('=', '')
    else:
      raise datastore_errors.BadKeyError(
        'Cannot string encode an incomplete key!\n%s' % self.__reference)
    return self._str
def get_hash(self, data, hasher=None):
        """
        Get the hash of some data, using a particular hash algorithm, if
        specified.

        :param data: The data to be hashed.
        :type data: bytes
        :param hasher: The name of a hash implementation, supported by hashlib,
                       or ``None``. Examples of valid values are ``'sha1'``,
                       ``'sha224'``, ``'sha384'``, ``'sha256'``, ``'md5'`` and
                       ``'sha512'``. If no hasher is specified, the ``hasher``
                       attribute of the instance is used. If the hasher is
                       determined to be ``None``, MD5 is used as the hashing
                       algorithm.
        :returns: The hash of the data. If a hasher was explicitly specified,
                  the returned hash will be prefixed with the specified hasher
                  followed by '='.
        :rtype: str
        """
        if hasher is None:
            hasher = self.hasher
        if hasher is None:
            # No algorithm configured anywhere: fall back to unprefixed MD5.
            hash_func = hashlib.md5
            prefix = ''
        else:
            hash_func = getattr(hashlib, hasher)
            # Prefix with the algorithm actually used; an explicitly passed
            # hasher may differ from self.hasher.
            prefix = '%s=' % hasher
        digest = hash_func(data).digest()
        digest = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
        return '%s%s' % (prefix, digest)
def get_hash(self, data, hash_kind=None):
        """Hash ``data`` and return ``(hash_kind, digest)``.

        :param data: bytes to hash.
        :param hash_kind: name of a hashlib algorithm; defaults to
            ``self.hash_kind``.
        :returns: the algorithm name and the digest, base64url-encoded
            without '=' padding.
        :raises DistlibException: if hashlib has no such algorithm.
        """
        if hash_kind is None:
            hash_kind = self.hash_kind
        try:
            hasher = getattr(hashlib, hash_kind)
        except AttributeError:
            raise DistlibException('Unsupported hash algorithm: %r' % hash_kind)
        result = hasher(data).digest()
        result = base64.urlsafe_b64encode(result).rstrip(b'=').decode('ascii')
        return hash_kind, result
def rehash(path, algo='sha256', blocksize=1 << 20):
    """Return (hash, length) for path using hashlib.new(algo).

    The hash is formatted as ``<algo>=<base64url digest without padding>``
    and the length is the file size in bytes.
    """
    import hashlib

    h = hashlib.new(algo)
    length = 0
    with open(path, 'rb') as f:
        for block in read_chunks(f, size=blocksize):
            length += len(block)
            h.update(block)
    # Label the digest with the algorithm actually used instead of a fixed
    # 'sha256=' (identical for the default algo).
    digest = algo + '=' + urlsafe_b64encode(
        h.digest()
    ).decode('latin1').rstrip('=')
    return (digest, length)
def urlsafe_b64encode(data):
    """Encode ``data`` with the URL-safe base64 alphabet and drop trailing '=' padding."""
    padded = base64.urlsafe_b64encode(data)
    pad_char = binary('=')
    return padded.rstrip(pad_char)