我们从Python开源项目中，提取了以下47个代码示例，用于说明如何使用base64.standard_b64encode()。
def sign(self, url, endpoint, endpoint_path, method_verb, *args, **kwargs):
    """Build the signed ``X-BFX-*`` headers for a request.

    When ``self.version == 'v1'`` the request parameters, extended with
    ``request`` and ``nonce`` entries, are JSON-serialised and
    base64-encoded.  For any other version the payload is the text
    ``'/api/' + endpoint_path + nonce + json(params)``.  In both cases the
    payload is signed with HMAC-SHA384 keyed by ``self.secret``.

    Note: for v1 the dict passed as ``params`` is updated in place.

    :param url: request URL; returned unchanged.
    :param endpoint: not used by this method.
    :param endpoint_path: path embedded in the signed payload.
    :param method_verb: not used by this method.
    :param kwargs: may carry ``params``, a dict of request parameters.
    :return: ``(url, {'headers': headers})``.
    """
    req = kwargs.get('params', {})

    if self.version == 'v1':
        req['request'] = endpoint_path
        req['nonce'] = self.nonce()
        data = base64.standard_b64encode(json.dumps(req).encode('utf8'))
        message = data
    else:
        data = '/api/' + endpoint_path + self.nonce() + json.dumps(req)
        # hmac only accepts bytes-like messages on Python 3; passing the
        # text payload straight through raised TypeError.
        message = data.encode('utf8')

    signature = hmac.new(self.secret.encode('utf8'), message,
                         hashlib.sha384).hexdigest()
    headers = {"X-BFX-APIKEY": self.key,
               "X-BFX-SIGNATURE": signature,
               "X-BFX-PAYLOAD": data}
    if self.version == 'v2':
        headers['content-type'] = 'application/json'

    return url, {'headers': headers}
def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
    """Build the signed ``X-GEMINI-*`` headers for a request.

    The request parameters (``kwargs['params']``, or a new dict when
    absent) receive ``nonce`` and ``request`` entries, are JSON-serialised
    and base64-encoded, and the result is signed with HMAC-SHA384 keyed by
    ``self.secret``.

    :return: ``(uri, {'headers': headers})``.
    """
    nonce = self.nonce()
    payload = kwargs.get('params', {})
    payload.update({'nonce': nonce, 'request': endpoint_path})

    encoded = base64.standard_b64encode(json.dumps(payload).encode('utf8'))
    mac = hmac.new(self.secret.encode('utf8'), encoded, hashlib.sha384)

    return uri, {
        'headers': {
            'X-GEMINI-APIKEY': self.key,
            'X-GEMINI-PAYLOAD': encoded,
            'X-GEMINI-SIGNATURE': mac.hexdigest(),
        },
    }
def _from_inst(inst, rostype):
    """Convert ``inst`` according to ``rostype``.

    The checks run in this order: binary types, time/duration types,
    primitive types, list/tuple instances, and finally anything else is
    handed to ``_from_object_inst``.
    """
    if rostype in ros_binary_types:
        # Special case (uint8[]): the data is base64 encoded.
        converted = standard_b64encode(inst)
    elif rostype in ros_time_types:
        converted = {"secs": inst.secs, "nsecs": inst.nsecs}
    elif rostype in ros_primitive_types:
        converted = inst
    elif type(inst) in list_types:
        converted = _from_list_inst(inst, rostype)
    else:
        # Otherwise assume a full message object.
        converted = _from_object_inst(inst, rostype)
    return converted
def inject(self, span_context, carrier):
    """Append the base64-encoded serialised span context to ``carrier``.

    :param span_context: object providing ``trace_id``, ``span_id``,
        ``sampled`` and ``baggage``.
    :param carrier: must be exactly a ``bytearray``; it is extended in place.
    :raises InvalidCarrierException: if ``carrier`` is any other type.
    """
    if type(carrier) is not bytearray:
        raise InvalidCarrierException()

    state = BinaryCarrier()
    ctx = state.basic_ctx
    ctx.trace_id = span_context.trace_id
    ctx.span_id = span_context.span_id
    ctx.sampled = span_context.sampled

    if span_context.baggage is not None:
        for name in span_context.baggage:
            ctx.baggage_items[name] = span_context.baggage[name]

    carrier.extend(standard_b64encode(state.SerializeToString()))
def save_pem(contents, pem_marker):
    """Return ``contents`` rendered in PEM format.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE
        KEY' when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 data in 64-character lines, the
        end marker and a trailing empty line, joined by newlines.
    """
    (pem_start, pem_end) = _markers(pem_marker)

    b64 = base64.standard_b64encode(contents).replace(b('\n'), b(''))
    body = [b64[offset:offset + 64] for offset in range(0, len(b64), 64)]

    return b('\n').join([pem_start] + body + [pem_end, b('')])
def _sign_payload(self, method, path, params=None, payload=None):
    """Return the signed ``X-SBTC-*`` headers for a request.

    The signed text is ``method route [base64(json(payload))] nonce``
    joined by single spaces; the body part is present only when
    ``payload`` is truthy.  The signature is the HMAC-SHA384 hex digest
    keyed by ``self.SECRET``.
    """
    route = build_route(path, params)
    nonce = gen_nonce()

    parts = [method, route]
    if payload:
        body = json.dumps(payload).encode('utf-8')
        parts.append(base64.standard_b64encode(body).decode('utf-8'))
    parts.append(nonce)
    string = ' '.join(parts)

    mac = hmac.new(key=self.SECRET.encode('utf-8'),
                   msg=string.encode('utf-8'),
                   digestmod=hashlib.sha384)

    return {
        'X-SBTC-APIKEY': self.KEY,
        'X-SBTC-NONCE': nonce,
        'X-SBTC-SIGNATURE': mac.hexdigest(),
        'Content-Type': 'application/json',
    }
def __encrypt(self, plaintext):
    """
    Encrypt the given plaintext with the encryption key (AES in CBC mode
    with a fresh 16-byte IV; the plaintext is padded to 16-byte blocks).

    :param plaintext: data to encrypt
    :return: Serialized JSON String of the encryption Envelope
    """
    esn = self.kodi_helper.get_esn()
    iv = get_random_bytes(16)
    key_id = esn + '_' + str(self.sequence_number)
    encoded_iv = base64.standard_b64encode(iv)

    padded = Padding.pad(plaintext, 16)
    ciphertext = AES.new(self.encryption_key, AES.MODE_CBC, iv).encrypt(padded)

    return json.dumps({
        'ciphertext': base64.standard_b64encode(ciphertext),
        'keyid': key_id,
        'sha256': 'AA==',
        'iv': encoded_iv,
    })
def __save_msl_data(self):
    """
    Serialise the base64-encoded keys and the master token to JSON and
    hand the result to ``self.save_file`` as ``msl_data.json``.

    :return: None
    """
    encoded_encryption_key = base64.standard_b64encode(self.encryption_key)
    encoded_sign_key = base64.standard_b64encode(self.sign_key)
    data = {
        "encryption_key": encoded_encryption_key,
        'sign_key': encoded_sign_key,
        'tokens': {'mastertoken': self.mastertoken},
    }
    content = json.JSONEncoder().encode(data)
    self.save_file(msl_data_path=self.kodi_helper.msl_data_path,
                   filename='msl_data.json',
                   content=content)
def GET(self):
    """
    Serves the dfnstation.cfg file to the user to read.

    Returns:
        result (str): A Base64 encoded string containing the contents of
        the file at ``constants.dfnconfigPath``.  ``None`` is returned when
        the user is not logged in.

    Raises:
        web.notfound: if the file does not exist.
    """
    if not LoginChecker.loggedIn():
        # Nothing is served to users that are not logged in.
        return None

    path = constants.dfnconfigPath
    if not os.path.exists(path):
        raise web.notfound()

    # ``file()`` only exists on Python 2 and the handle was never closed;
    # ``open`` in a ``with`` block works on both and releases the handle.
    with open(path, 'rb') as config_file:
        contents = config_file.read()

    web.header('Content-type', 'application/octet-stream')
    web.header('Content-transfer-encoding', 'base64')
    return base64.standard_b64encode(contents)
def write_vm_file(self, path, content, uri="qemu:///system"):
    """Write ``content`` to ``path`` using ``guest-file-*`` commands.

    The commands are sent through ``self.EXE`` in the order open, write,
    flush, close.

    :param path: destination path; it is JSON-escaped before being embedded
        in the open command.
    :param content: data to write; text is UTF-8 encoded before being
        base64 encoded.
    :param uri: not used by this method.
    :return: the ``count`` value of the write reply, or -1 if opening or
        writing failed.
    """
    import json  # local import: only needed to escape ``path``

    # ``%s`` is filled with an already-quoted JSON string for the path.
    FILE_OPEN_WRITE = """{"execute":"guest-file-open", "arguments":{"path":%s,"mode":"w+"}}"""
    FILE_WRITE = """{"execute":"guest-file-write", "arguments":{"handle":%s,"buf-b64":"%s"}}"""
    FILE_CLOSE = """{"execute":"guest-file-close", "arguments":{"handle":%s}}"""
    FILE_FLUSH = """{"execute":"guest-file-flush", "arguments":{"handle":%s}}"""

    raw = content if isinstance(content, bytes) else content.encode("utf-8")
    enc_content = base64.standard_b64encode(raw)
    if not isinstance(enc_content, str):
        # Python 3 returns bytes, which would be formatted as "b'...'".
        enc_content = enc_content.decode("ascii")

    file_handle = -1
    try:
        file_handle = self.EXE(FILE_OPEN_WRITE % json.dumps(path))["return"]
        write_count = self.EXE(FILE_WRITE % (file_handle, enc_content))["return"]["count"]
        logger.debug("content:\n%s\npath:\n%s", content, path)
    except Exception:
        logger.exception("writing %s through the guest agent failed", path)
        return -1
    finally:
        # Only flush/close a handle that was actually opened.
        if file_handle != -1:
            self.EXE(FILE_FLUSH % file_handle)
            self.EXE(FILE_CLOSE % file_handle)
    return write_count
def authenticate_with_apikey(self, api_key, scope=None):
    """Authenticate with an API key and store the token for execute_request.

    A ``client_credentials`` grant is POSTed to ``self.auth_endpoint`` with
    HTTP Basic credentials ``APIKEY:<api_key>``; the parsed response is
    stored in ``self._token`` together with the retrieval time.

    api_key -- secret api key from account settings
    scope -- optional scope of authentication request. If None, "auto" is
             sent as the scope.
    """
    scope = "auto" if scope is None else scope
    data = {
        "grant_type": "client_credentials",
        "scope": scope
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # The header name was misspelled "ContentType", which is not the
    # Content-Type header.
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    credentials = base64.standard_b64encode(('APIKEY:' + api_key).encode()).decode()
    request.add_header("Authorization", 'Basic ' + credentials)
    response = urllib.request.urlopen(request)
    self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()
def authenticate_with_contact_credentials(self, username, password, scope=None):
    """Authenticate with contact credentials and store the token for execute_request.

    A ``password`` grant is POSTed to ``self.auth_endpoint`` with HTTP Basic
    credentials ``<client_id>:<client_secret>``; the parsed response is
    stored in ``self._token`` together with the retrieval time.

    username -- typically a contact email
    password -- contact password
    scope -- optional scope of authentication request. If None, "auto" is
             sent as the scope.
    """
    scope = "auto" if scope is None else scope
    data = {
        "grant_type": "password",
        "username": username,
        "password": password,
        "scope": scope
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # The header name was misspelled "ContentType", which is not the
    # Content-Type header.
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    auth_header = base64.standard_b64encode(
        (self.client_id + ':' + self.client_secret).encode()).decode()
    request.add_header("Authorization", 'Basic ' + auth_header)
    response = urllib.request.urlopen(request)
    self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()
def get(self):
    """Fetch the gists list and return it to the opener window.

    The access code from the request arguments is exchanged for an access
    token, ``https://api.github.com/gists`` is queried with it, and the
    base64-encoded response text is written out inside a ``<script>`` that
    posts it to ``window.opener``.
    """
    # Extract access code, then request the access token.
    access_code = extract_code_from_args(self.request.arguments)
    access_token = self.request_access_token(access_code)

    response = request_session.get(
        "https://api.github.com/gists",
        headers={"Accept": "application/json",
                 "Authorization": "token " + access_token})
    encoded_gists = base64.standard_b64encode(bytearray(response.text, 'utf-8'))

    for chunk in ("<script>var gists = '",
                  encoded_gists,
                  "';",
                  "window.opener.postMessage(gists, window.opener.location)"):
        self.write(chunk)
    self.finish(";</script>")
def get_digest_and_size_for_file(file_name):
    """
    Gets file digest and size

    The file is hashed in chunks with ``SHA256`` and the digest is returned
    base64 encoded; the size comes from ``os.stat``.

    :param file_name: a file name
    :return: ``(digest, file_size)``
    """
    CHUNK_SIZE = 16 * 4 * 1024
    m = SHA256.new()
    # The handle used to be left open; the ``with`` block closes it even if
    # reading fails.
    with open(file_name, 'rb') as f:
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
            m.update(chunk)

    statinfo = os.stat(file_name)
    file_size = statinfo.st_size
    digest = base64.standard_b64encode(m.digest()).decode(UTF8)
    logger = getLogger(__name__)
    logger.debug(u'getting digest and size: %s, %s, file=%s', digest,
                 file_size, file_name)
    return digest, file_size
def _generate_packages_dict(self, package_files):
    """Assemble the ``{'packages': [...]}`` dict from the package files.

    The package JSON is the base document.  ``command``, ``config`` and
    ``resource`` are added as parsed JSON when the matching file is present;
    ``marathon`` is added as a base64-encoded template string.

    :param package_files: mapping from file name to file content.
    :return: ``{'packages': [package_json]}``.
    """
    def load_ordered(text):
        # Parse JSON while keeping the key order of the document.
        return json.loads(text, object_pairs_hook=collections.OrderedDict)

    package_json = load_ordered(package_files[_package_json_filename])
    package_json['releaseVersion'] = 0
    package_json['packagingVersion'] = '{}.0'.format(self._cosmos_packaging_version)

    command_json = package_files.get(_command_json_filename)
    if command_json is not None:
        package_json['command'] = load_ordered(command_json)

    config_json = package_files.get(_config_json_filename)
    if config_json is not None:
        package_json['config'] = load_ordered(config_json)

    marathon_json = package_files.get(_marathon_json_filename)
    if marathon_json is not None:
        template = base64.standard_b64encode(bytearray(marathon_json, 'utf-8'))
        package_json['marathon'] = {'v2AppMustacheTemplate': template.decode()}

    resource_json = package_files.get(_resource_json_filename)
    if resource_json is not None:
        package_json['resource'] = load_ordered(resource_json)

    return {'packages': [package_json]}
def save_pem(contents, pem_marker):
    """Return ``contents`` rendered in PEM format, as bytes.

    :param contents: the contents to encode in PEM format
    :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE
        KEY' when your file has '-----BEGIN RSA PRIVATE KEY-----' and
        '-----END RSA PRIVATE KEY-----' markers.
    :return: the start marker, the base64 data in 64-character lines, the
        end marker and a trailing empty line, joined by newlines.
    """
    (pem_start, pem_end) = _markers(pem_marker)

    b64 = base64.standard_b64encode(contents).replace(b'\n', b'')
    body = [b64[offset:offset + 64] for offset in range(0, len(b64), 64)]

    return b'\n'.join([pem_start] + body + [pem_end, b''])
def process_challenge(self, challenge_parameters):
    """Build the response dict for a password-verifier challenge.

    The signature is the base64 HMAC-SHA256 (keyed by the derived password
    authentication key) over the second ``_``-separated part of
    ``self.pool_id``, the SRP user id, the decoded secret block and the
    timestamp.

    :param challenge_parameters: dict with ``USER_ID_FOR_SRP``, ``SALT``,
        ``SRP_B`` and ``SECRET_BLOCK``.
    :return: dict with ``TIMESTAMP``, ``USERNAME``,
        ``PASSWORD_CLAIM_SECRET_BLOCK``, ``PASSWORD_CLAIM_SIGNATURE`` and,
        when ``self.client_secret`` is set, ``SECRET_HASH``.
    """
    user_id_for_srp = challenge_parameters['USER_ID_FOR_SRP']
    salt_hex = challenge_parameters['SALT']
    srp_b_hex = challenge_parameters['SRP_B']
    secret_block_b64 = challenge_parameters['SECRET_BLOCK']

    # re strips leading zero from a day number (required by AWS Cognito)
    now_text = datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y")
    timestamp = re.sub(r" 0(\d) ", r" \1 ", now_text)

    hkdf = self.get_password_authentication_key(
        user_id_for_srp, self.password, hex_to_long(srp_b_hex), salt_hex)
    secret_block_bytes = base64.standard_b64decode(secret_block_b64)

    msg = bytearray(self.pool_id.split('_')[1], 'utf-8')
    msg += bytearray(user_id_for_srp, 'utf-8')
    msg += bytearray(secret_block_bytes)
    msg += bytearray(timestamp, 'utf-8')

    digest = hmac.new(hkdf, msg, digestmod=hashlib.sha256).digest()
    signature_string = base64.standard_b64encode(digest)

    response = {
        'TIMESTAMP': timestamp,
        'USERNAME': user_id_for_srp,
        'PASSWORD_CLAIM_SECRET_BLOCK': secret_block_b64,
        'PASSWORD_CLAIM_SIGNATURE': signature_string.decode('utf-8'),
    }
    if self.client_secret is not None:
        response["SECRET_HASH"] = self.get_secret_hash(
            self.username, self.client_id, self.client_secret)
    return response
def get_md5(data):
    """Return the base64-encoded MD5 digest of ``data``."""
    raw_digest = hashlib.md5(data).digest()
    return base64.standard_b64encode(raw_digest)
def encode(self, string):
    """Return ``string`` encoded with the standard base64 alphabet."""
    encoded = base64.standard_b64encode(string)
    return encoded
def connect(self):
    """Log in and populate the session attributes.

    ``InitialApp.php`` is requested first; its ``baseprm`` keys a Blowfish
    ECB cipher that encrypts the padded password, which is sent base64
    encoded to ``UserLoginRequest.php``.  On success the session fields are
    copied from the login response, ``self.leaf`` is created from the first
    entry of ``ret.leafs`` and ``self.logged_in`` is set.

    :return: the ``CarwingsLoginResponse``.
    """
    self.custom_sessionid = None
    self.logged_in = False

    initial = CarwingsInitialAppResponse(self._request("InitialApp.php", {
        "RegionCode": self.region_code,
        "lg": "en-US",
    }))

    cipher = Blowfish.new(initial.baseprm, Blowfish.MODE_ECB)
    encrypted = cipher.encrypt(_PKCS5Padding(self.password))
    encodedPassword = base64.standard_b64encode(encrypted)

    ret = CarwingsLoginResponse(self._request("UserLoginRequest.php", {
        "RegionCode": self.region_code,
        "UserId": self.username,
        "Password": encodedPassword,
    }))

    self.custom_sessionid = ret.custom_sessionid

    self.gdc_user_id = ret.gdc_user_id
    log.debug("gdc_user_id: %s" % self.gdc_user_id)
    self.dcm_id = ret.dcm_id
    log.debug("dcm_id: %s" % self.dcm_id)
    self.tz = ret.tz
    log.debug("tz: %s" % self.tz)
    self.language = ret.language
    log.debug("language: %s" % self.language)
    log.debug("vin: %s" % ret.vin)
    log.debug("nickname: %s" % ret.nickname)

    self.leaf = Leaf(self, ret.leafs[0])
    self.logged_in = True

    return ret
def random(length=64, encoding="hex"):
    """Return a random string of at most ``length`` characters.

    The string is derived from the SHA-512 hash of 256 bytes of
    ``os.urandom``.

    :param length: maximum number of characters to return.
    :param encoding: ``"base64"`` yields characters from ``A-Za-z0-9``
        (``+``, ``/`` and ``=`` are dropped); anything else yields lowercase
        hexadecimal.
    :return: the random string.
    """
    if encoding == "base64":  # Characters: A-Za-z0-9
        digest = hashlib.sha512(os.urandom(256)).digest()
        encoded = base64.standard_b64encode(digest)
        if not isinstance(encoded, str):
            # Python 3 returns bytes; replacing text in bytes raised TypeError.
            encoded = encoded.decode("ascii")
        return encoded.replace("+", "").replace("/", "").replace("=", "")[0:length]
    else:  # Characters: a-f0-9 (faster)
        return hashlib.sha512(os.urandom(256)).hexdigest()[0:length]
def SignMessage(self, message, priv):
    """Sign ``message`` with the private key ``priv``.

    The double SHA256 hash of ``MsgMagic(message)`` is signed; a new nonce
    ``k`` is drawn until both ``r`` and ``s`` are non-zero.

    :return: base64 of a header byte (27, plus 4 when the key is not
        flagged as uncompressed) followed by ``r`` and ``s`` as 32-byte
        values.
    """
    (d, uncompressed) = self.DFromPriv(priv)
    z = Byte2Int(Hash(Hash(MsgMagic(message), "SHA256"), "SHA256"))

    while True:
        # SystemRandom is used instead of the default generator.
        k = random.SystemRandom().randint(1, self.n - 1)
        R = self * k
        R.Normalize()
        r = R.x[0] % self.n
        s = (InvMod(k, self.n) * (z + r * d)) % self.n
        if r and s:
            break

    header = 27 if uncompressed else 27 + 4
    return base64.standard_b64encode(
        chr(header) + Int2Byte(r, 32) + Int2Byte(s, 32))
def _sign_payload(self, payload):
    """Return the signed ``X-BFX-*`` headers for ``payload``.

    The payload is JSON-serialised and base64-encoded; the signature is the
    HMAC-SHA384 hex digest of that encoded body keyed by ``self.SECRET``.
    """
    encoded = base64.standard_b64encode(json.dumps(payload).encode('utf8'))
    mac = hmac.new(self.SECRET.encode('utf8'), encoded, hashlib.sha384)
    return {
        "X-BFX-APIKEY": self.KEY,
        "X-BFX-SIGNATURE": mac.hexdigest(),
        "X-BFX-PAYLOAD": encoded
    }
def Encode(value):
    """Return ``value`` packed to bytes and base64 encoded.

    Integers are packed as ``"=q"`` and floats as ``"=d"`` (``struct``
    formats).  Byte strings are encoded as they are; text is UTF-8 encoded
    first.

    :raises EncodeException: for any other type.
    """
    if isinstance(value, int):
        data = struct.pack("=q", value)
    elif isinstance(value, float):
        data = struct.pack("=d", value)
    elif isinstance(value, bytes):
        data = value
    elif isinstance(value, str):
        # Text reaches this branch only where str is not bytes (Python 3);
        # base64 rejects text there, so encode it first.
        data = value.encode("utf-8")
    else:
        raise EncodeException("Unknown type %s with value %s" % (str(type(value)), str(value),))
    return base64.standard_b64encode(data)
def encode(self, string):
    """Return ``string`` encoded by ``mybase64.standard_b64encode``."""
    encoded = mybase64.standard_b64encode(string)
    return encoded
def _get_password(self, server):
    """Returns the password of a server for this adapter.

    The password is the base64 text of the SHA-256 digest of the server's
    ``name`` attribute, or of ``server`` itself when it has no such
    attribute.
    """
    if hasattr(server, 'name'):
        base = server.name
    else:
        base = server
    digest = hashlib.sha256(base.encode('utf-8')).digest()
    return b64enc(digest).decode('utf-8')
def auth_encode(apikey):
    """Return ``apikey`` base64 encoded.

    :param apikey: the key as a byte string; text is UTF-8 encoded first.
    :return: the encoded key, as returned by ``base64.standard_b64encode``.
    """
    if not isinstance(apikey, bytes):
        apikey = apikey.encode("utf-8")
    # The former ``auth.replace("\n", "")`` call discarded its result and
    # raised TypeError on Python 3 bytes; ``standard_b64encode`` does not
    # insert newlines, so no stripping is needed.
    return base64.standard_b64encode(apikey)
def _sign_payload(self, payload):
    """Return the signed ``X-BFX-*`` headers for ``payload``.

    The payload is JSON-serialised and base64-encoded; the signature is the
    HMAC-SHA384 hex digest of that encoded body keyed by ``self.SECRET``.
    """
    body = base64.standard_b64encode(json.dumps(payload).encode('utf-8'))
    signature = hmac.new(self.SECRET.encode('utf-8'), body,
                         hashlib.sha384).hexdigest()
    return {
        'X-BFX-APIKEY': self.KEY,
        'X-BFX-SIGNATURE': signature,
        'X-BFX-PAYLOAD': body,
    }

# Packs and sign the payload and send the request with GET.
def DER_cert_to_PEM_cert(der_cert_bytes):
    """Takes a certificate in binary DER format and returns the
    PEM version of it as a string.

    The base64 text is wrapped at 64 characters and placed between
    ``PEM_HEADER`` and ``PEM_FOOTER``, each on its own line.
    """
    encoded = base64.standard_b64encode(der_cert_bytes)
    text = str(encoded, 'ASCII', 'strict')
    wrapped = textwrap.fill(text, 64)
    return '\n'.join([PEM_HEADER, wrapped, PEM_FOOTER, ''])
def md5_sum(content):
    """Return the base64-encoded MD5 digest of ``content``."""
    raw_digest = hashlib.md5(content).digest()
    return base64.standard_b64encode(raw_digest)
def __generate_msl_request_data(self, data):
    """Build the request body: a JSON header followed by one JSON payload chunk.

    The header and the payload are each encrypted, base64 encoded and
    signed.  ``data`` is JSON-serialised, embedded (with double quotes
    escaped) in a fixed request template, and compressed before it goes
    into the payload.

    :param data: the data to send
    :return: ``json(header) + json(first payload chunk)``
    """
    self.__load_msl_data()

    header_envelope = self.__encrypt(plaintext=self.__generate_msl_header())
    header = {
        'headerdata': base64.standard_b64encode(header_envelope),
        'signature': self.__sign(header_envelope),
        'mastertoken': self.mastertoken,
    }

    # Serialize the given Data
    escaped = json.dumps(data).replace('"', '\\"')
    serialized_data = ''.join((
        '[{},{"headers":{},"path":"/cbp/cadmium-13"',
        ',"payload":{"data":"',
        escaped,
        '"},"query":""}]\n',
    ))
    compressed_data = self.__compress_data(serialized_data)

    # Create FIRST Payload Chunks
    first_payload = {
        'messageid': self.current_message_id,
        'data': compressed_data,
        'compressionalgo': 'GZIP',
        'sequencenumber': 1,
        'endofmsg': True
    }
    payload_envelope = self.__encrypt(plaintext=json.dumps(first_payload))
    first_payload_chunk = {
        'payload': base64.standard_b64encode(payload_envelope),
        'signature': self.__sign(payload_envelope),
    }

    return json.dumps(header) + json.dumps(first_payload_chunk)
def __compress_data(self, data):
    """Return ``data`` gzip-compressed and base64 encoded."""
    buffer = StringIO()
    gz = gzip.GzipFile(fileobj=buffer, mode="w")
    try:
        gz.write(data)
    finally:
        # The gzip stream must be closed before the buffer is read back.
        gz.close()
    return base64.standard_b64encode(buffer.getvalue())
def __sign(self, text):
    """
    Calculates the HMAC signature for the given text with the current
    sign key and SHA256

    :param text: the data to sign
    :return: Base64 encoded signature
    """
    mac = HMAC.new(self.sign_key, text, SHA256)
    return base64.standard_b64encode(mac.digest())
def getCaptchaTask(self, exclusive=False):
    """Returns a captcha task

    Also records the current time in ``self.core.lastClientConnected``.

    :param exclusive: forwarded to the task's ``setWatingForUser``
    :return: `CaptchaTask` with the base64-encoded captcha data, or
        ``CaptchaTask(-1)`` when no task is available
    """
    self.core.lastClientConnected = time()
    task = self.core.captchaManager.getTask()
    if not task:
        return CaptchaTask(-1)

    task.setWatingForUser(exclusive=exclusive)
    data, captcha_type, result = task.getCaptcha()
    return CaptchaTask(int(task.id), standard_b64encode(data), captcha_type, result)
def encode_array(array, compression=COMPRESSION_NONE, dtype=np.float32):
    """Return ``array`` as base64-encoded bytes.

    The array is converted to ``dtype`` and serialised with ``tobytes()``;
    with ``COMPRESSION_ZLIB`` the bytes are zlib-compressed before encoding.

    :raises ValueError: if ``compression`` is neither ``COMPRESSION_NONE``
        nor ``COMPRESSION_ZLIB``.
    """
    raw = np.asanyarray(array).astype(dtype).tobytes()
    if compression != COMPRESSION_NONE:
        if compression != COMPRESSION_ZLIB:
            raise ValueError("Unknown compression: %s" % compression)
        raw = zlib.compress(raw)
    return base64.standard_b64encode(raw)