我们从 Python 开源项目中提取了以下 40 个代码示例，用于说明如何使用 base64.standard_b64decode()。
def main(cb, args):
    """Print the decoded form of every encoded PowerShell command line.

    Searches ``cb`` for ``powershell.exe`` processes, extracts the Base64
    payload that follows an ``-enc...`` switch on each command line, restores
    missing ``=`` padding and prints the decoded text below a link built from
    ``args['server_url']`` and the process id.

    A payload that is not valid Base64, or that does not decode to ASCII, no
    longer aborts the scan: the raw command line is printed instead and the
    loop moves on to the next process.

    :param cb: object exposing ``process_search_iter(query)``; every result
        is indexed by ``'cmdline'`` and ``'id'``.
    :param args: mapping providing ``'server_url'``.
    """
    encoded_switch = re.compile(
        r'\-[eE][nN][cC][oOdDeEcCmMaAnN]*\s([A-Za-z0-9\+/=]+)')
    already_padded = re.compile(r'[a-zA-Z0-9\+/]+={1,2}$')

    for proc in cb.process_search_iter('process_name:powershell.exe'):
        cmdline = proc['cmdline']
        if not cmdline:
            continue
        match = encoded_switch.search(cmdline)
        if match is None:
            continue

        payload = match.group(1)
        if not already_padded.search(payload):
            # Base64 text must be a multiple of four characters long.
            remainder = len(payload) % 4
            if remainder == 3:
                payload += '='
            elif remainder == 2:
                payload += '=='

        try:
            # TypeError (Python 2) / binascii.Error (Python 3, a ValueError)
            # signal broken Base64; UnicodeError (also a ValueError) signals
            # non-ASCII content.
            text = base64.standard_b64decode(payload).decode('ascii')
        except (TypeError, ValueError):
            print("Powershell Decoded Command\n%s/#analyze/%s/1\nNon-ASCII decoding, encoded form printed to assist more research\n%s\n" % (
                args['server_url'], proc['id'], cmdline))
            continue

        # The NUL bytes are stripped so the text prints cleanly.
        print("Powershell Decoded Command\n%s/#analyze/%s/1\n%s\n\n" % (
            args['server_url'], proc['id'], text.replace('\0', "")))
def check_authentication(self, response_dict):
    """Store a newer session ticket found in ``response_dict``, if any.

    Nothing happens unless ``response_dict`` is a dict whose ``'auth_ticket'``
    entry carries an ``'expire_timestamp_ms'`` that the auth provider reports
    as new. Otherwise the ticket (expiry plus Base64-decoded ``'start'`` and
    ``'end'``) is handed to the provider and the remaining validity is logged.
    """
    if not isinstance(response_dict, dict):
        return
    if 'auth_ticket' not in response_dict:
        return
    if 'expire_timestamp_ms' not in response_dict['auth_ticket']:
        return
    if not self._auth_provider.is_new_ticket(
            response_dict['auth_ticket']['expire_timestamp_ms']):
        return

    # Must be sampled before set_ticket() so the log wording is right.
    replacing = self._auth_provider.has_ticket()
    ticket = response_dict['auth_ticket']

    self._auth_provider.set_ticket([
        ticket['expire_timestamp_ms'],
        base64.standard_b64decode(ticket['start']),
        base64.standard_b64decode(ticket['end']),
    ])

    now_ms = get_time(ms=True)
    h, m, s = get_format_time_diff(now_ms, ticket['expire_timestamp_ms'], True)

    if replacing:
        template = 'Replacing old Session Ticket with new one valid for %02d:%02d:%02d hours (%s < %s)'
    else:
        template = 'Received Session Ticket valid for %02d:%02d:%02d hours (%s < %s)'
    self.log.debug(template, h, m, s, now_ms, ticket['expire_timestamp_ms'])
def __getDownloadedUserContributed(self):
    """Build ``UserContributed`` objects for the installed docsets.

    Rows come from ``DBManager.InstalledDocsetsByType('usercontributed')``.
    Columns 0, 1, 2, 4, 5 and 6 are read as id, name, relative path,
    Base64 icon data, version and author name respectively.
    """
    rows = DBManager.DBManager().InstalledDocsetsByType('usercontributed')
    docsets = []
    for row in rows:
        entry = UserContributed()
        entry.name = row[1]
        entry.id = row[0]
        entry.path = os.path.join(os.path.abspath('.'), row[2])

        icon_b64 = str(row[4])
        if icon_b64 != '':
            entry.image = ui.Image.from_data(
                base64.standard_b64decode(icon_b64))
        else:
            # No stored icon: fall back to the generic one.
            entry.image = self.__getIconWithName('Other')

        entry.authorName = row[6]
        entry.version = row[5]
        docsets.append(entry)
    return docsets
def Decode(value, type):
    """Decode a Base64 string back into an ``int``, ``float`` or ``str``.

    :param value: Base64 text to decode.
    :param type: the Python type to rebuild; one of ``int`` (unpacked as a
        native-order 8-byte signed integer), ``float`` (8-byte double) or
        ``str``. The name shadows the builtin but is part of the public
        signature, so it is kept.
    :return: the decoded value.
    :raises EncodeException: if ``type`` is none of the supported types.
    """
    data = base64.standard_b64decode(value)
    print("Decoding value %s, type %s" % (str(value), str(type)))
    sys.stdout.flush()

    if type is int:
        return struct.unpack("=q", data)[0]
    if type is float:
        return struct.unpack("=d", data)[0]
    if type is str:
        # On Python 3 the decoded payload is bytes; str(bytes) would give
        # the "b'...'" repr rather than the text.
        # NOTE(review): assumes the payload was UTF-8 encoded - confirm
        # against the matching encoder.
        return data if isinstance(data, str) else data.decode('utf-8')

    # ``type`` here is the parameter, not the builtin: report it directly
    # instead of calling it on ``value``.
    raise EncodeException(
        "Unknown type %s with value %s" % (str(type), str(value),))
def _buildAuthJWT(config):
    """Return a JWT validator built from ``config``, or ``None``.

    Reads ``canister.auth_jwt_client_id``, ``canister.auth_jwt_secret`` and
    ``canister.auth_jwt_encoding`` (``clear``, ``base64std`` or
    ``base64url``; default ``clear``). ``None`` is returned when the client
    id or the secret is missing; an unknown encoding raises ``Exception``.
    The returned callable decodes a token with the secret and the client id
    as audience and returns the resulting profile.
    """
    client_id = config.get('canister.auth_jwt_client_id', None)
    secret = config.get('canister.auth_jwt_secret', None)
    encoding = config.get('canister.auth_jwt_encoding', 'clear').lower()

    if not (client_id and secret):
        return None

    import jwt

    secret_decoders = {
        'base64std': base64.standard_b64decode,  # with + and /
        'base64url': base64.urlsafe_b64decode,   # with - and _
        'clear': None,
    }
    if encoding not in secret_decoders:
        raise Exception('Invalid auth_jwt_encoding in config: "%s" (should be "clear", "base64std" or "base64url")' % encoding)

    decode_secret = secret_decoders[encoding]
    if decode_secret is not None:
        secret = decode_secret(secret)

    def validate(token):
        return jwt.decode(token, secret, audience=client_id)

    return validate
def __parse_crypto_keys(self, headerdata):
    """Install the master token and session keys from a key response.

    The ``keyresponsedata`` section supplies the master token and two
    Base64 blobs (``encryptionkey`` and ``hmackey``). Each blob is decrypted
    with PKCS1-OAEP using ``self.rsa_key`` and parsed as JSON, and its ``'k'``
    member is passed through ``base64key_decode``. The MSL data is then
    saved and ``handshake_performed`` is set.
    """
    key_response = headerdata['keyresponsedata']
    self.__set_master_token(key_response['mastertoken'])

    # Init Decryption
    key_data = key_response['keydata']
    wrapped_encryption_key = base64.standard_b64decode(key_data['encryptionkey'])
    wrapped_sign_key = base64.standard_b64decode(key_data['hmackey'])
    rsa_cipher = PKCS1_OAEP.new(self.rsa_key)
    parse_json = json.JSONDecoder().decode

    # Decrypt encryption key
    encryption_jwk = parse_json(rsa_cipher.decrypt(wrapped_encryption_key))
    self.encryption_key = base64key_decode(encryption_jwk['k'])

    # Decrypt sign key
    sign_jwk = parse_json(rsa_cipher.decrypt(wrapped_sign_key))
    self.sign_key = base64key_decode(sign_jwk['k'])

    self.__save_msl_data()
    self.handshake_performed = True
def __load_msl_data(self):
    """Load stored MSL data, renewing the keys when the token is near expiry.

    Reads ``msl_data.json`` through ``self.load_file``, decodes the stored
    master token and looks at its ``expiration`` (seconds since the epoch).
    If fewer than ten hours remain, or it has already expired, the RSA keys
    are loaded and a new key handshake is performed. Otherwise the stored
    master token, encryption key and sign key are installed on ``self``.
    """
    raw_msl_data = self.load_file(
        msl_data_path=self.kodi_helper.msl_data_path,
        filename='msl_data.json')
    msl_data = json.JSONDecoder().decode(raw_msl_data)
    stored_token = msl_data['tokens']['mastertoken']

    # Check expire date of the token. b64decode returns bytes; decode to
    # text so JSONDecoder accepts it on Python 3 as well as Python 2.
    token_json = base64.standard_b64decode(stored_token['tokendata'])
    master_token = json.JSONDecoder().decode(token_json.decode('utf-8'))
    valid_until = datetime.utcfromtimestamp(int(master_token['expiration']))

    # utcfromtimestamp() yields a naive UTC value, so "now" must be naive
    # UTC too; datetime.now() is local time and skews the result by the
    # machine's UTC offset.
    hours_left = (valid_until - datetime.utcnow()).total_seconds() / 60 / 60

    # If the token expires in less than 10 hours or has expired, renew it.
    if hours_left < 10:
        self.__load_rsa_keys()
        self.__perform_key_handshake()
        return

    self.__set_master_token(stored_token)
    self.encryption_key = base64.standard_b64decode(msl_data['encryption_key'])
    self.sign_key = base64.standard_b64decode(msl_data['sign_key'])
def test_b64decode_invalid_chars(self): # issue 1466065: Test some invalid characters. tests = ((b'%3d==', b'\xdd'), (b'$3d==', b'\xdd'), (b'[==', b''), (b'YW]3=', b'am'), (b'3{d==', b'\xdd'), (b'3d}==', b'\xdd'), (b'@@', b''), (b'!', b''), (b'YWJj\nYWI=', b'abcab')) for bstr, res in tests: self.assertEqual(base64.b64decode(bstr), res) self.assertEqual(base64.standard_b64decode(bstr), res) self.assertEqual(base64.urlsafe_b64decode(bstr), res) # Normal alphabet characters not discarded when alternative given res = b'\xFB\xEF\xBE\xFF\xFF\xFF' self.assertEqual(base64.b64decode(b'++[[//]]', b'[]'), res) self.assertEqual(base64.urlsafe_b64decode(b'++--//__'), res)
def addcrypted2(self):
    """Decrypt a posted link list and add it as a package.

    Reads the ``source``, ``crypted`` and ``jk`` POST fields. ``jk`` is
    JavaScript whose ``f()`` result is the hex-encoded AES key; the key is
    also used as the CBC IV. The decrypted text is split into non-empty
    lines, which are passed to ``self.add_package``.
    """
    package = self.get_post("source", "ClickAndLoad Package")
    crypted_field = self.get_post("crypted")
    key_script = self.get_post("jk")

    # Form decoding turns '+' into ' '; undo that before Base64 decoding.
    ciphertext = standard_b64decode(unquote(crypted_field.replace(" ", "+")))

    # NOTE(review): this evaluates JavaScript taken from the request.
    key = unhexlify(js.eval("%s f()" % key_script))

    cipher = AES.new(key, AES.MODE_CBC, key)
    plaintext = cipher.decrypt(ciphertext)
    lines = plaintext.replace("\x00", "").replace("\r", "").split("\n")
    links = filter(None, lines)

    self.add_package(package, links, 0)
def read_vm_file(self, path, uri="qemu:///system"):
    """Read a file from the guest through ``self.EXE`` and return its bytes.

    Sends a ``guest-file-open`` command for ``path``, then one
    ``guest-file-read`` of up to 1024000 bytes, and Base64-decodes the
    ``buf-b64`` member of the reply. The handle is closed afterwards.

    :param path: guest path, interpolated verbatim into the JSON command.
        NOTE(review): quotes or backslashes in ``path`` are not escaped.
    :param uri: unused by this function.
    :return: the decoded file content, or ``None`` if any step raised (the
        exception is logged).
    """
    FILE_OPEN_READ = """{"execute":"guest-file-open", "arguments":{"path":"%s","mode":"r"}}"""
    FILE_READ = """{"execute":"guest-file-read", "arguments":{"handle":%s,"count":%d}}"""
    FILE_CLOSE = """{"execute":"guest-file-close", "arguments":{"handle":%s}}"""

    file_handle = -1
    try:
        file_handle = self.EXE(FILE_OPEN_READ % path)["return"]
        encoded = self.EXE(FILE_READ % (file_handle, 1024000))["return"]["buf-b64"]
        return base64.standard_b64decode(encoded)
    except Exception as e:
        logger.exception(e)
        return None
    finally:
        # Only close a handle that was actually opened; -1 is the
        # "never opened" sentinel.
        if file_handle != -1:
            self.EXE(FILE_CLOSE % file_handle)
def _process_prop_suffix(prop_name, value):
    """
    Convert a JSON value whose property name carries a type suffix.

    :param prop_name: The JSON property name
    :param value: The JSON property value
    :return: For a name ending in ``_hex`` the hex-decoded binary value, for
        a name ending in ``_bin`` the Base64-decoded binary value; otherwise
        ``value`` unchanged.
    """
    suffix_decoders = (
        (u"_hex", binascii.a2b_hex),               # binary, expressed as hex
        (u"_bin", base64.standard_b64decode),      # binary, expressed as base64
    )
    for suffix, decode in suffix_decoders:
        if prop_name.endswith(suffix):
            return decode(value)
    return value
def check_authentication(self, response_dict):
    """Adopt the session ticket in ``response_dict`` when it is a new one.

    The response must be a dict with an ``'auth_ticket'`` containing
    ``'expire_timestamp_ms'``, and the auth provider must consider that
    timestamp new. The provider then receives the expiry together with the
    Base64-decoded ``'start'`` and ``'end'`` values, and a debug line
    reports how long the ticket stays valid.
    """
    has_fresh_ticket = (
        isinstance(response_dict, dict)
        and 'auth_ticket' in response_dict
        and 'expire_timestamp_ms' in response_dict['auth_ticket']
        and self._auth_provider.is_new_ticket(
            response_dict['auth_ticket']['expire_timestamp_ms'])
    )
    if not has_fresh_ticket:
        return

    had_ticket = self._auth_provider.has_ticket()
    auth_ticket = response_dict['auth_ticket']
    expires_ms = auth_ticket['expire_timestamp_ms']
    start = base64.standard_b64decode(auth_ticket['start'])
    end = base64.standard_b64decode(auth_ticket['end'])
    self._auth_provider.set_ticket([expires_ms, start, end])

    now_ms = get_time(ms=True)
    h, m, s = get_format_time_diff(now_ms, auth_ticket['expire_timestamp_ms'], True)

    message = (
        'Replacing old Session Ticket with new one valid for %02d:%02d:%02d hours (%s < %s)'
        if had_ticket else
        'Received Session Ticket valid for %02d:%02d:%02d hours (%s < %s)'
    )
    self.log.debug(message, h, m, s, now_ms, auth_ticket['expire_timestamp_ms'])
def sign_extract(signature):
    """Split a Base64 ``user:timestamp:verify`` signature into its parts.

    :param signature: Base64 text that decodes to three ``:``-separated
        fields.
    :return: ``(user, ts, verify)`` where ``user`` is stripped of
        surrounding whitespace, ``ts`` is an integer and ``verify`` is
        stripped and lower-cased; ``(None, None, None)`` if the signature is
        empty, is not valid Base64, does not have exactly three fields, or
        has a non-numeric timestamp.
    """
    failed = (None, None, None)
    if not signature:
        return failed

    try:
        text = base64.standard_b64decode(signature)
        if not isinstance(text, str):
            # Python 3 returns bytes; the fields are handled as text below.
            text = text.decode('utf-8')
    except (TypeError, ValueError):
        # TypeError: Python 2 decode failure. ValueError covers
        # binascii.Error and UnicodeDecodeError on Python 3.
        return failed

    part = text.split(':')
    if len(part) != 3:
        return failed

    user = part[0].strip('\r\n\t ')
    try:
        # int() promotes to an arbitrary-precision integer where needed, so
        # the Python-2-only long() is not required.
        ts = int(part[1])
    except ValueError:
        return failed
    verify = part[2].strip('\r\n\t ').lower()
    return user, ts, verify
def process_challenge(self, challenge_parameters):
    """Build the response to a password-verifier challenge.

    Derives the authentication key from the challenge's ``USER_ID_FOR_SRP``,
    ``SRP_B`` and ``SALT`` plus ``self.password``, then signs (HMAC-SHA256)
    the pool id suffix, user id, decoded ``SECRET_BLOCK`` and a UTC
    timestamp. A ``SECRET_HASH`` is added when ``self.client_secret`` is
    set.

    :return: dict with ``TIMESTAMP``, ``USERNAME``,
        ``PASSWORD_CLAIM_SECRET_BLOCK`` and ``PASSWORD_CLAIM_SIGNATURE``
        (and optionally ``SECRET_HASH``).
    """
    user_id = challenge_parameters['USER_ID_FOR_SRP']
    salt_hex = challenge_parameters['SALT']
    srp_b_hex = challenge_parameters['SRP_B']
    secret_block_b64 = challenge_parameters['SECRET_BLOCK']

    # re strips leading zero from a day number (required by AWS Cognito)
    raw_timestamp = datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y")
    timestamp = re.sub(r" 0(\d) ", r" \1 ", raw_timestamp)

    hkdf = self.get_password_authentication_key(
        user_id, self.password, hex_to_long(srp_b_hex), salt_hex)
    secret_block = base64.standard_b64decode(secret_block_b64)

    msg = bytearray(self.pool_id.split('_')[1], 'utf-8')
    msg = msg + bytearray(user_id, 'utf-8')
    msg = msg + bytearray(secret_block)
    msg = msg + bytearray(timestamp, 'utf-8')

    digest = hmac.new(hkdf, msg, digestmod=hashlib.sha256).digest()
    signature = base64.standard_b64encode(digest)

    response = {
        'TIMESTAMP': timestamp,
        'USERNAME': user_id,
        'PASSWORD_CLAIM_SECRET_BLOCK': secret_block_b64,
        'PASSWORD_CLAIM_SIGNATURE': signature.decode('utf-8'),
    }
    if self.client_secret is not None:
        response["SECRET_HASH"] = self.get_secret_hash(
            self.username, self.client_id, self.client_secret)
    return response
def authenticate_client(self):
    """Authenticate the request via Basic credentials or a ``UUID`` header.

    With an ``Authorization`` header, the Base64 credentials are read as
    ``email:machine_name``; the machine is looked up with ``authenticate``
    and the request is valid when the e-mail matches, in which case a
    ``UUID`` cookie is set if one is known. Without it, a ``UUID`` header is
    looked up the same way. A malformed ``Authorization`` header is treated
    as a failed authentication instead of raising.

    :return: ``True`` if the client was authenticated, else ``False``.
    """
    validuser = False
    if 'Authorization' in self.headers:
        # handle Basic authentication
        try:
            _scheme, encstr = self.headers.get('Authorization').split()
            credentials = base64.standard_b64decode(encstr)
            if not isinstance(credentials, str):
                # Python 3 yields bytes; compare and split as text.
                credentials = credentials.decode('utf-8')
            # Split once so a ':' inside the second field is preserved.
            emailid, machine_name = credentials.split(':', 1)
        except (TypeError, ValueError):
            # Wrong token count, invalid Base64 or missing ':'.
            return validuser

        _machine, auth_uuidstr, auth_email = authenticate(machine_name)
        if emailid == auth_email:
            print("Authenticated")
            # set authentication cookies on client machines
            validuser = True
            if auth_uuidstr:
                self.setCookie('UUID', auth_uuidstr)
    elif 'UUID' in self.headers:
        # handle cookie based authentication
        client_uuid = self.headers.get('UUID')
        _machine, auth_uuidstr, _email = authenticate(client_uuid)
        if auth_uuidstr:
            print("Authenticated")
            validuser = True
    else:
        print('Authentication failed')
    return validuser
def _to_binary_inst(msg):
    """Best-effort conversion of ``msg`` to a binary string.

    A value whose exact type is one of ``string_types`` is Base64-decoded;
    anything else is converted through ``bytearray``. If the conversion
    fails for any reason, ``msg`` is returned unchanged.
    """
    is_text = type(msg) in string_types
    try:
        if is_text:
            return standard_b64decode(msg)
        return str(bytearray(msg))
    except:  # noqa: E722 - best-effort fallback, kept as written
        return msg
def extract(self, carrier):
    """Rebuild a ``SpanContext`` from a Base64 binary carrier.

    :param carrier: must be exactly a ``bytearray`` holding the
        Base64-encoded serialized ``BinaryCarrier``.
    :raises InvalidCarrierException: if ``carrier`` is any other type.
    """
    if type(carrier) is not bytearray:
        raise InvalidCarrierException()

    state = BinaryCarrier()
    state.ParseFromString(bytes(standard_b64decode(carrier)))

    # Copy the baggage items into a plain dict.
    baggage = {
        key: state.basic_ctx.baggage_items[key]
        for key in state.basic_ctx.baggage_items
    }

    context = state.basic_ctx
    return SpanContext(
        span_id=context.span_id,
        trace_id=context.trace_id,
        baggage=baggage,
        sampled=context.sampled)
def _sc_provisioning_helper(self, pl, sc_class, sc_key):
    """
    Insert smart contract code into the appropriate dictionary.

    The Base64 source found at ``pl['smart_contract'][sc_class]`` is decoded
    and executed; whatever it binds to the name ``func`` is stored in
    ``self.sc_container[sc_class][sc_key]``.

    :param pl: transaction payload to extract sc from
    :param sc_class: type of sc being dealt with (e.g. tsc, ssc, etc.)
    :param sc_key: transaction type
    :return: ``True`` if the contract was stored; ``False`` if the payload
        has no ``smart_contract`` section, no code for ``sc_class``, or any
        exception occurred (a warning is logged in the latter two cases).
    """
    try:
        if 'smart_contract' not in pl:
            return False

        sc_impl = pl['smart_contract'][sc_class]
        if not sc_impl:
            logger().warning("No smart contract code provided...")
            return False

        try:
            sc_impl = base64.standard_b64decode(sc_impl)
        except (TypeError, ValueError):
            # TypeError on Python 2, binascii.Error (a ValueError) on 3.
            raise Exception("The Smart Contract implementation for " +
                            str(sc_key) + " must be base64 encoded.")

        # SECURITY NOTE(review): this executes code taken from the
        # transaction payload.
        #
        # Run in an explicit namespace: on Python 3 a bare exec() cannot
        # rebind this function's local ``func``, which would leave None
        # stored. The namespace is seeded with the current locals so the
        # executed code sees the same names as before.
        scope = dict(locals(), func=None)
        exec(sc_impl, globals(), scope)

        # store sc function for this txn type (sc_key)
        self.sc_container[sc_class][sc_key] = scope.get('func')
    except Exception as ex:
        template = "An exception of type {0} occurred. Arguments:\n{1!r}"
        message = template.format(type(ex).__name__, ex.args)
        logger().warning(message)
        return False
    return True
def __getUserContributed(self):
    """Fetch the online user-contributed docset index.

    Downloads the index from the download server's URL joined with
    ``self.jsonServerLocation``, parses it with ``ast.literal_eval`` and
    turns every entry of its ``'docsets'`` mapping into a
    ``UserContributed`` object marked ``'online'``.

    :return: the objects sorted case-insensitively by name.
    """
    server = self.serverManager.getDownloadServer(self.localServer)
    base_url = server.url
    if base_url[-1] != '/':
        base_url += '/'
    index_url = base_url + self.jsonServerLocation

    index = ast.literal_eval(requests.get(index_url).text)

    fallback_icon = self.__getIconWithName('Other')
    docsets = []
    for online_id, info in data_items(index):
        entry = UserContributed()
        entry.name = info['name']
        if 'aliases' in info:
            entry.aliases = info['aliases']
        entry.version = info['version']
        entry.archive = info['archive']
        entry.authorName = info['author']['name']
        if 'icon' in info:
            entry.image = ui.Image.from_data(
                base64.standard_b64decode(info['icon']))
            entry.imageData = info['icon']
        else:
            entry.image = fallback_icon
        entry.onlineid = online_id
        entry.status = 'online'
        docsets.append(entry)

    docsets.sort(key=lambda item: item.name.lower())
    return docsets


def data_items(index):
    """Return the ``(id, info)`` pairs of the index's ``'docsets'`` mapping."""
    return index['docsets'].items()
def VerifyMessageFromAddress(self,addr,message,sig):
    """Check whether ``sig`` is a signature of ``message`` by address ``addr``.

    ``sig`` is Base64 text. After decoding, byte 0 is a header value that
    must lie in 27..34, bytes 1..32 are read as ``r`` and bytes 33..64 as
    ``s``. A candidate public key is computed for each of the two curve
    points with x-coordinate ``r``; the signature is accepted as soon as
    ``self.AddressFromPublicKey`` of a candidate equals ``addr``.

    Returns ``True`` on a match, ``False`` otherwise (including a header
    byte outside the accepted range).
    """
    #Check a signature (r,s) for the message m signed by the Bitcoin
    # address "addr".
    sign=base64.standard_b64decode(sig)
    # r and s are the two 32-byte halves that follow the header byte.
    (r,s)=(Byte2Int(sign[1:33]),Byte2Int(sign[33:65]))
    # z: integer form of the double SHA256 of MsgMagic(message).
    z=Byte2Int(Hash(Hash(MsgMagic(message),"SHA256"),"SHA256"))
    # NOTE(review): ord() on an indexed element expects a 1-character
    # string; on Python 3, indexing bytes yields an int - confirm the
    # targeted Python version.
    val=ord(sign[0])
    if val<27 or val>=35:
        return False
    # Header values 31..34 clear the flag and are shifted down by 4.
    # NOTE(review): the flag is named ``uncompressed`` yet is False for the
    # upper range - confirm against AddressFromPublicKey's parameter.
    if val>=31:
        uncompressed=False
        val-=4
    else:
        uncompressed=True

    x=r
    # Right-hand side of the curve equation y^2 = x^3 + a*x + b (mod p).
    y2=(pow(x,3,self.p) + self.a*x + self.b) % self.p
    # Presumably a modular square root of y2 - TODO confirm Cipolla().
    y=Cipolla(y2,self.p)

    # Try both roots (y and p - y); ``val`` is not consulted again.
    for _ in range(2):
        kG=EllipticCurvePoint([x,y,1],self.a,self.b,self.p,self.n)
        # NOTE(review): ``self`` is multiplied like a curve point here -
        # presumably the generator; confirm.
        mzG=self*((-z)%self.n)
        # Candidate public key: (s*kG - z*G) * r^-1.
        Q=(kG*s+mzG)*InvMod(r,self.n)
        if self.AddressFromPublicKey(Q,uncompressed)==addr:
            return True
        y=self.p-y
    return False
def __decrypt_payload_chunks(self, payloadchunks):
    """Decrypt, unpad and reassemble a sequence of payload chunks.

    Each chunk is JSON whose ``payload`` is a Base64 envelope holding an
    ``iv`` and a ``ciphertext``. The ciphertext is decrypted with AES-CBC
    under ``self.encryption_key``, unpadded to a JSON object, and its
    ``data`` member is Base64-decoded (and gunzipped when
    ``compressionalgo`` is ``GZIP``). The concatenated data is parsed as
    JSON; element ``[1]['payload']['data']`` is Base64-decoded and parsed
    once more to give the result.
    """
    from_json = json.JSONDecoder().decode
    b64 = base64.standard_b64decode

    decrypted_payload = ''
    for chunk in payloadchunks:
        envelope = from_json(b64(from_json(chunk).get('payload')))

        # Decrypt the text
        cipher = AES.new(
            self.encryption_key, AES.MODE_CBC, b64(envelope['iv']))
        padded = cipher.decrypt(b64(envelope.get('ciphertext')))

        # unpad the plaintext
        plaintext = from_json(Padding.unpad(padded, 16))

        # uncompress data if compressed
        data = b64(plaintext.get('data'))
        if plaintext.get('compressionalgo') == 'GZIP':
            data = zlib.decompress(data, 16 + zlib.MAX_WBITS)
        decrypted_payload += data

    inner = from_json(decrypted_payload)[1]['payload']['data']
    return from_json(b64(inner))
def __set_master_token(self, master_token):
    """Remember ``master_token`` and the sequence number it carries.

    :param master_token: mapping whose ``'tokendata'`` is Base64-encoded
        JSON; its ``'sequencenumber'`` member (``None`` when absent) is
        stored as ``self.sequence_number``.
    """
    self.mastertoken = master_token
    token_json = base64.standard_b64decode(master_token['tokendata'])
    # b64decode yields bytes; JSONDecoder.decode() needs text on Python 3.
    decoded_token = json.JSONDecoder().decode(token_json.decode('utf-8'))
    self.sequence_number = decoded_token.get('sequencenumber')
def test_decode_nonascii_str(self): decode_funcs = (base64.b64decode, base64.standard_b64decode, base64.urlsafe_b64decode, base64.b32decode, base64.b16decode) for f in decode_funcs: self.assertRaises(ValueError, f, 'with non-ascii \xcb')
def base64PayloadDecoder(data):
    """Return ``data`` decoded with the standard Base64 alphabet."""
    decoded = base64.standard_b64decode(data)
    return decoded
def _session_unpickle(utfed):
    """Rebuild an object from its Base64-encoded pickle text.

    ``utfed`` is encoded to UTF-8, Base64-decoded and unpickled.
    """
    # NOTE(review): pickle.loads can run arbitrary code; this is only safe
    # if ``utfed`` cannot be influenced by an untrusted party - confirm.
    return pickle.loads(base64.standard_b64decode(utfed.encode('utf8')))
def wsse_digest(secret, b64_encoded_nonce, timestamp):
    """Return the Base64 SHA-256 digest of nonce + timestamp + secret.

    The nonce is Base64-decoded first; ``timestamp`` and ``secret`` are
    hashed as given.
    """
    nonce = base64.standard_b64decode(b64_encoded_nonce)
    hasher = hashlib.sha256()
    for piece in (nonce, timestamp, secret):
        hasher.update(piece)
    return base64.standard_b64encode(hasher.digest())
def test_decode_nonascii_str(self): decode_funcs = (base64.b64decode, base64.standard_b64decode, base64.urlsafe_b64decode, base64.b32decode, base64.b16decode, base64.b85decode, base64.a85decode) for f in decode_funcs: self.assertRaises(ValueError, f, 'with non-ascii \xcb')
def decipher(self):
    """Return ``self.cipher`` Base64-decoded, or ``''`` if that fails.

    :return: the decoded bytes; the empty string ``''`` when ``self.cipher``
        is missing, is not text, or is not valid Base64.
    """
    try:
        return base64.standard_b64decode(str.encode(self.cipher))
    except (AttributeError, TypeError, ValueError):
        # AttributeError: no ``cipher``. TypeError: not a str. ValueError:
        # covers binascii.Error and encoding errors. Anything else
        # (KeyboardInterrupt, SystemExit, ...) is no longer swallowed.
        return ''
def _apply(self, value):
    """Decode a Base64 byte string (standard or URL-safe dialect).

    Whitespace is removed, the value is checked against ``base64_re``, the
    dialect is detected (``-``/``_`` means URL-safe; mixing dialects is
    invalid), padding is normalized, and the value is decoded.

    :return: the decoded bytes; ``None`` if the type filter reported
        errors; otherwise the result of ``self._invalid_value`` for input
        that is not valid Base64.
    """
    value = self._filter(value, Type(binary_type))  # type: binary_type

    if self._has_errors:
        return None

    # Strip out whitespace.
    # Technically, whitespace is not part of the Base64 alphabet,
    # but virtually every implementation allows it.
    value = self.whitespace_re.sub(b'', value)

    # Check for invalid characters.
    # Note that Python 3's b64decode does this for us, but we also
    # have to support Python 2.
    # https://docs.python.org/3/library/base64.html#base64.b64decode
    if not self.base64_re.match(value):
        return self._invalid_value(
            value=value,
            reason=self.CODE_INVALID,
        )

    # Check to see if we are working with a URL-safe dialect.
    # https://en.wikipedia.org/wiki/Base64#URL_applications
    url_safe = (b'_' in value) or (b'-' in value)
    if url_safe and ((b'+' in value) or (b'/' in value)):
        # You can't mix dialects, silly!
        return self._invalid_value(
            value=value,
            reason=self.CODE_INVALID,
        )

    # Normalize padding: pad up to the next multiple of four, adding
    # nothing when the length already is one.
    value = value.rstrip(b'=')
    value += b'=' * (-len(value) % 4)

    try:
        return (
            urlsafe_b64decode(value)
            if url_safe
            else standard_b64decode(value)
        )
    except (TypeError, ValueError):
        # Python 2 raises TypeError; Python 3 raises binascii.Error, which
        # is a ValueError subclass.
        return self._invalid_value(value, self.CODE_INVALID, exc_info=True)

# noinspection SpellCheckingInspection
def __perform_key_handshake(self):
    """Request new session keys from the manifest endpoint.

    Builds an unencrypted key-request header, posts it together with the
    ESN as entity identity, and on an HTTP 200 reply without ``errordata``
    passes the decoded ``headerdata`` to ``__parse_crypto_keys``. Failures
    are logged through ``self.kodi_helper.log``.

    :return: ``False`` if the reply carried ``errordata``; ``None``
        otherwise (including transport failures).
    """
    header = self.__generate_msl_header(
        is_key_request=True,
        is_handshake=True,
        compressionalgo='',
        encrypt=False)
    if not isinstance(header, bytes):
        # b64encode needs bytes on Python 3.
        header = header.encode('utf-8')
    esn = self.kodi_helper.get_esn()

    request = {
        'entityauthdata': {
            'scheme': 'NONE',
            'authdata': {
                'identity': esn
            }
        },
        # Decode to text so the request stays JSON-serializable.
        'headerdata': base64.standard_b64encode(header).decode('ascii'),
        'signature': '',
    }
    self.kodi_helper.log(msg='Key Handshake Request:')
    self.kodi_helper.log(msg=json.dumps(request))

    try:
        resp = self.session.post(
            url=self.endpoints['manifest'],
            data=json.dumps(request, sort_keys=True))
    except Exception as exc:
        resp = None
        self.kodi_helper.log(
            msg='[MSL][POST] Error {} {}'.format(type(exc), exc))

    if resp and resp.status_code == 200:
        resp = resp.json()
        if 'errordata' in resp:
            self.kodi_helper.log(msg='Key Exchange failed')
            self.kodi_helper.log(
                msg=base64.standard_b64decode(resp['errordata']))
            return False
        base_head = base64.standard_b64decode(resp['headerdata'])
        self.__parse_crypto_keys(
            headerdata=json.JSONDecoder().decode(base_head.decode('utf-8')))
    else:
        self.kodi_helper.log(msg='Key Exchange failed')
        # ``resp`` is None when the POST itself raised; there is no body
        # to log in that case.
        if resp is not None:
            self.kodi_helper.log(msg=resp.text)
def addcrypted2():
    """Decrypt the posted link list and hand it to the core.

    Reads ``source``, ``crypted`` and ``jk`` from ``request.forms``. The AES
    key (also used as CBC IV) is the hex string produced by the ``jk``
    JavaScript: evaluated with ``JS`` when available, otherwise recovered
    with two regular-expression heuristics.

    :return: ``"success\\r\\n"``, ``"failed"`` when no usable key could be
        derived, or ``"failed can't add"`` when adding the links raised.
    """
    package = request.forms.get("source", None)
    crypted_field = request.forms["crypted"]
    jk = request.forms["jk"]

    # Form decoding turns '+' into ' '; undo that before Base64 decoding.
    crypted = standard_b64decode(unquote(crypted_field.replace(" ", "+")))

    if JS:
        # NOTE(review): this evaluates JavaScript taken from the request.
        jk = JS.eval("%s f()" % jk)
    else:
        try:
            jk = re.findall(r"return ('|\")(.+)('|\")", jk)[0][1]
        except:  # noqa: E722 - kept as written
            ## Test for some known js functions to decode
            if "dec" in jk and "org" in jk:
                org = re.findall(r"var org = ('|\")([^\"']+)", jk)[0][1]
                # The key is the reversed ``org`` string.
                reversed_chars = list(org)
                reversed_chars.reverse()
                jk = "".join(reversed_chars)
            else:
                print("Could not decrypt key, please install py-spidermonkey or ossp-js")

    try:
        Key = unhexlify(jk)
    except:  # noqa: E722 - kept as written
        print("Could not decrypt key, please install py-spidermonkey or ossp-js")
        return "failed"

    cipher = AES.new(Key, AES.MODE_CBC, Key)
    plaintext = cipher.decrypt(crypted)
    lines = plaintext.replace("\x00", "").replace("\r", "").split("\n")
    result = filter(None, lines)

    try:
        if package:
            PYLOAD.addPackage(package, result, 0)
        else:
            PYLOAD.generateAndAddPackages(result, 0)
    except:  # noqa: E722 - kept as written
        return "failed can't add"
    return "success\r\n"
def write_package_in_zip(zip_file, path, package):
    """Write packages files in the zip file

    The caller's ``package`` dictionary, including its nested ``resource``
    dictionary, is left unmodified.

    :param zip_file: zip file where the files will get created
    :type zip_file: zipfile.ZipFile
    :param path: path for the package directory. E.g.
                 universe/repo/packages/M/marathon/0
    :type path: pathlib.Path
    :param package: package information dictionary; must contain
                    ``releaseVersion``
    :type package: dict
    :rtype: None
    """
    package = package.copy()
    package.pop('releaseVersion')
    package.pop('minDcosReleaseVersion', None)
    package['packagingVersion'] = "2.0"

    resource = package.pop('resource', None)
    if resource:
        # package.copy() is shallow: copy the nested dict too, so that
        # dropping 'cli' below does not alter the caller's data.
        resource = dict(resource)
        cli = resource.pop('cli', None)
        if cli and 'command' not in package:
            print(('WARNING: Removing binary CLI from ({}, {}) without a '
                   'Python CLI').format(package['name'], package['version']))
        zip_file.writestr(
            str(path / 'resource.json'),
            json.dumps(resource))

    marathon_template = package.pop(
        'marathon',
        {}
    ).get(
        'v2AppMustacheTemplate'
    )
    if marathon_template:
        # The template is stored Base64-encoded; write the decoded bytes.
        zip_file.writestr(
            str(path / 'marathon.json.mustache'),
            base64.standard_b64decode(marathon_template))

    config = package.pop('config', None)
    if config:
        zip_file.writestr(
            str(path / 'config.json'),
            json.dumps(config))

    command = package.pop('command', None)
    if command:
        zip_file.writestr(
            str(path / 'command.json'),
            json.dumps(command))

    # Whatever is left is the package definition itself.
    zip_file.writestr(
        str(path / 'package.json'),
        json.dumps(package))
def encrypt_file(encryption_material, in_filename,
                 chunk_size=AES.block_size * 4 * 1024, tmp_dir=None):
    """
    Encrypts a file into a new temporary file.

    A random file key (same length as the decoded master key) and a random
    IV encrypt the data with AES-CBC; the file key itself is encrypted with
    the master key using AES-ECB.

    :param encryption_material: encryption material; its
        ``query_stage_master_key`` is Base64 text
    :param in_filename: input file name
    :param chunk_size: read chunk size
    :param tmp_dir: temporary directory, optional
    :return: a ``(metadata, encrypted file name)`` tuple
    """
    logger = getLogger(__name__)
    master_key = base64.standard_b64decode(
        encryption_material.query_stage_master_key)
    key_size = len(master_key)
    logger.debug(u'key_size = %s', key_size)

    # Generate key for data encryption
    block_size = AES.block_size
    iv_data = SnowflakeEncryptionUtil.get_secure_random(block_size)
    file_key = SnowflakeEncryptionUtil.get_secure_random(key_size)
    data_cipher = AES.new(key=file_key, mode=AES.MODE_CBC, IV=iv_data)

    temp_output_fd, temp_output_file = tempfile.mkstemp(
        text=False, dir=tmp_dir,
        prefix=os.path.basename(in_filename) + "#")
    logger.debug(u'unencrypted file: %s, temp file: %s, tmp_dir: %s',
                 in_filename, temp_output_file, tmp_dir)

    padded = False
    with open(in_filename, u'rb') as infile, \
            os.fdopen(temp_output_fd, u'wb') as outfile:
        # read() returns an empty bytes object at end of file.
        for chunk in iter(lambda: infile.read(chunk_size), b''):
            if len(chunk) % block_size != 0:
                chunk = PKCS5_PAD(chunk, block_size)
                padded = True
            outfile.write(data_cipher.encrypt(chunk))
        if not padded:
            # No chunk needed padding: append one whole block of padding.
            outfile.write(data_cipher.encrypt(
                block_size * chr(block_size).encode(UTF8)))

    # encrypt key with QRMK
    key_cipher = AES.new(key=master_key, mode=AES.MODE_ECB)
    enc_kek = key_cipher.encrypt(PKCS5_PAD(file_key, block_size))

    mat_desc = MaterialDescriptor(
        smk_id=encryption_material.smk_id,
        query_id=encryption_material.query_id,
        key_size=key_size * 8)
    metadata = EncryptionMetadata(
        key=base64.b64encode(enc_kek).decode('utf-8'),
        iv=base64.b64encode(iv_data).decode('utf-8'),
        matdesc=matdesc_to_unicode(mat_desc),
    )
    return (metadata, temp_output_file)
def decrypt_file(metadata, encryption_material, in_filename,
                 chunk_size=AES.block_size * 4 * 1024, tmp_dir=None):
    """
    Decrypts a file into a new temporary file.

    The file key in ``metadata.key`` is decrypted with the master key
    (AES-ECB) and used, with ``metadata.iv``, to decrypt the data
    (AES-CBC). The output is truncated by the padding length found in the
    last decrypted chunk.

    :param metadata: metadata input; ``key`` and ``iv`` are Base64 text
    :param encryption_material: encryption material; its
        ``query_stage_master_key`` is Base64 text
    :param in_filename: input file name
    :param chunk_size: read chunk size
    :param tmp_dir: temporary directory, optional
    :return: a decrypted file name
    """
    logger = getLogger(__name__)
    key_base64 = metadata.key
    iv_base64 = metadata.iv

    master_key = base64.standard_b64decode(
        encryption_material.query_stage_master_key)
    wrapped_file_key = base64.standard_b64decode(key_base64)
    iv_bytes = base64.standard_b64decode(iv_base64)

    key_cipher = AES.new(key=master_key, mode=AES.MODE_ECB)
    file_key = PKCS5_UNPAD(key_cipher.decrypt(wrapped_file_key))
    data_cipher = AES.new(key=file_key, mode=AES.MODE_CBC, IV=iv_bytes)

    temp_output_fd, temp_output_file = tempfile.mkstemp(
        text=False, dir=tmp_dir,
        prefix=os.path.basename(in_filename) + "#")
    logger.info(u'encrypted file: %s, tmp file: %s',
                in_filename, temp_output_file)

    total_file_size = 0
    last_plain_chunk = None
    with open(in_filename, u'rb') as infile, \
            os.fdopen(temp_output_fd, u'wb') as outfile:
        # read() returns an empty bytes object at end of file.
        for chunk in iter(lambda: infile.read(chunk_size), b''):
            total_file_size += len(chunk)
            last_plain_chunk = data_cipher.decrypt(chunk)
            outfile.write(last_plain_chunk)
        if last_plain_chunk is not None:
            # Drop the padding carried by the final chunk.
            total_file_size -= PKCS5_OFFSET(last_plain_chunk)
        outfile.truncate(total_file_size)
    return temp_output_file
def _unpack_stub_universe_json(self, scratchdir, stub_universe_file):
    """Unpack a stub universe JSON file into per-package files.

    Creates ``stub-universe-<pkg name>`` under ``scratchdir`` and writes
    ``command.json``, ``config.json`` and ``resource.json`` (for the
    sections present), ``marathon.json.mustache`` (Base64-decoded template,
    if present) and ``package.json`` (everything that remains, minus
    ``releaseVersion``).

    :param scratchdir: directory in which the package directory is created
    :param stub_universe_file: binary file object holding UTF-8 JSON
    :return: path of the created package directory
    :raises Exception: if the JSON has no ``packages`` key or does not hold
        exactly one package
    """
    raw_text = stub_universe_file.read().decode('utf-8')
    stub_universe_json = json.loads(
        raw_text, object_pairs_hook=collections.OrderedDict)

    # put package files into a subdir of scratchdir: avoids conflicts with
    # reuse of scratchdir elsewhere
    pkgdir = os.path.join(scratchdir, 'stub-universe-{}'.format(self._pkg_name))
    os.makedirs(pkgdir)

    if 'packages' not in stub_universe_json:
        raise Exception('Expected "packages" key in root of stub universe JSON: {}'.format(
            self._stub_universe_url))
    if len(stub_universe_json['packages']) != 1:
        raise Exception('Expected single "packages" entry in stub universe JSON: {}'.format(
            self._stub_universe_url))

    # note: elements are deleted from package_json as they are unpacked,
    # because package_json is itself written to a file at the end
    package_json = stub_universe_json['packages'][0]

    for section in ('command', 'config', 'resource'):
        section_json = package_json.get(section)
        if section_json is None:
            continue
        del package_json[section]
        content = json.dumps(section_json, indent=2)
        # ensure that the file has a trailing newline (json.dumps() adds none)
        if not content.endswith('\n'):
            content += '\n'
        with open(os.path.join(pkgdir, section + '.json'), 'w') as fileref:
            fileref.write(content)

    marathon_json = package_json.get('marathon', {}).get('v2AppMustacheTemplate')
    if marathon_json is not None:
        del package_json['marathon']
        template_text = base64.standard_b64decode(marathon_json).decode()
        with open(os.path.join(pkgdir, 'marathon.json.mustache'), 'w') as marathon_file:
            marathon_file.write(template_text)

    if 'releaseVersion' in package_json:
        del package_json['releaseVersion']

    with open(os.path.join(pkgdir, 'package.json'), 'w') as package_file:
        package_file.write(json.dumps(package_json, indent=2))

    return pkgdir