我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用base64.urlsafe_b64decode()。
def unpack(self, packed_message):
    """Decode a message that was previously packed with :func:`pack`.

    Warning:
        The public API for this function may change to return
        :class:`data_pipeline.message.Message` instances.

    Args:
        packed_message (bytes): The previously packed message.

    Returns:
        dict: A dictionary with the decoded Avro representation.
    """
    is_ascii_packed = packed_message[0] == self.ASCII_MAGIC_BYTE
    if is_ascii_packed:
        # Everything after the ASCII marker is URL-safe base64; decoding it
        # yields the binary form that is handled below.
        packed_message = base64.urlsafe_b64decode(packed_message[1:])
    # The leading byte of the binary form is skipped before Avro decoding.
    avro_payload = packed_message[1:]
    return self._avro_string_reader.decode(avro_payload)
def validateJWTToken(token):
    """Validate the claims and the signature of a compact-serialized JWT.

    The token is rejected when its issuer, its first audience entry or its
    expiry time does not match the configured settings. Otherwise the
    signature is checked against the JWK selected by the header's ``kid``.

    :param token: JWT in ``header.payload.signature`` form.
    :type token: str
    :returns: ``False`` when a claim check fails, otherwise whatever the
        constructed public key's ``verify`` call returns.
    """
    current_time = (datetime.utcnow() - datetime(1970, 1, 1)).total_seconds()
    token_parts = token.split('.')

    # JWT segments are base64url-encoded with the padding stripped, so both
    # the header and the payload need the URL-safe alphabet and restored
    # padding. The standard-alphabet decoder silently drops '-' and '_'.
    # NOTE(review): incorrect_padding() is assumed to re-append the missing
    # '=' characters, as the original already relied on for the payload.
    idTokenHeader = json.loads(
        base64.urlsafe_b64decode(incorrect_padding(token_parts[0])).decode('utf-8'))
    idTokenPayload = json.loads(
        base64.urlsafe_b64decode(incorrect_padding(token_parts[1])).decode('utf-8'))

    if idTokenPayload['iss'] != settings.ID_TOKEN_ISSUER:
        return False
    elif idTokenPayload['aud'][0] != settings.CLIENT_ID:
        return False
    elif idTokenPayload['exp'] < current_time:
        return False

    # The signed message is the first two segments exactly as transmitted.
    message = token_parts[0] + '.' + token_parts[1]
    idTokenSignature = base64.urlsafe_b64decode(incorrect_padding(token_parts[2]))
    keys = getKeyFromJWKUrl(idTokenHeader['kid'])
    publicKey = jwk.construct(keys)
    return publicKey.verify(message.encode('utf-8'), idTokenSignature)
def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
    """Verify, decrypt and unpickle a ``hmac256:<signature>:<payload>`` blob.

    Input with exactly one ``:`` is handed to ``secure_loads_deprecated``.
    Any other shape, an unknown version tag, a signature mismatch or a
    failure while decrypting/decompressing/unpickling yields ``None``.
    """
    colon_count = data.count(b':')
    if colon_count == 1:
        return secure_loads_deprecated(data, encryption_key, hash_key, compression_level)
    if colon_count != 2:
        return None

    version, signature, payload = data.split(b':', 2)
    if version != b'hmac256':
        return None

    encryption_key = to_bytes(encryption_key)
    if not hash_key:
        # Without an explicit hash key, one is derived from the encryption key.
        hash_key = hashlib.sha256(encryption_key).digest()
    expected_signature = hmac.new(to_bytes(hash_key), payload, hashlib.sha256).hexdigest()
    if not compare(to_native(signature), expected_signature):
        return None

    raw = base64.urlsafe_b64decode(payload)
    # The first 16 bytes are the IV; the remainder is the ciphertext.
    iv = raw[:16]
    ciphertext = raw[16:]
    cipher, _ = AES_new(pad(encryption_key)[:32], IV=iv)
    try:
        plain = unpad(AES_dec(cipher, ciphertext))
        if compression_level:
            plain = zlib.decompress(plain)
        return pickle.loads(plain)
    except Exception:
        return None
def setup_shaders(gltf, uri_path):
    """Loads and compiles all shaders defined or referenced in the given gltf.

    :param gltf: mapping with a ``'shaders'`` entry; each shader has a
        ``'uri'`` and a ``'type'`` (passed to ``glCreateShader``).
    :param uri_path: directory against which non-data URIs are resolved.
    :returns: dict mapping each shader name to its GL shader id.
    :raises Exception: if a shader fails to compile.
    """
    shader_ids = {}
    for shader_name, shader in gltf['shaders'].items():
        uri = shader['uri']
        if uri.startswith('data:text/plain;base64,'):
            # Inline shader: the source is the base64 text after the comma.
            shader_str = base64.urlsafe_b64decode(uri.split(',')[1]).decode()
            _logger.debug('* decoded shader "%s":\n%s', shader_name, shader_str)
        else:
            filename = os.path.join(uri_path, uri)
            # Close the file deterministically instead of leaking the handle.
            with open(filename) as shader_file:
                shader_str = shader_file.read()
            _logger.debug('* loaded shader "%s" (from %s):\n%s', shader_name, filename, shader_str)
        shader_id = gl.glCreateShader(shader['type'])
        gl.glShaderSource(shader_id, shader_str)
        gl.glCompileShader(shader_id)
        if not gl.glGetShaderiv(shader_id, gl.GL_COMPILE_STATUS):
            raise Exception('failed to compile shader "%s":\n%s' % (shader_name, gl.glGetShaderInfoLog(shader_id).decode()))
        _logger.debug('* compiled shader "%s"', shader_name)
        shader_ids[shader_name] = shader_id
    return shader_ids
def decode(encoded):
    """
    Decode the result of querystringsafe_base64_encode or a regular base64.

    .. note ::
        A regular base64 string contains no dots, so replacing dots with
        equal signs leaves it unchanged. The decoding itself is delegated to
        urlsafe_b64decode, so this function may also be used on regular
        base64 input.

    :param (str, unicode) encoded: querystringsafe_base64 string or unicode
    :rtype: str, bytes
    :return: decoded string
    """
    if PY2:
        as_text = str(encoded)
        return urlsafe_b64decode(as_text.replace('.', '='))
    as_bytes = bytes(encoded, 'ascii')
    return urlsafe_b64decode(as_bytes.replace(b'.', b'='))
def test_querystringsafe_base64_decode_handles_unicode():
    """
    base64.urlsafe_b64decode, called by querystringsafe_base64.decode,
    complains about unicode being passed.
    Check that querystringsafe_base64.decode fixes it.
    """
    base64_unicode = u'DD=='
    base64_str = str(base64_unicode)
    base64dotted_unicode = u'DD..'
    # Derive the str variant from the *dotted* value; deriving it from
    # base64_unicode left the dotted-str case untested.
    base64dotted_str = str(base64dotted_unicode)
    decoded_str = urlsafe_b64decode(base64_str)
    # querystringsafe_base64.decode handles str and unicode in both formats.
    assert querystringsafe_base64.decode(base64_unicode) == decoded_str
    assert querystringsafe_base64.decode(base64_str) == decoded_str
    assert querystringsafe_base64.decode(base64dotted_unicode) == decoded_str
    assert querystringsafe_base64.decode(base64dotted_str) == decoded_str
def b64decode(data):
    """JOSE Base64 decode.

    :param data: Base64 string to be decoded. If it's unicode, then
                 only ASCII characters are allowed.
    :type data: `bytes` or `unicode`

    :returns: Decoded data.
    :rtype: bytes

    :raises TypeError: if input is of incorrect type
    :raises ValueError: if input is unicode with non-ASCII characters

    """
    is_text = isinstance(data, six.string_types)
    if not is_text and not isinstance(data, six.binary_type):
        raise TypeError('argument should be a str or unicode')
    if is_text:
        try:
            data = data.encode('ascii')
        except UnicodeEncodeError:
            raise ValueError(
                'unicode argument should contain only ASCII characters')
    # Re-append '=' padding before handing the data to the decoder.
    padding = b'=' * (4 - (len(data) % 4))
    return base64.urlsafe_b64decode(data + padding)
def sign_hmac(secret, payload):
    """Returns a base64-encoded HMAC-SHA1 signature of a given string.

    :param secret: The key used for the signature, base64 encoded.
    :type secret: string

    :param payload: The payload to sign.
    :type payload: string

    :rtype: string
    """
    payload_bytes = payload.encode('ascii', 'strict')
    secret_bytes = secret.encode('ascii', 'strict')
    # The secret arrives URL-safe base64 encoded; the raw bytes are the key.
    raw_key = base64.urlsafe_b64decode(secret_bytes)
    digest = hmac.new(raw_key, payload_bytes, hashlib.sha1).digest()
    return base64.urlsafe_b64encode(digest).decode('utf-8')
def _decode_uuid_line(line, passwd):
    """Decode and decrypt one stored line into a ``(userid, uuid)`` pair.

    :param line: URL-safe base64 text of the encrypted record.
    :param passwd: password used to derive the key or decrypt directly.
    :returns: the first ``(userid, uuid)`` match found in the decrypted
        text, or ``None`` when decryption fails or nothing matches.
    """
    decoded = base64.urlsafe_b64decode(line)
    if IS_WIN:
        # On Windows the key is derived from the password with the hostname
        # as salt, then wrapped the way Fernet expects it.
        key = scrypt.hash(passwd, socket.gethostname())
        key = base64.urlsafe_b64encode(key[:32])
        try:
            f = Fernet(key, backend=crypto_backend)
            # Decrypt the decoded line; passing the key itself as the token
            # could never succeed, so this branch always returned None.
            maybe_decrypted = f.decrypt(decoded)
        except Exception:
            return None
    else:
        try:
            maybe_decrypted = scrypt.decrypt(decoded, passwd, maxtime=0.1)
        except scrypt.error:
            return None
    # Raw string: same pattern, without the invalid "\:" escape sequences.
    match = re.findall(r"userid\:(.+)\:uuid\:(.+)", maybe_decrypted)
    if match:
        return match[0]
    return None
def base64_urldecode(string):
    """Decode unpadded base64url bytes, restoring the ``=`` padding first.

    Raises ``Exception`` when the length is one more than a multiple of
    four, which no valid base64 encoding can produce.
    """
    remainder = len(string) % 4
    if remainder == 1:
        raise Exception("Illegal base64url string!")
    padding = {2: b"==", 3: b"="}.get(remainder)
    if padding:
        string += padding
    return urlsafe_b64decode(string)

#
# def rsa_pubkey_from_bytes(e, n):
#     if isinstance(e, str):
#         e_bytes = e.encode('ascii')
#     if isinstance(n, str):
#         n_bytes = n.encode('ascii')
#
#     e_int = int.from_bytes(base64_urldecode(e_bytes), 'big')
#     n_int = int.from_bytes(base64_urldecode(n_bytes), 'big')
#
#     rsa_num = rsa.RSAPublicNumbers(e=e_int, n=n_int)
#
#     return rsa_num.public_key(backends.default_backend())
def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
    """Verify, decrypt and unpickle a ``<signature>:<payload>`` string.

    :param data: signed text; the part before the first ``:`` is the hex
        HMAC of the part after it.
    :param encryption_key: key material; its first 32 characters are padded
        and used for the cipher.
    :param hash_key: HMAC key; defaults to the SHA-1 hex digest of
        ``encryption_key``.
    :param compression_level: when truthy, the decrypted data is
        zlib-decompressed before unpickling.
    :returns: the unpickled object, or ``None`` when the separator is
        missing, the signature does not match, or decoding fails.
    """
    if ':' not in data:
        return None
    if not hash_key:
        hash_key = sha1(encryption_key).hexdigest()
    signature, encrypted_data = data.split(':', 1)
    actual_signature = hmac.new(hash_key, encrypted_data).hexdigest()
    if not compare(signature, actual_signature):
        return None
    key = pad(encryption_key[:32])
    encrypted_data = base64.urlsafe_b64decode(encrypted_data)
    # The first 16 bytes are the IV; the remainder is the ciphertext.
    IV, encrypted_data = encrypted_data[:16], encrypted_data[16:]
    cipher, _ = AES_new(key, IV=IV)
    try:
        data = cipher.decrypt(encrypted_data)
        data = data.rstrip(' ')
        if compression_level:
            data = zlib.decompress(data)
        # NOTE(review): pickle.loads runs only after the HMAC check above;
        # the payload is trusted only as far as the hash key stays secret.
        return pickle.loads(data)
    except Exception:
        # "except Exception, e" was Python-2-only syntax and bound an unused
        # name; this form parses on both Python 2 and Python 3.
        return None


### compute constant CTOKENS
def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
    """Verify, decrypt and unpickle a ``<signature>:<payload>`` string.

    Returns ``None`` when the separator is missing, when the signature does
    not match, or when decoding raises ``TypeError`` or
    ``pickle.UnpicklingError``.
    """
    if ':' not in data:
        return None
    if not hash_key:
        # Without an explicit hash key, one is derived from the encryption key.
        hash_key = hashlib.sha1(encryption_key).hexdigest()

    signature, payload = data.split(':', 1)
    expected_signature = hmac.new(hash_key, payload).hexdigest()
    if not compare(signature, expected_signature):
        return None

    key = pad(encryption_key[:32])
    raw = base64.urlsafe_b64decode(payload)
    # The first 16 bytes are the IV; the remainder is the ciphertext.
    iv = raw[:16]
    ciphertext = raw[16:]
    cipher, _ = AES_new(key, IV=iv)
    try:
        plain = cipher.decrypt(ciphertext).rstrip(' ')
        if compression_level:
            plain = zlib.decompress(plain)
        return pickle.loads(plain)
    except (TypeError, pickle.UnpicklingError):
        return None


### compute constant CTOKENS
def decode_depth_map(self, raw):
    """Decode a base64 + zlib packed depth map into attributes on ``self``.

    Sets ``depthHeader`` always, and ``depthMapIndices`` and
    ``depthMapPlanes`` when the header is valid. On an invalid header a
    message is printed and the method returns early.
    """
    padded = raw + self.make_padding(raw)
    raw = zlib.decompress(base64.urlsafe_b64decode(padded))

    # Fixed 8-byte little-endian header.
    header = struct.unpack('<BHHHB', raw[0:8])
    header_size, num_planes, pano_width, pano_height, plane_indices_offset = header
    self.depthHeader = {'numPlanes': num_planes,
                        'panoWidth': pano_width,
                        'panoHeight': pano_height}
    if header_size != 8 or plane_indices_offset != 8:
        print("Invalid depthmap data")
        return

    # One plane index per panorama pixel follows the header.
    indices_end = plane_indices_offset + (pano_width * pano_height)
    self.depthMapIndices = list(raw[plane_indices_offset:indices_end])

    # Plane records (four little-endian floats each) follow the indices.
    pos = header_size + len(self.depthMapIndices)
    self.depthMapPlanes = []
    for _ in range(num_planes):
        nx, ny, nz, d = struct.unpack('<ffff', raw[pos:pos + 16])
        # nx/ny/nz = unit normal, d = distance from origin
        self.depthMapPlanes.append({'d': d, 'nx': nx, 'ny': ny, 'nz': nz})
        pos += 16
def _extract_server(s):
    """Decodes and processes server parameter from auth request

    Args:
        s (string) : The b64u-encoded string passed by the client.

    Returns:
        The decoded string itself when it is an ``sqrl``/``qrl`` URL,
        otherwise a dict of the name/value pairs it contains.
    """
    decoded = urlsafe_b64decode(pad(s)).decode('utf-8').strip()

    # A valid s/qrl URL is returned as-is.
    scheme = urllib.parse.urlparse(decoded).scheme
    if scheme in ('sqrl', 'qrl'):
        return decoded

    # Otherwise the text is CRLF-separated name=value pairs.
    return dict(line.split('=', 1) for line in decoded.split('\r\n'))
def _signature_valid(msg, key, sig):
    """Validates Ed25519 signatures

    Args:
        msg (string) : The signed message.
        key (string) : The b64u-encoded signing key.
        sig (string) : The b64u-encoded signature.

    Returns:
        bool : Whether the signature matches or not.
    """
    try:
        verifier = nacl.signing.VerifyKey(
            pad(key), encoder=nacl.encoding.URLSafeBase64Encoder)
        message_bytes = msg.encode('utf-8')
        signature_bytes = urlsafe_b64decode(pad(sig))
        verifier.verify(message_bytes, signature_bytes)
    except nacl.exceptions.BadSignatureError:
        return False
    return True
def _restore_from_backup(jwd, filepath, plaintext, aes256_cipher):
    """Return the backed-up ciphertext if it still matches ``plaintext``.

    The plaintext is re-encrypted with the IV taken from the backup; the
    backup is returned only when that reproduces the stored ciphertext.
    Returns ``None`` when there is no backup or the content differs.

    We may want to replace this with a simpler "check last modified time"
    lookup that could happen in constant time instead.
    """
    if not helpers.is_there_a_backup(jwd=jwd, filepath=filepath):
        return None

    stored_text = helpers.get_backup_content_for_file(jwd=jwd, filepath=filepath)
    stored_raw = base64.urlsafe_b64decode(b(stored_text))
    reused_iv = aes256_cipher.extract_iv(ciphertext=stored_raw)
    candidate = aes256_cipher.encrypt(plaintext=plaintext, iv=reused_iv)
    return stored_text if candidate == stored_raw else None
def _buildAuthJWT(config):
    """Build a JWT validation callable from the ``canister.auth_jwt_*`` config.

    Returns ``None`` when the client id or the secret is not configured.
    Raises ``Exception`` for an unknown ``auth_jwt_encoding`` value.
    """
    client_id = config.get('canister.auth_jwt_client_id', None)
    secret = config.get('canister.auth_jwt_secret', None)
    # clear, base64std, or base64url
    encoding = config.get('canister.auth_jwt_encoding', 'clear').lower()

    if not (client_id and secret):
        return None

    import jwt

    secret_decoders = {
        'base64std': base64.standard_b64decode,  # with + and /
        'base64url': base64.urlsafe_b64decode,   # with - and _
        'clear': None,
    }
    if encoding not in secret_decoders:
        raise Exception('Invalid auth_jwt_encoding in config: "%s" (should be "clear", "base64std" or "base64url")' % encoding)
    decoder = secret_decoders[encoding]
    if decoder is not None:
        secret = decoder(secret)

    def validate(token):
        return jwt.decode(token, secret, audience=client_id)

    return validate
def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
    """Verify, decrypt and unpickle a ``<signature>:<payload>`` string.

    :param data: signed text; the part before the first ``:`` is the hex
        HMAC of the part after it.
    :param encryption_key: key material; it is padded and the first 32
        characters of the result are used for the cipher.
    :param hash_key: HMAC key; defaults to the SHA-1 hex digest of
        ``encryption_key``.
    :param compression_level: when truthy, the decrypted data is
        zlib-decompressed before unpickling.
    :returns: the unpickled object, or ``None`` when the separator is
        missing, the signature does not match, or decoding fails.
    """
    if ':' not in data:
        return None
    if not hash_key:
        hash_key = sha1(encryption_key).hexdigest()
    signature, encrypted_data = data.split(':', 1)
    actual_signature = hmac.new(hash_key, encrypted_data).hexdigest()
    if not compare(signature, actual_signature):
        return None
    key = pad(encryption_key)[:32]
    encrypted_data = base64.urlsafe_b64decode(encrypted_data)
    # The first 16 bytes are the IV; the remainder is the ciphertext.
    IV, encrypted_data = encrypted_data[:16], encrypted_data[16:]
    cipher, _ = AES_new(key, IV=IV)
    try:
        data = cipher.decrypt(encrypted_data)
        data = data.rstrip(' ')
        if compression_level:
            data = zlib.decompress(data)
        # NOTE(review): pickle.loads runs only after the HMAC check above;
        # the payload is trusted only as far as the hash key stays secret.
        return pickle.loads(data)
    except Exception:
        # "except Exception, e" was Python-2-only syntax and bound an unused
        # name; this form parses on both Python 2 and Python 3.
        return None


### compute constant CTOKENS
def __init__(self):
    """Initialise the result list, timestamps and the trakt/TVDB endpoint URLs."""
    self.list = []

    self.trakt_link = 'http://api-v2launch.trakt.tv'
    # The TVDB key is stored base64-encoded and decoded at construction.
    tvdb_key = base64.urlsafe_b64decode('MUQ2MkYyRjkwMDMwQzQ0NA==')
    self.tvdb_key = tvdb_key

    # Reference clock: current UTC time shifted back five hours.
    shifted_now = datetime.datetime.utcnow() - datetime.timedelta(hours = 5)
    self.datetime = shifted_now
    self.systime = shifted_now.strftime('%Y%m%d%H%M%S%f')
    self.today_date = shifted_now.strftime('%Y-%m-%d')

    # Lower-cased user name with anything outside [a-z0-9] replaced by '-'.
    trakt_user = re.sub('[^a-z0-9]', '-', control.setting('trakt.user').strip().lower())
    self.trakt_user = trakt_user
    self.lang = control.apiLanguage()['tvdb']

    # Only the key is filled in now; the other two placeholders stay as '%s'.
    self.tvdb_info_link = 'http://thetvdb.com/api/%s/series/%s/all/%s.zip' % (tvdb_key, '%s', '%s')
    self.tvdb_image = 'http://thetvdb.com/banners/'
    self.tvdb_poster = 'http://thetvdb.com/banners/_cache/'

    self.added_link = 'http://api-v2launch.trakt.tv/calendars/all/shows/date[6]/7/'
    self.mycalendar_link = 'http://api-v2launch.trakt.tv/calendars/my/shows/date[59]/60/'
    self.trakthistory_link = 'http://api-v2launch.trakt.tv/users/%s/history/shows?limit=300' % trakt_user
    self.progress_link = 'http://api-v2launch.trakt.tv/users/%s/watched/shows' % trakt_user
    self.calendar_link = 'http://api-v2launch.trakt.tv/calendars/all/shows/%s/%s'
    self.traktlists_link = 'http://api-v2launch.trakt.tv/users/%s/lists' % trakt_user
    self.traktlikedlists_link = 'http://api-v2launch.trakt.tv/users/likes/lists?limit=1000000'
    self.traktlist_link = 'http://api-v2launch.trakt.tv/users/%s/lists/%s/items'
def get_data_from_b64string(result):
    """Decode a URL-safe base64 string that holds a JSON document.

    @param result: URL-safe base64 encoded JSON text
    @type result: String
    :return: the parsed JSON value, or ``{}`` when decoding or parsing fails
    """
    try:
        return json.loads(base64.urlsafe_b64decode(str(result)))
    except Exception:
        # Best-effort decode: malformed input yields an empty dict.
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        return {}
def test_b64decode_invalid_chars(self): # issue 1466065: Test some invalid characters. tests = ((b'%3d==', b'\xdd'), (b'$3d==', b'\xdd'), (b'[==', b''), (b'YW]3=', b'am'), (b'3{d==', b'\xdd'), (b'3d}==', b'\xdd'), (b'@@', b''), (b'!', b''), (b'YWJj\nYWI=', b'abcab')) for bstr, res in tests: self.assertEqual(base64.b64decode(bstr), res) self.assertEqual(base64.standard_b64decode(bstr), res) self.assertEqual(base64.urlsafe_b64decode(bstr), res) # Normal alphabet characters not discarded when alternative given res = b'\xFB\xEF\xBE\xFF\xFF\xFF' self.assertEqual(base64.b64decode(b'++[[//]]', b'[]'), res) self.assertEqual(base64.urlsafe_b64decode(b'++--//__'), res)
def urlsafe_b64decode(data):
    """Decode URL-safe base64 bytes whose ``=`` padding was stripped."""
    # Between one and four '=' are appended before decoding.
    missing = 4 - (len(data) % 4)
    padded = data + b'=' * missing
    return base64.urlsafe_b64decode(padded)
def get_config(self):
    """Load watcher rules from the sub-directories of ``self.config_dir``.

    Each sub-directory name is the URL-safe base64 encoding of a watched
    file name. A watcher is created for it, and every file inside is read
    as a JSON rule and added to that watcher's check chain as a ``Checker``.
    Entries of ``config_dir`` that are not directories are ignored.
    """
    for entry in os.listdir(self.config_dir):
        # os.listdir yields bare names, so the directory test must use the
        # full path, not the name resolved against the working directory.
        entry_path = os.path.join(self.config_dir, entry)
        if not os.path.isdir(entry_path):
            continue
        filename = base64.urlsafe_b64decode(entry)
        self.schedule.make_watcher(filename)
        # str.join takes one iterable; the two-argument call raised
        # TypeError. os.path.join builds the same path.
        for cfg in os.listdir(entry_path):
            with open(os.path.join(entry_path, cfg)) as f:
                rule = json.load(f)
            checker = Checker(**rule)
            self.schedule.watchers[filename].check_chain.add_checker(checker)
def __init__(self):
    """Set up the YouTube base URLs and pick one of the stored API keys."""
    self.base_link = 'http://www.youtube.com'
    # The keys are stored base64-encoded; one is picked at random.
    encoded_key = random.choice([
        'QUl6YVN5RDd2aFpDLTYta2habTVuYlVyLTZ0Q0JRQnZWcnFkeHNz',
        'QUl6YVN5Q2RiNEFNenZpVG0yaHJhSFY3MXo2Nl9HNXBhM2ZvVXd3',
    ])
    key_suffix = '&key=%s' % base64.urlsafe_b64decode(encoded_key)
    self.key_link = key_suffix
    self.search_link = 'https://www.googleapis.com/youtube/v3/search?part=id&type=video&maxResults=5&q=%s' + key_suffix
    self.youtube_watch = 'http://www.youtube.com/watch?v=%s'
def _urlsafe_b64decode(b64string):
    """Decode URL-safe base64 input after converting it to bytes and padding it."""
    # Guard against unicode strings, which base64 can't handle.
    raw = _to_bytes(b64string)
    fill = 4 - len(raw) % 4
    return base64.urlsafe_b64decode(raw + b'=' * fill)
def validate_token(key, token, user_id, action_id="", current_time=None):
    """Validates that the given token authorizes the user for the action.

    Tokens are invalid if the time of issue is too old or if the token
    does not match what generateToken outputs (i.e. the token was forged).

    Args:
        key: secret key to use.
        token: a string of the token generated by generateToken.
        user_id: the user ID of the authenticated user.
        action_id: a string identifier of the action they requested
                   authorization for.
        current_time: time to validate against; ``time.time()`` is used
                      when it is None.

    Returns:
        A boolean - True if the user is authorized for the action, False
        otherwise.
    """
    if not token:
        return False

    try:
        raw = base64.urlsafe_b64decode(token)
        # The issue time is the last delimiter-separated field.
        issued_at = int(raw.split(DELIMITER)[-1])
    except (TypeError, ValueError, binascii.Error):
        return False

    now = time.time() if current_time is None else current_time

    # If the token is too old it's not valid.
    if now - issued_at > DEFAULT_TIMEOUT_SECS:
        return False

    # The given token should match the generated one with the same time.
    expected_token = generate_token(key, user_id, action_id=action_id,
                                    when=issued_at)
    if len(token) != len(expected_token):
        return False

    # Perform constant time comparison to avoid timing attacks
    mismatch = 0
    for got, want in zip(bytearray(token), bytearray(expected_token)):
        mismatch |= got ^ want
    return not mismatch
def base64_decode(string):
    """Decode a single URL-safe base64 bytestring.

    Tolerates being called with a unicode string; the result is always a
    bytestring. Missing ``=`` padding is restored before decoding.
    """
    raw = want_bytes(string, encoding='ascii', errors='ignore')
    missing = -len(raw) % 4
    return base64.urlsafe_b64decode(raw + b'=' * missing)
def extract_real_url_from_embedded_url(embedded_url):
    """
    Recover the real URL (path plus query string) from an embedded URL.

    NOTE(review): this appears to be the inverse of
    embed_real_url_to_embedded_url() and related to the
    `cdn_redirect_encode_query_str_into_url` option; confirm against those
    definitions.

    eg: https://cdn.domain.com/a.php_zm24_.cT1zb21ldGhpbmc=._zm24_.css
        ---> https://foo.com/a.php?q=something (assume it returns an css) (base64 only)
    eg2: https://cdn.domain.com/a/b/_zm24_.bG92ZT1saXZl._zm24_.jpg
        ---> https://foo.com/a/b/?love=live (assume it returns an jpg) (base64 only)
    eg3: https://cdn.domain.com/a/b/_zm24z_.[some long long base64 encoded string]._zm24_.jpg
        ---> https://foo.com/a/b/?love=live[and a long long query string] (assume it returns an jpg) (gzip + base64)
    eg4: https://cdn.domain.com/a  (no change)
        ---> (no query string): https://foo.com/a (assume it returns an png) (no change)

    :param embedded_url: URL that may carry an embedded, encoded query string
    :return: the URL with the decoded query string re-attached, or None
        when the URL carries no embedded query string
    :type embedded_url: str
    :rtype: Union[str, None]
    """
    if '._' + cdn_url_query_encode_salt + '_.' not in embedded_url[-15:]:  # check url mark
        return None
    m = regex_extract_base64_from_embedded_url.search(embedded_url)
    if m is None:
        # The marker is present but the pattern does not match; treat it as
        # "not embedded" rather than failing on m.span() below.
        return None
    b64 = get_group('b64', m)

    # 'https://cdn.domain.com/a.php_zm24_.cT1zb21ldGhpbmc=._zm24_.css'
    # real_request_url_no_query ---> 'https://cdn.domain.com/a.php'
    real_request_url_no_query = embedded_url[:m.span()[0]]

    query_string_byte = base64.urlsafe_b64decode(b64)
    is_gzipped = get_group('gzip', m)
    if is_gzipped:
        query_string_byte = zlib.decompress(query_string_byte)
    query_string = query_string_byte.decode(encoding='utf-8')

    result = urljoin(real_request_url_no_query, '?' + query_string)
    # dbgprint('extract:', embedded_url, 'to', result)
    return result
def _urlsafe_b64decode(b64string):
    """Decode URL-safe base64 text or bytes whose padding may be missing.

    :param b64string: base64url data as text (ASCII only) or bytes.
    :returns: the decoded bytes.
    """
    # Guard against unicode strings, which base64 can't handle. Bytes input
    # is left alone, since bytes has no .encode() on Python 3.
    if not isinstance(b64string, bytes):
        b64string = b64string.encode('ascii')
    # Pad with a bytes literal: bytes + str raises TypeError on Python 3.
    padded = b64string + b'=' * (4 - len(b64string) % 4)
    return base64.urlsafe_b64decode(padded)
def __init__(self, key, backend=None):
    """Split a URL-safe base64 encoded 32-byte key into its two halves.

    The first 16 bytes become the signing key, the last 16 the encryption
    key. Raises ``ValueError`` when the decoded key is not 32 bytes long.
    """
    if backend is None:
        backend = default_backend()

    raw_key = base64.urlsafe_b64decode(key)
    if len(raw_key) != 32:
        raise ValueError(
            "Fernet key must be 32 url-safe base64-encoded bytes."
        )

    signing_half, encryption_half = raw_key[:16], raw_key[16:]
    self._signing_key = signing_half
    self._encryption_key = encryption_half
    self._backend = backend
def b64_decode(s):
    """Decode URL-safe base64 bytes, restoring any stripped ``=`` padding."""
    missing = -len(s) % 4
    return base64.urlsafe_b64decode(s + b'=' * missing)
def urlsafe_base64_decode(s):
    """
    Decodes a base64 encoded string, adding back any trailing equal signs
    that might have been stripped. Decoding failures are re-raised as
    ValueError.
    """
    s = force_bytes(s)
    padded_width = len(s) + len(s) % 4
    try:
        return base64.urlsafe_b64decode(s.ljust(padded_width, b'='))
    except (LookupError, BinasciiError) as e:
        raise ValueError(e)
def decode(key, enc):
    """Reverse a repeating-key byte shift applied before base64 encoding.

    :param key: text key; its characters are cycled over the data.
    :param enc: URL-safe base64 of the shifted bytes.
    :returns: the decoded text, one character per decoded byte.
    """
    # bytearray yields integers on both Python 2 and Python 3, whereas
    # indexing the decoded bytes and calling ord() fails on Python 3.
    shifted = bytearray(base64.urlsafe_b64decode(enc))
    key_len = len(key)
    return "".join(
        chr((256 + value - ord(key[i % key_len])) % 256)
        for i, value in enumerate(shifted)
    )
def _decode(self, key, enc):
    """Reverse a repeating-key byte shift applied before base64 encoding.

    ``key`` is indexed as a sequence of integers and cycled over the
    decoded data; the result is returned as bytes.
    """
    shifted = base64.urlsafe_b64decode(enc)
    return bytes(
        (value - key[i % len(key)]) % 256
        for i, value in enumerate(shifted)
    )
def getPhoto(users):
    """Fetch each user's directory photo and write it to ``<user>.jpg``.

    Recognised arguments: ``drivedir`` and ``targetfolder <path>`` choose
    the output directory (the latter is created if missing), and ``noshow``
    suppresses echoing the base64 photo data to stdout. Any other argument
    is passed to unknownArgumentExit().
    """
    cd = buildGAPIObject(API.DIRECTORY)
    output_dir = os.getcwd()
    echo_photo_data = True
    while Cmd.ArgumentsRemaining():
        option = getArgument()
        if option == u'drivedir':
            output_dir = GC.Values[GC.DRIVE_DIR]
        elif option == u'targetfolder':
            output_dir = os.path.expanduser(getString(Cmd.OB_FILE_PATH))
            if not os.path.isdir(output_dir):
                os.makedirs(output_dir)
        elif option == u'noshow':
            echo_photo_data = False
        else:
            unknownArgumentExit()
    i, count, users = getEntityArgument(users)
    for raw_user in users:
        i += 1
        user = normalizeEmailAddressOrUID(raw_user)
        try:
            entityPerformActionNumItems([Ent.USER, user], 1, Ent.PHOTO, i, count)
            photo = callGAPI(cd.users().photos(), u'get',
                             throw_reasons=[GAPI.USER_NOT_FOUND, GAPI.FORBIDDEN, GAPI.PHOTO_NOT_FOUND],
                             userKey=user)
            filename = os.path.join(output_dir, u'{0}.jpg'.format(user))
            photo_data = str(photo[u'photoData'])
            if echo_photo_data:
                writeStdout(photo_data+'\n')
            # The API returns the image as URL-safe base64 text.
            status, write_error = writeFileReturnError(filename, base64.urlsafe_b64decode(photo_data))
            if status:
                entityActionPerformed([Ent.USER, user, Ent.PHOTO, filename], i, count)
            else:
                entityActionFailedWarning([Ent.USER, user, Ent.PHOTO, filename], str(write_error), i, count)
        except GAPI.photoNotFound as e:
            entityActionFailedWarning([Ent.USER, user, Ent.PHOTO, None], str(e), i, count)
        except (GAPI.userNotFound, GAPI.forbidden):
            entityUnknownWarning(Ent.USER, user, i, count)