我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 base64.urlsafe_b64encode()。
def produce(obj, pb, sep):
    """Append an encoded entry to ``obj`` for every set field of ``pb``.

    Each scalar value becomes ``'<number><type code><value>'`` where the type
    code is looked up in ``types_enc``. A nested message is encoded
    recursively and preceded by ``'<number>m<count>'``, ``count`` being the
    number of entries the nested message contributed.

    Args:
        obj: list that receives the encoded entries (mutated in place).
        pb: message object exposing ``ListFields()``.
        sep: separator in use; ``'!'`` selects ``*2A``/``*21`` escaping for
            strings, anything else selects percent-quoting.

    Returns:
        The same ``obj`` list.
    """
    for descriptor, raw in pb.ListFields():
        if descriptor.label == descriptor.LABEL_REPEATED:
            items = raw
        else:
            items = [raw]

        for item in items:
            if descriptor.cpp_type == descriptor.CPPTYPE_MESSAGE:
                # Encode the children first, then put the header in front of
                # them once we know how many entries they produced.
                start = len(obj)
                produce(obj, item, sep)
                header = '%dm%d' % (descriptor.number, len(obj) - start)
                obj.insert(start, header)
                continue

            if descriptor.type == descriptor.TYPE_STRING:
                if sep == '!':
                    item = item.replace('*', '*2A').replace('!', '*21')
                else:
                    item = quote(item, safe='~()*!.\'')
            elif descriptor.type == descriptor.TYPE_BYTES:
                item = urlsafe_b64encode(item).decode('ascii').strip('=')
            elif descriptor.type == descriptor.TYPE_BOOL:
                item = int(item)

            entry = '%d%s%s' % (
                descriptor.number, types_enc[descriptor.type], item)
            obj.append(entry)
    return obj
def generate_token(key, user_id, action_id='', when=None):
    """Generates a URL-safe token for the given user, action, time tuple.

    Args:
        key: secret key to use.
        user_id: the user ID of the authenticated user.
        action_id: a string identifier of the action they requested
            authorization for.
        when: the time in seconds since the epoch at which the user was
            authorized for this action. If not set (or falsy) the current
            time is used.

    Returns:
        The URL-safe base64 encoding of ``digest + DELIMITER + when``, as
        returned by ``base64.urlsafe_b64encode``.
    """
    import hashlib  # local import: only needed to name the digest explicitly

    # hmac.new() without a digestmod raises TypeError on Python 3.8+. MD5 was
    # the implicit default before that, so naming it keeps tokens identical.
    digester = hmac.new(_to_bytes(key, encoding='utf-8'),
                        digestmod=hashlib.md5)
    digester.update(_to_bytes(str(user_id), encoding='utf-8'))
    digester.update(DELIMITER)
    digester.update(_to_bytes(action_id, encoding='utf-8'))
    digester.update(DELIMITER)
    when = _to_bytes(str(when or int(time.time())), encoding='utf-8')
    digester.update(when)
    digest = digester.digest()

    # The timestamp travels in clear text next to the digest.
    token = base64.urlsafe_b64encode(digest + DELIMITER + when)
    return token
def ssrlink(self, user, encode, muid):
    """Build an ``ssr://`` link for ``user``.

    The link body is
    ``server_addr:port:protocol:method:obfs:<b64 password>``. When ``muid``
    is not None and the user's ``protocol_param`` splits into exactly two
    ``#``-separated parts, the first row of ``self.data.json`` whose port
    equals ``muid`` contributes a ``/?protoparam=<b64 "muid:passwd">``
    suffix.

    Args:
        user: mapping with at least ``port``, ``method`` and ``passwd``.
        encode: when truthy the whole link body is base64-encoded.
        muid: port number of the multi-user entry, or None.

    Returns:
        The link string, always starting with ``ssr://``.
    """

    def b64(text):
        # URL-safe base64 of ``text`` with every '=' removed.
        encoded = base64.urlsafe_b64encode(common.to_bytes(text))
        return common.to_str(encoded).replace("=", "")

    protocol = user.get('protocol', '').replace("_compatible", "")
    obfs = user.get('obfs', '').replace("_compatible", "")

    protocol_param = ''
    if muid is not None:
        pieces = user.get('protocol_param', '').split('#')
        if len(pieces) == 2:
            for row in self.data.json:
                if int(row['port']) == muid:
                    credentials = str(muid) + ':' + row['passwd']
                    protocol_param = '/?protoparam=' + b64(credentials)
                    break

    fields = (self.server_addr, user['port'], protocol, user['method'],
              obfs, b64(user['passwd']))
    link = ("%s:%s:%s:%s:%s:%s" % fields) + protocol_param

    if encode:
        return "ssr://" + b64(link)
    return "ssr://" + link
def CreateMessage(sender, to, subject, message_text):
    """Create a message for an email.

    Args:
        sender: Email address of the sender.
        to: Email address of the receiver.
        subject: The subject of the email message.
        message_text: The text of the email message.

    Returns:
        A dict with a single ``'raw'`` key holding the base64url encoded
        email as a native string.
    """
    message = MIMEText(message_text)
    message['to'] = to
    message['from'] = sender
    message['subject'] = subject

    # as_string() yields text on Python 3, but base64 only accepts bytes.
    serialized = message.as_string()
    if not isinstance(serialized, bytes):
        serialized = serialized.encode('utf-8')

    encoded = base64.urlsafe_b64encode(serialized)
    # Hand back a native str so the payload can be JSON-serialized.
    if not isinstance(encoded, str):
        encoded = encoded.decode('ascii')
    return {'raw': encoded}
def generate_token_string(self, action=None):
    """Produce a verifiable, base64-encoded hash of this token's contents.

    The digest covers the user id, the optional ``action`` and the current
    time, each separated by ``self._DELIMITER``.

    :param action: A string naming the action the hash is valid for
                   (usually a URL). Skipped when falsy.
    :returns: The URL-safe base64 encoding of the hex digest and the
              current time joined by ``self._DELIMITER``. It can be checked
              with `verify_token_string` and used in HTML forms without
              escaping.
    """
    maker = self._digest_maker()

    chunks = [self.user_id, self._DELIMITER]
    if action:
        chunks.extend([action, self._DELIMITER])
    chunks.append(str(self.current_time))
    for chunk in chunks:
        maker.update(chunk)

    payload = self._DELIMITER.join(
        [maker.hexdigest(), str(self.current_time)])
    return base64.urlsafe_b64encode(payload)
def _encrypt_from_parts(self, data, current_time, iv):
    """Encrypt and sign ``data`` and return the base64url-encoded token.

    The signed payload is the byte ``0x80``, ``current_time`` packed as a
    big-endian unsigned 64-bit integer, ``iv`` and the AES-CBC ciphertext of
    the PKCS7-padded data. An HMAC-SHA256 over that payload is appended
    before encoding.

    Raises:
        TypeError: if ``data`` is not ``bytes``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes.")

    pkcs7 = padding.PKCS7(algorithms.AES.block_size).padder()
    padded = pkcs7.update(data) + pkcs7.finalize()

    cipher = Cipher(
        algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend
    )
    aes = cipher.encryptor()
    ciphertext = aes.update(padded) + aes.finalize()

    header = b"\x80" + struct.pack(">Q", current_time)
    signed_payload = header + iv + ciphertext

    signer = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend)
    signer.update(signed_payload)
    signature = signer.finalize()

    return base64.urlsafe_b64encode(signed_payload + signature)
def create_datakey(encryption_context, keyid, client=None):
    '''
    Create a datakey from KMS.

    Asks ``client`` (a 'kms' boto client is fetched when none is given) for
    32 random bytes, base64url-encodes them into a key and has the client
    encrypt that key under ``keyid`` with ``encryption_context``.

    Returns a dict with the encrypted key under 'ciphertext' and the
    encoded key itself under 'plaintext'.
    '''
    kms = client
    if not kms:
        kms = confidant_client.services.get_boto_client('kms')

    # Fernet key; from spec and cryptography implementation, but using
    # random from KMS, rather than os.urandom:
    # https://github.com/fernet/spec/blob/master/Spec.md#key-format
    # https://cryptography.io/en/latest/_modules/cryptography/fernet/#Fernet.generate_key
    random_bytes = kms.generate_random(NumberOfBytes=32)['Plaintext']
    key = base64.urlsafe_b64encode(random_bytes)

    response = kms.encrypt(
        KeyId='{0}'.format(keyid),
        Plaintext=key,
        EncryptionContext=encryption_context,
    )
    return {'ciphertext': response['CiphertextBlob'], 'plaintext': key}
def send_email(msg_subj, msg_txt, msg_rcpt, i=0, count=0):
    """Send a plain-text email through the Gmail API.

    Args:
        msg_subj: subject line.
        msg_txt: message body.
        msg_rcpt: recipient address.
        i: index passed through to the entity reporting helpers.
        count: total passed through to the entity reporting helpers.

    Returns nothing. The function returns early, without sending, when no
    Gmail service object could be built. An ``HttpError`` from the API is
    reported as a warning instead of being raised.
    """
    from email.mime.text import MIMEText
    userId, gmail = buildGAPIServiceObject(API.GMAIL, _getValueFromOAuth(u'email'))
    if not gmail:
        return
    msg = MIMEText(msg_txt)
    msg[u'Subject'] = msg_subj
    msg[u'From'] = userId
    msg[u'To'] = msg_rcpt

    # as_string() yields text on Python 3, but base64 only accepts bytes.
    serialized = msg.as_string()
    if not isinstance(serialized, bytes):
        serialized = serialized.encode('utf-8')
    raw = base64.urlsafe_b64encode(serialized)
    # The request body must be JSON-serializable, so use a native string.
    if not isinstance(raw, str):
        raw = raw.decode('ascii')

    action = Act.Get()
    Act.Set(Act.SENDEMAIL)
    try:
        callGAPI(gmail.users().messages(), u'send',
                 userId=userId, body={u'raw': raw}, fields=u'')
        entityActionPerformed([Ent.RECIPIENT, msg_rcpt, Ent.MESSAGE, msg_subj], i, count)
    except googleapiclient.errors.HttpError as e:
        entityActionFailedWarning([Ent.RECIPIENT, msg_rcpt, Ent.MESSAGE, msg_subj], str(e), i, count)
    finally:
        # Restore the previous action even if an unexpected error propagates.
        Act.Set(action)

# Write a CSV file
def code_verifier(n_bytes=64):
    """
    Generates a 'code_verifier' as described in section 4.1 of RFC 7636.

    This is a 'high-entropy cryptographic random string' that will be
    impractical for an attacker to guess.

    Args:
        n_bytes: integer between 32 and 96, inclusive. default: 64
            number of bytes of entropy to include in verifier. (31 bytes
            encode to only 42 characters, one short of the minimum.)

    Returns:
        Bytestring, representing urlsafe base64-encoded random data,
        without '=' padding.

    Raises:
        ValueError: if the encoded verifier is shorter than 43 or longer
            than 128 characters.
    """
    verifier = base64.urlsafe_b64encode(os.urandom(n_bytes)).rstrip(b'=')
    # https://tools.ietf.org/html/rfc7636#section-4.1
    # minimum length of 43 characters and a maximum length of 128 characters.
    if len(verifier) < 43:
        raise ValueError("Verifier too short. n_bytes must be > 31.")
    if len(verifier) > 128:
        raise ValueError("Verifier too long. n_bytes must be < 97.")
    return verifier
def code_challenge(verifier):
    """
    Derives the 'code_challenge' of RFC 7636 section 4.2 from a verifier.

    The verifier is hashed with SHA-256 and the digest is urlsafe
    base64-encoded.

    Args:
        verifier: bytestring, representing a code_verifier as generated by
            code_verifier().

    Returns:
        Bytestring holding the urlsafe base64-encoded sha256 digest,
        without '=' padding.
    """
    sha = hashlib.sha256(verifier)
    encoded = base64.urlsafe_b64encode(sha.digest())
    return encoded.rstrip(b'=')
def generate_token(key, user_id, action_id='', when=None):
    """Generates a URL-safe token for the given user, action, time tuple.

    Args:
        key: secret key to use.
        user_id: the user ID of the authenticated user.
        action_id: a string identifier of the action they requested
            authorization for.
        when: the time in seconds since the epoch at which the user was
            authorized for this action. If not set (or falsy) the current
            time is used.

    Returns:
        The URL-safe base64 encoding of ``digest + DELIMITER + when``, as
        returned by ``base64.urlsafe_b64encode``.
    """
    import hashlib  # local import: only needed to name the digest explicitly

    # hmac.new() without a digestmod raises TypeError on Python 3.8+. MD5 was
    # the implicit default before that, so naming it keeps tokens identical.
    digester = hmac.new(_helpers._to_bytes(key, encoding='utf-8'),
                        digestmod=hashlib.md5)
    digester.update(_helpers._to_bytes(str(user_id), encoding='utf-8'))
    digester.update(DELIMITER)
    digester.update(_helpers._to_bytes(action_id, encoding='utf-8'))
    digester.update(DELIMITER)
    when = _helpers._to_bytes(str(when or int(time.time())), encoding='utf-8')
    digester.update(when)
    digest = digester.digest()

    # The timestamp travels in clear text next to the digest.
    token = base64.urlsafe_b64encode(digest + DELIMITER + when)
    return token
def client_encode(self, buf):
    """Encode outgoing client data.

    Every chunk is appended to ``self.send_buffer`` until raw pass-through
    starts. The first call answers with an HTTP/1.1 ``Upgrade: h2c`` request
    whose ``HTTP2-Settings`` header carries the base64url-encoded chunk.
    Later calls return ``b''`` until ``self.has_recv_header`` is set; the
    next call then flushes the whole buffer and switches to returning
    ``buf`` unchanged.
    """
    if self.raw_trans_sent:
        return buf

    self.send_buffer += buf

    if self.has_sent_header:
        if not self.has_recv_header:
            return b''
        pending = self.send_buffer
        self.send_buffer = b''
        self.raw_trans_sent = True
        return pending

    # The port is only spelled out when it is not the HTTP default.
    if self.server_info.port != 80:
        port = b':' + common.to_bytes(str(self.server_info.port))
    else:
        port = b''
    self.has_sent_header = True

    host = self.server_info.param or self.server_info.host
    return b"".join([
        b"GET / HTTP/1.1\r\n",
        b"Host: ", host, port, b"\r\n",
        b"Connection: Upgrade, HTTP2-Settings\r\nUpgrade: h2c\r\n",
        b"HTTP2-Settings: ", base64.urlsafe_b64encode(buf), b"\r\n",
        b"\r\n",
    ])
def send(To, Subject, Body, Cc=None, Bcc=None, html=False, files=None):
    """Send an email through the Gmail API.

    Args:
        To: list of recipient addresses.
        Subject: subject line.
        Body: message body.
        Cc: optional list of carbon-copy addresses.
        Bcc: optional list of blind-carbon-copy addresses.
        html: send the body as 'html' instead of 'plain' when true.
        files: optional list of paths to attach.

    Credentials are read from ``CREDENTIALS_PATH``. Nothing is returned.
    """
    # None instead of shared mutable list defaults.
    Cc = [] if Cc is None else Cc
    Bcc = [] if Bcc is None else Bcc
    files = [] if files is None else files

    subtype = 'html' if html else 'plain'
    message = MIMEMultipart()
    message['To'] = ', '.join(To)
    message['Subject'] = Subject
    message['Cc'] = ', '.join(Cc)
    message['Bcc'] = ', '.join(Bcc)

    message.attach(MIMEText(Body, subtype))

    for f in files:
        with open(f, "rb") as In:
            part = MIMEApplication(In.read(), Name=basename(f))
            part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
            message.attach(part)

    # as_string() yields text on Python 3, but base64 only accepts bytes.
    serialized = message.as_string()
    if not isinstance(serialized, bytes):
        serialized = serialized.encode('utf-8')
    raw = base64.urlsafe_b64encode(serialized)
    # The request body must be JSON-serializable, so use a native string.
    if not isinstance(raw, str):
        raw = raw.decode('ascii')
    body = {'raw': raw}

    credentials = oauth2client.file.Storage(CREDENTIALS_PATH).get()
    Http = credentials.authorize(httplib2.Http())
    service = discovery.build('gmail', 'v1', http=Http)

    service.users().messages().send(userId='me', body=body).execute()
def encode(to_encode):
    """
    Encode a value as base64 that is safe to put as a URL query value.

    The result is urlsafe base64 with every '=' replaced by '.', i.e.
    compared to plain base64 the characters ['+', '/', '='] become
    ['-', '_', '.'] respectively, so urllib.quote and urllib.quote_plus
    leave it untouched.

    :param (str, bytes) to_encode: value handed to urlsafe_b64encode
    :rtype: str
    :return: the encoded value; returned as produced when ``PY2`` is true,
        decoded to text otherwise
    """
    padded = urlsafe_b64encode(to_encode)
    dotted = padded.replace(b'=', b'.')
    if not PY2:
        return dotted.decode()
    return dotted
def create_message(sender, to, subject, message_text, style='html'):
    """Build the Gmail API payload for an email.

    Args:
        sender: Email address of the sender.
        to: Email address of the receiver.
        subject: The subject of the email message.
        message_text: The text of the email message.
        style: MIME subtype given to MIMEText (default 'html').

    Returns:
        A dict whose 'raw' entry is the base64url encoded email.
    """
    running_py2 = python_version == 2

    if running_py2:
        message_text = unicode(message_text).encode('utf-8')

    mime = MIMEText(message_text, _subtype=style)
    mime['to'] = to
    mime['from'] = sender
    mime['subject'] = subject
    serialized = mime.as_string()

    if running_py2:
        raw = base64.urlsafe_b64encode(serialized)
    else:
        # Python 3: encode to bytes first, then return text.
        raw = base64.urlsafe_b64encode(
            serialized.encode('utf-8')).decode('utf-8')
    return {'raw': raw}
def make_id():
    """Return a short unique id: a random UUID, base64url-encoded, unpadded."""
    raw = uuid4().bytes
    encoded = urlsafe_b64encode(raw)
    return encoded.rstrip(b'=')
def get_hash(self, data, hasher=None):
    """
    Get the hash of some data, using a particular hash algorithm, if
    specified.

    :param data: The data to be hashed.
    :type data: bytes
    :param hasher: The name of a hash implementation, supported by hashlib,
                   or ``None``. Examples of valid values are ``'sha1'``,
                   ``'sha224'``, ``'sha384'``, ``'sha256'``, ``'md5'`` and
                   ``'sha512'``. If no hasher is specified, the ``hasher``
                   attribute of the instance is used. If the hasher is
                   determined to be ``None``, MD5 is used as the hashing
                   algorithm.
    :returns: The urlsafe base64 digest without '=' padding. When a hasher
              name was resolved (explicitly or from the instance), the
              digest is prefixed with that name followed by '='.
    :rtype: str
    """
    if hasher is None:
        hasher = self.hasher
    if hasher is None:
        hash_func = hashlib.md5
        prefix = ''
    else:
        hash_func = getattr(hashlib, hasher)
        # Label the digest with the algorithm actually used, not with
        # self.hasher, which may be None or a different algorithm.
        prefix = '%s=' % hasher
    digest = hash_func(data).digest()
    digest = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
    return '%s%s' % (prefix, digest)
def get_hash(self, data, hash_kind=None):
    """Hash ``data`` and return ``(hash_kind, digest)``.

    ``hash_kind`` names a hashlib constructor and falls back to
    ``self.hash_kind`` when None. The digest is urlsafe base64 text without
    '=' padding.

    Raises DistlibException when hashlib has no attribute of that name.
    """
    kind = self.hash_kind if hash_kind is None else hash_kind
    try:
        factory = getattr(hashlib, kind)
    except AttributeError:
        raise DistlibException('Unsupported hash algorithm: %r' % kind)
    raw_digest = factory(data).digest()
    encoded = base64.urlsafe_b64encode(raw_digest)
    return kind, encoded.rstrip(b'=').decode('ascii')
def rehash(path, algo='sha256', blocksize=1 << 20):
    """Return (hash, length) for path using hashlib.new(algo).

    Args:
        path: file to read, opened in binary mode.
        algo: hash algorithm name accepted by ``hashlib.new``.
        blocksize: number of bytes requested per read.

    Returns:
        A ``(digest, length)`` tuple. ``digest`` is ``'<algo>='`` followed
        by the urlsafe base64 digest without '=' padding, and ``length`` is
        the number of bytes read.
    """
    h = hashlib.new(algo)
    length = 0
    with open(path, 'rb') as f:
        for block in read_chunks(f, size=blocksize):
            length += len(block)
            h.update(block)
    encoded = urlsafe_b64encode(h.digest()).decode('latin1').rstrip('=')
    # Label the digest with the algorithm actually used; a fixed 'sha256='
    # prefix would mislabel every non-default algo.
    digest = '%s=%s' % (algo, encoded)
    return (digest, length)
def urlsafe_b64encode(data):
    """urlsafe_b64encode without padding"""
    padded = base64.urlsafe_b64encode(data)
    padding_char = binary('=')
    return padded.rstrip(padding_char)
def digest(self):
    """Return the digest of ``self.hash`` as text.

    For hash type 'md5' this is the plain hex digest. For any other type it
    is ``'<hashtype>='`` followed by the urlsafe base64 digest.
    """
    if self.hashtype != 'md5':
        raw = self.hash.digest()
        return self.hashtype + '=' + native(urlsafe_b64encode(raw))
    return self.hash.hexdigest()
def start(self, url_state=None):
    """
    Starts the OAuth 2 authorization process.

    Builds the "authorization URL" the user's browser should be redirected
    to so they can grant your app access to their Dropbox account. Once
    they complete that step they are redirected to the ``redirect_uri``
    passed to the constructor.

    A fresh CSRF token is also stored in
    ``session[csrf_token_session_key]`` (as provided to the constructor);
    :meth:`finish()` checks it to prevent request forgery.

    :param str url_state: Any data that you would like to keep in the URL
        through the authorization process. This exact value will be
        returned to you by :meth:`finish()`.
    :return: The URL for a page on Dropbox's website where the user can
        "approve" your app. Tell the user to visit this URL and approve
        your app.
    """
    random_bytes = os.urandom(16)
    csrf_token = base64.urlsafe_b64encode(random_bytes).decode('ascii')

    # The state is the token alone, or "token|url_state" when extra state
    # was supplied.
    if url_state is None:
        state = csrf_token
    else:
        state = csrf_token + "|" + url_state

    self.session[self.csrf_token_session_key] = csrf_token
    return self._get_authorize_url(self.redirect_uri, state)
def pack(self, message, ascii_encoded=False):
    """Pack a message for transport as described in y/cep342.

    Use :func:`unpack` to decode the packed message.

    Args:
        message (data_pipeline.message.Message): The message to pack
        ascii_encoded (Optional[bool]): Set to True if message is not valid
            ASCII

    Returns:
        bytes: Avro byte string prepended by magic envelope version byte

    The initial "magic byte" is meant to specify the envelope schema
    version. See y/cep342 for details. In other words, the version number
    of the current schema is the null byte. In the event we need to add
    additional envelope versions, we'll use this byte to identify it.

    In addition, the "magic byte" is used as a protocol to encode the
    serialized message in base64. See DATAPIPE-1350 for more detail. This
    option has been added because as of now, yelp_clog only supports
    sending valid ASCII strings. Producer/Consumer registration will make
    use of this to instead send base64 encoded strings.
    """
    # NOTE(review): bytes(0) is the one-character string '0' on Python 2 but
    # an empty bytes object on Python 3 - confirm the target interpreter.
    envelope = bytes(0) + self._avro_string_writer.encode(message.avro_repr)
    if not ascii_encoded:
        return envelope
    return self.ASCII_MAGIC_BYTE + base64.urlsafe_b64encode(envelope)
def _pack_up(credentials):
    """Serialize ``credentials`` to JSON and base64url-encode the UTF-8 bytes."""
    as_json = json.dumps(credentials)
    payload = as_json.encode("UTF-8")
    return base64.urlsafe_b64encode(payload)
def _urlsafe_b64encode(raw_bytes):
    """Return the urlsafe base64 encoding of ``raw_bytes`` without '=' padding.

    The input is first passed through ``_to_bytes`` with UTF-8 encoding.
    """
    payload = _to_bytes(raw_bytes, encoding='utf-8')
    padded = base64.urlsafe_b64encode(payload)
    return padded.rstrip(b'=')
def default_username_algo(email):
    """Derive a username from ``email``: unpadded urlsafe base64 of its SHA-1."""
    # store the username as a base64 encoded sha1 of the email address
    # this protects against data leakage because usernames are often
    # treated as public identifiers (so we can't use the email address).
    email_digest = hashlib.sha1(smart_bytes(email)).digest()
    encoded = base64.urlsafe_b64encode(email_digest)
    return encoded.rstrip(b'=')
def process(value, directive, root, args, context, info):
    """Return ``str(value)`` urlsafe base64-encoded, or None for a falsy value.

    Only ``value`` is used. The result is text when ``six.PY3`` is true and
    the raw encoder output otherwise.
    """
    if not value:
        return None

    ascii_bytes = str(value).encode('ascii')
    encoded = base64.urlsafe_b64encode(ascii_bytes)
    if six.PY3:
        return encoded.decode('ascii')
    return encoded
def start(self, url_state=None):
    """
    Starts the OAuth 2 authorization process.

    Builds the "authorization URL" the user's browser should be redirected
    to so they can grant your app access to their Dropbox account. Once
    they complete that step they are redirected to the ``redirect_uri``
    passed to the constructor.

    A fresh CSRF token is also stored in
    ``session[csrf_token_session_key]`` (as provided to the constructor);
    :meth:`finish()` checks it to prevent request forgery.

    Parameters
      url_state
        Any data that you would like to keep in the URL through the
        authorization process. This exact value will be returned to you by
        :meth:`finish()`.

    Returns
      The URL for a page on Dropbox's website where the user can "approve"
      your app. Tell the user to visit this URL and approve your app.
    """
    token = base64.urlsafe_b64encode(os.urandom(16))
    # PY3: the encoder returns bytes, so normalise to a native string.
    csrf_token = token if isinstance(token, str) else token.decode('utf-8')

    if url_state is None:
        state = csrf_token
    else:
        state = csrf_token + "|" + url_state

    self.session[self.csrf_token_session_key] = csrf_token
    return self._get_authorize_url(self.redirect_uri, state)
def rehash(path, algo='sha256', blocksize=1<<20):
    """Return (hash, length) for path using hashlib.new(algo).

    Args:
        path: file to read, opened in binary mode.
        algo: hash algorithm name accepted by ``hashlib.new``.
        blocksize: number of bytes requested per read.

    Returns:
        A ``(digest, length)`` tuple. ``digest`` is ``'<algo>='`` followed
        by the urlsafe base64 digest without '=' padding, and ``length`` is
        the number of bytes read.
    """
    h = hashlib.new(algo)
    length = 0
    with open(path, 'rb') as f:
        # Read fixed-size blocks until read() returns the empty sentinel.
        for block in iter(lambda: f.read(blocksize), b''):
            length += len(block)
            h.update(block)
    encoded = urlsafe_b64encode(h.digest()).decode('latin1').rstrip('=')
    # Label the digest with the algorithm actually used; a fixed 'sha256='
    # prefix would mislabel every non-default algo.
    digest = '%s=%s' % (algo, encoded)
    return (digest, length)
def base64_encode(string):
    """Return the urlsafe base64 encoding of a single bytestring.

    The argument goes through ``want_bytes`` first, so a unicode string is
    tolerated too. '=' characters are stripped from the result, which is
    safe for putting into URLs.
    """
    payload = want_bytes(string)
    encoded = base64.urlsafe_b64encode(payload)
    return encoded.strip(b'=')
def GitHub_login(*, username=None, password=None, OTP=None, headers=None):
    """
    Login to GitHub.

    If no username, password, or OTP (2-factor authentication code) are
    provided, they will be requested from the command line.

    Args:
        username: GitHub user name; prompted for when falsy.
        password: GitHub password; prompted for when falsy.
        OTP: two-factor code, sent as the ``X-GitHub-OTP`` header when set.
        headers: optional extra request headers. The dict is copied, never
            modified.

    Returns:
        A dict of kwargs (``auth`` and ``headers``) that can be passed to
        functions that require authenticated connections to GitHub.

    Raises:
        AuthenticationFailed: on a 401 response that does not ask for a
            two-factor code.
        requests.exceptions.HTTPError: for any other error status.
    """
    if not username:
        username = input("What is your GitHub username? ")

    if not password:
        password = getpass("Enter the GitHub password for {username}: ".format(username=username))

    # Copy so the caller's dict is not mutated when the OTP header is added.
    headers = dict(headers) if headers else {}

    if OTP:
        headers['X-GitHub-OTP'] = OTP

    auth = HTTPBasicAuth(username, password)

    r = requests.get('https://api.github.com/', auth=auth, headers=headers)
    if r.status_code == 401:
        two_factor = r.headers.get('X-GitHub-OTP')
        if two_factor:
            # HTTP Basic credentials use the standard base64 alphabet
            # (RFC 7617); the url-safe alphabet corrupts any credentials
            # whose encoding contains '+' or '/'.
            auth_header = base64.b64encode(bytes(username + ':' + password, 'utf8')).decode()
            login_kwargs = {'auth': None, 'headers': {'Authorization': 'Basic {}'.format(auth_header)}}
            try:
                # NOTE(review): presumably this request only serves to make
                # GitHub deliver the code, so its HTTP error is ignored on
                # purpose - confirm.
                generate_GitHub_token(**login_kwargs)
            except requests.exceptions.HTTPError:
                pass
            print("A two-factor authentication code is required:", two_factor.split(';')[1].strip())
            OTP = input("Authentication code: ")
            return GitHub_login(username=username, password=password, OTP=OTP, headers=headers)

        raise AuthenticationFailed("invalid username or password")

    r.raise_for_status()
    return {'auth': auth, 'headers': headers}
def base64string():
    """Return the urlsafe base64 encoding of form field ``s`` as JSON.

    NOTE(review): the original docstring was garbled; it mentioned an auth
    header, so this is presumably used to build one - confirm.

    :return: JSON response ``{"rt": <encoded text>}``; a missing field is
        treated as an empty string.
    :rtype: flask response
    """
    from flask import request, jsonify
    from base64 import urlsafe_b64encode

    # Default to '' so a missing field does not crash the encoder.
    raw = request.form.get('s', '')
    # base64 needs bytes; form values arrive as text.
    if not isinstance(raw, bytes):
        raw = raw.encode('utf-8')
    # Decode so the value is JSON-serializable.
    return jsonify(rt=urlsafe_b64encode(raw).decode('ascii'))
def embed_real_url_to_embedded_url(real_url_raw, url_mime, escape_slash=False):
    """
    Fold a URL's query string (e.g. ?q=some&foo=bar) into its path.

    The query is base64url-encoded (zlib-compressed first when longer than
    128 bytes, marked by a 'z' label) and appended to the path between
    ``_url_salt`` markers, followed by the extension that
    ``mime_to_use_cdn`` maps ``url_mime`` to. The resulting URL has no
    query string. A URL without a query is returned unchanged.

    With ``escape_slash`` set, ``\\/`` sequences in the input are read as
    ``/`` and the result is passed through ``s_esc``.

    The original docstring names extract_real_url_from_embedded_url() as
    the related function.

    :rtype: str
    """
    real_url = real_url_raw.replace(r'\/', '/') if escape_slash else real_url_raw

    parts = urlsplit(real_url)
    if not parts.query:
        # no query, needn't rewrite
        return real_url_raw

    query_bytes = parts.query.encode()
    gzip_label = ''
    if len(query_bytes) > 128:
        # long query: compress it and flag that with a 'z' label
        gzip_label = 'z'
        query_bytes = zlib.compress(query_bytes)

    b64_query = base64.urlsafe_b64encode(query_bytes).decode()

    mixed_path = ''.join([
        parts.path,
        '_', _url_salt, gzip_label, '_.',
        b64_query,
        '._', _url_salt, '_.', mime_to_use_cdn[url_mime],
    ])
    result = urlunsplit((parts.scheme, parts.netloc, mixed_path, '', ''))

    return s_esc(result) if escape_slash else result
def __str__(self):
    """Encodes this Key as an opaque string.

    Returns a string representation of this key, suitable for use in HTML,
    URLs, and other similar use cases. If the entity's key is incomplete,
    raises a BadKeyError. The result is cached in ``self._str``.

    Unfortunately, this string encoding isn't particularly compact, and its
    length varies with the length of the path. If you want a shorter
    identifier and you know the kind and parent (if any) ahead of time,
    consider using just the entity's id or name.

    Returns:
      string
    """
    # Reuse the cached encoding when one exists; the attribute may be absent.
    cached = getattr(self, '_str', None)
    if cached is not None:
        return cached

    if not self.has_id_or_name():
        raise datastore_errors.BadKeyError(
            'Cannot string encode an incomplete key!\n%s' % self.__reference)

    encoded = base64.urlsafe_b64encode(self.__reference.Encode())
    self._str = encoded.replace('=', '')
    return self._str