我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用base64.b64decode()。
def auth(func):
    """Decorate a view so that it only runs for authorised callers.

    The wrapped view is called when the configuration returned by
    ``get_auth()`` has ``disable`` set to ``True``, when the client IP passes
    ``is_authorized`` against ``authorized_ips``, or when the request carries
    HTTP Basic credentials equal to ``HEARTBEAT['auth']``.  In every other
    case a 401 response with a ``WWW-Authenticate`` challenge is returned.

    Malformed ``Authorization`` headers (missing token, invalid base64,
    undecodable bytes, no colon) are answered with the 401 response instead
    of raising.
    """
    @wraps(func)
    def _decorator(request, *args, **kwargs):
        settings = get_auth()
        if settings.get('disable', False) is True:
            return func(request, *args, **kwargs)

        if 'authorized_ips' in settings:
            ip = get_client_ip(request)
            if is_authorized(ip, settings['authorized_ips']):
                return func(request, *args, **kwargs)

        prepare_credentials(settings)

        header = request.META.get('HTTP_AUTHORIZATION')
        if header:
            # partition() never raises, unlike unpacking the result of split(' ').
            scheme, _, encoded = header.strip().partition(' ')
            if scheme.lower() == 'basic':
                try:
                    decoded = base64.b64decode(encoded.strip()).decode('utf-8')
                except (TypeError, ValueError):
                    # TypeError: Python 2 base64 errors.  ValueError covers
                    # binascii.Error and UnicodeDecodeError on Python 3.
                    decoded = None
                if decoded is not None:
                    # Only the first colon separates user and password, so
                    # passwords containing ':' keep working.
                    username, sep, password = decoded.partition(':')
                    expected = HEARTBEAT['auth']
                    if (sep and username == expected['username'] and
                            password == expected['password']):
                        return func(request, *args, **kwargs)

        response = HttpResponse(
            "Authentication failed", status=401)
        response['WWW-Authenticate'] = 'Basic realm="Welcome to 1337"'
        return response
    return _decorator
def get_encryption():
    """Return the source text of the generated AES helper module.

    The template is filled with ``st_obf[0]`` (used as the name of the key
    variable), ``aes_encoded`` (the base64 text assigned to it) and
    ``aes_abbrev``.  The statements of the template are emitted one per line
    so that the returned text is valid Python; joined by spaces it could not
    be parsed.
    """
    return '''
import base64
from Crypto import Random
from Crypto.Cipher import AES

abbrev = '{2}'
{0} = base64.b64decode('{1}')

def encrypt(raw):
    iv = Random.new().read( AES.block_size )
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return (base64.b64encode( iv + cipher.encrypt( raw ) ) )

def decrypt(enc):
    enc = base64.b64decode(enc)
    iv = enc[:16]
    cipher = AES.new({0}, AES.MODE_CFB, iv )
    return cipher.decrypt( enc[16:] )
'''.format(st_obf[0],aes_encoded,aes_abbrev)

################################################################################
#                       st_protocol.py stitch_gen variables                    #
################################################################################
def parse_cookie(cookie, securekey):
    """Validate a ``<base64 payload>.<md5 hex>`` cookie and return its name.

    Args:
        cookie: Cookie text.  The part before the first dot is base64 of a
            UTF-8 JSON document; the part after it is compared with the hex
            MD5 of that document concatenated with ``securekey``.
        securekey: Secret appended to the payload before hashing.

    Returns:
        The ``name`` entry of the JSON payload when the digest matches;
        ``None`` when the cookie cannot be decoded, cannot be hashed, does
        not match, or carries no usable ``name``.
    """
    import hmac  # local import: only needed for the constant-time comparison

    logger.info (">> parse cookie : %s" % cookie)
    parts = cookie.split('.')
    part1 = parts[0]
    part2 = '' if len(parts) < 2 else parts[1]
    try:
        text = str(base64.b64decode(part1.encode('ascii')), encoding='utf-8')
    except (TypeError, ValueError):
        # ValueError covers binascii.Error and both Unicode error types.
        logger.info ("decode cookie failed")
        return None
    logger.info ("cookie content : %s" % text)
    try:
        digest_input = (text+securekey).encode('ascii')
    except UnicodeEncodeError:
        # A payload that cannot be hashed can never match; reject it
        # instead of letting the request crash.
        logger.info ("decode cookie failed")
        return None
    thatpart2 = hashlib.md5(digest_input).hexdigest()
    # NOTE(review): the next line writes the valid digest for an
    # attacker-chosen payload to the log; consider removing it.
    logger.info ("hash from part1 : %s" % thatpart2)
    logger.info ("hash from part2 : %s" % part2)
    result = None
    # Compare as bytes in constant time so the check does not leak how many
    # leading characters of the digest were correct.
    if hmac.compare_digest(part2.encode('utf-8'), thatpart2.encode('ascii')):
        try:
            result = json.loads(text)['name']
        except (ValueError, KeyError, TypeError):
            result = None
    logger.info ("parse from cookie : %s" % result)
    return result
def handleDeviceCall(self, deviceId, payload):
    """Process one JSON payload received from a device.

    An ``image`` entry is base64-decoded and written to the device's
    ``images`` folder; each item of a ``values`` entry is passed to
    ``self.database.save``.  The model built from the session is then
    updated with the trends and sent over the websocket as JSON.

    Args:
        deviceId: Raw device identifier; its hex form is used for logging,
            folder names and database rows.
        payload: JSON text sent by the device.
    """
    deviceIdHex = binascii.hexlify(deviceId)
    self.logger.debug("Handling request from device {0} with payload {1}".format(deviceIdHex, payload))

    payloadDict = json.loads(payload)
    session = self.service.sessions[deviceId]
    model = DeviceModel().loadFromSession(session, self.deviceConfig, payloadDict)

    if "image" in payloadDict:
        # The extension comes from the device, so keep only alphanumeric
        # characters: a value such as "../../x" must not escape the images
        # folder through the file name.
        requestedType = str(payloadDict.get("type", "jpg"))
        imageType = "".join(ch for ch in requestedType if ch.isalnum()) or "jpg"
        imageData = base64.b64decode(payloadDict["image"])
        fn = "{0}.{1}".format(datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'), imageType)
        imageFile = os.path.join(self.getDeviceFolder(deviceIdHex, "images"), fn)
        self.logger.info("Received {0} bytes of image data from device {1} - saving to {2}".format(len(imageData), deviceIdHex, imageFile))
        with open(imageFile, 'wb+') as f:
            f.write(imageData)
        model.newImageUrl = "/upload/{0}/images/{1}".format(deviceIdHex, fn)

    if "values" in payloadDict:
        for variable, value in payloadDict["values"].items():
            self.database.save(deviceIdHex, variable, session.protocol, session.clientAddr[0], session.lastUpdateTime, value)

    # NOTE(review): the extent of the locked region was reconstructed from
    # whitespace-collapsed text - confirm which calls must hold session.lock.
    with session.lock:
        model.saveTrends(self.trends)
        model.computeTrends(self.trends)

    self.webServer.websocketSend(model.toJSON())
def choose_aws_role(assertion):
    """Let the user pick one of the AWS roles listed in a SAML assertion.

    Args:
        assertion: Base64-encoded SAML assertion XML.

    Returns:
        A ``RoleTuple(principal_arn, role_arn)`` for the selected entry.

    Raises:
        ValueError: if the answer is not an integer.
        IndexError: if the answer is outside ``1..len(roles)``.
    """
    aws_attribute_role = 'https://aws.amazon.com/SAML/Attributes/Role'
    attribute_value_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'
    role_tuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"])

    root = ET.fromstring(base64.b64decode(assertion))
    roles = []
    for saml2attribute in root.iter('{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'):
        if saml2attribute.get('Name') == aws_attribute_role:
            for saml2attributevalue in saml2attribute.iter(attribute_value_urn):
                roles.append(role_tuple(*saml2attributevalue.text.split(',')))

    for index, role in enumerate(roles, 1):
        role_name = role.role_arn.split('/')[1]
        print("%d: %s" % (index, role_name))

    # input() returns text on Python 3, so convert explicitly before the
    # arithmetic; int() also accepts the integer Python 2's input() yields.
    role_choice = int(input('Please select the AWS role: ')) - 1
    # Reject 0 and negatives explicitly: a negative index would silently
    # select a role counted from the end of the list.
    if not 0 <= role_choice < len(roles):
        raise IndexError('role selection out of range')
    return roles[role_choice]
def test_password_empty_file():
    """Run the CLI against a YAML file that has no ``password`` entry yet."""
    cli_runner = CliRunner()
    with cli_runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump({'language': 'python'}, handle)

        outcome = cli_runner.invoke(
            cli,
            ['mandeep', 'Travis-Encrypt', 'file.yml'],
            'SUPER_SECURE_PASSWORD',
        )
        assert not outcome.exception

        with open('file.yml') as handle:
            loaded = ordered_load(handle)

        # The new section is added and the existing key is left untouched.
        assert 'password' in loaded
        assert 'secure' in loaded['password']
        assert loaded['language'] == 'python'
        assert base64.b64decode(loaded['password']['secure'])
def test_password_nonempty_file():
    """Run the CLI against a YAML file whose ``password`` must be replaced.

    Also checks that the other keys and the key order survive the rewrite.
    """
    seed = OrderedDict([
        ('language', 'python'),
        ('dist', 'trusty'),
        ('password', {'secure': 'SUPER_INSECURE_PASSWORD'}),
    ])
    cli_runner = CliRunner()
    with cli_runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump(seed, handle)

        outcome = cli_runner.invoke(
            cli,
            ['mandeep', 'Travis-Encrypt', 'file.yml'],
            'SUPER_SECURE_PASSWORD',
        )
        assert not outcome.exception

        with open('file.yml') as handle:
            loaded = ordered_load(handle)

        assert loaded['language'] == 'python'
        assert loaded['dist'] == 'trusty'
        assert base64.b64decode(loaded['password']['secure'])
        assert ['language', 'dist', 'password'] == list(loaded.keys())
def test_deploy_empty_file():
    """Run the CLI with ``--deploy`` against a YAML file without that section."""
    cli_runner = CliRunner()
    with cli_runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump({'language': 'python'}, handle)

        outcome = cli_runner.invoke(
            cli,
            ['--deploy', 'mandeep', 'Travis-Encrypt', 'file.yml'],
            'SUPER_SECURE_PASSWORD',
        )
        assert not outcome.exception

        with open('file.yml') as handle:
            loaded = ordered_load(handle)

        assert loaded['language'] == 'python'
        assert base64.b64decode(loaded['deploy']['password']['secure'])
def test_deploy_nonempty_file():
    """Run the CLI with ``--deploy`` against a file whose value is replaced.

    Also checks that the other keys and the key order survive the rewrite.
    """
    seed = OrderedDict([
        ('language', 'python'),
        ('dist', 'trusty'),
        ('deploy', {'password': {'secure': 'SUPER_INSECURE_PASSWORD'}}),
    ])
    cli_runner = CliRunner()
    with cli_runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump(seed, handle)

        outcome = cli_runner.invoke(
            cli,
            ['--deploy', 'mandeep', 'Travis-Encrypt', 'file.yml'],
            'SUPER_SECURE_PASSWORD',
        )
        assert not outcome.exception

        with open('file.yml') as handle:
            loaded = ordered_load(handle)

        assert loaded['language'] == 'python'
        assert loaded['dist'] == 'trusty'
        assert base64.b64decode(loaded['deploy']['password']['secure'])
        assert ['language', 'dist', 'deploy'] == list(loaded.keys())
def test_environment_variable_nonempty_file():
    """Run the CLI with ``--env`` against a file whose value is replaced.

    Also checks that the other keys and the key order survive the rewrite.
    """
    seed = OrderedDict([
        ('language', 'python'),
        ('dist', 'trusty'),
        ('env', {'global': {'secure': 'API_KEY="SUPER_INSECURE_KEY"'}}),
    ])
    cli_runner = CliRunner()
    with cli_runner.isolated_filesystem():
        with open('file.yml', 'w') as handle:
            ordered_dump(seed, handle)

        outcome = cli_runner.invoke(
            cli,
            ['--env', 'mandeep', 'Travis-Encrypt', 'file.yml'],
            'SUPER_SECURE_API_KEY',
        )
        assert not outcome.exception

        with open('file.yml') as handle:
            loaded = ordered_load(handle)

        assert loaded['language'] == 'python'
        assert loaded['dist'] == 'trusty'
        assert base64.b64decode(loaded['env']['global']['secure'])
        assert ['language', 'dist', 'env'] == list(loaded.keys())
def _key():
    """Return the secret key, creating and persisting it on first use.

    The key is cached in the module-level ``__key``.  On disk it is stored
    base64-encoded in ``<_key_dir()>/key``; the value returned is always the
    decoded 16 random bytes.  Previously the call that created the key
    returned the base64 text while later runs returned the decoded bytes, so
    the key changed after a restart.

    Returns:
        The raw key bytes.

    Raises:
        OSError: if the key directory cannot be created for a reason other
            than already existing.
    """
    global __key
    if __key:
        return __key

    data_dir = _key_dir()
    key_file = os.path.join(data_dir, 'key')

    if os.path.isfile(key_file):
        with open(key_file, 'rb') as f:
            __key = base64.b64decode(f.read())
        return __key

    raw_key = os.urandom(16)
    try:
        os.makedirs(data_dir)
    except OSError as e:
        # errno 17 == directory already exists
        if e.errno != 17:
            raise

    # Create the file readable by the owner only; it holds a secret.
    fd = os.open(key_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'wb') as f:
        f.write(base64.b64encode(raw_key))

    __key = raw_key
    return __key
def get(self, result):
    """Extract the cookie set by an obfuscated JavaScript challenge page.

    The page is expected to contain ``S='<base64>'``.  The decoded script is
    rewritten into Python syntax with a few regex substitutions and then
    executed to obtain the value it assigns to ``document.cookie``.

    Args:
        result: Text of the challenge page.

    Returns:
        ``'name=value'`` (also stored on ``self.cookie``), or ``None`` when
        the page does not match or the script cannot be evaluated.
    """
    try:
        s = re.compile(r"S\s*=\s*'([^']+)").findall(result)[0]
        s = base64.b64decode(s)
        if not isinstance(s, str):
            # Python 3 returns bytes; the rewriting below works on text.
            s = s.decode('utf-8')
        s = s.replace(' ', '')
        s = re.sub(r'String\.fromCharCode\(([^)]+)\)', r'chr(\1)', s)
        s = re.sub(r'\.slice\((\d+),(\d+)\)', r'[\1:\2]', s)
        s = re.sub(r'\.charAt\(([^)]+)\)', r'[\1]', s)
        s = re.sub(r'\.substr\((\d+),(\d+)\)', r'[\1:\1+\2]', s)
        s = re.sub(r';location.reload\(\);', '', s)
        s = re.sub(r'\n', '', s)
        s = re.sub(r'document\.cookie', 'cookie', s)

        # NOTE(review): this executes code derived from a remote page.  It is
        # kept because the challenge cannot be solved otherwise, but it runs
        # with full interpreter privileges - only use against trusted hosts.
        # An explicit namespace makes the assignment to ``cookie`` visible on
        # Python 3 as well, where exec() cannot rebind function locals.
        scope = {'cookie': ''}
        exec(s, scope)
        cookie = scope['cookie']

        name, value = re.compile('([^=]+)=(.*)').findall(cookie)[0]
        self.cookie = '%s=%s' % (name, value)
        return self.cookie
    except Exception:
        # Best effort: any failure means "no cookie".  Unlike a bare except,
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        return None
def resolve(self, url):
    """Resolve a ``url|post|headers`` triple into a playable http(s) URL.

    URLs whose ``name.tld`` host suffix does not occur in the decoded
    ``self.b_link`` are returned unchanged.  Otherwise the referer taken from
    the header part is requested for a cookie, the POST is replayed with it,
    and the last ``url=`` value of the response that starts with ``http`` is
    returned.

    Returns:
        The resolved URL, or ``None`` when any step fails.
    """
    try:
        b = urlparse.urlparse(url).netloc
        b = re.compile(r'([\w]+[.][\w]+)$').findall(b)[0]

        if b not in base64.b64decode(self.b_link):
            return url

        u, p, h = url.split('|')
        r = urlparse.parse_qs(h)['Referer'][0]

        c = self.request(r, output='cookie', close=False)
        result = self.request(u, post=p, referer=r, cookie=c)

        url = result.split('url=')
        url = [urllib.unquote_plus(i.strip()) for i in url]
        url = [i for i in url if i.startswith('http')]
        url = url[-1]
        return url
    except Exception:
        # Best effort, but do not swallow KeyboardInterrupt/SystemExit the
        # way a bare except does.
        return
def get_cainfo(self):
    """Query the CA service information.

    The request names the configured CA when ``self._ca_name`` is non-empty
    and is sent with an empty body otherwise.

    Returns:
        The ``CAChain`` value of the response after base64 decoding.

    Raises:
        ValueError: if the response is not successful or reports a CA name
            different from ``self._ca_name``.
    """
    body_data = {"caname": self._ca_name} if self._ca_name != "" else {}

    response = self._send_ca_post(path="cainfo", json=body_data)
    _logger.debug("Raw response json {0}".format(response))

    accepted = (response['success']
                and response['result']['CAName'] == self._ca_name)
    if not accepted:
        raise ValueError("get_cainfo failed with errors {0}"
                         .format(response['errors']))
    return base64.b64decode(response['result']['CAChain'])
def test_inject_file_with_old_agent(self):
    """An agent advertising ``injectfile`` gets path and contents in one record."""
    # Same object as FAKE_ARG_DICT: the updates below are visible through both.
    expected_args = FAKE_ARG_DICT
    request_id = expected_args["id"]
    encoded_path = expected_args["b64_path"]
    encoded_file = expected_args["b64_contents"]

    decoded_path = base64.b64decode(encoded_path)
    decoded_file = base64.b64decode(encoded_file)
    combined = base64.b64encode("%s,%s" % (decoded_path, decoded_file))

    self.mock_patch_object(self.agent,
                           '_get_agent_features',
                           'injectfile')
    expected_args["value"] = json.dumps({"name": "injectfile",
                                         "value": combined})
    expected_args["path"] = "data/host/%s" % request_id

    self.agent.inject_file(self.agent, FAKE_ARG_DICT)

    self.agent._wait_for_agent.assert_called_once()
    self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                        expected_args)
    self.agent._get_agent_features.assert_called_once()
def test_kms(self):
    """Round-trip the KMS-encrypted template parameters (opt-in test).

    Runs only when ``CFPP_RUN_KMS_TESTS`` is set in the environment.
    """
    if "CFPP_RUN_KMS_TESTS" not in os.environ:
        return

    import boto3
    import botocore

    raw = subprocess.check_output(["cfpp", "-s", "tests",
                                   "tests/kms_test.template"])
    params = json.loads(raw)["Parameters"]
    plain_blob = params["EncryptedValue"]["Default"]
    context_blob = params["EncryptedValueWithContext"]["Default"]

    client = boto3.client('kms')
    client.decrypt(CiphertextBlob=base64.b64decode(plain_blob))

    # This call has to be rejected with a ClientError.
    try:
        client.decrypt(CiphertextBlob=context_blob)
    except botocore.exceptions.ClientError:
        pass
    else:
        self.fail("expected KMS to fail due to lack of context")

    client.decrypt(CiphertextBlob=base64.b64decode(context_blob),
                   EncryptionContext={"ContextKey": "ContextValue"})
def unquote(cls, value):
    """Reverse the quoting applied to a cookie value.

    The value is base64-decoded when ``cls.quote_base64`` is set and then
    passed through ``cls.serialization_method.loads`` unless that attribute
    is ``None``.

    :param value: the value to unquote.
    :raises UnquoteError: whenever decoding or deserialisation fails.
    """
    try:
        payload = base64.b64decode(value) if cls.quote_base64 else value
        serializer = cls.serialization_method
        if serializer is None:
            return payload
        return serializer.loads(payload)
    except Exception:
        # Deserialisers can raise nearly anything, so every failure is
        # reported as one exception type.
        raise UnquoteError()
def loads(self, value):
    """Deserialise JSON text, expanding single-key tagged objects.

    A one-entry object whose key is ``' t'``, ``' u'``, ``' b'``, ``' m'`` or
    ``' d'`` is replaced by a tuple, a ``uuid.UUID``, base64-decoded data, a
    ``Markup`` or the result of ``parse_date``.  Every other object is
    returned unchanged.
    """
    def object_hook(obj):
        # Tagged values are always single-entry objects.
        if len(obj) != 1:
            return obj
        tag, payload = next(iteritems(obj))
        if tag == ' t':
            return tuple(payload)
        if tag == ' u':
            return uuid.UUID(payload)
        if tag == ' b':
            return b64decode(payload)
        if tag == ' m':
            return Markup(payload)
        if tag == ' d':
            return parse_date(payload)
        return obj

    return json.loads(value, object_hook=object_hook)
def post(self):
    """Answer ``OK`` to requests that carry a decodable Authorization header.

    Requests without the header, or whose token (everything after the first
    six characters) is not valid base64, receive a 401 JSON error rendered
    from ``tpl/api/error.json`` together with a Basic challenge.
    """
    authenticated = False
    if 'Authorization' in self.request.headers:
        auth = self.request.headers['Authorization']
        try:
            base64.b64decode(auth[6:])
        except (TypeError, ValueError):
            # Undecodable token: treat as unauthenticated instead of letting
            # the decode error abort the request.
            authenticated = False
        else:
            # NOTE(review): the decoded credentials are never checked against
            # any account, so any well-formed header is accepted.  Confirm
            # whether a real credential check is missing here.
            authenticated = True
    if authenticated:
        self.response.out.write('OK')
    else:
        site = GetSite()
        template_values = {}
        template_values['site'] = site
        template_values['message'] = "Authentication required"
        path = os.path.join(os.path.dirname(__file__), 'tpl', 'api')
        t = self.get_template(path,'error.json')
        output = t.render(template_values)
        self.set_status(401, 'Unauthorized')
        self.set_header('Content-type', 'application/json')
        self.set_header('WWW-Authenticate', 'Basic realm="' + site.domain + '"')
        self.write(output)

# Replies
# /api/replies/show.json
def handshake_cookie_verify(b64_cookie):
    '''
    Decode b64_cookie and return the decoded cookie when both its encoded
    length (PrivCountProtocol.COOKIE_B64_BYTES) and its decoded length
    (PrivCountProtocol.COOKIE_BYTES) are as expected.
    Otherwise, log a warning and return False.
    Raises an exception if the cookie is not correctly padded base64.
    '''
    encoded_len = len(b64_cookie)
    if encoded_len != PrivCountProtocol.COOKIE_B64_BYTES:
        logging.warning("Invalid cookie: wrong encoded length {} expected {}"
                        .format(encoded_len,
                                PrivCountProtocol.COOKIE_B64_BYTES))
        return False

    decoded = b64decode(b64_cookie)
    decoded_len = len(decoded)
    if decoded_len != PrivCountProtocol.COOKIE_BYTES:
        logging.warning("Invalid cookie: wrong decoded length {} expected {}"
                        .format(decoded_len,
                                PrivCountProtocol.COOKIE_BYTES))
        return False

    return decoded
def decode(self, encoded):
    """
    Takes the input from the text box and decodes it using the mode
    currently selected in the combobox.

    Returns the decoded value, or None when the mode is not recognised or
    decoding fails (an alert is logged in the failure case).
    """
    mode = self._encodings[self._decoder.currentIndex()]
    if mode == 'raw':
        return str(encoded)
    try:
        if mode == 'hex':
            return encoded.decode('hex')
        if mode == 'b64':
            return b64decode(encoded)
        if mode == 'py2':
            # NOTE(review): eval() runs whatever expression was typed into
            # the box with full interpreter privileges.  If only literals are
            # expected, ast.literal_eval would be the safe replacement.
            return eval(encoded)
    except Exception:
        # Report every decoding failure, but let KeyboardInterrupt and
        # SystemExit propagate (a bare except would swallow them).
        log_alert("Failed to decode input")
    return None
def _generate_assertion(self):
    """Build the signed JWT assertion sent with the token request.

    The claims cover audience, scope, issue time, expiry and issuer and are
    extended with ``self.kwargs``.  The token is signed with the
    base64-decoded ``self.private_key``.
    """
    issued_at = long(time.time())
    expires_at = issued_at + SignedJwtAssertionCredentials.MAX_TOKEN_LIFETIME_SECS
    payload = {
        'aud': self.token_uri,
        'scope': self.scope,
        'iat': issued_at,
        'exp': expires_at,
        'iss': self.service_account_name
    }
    payload.update(self.kwargs)
    logger.debug(str(payload))

    key_material = base64.b64decode(self.private_key)
    signer = crypt.Signer.from_string(key_material, self.private_key_password)
    return crypt.make_signed_jwt(signer, payload)

# Only used in verify_id_token(), which is always calling to the same URI
# for the certs.
def proof_open(team, proof):
    """Open a base64 proof and return the challenge it claims to be for.

    The challenge id is read from the bytes after the first ``2*64`` bytes
    of the decoded proof.  The proof is opened with the challenge's ``pk``,
    the result is opened again with the team's ``sign_pk``, and the recovered
    id must equal the claimed one.

    Raises:
        ValueError: if the recovered challenge id differs from the claimed one.
    """
    assert isinstance(proof, bytes)
    signed_blob = b64decode(proof)

    claimed_chall_id = signed_blob[2*64:].decode('utf-8')
    claimed_chall = Challenge(claimed_chall_id)

    challenge_key = claimed_chall['pk']
    team_key = team['sign_pk']

    inner = pysodium.crypto_sign_open(signed_blob, challenge_key)
    recovered_id = pysodium.crypto_sign_open(inner, team_key).decode('utf-8')

    if recovered_id != claimed_chall_id:
        raise ValueError('invalid proof')
    return claimed_chall
def setup_environment():
    """Prepare PATH, git and ssh settings from the process environment.

    Reads ``LAMBDA_TASK_ROOT`` and ``SSH_IDENTITY`` (base64 of the private
    key).  Appends ``<root>/bin`` to ``PATH``, sets ``GIT_EXEC_PATH``, writes
    the key and an ssh config into a fresh temporary directory and points
    ``GIT_SSH_COMMAND`` at that config.
    """
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir

    ssh_dir = tempfile.mkdtemp()

    ssh_identity = os.path.join(ssh_dir, 'identity')
    # b64decode() returns bytes, so the file must be opened in binary mode;
    # text mode raises TypeError on Python 3.  Mode 0o600 keeps the key
    # private to the owner.
    with os.fdopen(os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600), 'wb') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))

    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))

    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
def get_monitor_script(self, name):
    """Return the decoded script of the monitor called ``name``.

    The monitor is looked up first because the script endpoint is addressed
    by monitor id.

    Raises:
        ItemNotFoundError: if the monitor does not exist, or if the script
            request itself reports that the item was not found.
    """
    monitor = self.get_monitor_by_name(name)
    if not monitor:
        raise ItemNotFoundError('Monitor {} not found'.format(name))

    url = '{}/v3/monitors/{}/script'.format(self.base_url, monitor['id'])
    try:
        reply = self._get(url,
                          headers=self.default_headers,
                          timeout=self.timeout)
    except ItemNotFoundError:
        # Replace the generic message with one naming the monitor.
        raise ItemNotFoundError(
            'Script for monitor {} not found'.format(name))

    return base64.b64decode(reply.json()['scriptText'])
def prepare_v(instance):
    """Return the JSON message carrying V encrypted for the instance.

    A previously computed ``instance['b64_encrypted_V']`` is reused when it
    is non-empty.  Otherwise ``instance['v']`` is base64-decoded, encrypted
    with ``instance['public_key']``, base64-encoded and cached on the
    instance.
    """
    # be very careful printing K, U, or V as they leak in logs stored on unprotected disks
    if common.DEVELOP_IN_ECLIPSE:
        logger.debug("b64_V (non encrypted): " + instance['v'])

    cached = instance.get('b64_encrypted_V', "")
    if cached != "":
        b64_encrypted_V = instance['b64_encrypted_V']
        logger.debug("Re-using cached encrypted V")
    else:
        # encrypt V with the public key
        public_key = crypto.rsa_import_pubkey(instance['public_key'])
        plain_v = str(base64.b64decode(instance['v']))
        b64_encrypted_V = base64.b64encode(crypto.rsa_encrypt(public_key, plain_v))
        instance['b64_encrypted_V'] = b64_encrypted_V

    # NOTE(review): placement of this log call (after the if/else rather than
    # inside the else branch) was reconstructed from collapsed text - confirm.
    logger.debug("b64_encrypted_V:" + b64_encrypted_V)

    return json.dumps({'encrypted_key': b64_encrypted_V})
def parse_data_uri(data_uri):
    """Decode the base64 payload of each newline-separated data URI.

    Args:
        data_uri: One data URI per line, or ``None``.

    Returns:
        A list with the decoded payload of every line whose payload is valid
        base64 (lines with bad payloads are skipped), or ``None`` when the
        input is ``None`` or blank, or a non-blank line has no comma.
    """
    if data_uri is None or not data_uri.strip():
        return None

    data = []
    for uri in data_uri.split("\n"):
        uri = uri.strip()
        if not uri:
            # A blank line (typically a trailing newline) carries no URI and
            # must not discard the payloads that were already decoded.
            continue
        _header, comma, payload = uri.partition(",")
        if not comma:
            return None
        try:
            # Decode only what follows the comma; the previous slice kept
            # the comma itself and relied on the decoder ignoring it.
            data.append(base64.b64decode(payload))
        except (TypeError, ValueError):
            # skip bad data (binascii.Error is a ValueError on Python 3)
            continue
    return data
def activate_group(uuid,keyblob):
    """Send the group-activate command for ``uuid`` and return the key body.

    With ``common.STUB_TPM`` set, ``common.TEST_AES_REG_KEY`` is returned and
    no command is sent.  ``keyblob`` is base64 text that must decode to
    exactly 256 bytes; the size announced in the reply header must equal the
    length of the returned body.
    """
    if common.STUB_TPM:
        return common.TEST_AES_REG_KEY

    group_id = get_group_num(uuid)
    priv_ca = base64.b64decode(keyblob)
    assert len(priv_ca) == 256

    logger.debug('Activating group number %d', group_id)
    request = struct.pack('>II', group_id, 256) + priv_ca
    reply = vtpm_cmd(VTPM_ORD_GROUP_ACTIVATE, request)

    header, body = unpack('>IHH', reply)
    algId, encScheme, size = header
    logger.info('Received Key. AlgID: 0x%x, encScheme: 0x%x, size: 0x%x', algId, encScheme, size)
    logger.info('Key: %r', body)

    assert size == len(body)
    return body
def db_ssl(rdata, ctxt, ssl_dir):
    """Write the database SSL material from ``rdata`` and record it in ``ctxt``.

    Args:
        rdata: Mapping that may hold base64 ``ssl_ca``, ``ssl_cert`` and
            ``ssl_key`` values.
        ctxt: Context dict; receives ``database_ssl_ca`` / ``_cert`` / ``_key``
            paths for the files written.
        ssl_dir: Directory for the files.  When it is falsy but ``ssl_ca`` is
            present, a message is logged and ``ctxt`` is returned unchanged.

    Returns:
        ``ctxt``.
    """
    if 'ssl_ca' in rdata and ssl_dir:
        ca_path = os.path.join(ssl_dir, 'db-client.ca')
        # b64decode() yields bytes: open in binary mode so the write also
        # works on Python 3 (text mode raises TypeError there).
        with open(ca_path, 'wb') as fh:
            fh.write(b64decode(rdata['ssl_ca']))

        ctxt['database_ssl_ca'] = ca_path
    elif 'ssl_ca' in rdata:
        log("Charm not setup for ssl support but ssl ca found", level=INFO)
        return ctxt

    if 'ssl_cert' in rdata:
        cert_path = os.path.join(
            ssl_dir, 'db-client.cert')
        if not os.path.exists(cert_path):
            log("Waiting 1m for ssl client cert validity", level=INFO)
            time.sleep(60)

        with open(cert_path, 'wb') as fh:
            fh.write(b64decode(rdata['ssl_cert']))

        ctxt['database_ssl_cert'] = cert_path
        key_path = os.path.join(ssl_dir, 'db-client.key')
        with open(key_path, 'wb') as fh:
            fh.write(b64decode(rdata['ssl_key']))

        ctxt['database_ssl_key'] = key_path

    return ctxt
def configure_cert(self, cn=None):
    """Write the decoded certificate and key for ``cn`` into the SSL directory.

    Files are placed in ``/etc/apache2/ssl/<service_namespace>`` and named
    ``cert_<cn>`` / ``key_<cn>`` when ``cn`` is given, ``cert`` / ``key``
    otherwise.
    """
    ssl_dir = os.path.join('/etc/apache2/ssl/', self.service_namespace)
    mkdir(path=ssl_dir)
    cert, key = get_cert(cn)

    suffix = '_{}'.format(cn) if cn else ''
    targets = (
        ('cert' + suffix, cert),
        ('key' + suffix, key),
    )
    for filename, encoded in targets:
        write_file(path=os.path.join(ssl_dir, filename),
                   content=b64decode(encoded))
def configure_ca(self):
    """Install the CA certificate returned by ``get_ca_cert()``, if any."""
    encoded_cert = get_ca_cert()
    if not encoded_cert:
        return
    install_ca_cert(b64decode(encoded_cert))
def decrypt(enc, aes_key=secret):
    """Decrypt base64 text whose first 16 decoded bytes are the CFB IV.

    Args:
        enc: Base64 of ``iv + ciphertext``.
        aes_key: AES key; defaults to the module-level ``secret``.

    Returns:
        The decrypted bytes.
    """
    blob = base64.b64decode(enc)
    iv, ciphertext = blob[:16], blob[16:]
    return AES.new(aes_key, AES.MODE_CFB, iv).decrypt(ciphertext)
def add_bind_server(BHOST,BPORT):
    """Return the bind-server snippet with host and port filled in.

    Args:
        BHOST: Text placed inside the first ``base64.b64decode("...")`` call.
        BPORT: Text placed inside the second one.
    """
    # NOTE(review): the snippet below is reproduced exactly as it appears in
    # this file, with its statements separated by single spaces.  It only
    # parses as Python once they are on separate, indented lines - restore
    # the line structure from the upstream template before relying on it.
    snippet = ''' def bind_server(self): client_socket=None self.stop_bind_server = False # if no target is defined, we listen on all interfaces if dbg: print 'creating server' server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) target = base64.b64decode("{}") port = int(base64.b64decode("{}")) server.bind((target,port)) server.listen(5) while True: if self.stop_bind_server: break server.settimeout(5) try: client_socket, addr = server.accept() server.settimeout(None) client_socket.settimeout(None) except Exception as e: if dbg: print e client_socket=None pass if client_socket: if not self.connected: self.connected = True client_handler(client_socket) self.connected = False else: send(client_socket,"[!] Another stitch shell has already been established.\\n") client_socket.close() client_socket=None server.close() def halt_bind_server(self): self.stop_bind_server = True\n\n'''
    return snippet.format(BHOST, BPORT)
def add_listen_server(LHOST,LPORT):
    """Return the listen-server snippet with host and port filled in.

    Args:
        LHOST: Text placed inside the first ``base64.b64decode("...")`` call.
        LPORT: Text placed inside the second one.
    """
    # NOTE(review): the snippet below is reproduced exactly as it appears in
    # this file, with its statements separated by single spaces.  It only
    # parses as Python once they are on separate, indented lines - restore
    # the line structure from the upstream template before relying on it.
    snippet = ''' def listen_server(self): self.stop_listen_server = False while True: if self.stop_listen_server : break while self.connected: sleep(5) pass if dbg: print 'trying to connect' client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) client_socket.settimeout(5) target = base64.b64decode("{}") port = int(base64.b64decode("{}")) try: client_socket.connect((target,port)) client_socket.settimeout(300) if not self.connected: self.connected = True client_handler(client_socket) self.connected = False else: send(client_socket,"[!] Another stitch shell has already been established.\\n") client_socket.close() except Exception as e: if dbg: print e client_socket.close() def halt_listen_server(self): self.stop_listen_server = True\n\n'''
    return snippet.format(LHOST, LPORT)
def get_requirements():
    """Return the import header of the generated module as source text.

    Each import statement is emitted on its own line so that the returned
    text is valid Python; with the statements joined by spaces it could not
    be parsed.
    """
    return '''
import os
import re
import sys
import math
import time
import socket
import base64
import shutil
import ctypes
import socket
import struct
import zipfile
import datetime
import requests
import StringIO
import platform
import threading
import subprocess
from Crypto import Random
from Crypto.Cipher import AES
from mss import ScreenshotError
from time import strftime, sleep
from contextlib import contextmanager
from base64 import b64decode as INFO
from zlib import decompress as SEC
from st_utils import *
from st_protocol import *
from st_encryption import *
'''
def on_friend_lossless_packet_callback(self,friendId,message_type,message):
    """Reassemble a fragmented lossless packet received from a friend.

    The first character of ``message`` is the fragment kind: ``0`` starts a
    new receive-queue entry, ``1`` appends to the last entry and ``2`` marks
    the last entry as done (base64-decoding its data when ``message_type``
    is 181).  Kinds 1 and 2 are only accepted when the queue already holds
    an entry for ``friendId``.

    Raises:
        IOError: if the fragment kind is not 0, 1 or 2.
        ValueError: if the first character is not a digit.
    """
    logger.debug('Got Lossless: Friend:%s Type:%s Message:%s'%(friendId,message_type,len(message)))
    # The context manager releases the lock on every exit path, including
    # exceptions such as a non-numeric fragment marker, which previously left
    # the lock held forever.
    with self.queue_recv_lock:
        block_number = int(message[:1])
        job_exists = any(entry['friendId'] == friendId for entry in self.queue_recv)
        if block_number == 0:
            self.queue_recv.append({'data':message[1:],'done':False,'message_type':message_type,'friendId':friendId})
        elif block_number == 1:
            if job_exists:
                self.queue_recv[-1]['data'] += message[1:]
            else:
                logger.info('Got Bad Fragment')
        elif block_number == 2:
            if job_exists:
                self.queue_recv[-1]['done'] = True
                if message_type == 181:
                    self.queue_recv[-1]['data'] = base64.b64decode(self.queue_recv[-1]['data'])
            else:
                logger.info('Got Bad Fragment')
        else:
            logger.info('Got Unknown Fragment')
            raise IOError
def decode_private_key(self, encoded):
    """
    Load a private key that was gzipped and then base64-encoded
    (the spotnab format).

    Returns True when the key was decoded and ``self.load`` accepted it,
    False otherwise.
    """
    buffer = StringIO()
    try:
        buffer.write(b64decode(encoded))
    except TypeError:
        return False
    buffer.seek(0, SEEK_SET)

    with GzipFile(fileobj=buffer, mode="rb") as archive:
        private_key = archive.read()

    if not private_key:
        return False

    # Success depends on load() accepting the key.
    return bool(self.load(private_key=private_key))
def decode_public_key(self, encoded):
    """
    Load a PEM public key that was gzipped and then base64-encoded
    (the spotnab format) into ``self.public_key``.

    Returns True when a key was loaded, False otherwise.
    """
    buffer = StringIO()
    try:
        buffer.write(b64decode(encoded))
    except TypeError:
        return False
    buffer.seek(0, SEEK_SET)

    self.public_key = None
    with GzipFile(fileobj=buffer, mode="rb") as archive:
        try:
            self.public_key = serialization.load_pem_public_key(
                archive.read(),
                backend=default_backend()
            )
        except ValueError:
            # Could not decrypt content
            return False

    return bool(self.public_key)
def decrypt(enc):
    """Decrypt base64 text whose first 16 decoded bytes are the CBC IV.

    Uses the module-level ``KEY`` and returns the plaintext after ``unpad``.
    """
    blob = base64.b64decode(enc)
    iv, ciphertext = blob[:16], blob[16:]
    padded = AES.new(KEY, AES.MODE_CBC, iv).decrypt(ciphertext)
    return unpad(padded)
def decrypt(ciphertext):
    """Return the ``Plaintext`` field of ``kms.decrypt`` for base64 ``ciphertext``."""
    response = kms.decrypt(CiphertextBlob=base64.b64decode(ciphertext))
    return response['Plaintext']
def _b64_decode_bytes(b): return base64.b64decode(b.encode("ascii"))
def fn_extract_wav(piece):
    """Save the base64 encoded wav data in this email piece
    and return the file path and name as a string
    """
    glog('Extracting wav from piece')

    # Everything from the first blank line onwards is treated as the base64
    # body; the stripped lines are merged into one string before decoding.
    body_lines = []
    in_body = False
    for raw_line in piece.split('\n'):
        stripped = raw_line.strip()
        if stripped == "":
            in_body = True
        if in_body:
            body_lines.append(stripped)

    data = base64.b64decode(''.join(body_lines))

    rs_string = random_string(20)
    if not os.path.exists(GLV_TMP_PATH_TO_SAVE_VOICEMAIL):
        os.makedirs(GLV_TMP_PATH_TO_SAVE_VOICEMAIL)
    rs_path = GLV_TMP_PATH_TO_SAVE_VOICEMAIL + rs_string + '.wav'

    glog("save wav file to " + rs_path)
    with open(rs_path, 'wb') as wav_file:
        wav_file.write(data)

    # return temporary file path and name
    return rs_path
def decrypt(text):
    """Return the original string for base64 AES-CFB ciphertext ``text``.

    Key and IV come from ``settings.CRYPT_KEY`` and ``settings.CRYPT_IV``.
    """
    ciphertext = base64.b64decode(text.encode())
    cipher = AES.new(settings.CRYPT_KEY, AES.MODE_CFB, settings.CRYPT_IV)
    plaintext = cipher.decrypt(ciphertext)
    return plaintext.decode()
def _ConvertScalarFieldValue(value, field, require_str=False):
    """Convert a single scalar field value.

    Args:
      value: A scalar value to convert the scalar field value.
      field: The descriptor of the field to convert.
      require_str: If True, the field value must be a str.

    Returns:
      The converted scalar field value; None when the field's cpp_type is
      none of the handled kinds.

    Raises:
      ParseError: In case of convert problems.
    """
    kind = field.cpp_type

    if kind in _INT_TYPES:
        return _ConvertInteger(value)

    if kind in _FLOAT_TYPES:
        return _ConvertFloat(value)

    if kind == descriptor.FieldDescriptor.CPPTYPE_BOOL:
        return _ConvertBool(value, require_str)

    if kind == descriptor.FieldDescriptor.CPPTYPE_STRING:
        if field.type == descriptor.FieldDescriptor.TYPE_BYTES:
            return base64.b64decode(value)
        # Checking for unpaired surrogates appears to be unreliable,
        # depending on the specific Python version, so we check manually.
        if _UNPAIRED_SURROGATE_PATTERN.search(value):
            raise ParseError('Unpaired surrogate')
        return value

    if kind == descriptor.FieldDescriptor.CPPTYPE_ENUM:
        # Look the value up by name first, then fall back to its number.
        enum_value = field.enum_type.values_by_name.get(value, None)
        if enum_value is None:
            try:
                number = int(value)
                enum_value = field.enum_type.values_by_number.get(number, None)
            except ValueError:
                raise ParseError('Invalid enum value {0} for enum type {1}.'.format(
                    value, field.enum_type.full_name))
            if enum_value is None:
                raise ParseError('Invalid enum value {0} for enum type {1}.'.format(
                    value, field.enum_type.full_name))
        return enum_value.number

    return None